mirror of
https://github.com/databasus/databasus.git
synced 2026-04-06 00:32:03 +02:00
FIX (backups): Do not validate chain on WAL uploading
This commit is contained in:
@@ -3,7 +3,6 @@ package backups_controllers
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
@@ -25,6 +24,7 @@ func (c *PostgreWalBackupController) RegisterRoutes(router *gin.RouterGroup) {
|
||||
walRoutes := router.Group("/backups/postgres/wal")
|
||||
|
||||
walRoutes.GET("/next-full-backup-time", c.GetNextFullBackupTime)
|
||||
walRoutes.GET("/is-wal-chain-valid-since-last-full-backup", c.IsWalChainValidSinceLastBackup)
|
||||
walRoutes.POST("/error", c.ReportError)
|
||||
walRoutes.POST("/upload", c.Upload)
|
||||
walRoutes.GET("/restore/plan", c.GetRestorePlan)
|
||||
@@ -90,25 +90,49 @@ func (c *PostgreWalBackupController) ReportError(ctx *gin.Context) {
|
||||
ctx.Status(http.StatusOK)
|
||||
}
|
||||
|
||||
// IsWalChainValidSinceLastBackup
|
||||
// @Summary Check WAL chain validity since last full backup
|
||||
// @Description Checks whether the WAL chain is continuous since the last completed full backup.
|
||||
// Returns isValid=true if the chain is intact, or isValid=false with error details if not.
|
||||
// @Tags backups-wal
|
||||
// @Produce json
|
||||
// @Security AgentToken
|
||||
// @Success 200 {object} backups_dto.IsWalChainValidResponse
|
||||
// @Failure 401 {object} map[string]string
|
||||
// @Failure 500 {object} map[string]string
|
||||
// @Router /backups/postgres/wal/is-wal-chain-valid-since-last-full-backup [get]
|
||||
func (c *PostgreWalBackupController) IsWalChainValidSinceLastBackup(ctx *gin.Context) {
|
||||
database, err := c.getDatabase(ctx)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusUnauthorized, gin.H{"error": "invalid agent token"})
|
||||
return
|
||||
}
|
||||
|
||||
response, err := c.walService.IsWalChainValid(database)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, response)
|
||||
}
|
||||
|
||||
// Upload
|
||||
// @Summary Stream upload a basebackup or WAL segment
|
||||
// @Description Accepts a zstd-compressed binary stream and stores it in the database's configured storage.
|
||||
// The server generates the storage filename; agents do not control the destination path.
|
||||
// For WAL segment uploads the server validates the WAL chain and returns 409 if a gap is detected
|
||||
// or 400 if no full backup exists yet (agent should trigger a full basebackup in both cases).
|
||||
// WAL segments are accepted unconditionally
|
||||
// @Tags backups-wal
|
||||
// @Accept application/octet-stream
|
||||
// @Produce json
|
||||
// @Security AgentToken
|
||||
// @Param X-Upload-Type header string true "Upload type" Enums(basebackup, wal)
|
||||
// @Param X-Wal-Segment-Name header string false "24-hex WAL segment identifier (required for wal uploads, e.g. 0000000100000001000000AB)"
|
||||
// @Param X-Wal-Segment-Size header int false "WAL segment size in bytes reported by the PostgreSQL instance (default: 16777216)"
|
||||
// @Param fullBackupWalStartSegment query string false "First WAL segment needed to make the basebackup consistent (required for basebackup uploads)"
|
||||
// @Param fullBackupWalStopSegment query string false "Last WAL segment included in the basebackup (required for basebackup uploads)"
|
||||
// @Success 204
|
||||
// @Failure 400 {object} backups_dto.UploadGapResponse "No full backup exists (error: no_full_backup)"
|
||||
// @Failure 400 {object} map[string]string
|
||||
// @Failure 401 {object} map[string]string
|
||||
// @Failure 409 {object} backups_dto.UploadGapResponse "WAL chain gap detected (error: gap_detected)"
|
||||
// @Failure 500 {object} map[string]string
|
||||
// @Router /backups/postgres/wal/upload [post]
|
||||
func (c *PostgreWalBackupController) Upload(ctx *gin.Context) {
|
||||
@@ -153,28 +177,13 @@ func (c *PostgreWalBackupController) Upload(ctx *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
walSegmentSizeBytes := int64(0)
|
||||
if raw := ctx.GetHeader("X-Wal-Segment-Size"); raw != "" {
|
||||
parsed, parseErr := strconv.ParseInt(raw, 10, 64)
|
||||
if parseErr != nil || parsed <= 0 {
|
||||
ctx.JSON(
|
||||
http.StatusBadRequest,
|
||||
gin.H{"error": "X-Wal-Segment-Size must be a positive integer"},
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
walSegmentSizeBytes = parsed
|
||||
}
|
||||
|
||||
gapResp, uploadErr := c.walService.UploadWal(
|
||||
uploadErr := c.walService.UploadWal(
|
||||
ctx.Request.Context(),
|
||||
database,
|
||||
uploadType,
|
||||
walSegmentName,
|
||||
ctx.Query("fullBackupWalStartSegment"),
|
||||
ctx.Query("fullBackupWalStopSegment"),
|
||||
walSegmentSizeBytes,
|
||||
ctx.Request.Body,
|
||||
)
|
||||
|
||||
@@ -183,16 +192,6 @@ func (c *PostgreWalBackupController) Upload(ctx *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if gapResp != nil {
|
||||
if gapResp.Error == "no_full_backup" {
|
||||
ctx.JSON(http.StatusBadRequest, gapResp)
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusConflict, gapResp)
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Status(http.StatusNoContent)
|
||||
}
|
||||
|
||||
|
||||
@@ -142,46 +142,34 @@ func Test_WalUpload_Basebackup_MissingWalSegments_Returns400(t *testing.T) {
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
}
|
||||
|
||||
func Test_WalUpload_WalSegment_NoFullBackup_Returns400(t *testing.T) {
|
||||
func Test_WalUpload_WalSegment_WithoutFullBackup_Returns204(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
// No full backup inserted — chain anchor is missing.
|
||||
body := bytes.NewReader([]byte("wal content"))
|
||||
req := newWalUploadRequest(body, agentToken, "wal", "000000010000000100000001", "", "")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
|
||||
var resp backups_dto.UploadGapResponse
|
||||
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp))
|
||||
assert.Equal(t, "no_full_backup", resp.Error)
|
||||
assert.Equal(t, http.StatusNoContent, w.Code)
|
||||
}
|
||||
|
||||
func Test_WalUpload_WalSegment_GapDetected_Returns409WithExpectedAndReceived(t *testing.T) {
|
||||
func Test_WalUpload_WalSegment_WithGap_Returns204(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
// Full backup stops at ...0010; upload one WAL segment at ...0011.
|
||||
uploadBasebackup(t, router, agentToken, "000000010000000100000001", "000000010000000100000010")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000011")
|
||||
|
||||
// Send ...0013 — should be rejected because ...0012 is missing.
|
||||
// Skip ...0012, upload ...0013 — should succeed (no chain validation on upload).
|
||||
body := bytes.NewReader([]byte("wal content"))
|
||||
req := newWalUploadRequest(body, agentToken, "wal", "000000010000000100000013", "", "")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusConflict, w.Code)
|
||||
|
||||
var resp backups_dto.UploadGapResponse
|
||||
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &resp))
|
||||
assert.Equal(t, "gap_detected", resp.Error)
|
||||
assert.Equal(t, "000000010000000100000012", resp.ExpectedSegmentName)
|
||||
assert.Equal(t, "000000010000000100000013", resp.ReceivedSegmentName)
|
||||
assert.Equal(t, http.StatusNoContent, w.Code)
|
||||
}
|
||||
|
||||
func Test_WalUpload_WalSegment_DuplicateSegment_Returns200Idempotent(t *testing.T) {
|
||||
@@ -255,6 +243,108 @@ func Test_WalUpload_WalSegment_ValidNextSegment_Returns200AndCreatesRecord(t *te
|
||||
assert.Equal(t, "000000010000000100000011", *walBackup.PgWalSegmentName)
|
||||
}
|
||||
|
||||
func Test_IsWalChainValid_NoFullBackup_ReturnsFalse(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
var response backups_dto.IsWalChainValidResponse
|
||||
test_utils.MakeGetRequestAndUnmarshal(
|
||||
t, router,
|
||||
"/api/v1/backups/postgres/wal/is-wal-chain-valid-since-last-full-backup",
|
||||
agentToken,
|
||||
http.StatusOK,
|
||||
&response,
|
||||
)
|
||||
|
||||
assert.False(t, response.IsValid)
|
||||
assert.Equal(t, "no_full_backup", response.Error)
|
||||
}
|
||||
|
||||
func Test_IsWalChainValid_FullBackupOnly_ReturnsTrue(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
uploadBasebackup(t, router, agentToken, "000000010000000100000001", "000000010000000100000010")
|
||||
|
||||
var response backups_dto.IsWalChainValidResponse
|
||||
test_utils.MakeGetRequestAndUnmarshal(
|
||||
t, router,
|
||||
"/api/v1/backups/postgres/wal/is-wal-chain-valid-since-last-full-backup",
|
||||
agentToken,
|
||||
http.StatusOK,
|
||||
&response,
|
||||
)
|
||||
|
||||
assert.True(t, response.IsValid)
|
||||
assert.Empty(t, response.Error)
|
||||
}
|
||||
|
||||
func Test_IsWalChainValid_ContinuousChain_ReturnsTrue(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
uploadBasebackup(t, router, agentToken, "000000010000000100000001", "000000010000000100000010")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000011")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000012")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000013")
|
||||
|
||||
var response backups_dto.IsWalChainValidResponse
|
||||
test_utils.MakeGetRequestAndUnmarshal(
|
||||
t, router,
|
||||
"/api/v1/backups/postgres/wal/is-wal-chain-valid-since-last-full-backup",
|
||||
agentToken,
|
||||
http.StatusOK,
|
||||
&response,
|
||||
)
|
||||
|
||||
assert.True(t, response.IsValid)
|
||||
}
|
||||
|
||||
func Test_IsWalChainValid_BrokenChain_ReturnsFalse(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
uploadBasebackup(t, router, agentToken, "000000010000000100000001", "000000010000000100000010")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000011")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000012")
|
||||
uploadWalSegment(t, router, agentToken, "000000010000000100000013")
|
||||
|
||||
// Delete the middle segment to create a gap.
|
||||
middleSeg, err := backups_core.GetBackupRepository().FindWalSegmentByName(
|
||||
db.ID, "000000010000000100000012",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, middleSeg)
|
||||
require.NoError(t, backups_core.GetBackupRepository().DeleteByID(middleSeg.ID))
|
||||
|
||||
var response backups_dto.IsWalChainValidResponse
|
||||
test_utils.MakeGetRequestAndUnmarshal(
|
||||
t, router,
|
||||
"/api/v1/backups/postgres/wal/is-wal-chain-valid-since-last-full-backup",
|
||||
agentToken,
|
||||
http.StatusOK,
|
||||
&response,
|
||||
)
|
||||
|
||||
assert.False(t, response.IsValid)
|
||||
assert.Equal(t, "wal_chain_broken", response.Error)
|
||||
assert.Equal(t, "000000010000000100000011", response.LastContiguousSegment)
|
||||
}
|
||||
|
||||
func Test_IsWalChainValid_InvalidToken_Returns401(t *testing.T) {
|
||||
router, db, storage, _, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
resp := test_utils.MakeGetRequest(
|
||||
t, router,
|
||||
"/api/v1/backups/postgres/wal/is-wal-chain-valid-since-last-full-backup",
|
||||
"invalid-token",
|
||||
http.StatusUnauthorized,
|
||||
)
|
||||
|
||||
assert.Contains(t, string(resp.Body), "invalid agent token")
|
||||
}
|
||||
|
||||
func Test_ReportError_ValidTokenAndError_CreatesFailedBackupRecord(t *testing.T) {
|
||||
router, db, storage, agentToken, _ := createWalTestSetup(t)
|
||||
defer removeWalTestSetup(db, storage)
|
||||
|
||||
@@ -44,10 +44,10 @@ type ReportErrorRequest struct {
|
||||
Error string `json:"error" binding:"required"`
|
||||
}
|
||||
|
||||
type UploadGapResponse struct {
|
||||
Error string `json:"error"`
|
||||
ExpectedSegmentName string `json:"expectedSegmentName"`
|
||||
ReceivedSegmentName string `json:"receivedSegmentName"`
|
||||
type IsWalChainValidResponse struct {
|
||||
IsValid bool `json:"isValid"`
|
||||
Error string `json:"error,omitempty"`
|
||||
LastContiguousSegment string `json:"lastContiguousSegment,omitempty"`
|
||||
}
|
||||
|
||||
type RestorePlanFullBackup struct {
|
||||
|
||||
@@ -31,8 +31,7 @@ type PostgreWalBackupService struct {
|
||||
}
|
||||
|
||||
// UploadWal accepts a streaming WAL segment or basebackup upload from the agent.
|
||||
// For WAL segments it validates the WAL chain before accepting. Returns an UploadGapResponse
|
||||
// (409) when the chain is broken so the agent knows to trigger a full basebackup.
|
||||
// WAL segments are accepted unconditionally
|
||||
func (s *PostgreWalBackupService) UploadWal(
|
||||
ctx context.Context,
|
||||
database *databases.Database,
|
||||
@@ -40,16 +39,15 @@ func (s *PostgreWalBackupService) UploadWal(
|
||||
walSegmentName string,
|
||||
fullBackupWalStartSegment string,
|
||||
fullBackupWalStopSegment string,
|
||||
walSegmentSizeBytes int64,
|
||||
body io.Reader,
|
||||
) (*backups_dto.UploadGapResponse, error) {
|
||||
) error {
|
||||
if err := s.validateWalBackupType(database); err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if uploadType == backups_core.PgWalUploadTypeBasebackup {
|
||||
if fullBackupWalStartSegment == "" || fullBackupWalStopSegment == "" {
|
||||
return nil, fmt.Errorf(
|
||||
return fmt.Errorf(
|
||||
"fullBackupWalStartSegment and fullBackupWalStopSegment are required for basebackup uploads",
|
||||
)
|
||||
}
|
||||
@@ -57,32 +55,22 @@ func (s *PostgreWalBackupService) UploadWal(
|
||||
|
||||
backupConfig, err := s.backupConfigService.GetBackupConfigByDbId(database.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get backup config: %w", err)
|
||||
return fmt.Errorf("failed to get backup config: %w", err)
|
||||
}
|
||||
|
||||
if backupConfig.Storage == nil {
|
||||
return nil, fmt.Errorf("no storage configured for database %s", database.ID)
|
||||
return fmt.Errorf("no storage configured for database %s", database.ID)
|
||||
}
|
||||
|
||||
if uploadType == backups_core.PgWalUploadTypeWal {
|
||||
// Idempotency: check before chain validation so a successful re-upload is
|
||||
// not misidentified as a gap.
|
||||
// Idempotency: skip if this segment was already uploaded.
|
||||
existing, err := s.backupRepository.FindWalSegmentByName(database.ID, walSegmentName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check for duplicate WAL segment: %w", err)
|
||||
return fmt.Errorf("failed to check for duplicate WAL segment: %w", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
gapResp, err := s.validateWalChain(database.ID, walSegmentName, walSegmentSizeBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if gapResp != nil {
|
||||
return gapResp, nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +86,7 @@ func (s *PostgreWalBackupService) UploadWal(
|
||||
)
|
||||
|
||||
if err := s.backupRepository.Save(backup); err != nil {
|
||||
return nil, fmt.Errorf("failed to create backup record: %w", err)
|
||||
return fmt.Errorf("failed to create backup record: %w", err)
|
||||
}
|
||||
|
||||
sizeBytes, streamErr := s.streamToStorage(ctx, backup, backupConfig, body)
|
||||
@@ -106,12 +94,12 @@ func (s *PostgreWalBackupService) UploadWal(
|
||||
errMsg := streamErr.Error()
|
||||
s.markFailed(backup, errMsg)
|
||||
|
||||
return nil, fmt.Errorf("upload failed: %w", streamErr)
|
||||
return fmt.Errorf("upload failed: %w", streamErr)
|
||||
}
|
||||
|
||||
s.markCompleted(backup, sizeBytes)
|
||||
|
||||
return nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgreWalBackupService) GetRestorePlan(
|
||||
@@ -299,55 +287,48 @@ func (s *PostgreWalBackupService) ReportError(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgreWalBackupService) validateWalChain(
|
||||
databaseID uuid.UUID,
|
||||
incomingSegment string,
|
||||
walSegmentSizeBytes int64,
|
||||
) (*backups_dto.UploadGapResponse, error) {
|
||||
fullBackup, err := s.backupRepository.FindLastCompletedFullWalBackupByDatabaseID(databaseID)
|
||||
// IsWalChainValid checks whether the WAL chain is continuous since the last completed full backup.
|
||||
func (s *PostgreWalBackupService) IsWalChainValid(
|
||||
database *databases.Database,
|
||||
) (*backups_dto.IsWalChainValidResponse, error) {
|
||||
if err := s.validateWalBackupType(database); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fullBackup, err := s.backupRepository.FindLastCompletedFullWalBackupByDatabaseID(database.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query full backup: %w", err)
|
||||
}
|
||||
|
||||
// No full backup exists yet: cannot accept WAL segments without a chain anchor.
|
||||
if fullBackup == nil || fullBackup.PgFullBackupWalStopSegmentName == nil {
|
||||
return &backups_dto.UploadGapResponse{
|
||||
Error: "no_full_backup",
|
||||
ExpectedSegmentName: "",
|
||||
ReceivedSegmentName: incomingSegment,
|
||||
return &backups_dto.IsWalChainValidResponse{
|
||||
IsValid: false,
|
||||
Error: "no_full_backup",
|
||||
}, nil
|
||||
}
|
||||
|
||||
stopSegment := *fullBackup.PgFullBackupWalStopSegmentName
|
||||
startSegment := ""
|
||||
if fullBackup.PgFullBackupWalStartSegmentName != nil {
|
||||
startSegment = *fullBackup.PgFullBackupWalStartSegmentName
|
||||
}
|
||||
|
||||
lastWal, err := s.backupRepository.FindLastWalSegmentAfter(databaseID, stopSegment)
|
||||
walSegments, err := s.backupRepository.FindCompletedWalSegmentsAfter(database.ID, startSegment)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query last WAL segment: %w", err)
|
||||
return nil, fmt.Errorf("failed to query WAL segments: %w", err)
|
||||
}
|
||||
|
||||
walCalculator := util_wal.NewWalCalculator(walSegmentSizeBytes)
|
||||
|
||||
var chainTail string
|
||||
if lastWal != nil && lastWal.PgWalSegmentName != nil {
|
||||
chainTail = *lastWal.PgWalSegmentName
|
||||
} else {
|
||||
chainTail = stopSegment
|
||||
}
|
||||
|
||||
expectedNext, err := walCalculator.NextSegment(chainTail)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("WAL arithmetic failed for %q: %w", chainTail, err)
|
||||
}
|
||||
|
||||
if incomingSegment != expectedNext {
|
||||
return &backups_dto.UploadGapResponse{
|
||||
Error: "gap_detected",
|
||||
ExpectedSegmentName: expectedNext,
|
||||
ReceivedSegmentName: incomingSegment,
|
||||
chainErr := s.validateRestoreWalChain(fullBackup, walSegments)
|
||||
if chainErr != nil {
|
||||
return &backups_dto.IsWalChainValidResponse{
|
||||
IsValid: false,
|
||||
Error: chainErr.Error,
|
||||
LastContiguousSegment: chainErr.LastContiguousSegment,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return &backups_dto.IsWalChainValidResponse{
|
||||
IsValid: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PostgreWalBackupService) createBackupRecord(
|
||||
|
||||
Reference in New Issue
Block a user