mirror of
https://github.com/databasus/databasus.git
synced 2026-04-06 00:32:03 +02:00
FIX (backups): Revert directory update
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package backups
|
||||
|
||||
import (
|
||||
"databasus-backend/internal/features/backups/backups/common"
|
||||
"databasus-backend/internal/features/databases"
|
||||
users_middleware "databasus-backend/internal/features/users/middleware"
|
||||
"fmt"
|
||||
@@ -223,24 +222,20 @@ func (c *BackupController) generateBackupFilename(
|
||||
// Sanitize database name for filename (replace spaces and special chars)
|
||||
safeName := sanitizeFilename(database.Name)
|
||||
|
||||
// Determine extension based on database type and backup type
|
||||
extension := c.getBackupExtension(database.Type, backup.Type)
|
||||
// Determine extension based on database type
|
||||
extension := c.getBackupExtension(database.Type)
|
||||
|
||||
return fmt.Sprintf("%s_backup_%s%s", safeName, timestamp, extension)
|
||||
}
|
||||
|
||||
func (c *BackupController) getBackupExtension(
|
||||
dbType databases.DatabaseType,
|
||||
backupType common.BackupType,
|
||||
) string {
|
||||
switch dbType {
|
||||
case databases.DatabaseTypeMysql, databases.DatabaseTypeMariadb:
|
||||
return ".sql.zst"
|
||||
case databases.DatabaseTypePostgres:
|
||||
// For PostgreSQL, use .tar for directory type, .dump for custom type
|
||||
if backupType == common.BackupTypeDirectory {
|
||||
return ".tar"
|
||||
}
|
||||
// PostgreSQL custom format
|
||||
return ".dump"
|
||||
case databases.DatabaseTypeMongodb:
|
||||
return ".archive"
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
audit_logs "databasus-backend/internal/features/audit_logs"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
"databasus-backend/internal/features/databases/databases/postgresql"
|
||||
@@ -499,35 +498,24 @@ func Test_DownloadBackup_ProperFilenameForPostgreSQL(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
databaseName string
|
||||
backupType string
|
||||
expectedExt string
|
||||
expectedInName string
|
||||
}{
|
||||
{
|
||||
name: "PostgreSQL with directory type",
|
||||
name: "PostgreSQL database",
|
||||
databaseName: "my_postgres_db",
|
||||
backupType: "DIRECTORY",
|
||||
expectedExt: ".tar",
|
||||
expectedInName: "my_postgres_db_backup_",
|
||||
},
|
||||
{
|
||||
name: "PostgreSQL with default type",
|
||||
databaseName: "my_postgres_db",
|
||||
backupType: "DEFAULT",
|
||||
expectedExt: ".dump",
|
||||
expectedInName: "my_postgres_db_backup_",
|
||||
},
|
||||
{
|
||||
name: "Database name with spaces",
|
||||
databaseName: "my test db",
|
||||
backupType: "DIRECTORY",
|
||||
expectedExt: ".tar",
|
||||
expectedExt: ".dump",
|
||||
expectedInName: "my_test_db_backup_",
|
||||
},
|
||||
{
|
||||
name: "Database name with special characters",
|
||||
databaseName: "my:db/test",
|
||||
backupType: "DEFAULT",
|
||||
expectedExt: ".dump",
|
||||
expectedInName: "my-db-test_backup_",
|
||||
},
|
||||
@@ -552,7 +540,7 @@ func Test_DownloadBackup_ProperFilenameForPostgreSQL(t *testing.T) {
|
||||
_, err = configService.SaveBackupConfig(config)
|
||||
assert.NoError(t, err)
|
||||
|
||||
backup := createTestBackupWithType(database, owner, tt.backupType)
|
||||
backup := createTestBackup(database, owner)
|
||||
|
||||
resp := test_utils.MakeGetRequest(
|
||||
t,
|
||||
@@ -827,20 +815,3 @@ func createTestBackup(
|
||||
|
||||
return backup
|
||||
}
|
||||
|
||||
func createTestBackupWithType(
|
||||
database *databases.Database,
|
||||
owner *users_dto.SignInResponseDTO,
|
||||
backupType string,
|
||||
) *Backup {
|
||||
backup := createTestBackup(database, owner)
|
||||
|
||||
// Update the format field
|
||||
repo := &BackupRepository{}
|
||||
backup.Type = common.BackupType(backupType)
|
||||
if err := repo.Save(backup); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return backup
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package backups
|
||||
|
||||
import (
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"time"
|
||||
|
||||
@@ -25,6 +24,5 @@ type Backup struct {
|
||||
EncryptionIV *string `json:"-" gorm:"column:encryption_iv"`
|
||||
Encryption backups_config.BackupEncryption `json:"encryption" gorm:"column:encryption;type:text;not null;default:'NONE'"`
|
||||
|
||||
Type common.BackupType `json:"type" gorm:"column:type;type:text;not null;default:'DEFAULT'"`
|
||||
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"`
|
||||
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"`
|
||||
}
|
||||
|
||||
@@ -343,7 +343,6 @@ func (s *BackupService) MakeBackup(databaseID uuid.UUID, isLastTry bool) {
|
||||
backup.EncryptionSalt = backupMetadata.EncryptionSalt
|
||||
backup.EncryptionIV = backupMetadata.EncryptionIV
|
||||
backup.Encryption = backupMetadata.Encryption
|
||||
backup.Type = backupMetadata.Type
|
||||
}
|
||||
|
||||
if err := s.backupRepository.Save(backup); err != nil {
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 6 * time.Hour
|
||||
backupTimeout = 23 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
|
||||
@@ -27,7 +27,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 6 * time.Hour
|
||||
backupTimeout = 23 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 6 * time.Hour
|
||||
backupTimeout = 23 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package usecases_postgresql
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
@@ -24,15 +23,15 @@ import (
|
||||
encryption_secrets "databasus-backend/internal/features/encryption/secrets"
|
||||
"databasus-backend/internal/features/storages"
|
||||
"databasus-backend/internal/util/encryption"
|
||||
files_utils "databasus-backend/internal/util/files"
|
||||
"databasus-backend/internal/util/tools"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 6 * time.Hour
|
||||
backupTimeout = 23 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
pgConnectTimeout = 30
|
||||
compressionLevel = 5
|
||||
@@ -47,6 +46,11 @@ type CreatePostgresqlBackupUsecase struct {
|
||||
fieldEncryptor encryption.FieldEncryptor
|
||||
}
|
||||
|
||||
type writeResult struct {
|
||||
bytesWritten int
|
||||
writeErr error
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
ctx context.Context,
|
||||
backupID uuid.UUID,
|
||||
@@ -58,7 +62,7 @@ func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
),
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info(
|
||||
"Creating PostgreSQL backup via pg_dump directory type",
|
||||
"Creating PostgreSQL backup via pg_dump custom format",
|
||||
"databaseId",
|
||||
db.ID,
|
||||
"storageId",
|
||||
@@ -79,12 +83,14 @@ func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
return nil, fmt.Errorf("database name is required for pg_dump backups")
|
||||
}
|
||||
|
||||
args := uc.buildPgDumpArgs(pg)
|
||||
|
||||
decryptedPassword, err := uc.fieldEncryptor.Decrypt(db.ID, pg.Password)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt database password: %w", err)
|
||||
}
|
||||
|
||||
return uc.executeDirectoryBackup(
|
||||
return uc.streamToStorage(
|
||||
ctx,
|
||||
backupID,
|
||||
backupConfig,
|
||||
@@ -94,127 +100,66 @@ func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
config.GetEnv().EnvMode,
|
||||
config.GetEnv().PostgresesInstallDir,
|
||||
),
|
||||
pg,
|
||||
args,
|
||||
decryptedPassword,
|
||||
storage,
|
||||
db,
|
||||
backupProgressListener,
|
||||
)
|
||||
}
|
||||
|
||||
// executeDirectoryBackup runs pg_dump with directory type and streams as TAR to storage
|
||||
func (uc *CreatePostgresqlBackupUsecase) executeDirectoryBackup(
|
||||
// streamToStorage streams pg_dump output directly to storage
|
||||
func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
|
||||
parentCtx context.Context,
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
pgBin string,
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
args []string,
|
||||
password string,
|
||||
storage *storages.Storage,
|
||||
db *databases.Database,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info("Streaming PostgreSQL backup to storage", "pgBin", pgBin, "args", args)
|
||||
|
||||
ctx, cancel := uc.createBackupContext(parentCtx)
|
||||
defer cancel()
|
||||
|
||||
// Create temporary directory for pg_dump output
|
||||
err := files_utils.EnsureDirectories([]string{config.GetEnv().TempFolder})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure temp directories: %w", err)
|
||||
}
|
||||
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgdump_"+backupID.String())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = os.RemoveAll(tempDir)
|
||||
}()
|
||||
|
||||
outputDir := filepath.Join(tempDir, "dump")
|
||||
|
||||
args := uc.buildPgDumpArgs(pg, outputDir)
|
||||
uc.logger.Info(
|
||||
"Executing PostgreSQL backup with directory type",
|
||||
"pgBin",
|
||||
pgBin,
|
||||
"args",
|
||||
args,
|
||||
"outputDir",
|
||||
outputDir,
|
||||
)
|
||||
|
||||
pgpassFile, err := uc.setupPgpassFile(pg, password)
|
||||
pgpassFile, err := uc.setupPgpassFile(db.Postgresql, password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if pgpassFile != "" {
|
||||
// Remove the entire temp directory (which contains the .pgpass file)
|
||||
_ = os.RemoveAll(filepath.Dir(pgpassFile))
|
||||
}
|
||||
}()
|
||||
|
||||
// Execute pg_dump to directory
|
||||
cmd := exec.CommandContext(ctx, pgBin, args...)
|
||||
if err := uc.setupPgEnvironment(cmd, pgpassFile, pg.IsHttps, password, pg.CpuCount, pgBin); err != nil {
|
||||
uc.logger.Info("Executing PostgreSQL backup command", "command", cmd.String())
|
||||
|
||||
if err := uc.setupPgEnvironment(cmd, pgpassFile, db.Postgresql.IsHttps, password, db.Postgresql.CpuCount, pgBin); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pgStdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
pgStderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
// Capture stderr in a separate goroutine to ensure we don't miss any error output
|
||||
stderrCh := make(chan []byte, 1)
|
||||
go func() {
|
||||
stderrOutput, _ := io.ReadAll(pgStderr)
|
||||
stderrCh <- stderrOutput
|
||||
}()
|
||||
|
||||
if err = cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("start %s: %w", filepath.Base(pgBin), err)
|
||||
}
|
||||
|
||||
waitErr := cmd.Wait()
|
||||
stderrOutput := <-stderrCh
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, uc.checkCancellationReason()
|
||||
default:
|
||||
}
|
||||
|
||||
if waitErr != nil {
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, uc.buildPgDumpErrorMessage(waitErr, stderrOutput, pgBin, args, password)
|
||||
}
|
||||
|
||||
uc.logger.Info(
|
||||
"pg_dump completed successfully, streaming TAR to storage",
|
||||
"outputDir",
|
||||
outputDir,
|
||||
)
|
||||
|
||||
// Stream directory as TAR to storage
|
||||
return uc.streamDirectoryToStorage(
|
||||
ctx,
|
||||
backupID,
|
||||
backupConfig,
|
||||
outputDir,
|
||||
storage,
|
||||
backupProgressListener,
|
||||
)
|
||||
}
|
||||
|
||||
// streamDirectoryToStorage creates a TAR archive from the directory and streams it to storage
|
||||
func (uc *CreatePostgresqlBackupUsecase) streamDirectoryToStorage(
|
||||
ctx context.Context,
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
sourceDir string,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*common.BackupMetadata, error) {
|
||||
storageReader, storageWriter := io.Pipe()
|
||||
|
||||
finalWriter, encryptionWriter, backupMetadata, err := uc.setupBackupEncryption(
|
||||
@@ -226,176 +171,162 @@ func (uc *CreatePostgresqlBackupUsecase) streamDirectoryToStorage(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set type to DIRECTORY for new PostgreSQL backups
|
||||
backupMetadata.Type = common.BackupTypeDirectory
|
||||
countingWriter := common.NewCountingWriter(finalWriter)
|
||||
|
||||
// Start streaming into storage
|
||||
// The backup ID becomes the object key / filename in storage
|
||||
|
||||
// Start streaming into storage in its own goroutine
|
||||
saveErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
saveErr := storage.SaveFile(ctx, uc.fieldEncryptor, uc.logger, backupID, storageReader)
|
||||
saveErrCh <- saveErr
|
||||
}()
|
||||
|
||||
// Create TAR and stream to storage
|
||||
tarErrCh := make(chan error, 1)
|
||||
totalSizeCh := make(chan int64, 1)
|
||||
// Start pg_dump
|
||||
if err = cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("start %s: %w", filepath.Base(pgBin), err)
|
||||
}
|
||||
|
||||
// Copy pg output directly to storage with shutdown checks
|
||||
copyResultCh := make(chan error, 1)
|
||||
bytesWrittenCh := make(chan int64, 1)
|
||||
go func() {
|
||||
totalSize, tarErr := uc.writeTarToWriter(
|
||||
bytesWritten, err := uc.copyWithShutdownCheck(
|
||||
ctx,
|
||||
sourceDir,
|
||||
finalWriter,
|
||||
countingWriter,
|
||||
pgStdout,
|
||||
backupProgressListener,
|
||||
)
|
||||
totalSizeCh <- totalSize
|
||||
tarErrCh <- tarErr
|
||||
|
||||
// Close encryption writer first if present
|
||||
if encryptionWriter != nil {
|
||||
if closeErr := encryptionWriter.Close(); closeErr != nil {
|
||||
uc.logger.Error("Failed to close encryption writer", "error", closeErr)
|
||||
}
|
||||
}
|
||||
// Then close the pipe writer to signal EOF to storage
|
||||
if closeErr := storageWriter.Close(); closeErr != nil {
|
||||
uc.logger.Error("Failed to close pipe writer", "error", closeErr)
|
||||
}
|
||||
bytesWrittenCh <- bytesWritten
|
||||
copyResultCh <- err
|
||||
}()
|
||||
|
||||
tarErr := <-tarErrCh
|
||||
totalSize := <-totalSizeCh
|
||||
saveErr := <-saveErrCh
|
||||
copyErr := <-copyResultCh
|
||||
bytesWritten := <-bytesWrittenCh
|
||||
waitErr := cmd.Wait()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
uc.cleanupOnCancellation(encryptionWriter, storageWriter, saveErrCh)
|
||||
return nil, uc.checkCancellationReason()
|
||||
default:
|
||||
}
|
||||
|
||||
// Send final size after backup is completed
|
||||
if tarErr == nil && saveErr == nil && backupProgressListener != nil {
|
||||
sizeMB := float64(totalSize) / (1024 * 1024)
|
||||
if err := uc.closeWriters(encryptionWriter, storageWriter); err != nil {
|
||||
<-saveErrCh
|
||||
return nil, err
|
||||
}
|
||||
|
||||
saveErr := <-saveErrCh
|
||||
stderrOutput := <-stderrCh
|
||||
|
||||
// Send final sizing after backup is completed
|
||||
if waitErr == nil && copyErr == nil && saveErr == nil && backupProgressListener != nil {
|
||||
sizeMB := float64(bytesWritten) / (1024 * 1024)
|
||||
backupProgressListener(sizeMB)
|
||||
}
|
||||
|
||||
if tarErr != nil {
|
||||
switch {
|
||||
case waitErr != nil:
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("failed to create TAR archive: %w", tarErr)
|
||||
}
|
||||
|
||||
if saveErr != nil {
|
||||
return nil, uc.buildPgDumpErrorMessage(waitErr, stderrOutput, pgBin, args, password)
|
||||
case copyErr != nil:
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("failed to save to storage: %w", saveErr)
|
||||
return nil, fmt.Errorf("copy to storage: %w", copyErr)
|
||||
case saveErr != nil:
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("save to storage: %w", saveErr)
|
||||
}
|
||||
|
||||
uc.logger.Info(
|
||||
"Backup completed successfully",
|
||||
"backupId",
|
||||
backupID,
|
||||
"totalSizeBytes",
|
||||
totalSize,
|
||||
)
|
||||
return &backupMetadata, nil
|
||||
}
|
||||
|
||||
// writeTarToWriter creates a TAR archive from sourceDir and writes it to the writer
|
||||
func (uc *CreatePostgresqlBackupUsecase) writeTarToWriter(
|
||||
func (uc *CreatePostgresqlBackupUsecase) copyWithShutdownCheck(
|
||||
ctx context.Context,
|
||||
sourceDir string,
|
||||
writer io.Writer,
|
||||
dst io.Writer,
|
||||
src io.Reader,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (int64, error) {
|
||||
tarWriter := tar.NewWriter(writer)
|
||||
defer func() {
|
||||
_ = tarWriter.Close()
|
||||
}()
|
||||
|
||||
var totalSize int64
|
||||
buf := make([]byte, copyBufferSize)
|
||||
var totalBytesWritten int64
|
||||
var lastReportedMB float64
|
||||
|
||||
err := filepath.Walk(sourceDir, func(path string, info os.FileInfo, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
if config.IsShouldShutdown() {
|
||||
return fmt.Errorf("backup cancelled due to shutdown")
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled due to shutdown")
|
||||
}
|
||||
|
||||
// Get relative path for TAR header
|
||||
relPath, err := filepath.Rel(sourceDir, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get relative path: %w", err)
|
||||
}
|
||||
bytesRead, readErr := src.Read(buf)
|
||||
if bytesRead > 0 {
|
||||
writeResultCh := make(chan writeResult, 1)
|
||||
go func() {
|
||||
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
|
||||
writeResultCh <- writeResult{bytesWritten, writeErr}
|
||||
}()
|
||||
|
||||
// Skip the root directory itself
|
||||
if relPath == "." {
|
||||
return nil
|
||||
}
|
||||
var bytesWritten int
|
||||
var writeErr error
|
||||
|
||||
// Create TAR header
|
||||
header, err := tar.FileInfoHeader(info, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create TAR header: %w", err)
|
||||
}
|
||||
header.Name = relPath
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled during write: %w", ctx.Err())
|
||||
case result := <-writeResultCh:
|
||||
bytesWritten = result.bytesWritten
|
||||
writeErr = result.writeErr
|
||||
}
|
||||
|
||||
if err := tarWriter.WriteHeader(header); err != nil {
|
||||
return fmt.Errorf("failed to write TAR header: %w", err)
|
||||
}
|
||||
if bytesWritten < 0 || bytesRead < bytesWritten {
|
||||
bytesWritten = 0
|
||||
if writeErr == nil {
|
||||
writeErr = fmt.Errorf("invalid write result")
|
||||
}
|
||||
}
|
||||
|
||||
// If it's a directory, we're done
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
if writeErr != nil {
|
||||
return totalBytesWritten, writeErr
|
||||
}
|
||||
|
||||
// Copy file content to TAR
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s: %w", path, err)
|
||||
}
|
||||
defer func() {
|
||||
_ = file.Close()
|
||||
}()
|
||||
if bytesRead != bytesWritten {
|
||||
return totalBytesWritten, io.ErrShortWrite
|
||||
}
|
||||
|
||||
written, err := io.Copy(tarWriter, file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write file %s to TAR: %w", path, err)
|
||||
}
|
||||
totalBytesWritten += int64(bytesWritten)
|
||||
|
||||
totalSize += written
|
||||
|
||||
// Report progress
|
||||
if backupProgressListener != nil {
|
||||
currentSizeMB := float64(totalSize) / (1024 * 1024)
|
||||
if currentSizeMB >= lastReportedMB+progressReportIntervalMB {
|
||||
backupProgressListener(currentSizeMB)
|
||||
lastReportedMB = currentSizeMB
|
||||
if backupProgressListener != nil {
|
||||
currentSizeMB := float64(totalBytesWritten) / (1024 * 1024)
|
||||
if currentSizeMB >= lastReportedMB+progressReportIntervalMB {
|
||||
backupProgressListener(currentSizeMB)
|
||||
lastReportedMB = currentSizeMB
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if readErr != nil {
|
||||
if readErr != io.EOF {
|
||||
return totalBytesWritten, readErr
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return totalSize, err
|
||||
return totalBytesWritten, nil
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
outputDir string,
|
||||
) []string {
|
||||
func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(pg *pgtypes.PostgresqlDatabase) []string {
|
||||
args := []string{
|
||||
"-Fd", // Directory type (enables parallel dump)
|
||||
"-f", outputDir, // Output directory
|
||||
"-Fc",
|
||||
"--no-password",
|
||||
"-h", pg.Host,
|
||||
"-p", strconv.Itoa(pg.Port),
|
||||
@@ -404,7 +335,7 @@ func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(
|
||||
"--verbose",
|
||||
}
|
||||
|
||||
// Parallel jobs now actually work with directory type
|
||||
// Add parallel jobs based on CPU count
|
||||
if pg.CpuCount > 1 {
|
||||
args = append(args, "-j", strconv.Itoa(pg.CpuCount))
|
||||
}
|
||||
@@ -590,6 +521,63 @@ func (uc *CreatePostgresqlBackupUsecase) setupBackupEncryption(
|
||||
return encWriter, encWriter, metadata, nil
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) cleanupOnCancellation(
|
||||
encryptionWriter *backup_encryption.EncryptionWriter,
|
||||
storageWriter io.WriteCloser,
|
||||
saveErrCh chan error,
|
||||
) {
|
||||
if encryptionWriter != nil {
|
||||
go func() {
|
||||
if closeErr := encryptionWriter.Close(); closeErr != nil {
|
||||
uc.logger.Error(
|
||||
"Failed to close encrypting writer during cancellation",
|
||||
"error",
|
||||
closeErr,
|
||||
)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err := storageWriter.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close pipe writer during cancellation", "error", err)
|
||||
}
|
||||
|
||||
<-saveErrCh
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) closeWriters(
|
||||
encryptionWriter *backup_encryption.EncryptionWriter,
|
||||
storageWriter io.WriteCloser,
|
||||
) error {
|
||||
encryptionCloseErrCh := make(chan error, 1)
|
||||
if encryptionWriter != nil {
|
||||
go func() {
|
||||
closeErr := encryptionWriter.Close()
|
||||
if closeErr != nil {
|
||||
uc.logger.Error("Failed to close encrypting writer", "error", closeErr)
|
||||
}
|
||||
encryptionCloseErrCh <- closeErr
|
||||
}()
|
||||
} else {
|
||||
encryptionCloseErrCh <- nil
|
||||
}
|
||||
|
||||
encryptionCloseErr := <-encryptionCloseErrCh
|
||||
if encryptionCloseErr != nil {
|
||||
if err := storageWriter.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close pipe writer after encryption error", "error", err)
|
||||
}
|
||||
return fmt.Errorf("failed to close encryption writer: %w", encryptionCloseErr)
|
||||
}
|
||||
|
||||
if err := storageWriter.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close pipe writer", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) checkCancellation(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -771,7 +759,7 @@ func (uc *CreatePostgresqlBackupUsecase) createTempPgpassFile(
|
||||
escapedPassword,
|
||||
)
|
||||
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgpass_"+uuid.New().String())
|
||||
tempDir, err := os.MkdirTemp("", "pgpass")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package usecases_postgresql
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
@@ -17,7 +16,6 @@ import (
|
||||
|
||||
"databasus-backend/internal/config"
|
||||
"databasus-backend/internal/features/backups/backups"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
"databasus-backend/internal/features/backups/backups/encryption"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
@@ -26,7 +24,6 @@ import (
|
||||
"databasus-backend/internal/features/restores/models"
|
||||
"databasus-backend/internal/features/storages"
|
||||
util_encryption "databasus-backend/internal/util/encryption"
|
||||
files_utils "databasus-backend/internal/util/files"
|
||||
"databasus-backend/internal/util/tools"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -56,8 +53,6 @@ func (uc *RestorePostgresqlBackupUsecase) Execute(
|
||||
restore.ID,
|
||||
"backupId",
|
||||
backup.ID,
|
||||
"format",
|
||||
backup.Type,
|
||||
)
|
||||
|
||||
pg := restoringToDB.Postgresql
|
||||
@@ -76,30 +71,15 @@ func (uc *RestorePostgresqlBackupUsecase) Execute(
|
||||
config.GetEnv().PostgresesInstallDir,
|
||||
)
|
||||
|
||||
// Route based on backup type
|
||||
switch backup.Type {
|
||||
case common.BackupTypeDirectory:
|
||||
return uc.restoreDirectoryType(
|
||||
originalDB,
|
||||
restoringToDB,
|
||||
pgBin,
|
||||
backup,
|
||||
storage,
|
||||
pg,
|
||||
isExcludeExtensions,
|
||||
)
|
||||
case common.BackupTypeDefault, "": // empty = legacy DEFAULT
|
||||
return uc.restoreCustomType(
|
||||
originalDB,
|
||||
pgBin,
|
||||
backup,
|
||||
storage,
|
||||
pg,
|
||||
isExcludeExtensions,
|
||||
)
|
||||
default:
|
||||
return fmt.Errorf("unsupported backup type: %s", backup.Type)
|
||||
}
|
||||
// All PostgreSQL backups are now custom format (-Fc)
|
||||
return uc.restoreCustomType(
|
||||
originalDB,
|
||||
pgBin,
|
||||
backup,
|
||||
storage,
|
||||
pg,
|
||||
isExcludeExtensions,
|
||||
)
|
||||
}
|
||||
|
||||
// restoreCustomType restores a backup in custom type (-Fc) - legacy type
|
||||
@@ -144,100 +124,6 @@ func (uc *RestorePostgresqlBackupUsecase) restoreCustomType(
|
||||
)
|
||||
}
|
||||
|
||||
// restoreDirectoryType restores a backup in directory type (-Fd) - new TAR type
|
||||
func (uc *RestorePostgresqlBackupUsecase) restoreDirectoryType(
|
||||
originalDB *databases.Database,
|
||||
_ *databases.Database, // restoringToDB not used but kept for API consistency
|
||||
pgBin string,
|
||||
backup *backups.Backup,
|
||||
storage *storages.Storage,
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
isExcludeExtensions bool,
|
||||
) error {
|
||||
uc.logger.Info("Restoring backup in directory type (-Fd)", "backupId", backup.ID)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// Monitor for shutdown
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if config.IsShouldShutdown() {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Create temporary .pgpass file
|
||||
pgpassFile, err := uc.createTempPgpassFile(pg, pg.Password)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temporary .pgpass file: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if pgpassFile != "" {
|
||||
_ = os.RemoveAll(filepath.Dir(pgpassFile))
|
||||
}
|
||||
}()
|
||||
|
||||
// Download and extract TAR to temporary directory
|
||||
tempDir, cleanupFunc, err := uc.downloadAndExtractTar(ctx, backup, storage)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download and extract backup: %w", err)
|
||||
}
|
||||
defer cleanupFunc()
|
||||
|
||||
// Use parallel jobs based on CPU count
|
||||
parallelJobs := max(1, min(pg.CpuCount, 8))
|
||||
|
||||
args := []string{
|
||||
"-Fd", // directory type
|
||||
"-j", strconv.Itoa(parallelJobs), // parallel restore
|
||||
"--no-password",
|
||||
"-h", pg.Host,
|
||||
"-p", strconv.Itoa(pg.Port),
|
||||
"-U", pg.Username,
|
||||
"-d", *pg.Database,
|
||||
"--verbose",
|
||||
"--clean",
|
||||
"--if-exists",
|
||||
"--no-owner",
|
||||
"--no-acl",
|
||||
}
|
||||
|
||||
// If excluding extensions, generate filtered TOC list
|
||||
if isExcludeExtensions {
|
||||
tocListFile, err := uc.generateFilteredTocList(
|
||||
ctx,
|
||||
pgBin,
|
||||
tempDir,
|
||||
pgpassFile,
|
||||
pg,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate filtered TOC list: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = os.Remove(tocListFile)
|
||||
}()
|
||||
|
||||
args = append(args, "-L", tocListFile)
|
||||
}
|
||||
|
||||
// Add the directory as the last argument
|
||||
args = append(args, tempDir)
|
||||
|
||||
return uc.executePgRestore(ctx, originalDB, pgBin, args, pgpassFile, pg)
|
||||
}
|
||||
|
||||
// restoreFromStorage restores backup data from storage using pg_restore
|
||||
func (uc *RestorePostgresqlBackupUsecase) restoreFromStorage(
|
||||
database *databases.Database,
|
||||
@@ -345,13 +231,6 @@ func (uc *RestorePostgresqlBackupUsecase) downloadBackupToTempFile(
|
||||
backup *backups.Backup,
|
||||
storage *storages.Storage,
|
||||
) (string, func(), error) {
|
||||
err := files_utils.EnsureDirectories([]string{
|
||||
config.GetEnv().TempFolder,
|
||||
})
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("failed to ensure directories: %w", err)
|
||||
}
|
||||
|
||||
// Create temporary directory for backup data
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "restore_"+uuid.New().String())
|
||||
if err != nil {
|
||||
@@ -458,175 +337,6 @@ func (uc *RestorePostgresqlBackupUsecase) downloadBackupToTempFile(
|
||||
return tempBackupFile, cleanupFunc, nil
|
||||
}
|
||||
|
||||
// downloadAndExtractTar downloads a TAR backup from storage and extracts it to a temporary directory
|
||||
func (uc *RestorePostgresqlBackupUsecase) downloadAndExtractTar(
|
||||
ctx context.Context,
|
||||
backup *backups.Backup,
|
||||
storage *storages.Storage,
|
||||
) (string, func(), error) {
|
||||
err := files_utils.EnsureDirectories([]string{config.GetEnv().TempFolder})
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("failed to ensure directories: %w", err)
|
||||
}
|
||||
|
||||
// Create temporary directory for extracted data
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "restore_dir_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
|
||||
cleanupFunc := func() {
|
||||
_ = os.RemoveAll(tempDir)
|
||||
}
|
||||
|
||||
uc.logger.Info(
|
||||
"Downloading TAR backup from storage",
|
||||
"backupId", backup.ID,
|
||||
"tempDir", tempDir,
|
||||
"encrypted", backup.Encryption == backups_config.BackupEncryptionEncrypted,
|
||||
)
|
||||
|
||||
fieldEncryptor := util_encryption.GetFieldEncryptor()
|
||||
rawReader, err := storage.GetFile(fieldEncryptor, backup.ID)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to get backup file from storage: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := rawReader.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close backup reader", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a reader that handles decryption if needed
|
||||
var backupReader io.Reader = rawReader
|
||||
if backup.Encryption == backups_config.BackupEncryptionEncrypted {
|
||||
if backup.EncryptionSalt == nil || backup.EncryptionIV == nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("backup is encrypted but missing encryption metadata")
|
||||
}
|
||||
|
||||
masterKey, err := uc.secretKeyService.GetSecretKey()
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to get master key for decryption: %w", err)
|
||||
}
|
||||
|
||||
salt, err := base64.StdEncoding.DecodeString(*backup.EncryptionSalt)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to decode encryption salt: %w", err)
|
||||
}
|
||||
|
||||
iv, err := base64.StdEncoding.DecodeString(*backup.EncryptionIV)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to decode encryption IV: %w", err)
|
||||
}
|
||||
|
||||
decryptReader, err := encryption.NewDecryptionReader(
|
||||
rawReader,
|
||||
masterKey,
|
||||
backup.ID,
|
||||
salt,
|
||||
iv,
|
||||
)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to create decryption reader: %w", err)
|
||||
}
|
||||
|
||||
backupReader = decryptReader
|
||||
uc.logger.Info("Using decryption for encrypted backup", "backupId", backup.ID)
|
||||
}
|
||||
|
||||
// Extract TAR archive to temp directory
|
||||
if err := uc.extractTar(ctx, backupReader, tempDir); err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to extract TAR archive: %w", err)
|
||||
}
|
||||
|
||||
uc.logger.Info("TAR backup extracted to temporary directory", "tempDir", tempDir)
|
||||
return tempDir, cleanupFunc, nil
|
||||
}
|
||||
|
||||
// extractTar extracts a TAR archive to the specified directory
|
||||
func (uc *RestorePostgresqlBackupUsecase) extractTar(
|
||||
ctx context.Context,
|
||||
reader io.Reader,
|
||||
destDir string,
|
||||
) error {
|
||||
tarReader := tar.NewReader(reader)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if config.IsShouldShutdown() {
|
||||
return fmt.Errorf("extraction cancelled due to shutdown")
|
||||
}
|
||||
|
||||
header, err := tarReader.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read TAR header: %w", err)
|
||||
}
|
||||
|
||||
targetPath := filepath.Join(destDir, header.Name)
|
||||
|
||||
// Ensure the target path is within destDir (prevent path traversal)
|
||||
if !strings.HasPrefix(filepath.Clean(targetPath), filepath.Clean(destDir)) {
|
||||
return fmt.Errorf("invalid file path in TAR: %s", header.Name)
|
||||
}
|
||||
|
||||
switch header.Typeflag {
|
||||
case tar.TypeDir:
|
||||
if err := os.MkdirAll(targetPath, os.FileMode(header.Mode)); err != nil {
|
||||
return fmt.Errorf("failed to create directory %s: %w", targetPath, err)
|
||||
}
|
||||
case tar.TypeReg:
|
||||
// Ensure parent directory exists
|
||||
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
|
||||
return fmt.Errorf("failed to create parent directory for %s: %w", targetPath, err)
|
||||
}
|
||||
|
||||
outFile, err := os.OpenFile(
|
||||
targetPath,
|
||||
os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
|
||||
os.FileMode(header.Mode),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
|
||||
}
|
||||
|
||||
_, copyErr := uc.copyWithShutdownCheck(ctx, outFile, tarReader)
|
||||
closeErr := outFile.Close()
|
||||
|
||||
if copyErr != nil {
|
||||
return fmt.Errorf("failed to write file %s: %w", targetPath, copyErr)
|
||||
}
|
||||
if closeErr != nil {
|
||||
return fmt.Errorf("failed to close file %s: %w", targetPath, closeErr)
|
||||
}
|
||||
default:
|
||||
uc.logger.Warn(
|
||||
"Skipping unsupported TAR entry type",
|
||||
"type",
|
||||
header.Typeflag,
|
||||
"name",
|
||||
header.Name,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// executePgRestore executes the pg_restore command with proper environment setup
|
||||
func (uc *RestorePostgresqlBackupUsecase) executePgRestore(
|
||||
ctx context.Context,
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
ALTER TABLE backups DROP COLUMN type;
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
ALTER TABLE backups ADD COLUMN type TEXT NOT NULL DEFAULT 'DEFAULT';
|
||||
-- +goose StatementEnd
|
||||
Reference in New Issue
Block a user