mirror of
https://github.com/databasus/databasus.git
synced 2026-04-06 00:32:03 +02:00
FIX (agent): Respect API responses status code when retying
This commit is contained in:
@@ -68,7 +68,7 @@ func NewClient(host, token string, log *slog.Logger) *Client {
|
||||
func (c *Client) CheckWalChainValidity(ctx context.Context) (*WalChainValidityResponse, error) {
|
||||
var resp WalChainValidityResponse
|
||||
|
||||
_, err := c.json.R().
|
||||
httpResp, err := c.json.R().
|
||||
SetContext(ctx).
|
||||
SetResult(&resp).
|
||||
Get(c.buildURL(chainValidPath))
|
||||
@@ -76,13 +76,17 @@ func (c *Client) CheckWalChainValidity(ctx context.Context) (*WalChainValidityRe
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.checkResponse(httpResp, "check WAL chain validity"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) GetNextFullBackupTime(ctx context.Context) (*NextFullBackupTimeResponse, error) {
|
||||
var resp NextFullBackupTimeResponse
|
||||
|
||||
_, err := c.json.R().
|
||||
httpResp, err := c.json.R().
|
||||
SetContext(ctx).
|
||||
SetResult(&resp).
|
||||
Get(c.buildURL(nextBackupTimePath))
|
||||
@@ -90,16 +94,23 @@ func (c *Client) GetNextFullBackupTime(ctx context.Context) (*NextFullBackupTime
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.checkResponse(httpResp, "get next full backup time"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) ReportBackupError(ctx context.Context, errMsg string) error {
|
||||
_, err := c.json.R().
|
||||
httpResp, err := c.json.R().
|
||||
SetContext(ctx).
|
||||
SetBody(reportErrorRequest{Error: errMsg}).
|
||||
Post(c.buildURL(reportErrorPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
return c.checkResponse(httpResp, "report backup error")
|
||||
}
|
||||
|
||||
func (c *Client) UploadBasebackup(
|
||||
@@ -223,7 +234,7 @@ func (c *Client) UploadWalSegment(
|
||||
func (c *Client) FetchServerVersion(ctx context.Context) (string, error) {
|
||||
var ver versionResponse
|
||||
|
||||
_, err := c.json.R().
|
||||
httpResp, err := c.json.R().
|
||||
SetContext(ctx).
|
||||
SetResult(&ver).
|
||||
Get(c.buildURL(versionPath))
|
||||
@@ -231,6 +242,10 @@ func (c *Client) FetchServerVersion(ctx context.Context) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err := c.checkResponse(httpResp, "fetch server version"); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return ver.Version, nil
|
||||
}
|
||||
|
||||
@@ -263,3 +278,11 @@ func (c *Client) DownloadAgentBinary(ctx context.Context, arch, destPath string)
|
||||
func (c *Client) buildURL(path string) string {
|
||||
return c.host + path
|
||||
}
|
||||
|
||||
func (c *Client) checkResponse(resp *resty.Response, method string) error {
|
||||
if resp.StatusCode() >= 400 {
|
||||
return fmt.Errorf("%s: server returned status %d: %s", method, resp.StatusCode(), resp.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -475,6 +475,39 @@ func Test_RunFullBackup_WhenNextBackupTimeNull_BasebackupTriggered(t *testing.T)
|
||||
assert.True(t, finalizeReceived)
|
||||
}
|
||||
|
||||
func Test_RunFullBackup_WhenChainValidityReturns401_NoBasebackupTriggered(t *testing.T) {
|
||||
var uploadReceived atomic.Bool
|
||||
|
||||
server := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case testChainValidPath:
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
_, _ = w.Write([]byte(`{"error":"invalid token"}`))
|
||||
case testFullStartPath:
|
||||
uploadReceived.Store(true)
|
||||
|
||||
_, _ = io.ReadAll(r.Body)
|
||||
writeJSON(w, map[string]string{"backupId": testBackupID})
|
||||
case testFullCompletePath:
|
||||
w.WriteHeader(http.StatusOK)
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
})
|
||||
|
||||
fb := newTestFullBackuper(server.URL)
|
||||
fb.cmdBuilder = mockCmdBuilder(t, "data", validStderr())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go fb.Run(ctx)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
assert.False(t, uploadReceived.Load(), "should not trigger backup when API returns 401")
|
||||
}
|
||||
|
||||
func Test_RunFullBackup_WhenUploadSucceeds_BodyIsZstdCompressed(t *testing.T) {
|
||||
var mu sync.Mutex
|
||||
var receivedBody []byte
|
||||
|
||||
@@ -113,6 +113,35 @@ func (c *BackupCleaner) cleanStaleUploadedBasebackups() error {
|
||||
}
|
||||
|
||||
for _, backup := range staleBackups {
|
||||
staleStorage, storageErr := c.storageService.GetStorageByID(backup.StorageID)
|
||||
if storageErr != nil {
|
||||
c.logger.Error(
|
||||
"Failed to get storage for stale basebackup cleanup",
|
||||
"backupId", backup.ID,
|
||||
"storageId", backup.StorageID,
|
||||
"error", storageErr,
|
||||
)
|
||||
} else {
|
||||
if err := staleStorage.DeleteFile(c.fieldEncryptor, backup.FileName); err != nil {
|
||||
c.logger.Error(
|
||||
"Failed to delete stale basebackup file",
|
||||
"backupId", backup.ID,
|
||||
"fileName", backup.FileName,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
metadataFileName := backup.FileName + ".metadata"
|
||||
if err := staleStorage.DeleteFile(c.fieldEncryptor, metadataFileName); err != nil {
|
||||
c.logger.Error(
|
||||
"Failed to delete stale basebackup metadata file",
|
||||
"backupId", backup.ID,
|
||||
"fileName", metadataFileName,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
failMsg := "basebackup finalization timed out after 10 minutes"
|
||||
backup.Status = backups_core.BackupStatusFailed
|
||||
backup.FailMessage = &failMsg
|
||||
@@ -127,7 +156,7 @@ func (c *BackupCleaner) cleanStaleUploadedBasebackups() error {
|
||||
}
|
||||
|
||||
c.logger.Info(
|
||||
"Marked stale uploaded basebackup as failed",
|
||||
"Marked stale uploaded basebackup as failed and cleaned storage",
|
||||
"backupId", backup.ID,
|
||||
"databaseId", backup.DatabaseID,
|
||||
)
|
||||
|
||||
@@ -1140,6 +1140,55 @@ func Test_CleanStaleUploadedBasebackups_SkipsActiveStreaming(t *testing.T) {
|
||||
assert.Nil(t, updated.UploadCompletedAt)
|
||||
}
|
||||
|
||||
func Test_CleanStaleUploadedBasebackups_CleansStorageFiles(t *testing.T) {
|
||||
router := CreateTestRouter()
|
||||
owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
|
||||
workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router)
|
||||
storage := storages.CreateTestStorage(workspace.ID)
|
||||
notifier := notifiers.CreateTestNotifier(workspace.ID)
|
||||
database := databases.CreateTestDatabase(workspace.ID, storage, notifier)
|
||||
|
||||
defer func() {
|
||||
backups, _ := backupRepository.FindByDatabaseID(database.ID)
|
||||
for _, backup := range backups {
|
||||
backupRepository.DeleteByID(backup.ID)
|
||||
}
|
||||
|
||||
databases.RemoveTestDatabase(database)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
notifiers.RemoveTestNotifier(notifier)
|
||||
storages.RemoveTestStorage(storage.ID)
|
||||
workspaces_testing.RemoveTestWorkspace(workspace, router)
|
||||
}()
|
||||
|
||||
staleTime := time.Now().UTC().Add(-15 * time.Minute)
|
||||
walBackupType := backups_core.PgWalBackupTypeFullBackup
|
||||
staleBackup := &backups_core.Backup{
|
||||
ID: uuid.New(),
|
||||
DatabaseID: database.ID,
|
||||
StorageID: storage.ID,
|
||||
Status: backups_core.BackupStatusInProgress,
|
||||
PgWalBackupType: &walBackupType,
|
||||
UploadCompletedAt: &staleTime,
|
||||
BackupSizeMb: 500,
|
||||
FileName: "stale-basebackup-test-file",
|
||||
CreatedAt: staleTime,
|
||||
}
|
||||
|
||||
err := backupRepository.Save(staleBackup)
|
||||
assert.NoError(t, err)
|
||||
|
||||
cleaner := GetBackupCleaner()
|
||||
err = cleaner.cleanStaleUploadedBasebackups()
|
||||
assert.NoError(t, err)
|
||||
|
||||
updated, err := backupRepository.FindByID(staleBackup.ID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, backups_core.BackupStatusFailed, updated.Status)
|
||||
assert.NotNil(t, updated.FailMessage)
|
||||
assert.Contains(t, *updated.FailMessage, "finalization timed out")
|
||||
}
|
||||
|
||||
func createTestInterval() *intervals.Interval {
|
||||
timeOfDay := "04:00"
|
||||
interval := &intervals.Interval{
|
||||
|
||||
Reference in New Issue
Block a user