diff --git a/agent/internal/features/api/api.go b/agent/internal/features/api/api.go index d217fb8..54da8f3 100644 --- a/agent/internal/features/api/api.go +++ b/agent/internal/features/api/api.go @@ -68,7 +68,7 @@ func NewClient(host, token string, log *slog.Logger) *Client { func (c *Client) CheckWalChainValidity(ctx context.Context) (*WalChainValidityResponse, error) { var resp WalChainValidityResponse - _, err := c.json.R(). + httpResp, err := c.json.R(). SetContext(ctx). SetResult(&resp). Get(c.buildURL(chainValidPath)) @@ -76,13 +76,17 @@ func (c *Client) CheckWalChainValidity(ctx context.Context) (*WalChainValidityRe return nil, err } + if err := c.checkResponse(httpResp, "check WAL chain validity"); err != nil { + return nil, err + } + return &resp, nil } func (c *Client) GetNextFullBackupTime(ctx context.Context) (*NextFullBackupTimeResponse, error) { var resp NextFullBackupTimeResponse - _, err := c.json.R(). + httpResp, err := c.json.R(). SetContext(ctx). SetResult(&resp). Get(c.buildURL(nextBackupTimePath)) @@ -90,16 +94,23 @@ func (c *Client) GetNextFullBackupTime(ctx context.Context) (*NextFullBackupTime return nil, err } + if err := c.checkResponse(httpResp, "get next full backup time"); err != nil { + return nil, err + } + return &resp, nil } func (c *Client) ReportBackupError(ctx context.Context, errMsg string) error { - _, err := c.json.R(). + httpResp, err := c.json.R(). SetContext(ctx). SetBody(reportErrorRequest{Error: errMsg}). Post(c.buildURL(reportErrorPath)) + if err != nil { + return err + } - return err + return c.checkResponse(httpResp, "report backup error") } func (c *Client) UploadBasebackup( @@ -223,7 +234,7 @@ func (c *Client) UploadWalSegment( func (c *Client) FetchServerVersion(ctx context.Context) (string, error) { var ver versionResponse - _, err := c.json.R(). + httpResp, err := c.json.R(). SetContext(ctx). SetResult(&ver). Get(c.buildURL(versionPath)) @@ -231,6 +242,10 @@ func (c *Client) FetchServerVersion(ctx context.Context) (string, error) { return "", err } + if err := c.checkResponse(httpResp, "fetch server version"); err != nil { + return "", err + } + return ver.Version, nil } @@ -263,3 +278,11 @@ func (c *Client) DownloadAgentBinary(ctx context.Context, arch, destPath string) func (c *Client) buildURL(path string) string { return c.host + path } + +func (c *Client) checkResponse(resp *resty.Response, method string) error { + if resp.StatusCode() >= 400 { + return fmt.Errorf("%s: server returned status %d: %s", method, resp.StatusCode(), resp.String()) + } + + return nil +} diff --git a/agent/internal/features/full_backup/backuper_test.go b/agent/internal/features/full_backup/backuper_test.go index eb9bee7..82cbc27 100644 --- a/agent/internal/features/full_backup/backuper_test.go +++ b/agent/internal/features/full_backup/backuper_test.go @@ -475,6 +475,39 @@ func Test_RunFullBackup_WhenNextBackupTimeNull_BasebackupTriggered(t *testing.T) assert.True(t, finalizeReceived) } +func Test_RunFullBackup_WhenChainValidityReturns401_NoBasebackupTriggered(t *testing.T) { + var uploadReceived atomic.Bool + + server := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case testChainValidPath: + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"invalid token"}`)) + case testFullStartPath: + uploadReceived.Store(true) + + _, _ = io.ReadAll(r.Body) + writeJSON(w, map[string]string{"backupId": testBackupID}) + case testFullCompletePath: + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } + }) + + fb := newTestFullBackuper(server.URL) + fb.cmdBuilder = mockCmdBuilder(t, "data", validStderr()) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + go fb.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + assert.False(t, uploadReceived.Load(), "should not trigger backup when API returns 401") +} + func Test_RunFullBackup_WhenUploadSucceeds_BodyIsZstdCompressed(t *testing.T) { var mu sync.Mutex var receivedBody []byte diff --git a/backend/internal/features/backups/backups/backuping/cleaner.go b/backend/internal/features/backups/backups/backuping/cleaner.go index 1588091..17b6202 100644 --- a/backend/internal/features/backups/backups/backuping/cleaner.go +++ b/backend/internal/features/backups/backups/backuping/cleaner.go @@ -113,6 +113,35 @@ func (c *BackupCleaner) cleanStaleUploadedBasebackups() error { } for _, backup := range staleBackups { + staleStorage, storageErr := c.storageService.GetStorageByID(backup.StorageID) + if storageErr != nil { + c.logger.Error( + "Failed to get storage for stale basebackup cleanup", + "backupId", backup.ID, + "storageId", backup.StorageID, + "error", storageErr, + ) + } else { + if err := staleStorage.DeleteFile(c.fieldEncryptor, backup.FileName); err != nil { + c.logger.Error( + "Failed to delete stale basebackup file", + "backupId", backup.ID, + "fileName", backup.FileName, + "error", err, + ) + } + + metadataFileName := backup.FileName + ".metadata" + if err := staleStorage.DeleteFile(c.fieldEncryptor, metadataFileName); err != nil { + c.logger.Error( + "Failed to delete stale basebackup metadata file", + "backupId", backup.ID, + "fileName", metadataFileName, + "error", err, + ) + } + } + failMsg := "basebackup finalization timed out after 10 minutes" backup.Status = backups_core.BackupStatusFailed backup.FailMessage = &failMsg @@ -127,7 +156,7 @@ func (c *BackupCleaner) cleanStaleUploadedBasebackups() error { } c.logger.Info( - "Marked stale uploaded basebackup as failed", + "Marked stale uploaded basebackup as failed and cleaned storage", "backupId", backup.ID, "databaseId", backup.DatabaseID, ) diff --git a/backend/internal/features/backups/backups/backuping/cleaner_test.go b/backend/internal/features/backups/backups/backuping/cleaner_test.go index a6d22e2..02b800e 100644 --- a/backend/internal/features/backups/backups/backuping/cleaner_test.go +++ b/backend/internal/features/backups/backups/backuping/cleaner_test.go @@ -1140,6 +1140,55 @@ func Test_CleanStaleUploadedBasebackups_SkipsActiveStreaming(t *testing.T) { assert.Nil(t, updated.UploadCompletedAt) } +func Test_CleanStaleUploadedBasebackups_CleansStorageFiles(t *testing.T) { + router := CreateTestRouter() + owner := users_testing.CreateTestUser(users_enums.UserRoleMember) + workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router) + storage := storages.CreateTestStorage(workspace.ID) + notifier := notifiers.CreateTestNotifier(workspace.ID) + database := databases.CreateTestDatabase(workspace.ID, storage, notifier) + + defer func() { + backups, _ := backupRepository.FindByDatabaseID(database.ID) + for _, backup := range backups { + backupRepository.DeleteByID(backup.ID) + } + + databases.RemoveTestDatabase(database) + time.Sleep(50 * time.Millisecond) + notifiers.RemoveTestNotifier(notifier) + storages.RemoveTestStorage(storage.ID) + workspaces_testing.RemoveTestWorkspace(workspace, router) + }() + + staleTime := time.Now().UTC().Add(-15 * time.Minute) + walBackupType := backups_core.PgWalBackupTypeFullBackup + staleBackup := &backups_core.Backup{ + ID: uuid.New(), + DatabaseID: database.ID, + StorageID: storage.ID, + Status: backups_core.BackupStatusInProgress, + PgWalBackupType: &walBackupType, + UploadCompletedAt: &staleTime, + BackupSizeMb: 500, + FileName: "stale-basebackup-test-file", + CreatedAt: staleTime, + } + + err := backupRepository.Save(staleBackup) + assert.NoError(t, err) + + cleaner := GetBackupCleaner() + err = cleaner.cleanStaleUploadedBasebackups() + assert.NoError(t, err) + + updated, err := backupRepository.FindByID(staleBackup.ID) + assert.NoError(t, err) + assert.Equal(t, backups_core.BackupStatusFailed, updated.Status) + assert.NotNil(t, updated.FailMessage) + assert.Contains(t, *updated.FailMessage, "finalization timed out") +} + func createTestInterval() *intervals.Interval { timeOfDay := "04:00" interval := &intervals.Interval{