From 58ae86ff7a1fcd65a1cfa850a3c9cdd2043aa7a8 Mon Sep 17 00:00:00 2001 From: Rostislav Dugin Date: Fri, 2 Jan 2026 14:20:32 +0300 Subject: [PATCH] FIX (backups): Revert directory update --- .../features/backups/backups/controller.go | 11 +- .../backups/backups/controller_test.go | 35 +- .../features/backups/backups/model.go | 4 +- .../features/backups/backups/service.go | 1 - .../usecases/mariadb/create_backup_uc.go | 2 +- .../usecases/mongodb/create_backup_uc.go | 2 +- .../usecases/mysql/create_backup_uc.go | 2 +- .../usecases/postgresql/create_backup_uc.go | 380 +++++++++--------- .../usecases/postgresql/restore_backup_uc.go | 308 +------------- ...0260102110334_remove_backup_type_field.sql | 9 + 10 files changed, 212 insertions(+), 542 deletions(-) create mode 100644 backend/migrations/20260102110334_remove_backup_type_field.sql diff --git a/backend/internal/features/backups/backups/controller.go b/backend/internal/features/backups/backups/controller.go index 8d9163c..f1e1024 100644 --- a/backend/internal/features/backups/backups/controller.go +++ b/backend/internal/features/backups/backups/controller.go @@ -1,7 +1,6 @@ package backups import ( - "databasus-backend/internal/features/backups/backups/common" "databasus-backend/internal/features/databases" users_middleware "databasus-backend/internal/features/users/middleware" "fmt" @@ -223,24 +222,20 @@ func (c *BackupController) generateBackupFilename( // Sanitize database name for filename (replace spaces and special chars) safeName := sanitizeFilename(database.Name) - // Determine extension based on database type and backup type - extension := c.getBackupExtension(database.Type, backup.Type) + // Determine extension based on database type + extension := c.getBackupExtension(database.Type) return fmt.Sprintf("%s_backup_%s%s", safeName, timestamp, extension) } func (c *BackupController) getBackupExtension( dbType databases.DatabaseType, - backupType common.BackupType, ) string { switch dbType { case 
databases.DatabaseTypeMysql, databases.DatabaseTypeMariadb: return ".sql.zst" case databases.DatabaseTypePostgres: - // For PostgreSQL, use .tar for directory type, .dump for custom type - if backupType == common.BackupTypeDirectory { - return ".tar" - } + // PostgreSQL custom format return ".dump" case databases.DatabaseTypeMongodb: return ".archive" diff --git a/backend/internal/features/backups/backups/controller_test.go b/backend/internal/features/backups/backups/controller_test.go index c502a3e..b919ae5 100644 --- a/backend/internal/features/backups/backups/controller_test.go +++ b/backend/internal/features/backups/backups/controller_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/assert" audit_logs "databasus-backend/internal/features/audit_logs" - common "databasus-backend/internal/features/backups/backups/common" backups_config "databasus-backend/internal/features/backups/config" "databasus-backend/internal/features/databases" "databasus-backend/internal/features/databases/databases/postgresql" @@ -499,35 +498,24 @@ func Test_DownloadBackup_ProperFilenameForPostgreSQL(t *testing.T) { tests := []struct { name string databaseName string - backupType string expectedExt string expectedInName string }{ { - name: "PostgreSQL with directory type", + name: "PostgreSQL database", databaseName: "my_postgres_db", - backupType: "DIRECTORY", - expectedExt: ".tar", - expectedInName: "my_postgres_db_backup_", - }, - { - name: "PostgreSQL with default type", - databaseName: "my_postgres_db", - backupType: "DEFAULT", expectedExt: ".dump", expectedInName: "my_postgres_db_backup_", }, { name: "Database name with spaces", databaseName: "my test db", - backupType: "DIRECTORY", - expectedExt: ".tar", + expectedExt: ".dump", expectedInName: "my_test_db_backup_", }, { name: "Database name with special characters", databaseName: "my:db/test", - backupType: "DEFAULT", expectedExt: ".dump", expectedInName: "my-db-test_backup_", }, @@ -552,7 +540,7 @@ func 
Test_DownloadBackup_ProperFilenameForPostgreSQL(t *testing.T) { _, err = configService.SaveBackupConfig(config) assert.NoError(t, err) - backup := createTestBackupWithType(database, owner, tt.backupType) + backup := createTestBackup(database, owner) resp := test_utils.MakeGetRequest( t, @@ -827,20 +815,3 @@ func createTestBackup( return backup } - -func createTestBackupWithType( - database *databases.Database, - owner *users_dto.SignInResponseDTO, - backupType string, -) *Backup { - backup := createTestBackup(database, owner) - - // Update the format field - repo := &BackupRepository{} - backup.Type = common.BackupType(backupType) - if err := repo.Save(backup); err != nil { - panic(err) - } - - return backup -} diff --git a/backend/internal/features/backups/backups/model.go b/backend/internal/features/backups/backups/model.go index 165aa2b..4d0c708 100644 --- a/backend/internal/features/backups/backups/model.go +++ b/backend/internal/features/backups/backups/model.go @@ -1,7 +1,6 @@ package backups import ( - common "databasus-backend/internal/features/backups/backups/common" backups_config "databasus-backend/internal/features/backups/config" "time" @@ -25,6 +24,5 @@ type Backup struct { EncryptionIV *string `json:"-" gorm:"column:encryption_iv"` Encryption backups_config.BackupEncryption `json:"encryption" gorm:"column:encryption;type:text;not null;default:'NONE'"` - Type common.BackupType `json:"type" gorm:"column:type;type:text;not null;default:'DEFAULT'"` - CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"` + CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"` } diff --git a/backend/internal/features/backups/backups/service.go b/backend/internal/features/backups/backups/service.go index 107b587..334addf 100644 --- a/backend/internal/features/backups/backups/service.go +++ b/backend/internal/features/backups/backups/service.go @@ -343,7 +343,6 @@ func (s *BackupService) MakeBackup(databaseID uuid.UUID, isLastTry bool) { 
backup.EncryptionSalt = backupMetadata.EncryptionSalt backup.EncryptionIV = backupMetadata.EncryptionIV backup.Encryption = backupMetadata.Encryption - backup.Type = backupMetadata.Type } if err := s.backupRepository.Save(backup); err != nil { diff --git a/backend/internal/features/backups/backups/usecases/mariadb/create_backup_uc.go b/backend/internal/features/backups/backups/usecases/mariadb/create_backup_uc.go index 4d3cf97..352e78f 100644 --- a/backend/internal/features/backups/backups/usecases/mariadb/create_backup_uc.go +++ b/backend/internal/features/backups/backups/usecases/mariadb/create_backup_uc.go @@ -30,7 +30,7 @@ import ( ) const ( - backupTimeout = 6 * time.Hour + backupTimeout = 23 * time.Hour shutdownCheckInterval = 1 * time.Second copyBufferSize = 8 * 1024 * 1024 progressReportIntervalMB = 1.0 diff --git a/backend/internal/features/backups/backups/usecases/mongodb/create_backup_uc.go b/backend/internal/features/backups/backups/usecases/mongodb/create_backup_uc.go index 000cd2d..9169e5a 100644 --- a/backend/internal/features/backups/backups/usecases/mongodb/create_backup_uc.go +++ b/backend/internal/features/backups/backups/usecases/mongodb/create_backup_uc.go @@ -27,7 +27,7 @@ import ( ) const ( - backupTimeout = 6 * time.Hour + backupTimeout = 23 * time.Hour shutdownCheckInterval = 1 * time.Second copyBufferSize = 8 * 1024 * 1024 progressReportIntervalMB = 1.0 diff --git a/backend/internal/features/backups/backups/usecases/mysql/create_backup_uc.go b/backend/internal/features/backups/backups/usecases/mysql/create_backup_uc.go index 293c8c2..91a59fe 100644 --- a/backend/internal/features/backups/backups/usecases/mysql/create_backup_uc.go +++ b/backend/internal/features/backups/backups/usecases/mysql/create_backup_uc.go @@ -30,7 +30,7 @@ import ( ) const ( - backupTimeout = 6 * time.Hour + backupTimeout = 23 * time.Hour shutdownCheckInterval = 1 * time.Second copyBufferSize = 8 * 1024 * 1024 progressReportIntervalMB = 1.0 diff --git 
a/backend/internal/features/backups/backups/usecases/postgresql/create_backup_uc.go b/backend/internal/features/backups/backups/usecases/postgresql/create_backup_uc.go index b59e95e..b3a3b0e6 100644 --- a/backend/internal/features/backups/backups/usecases/postgresql/create_backup_uc.go +++ b/backend/internal/features/backups/backups/usecases/postgresql/create_backup_uc.go @@ -1,7 +1,6 @@ package usecases_postgresql import ( - "archive/tar" "context" "encoding/base64" "errors" @@ -24,15 +23,15 @@ import ( encryption_secrets "databasus-backend/internal/features/encryption/secrets" "databasus-backend/internal/features/storages" "databasus-backend/internal/util/encryption" - files_utils "databasus-backend/internal/util/files" "databasus-backend/internal/util/tools" "github.com/google/uuid" ) const ( - backupTimeout = 6 * time.Hour + backupTimeout = 23 * time.Hour shutdownCheckInterval = 1 * time.Second + copyBufferSize = 8 * 1024 * 1024 progressReportIntervalMB = 1.0 pgConnectTimeout = 30 compressionLevel = 5 @@ -47,6 +46,11 @@ type CreatePostgresqlBackupUsecase struct { fieldEncryptor encryption.FieldEncryptor } +type writeResult struct { + bytesWritten int + writeErr error +} + func (uc *CreatePostgresqlBackupUsecase) Execute( ctx context.Context, backupID uuid.UUID, @@ -58,7 +62,7 @@ func (uc *CreatePostgresqlBackupUsecase) Execute( ), ) (*common.BackupMetadata, error) { uc.logger.Info( - "Creating PostgreSQL backup via pg_dump directory type", + "Creating PostgreSQL backup via pg_dump custom format", "databaseId", db.ID, "storageId", @@ -79,12 +83,14 @@ func (uc *CreatePostgresqlBackupUsecase) Execute( return nil, fmt.Errorf("database name is required for pg_dump backups") } + args := uc.buildPgDumpArgs(pg) + decryptedPassword, err := uc.fieldEncryptor.Decrypt(db.ID, pg.Password) if err != nil { return nil, fmt.Errorf("failed to decrypt database password: %w", err) } - return uc.executeDirectoryBackup( + return uc.streamToStorage( ctx, backupID, backupConfig, @@ 
-94,127 +100,66 @@ func (uc *CreatePostgresqlBackupUsecase) Execute( config.GetEnv().EnvMode, config.GetEnv().PostgresesInstallDir, ), - pg, + args, decryptedPassword, storage, + db, backupProgressListener, ) } -// executeDirectoryBackup runs pg_dump with directory type and streams as TAR to storage -func (uc *CreatePostgresqlBackupUsecase) executeDirectoryBackup( +// streamToStorage streams pg_dump output directly to storage +func (uc *CreatePostgresqlBackupUsecase) streamToStorage( parentCtx context.Context, backupID uuid.UUID, backupConfig *backups_config.BackupConfig, pgBin string, - pg *pgtypes.PostgresqlDatabase, + args []string, password string, storage *storages.Storage, + db *databases.Database, backupProgressListener func(completedMBs float64), ) (*common.BackupMetadata, error) { + uc.logger.Info("Streaming PostgreSQL backup to storage", "pgBin", pgBin, "args", args) + ctx, cancel := uc.createBackupContext(parentCtx) defer cancel() - // Create temporary directory for pg_dump output - err := files_utils.EnsureDirectories([]string{config.GetEnv().TempFolder}) - if err != nil { - return nil, fmt.Errorf("failed to ensure temp directories: %w", err) - } - - tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgdump_"+backupID.String()) - if err != nil { - return nil, fmt.Errorf("failed to create temp directory: %w", err) - } - defer func() { - _ = os.RemoveAll(tempDir) - }() - - outputDir := filepath.Join(tempDir, "dump") - - args := uc.buildPgDumpArgs(pg, outputDir) - uc.logger.Info( - "Executing PostgreSQL backup with directory type", - "pgBin", - pgBin, - "args", - args, - "outputDir", - outputDir, - ) - - pgpassFile, err := uc.setupPgpassFile(pg, password) + pgpassFile, err := uc.setupPgpassFile(db.Postgresql, password) if err != nil { return nil, err } defer func() { if pgpassFile != "" { + // Remove the entire temp directory (which contains the .pgpass file) _ = os.RemoveAll(filepath.Dir(pgpassFile)) } }() - // Execute pg_dump to directory cmd := 
exec.CommandContext(ctx, pgBin, args...) - if err := uc.setupPgEnvironment(cmd, pgpassFile, pg.IsHttps, password, pg.CpuCount, pgBin); err != nil { + uc.logger.Info("Executing PostgreSQL backup command", "command", cmd.String()) + + if err := uc.setupPgEnvironment(cmd, pgpassFile, db.Postgresql.IsHttps, password, db.Postgresql.CpuCount, pgBin); err != nil { return nil, err } + pgStdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("stdout pipe: %w", err) + } + pgStderr, err := cmd.StderrPipe() if err != nil { return nil, fmt.Errorf("stderr pipe: %w", err) } + // Capture stderr in a separate goroutine to ensure we don't miss any error output stderrCh := make(chan []byte, 1) go func() { stderrOutput, _ := io.ReadAll(pgStderr) stderrCh <- stderrOutput }() - if err = cmd.Start(); err != nil { - return nil, fmt.Errorf("start %s: %w", filepath.Base(pgBin), err) - } - - waitErr := cmd.Wait() - stderrOutput := <-stderrCh - - select { - case <-ctx.Done(): - return nil, uc.checkCancellationReason() - default: - } - - if waitErr != nil { - if err := uc.checkCancellation(ctx); err != nil { - return nil, err - } - return nil, uc.buildPgDumpErrorMessage(waitErr, stderrOutput, pgBin, args, password) - } - - uc.logger.Info( - "pg_dump completed successfully, streaming TAR to storage", - "outputDir", - outputDir, - ) - - // Stream directory as TAR to storage - return uc.streamDirectoryToStorage( - ctx, - backupID, - backupConfig, - outputDir, - storage, - backupProgressListener, - ) -} - -// streamDirectoryToStorage creates a TAR archive from the directory and streams it to storage -func (uc *CreatePostgresqlBackupUsecase) streamDirectoryToStorage( - ctx context.Context, - backupID uuid.UUID, - backupConfig *backups_config.BackupConfig, - sourceDir string, - storage *storages.Storage, - backupProgressListener func(completedMBs float64), -) (*common.BackupMetadata, error) { storageReader, storageWriter := io.Pipe() finalWriter, encryptionWriter, backupMetadata, 
err := uc.setupBackupEncryption( @@ -226,176 +171,162 @@ func (uc *CreatePostgresqlBackupUsecase) streamDirectoryToStorage( return nil, err } - // Set type to DIRECTORY for new PostgreSQL backups - backupMetadata.Type = common.BackupTypeDirectory + countingWriter := common.NewCountingWriter(finalWriter) - // Start streaming into storage + // The backup ID becomes the object key / filename in storage + + // Start streaming into storage in its own goroutine saveErrCh := make(chan error, 1) go func() { saveErr := storage.SaveFile(ctx, uc.fieldEncryptor, uc.logger, backupID, storageReader) saveErrCh <- saveErr }() - // Create TAR and stream to storage - tarErrCh := make(chan error, 1) - totalSizeCh := make(chan int64, 1) + // Start pg_dump + if err = cmd.Start(); err != nil { + return nil, fmt.Errorf("start %s: %w", filepath.Base(pgBin), err) + } + + // Copy pg output directly to storage with shutdown checks + copyResultCh := make(chan error, 1) + bytesWrittenCh := make(chan int64, 1) go func() { - totalSize, tarErr := uc.writeTarToWriter( + bytesWritten, err := uc.copyWithShutdownCheck( ctx, - sourceDir, - finalWriter, + countingWriter, + pgStdout, backupProgressListener, ) - totalSizeCh <- totalSize - tarErrCh <- tarErr - - // Close encryption writer first if present - if encryptionWriter != nil { - if closeErr := encryptionWriter.Close(); closeErr != nil { - uc.logger.Error("Failed to close encryption writer", "error", closeErr) - } - } - // Then close the pipe writer to signal EOF to storage - if closeErr := storageWriter.Close(); closeErr != nil { - uc.logger.Error("Failed to close pipe writer", "error", closeErr) - } + bytesWrittenCh <- bytesWritten + copyResultCh <- err }() - tarErr := <-tarErrCh - totalSize := <-totalSizeCh - saveErr := <-saveErrCh + copyErr := <-copyResultCh + bytesWritten := <-bytesWrittenCh + waitErr := cmd.Wait() select { case <-ctx.Done(): + uc.cleanupOnCancellation(encryptionWriter, storageWriter, saveErrCh) return nil, 
uc.checkCancellationReason() default: } - // Send final size after backup is completed - if tarErr == nil && saveErr == nil && backupProgressListener != nil { - sizeMB := float64(totalSize) / (1024 * 1024) + if err := uc.closeWriters(encryptionWriter, storageWriter); err != nil { + <-saveErrCh + return nil, err + } + + saveErr := <-saveErrCh + stderrOutput := <-stderrCh + + // Send final size after backup is completed + if waitErr == nil && copyErr == nil && saveErr == nil && backupProgressListener != nil { + sizeMB := float64(bytesWritten) / (1024 * 1024) backupProgressListener(sizeMB) } - if tarErr != nil { + switch { + case waitErr != nil: if err := uc.checkCancellation(ctx); err != nil { return nil, err } - return nil, fmt.Errorf("failed to create TAR archive: %w", tarErr) - } - - if saveErr != nil { + return nil, uc.buildPgDumpErrorMessage(waitErr, stderrOutput, pgBin, args, password) + case copyErr != nil: if err := uc.checkCancellation(ctx); err != nil { return nil, err } - return nil, fmt.Errorf("failed to save to storage: %w", saveErr) + return nil, fmt.Errorf("copy to storage: %w", copyErr) + case saveErr != nil: + if err := uc.checkCancellation(ctx); err != nil { + return nil, err + } + return nil, fmt.Errorf("save to storage: %w", saveErr) } - uc.logger.Info( - "Backup completed successfully", - "backupId", - backupID, - "totalSizeBytes", - totalSize, - ) return &backupMetadata, nil } -// writeTarToWriter creates a TAR archive from sourceDir and writes it to the writer -func (uc *CreatePostgresqlBackupUsecase) writeTarToWriter( +func (uc *CreatePostgresqlBackupUsecase) copyWithShutdownCheck( ctx context.Context, - sourceDir string, - writer io.Writer, + dst io.Writer, + src io.Reader, backupProgressListener func(completedMBs float64), ) (int64, error) { - tarWriter := tar.NewWriter(writer) - defer func() { - _ = tarWriter.Close() - }() - - var totalSize int64 + buf := make([]byte, copyBufferSize) + var totalBytesWritten int64 var lastReportedMB 
float64 - err := filepath.Walk(sourceDir, func(path string, info os.FileInfo, walkErr error) error { - if walkErr != nil { - return walkErr - } - + for { select { case <-ctx.Done(): - return ctx.Err() + return totalBytesWritten, fmt.Errorf("copy cancelled: %w", ctx.Err()) default: } if config.IsShouldShutdown() { - return fmt.Errorf("backup cancelled due to shutdown") + return totalBytesWritten, fmt.Errorf("copy cancelled due to shutdown") } - // Get relative path for TAR header - relPath, err := filepath.Rel(sourceDir, path) - if err != nil { - return fmt.Errorf("failed to get relative path: %w", err) - } + bytesRead, readErr := src.Read(buf) + if bytesRead > 0 { + writeResultCh := make(chan writeResult, 1) + go func() { + bytesWritten, writeErr := dst.Write(buf[0:bytesRead]) + writeResultCh <- writeResult{bytesWritten, writeErr} + }() - // Skip the root directory itself - if relPath == "." { - return nil - } + var bytesWritten int + var writeErr error - // Create TAR header - header, err := tar.FileInfoHeader(info, "") - if err != nil { - return fmt.Errorf("failed to create TAR header: %w", err) - } - header.Name = relPath + select { + case <-ctx.Done(): + return totalBytesWritten, fmt.Errorf("copy cancelled during write: %w", ctx.Err()) + case result := <-writeResultCh: + bytesWritten = result.bytesWritten + writeErr = result.writeErr + } - if err := tarWriter.WriteHeader(header); err != nil { - return fmt.Errorf("failed to write TAR header: %w", err) - } + if bytesWritten < 0 || bytesRead < bytesWritten { + bytesWritten = 0 + if writeErr == nil { + writeErr = fmt.Errorf("invalid write result") + } + } - // If it's a directory, we're done - if info.IsDir() { - return nil - } + if writeErr != nil { + return totalBytesWritten, writeErr + } - // Copy file content to TAR - file, err := os.Open(path) - if err != nil { - return fmt.Errorf("failed to open file %s: %w", path, err) - } - defer func() { - _ = file.Close() - }() + if bytesRead != bytesWritten { + return 
totalBytesWritten, io.ErrShortWrite + } - written, err := io.Copy(tarWriter, file) - if err != nil { - return fmt.Errorf("failed to write file %s to TAR: %w", path, err) - } + totalBytesWritten += int64(bytesWritten) - totalSize += written - - // Report progress - if backupProgressListener != nil { - currentSizeMB := float64(totalSize) / (1024 * 1024) - if currentSizeMB >= lastReportedMB+progressReportIntervalMB { - backupProgressListener(currentSizeMB) - lastReportedMB = currentSizeMB + if backupProgressListener != nil { + currentSizeMB := float64(totalBytesWritten) / (1024 * 1024) + if currentSizeMB >= lastReportedMB+progressReportIntervalMB { + backupProgressListener(currentSizeMB) + lastReportedMB = currentSizeMB + } } } - return nil - }) + if readErr != nil { + if readErr != io.EOF { + return totalBytesWritten, readErr + } + break + } + } - return totalSize, err + return totalBytesWritten, nil } -func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs( - pg *pgtypes.PostgresqlDatabase, - outputDir string, -) []string { +func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(pg *pgtypes.PostgresqlDatabase) []string { args := []string{ - "-Fd", // Directory type (enables parallel dump) - "-f", outputDir, // Output directory + "-Fc", "--no-password", "-h", pg.Host, "-p", strconv.Itoa(pg.Port), @@ -404,7 +335,7 @@ func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs( "--verbose", } - // Parallel jobs now actually work with directory type + // Add parallel jobs based on CPU count if pg.CpuCount > 1 { args = append(args, "-j", strconv.Itoa(pg.CpuCount)) } @@ -590,6 +521,63 @@ func (uc *CreatePostgresqlBackupUsecase) setupBackupEncryption( return encWriter, encWriter, metadata, nil } +func (uc *CreatePostgresqlBackupUsecase) cleanupOnCancellation( + encryptionWriter *backup_encryption.EncryptionWriter, + storageWriter io.WriteCloser, + saveErrCh chan error, +) { + if encryptionWriter != nil { + go func() { + if closeErr := encryptionWriter.Close(); closeErr != 
nil { + uc.logger.Error( + "Failed to close encrypting writer during cancellation", + "error", + closeErr, + ) + } + }() + } + + if err := storageWriter.Close(); err != nil { + uc.logger.Error("Failed to close pipe writer during cancellation", "error", err) + } + + <-saveErrCh +} + +func (uc *CreatePostgresqlBackupUsecase) closeWriters( + encryptionWriter *backup_encryption.EncryptionWriter, + storageWriter io.WriteCloser, +) error { + encryptionCloseErrCh := make(chan error, 1) + if encryptionWriter != nil { + go func() { + closeErr := encryptionWriter.Close() + if closeErr != nil { + uc.logger.Error("Failed to close encrypting writer", "error", closeErr) + } + encryptionCloseErrCh <- closeErr + }() + } else { + encryptionCloseErrCh <- nil + } + + encryptionCloseErr := <-encryptionCloseErrCh + if encryptionCloseErr != nil { + if err := storageWriter.Close(); err != nil { + uc.logger.Error("Failed to close pipe writer after encryption error", "error", err) + } + return fmt.Errorf("failed to close encryption writer: %w", encryptionCloseErr) + } + + if err := storageWriter.Close(); err != nil { + uc.logger.Error("Failed to close pipe writer", "error", err) + return err + } + + return nil +} + func (uc *CreatePostgresqlBackupUsecase) checkCancellation(ctx context.Context) error { select { case <-ctx.Done(): @@ -771,7 +759,7 @@ func (uc *CreatePostgresqlBackupUsecase) createTempPgpassFile( escapedPassword, ) - tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgpass_"+uuid.New().String()) + tempDir, err := os.MkdirTemp("", "pgpass") if err != nil { return "", fmt.Errorf("failed to create temporary directory: %w", err) } diff --git a/backend/internal/features/restores/usecases/postgresql/restore_backup_uc.go b/backend/internal/features/restores/usecases/postgresql/restore_backup_uc.go index 642e6a1..44d34db 100644 --- a/backend/internal/features/restores/usecases/postgresql/restore_backup_uc.go +++ 
b/backend/internal/features/restores/usecases/postgresql/restore_backup_uc.go @@ -1,7 +1,6 @@ package usecases_postgresql import ( - "archive/tar" "context" "encoding/base64" "errors" @@ -17,7 +16,6 @@ import ( "databasus-backend/internal/config" "databasus-backend/internal/features/backups/backups" - common "databasus-backend/internal/features/backups/backups/common" "databasus-backend/internal/features/backups/backups/encryption" backups_config "databasus-backend/internal/features/backups/config" "databasus-backend/internal/features/databases" @@ -26,7 +24,6 @@ import ( "databasus-backend/internal/features/restores/models" "databasus-backend/internal/features/storages" util_encryption "databasus-backend/internal/util/encryption" - files_utils "databasus-backend/internal/util/files" "databasus-backend/internal/util/tools" "github.com/google/uuid" @@ -56,8 +53,6 @@ func (uc *RestorePostgresqlBackupUsecase) Execute( restore.ID, "backupId", backup.ID, - "format", - backup.Type, ) pg := restoringToDB.Postgresql @@ -76,30 +71,15 @@ func (uc *RestorePostgresqlBackupUsecase) Execute( config.GetEnv().PostgresesInstallDir, ) - // Route based on backup type - switch backup.Type { - case common.BackupTypeDirectory: - return uc.restoreDirectoryType( - originalDB, - restoringToDB, - pgBin, - backup, - storage, - pg, - isExcludeExtensions, - ) - case common.BackupTypeDefault, "": // empty = legacy DEFAULT - return uc.restoreCustomType( - originalDB, - pgBin, - backup, - storage, - pg, - isExcludeExtensions, - ) - default: - return fmt.Errorf("unsupported backup type: %s", backup.Type) - } + // All PostgreSQL backups are now custom format (-Fc) + return uc.restoreCustomType( + originalDB, + pgBin, + backup, + storage, + pg, + isExcludeExtensions, + ) } // restoreCustomType restores a backup in custom type (-Fc) - legacy type @@ -144,100 +124,6 @@ func (uc *RestorePostgresqlBackupUsecase) restoreCustomType( ) } -// restoreDirectoryType restores a backup in directory type (-Fd) - 
new TAR type -func (uc *RestorePostgresqlBackupUsecase) restoreDirectoryType( - originalDB *databases.Database, - _ *databases.Database, // restoringToDB not used but kept for API consistency - pgBin string, - backup *backups.Backup, - storage *storages.Storage, - pg *pgtypes.PostgresqlDatabase, - isExcludeExtensions bool, -) error { - uc.logger.Info("Restoring backup in directory type (-Fd)", "backupId", backup.ID) - - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute) - defer cancel() - - // Monitor for shutdown - go func() { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if config.IsShouldShutdown() { - cancel() - return - } - } - } - }() - - // Create temporary .pgpass file - pgpassFile, err := uc.createTempPgpassFile(pg, pg.Password) - if err != nil { - return fmt.Errorf("failed to create temporary .pgpass file: %w", err) - } - defer func() { - if pgpassFile != "" { - _ = os.RemoveAll(filepath.Dir(pgpassFile)) - } - }() - - // Download and extract TAR to temporary directory - tempDir, cleanupFunc, err := uc.downloadAndExtractTar(ctx, backup, storage) - if err != nil { - return fmt.Errorf("failed to download and extract backup: %w", err) - } - defer cleanupFunc() - - // Use parallel jobs based on CPU count - parallelJobs := max(1, min(pg.CpuCount, 8)) - - args := []string{ - "-Fd", // directory type - "-j", strconv.Itoa(parallelJobs), // parallel restore - "--no-password", - "-h", pg.Host, - "-p", strconv.Itoa(pg.Port), - "-U", pg.Username, - "-d", *pg.Database, - "--verbose", - "--clean", - "--if-exists", - "--no-owner", - "--no-acl", - } - - // If excluding extensions, generate filtered TOC list - if isExcludeExtensions { - tocListFile, err := uc.generateFilteredTocList( - ctx, - pgBin, - tempDir, - pgpassFile, - pg, - ) - if err != nil { - return fmt.Errorf("failed to generate filtered TOC list: %w", err) - } - defer func() { - _ = 
os.Remove(tocListFile) - }() - - args = append(args, "-L", tocListFile) - } - - // Add the directory as the last argument - args = append(args, tempDir) - - return uc.executePgRestore(ctx, originalDB, pgBin, args, pgpassFile, pg) -} - // restoreFromStorage restores backup data from storage using pg_restore func (uc *RestorePostgresqlBackupUsecase) restoreFromStorage( database *databases.Database, @@ -345,13 +231,6 @@ func (uc *RestorePostgresqlBackupUsecase) downloadBackupToTempFile( backup *backups.Backup, storage *storages.Storage, ) (string, func(), error) { - err := files_utils.EnsureDirectories([]string{ - config.GetEnv().TempFolder, - }) - if err != nil { - return "", nil, fmt.Errorf("failed to ensure directories: %w", err) - } - // Create temporary directory for backup data tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "restore_"+uuid.New().String()) if err != nil { @@ -458,175 +337,6 @@ func (uc *RestorePostgresqlBackupUsecase) downloadBackupToTempFile( return tempBackupFile, cleanupFunc, nil } -// downloadAndExtractTar downloads a TAR backup from storage and extracts it to a temporary directory -func (uc *RestorePostgresqlBackupUsecase) downloadAndExtractTar( - ctx context.Context, - backup *backups.Backup, - storage *storages.Storage, -) (string, func(), error) { - err := files_utils.EnsureDirectories([]string{config.GetEnv().TempFolder}) - if err != nil { - return "", nil, fmt.Errorf("failed to ensure directories: %w", err) - } - - // Create temporary directory for extracted data - tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "restore_dir_"+uuid.New().String()) - if err != nil { - return "", nil, fmt.Errorf("failed to create temporary directory: %w", err) - } - - cleanupFunc := func() { - _ = os.RemoveAll(tempDir) - } - - uc.logger.Info( - "Downloading TAR backup from storage", - "backupId", backup.ID, - "tempDir", tempDir, - "encrypted", backup.Encryption == backups_config.BackupEncryptionEncrypted, - ) - - fieldEncryptor := 
util_encryption.GetFieldEncryptor() - rawReader, err := storage.GetFile(fieldEncryptor, backup.ID) - if err != nil { - cleanupFunc() - return "", nil, fmt.Errorf("failed to get backup file from storage: %w", err) - } - defer func() { - if err := rawReader.Close(); err != nil { - uc.logger.Error("Failed to close backup reader", "error", err) - } - }() - - // Create a reader that handles decryption if needed - var backupReader io.Reader = rawReader - if backup.Encryption == backups_config.BackupEncryptionEncrypted { - if backup.EncryptionSalt == nil || backup.EncryptionIV == nil { - cleanupFunc() - return "", nil, fmt.Errorf("backup is encrypted but missing encryption metadata") - } - - masterKey, err := uc.secretKeyService.GetSecretKey() - if err != nil { - cleanupFunc() - return "", nil, fmt.Errorf("failed to get master key for decryption: %w", err) - } - - salt, err := base64.StdEncoding.DecodeString(*backup.EncryptionSalt) - if err != nil { - cleanupFunc() - return "", nil, fmt.Errorf("failed to decode encryption salt: %w", err) - } - - iv, err := base64.StdEncoding.DecodeString(*backup.EncryptionIV) - if err != nil { - cleanupFunc() - return "", nil, fmt.Errorf("failed to decode encryption IV: %w", err) - } - - decryptReader, err := encryption.NewDecryptionReader( - rawReader, - masterKey, - backup.ID, - salt, - iv, - ) - if err != nil { - cleanupFunc() - return "", nil, fmt.Errorf("failed to create decryption reader: %w", err) - } - - backupReader = decryptReader - uc.logger.Info("Using decryption for encrypted backup", "backupId", backup.ID) - } - - // Extract TAR archive to temp directory - if err := uc.extractTar(ctx, backupReader, tempDir); err != nil { - cleanupFunc() - return "", nil, fmt.Errorf("failed to extract TAR archive: %w", err) - } - - uc.logger.Info("TAR backup extracted to temporary directory", "tempDir", tempDir) - return tempDir, cleanupFunc, nil -} - -// extractTar extracts a TAR archive to the specified directory -func (uc 
*RestorePostgresqlBackupUsecase) extractTar( - ctx context.Context, - reader io.Reader, - destDir string, -) error { - tarReader := tar.NewReader(reader) - - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if config.IsShouldShutdown() { - return fmt.Errorf("extraction cancelled due to shutdown") - } - - header, err := tarReader.Next() - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to read TAR header: %w", err) - } - - targetPath := filepath.Join(destDir, header.Name) - - // Ensure the target path is within destDir (prevent path traversal) - if !strings.HasPrefix(filepath.Clean(targetPath), filepath.Clean(destDir)) { - return fmt.Errorf("invalid file path in TAR: %s", header.Name) - } - - switch header.Typeflag { - case tar.TypeDir: - if err := os.MkdirAll(targetPath, os.FileMode(header.Mode)); err != nil { - return fmt.Errorf("failed to create directory %s: %w", targetPath, err) - } - case tar.TypeReg: - // Ensure parent directory exists - if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil { - return fmt.Errorf("failed to create parent directory for %s: %w", targetPath, err) - } - - outFile, err := os.OpenFile( - targetPath, - os.O_CREATE|os.O_WRONLY|os.O_TRUNC, - os.FileMode(header.Mode), - ) - if err != nil { - return fmt.Errorf("failed to create file %s: %w", targetPath, err) - } - - _, copyErr := uc.copyWithShutdownCheck(ctx, outFile, tarReader) - closeErr := outFile.Close() - - if copyErr != nil { - return fmt.Errorf("failed to write file %s: %w", targetPath, copyErr) - } - if closeErr != nil { - return fmt.Errorf("failed to close file %s: %w", targetPath, closeErr) - } - default: - uc.logger.Warn( - "Skipping unsupported TAR entry type", - "type", - header.Typeflag, - "name", - header.Name, - ) - } - } - - return nil -} - // executePgRestore executes the pg_restore command with proper environment setup func (uc *RestorePostgresqlBackupUsecase) executePgRestore( ctx context.Context, 
diff --git a/backend/migrations/20260102110334_remove_backup_type_field.sql b/backend/migrations/20260102110334_remove_backup_type_field.sql new file mode 100644 index 0000000..e6b72c4 --- /dev/null +++ b/backend/migrations/20260102110334_remove_backup_type_field.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE backups DROP COLUMN type; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE backups ADD COLUMN type TEXT NOT NULL DEFAULT 'DEFAULT'; +-- +goose StatementEnd