mirror of
https://github.com/databasus/databasus.git
synced 2026-04-06 00:32:03 +02:00
18
Dockerfile
18
Dockerfile
@@ -172,19 +172,23 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then \
|
||||
|
||||
# ========= Install MongoDB Database Tools =========
|
||||
# Note: MongoDB Database Tools are backward compatible - single version supports all server versions (4.0-8.0)
|
||||
# Use dpkg with apt-get -f install to handle dependencies
|
||||
# Note: For ARM64, we use Ubuntu 22.04 package as MongoDB doesn't provide Debian 12 ARM64 packages
|
||||
RUN apt-get update && \
|
||||
if [ "$TARGETARCH" = "amd64" ]; then \
|
||||
wget -q https://fastdl.mongodb.org/tools/db/mongodb-database-tools-debian12-x86_64-100.10.0.deb -O /tmp/mongodb-database-tools.deb; \
|
||||
elif [ "$TARGETARCH" = "arm64" ]; then \
|
||||
wget -q https://fastdl.mongodb.org/tools/db/mongodb-database-tools-debian12-aarch64-100.10.0.deb -O /tmp/mongodb-database-tools.deb; \
|
||||
wget -q https://fastdl.mongodb.org/tools/db/mongodb-database-tools-ubuntu2204-arm64-100.10.0.deb -O /tmp/mongodb-database-tools.deb; \
|
||||
fi && \
|
||||
dpkg -i /tmp/mongodb-database-tools.deb || true && \
|
||||
apt-get install -f -y --no-install-recommends && \
|
||||
rm /tmp/mongodb-database-tools.deb && \
|
||||
dpkg -i /tmp/mongodb-database-tools.deb || apt-get install -f -y --no-install-recommends && \
|
||||
rm -f /tmp/mongodb-database-tools.deb && \
|
||||
rm -rf /var/lib/apt/lists/* && \
|
||||
ln -sf /usr/bin/mongodump /usr/local/mongodb-database-tools/bin/mongodump && \
|
||||
ln -sf /usr/bin/mongorestore /usr/local/mongodb-database-tools/bin/mongorestore
|
||||
mkdir -p /usr/local/mongodb-database-tools/bin && \
|
||||
if [ -f /usr/bin/mongodump ]; then \
|
||||
ln -sf /usr/bin/mongodump /usr/local/mongodb-database-tools/bin/mongodump; \
|
||||
fi && \
|
||||
if [ -f /usr/bin/mongorestore ]; then \
|
||||
ln -sf /usr/bin/mongorestore /usr/local/mongodb-database-tools/bin/mongorestore; \
|
||||
fi
|
||||
|
||||
# Create postgres user and set up directories
|
||||
RUN useradd -m -s /bin/bash postgres || true && \
|
||||
|
||||
17
assets/tools/README.md
Normal file
17
assets/tools/README.md
Normal file
@@ -0,0 +1,17 @@
|
||||
We keep binaries here to speed up CI \ CD tasks and building.
|
||||
|
||||
Docker image needs:
|
||||
- PostgreSQL client tools (versions 12-18)
|
||||
- MySQL client tools (versions 5.7, 8.0, 8.4, 9)
|
||||
- MariaDB client tools (versions 10.6, 12.1)
|
||||
- MongoDB Database Tools (latest)
|
||||
|
||||
For the most of tools, we need a couple of binaries for each version. However, if we download them on each run, it will download a couple of GBs each time.
|
||||
|
||||
So, for speed up we keep only required executables (like pg_dump, mysqldump, mariadb-dump, mongodump, etc.).
|
||||
|
||||
It takes:
|
||||
- ~ 100MB for ARM
|
||||
- ~ 100MB for x64
|
||||
|
||||
Instead of GBs. See Dockefile for usage details.
|
||||
3
backend/.gitignore
vendored
3
backend/.gitignore
vendored
@@ -16,4 +16,5 @@ databasus-backend.exe
|
||||
ui/build/*
|
||||
pgdata-for-restore/
|
||||
temp/
|
||||
cmd.exe
|
||||
cmd.exe
|
||||
temp/
|
||||
17
backend/internal/features/backups/backups/common/dto.go
Normal file
17
backend/internal/features/backups/backups/common/dto.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package common
|
||||
|
||||
import backups_config "databasus-backend/internal/features/backups/config"
|
||||
|
||||
type BackupType string
|
||||
|
||||
const (
|
||||
BackupTypeDefault BackupType = "DEFAULT" // For MySQL, MongoDB, PostgreSQL legacy (-Fc)
|
||||
BackupTypeDirectory BackupType = "DIRECTORY" // PostgreSQL directory type (-Fd)
|
||||
)
|
||||
|
||||
type BackupMetadata struct {
|
||||
EncryptionSalt *string
|
||||
EncryptionIV *string
|
||||
Encryption backups_config.BackupEncryption
|
||||
Type BackupType
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package backups
|
||||
|
||||
import (
|
||||
"databasus-backend/internal/features/backups/backups/common"
|
||||
"databasus-backend/internal/features/databases"
|
||||
users_middleware "databasus-backend/internal/features/users/middleware"
|
||||
"fmt"
|
||||
@@ -182,7 +183,7 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
fileReader, dbType, err := c.backupService.GetBackupFile(user, id)
|
||||
fileReader, backup, database, err := c.backupService.GetBackupFile(user, id)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
@@ -193,15 +194,12 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
|
||||
}
|
||||
}()
|
||||
|
||||
extension := ".dump"
|
||||
if dbType == databases.DatabaseTypeMysql || dbType == databases.DatabaseTypeMariadb {
|
||||
extension = ".sql.zst"
|
||||
}
|
||||
filename := c.generateBackupFilename(backup, database)
|
||||
|
||||
ctx.Header("Content-Type", "application/octet-stream")
|
||||
ctx.Header(
|
||||
"Content-Disposition",
|
||||
fmt.Sprintf("attachment; filename=\"backup_%s%s\"", id.String(), extension),
|
||||
fmt.Sprintf("attachment; filename=\"%s\"", filename),
|
||||
)
|
||||
|
||||
_, err = io.Copy(ctx.Writer, fileReader)
|
||||
@@ -214,3 +212,66 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
|
||||
type MakeBackupRequest struct {
|
||||
DatabaseID uuid.UUID `json:"database_id" binding:"required"`
|
||||
}
|
||||
|
||||
func (c *BackupController) generateBackupFilename(
|
||||
backup *Backup,
|
||||
database *databases.Database,
|
||||
) string {
|
||||
// Format timestamp as YYYY-MM-DD_HH-mm-ss
|
||||
timestamp := backup.CreatedAt.Format("2006-01-02_15-04-05")
|
||||
|
||||
// Sanitize database name for filename (replace spaces and special chars)
|
||||
safeName := sanitizeFilename(database.Name)
|
||||
|
||||
// Determine extension based on database type and backup type
|
||||
extension := c.getBackupExtension(database.Type, backup.Type)
|
||||
|
||||
return fmt.Sprintf("%s_backup_%s%s", safeName, timestamp, extension)
|
||||
}
|
||||
|
||||
func (c *BackupController) getBackupExtension(
|
||||
dbType databases.DatabaseType,
|
||||
backupType common.BackupType,
|
||||
) string {
|
||||
switch dbType {
|
||||
case databases.DatabaseTypeMysql, databases.DatabaseTypeMariadb:
|
||||
return ".sql.zst"
|
||||
case databases.DatabaseTypePostgres:
|
||||
// For PostgreSQL, use .tar for directory type, .dump for custom type
|
||||
if backupType == common.BackupTypeDirectory {
|
||||
return ".tar"
|
||||
}
|
||||
return ".dump"
|
||||
case databases.DatabaseTypeMongodb:
|
||||
return ".archive"
|
||||
default:
|
||||
return ".backup"
|
||||
}
|
||||
}
|
||||
|
||||
func sanitizeFilename(name string) string {
|
||||
// Replace characters that are invalid in filenames
|
||||
replacer := map[rune]rune{
|
||||
' ': '_',
|
||||
'/': '-',
|
||||
'\\': '-',
|
||||
':': '-',
|
||||
'*': '-',
|
||||
'?': '-',
|
||||
'"': '-',
|
||||
'<': '-',
|
||||
'>': '-',
|
||||
'|': '-',
|
||||
}
|
||||
|
||||
result := make([]rune, 0, len(name))
|
||||
for _, char := range name {
|
||||
if replacement, exists := replacer[char]; exists {
|
||||
result = append(result, replacement)
|
||||
} else {
|
||||
result = append(result, char)
|
||||
}
|
||||
}
|
||||
|
||||
return string(result)
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
audit_logs "databasus-backend/internal/features/audit_logs"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
"databasus-backend/internal/features/databases/databases/postgresql"
|
||||
@@ -494,6 +495,123 @@ func Test_DownloadBackup_AuditLogWritten(t *testing.T) {
|
||||
assert.True(t, found, "Audit log for backup download not found")
|
||||
}
|
||||
|
||||
func Test_DownloadBackup_ProperFilenameForPostgreSQL(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
databaseName string
|
||||
backupType string
|
||||
expectedExt string
|
||||
expectedInName string
|
||||
}{
|
||||
{
|
||||
name: "PostgreSQL with directory type",
|
||||
databaseName: "my_postgres_db",
|
||||
backupType: "DIRECTORY",
|
||||
expectedExt: ".tar",
|
||||
expectedInName: "my_postgres_db_backup_",
|
||||
},
|
||||
{
|
||||
name: "PostgreSQL with default type",
|
||||
databaseName: "my_postgres_db",
|
||||
backupType: "DEFAULT",
|
||||
expectedExt: ".dump",
|
||||
expectedInName: "my_postgres_db_backup_",
|
||||
},
|
||||
{
|
||||
name: "Database name with spaces",
|
||||
databaseName: "my test db",
|
||||
backupType: "DIRECTORY",
|
||||
expectedExt: ".tar",
|
||||
expectedInName: "my_test_db_backup_",
|
||||
},
|
||||
{
|
||||
name: "Database name with special characters",
|
||||
databaseName: "my:db/test",
|
||||
backupType: "DEFAULT",
|
||||
expectedExt: ".dump",
|
||||
expectedInName: "my-db-test_backup_",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
router := createTestRouter()
|
||||
owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
|
||||
workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router)
|
||||
|
||||
database := createTestDatabase(tt.databaseName, workspace.ID, owner.Token, router)
|
||||
storage := createTestStorage(workspace.ID)
|
||||
|
||||
configService := backups_config.GetBackupConfigService()
|
||||
config, err := configService.GetBackupConfigByDbId(database.ID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
config.IsBackupsEnabled = true
|
||||
config.StorageID = &storage.ID
|
||||
config.Storage = storage
|
||||
_, err = configService.SaveBackupConfig(config)
|
||||
assert.NoError(t, err)
|
||||
|
||||
backup := createTestBackupWithType(database, owner, tt.backupType)
|
||||
|
||||
resp := test_utils.MakeGetRequest(
|
||||
t,
|
||||
router,
|
||||
fmt.Sprintf("/api/v1/backups/%s/file", backup.ID.String()),
|
||||
"Bearer "+owner.Token,
|
||||
http.StatusOK,
|
||||
)
|
||||
|
||||
contentDisposition := resp.Headers.Get("Content-Disposition")
|
||||
assert.NotEmpty(t, contentDisposition, "Content-Disposition header should be present")
|
||||
|
||||
// Verify the filename contains expected parts
|
||||
assert.Contains(
|
||||
t,
|
||||
contentDisposition,
|
||||
tt.expectedInName,
|
||||
"Filename should contain sanitized database name",
|
||||
)
|
||||
assert.Contains(
|
||||
t,
|
||||
contentDisposition,
|
||||
tt.expectedExt,
|
||||
"Filename should have correct extension",
|
||||
)
|
||||
assert.Contains(t, contentDisposition, "attachment", "Should be an attachment")
|
||||
|
||||
// Verify timestamp format (YYYY-MM-DD_HH-mm-ss)
|
||||
assert.Regexp(
|
||||
t,
|
||||
`\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}`,
|
||||
contentDisposition,
|
||||
"Filename should contain timestamp",
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_SanitizeFilename(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{input: "simple_name", expected: "simple_name"},
|
||||
{input: "name with spaces", expected: "name_with_spaces"},
|
||||
{input: "name/with\\slashes", expected: "name-with-slashes"},
|
||||
{input: "name:with*special?chars", expected: "name-with-special-chars"},
|
||||
{input: "name<with>pipes|", expected: "name-with-pipes-"},
|
||||
{input: `name"with"quotes`, expected: "name-with-quotes"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
result := sanitizeFilename(tt.input)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_CancelBackup_InProgressBackup_SuccessfullyCancelled(t *testing.T) {
|
||||
router := createTestRouter()
|
||||
owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
|
||||
@@ -709,3 +827,20 @@ func createTestBackup(
|
||||
|
||||
return backup
|
||||
}
|
||||
|
||||
func createTestBackupWithType(
|
||||
database *databases.Database,
|
||||
owner *users_dto.SignInResponseDTO,
|
||||
backupType string,
|
||||
) *Backup {
|
||||
backup := createTestBackup(database, owner)
|
||||
|
||||
// Update the format field
|
||||
repo := &BackupRepository{}
|
||||
backup.Type = common.BackupType(backupType)
|
||||
if err := repo.Save(backup); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return backup
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package backups
|
||||
import (
|
||||
"context"
|
||||
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
"databasus-backend/internal/features/notifiers"
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package backups
|
||||
|
||||
import (
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"time"
|
||||
|
||||
@@ -24,5 +25,6 @@ type Backup struct {
|
||||
EncryptionIV *string `json:"-" gorm:"column:encryption_iv"`
|
||||
Encryption backups_config.BackupEncryption `json:"encryption" gorm:"column:encryption;type:text;not null;default:'NONE'"`
|
||||
|
||||
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"`
|
||||
Type common.BackupType `json:"type" gorm:"column:type;type:text;not null;default:'DEFAULT'"`
|
||||
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at"`
|
||||
}
|
||||
|
||||
@@ -343,6 +343,7 @@ func (s *BackupService) MakeBackup(databaseID uuid.UUID, isLastTry bool) {
|
||||
backup.EncryptionSalt = backupMetadata.EncryptionSalt
|
||||
backup.EncryptionIV = backupMetadata.EncryptionIV
|
||||
backup.Encryption = backupMetadata.Encryption
|
||||
backup.Type = backupMetadata.Type
|
||||
}
|
||||
|
||||
if err := s.backupRepository.Save(backup); err != nil {
|
||||
@@ -502,19 +503,19 @@ func (s *BackupService) CancelBackup(
|
||||
func (s *BackupService) GetBackupFile(
|
||||
user *users_models.User,
|
||||
backupID uuid.UUID,
|
||||
) (io.ReadCloser, databases.DatabaseType, error) {
|
||||
) (io.ReadCloser, *Backup, *databases.Database, error) {
|
||||
backup, err := s.backupRepository.FindByID(backupID)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
database, err := s.databaseService.GetDatabaseByID(backup.DatabaseID)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
if database.WorkspaceID == nil {
|
||||
return nil, "", errors.New("cannot download backup for database without workspace")
|
||||
return nil, nil, nil, errors.New("cannot download backup for database without workspace")
|
||||
}
|
||||
|
||||
canAccess, _, err := s.workspaceService.CanUserAccessWorkspace(
|
||||
@@ -522,10 +523,12 @@ func (s *BackupService) GetBackupFile(
|
||||
user,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
if !canAccess {
|
||||
return nil, "", errors.New("insufficient permissions to download backup for this database")
|
||||
return nil, nil, nil, errors.New(
|
||||
"insufficient permissions to download backup for this database",
|
||||
)
|
||||
}
|
||||
|
||||
s.auditLogService.WriteAuditLog(
|
||||
@@ -540,10 +543,10 @@ func (s *BackupService) GetBackupFile(
|
||||
|
||||
reader, err := s.getBackupReader(backupID)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return reader, database.Type, nil
|
||||
return reader, backup, database, nil
|
||||
}
|
||||
|
||||
func (s *BackupService) deleteBackup(backup *Backup) error {
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
encryption_secrets "databasus-backend/internal/features/encryption/secrets"
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
package common
|
||||
|
||||
import backups_config "databasus-backend/internal/features/backups/config"
|
||||
|
||||
type BackupMetadata struct {
|
||||
EncryptionSalt *string
|
||||
EncryptionIV *string
|
||||
Encryption backups_config.BackupEncryption
|
||||
}
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
usecases_mariadb "databasus-backend/internal/features/backups/backups/usecases/mariadb"
|
||||
usecases_mongodb "databasus-backend/internal/features/backups/backups/usecases/mongodb"
|
||||
usecases_mysql "databasus-backend/internal/features/backups/backups/usecases/mysql"
|
||||
@@ -30,7 +30,7 @@ func (uc *CreateBackupUsecase) Execute(
|
||||
database *databases.Database,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
switch database.Type {
|
||||
case databases.DatabaseTypePostgres:
|
||||
return uc.CreatePostgresqlBackupUsecase.Execute(
|
||||
|
||||
@@ -18,8 +18,8 @@ import (
|
||||
"github.com/klauspost/compress/zstd"
|
||||
|
||||
"databasus-backend/internal/config"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backup_encryption "databasus-backend/internal/features/backups/backups/encryption"
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
mariadbtypes "databasus-backend/internal/features/databases/databases/mariadb"
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 23 * time.Hour
|
||||
backupTimeout = 6 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
@@ -57,7 +57,7 @@ func (uc *CreateMariadbBackupUsecase) Execute(
|
||||
db *databases.Database,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info(
|
||||
"Creating MariaDB backup via mariadb-dump",
|
||||
"databaseId", db.ID,
|
||||
@@ -140,7 +140,7 @@ func (uc *CreateMariadbBackupUsecase) streamToStorage(
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
mdbConfig *mariadbtypes.MariadbDatabase,
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info("Streaming MariaDB backup to storage", "mariadbBin", mariadbBin)
|
||||
|
||||
ctx, cancel := uc.createBackupContext(parentCtx)
|
||||
@@ -196,7 +196,7 @@ func (uc *CreateMariadbBackupUsecase) streamToStorage(
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create zstd writer: %w", err)
|
||||
}
|
||||
countingWriter := usecases_common.NewCountingWriter(zstdWriter)
|
||||
countingWriter := common.NewCountingWriter(zstdWriter)
|
||||
|
||||
saveErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
@@ -264,7 +264,7 @@ func (uc *CreateMariadbBackupUsecase) createTempMyCnfFile(
|
||||
mdbConfig *mariadbtypes.MariadbDatabase,
|
||||
password string,
|
||||
) (string, error) {
|
||||
tempDir, err := os.MkdirTemp("", "mycnf")
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
@@ -401,8 +401,8 @@ func (uc *CreateMariadbBackupUsecase) setupBackupEncryption(
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
storageWriter io.WriteCloser,
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, usecases_common.BackupMetadata, error) {
|
||||
metadata := usecases_common.BackupMetadata{}
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, common.BackupMetadata, error) {
|
||||
metadata := common.BackupMetadata{}
|
||||
|
||||
if backupConfig.Encryption != backups_config.BackupEncryptionEncrypted {
|
||||
metadata.Encryption = backups_config.BackupEncryptionNone
|
||||
|
||||
@@ -15,8 +15,8 @@ import (
|
||||
"github.com/google/uuid"
|
||||
|
||||
"databasus-backend/internal/config"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backup_encryption "databasus-backend/internal/features/backups/backups/encryption"
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
mongodbtypes "databasus-backend/internal/features/databases/databases/mongodb"
|
||||
@@ -27,7 +27,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 23 * time.Hour
|
||||
backupTimeout = 6 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
@@ -51,7 +51,7 @@ func (uc *CreateMongodbBackupUsecase) Execute(
|
||||
db *databases.Database,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info(
|
||||
"Creating MongoDB backup via mongodump",
|
||||
"databaseId", db.ID,
|
||||
@@ -124,7 +124,7 @@ func (uc *CreateMongodbBackupUsecase) streamToStorage(
|
||||
args []string,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info("Streaming MongoDB backup to storage", "mongodumpBin", mongodumpBin)
|
||||
|
||||
ctx, cancel := uc.createBackupContext(parentCtx)
|
||||
@@ -175,7 +175,7 @@ func (uc *CreateMongodbBackupUsecase) streamToStorage(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
countingWriter := usecases_common.NewCountingWriter(finalWriter)
|
||||
countingWriter := common.NewCountingWriter(finalWriter)
|
||||
|
||||
saveErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
@@ -264,8 +264,8 @@ func (uc *CreateMongodbBackupUsecase) setupBackupEncryption(
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
storageWriter io.WriteCloser,
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, usecases_common.BackupMetadata, error) {
|
||||
backupMetadata := usecases_common.BackupMetadata{
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, common.BackupMetadata, error) {
|
||||
backupMetadata := common.BackupMetadata{
|
||||
Encryption: backups_config.BackupEncryptionNone,
|
||||
}
|
||||
|
||||
|
||||
@@ -18,8 +18,8 @@ import (
|
||||
"github.com/klauspost/compress/zstd"
|
||||
|
||||
"databasus-backend/internal/config"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backup_encryption "databasus-backend/internal/features/backups/backups/encryption"
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
mysqltypes "databasus-backend/internal/features/databases/databases/mysql"
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 23 * time.Hour
|
||||
backupTimeout = 6 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
@@ -57,7 +57,7 @@ func (uc *CreateMysqlBackupUsecase) Execute(
|
||||
db *databases.Database,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info(
|
||||
"Creating MySQL backup via mysqldump",
|
||||
"databaseId", db.ID,
|
||||
@@ -155,7 +155,7 @@ func (uc *CreateMysqlBackupUsecase) streamToStorage(
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
myConfig *mysqltypes.MysqlDatabase,
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info("Streaming MySQL backup to storage", "mysqlBin", mysqlBin)
|
||||
|
||||
ctx, cancel := uc.createBackupContext(parentCtx)
|
||||
@@ -211,7 +211,7 @@ func (uc *CreateMysqlBackupUsecase) streamToStorage(
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create zstd writer: %w", err)
|
||||
}
|
||||
countingWriter := usecases_common.NewCountingWriter(zstdWriter)
|
||||
countingWriter := common.NewCountingWriter(zstdWriter)
|
||||
|
||||
saveErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
@@ -279,7 +279,7 @@ func (uc *CreateMysqlBackupUsecase) createTempMyCnfFile(
|
||||
myConfig *mysqltypes.MysqlDatabase,
|
||||
password string,
|
||||
) (string, error) {
|
||||
tempDir, err := os.MkdirTemp("", "mycnf")
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
@@ -414,8 +414,8 @@ func (uc *CreateMysqlBackupUsecase) setupBackupEncryption(
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
storageWriter io.WriteCloser,
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, usecases_common.BackupMetadata, error) {
|
||||
metadata := usecases_common.BackupMetadata{}
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, common.BackupMetadata, error) {
|
||||
metadata := common.BackupMetadata{}
|
||||
|
||||
if backupConfig.Encryption != backups_config.BackupEncryptionEncrypted {
|
||||
metadata.Encryption = backups_config.BackupEncryptionNone
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package usecases_postgresql
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
@@ -15,23 +16,23 @@ import (
|
||||
"time"
|
||||
|
||||
"databasus-backend/internal/config"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
backup_encryption "databasus-backend/internal/features/backups/backups/encryption"
|
||||
usecases_common "databasus-backend/internal/features/backups/backups/usecases/common"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
pgtypes "databasus-backend/internal/features/databases/databases/postgresql"
|
||||
encryption_secrets "databasus-backend/internal/features/encryption/secrets"
|
||||
"databasus-backend/internal/features/storages"
|
||||
"databasus-backend/internal/util/encryption"
|
||||
files_utils "databasus-backend/internal/util/files"
|
||||
"databasus-backend/internal/util/tools"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
backupTimeout = 23 * time.Hour
|
||||
backupTimeout = 6 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
pgConnectTimeout = 30
|
||||
compressionLevel = 5
|
||||
@@ -46,11 +47,6 @@ type CreatePostgresqlBackupUsecase struct {
|
||||
fieldEncryptor encryption.FieldEncryptor
|
||||
}
|
||||
|
||||
type writeResult struct {
|
||||
bytesWritten int
|
||||
writeErr error
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
ctx context.Context,
|
||||
backupID uuid.UUID,
|
||||
@@ -60,9 +56,9 @@ func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
backupProgressListener func(
|
||||
completedMBs float64,
|
||||
),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
) (*common.BackupMetadata, error) {
|
||||
uc.logger.Info(
|
||||
"Creating PostgreSQL backup via pg_dump custom format",
|
||||
"Creating PostgreSQL backup via pg_dump directory type",
|
||||
"databaseId",
|
||||
db.ID,
|
||||
"storageId",
|
||||
@@ -83,14 +79,12 @@ func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
return nil, fmt.Errorf("database name is required for pg_dump backups")
|
||||
}
|
||||
|
||||
args := uc.buildPgDumpArgs(pg)
|
||||
|
||||
decryptedPassword, err := uc.fieldEncryptor.Decrypt(db.ID, pg.Password)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt database password: %w", err)
|
||||
}
|
||||
|
||||
return uc.streamToStorage(
|
||||
return uc.executeDirectoryBackup(
|
||||
ctx,
|
||||
backupID,
|
||||
backupConfig,
|
||||
@@ -100,66 +94,127 @@ func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
config.GetEnv().EnvMode,
|
||||
config.GetEnv().PostgresesInstallDir,
|
||||
),
|
||||
args,
|
||||
pg,
|
||||
decryptedPassword,
|
||||
storage,
|
||||
db,
|
||||
backupProgressListener,
|
||||
)
|
||||
}
|
||||
|
||||
// streamToStorage streams pg_dump output directly to storage
|
||||
func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
|
||||
// executeDirectoryBackup runs pg_dump with directory type and streams as TAR to storage
|
||||
func (uc *CreatePostgresqlBackupUsecase) executeDirectoryBackup(
|
||||
parentCtx context.Context,
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
pgBin string,
|
||||
args []string,
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
password string,
|
||||
storage *storages.Storage,
|
||||
db *databases.Database,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*usecases_common.BackupMetadata, error) {
|
||||
uc.logger.Info("Streaming PostgreSQL backup to storage", "pgBin", pgBin, "args", args)
|
||||
|
||||
) (*common.BackupMetadata, error) {
|
||||
ctx, cancel := uc.createBackupContext(parentCtx)
|
||||
defer cancel()
|
||||
|
||||
pgpassFile, err := uc.setupPgpassFile(db.Postgresql, password)
|
||||
// Create temporary directory for pg_dump output
|
||||
err := files_utils.EnsureDirectories([]string{config.GetEnv().TempFolder})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure temp directories: %w", err)
|
||||
}
|
||||
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgdump_"+backupID.String())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = os.RemoveAll(tempDir)
|
||||
}()
|
||||
|
||||
outputDir := filepath.Join(tempDir, "dump")
|
||||
|
||||
args := uc.buildPgDumpArgs(pg, outputDir)
|
||||
uc.logger.Info(
|
||||
"Executing PostgreSQL backup with directory type",
|
||||
"pgBin",
|
||||
pgBin,
|
||||
"args",
|
||||
args,
|
||||
"outputDir",
|
||||
outputDir,
|
||||
)
|
||||
|
||||
pgpassFile, err := uc.setupPgpassFile(pg, password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if pgpassFile != "" {
|
||||
// Remove the entire temp directory (which contains the .pgpass file)
|
||||
_ = os.RemoveAll(filepath.Dir(pgpassFile))
|
||||
}
|
||||
}()
|
||||
|
||||
// Execute pg_dump to directory
|
||||
cmd := exec.CommandContext(ctx, pgBin, args...)
|
||||
uc.logger.Info("Executing PostgreSQL backup command", "command", cmd.String())
|
||||
|
||||
if err := uc.setupPgEnvironment(cmd, pgpassFile, db.Postgresql.IsHttps, password, db.Postgresql.CpuCount, pgBin); err != nil {
|
||||
if err := uc.setupPgEnvironment(cmd, pgpassFile, pg.IsHttps, password, pg.CpuCount, pgBin); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pgStdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
pgStderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
// Capture stderr in a separate goroutine to ensure we don't miss any error output
|
||||
stderrCh := make(chan []byte, 1)
|
||||
go func() {
|
||||
stderrOutput, _ := io.ReadAll(pgStderr)
|
||||
stderrCh <- stderrOutput
|
||||
}()
|
||||
|
||||
if err = cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("start %s: %w", filepath.Base(pgBin), err)
|
||||
}
|
||||
|
||||
waitErr := cmd.Wait()
|
||||
stderrOutput := <-stderrCh
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, uc.checkCancellationReason()
|
||||
default:
|
||||
}
|
||||
|
||||
if waitErr != nil {
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, uc.buildPgDumpErrorMessage(waitErr, stderrOutput, pgBin, args, password)
|
||||
}
|
||||
|
||||
uc.logger.Info(
|
||||
"pg_dump completed successfully, streaming TAR to storage",
|
||||
"outputDir",
|
||||
outputDir,
|
||||
)
|
||||
|
||||
// Stream directory as TAR to storage
|
||||
return uc.streamDirectoryToStorage(
|
||||
ctx,
|
||||
backupID,
|
||||
backupConfig,
|
||||
outputDir,
|
||||
storage,
|
||||
backupProgressListener,
|
||||
)
|
||||
}
|
||||
|
||||
// streamDirectoryToStorage creates a TAR archive from the directory and streams it to storage
|
||||
func (uc *CreatePostgresqlBackupUsecase) streamDirectoryToStorage(
|
||||
ctx context.Context,
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
sourceDir string,
|
||||
storage *storages.Storage,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (*common.BackupMetadata, error) {
|
||||
storageReader, storageWriter := io.Pipe()
|
||||
|
||||
finalWriter, encryptionWriter, backupMetadata, err := uc.setupBackupEncryption(
|
||||
@@ -171,162 +226,176 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
countingWriter := usecases_common.NewCountingWriter(finalWriter)
|
||||
// Set type to DIRECTORY for new PostgreSQL backups
|
||||
backupMetadata.Type = common.BackupTypeDirectory
|
||||
|
||||
// The backup ID becomes the object key / filename in storage
|
||||
|
||||
// Start streaming into storage in its own goroutine
|
||||
// Start streaming into storage
|
||||
saveErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
saveErr := storage.SaveFile(ctx, uc.fieldEncryptor, uc.logger, backupID, storageReader)
|
||||
saveErrCh <- saveErr
|
||||
}()
|
||||
|
||||
// Start pg_dump
|
||||
if err = cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("start %s: %w", filepath.Base(pgBin), err)
|
||||
}
|
||||
|
||||
// Copy pg output directly to storage with shutdown checks
|
||||
copyResultCh := make(chan error, 1)
|
||||
bytesWrittenCh := make(chan int64, 1)
|
||||
// Create TAR and stream to storage
|
||||
tarErrCh := make(chan error, 1)
|
||||
totalSizeCh := make(chan int64, 1)
|
||||
go func() {
|
||||
bytesWritten, err := uc.copyWithShutdownCheck(
|
||||
totalSize, tarErr := uc.writeTarToWriter(
|
||||
ctx,
|
||||
countingWriter,
|
||||
pgStdout,
|
||||
sourceDir,
|
||||
finalWriter,
|
||||
backupProgressListener,
|
||||
)
|
||||
bytesWrittenCh <- bytesWritten
|
||||
copyResultCh <- err
|
||||
totalSizeCh <- totalSize
|
||||
tarErrCh <- tarErr
|
||||
|
||||
// Close encryption writer first if present
|
||||
if encryptionWriter != nil {
|
||||
if closeErr := encryptionWriter.Close(); closeErr != nil {
|
||||
uc.logger.Error("Failed to close encryption writer", "error", closeErr)
|
||||
}
|
||||
}
|
||||
// Then close the pipe writer to signal EOF to storage
|
||||
if closeErr := storageWriter.Close(); closeErr != nil {
|
||||
uc.logger.Error("Failed to close pipe writer", "error", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
copyErr := <-copyResultCh
|
||||
bytesWritten := <-bytesWrittenCh
|
||||
waitErr := cmd.Wait()
|
||||
tarErr := <-tarErrCh
|
||||
totalSize := <-totalSizeCh
|
||||
saveErr := <-saveErrCh
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
uc.cleanupOnCancellation(encryptionWriter, storageWriter, saveErrCh)
|
||||
return nil, uc.checkCancellationReason()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := uc.closeWriters(encryptionWriter, storageWriter); err != nil {
|
||||
<-saveErrCh
|
||||
return nil, err
|
||||
}
|
||||
|
||||
saveErr := <-saveErrCh
|
||||
stderrOutput := <-stderrCh
|
||||
|
||||
// Send final sizing after backup is completed
|
||||
if waitErr == nil && copyErr == nil && saveErr == nil && backupProgressListener != nil {
|
||||
sizeMB := float64(bytesWritten) / (1024 * 1024)
|
||||
// Send final size after backup is completed
|
||||
if tarErr == nil && saveErr == nil && backupProgressListener != nil {
|
||||
sizeMB := float64(totalSize) / (1024 * 1024)
|
||||
backupProgressListener(sizeMB)
|
||||
}
|
||||
|
||||
switch {
|
||||
case waitErr != nil:
|
||||
if tarErr != nil {
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, uc.buildPgDumpErrorMessage(waitErr, stderrOutput, pgBin, args, password)
|
||||
case copyErr != nil:
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("copy to storage: %w", copyErr)
|
||||
case saveErr != nil:
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("save to storage: %w", saveErr)
|
||||
return nil, fmt.Errorf("failed to create TAR archive: %w", tarErr)
|
||||
}
|
||||
|
||||
if saveErr != nil {
|
||||
if err := uc.checkCancellation(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("failed to save to storage: %w", saveErr)
|
||||
}
|
||||
|
||||
uc.logger.Info(
|
||||
"Backup completed successfully",
|
||||
"backupId",
|
||||
backupID,
|
||||
"totalSizeBytes",
|
||||
totalSize,
|
||||
)
|
||||
return &backupMetadata, nil
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) copyWithShutdownCheck(
|
||||
// writeTarToWriter creates a TAR archive from sourceDir and writes it to the writer
|
||||
func (uc *CreatePostgresqlBackupUsecase) writeTarToWriter(
|
||||
ctx context.Context,
|
||||
dst io.Writer,
|
||||
src io.Reader,
|
||||
sourceDir string,
|
||||
writer io.Writer,
|
||||
backupProgressListener func(completedMBs float64),
|
||||
) (int64, error) {
|
||||
buf := make([]byte, copyBufferSize)
|
||||
var totalBytesWritten int64
|
||||
tarWriter := tar.NewWriter(writer)
|
||||
defer func() {
|
||||
_ = tarWriter.Close()
|
||||
}()
|
||||
|
||||
var totalSize int64
|
||||
var lastReportedMB float64
|
||||
|
||||
for {
|
||||
err := filepath.Walk(sourceDir, func(path string, info os.FileInfo, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled: %w", ctx.Err())
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if config.IsShouldShutdown() {
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled due to shutdown")
|
||||
return fmt.Errorf("backup cancelled due to shutdown")
|
||||
}
|
||||
|
||||
bytesRead, readErr := src.Read(buf)
|
||||
if bytesRead > 0 {
|
||||
writeResultCh := make(chan writeResult, 1)
|
||||
go func() {
|
||||
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
|
||||
writeResultCh <- writeResult{bytesWritten, writeErr}
|
||||
}()
|
||||
// Get relative path for TAR header
|
||||
relPath, err := filepath.Rel(sourceDir, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get relative path: %w", err)
|
||||
}
|
||||
|
||||
var bytesWritten int
|
||||
var writeErr error
|
||||
// Skip the root directory itself
|
||||
if relPath == "." {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled during write: %w", ctx.Err())
|
||||
case result := <-writeResultCh:
|
||||
bytesWritten = result.bytesWritten
|
||||
writeErr = result.writeErr
|
||||
}
|
||||
// Create TAR header
|
||||
header, err := tar.FileInfoHeader(info, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create TAR header: %w", err)
|
||||
}
|
||||
header.Name = relPath
|
||||
|
||||
if bytesWritten < 0 || bytesRead < bytesWritten {
|
||||
bytesWritten = 0
|
||||
if writeErr == nil {
|
||||
writeErr = fmt.Errorf("invalid write result")
|
||||
}
|
||||
}
|
||||
if err := tarWriter.WriteHeader(header); err != nil {
|
||||
return fmt.Errorf("failed to write TAR header: %w", err)
|
||||
}
|
||||
|
||||
if writeErr != nil {
|
||||
return totalBytesWritten, writeErr
|
||||
}
|
||||
// If it's a directory, we're done
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if bytesRead != bytesWritten {
|
||||
return totalBytesWritten, io.ErrShortWrite
|
||||
}
|
||||
// Copy file content to TAR
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s: %w", path, err)
|
||||
}
|
||||
defer func() {
|
||||
_ = file.Close()
|
||||
}()
|
||||
|
||||
totalBytesWritten += int64(bytesWritten)
|
||||
written, err := io.Copy(tarWriter, file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write file %s to TAR: %w", path, err)
|
||||
}
|
||||
|
||||
if backupProgressListener != nil {
|
||||
currentSizeMB := float64(totalBytesWritten) / (1024 * 1024)
|
||||
if currentSizeMB >= lastReportedMB+progressReportIntervalMB {
|
||||
backupProgressListener(currentSizeMB)
|
||||
lastReportedMB = currentSizeMB
|
||||
}
|
||||
totalSize += written
|
||||
|
||||
// Report progress
|
||||
if backupProgressListener != nil {
|
||||
currentSizeMB := float64(totalSize) / (1024 * 1024)
|
||||
if currentSizeMB >= lastReportedMB+progressReportIntervalMB {
|
||||
backupProgressListener(currentSizeMB)
|
||||
lastReportedMB = currentSizeMB
|
||||
}
|
||||
}
|
||||
|
||||
if readErr != nil {
|
||||
if readErr != io.EOF {
|
||||
return totalBytesWritten, readErr
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return totalBytesWritten, nil
|
||||
return totalSize, err
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(pg *pgtypes.PostgresqlDatabase) []string {
|
||||
func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
outputDir string,
|
||||
) []string {
|
||||
args := []string{
|
||||
"-Fc",
|
||||
"-Fd", // Directory type (enables parallel dump)
|
||||
"-f", outputDir, // Output directory
|
||||
"--no-password",
|
||||
"-h", pg.Host,
|
||||
"-p", strconv.Itoa(pg.Port),
|
||||
@@ -335,7 +404,7 @@ func (uc *CreatePostgresqlBackupUsecase) buildPgDumpArgs(pg *pgtypes.PostgresqlD
|
||||
"--verbose",
|
||||
}
|
||||
|
||||
// Add parallel jobs based on CPU count
|
||||
// Parallel jobs now actually work with directory type
|
||||
if pg.CpuCount > 1 {
|
||||
args = append(args, "-j", strconv.Itoa(pg.CpuCount))
|
||||
}
|
||||
@@ -476,8 +545,8 @@ func (uc *CreatePostgresqlBackupUsecase) setupBackupEncryption(
|
||||
backupID uuid.UUID,
|
||||
backupConfig *backups_config.BackupConfig,
|
||||
storageWriter io.WriteCloser,
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, usecases_common.BackupMetadata, error) {
|
||||
metadata := usecases_common.BackupMetadata{}
|
||||
) (io.Writer, *backup_encryption.EncryptionWriter, common.BackupMetadata, error) {
|
||||
metadata := common.BackupMetadata{}
|
||||
|
||||
if backupConfig.Encryption != backups_config.BackupEncryptionEncrypted {
|
||||
metadata.Encryption = backups_config.BackupEncryptionNone
|
||||
@@ -521,63 +590,6 @@ func (uc *CreatePostgresqlBackupUsecase) setupBackupEncryption(
|
||||
return encWriter, encWriter, metadata, nil
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) cleanupOnCancellation(
|
||||
encryptionWriter *backup_encryption.EncryptionWriter,
|
||||
storageWriter io.WriteCloser,
|
||||
saveErrCh chan error,
|
||||
) {
|
||||
if encryptionWriter != nil {
|
||||
go func() {
|
||||
if closeErr := encryptionWriter.Close(); closeErr != nil {
|
||||
uc.logger.Error(
|
||||
"Failed to close encrypting writer during cancellation",
|
||||
"error",
|
||||
closeErr,
|
||||
)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err := storageWriter.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close pipe writer during cancellation", "error", err)
|
||||
}
|
||||
|
||||
<-saveErrCh
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) closeWriters(
|
||||
encryptionWriter *backup_encryption.EncryptionWriter,
|
||||
storageWriter io.WriteCloser,
|
||||
) error {
|
||||
encryptionCloseErrCh := make(chan error, 1)
|
||||
if encryptionWriter != nil {
|
||||
go func() {
|
||||
closeErr := encryptionWriter.Close()
|
||||
if closeErr != nil {
|
||||
uc.logger.Error("Failed to close encrypting writer", "error", closeErr)
|
||||
}
|
||||
encryptionCloseErrCh <- closeErr
|
||||
}()
|
||||
} else {
|
||||
encryptionCloseErrCh <- nil
|
||||
}
|
||||
|
||||
encryptionCloseErr := <-encryptionCloseErrCh
|
||||
if encryptionCloseErr != nil {
|
||||
if err := storageWriter.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close pipe writer after encryption error", "error", err)
|
||||
}
|
||||
return fmt.Errorf("failed to close encryption writer: %w", encryptionCloseErr)
|
||||
}
|
||||
|
||||
if err := storageWriter.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close pipe writer", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (uc *CreatePostgresqlBackupUsecase) checkCancellation(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -759,7 +771,7 @@ func (uc *CreatePostgresqlBackupUsecase) createTempPgpassFile(
|
||||
escapedPassword,
|
||||
)
|
||||
|
||||
tempDir, err := os.MkdirTemp("", "pgpass")
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgpass_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package disk
|
||||
|
||||
import (
|
||||
"databasus-backend/internal/config"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
||||
"github.com/shirou/gopsutil/v4/disk"
|
||||
@@ -12,10 +14,14 @@ type DiskService struct{}
|
||||
func (s *DiskService) GetDiskUsage() (*DiskUsage, error) {
|
||||
platform := s.detectPlatform()
|
||||
|
||||
// Set path based on platform
|
||||
path := "/"
|
||||
var path string
|
||||
|
||||
if platform == PlatformWindows {
|
||||
path = "C:\\"
|
||||
} else {
|
||||
// Use databasus-data folder location for Linux (Docker)
|
||||
cfg := config.GetEnv()
|
||||
path = filepath.Dir(cfg.DataFolder) // Gets /databasus-data from /databasus-data/backups
|
||||
}
|
||||
|
||||
diskUsage, err := disk.Usage(path)
|
||||
|
||||
@@ -250,6 +250,44 @@ func Test_RestoreBackup_AuditLogWritten(t *testing.T) {
|
||||
assert.True(t, found, "Audit log for restore not found")
|
||||
}
|
||||
|
||||
func Test_RestoreBackup_InsufficientDiskSpace_ReturnsError(t *testing.T) {
|
||||
router := createTestRouter()
|
||||
owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
|
||||
workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router)
|
||||
|
||||
_, backup := createTestDatabaseWithBackupForRestore(workspace, owner, router)
|
||||
|
||||
// Update backup size to 10 TB via repository
|
||||
repo := &backups.BackupRepository{}
|
||||
backup.BackupSizeMb = 10485760.0 // 10 TB in MB
|
||||
err := repo.Save(backup)
|
||||
assert.NoError(t, err)
|
||||
|
||||
request := RestoreBackupRequest{
|
||||
PostgresqlDatabase: &postgresql.PostgresqlDatabase{
|
||||
Version: tools.PostgresqlVersion16,
|
||||
Host: "localhost",
|
||||
Port: 5432,
|
||||
Username: "postgres",
|
||||
Password: "postgres",
|
||||
},
|
||||
}
|
||||
|
||||
testResp := test_utils.MakePostRequest(
|
||||
t,
|
||||
router,
|
||||
fmt.Sprintf("/api/v1/restores/%s/restore", backup.ID.String()),
|
||||
"Bearer "+owner.Token,
|
||||
request,
|
||||
http.StatusBadRequest,
|
||||
)
|
||||
|
||||
bodyStr := string(testResp.Body)
|
||||
assert.Contains(t, bodyStr, "is required")
|
||||
assert.Contains(t, bodyStr, "is available")
|
||||
assert.Contains(t, bodyStr, "disk space")
|
||||
}
|
||||
|
||||
func createTestDatabaseWithBackupForRestore(
|
||||
workspace *workspaces_models.Workspace,
|
||||
owner *users_dto.SignInResponseDTO,
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"databasus-backend/internal/features/backups/backups"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
"databasus-backend/internal/features/disk"
|
||||
"databasus-backend/internal/features/restores/usecases"
|
||||
"databasus-backend/internal/features/storages"
|
||||
workspaces_services "databasus-backend/internal/features/workspaces/services"
|
||||
@@ -24,6 +25,7 @@ var restoreService = &RestoreService{
|
||||
workspaces_services.GetWorkspaceService(),
|
||||
audit_logs.GetAuditLogService(),
|
||||
encryption.GetFieldEncryptor(),
|
||||
disk.GetDiskService(),
|
||||
}
|
||||
var restoreController = &RestoreController{
|
||||
restoreService,
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"databasus-backend/internal/features/backups/backups"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
"databasus-backend/internal/features/disk"
|
||||
"databasus-backend/internal/features/restores/enums"
|
||||
"databasus-backend/internal/features/restores/models"
|
||||
"databasus-backend/internal/features/restores/usecases"
|
||||
@@ -32,6 +33,7 @@ type RestoreService struct {
|
||||
workspaceService *workspaces_services.WorkspaceService
|
||||
auditLogService *audit_logs.AuditLogService
|
||||
fieldEncryptor encryption.FieldEncryptor
|
||||
diskService *disk.DiskService
|
||||
}
|
||||
|
||||
func (s *RestoreService) OnBeforeBackupRemove(backup *backups.Backup) error {
|
||||
@@ -126,6 +128,11 @@ func (s *RestoreService) RestoreBackupWithAuth(
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate disk space before starting restore
|
||||
if err := s.validateDiskSpace(backup); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := s.RestoreBackup(backup, requestDTO); err != nil {
|
||||
s.logger.Error("Failed to restore backup", "error", err)
|
||||
@@ -361,3 +368,41 @@ func (s *RestoreService) validateVersionCompatibility(
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RestoreService) validateDiskSpace(backup *backups.Backup) error {
|
||||
diskUsage, err := s.diskService.GetDiskUsage()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check disk space: %w", err)
|
||||
}
|
||||
|
||||
// Convert backup size from MB to bytes
|
||||
backupSizeBytes := int64(backup.BackupSizeMb * 1024 * 1024)
|
||||
|
||||
// Calculate required space: backup size + 10% buffer
|
||||
bufferBytes := int64(float64(backupSizeBytes) * 0.1)
|
||||
requiredBytes := backupSizeBytes + bufferBytes
|
||||
|
||||
// Ensure minimum of 1 GB total (even if backup is small)
|
||||
minRequiredBytes := int64(1024 * 1024 * 1024) // 1 GB
|
||||
if requiredBytes < minRequiredBytes {
|
||||
requiredBytes = minRequiredBytes
|
||||
}
|
||||
|
||||
// Check if there's enough free space
|
||||
if diskUsage.FreeSpaceBytes < requiredBytes {
|
||||
backupSizeGB := float64(backupSizeBytes) / (1024 * 1024 * 1024)
|
||||
bufferSizeGB := float64(bufferBytes) / (1024 * 1024 * 1024)
|
||||
requiredGB := float64(requiredBytes) / (1024 * 1024 * 1024)
|
||||
availableGB := float64(diskUsage.FreeSpaceBytes) / (1024 * 1024 * 1024)
|
||||
|
||||
return fmt.Errorf(
|
||||
"to restore this backup, %.1f GB (%.1f GB backup + %.1f GB buffer) is required, but only %.1f GB is available. Please free up disk space before restoring",
|
||||
requiredGB,
|
||||
backupSizeGB,
|
||||
bufferSizeGB,
|
||||
availableGB,
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -330,7 +330,7 @@ func (uc *RestoreMariadbBackupUsecase) createTempMyCnfFile(
|
||||
mdbConfig *mariadbtypes.MariadbDatabase,
|
||||
password string,
|
||||
) (string, error) {
|
||||
tempDir, err := os.MkdirTemp("", "mycnf")
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
|
||||
@@ -322,7 +322,7 @@ func (uc *RestoreMysqlBackupUsecase) createTempMyCnfFile(
|
||||
myConfig *mysqltypes.MysqlDatabase,
|
||||
password string,
|
||||
) (string, error) {
|
||||
tempDir, err := os.MkdirTemp("", "mycnf")
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temp directory: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package usecases_postgresql
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
@@ -16,6 +17,7 @@ import (
|
||||
|
||||
"databasus-backend/internal/config"
|
||||
"databasus-backend/internal/features/backups/backups"
|
||||
common "databasus-backend/internal/features/backups/backups/common"
|
||||
"databasus-backend/internal/features/backups/backups/encryption"
|
||||
backups_config "databasus-backend/internal/features/backups/config"
|
||||
"databasus-backend/internal/features/databases"
|
||||
@@ -54,6 +56,8 @@ func (uc *RestorePostgresqlBackupUsecase) Execute(
|
||||
restore.ID,
|
||||
"backupId",
|
||||
backup.ID,
|
||||
"format",
|
||||
backup.Type,
|
||||
)
|
||||
|
||||
pg := restoringToDB.Postgresql
|
||||
@@ -65,33 +69,72 @@ func (uc *RestorePostgresqlBackupUsecase) Execute(
|
||||
return fmt.Errorf("target database name is required for pg_restore")
|
||||
}
|
||||
|
||||
// Use parallel jobs based on CPU count (same as backup)
|
||||
pgBin := tools.GetPostgresqlExecutable(
|
||||
pg.Version,
|
||||
"pg_restore",
|
||||
config.GetEnv().EnvMode,
|
||||
config.GetEnv().PostgresesInstallDir,
|
||||
)
|
||||
|
||||
// Route based on backup type
|
||||
switch backup.Type {
|
||||
case common.BackupTypeDirectory:
|
||||
return uc.restoreDirectoryType(
|
||||
originalDB,
|
||||
restoringToDB,
|
||||
pgBin,
|
||||
backup,
|
||||
storage,
|
||||
pg,
|
||||
isExcludeExtensions,
|
||||
)
|
||||
case common.BackupTypeDefault, "": // empty = legacy DEFAULT
|
||||
return uc.restoreCustomType(
|
||||
originalDB,
|
||||
pgBin,
|
||||
backup,
|
||||
storage,
|
||||
pg,
|
||||
isExcludeExtensions,
|
||||
)
|
||||
default:
|
||||
return fmt.Errorf("unsupported backup type: %s", backup.Type)
|
||||
}
|
||||
}
|
||||
|
||||
// restoreCustomType restores a backup in custom type (-Fc) - legacy type
|
||||
func (uc *RestorePostgresqlBackupUsecase) restoreCustomType(
|
||||
originalDB *databases.Database,
|
||||
pgBin string,
|
||||
backup *backups.Backup,
|
||||
storage *storages.Storage,
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
isExcludeExtensions bool,
|
||||
) error {
|
||||
uc.logger.Info("Restoring backup in custom type (-Fc)", "backupId", backup.ID)
|
||||
|
||||
// Use parallel jobs based on CPU count
|
||||
// Cap between 1 and 8 to avoid overwhelming the server
|
||||
parallelJobs := max(1, min(restoringToDB.Postgresql.CpuCount, 8))
|
||||
parallelJobs := max(1, min(pg.CpuCount, 8))
|
||||
|
||||
args := []string{
|
||||
"-Fc", // expect custom format (same as backup)
|
||||
"-Fc", // expect custom type
|
||||
"-j", strconv.Itoa(parallelJobs), // parallel jobs based on CPU count
|
||||
"--no-password", // Use environment variable for password, prevent prompts
|
||||
"--no-password",
|
||||
"-h", pg.Host,
|
||||
"-p", strconv.Itoa(pg.Port),
|
||||
"-U", pg.Username,
|
||||
"-d", *pg.Database,
|
||||
"--verbose", // Add verbose output to help with debugging
|
||||
"--clean", // Clean (drop) database objects before recreating them
|
||||
"--if-exists", // Use IF EXISTS when dropping objects
|
||||
"--no-owner", // Skip restoring ownership
|
||||
"--no-acl", // Skip restoring access privileges (GRANT/REVOKE commands)
|
||||
"--verbose",
|
||||
"--clean",
|
||||
"--if-exists",
|
||||
"--no-owner",
|
||||
"--no-acl",
|
||||
}
|
||||
|
||||
return uc.restoreFromStorage(
|
||||
originalDB,
|
||||
tools.GetPostgresqlExecutable(
|
||||
pg.Version,
|
||||
"pg_restore",
|
||||
config.GetEnv().EnvMode,
|
||||
config.GetEnv().PostgresesInstallDir,
|
||||
),
|
||||
pgBin,
|
||||
args,
|
||||
pg.Password,
|
||||
backup,
|
||||
@@ -101,6 +144,100 @@ func (uc *RestorePostgresqlBackupUsecase) Execute(
|
||||
)
|
||||
}
|
||||
|
||||
// restoreDirectoryType restores a backup in directory type (-Fd) - new TAR type
|
||||
func (uc *RestorePostgresqlBackupUsecase) restoreDirectoryType(
|
||||
originalDB *databases.Database,
|
||||
_ *databases.Database, // restoringToDB not used but kept for API consistency
|
||||
pgBin string,
|
||||
backup *backups.Backup,
|
||||
storage *storages.Storage,
|
||||
pg *pgtypes.PostgresqlDatabase,
|
||||
isExcludeExtensions bool,
|
||||
) error {
|
||||
uc.logger.Info("Restoring backup in directory type (-Fd)", "backupId", backup.ID)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// Monitor for shutdown
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if config.IsShouldShutdown() {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Create temporary .pgpass file
|
||||
pgpassFile, err := uc.createTempPgpassFile(pg, pg.Password)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temporary .pgpass file: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if pgpassFile != "" {
|
||||
_ = os.RemoveAll(filepath.Dir(pgpassFile))
|
||||
}
|
||||
}()
|
||||
|
||||
// Download and extract TAR to temporary directory
|
||||
tempDir, cleanupFunc, err := uc.downloadAndExtractTar(ctx, backup, storage)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download and extract backup: %w", err)
|
||||
}
|
||||
defer cleanupFunc()
|
||||
|
||||
// Use parallel jobs based on CPU count
|
||||
parallelJobs := max(1, min(pg.CpuCount, 8))
|
||||
|
||||
args := []string{
|
||||
"-Fd", // directory type
|
||||
"-j", strconv.Itoa(parallelJobs), // parallel restore
|
||||
"--no-password",
|
||||
"-h", pg.Host,
|
||||
"-p", strconv.Itoa(pg.Port),
|
||||
"-U", pg.Username,
|
||||
"-d", *pg.Database,
|
||||
"--verbose",
|
||||
"--clean",
|
||||
"--if-exists",
|
||||
"--no-owner",
|
||||
"--no-acl",
|
||||
}
|
||||
|
||||
// If excluding extensions, generate filtered TOC list
|
||||
if isExcludeExtensions {
|
||||
tocListFile, err := uc.generateFilteredTocList(
|
||||
ctx,
|
||||
pgBin,
|
||||
tempDir,
|
||||
pgpassFile,
|
||||
pg,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate filtered TOC list: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = os.Remove(tocListFile)
|
||||
}()
|
||||
|
||||
args = append(args, "-L", tocListFile)
|
||||
}
|
||||
|
||||
// Add the directory as the last argument
|
||||
args = append(args, tempDir)
|
||||
|
||||
return uc.executePgRestore(ctx, originalDB, pgBin, args, pgpassFile, pg)
|
||||
}
|
||||
|
||||
// restoreFromStorage restores backup data from storage using pg_restore
|
||||
func (uc *RestorePostgresqlBackupUsecase) restoreFromStorage(
|
||||
database *databases.Database,
|
||||
@@ -150,7 +287,7 @@ func (uc *RestorePostgresqlBackupUsecase) restoreFromStorage(
|
||||
}
|
||||
defer func() {
|
||||
if pgpassFile != "" {
|
||||
_ = os.Remove(pgpassFile)
|
||||
_ = os.RemoveAll(filepath.Dir(pgpassFile))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -321,6 +458,175 @@ func (uc *RestorePostgresqlBackupUsecase) downloadBackupToTempFile(
|
||||
return tempBackupFile, cleanupFunc, nil
|
||||
}
|
||||
|
||||
// downloadAndExtractTar downloads a TAR backup from storage and extracts it to a temporary directory
|
||||
func (uc *RestorePostgresqlBackupUsecase) downloadAndExtractTar(
|
||||
ctx context.Context,
|
||||
backup *backups.Backup,
|
||||
storage *storages.Storage,
|
||||
) (string, func(), error) {
|
||||
err := files_utils.EnsureDirectories([]string{config.GetEnv().TempFolder})
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("failed to ensure directories: %w", err)
|
||||
}
|
||||
|
||||
// Create temporary directory for extracted data
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "restore_dir_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
|
||||
cleanupFunc := func() {
|
||||
_ = os.RemoveAll(tempDir)
|
||||
}
|
||||
|
||||
uc.logger.Info(
|
||||
"Downloading TAR backup from storage",
|
||||
"backupId", backup.ID,
|
||||
"tempDir", tempDir,
|
||||
"encrypted", backup.Encryption == backups_config.BackupEncryptionEncrypted,
|
||||
)
|
||||
|
||||
fieldEncryptor := util_encryption.GetFieldEncryptor()
|
||||
rawReader, err := storage.GetFile(fieldEncryptor, backup.ID)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to get backup file from storage: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := rawReader.Close(); err != nil {
|
||||
uc.logger.Error("Failed to close backup reader", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a reader that handles decryption if needed
|
||||
var backupReader io.Reader = rawReader
|
||||
if backup.Encryption == backups_config.BackupEncryptionEncrypted {
|
||||
if backup.EncryptionSalt == nil || backup.EncryptionIV == nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("backup is encrypted but missing encryption metadata")
|
||||
}
|
||||
|
||||
masterKey, err := uc.secretKeyService.GetSecretKey()
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to get master key for decryption: %w", err)
|
||||
}
|
||||
|
||||
salt, err := base64.StdEncoding.DecodeString(*backup.EncryptionSalt)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to decode encryption salt: %w", err)
|
||||
}
|
||||
|
||||
iv, err := base64.StdEncoding.DecodeString(*backup.EncryptionIV)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to decode encryption IV: %w", err)
|
||||
}
|
||||
|
||||
decryptReader, err := encryption.NewDecryptionReader(
|
||||
rawReader,
|
||||
masterKey,
|
||||
backup.ID,
|
||||
salt,
|
||||
iv,
|
||||
)
|
||||
if err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to create decryption reader: %w", err)
|
||||
}
|
||||
|
||||
backupReader = decryptReader
|
||||
uc.logger.Info("Using decryption for encrypted backup", "backupId", backup.ID)
|
||||
}
|
||||
|
||||
// Extract TAR archive to temp directory
|
||||
if err := uc.extractTar(ctx, backupReader, tempDir); err != nil {
|
||||
cleanupFunc()
|
||||
return "", nil, fmt.Errorf("failed to extract TAR archive: %w", err)
|
||||
}
|
||||
|
||||
uc.logger.Info("TAR backup extracted to temporary directory", "tempDir", tempDir)
|
||||
return tempDir, cleanupFunc, nil
|
||||
}
|
||||
|
||||
// extractTar extracts a TAR archive to the specified directory
|
||||
func (uc *RestorePostgresqlBackupUsecase) extractTar(
|
||||
ctx context.Context,
|
||||
reader io.Reader,
|
||||
destDir string,
|
||||
) error {
|
||||
tarReader := tar.NewReader(reader)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if config.IsShouldShutdown() {
|
||||
return fmt.Errorf("extraction cancelled due to shutdown")
|
||||
}
|
||||
|
||||
header, err := tarReader.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read TAR header: %w", err)
|
||||
}
|
||||
|
||||
targetPath := filepath.Join(destDir, header.Name)
|
||||
|
||||
// Ensure the target path is within destDir (prevent path traversal)
|
||||
if !strings.HasPrefix(filepath.Clean(targetPath), filepath.Clean(destDir)) {
|
||||
return fmt.Errorf("invalid file path in TAR: %s", header.Name)
|
||||
}
|
||||
|
||||
switch header.Typeflag {
|
||||
case tar.TypeDir:
|
||||
if err := os.MkdirAll(targetPath, os.FileMode(header.Mode)); err != nil {
|
||||
return fmt.Errorf("failed to create directory %s: %w", targetPath, err)
|
||||
}
|
||||
case tar.TypeReg:
|
||||
// Ensure parent directory exists
|
||||
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
|
||||
return fmt.Errorf("failed to create parent directory for %s: %w", targetPath, err)
|
||||
}
|
||||
|
||||
outFile, err := os.OpenFile(
|
||||
targetPath,
|
||||
os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
|
||||
os.FileMode(header.Mode),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
|
||||
}
|
||||
|
||||
_, copyErr := uc.copyWithShutdownCheck(ctx, outFile, tarReader)
|
||||
closeErr := outFile.Close()
|
||||
|
||||
if copyErr != nil {
|
||||
return fmt.Errorf("failed to write file %s: %w", targetPath, copyErr)
|
||||
}
|
||||
if closeErr != nil {
|
||||
return fmt.Errorf("failed to close file %s: %w", targetPath, closeErr)
|
||||
}
|
||||
default:
|
||||
uc.logger.Warn(
|
||||
"Skipping unsupported TAR entry type",
|
||||
"type",
|
||||
header.Typeflag,
|
||||
"name",
|
||||
header.Name,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// executePgRestore executes the pg_restore command with proper environment setup
|
||||
func (uc *RestorePostgresqlBackupUsecase) executePgRestore(
|
||||
ctx context.Context,
|
||||
@@ -621,7 +927,7 @@ func (uc *RestorePostgresqlBackupUsecase) generateFilteredTocList(
|
||||
}
|
||||
|
||||
// Write filtered TOC to temporary file
|
||||
tocFile, err := os.CreateTemp("", "pg_restore_toc_*.list")
|
||||
tocFile, err := os.CreateTemp(config.GetEnv().TempFolder, "pg_restore_toc_*.list")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create TOC list file: %w", err)
|
||||
}
|
||||
@@ -668,7 +974,7 @@ func (uc *RestorePostgresqlBackupUsecase) createTempPgpassFile(
|
||||
escapedPassword,
|
||||
)
|
||||
|
||||
tempDir, err := os.MkdirTemp("", "pgpass")
|
||||
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgpass_"+uuid.New().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
|
||||
9
backend/migrations/20260101200438_add_backup_format.sql
Normal file
9
backend/migrations/20260101200438_add_backup_format.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
ALTER TABLE backups ADD COLUMN type TEXT NOT NULL DEFAULT 'DEFAULT';
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
ALTER TABLE backups DROP COLUMN type;
|
||||
-- +goose StatementEnd
|
||||
Binary file not shown.
@@ -1 +0,0 @@
|
||||
This is test data for storage testing
|
||||
Binary file not shown.
@@ -1 +0,0 @@
|
||||
This is test data for storage testing
|
||||
Binary file not shown.
@@ -29,8 +29,23 @@ export const backupsApi = {
|
||||
return apiHelper.fetchDeleteRaw(`${getApplicationServer()}/api/v1/backups/${id}`);
|
||||
},
|
||||
|
||||
async downloadBackup(id: string): Promise<Blob> {
|
||||
return apiHelper.fetchGetBlob(`${getApplicationServer()}/api/v1/backups/${id}/file`);
|
||||
async downloadBackup(id: string): Promise<{ blob: Blob; filename: string }> {
|
||||
const result = await apiHelper.fetchGetBlobWithHeaders(
|
||||
`${getApplicationServer()}/api/v1/backups/${id}/file`,
|
||||
);
|
||||
|
||||
// Extract filename from Content-Disposition header
|
||||
const contentDisposition = result.headers.get('Content-Disposition');
|
||||
let filename = `backup_${id}.backup`; // fallback filename
|
||||
|
||||
if (contentDisposition) {
|
||||
const filenameMatch = contentDisposition.match(/filename="?(.+?)"?$/);
|
||||
if (filenameMatch && filenameMatch[1]) {
|
||||
filename = filenameMatch[1];
|
||||
}
|
||||
}
|
||||
|
||||
return { blob: result.blob, filename };
|
||||
},
|
||||
|
||||
async cancelBackup(id: string) {
|
||||
|
||||
@@ -9,6 +9,6 @@ export interface MongodbDatabase {
|
||||
password: string;
|
||||
database: string;
|
||||
authDatabase: string;
|
||||
useTls: boolean;
|
||||
isHttps: boolean;
|
||||
cpuCount: number;
|
||||
}
|
||||
|
||||
@@ -64,21 +64,13 @@ export const BackupsComponent = ({ database, isCanManageDBs, scrollContainerRef
|
||||
|
||||
const downloadBackup = async (backupId: string) => {
|
||||
try {
|
||||
const blob = await backupsApi.downloadBackup(backupId);
|
||||
const { blob, filename } = await backupsApi.downloadBackup(backupId);
|
||||
|
||||
// Create a download link
|
||||
const url = window.URL.createObjectURL(blob);
|
||||
const link = document.createElement('a');
|
||||
link.href = url;
|
||||
|
||||
// Find the backup to get a meaningful filename
|
||||
const backup = backups.find((b) => b.id === backupId);
|
||||
const createdAt = backup ? dayjs(backup.createdAt).format('YYYY-MM-DD_HH-mm-ss') : 'backup';
|
||||
const extension =
|
||||
database.type === DatabaseType.MYSQL || database.type === DatabaseType.MARIADB
|
||||
? '.sql.zst'
|
||||
: '.dump';
|
||||
link.download = `${database.name}_backup_${createdAt}${extension}`;
|
||||
link.download = filename;
|
||||
|
||||
// Trigger download
|
||||
document.body.appendChild(link);
|
||||
|
||||
@@ -50,13 +50,13 @@ const initializeDatabaseTypeData = (db: Database): Database => {
|
||||
|
||||
switch (db.type) {
|
||||
case DatabaseType.POSTGRES:
|
||||
return { ...base, postgresql: db.postgresql ?? ({ cpuCount: 1 } as PostgresqlDatabase) };
|
||||
return { ...base, postgresql: db.postgresql ?? ({ cpuCount: 4 } as PostgresqlDatabase) };
|
||||
case DatabaseType.MYSQL:
|
||||
return { ...base, mysql: db.mysql ?? ({} as MysqlDatabase) };
|
||||
case DatabaseType.MARIADB:
|
||||
return { ...base, mariadb: db.mariadb ?? ({} as MariadbDatabase) };
|
||||
case DatabaseType.MONGODB:
|
||||
return { ...base, mongodb: db.mongodb ?? ({ cpuCount: 1 } as MongodbDatabase) };
|
||||
return { ...base, mongodb: db.mongodb ?? ({ cpuCount: 4 } as MongodbDatabase) };
|
||||
default:
|
||||
return db;
|
||||
}
|
||||
|
||||
@@ -77,8 +77,8 @@ export const EditMongoDbSpecificDataComponent = ({
|
||||
password: result.password,
|
||||
database: result.database,
|
||||
authDatabase: result.authDatabase,
|
||||
useTls: result.useTls,
|
||||
cpuCount: 1,
|
||||
isHttps: result.useTls,
|
||||
cpuCount: 4,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -287,15 +287,15 @@ export const EditMongoDbSpecificDataComponent = ({
|
||||
)}
|
||||
|
||||
<div className="mb-1 flex w-full items-center">
|
||||
<div className="min-w-[150px]">Use TLS</div>
|
||||
<div className="min-w-[150px]">Use HTTPS</div>
|
||||
<Switch
|
||||
checked={editingDatabase.mongodb?.useTls}
|
||||
checked={editingDatabase.mongodb?.isHttps}
|
||||
onChange={(checked) => {
|
||||
if (!editingDatabase.mongodb) return;
|
||||
|
||||
setEditingDatabase({
|
||||
...editingDatabase,
|
||||
mongodb: { ...editingDatabase.mongodb, useTls: checked },
|
||||
mongodb: { ...editingDatabase.mongodb, isHttps: checked },
|
||||
});
|
||||
setIsConnectionTested(false);
|
||||
}}
|
||||
@@ -309,7 +309,7 @@ export const EditMongoDbSpecificDataComponent = ({
|
||||
<InputNumber
|
||||
min={1}
|
||||
max={16}
|
||||
value={editingDatabase.mongodb?.cpuCount || 1}
|
||||
value={editingDatabase.mongodb?.cpuCount}
|
||||
onChange={(value) => {
|
||||
if (!editingDatabase.mongodb) return;
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ export const EditPostgreSqlSpecificDataComponent = ({
|
||||
password: result.password,
|
||||
database: result.database,
|
||||
isHttps: result.isHttps,
|
||||
cpuCount: 1,
|
||||
cpuCount: 4,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -356,36 +356,34 @@ export const EditPostgreSqlSpecificDataComponent = ({
|
||||
/>
|
||||
</div>
|
||||
|
||||
{isRestoreMode && (
|
||||
<div className="mb-5 flex w-full items-center">
|
||||
<div className="min-w-[150px]">CPU count</div>
|
||||
<div className="flex items-center">
|
||||
<InputNumber
|
||||
min={1}
|
||||
max={128}
|
||||
value={editingDatabase.postgresql?.cpuCount || 1}
|
||||
onChange={(value) => {
|
||||
if (!editingDatabase.postgresql) return;
|
||||
<div className="mb-5 flex w-full items-center">
|
||||
<div className="min-w-[150px]">CPU count</div>
|
||||
<div className="flex items-center">
|
||||
<InputNumber
|
||||
min={1}
|
||||
max={128}
|
||||
value={editingDatabase.postgresql?.cpuCount}
|
||||
onChange={(value) => {
|
||||
if (!editingDatabase.postgresql) return;
|
||||
|
||||
setEditingDatabase({
|
||||
...editingDatabase,
|
||||
postgresql: { ...editingDatabase.postgresql, cpuCount: value || 1 },
|
||||
});
|
||||
setIsConnectionTested(false);
|
||||
}}
|
||||
size="small"
|
||||
className="max-w-[75px] grow"
|
||||
/>
|
||||
setEditingDatabase({
|
||||
...editingDatabase,
|
||||
postgresql: { ...editingDatabase.postgresql, cpuCount: value || 1 },
|
||||
});
|
||||
setIsConnectionTested(false);
|
||||
}}
|
||||
size="small"
|
||||
className="max-w-[75px] grow"
|
||||
/>
|
||||
|
||||
<Tooltip
|
||||
className="cursor-pointer"
|
||||
title="Number of CPU cores to use for backup and restore operations. Higher values may speed up operations but use more resources."
|
||||
>
|
||||
<InfoCircleOutlined className="ml-2" style={{ color: 'gray' }} />
|
||||
</Tooltip>
|
||||
</div>
|
||||
<Tooltip
|
||||
className="cursor-pointer"
|
||||
title="Number of CPU cores to use for backup and restore operations. Higher values may speed up operations but use more resources."
|
||||
>
|
||||
<InfoCircleOutlined className="ml-2" style={{ color: 'gray' }} />
|
||||
</Tooltip>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
<div className="mt-4 mb-1 flex items-center">
|
||||
<div
|
||||
|
||||
@@ -33,8 +33,13 @@ export const ShowMongoDbSpecificDataComponent = ({ database }: Props) => {
|
||||
</div>
|
||||
|
||||
<div className="mb-1 flex w-full items-center">
|
||||
<div className="min-w-[150px]">Use TLS</div>
|
||||
<div>{database.mongodb?.useTls ? 'Yes' : 'No'}</div>
|
||||
<div className="min-w-[150px]">Use HTTPS</div>
|
||||
<div>{database.mongodb?.isHttps ? 'Yes' : 'No'}</div>
|
||||
</div>
|
||||
|
||||
<div className="mb-1 flex w-full items-center">
|
||||
<div className="min-w-[150px]">CPU count</div>
|
||||
<div>{database.mongodb?.cpuCount}</div>
|
||||
</div>
|
||||
|
||||
{database.mongodb?.authDatabase && (
|
||||
@@ -43,11 +48,6 @@ export const ShowMongoDbSpecificDataComponent = ({ database }: Props) => {
|
||||
<div>{database.mongodb.authDatabase}</div>
|
||||
</div>
|
||||
)}
|
||||
|
||||
<div className="mb-1 flex w-full items-center">
|
||||
<div className="min-w-[150px]">CPU count</div>
|
||||
<div>{database.mongodb?.cpuCount || 1}</div>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
@@ -54,6 +54,11 @@ export const ShowPostgreSqlSpecificDataComponent = ({ database }: Props) => {
|
||||
<div>{database.postgresql?.isHttps ? 'Yes' : 'No'}</div>
|
||||
</div>
|
||||
|
||||
<div className="mb-1 flex w-full items-center">
|
||||
<div className="min-w-[150px]">CPU count</div>
|
||||
<div>{database.postgresql?.cpuCount}</div>
|
||||
</div>
|
||||
|
||||
{!!database.postgresql?.includeSchemas?.length && (
|
||||
<div className="mb-1 flex w-full items-center">
|
||||
<div className="min-w-[150px]">Include schemas</div>
|
||||
|
||||
@@ -176,6 +176,25 @@ export const apiHelper = {
|
||||
return response.blob();
|
||||
},
|
||||
|
||||
fetchGetBlobWithHeaders: async (
|
||||
url: string,
|
||||
requestOptions?: RequestOptions,
|
||||
isRetryOnError = false,
|
||||
): Promise<{ blob: Blob; headers: Headers }> => {
|
||||
const optionsWrapper = (requestOptions ?? new RequestOptions())
|
||||
.addHeader('Access-Control-Allow-Methods', 'GET')
|
||||
.addHeader('Authorization', accessTokenHelper.getAccessToken());
|
||||
|
||||
const response = await makeRequest(
|
||||
url,
|
||||
optionsWrapper,
|
||||
isRetryOnError ? 0 : REPEAT_TRIES_COUNT,
|
||||
);
|
||||
|
||||
const blob = await response.blob();
|
||||
return { blob, headers: response.headers };
|
||||
},
|
||||
|
||||
fetchPutJson: async <T>(
|
||||
url: string,
|
||||
requestOptions?: RequestOptions,
|
||||
|
||||
Reference in New Issue
Block a user