Compare commits

...

7 Commits

Author SHA1 Message Date
Rostislav Dugin
75c88bac50 FIX (webhook): Escape webhook characters 2025-12-04 14:28:49 +03:00
Rostislav Dugin
ff1b6536bf FIX (connection): Add standard_conforming_strings param when building string to connect to PG 2025-12-03 18:42:49 +03:00
Rostislav Dugin
06197f986d FIX (chunking): Add backuping chunk by chunk without buffering in RAM and improve cancelation process 2025-12-03 17:35:43 +03:00
Rostislav Dugin
fe72e9e0a6 FIX (healthcheck): Clean up healthcheck interval receving when tab changed 2025-12-03 08:08:49 +03:00
Rostislav Dugin
640cceadbd FIX (docs): Extend docs with HTTP route support 2025-12-03 07:43:00 +03:00
Rostislav Dugin
80e573fcb3 Merge pull request #121 from tylerobara/feature/add_httproute_support
FEATURE helm: Adding support for HTTPRoutes
2025-12-03 07:34:20 +03:00
Tyler Obara
35498d83f1 adding support for httperoutes 2025-12-02 17:01:38 -05:00
20 changed files with 808 additions and 114 deletions

View File

@@ -184,6 +184,8 @@ Access Postgresus at `http://<EXTERNAL-IP>` (port 80).
To customize the installation (e.g., storage size, NodePort instead of LoadBalancer), see the [Helm chart README](deploy/helm/README.md) for all configuration options.
Config uses by default LoadBalancer, but has predefined values for Ingress and HTTPRoute as well.
---
## 🚀 Usage

View File

@@ -2,20 +2,21 @@ package backups
import (
"context"
"errors"
"sync"
"github.com/google/uuid"
)
type BackupContextManager struct {
mu sync.RWMutex
cancelFuncs map[uuid.UUID]context.CancelFunc
mu sync.RWMutex
cancelFuncs map[uuid.UUID]context.CancelFunc
cancelledBackups map[uuid.UUID]bool
}
func NewBackupContextManager() *BackupContextManager {
return &BackupContextManager{
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
cancelledBackups: make(map[uuid.UUID]bool),
}
}
@@ -23,25 +24,37 @@ func (m *BackupContextManager) RegisterBackup(backupID uuid.UUID, cancelFunc con
m.mu.Lock()
defer m.mu.Unlock()
m.cancelFuncs[backupID] = cancelFunc
delete(m.cancelledBackups, backupID)
}
func (m *BackupContextManager) CancelBackup(backupID uuid.UUID) error {
m.mu.Lock()
defer m.mu.Unlock()
cancelFunc, exists := m.cancelFuncs[backupID]
if !exists {
return errors.New("backup is not in progress or already completed")
if m.cancelledBackups[backupID] {
return nil
}
cancelFunc()
delete(m.cancelFuncs, backupID)
cancelFunc, exists := m.cancelFuncs[backupID]
if exists {
cancelFunc()
delete(m.cancelFuncs, backupID)
}
m.cancelledBackups[backupID] = true
return nil
}
func (m *BackupContextManager) IsCancelled(backupID uuid.UUID) bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.cancelledBackups[backupID]
}
func (m *BackupContextManager) UnregisterBackup(backupID uuid.UUID) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.cancelFuncs, backupID)
delete(m.cancelledBackups, backupID)
}

View File

@@ -1,6 +1,7 @@
package backups
import (
"context"
"encoding/json"
"fmt"
"io"
@@ -701,7 +702,7 @@ func createTestBackup(
dummyContent := []byte("dummy backup content for testing")
reader := strings.NewReader(string(dummyContent))
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
if err := storages[0].SaveFile(encryption.GetFieldEncryptor(), logger, backup.ID, reader); err != nil {
if err := storages[0].SaveFile(context.Background(), encryption.GetFieldEncryptor(), logger, backup.ID, reader); err != nil {
panic(fmt.Sprintf("Failed to create test backup file: %v", err))
}

View File

@@ -275,7 +275,12 @@ func (s *BackupService) MakeBackup(databaseID uuid.UUID, isLastTry bool) {
errMsg := err.Error()
// Check if backup was cancelled (not due to shutdown)
if strings.Contains(errMsg, "backup cancelled") && !strings.Contains(errMsg, "shutdown") {
isCancelled := strings.Contains(errMsg, "backup cancelled") ||
strings.Contains(errMsg, "context canceled") ||
errors.Is(err, context.Canceled)
isShutdown := strings.Contains(errMsg, "shutdown")
if isCancelled && !isShutdown {
backup.Status = BackupStatusCanceled
backup.BackupDurationMs = time.Since(start).Milliseconds()
backup.BackupSizeMb = 0

View File

@@ -45,6 +45,11 @@ type CreatePostgresqlBackupUsecase struct {
fieldEncryptor encryption.FieldEncryptor
}
type writeResult struct {
bytesWritten int
writeErr error
}
// Execute creates a backup of the database
func (uc *CreatePostgresqlBackupUsecase) Execute(
ctx context.Context,
@@ -172,7 +177,7 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
// Start streaming into storage in its own goroutine
saveErrCh := make(chan error, 1)
go func() {
saveErr := storage.SaveFile(uc.fieldEncryptor, uc.logger, backupID, storageReader)
saveErr := storage.SaveFile(ctx, uc.fieldEncryptor, uc.logger, backupID, storageReader)
saveErrCh <- saveErr
}()
@@ -195,12 +200,10 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
copyResultCh <- err
}()
// Wait for the copy to finish first, then the dump process
copyErr := <-copyResultCh
bytesWritten := <-bytesWrittenCh
waitErr := cmd.Wait()
// Check for shutdown or cancellation before finalizing
select {
case <-ctx.Done():
uc.cleanupOnCancellation(encryptionWriter, storageWriter, saveErrCh)
@@ -213,7 +216,6 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
return nil, err
}
// Wait until storage ends reading
saveErr := <-saveErrCh
stderrOutput := <-stderrCh
@@ -267,7 +269,23 @@ func (uc *CreatePostgresqlBackupUsecase) copyWithShutdownCheck(
bytesRead, readErr := src.Read(buf)
if bytesRead > 0 {
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
writeResultCh := make(chan writeResult, 1)
go func() {
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
writeResultCh <- writeResult{bytesWritten, writeErr}
}()
var bytesWritten int
var writeErr error
select {
case <-ctx.Done():
return totalBytesWritten, fmt.Errorf("copy cancelled during write: %w", ctx.Err())
case result := <-writeResultCh:
bytesWritten = result.bytesWritten
writeErr = result.writeErr
}
if bytesWritten < 0 || bytesRead < bytesWritten {
bytesWritten = 0
if writeErr == nil {
@@ -354,6 +372,9 @@ func (uc *CreatePostgresqlBackupUsecase) createBackupContext(
select {
case <-ctx.Done():
return
case <-parentCtx.Done():
cancel()
return
case <-ticker.C:
if config.IsShouldShutdown() {
cancel()

View File

@@ -594,7 +594,7 @@ func buildConnectionStringForDB(p *PostgresqlDatabase, dbName string, password s
}
return fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s default_query_exec_mode=simple_protocol",
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s default_query_exec_mode=simple_protocol standard_conforming_strings=on",
p.Host,
p.Port,
p.Username,

View File

@@ -206,8 +206,8 @@ func (t *WebhookNotifier) sendPOST(webhookURL, heading, message string, logger *
func (t *WebhookNotifier) buildRequestBody(heading, message string) []byte {
if t.BodyTemplate != nil && *t.BodyTemplate != "" {
result := *t.BodyTemplate
result = strings.ReplaceAll(result, "{{heading}}", heading)
result = strings.ReplaceAll(result, "{{message}}", message)
result = strings.ReplaceAll(result, "{{heading}}", escapeJSONString(heading))
result = strings.ReplaceAll(result, "{{message}}", escapeJSONString(message))
return []byte(result)
}
@@ -227,3 +227,17 @@ func (t *WebhookNotifier) applyHeaders(req *http.Request) {
}
}
}
func escapeJSONString(s string) string {
b, err := json.Marshal(s)
if err != nil || len(b) < 2 {
escaped := strings.ReplaceAll(s, `\`, `\\`)
escaped = strings.ReplaceAll(escaped, `"`, `\"`)
escaped = strings.ReplaceAll(escaped, "\n", `\n`)
escaped = strings.ReplaceAll(escaped, "\r", `\r`)
escaped = strings.ReplaceAll(escaped, "\t", `\t`)
return escaped
}
return string(b[1 : len(b)-1])
}

View File

@@ -1,6 +1,7 @@
package restores
import (
"context"
"encoding/json"
"fmt"
"io"
@@ -340,7 +341,7 @@ func createTestBackup(
dummyContent := []byte("dummy backup content for testing")
reader := strings.NewReader(string(dummyContent))
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
if err := storages[0].SaveFile(fieldEncryptor, logger, backup.ID, reader); err != nil {
if err := storages[0].SaveFile(context.Background(), fieldEncryptor, logger, backup.ID, reader); err != nil {
panic(fmt.Sprintf("Failed to create test backup file: %v", err))
}

View File

@@ -1,6 +1,7 @@
package storages
import (
"context"
"io"
"log/slog"
"postgresus-backend/internal/util/encryption"
@@ -10,6 +11,7 @@ import (
type StorageFileSaver interface {
SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,

View File

@@ -1,6 +1,7 @@
package storages
import (
"context"
"errors"
"io"
"log/slog"
@@ -30,12 +31,13 @@ type Storage struct {
}
func (s *Storage) SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,
file io.Reader,
) error {
err := s.getSpecificStorage().SaveFile(encryptor, logger, fileID, file)
err := s.getSpecificStorage().SaveFile(ctx, encryptor, logger, fileID, file)
if err != nil {
lastSaveError := err.Error()
s.LastSaveError = &lastSaveError

View File

@@ -167,6 +167,7 @@ func Test_Storage_BasicOperations(t *testing.T) {
fileID := uuid.New()
err = tc.storage.SaveFile(
context.Background(),
encryptor,
logger.GetLogger(),
fileID,
@@ -189,6 +190,7 @@ func Test_Storage_BasicOperations(t *testing.T) {
fileID := uuid.New()
err = tc.storage.SaveFile(
context.Background(),
encryptor,
logger.GetLogger(),
fileID,
@@ -238,7 +240,7 @@ func setupS3Container(ctx context.Context) (*S3Container, error) {
secretKey := "testpassword"
bucketName := "test-bucket"
region := "us-east-1"
endpoint := fmt.Sprintf("localhost:%s", env.TestMinioPort)
endpoint := fmt.Sprintf("127.0.0.1:%s", env.TestMinioPort)
// Create MinIO client and ensure bucket exists
minioClient, err := minio.New(endpoint, &minio.Options{

View File

@@ -3,19 +3,44 @@ package azure_blob_storage
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"postgresus-backend/internal/util/encryption"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/google/uuid"
)
const (
azureConnectTimeout = 30 * time.Second
azureResponseTimeout = 30 * time.Second
azureIdleConnTimeout = 90 * time.Second
azureTLSHandshakeTimeout = 30 * time.Second
// Chunk size for block blob uploads - 16MB provides good balance between
// memory usage and upload efficiency. This creates backpressure to pg_dump
// by only reading one chunk at a time and waiting for Azure to confirm receipt.
azureChunkSize = 16 * 1024 * 1024
)
type readSeekCloser struct {
*bytes.Reader
}
func (r *readSeekCloser) Close() error {
return nil
}
type AuthMethod string
const (
@@ -39,27 +64,91 @@ func (s *AzureBlobStorage) TableName() string {
}
func (s *AzureBlobStorage) SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,
file io.Reader,
) error {
select {
case <-ctx.Done():
return fmt.Errorf("upload cancelled before start: %w", ctx.Err())
default:
}
client, err := s.getClient(encryptor)
if err != nil {
return err
}
blobName := s.buildBlobName(fileID.String())
blockBlobClient := client.ServiceClient().
NewContainerClient(s.ContainerName).
NewBlockBlobClient(blobName)
_, err = client.UploadStream(
context.TODO(),
s.ContainerName,
blobName,
file,
nil,
)
var blockIDs []string
blockNumber := 0
buf := make([]byte, azureChunkSize)
for {
select {
case <-ctx.Done():
return fmt.Errorf("upload cancelled: %w", ctx.Err())
default:
}
n, readErr := io.ReadFull(file, buf)
if n == 0 && readErr == io.EOF {
break
}
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
return fmt.Errorf("read error: %w", readErr)
}
blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%06d", blockNumber)))
_, err := blockBlobClient.StageBlock(
ctx,
blockID,
&readSeekCloser{bytes.NewReader(buf[:n])},
nil,
)
if err != nil {
select {
case <-ctx.Done():
return fmt.Errorf("upload cancelled: %w", ctx.Err())
default:
return fmt.Errorf("failed to stage block %d: %w", blockNumber, err)
}
}
blockIDs = append(blockIDs, blockID)
blockNumber++
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
break
}
}
if len(blockIDs) == 0 {
_, err = client.UploadStream(
ctx,
s.ContainerName,
blobName,
bytes.NewReader([]byte{}),
nil,
)
if err != nil {
return fmt.Errorf("failed to upload empty blob: %w", err)
}
return nil
}
_, err = blockBlobClient.CommitBlockList(ctx, blockIDs, &blockblob.CommitBlockListOptions{})
if err != nil {
return fmt.Errorf("failed to upload blob to Azure: %w", err)
return fmt.Errorf("failed to commit block list: %w", err)
}
return nil
@@ -253,6 +342,8 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
var client *azblob.Client
var err error
clientOptions := s.buildClientOptions()
switch s.AuthMethod {
case AuthMethodConnectionString:
connectionString, decryptErr := encryptor.Decrypt(s.StorageID, s.ConnectionString)
@@ -260,7 +351,7 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
return nil, fmt.Errorf("failed to decrypt Azure connection string: %w", decryptErr)
}
client, err = azblob.NewClientFromConnectionString(connectionString, nil)
client, err = azblob.NewClientFromConnectionString(connectionString, clientOptions)
if err != nil {
return nil, fmt.Errorf(
"failed to create Azure Blob client from connection string: %w",
@@ -279,7 +370,7 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
return nil, fmt.Errorf("failed to create Azure shared key credential: %w", credErr)
}
client, err = azblob.NewClientWithSharedKeyCredential(accountURL, credential, nil)
client, err = azblob.NewClientWithSharedKeyCredential(accountURL, credential, clientOptions)
if err != nil {
return nil, fmt.Errorf("failed to create Azure Blob client with shared key: %w", err)
}
@@ -290,6 +381,26 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
return client, nil
}
func (s *AzureBlobStorage) buildClientOptions() *azblob.ClientOptions {
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: azureConnectTimeout,
}).DialContext,
TLSHandshakeTimeout: azureTLSHandshakeTimeout,
ResponseHeaderTimeout: azureResponseTimeout,
IdleConnTimeout: azureIdleConnTimeout,
}
return &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: &http.Client{Transport: transport},
Retry: policy.RetryOptions{
MaxRetries: 0,
},
},
}
}
func (s *AzureBlobStorage) buildAccountURL() string {
if s.Endpoint != "" {
endpoint := s.Endpoint

View File

@@ -7,6 +7,8 @@ import (
"fmt"
"io"
"log/slog"
"net"
"net/http"
"postgresus-backend/internal/util/encryption"
"strings"
"time"
@@ -16,9 +18,22 @@ import (
"golang.org/x/oauth2/google"
drive "google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)
const (
gdConnectTimeout = 30 * time.Second
gdResponseTimeout = 30 * time.Second
gdIdleConnTimeout = 90 * time.Second
gdTLSHandshakeTimeout = 30 * time.Second
// Chunk size for Google Drive resumable uploads - 16MB provides good balance
// between memory usage and upload efficiency. Google Drive requires chunks
// to be multiples of 256KB for resumable uploads.
gdChunkSize = 16 * 1024 * 1024
)
type GoogleDriveStorage struct {
StorageID uuid.UUID `json:"storageId" gorm:"primaryKey;type:uuid;column:storage_id"`
ClientID string `json:"clientId" gorm:"not null;type:text;column:client_id"`
@@ -31,31 +46,44 @@ func (s *GoogleDriveStorage) TableName() string {
}
func (s *GoogleDriveStorage) SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,
file io.Reader,
) error {
return s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
ctx := context.Background()
return s.withRetryOnAuth(ctx, encryptor, func(driveService *drive.Service) error {
filename := fileID.String()
// Ensure the postgresus_backups folder exists
folderID, err := s.ensureBackupsFolderExists(ctx, driveService)
if err != nil {
return fmt.Errorf("failed to create/find backups folder: %w", err)
}
// Delete any previous copy so we keep at most one object per logical file.
_ = s.deleteByName(ctx, driveService, filename, folderID) // ignore "not found"
_ = s.deleteByName(ctx, driveService, filename, folderID)
fileMeta := &drive.File{
Name: filename,
Parents: []string{folderID},
}
_, err = driveService.Files.Create(fileMeta).Media(file).Context(ctx).Do()
backpressureReader := &backpressureReader{
reader: file,
ctx: ctx,
chunkSize: gdChunkSize,
buf: make([]byte, gdChunkSize),
}
_, err = driveService.Files.Create(fileMeta).
Media(backpressureReader, googleapi.ChunkSize(gdChunkSize)).
Context(ctx).
Do()
if err != nil {
select {
case <-ctx.Done():
return fmt.Errorf("upload cancelled: %w", ctx.Err())
default:
}
return fmt.Errorf("failed to upload file to Google Drive: %w", err)
}
@@ -70,30 +98,85 @@ func (s *GoogleDriveStorage) SaveFile(
})
}
type backpressureReader struct {
reader io.Reader
ctx context.Context
chunkSize int
buf []byte
bufStart int
bufEnd int
totalBytes int64
chunkCount int
}
func (r *backpressureReader) Read(p []byte) (n int, err error) {
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
default:
}
if r.bufStart >= r.bufEnd {
r.chunkCount++
bytesRead, readErr := io.ReadFull(r.reader, r.buf)
if bytesRead > 0 {
r.bufStart = 0
r.bufEnd = bytesRead
}
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
return 0, readErr
}
if bytesRead == 0 && readErr == io.EOF {
return 0, io.EOF
}
}
n = copy(p, r.buf[r.bufStart:r.bufEnd])
r.bufStart += n
r.totalBytes += int64(n)
if r.bufStart >= r.bufEnd {
select {
case <-r.ctx.Done():
return n, r.ctx.Err()
default:
}
}
return n, nil
}
func (s *GoogleDriveStorage) GetFile(
encryptor encryption.FieldEncryptor,
fileID uuid.UUID,
) (io.ReadCloser, error) {
var result io.ReadCloser
err := s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
folderID, err := s.findBackupsFolder(driveService)
if err != nil {
return fmt.Errorf("failed to find backups folder: %w", err)
}
err := s.withRetryOnAuth(
context.Background(),
encryptor,
func(driveService *drive.Service) error {
folderID, err := s.findBackupsFolder(driveService)
if err != nil {
return fmt.Errorf("failed to find backups folder: %w", err)
}
fileIDGoogle, err := s.lookupFileID(driveService, fileID.String(), folderID)
if err != nil {
return err
}
fileIDGoogle, err := s.lookupFileID(driveService, fileID.String(), folderID)
if err != nil {
return err
}
resp, err := driveService.Files.Get(fileIDGoogle).Download()
if err != nil {
return fmt.Errorf("failed to download file from Google Drive: %w", err)
}
resp, err := driveService.Files.Get(fileIDGoogle).Download()
if err != nil {
return fmt.Errorf("failed to download file from Google Drive: %w", err)
}
result = resp.Body
return nil
})
result = resp.Body
return nil
},
)
return result, err
}
@@ -102,8 +185,8 @@ func (s *GoogleDriveStorage) DeleteFile(
encryptor encryption.FieldEncryptor,
fileID uuid.UUID,
) error {
return s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
ctx := context.Background()
ctx := context.Background()
return s.withRetryOnAuth(ctx, encryptor, func(driveService *drive.Service) error {
folderID, err := s.findBackupsFolder(driveService)
if err != nil {
return fmt.Errorf("failed to find backups folder: %w", err)
@@ -142,8 +225,8 @@ func (s *GoogleDriveStorage) Validate(encryptor encryption.FieldEncryptor) error
}
func (s *GoogleDriveStorage) TestConnection(encryptor encryption.FieldEncryptor) error {
return s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
ctx := context.Background()
ctx := context.Background()
return s.withRetryOnAuth(ctx, encryptor, func(driveService *drive.Service) error {
testFilename := "test-connection-" + uuid.New().String()
testData := []byte("test")
@@ -243,9 +326,16 @@ func (s *GoogleDriveStorage) Update(incoming *GoogleDriveStorage) {
// withRetryOnAuth executes the provided function with retry logic for authentication errors
func (s *GoogleDriveStorage) withRetryOnAuth(
ctx context.Context,
encryptor encryption.FieldEncryptor,
fn func(*drive.Service) error,
) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
driveService, err := s.getDriveService(encryptor)
if err != nil {
return err
@@ -253,6 +343,12 @@ func (s *GoogleDriveStorage) withRetryOnAuth(
err = fn(driveService)
if err != nil && s.isAuthError(err) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Try to refresh token and retry once
fmt.Printf("Google Drive auth error detected, attempting token refresh: %v\n", err)
@@ -422,7 +518,6 @@ func (s *GoogleDriveStorage) getDriveService(
return nil, err
}
// Decrypt credentials before use
clientSecret, err := encryptor.Decrypt(s.StorageID, s.ClientSecret)
if err != nil {
return nil, fmt.Errorf("failed to decrypt Google Drive client secret: %w", err)
@@ -449,16 +544,16 @@ func (s *GoogleDriveStorage) getDriveService(
tokenSource := cfg.TokenSource(ctx, &token)
// Force token validation to ensure we're using the current token
currentToken, err := tokenSource.Token()
if err != nil {
return nil, fmt.Errorf("failed to get current token: %w", err)
}
// Create a new token source with the validated token
validatedTokenSource := oauth2.StaticTokenSource(currentToken)
driveService, err := drive.NewService(ctx, option.WithTokenSource(validatedTokenSource))
httpClient := s.buildHTTPClient(validatedTokenSource)
driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient))
if err != nil {
return nil, fmt.Errorf("unable to create Drive client: %w", err)
}
@@ -466,6 +561,24 @@ func (s *GoogleDriveStorage) getDriveService(
return driveService, nil
}
func (s *GoogleDriveStorage) buildHTTPClient(tokenSource oauth2.TokenSource) *http.Client {
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: gdConnectTimeout,
}).DialContext,
TLSHandshakeTimeout: gdTLSHandshakeTimeout,
ResponseHeaderTimeout: gdResponseTimeout,
IdleConnTimeout: gdIdleConnTimeout,
}
return &http.Client{
Transport: &oauth2.Transport{
Source: tokenSource,
Base: transport,
},
}
}
func (s *GoogleDriveStorage) lookupFileID(
driveService *drive.Service,
name string,

View File

@@ -1,6 +1,8 @@
package local_storage
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
@@ -13,6 +15,13 @@ import (
"github.com/google/uuid"
)
const (
// Chunk size for local storage writes - 16MB provides good balance between
// memory usage and write efficiency. This creates backpressure to pg_dump
// by only reading one chunk at a time and waiting for disk to confirm receipt.
localChunkSize = 16 * 1024 * 1024
)
// LocalStorage uses ./postgresus_local_backups folder as a
// directory for backups and ./postgresus_local_temp folder as a
// directory for temp files
@@ -25,11 +34,18 @@ func (l *LocalStorage) TableName() string {
}
func (l *LocalStorage) SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,
file io.Reader,
) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
logger.Info("Starting to save file to local storage", "fileId", fileID.String())
err := files_utils.EnsureDirectories([]string{
@@ -60,7 +76,7 @@ func (l *LocalStorage) SaveFile(
}()
logger.Debug("Copying file data to temp file", "fileId", fileID.String())
_, err = io.Copy(tempFile, file)
_, err = copyWithContext(ctx, tempFile, file)
if err != nil {
logger.Error("Failed to write to temp file", "fileId", fileID.String(), "error", err)
return fmt.Errorf("failed to write to temp file: %w", err)
@@ -175,3 +191,71 @@ func (l *LocalStorage) EncryptSensitiveData(encryptor encryption.FieldEncryptor)
func (l *LocalStorage) Update(incoming *LocalStorage) {
}
type writeResult struct {
bytesWritten int
writeErr error
}
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
buf := make([]byte, localChunkSize)
var written int64
for {
select {
case <-ctx.Done():
return written, ctx.Err()
default:
}
nr, readErr := io.ReadFull(src, buf)
if nr == 0 && readErr == io.EOF {
break
}
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
return written, readErr
}
writeResultCh := make(chan writeResult, 1)
go func() {
nw, writeErr := dst.Write(buf[0:nr])
writeResultCh <- writeResult{nw, writeErr}
}()
var nw int
var writeErr error
select {
case <-ctx.Done():
return written, ctx.Err()
case result := <-writeResultCh:
nw = result.bytesWritten
writeErr = result.writeErr
}
if nw < 0 || nr < nw {
nw = 0
if writeErr == nil {
writeErr = errors.New("invalid write result")
}
}
if writeErr != nil {
return written, writeErr
}
if nr != nw {
return written, io.ErrShortWrite
}
written += int64(nw)
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
break
}
}
return written, nil
}

View File

@@ -1,6 +1,7 @@
package nas_storage
import (
"context"
"crypto/tls"
"errors"
"fmt"
@@ -16,6 +17,13 @@ import (
"github.com/hirochachacha/go-smb2"
)
const (
// Chunk size for NAS uploads - 16MB provides good balance between
// memory usage and upload efficiency. This creates backpressure to pg_dump
// by only reading one chunk at a time and waiting for NAS to confirm receipt.
nasChunkSize = 16 * 1024 * 1024
)
type NASStorage struct {
StorageID uuid.UUID `json:"storageId" gorm:"primaryKey;type:uuid;column:storage_id"`
Host string `json:"host" gorm:"not null;type:text;column:host"`
@@ -33,14 +41,21 @@ func (n *NASStorage) TableName() string {
}
func (n *NASStorage) SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,
file io.Reader,
) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
logger.Info("Starting to save file to NAS storage", "fileId", fileID.String(), "host", n.Host)
session, err := n.createSession(encryptor)
session, err := n.createSessionWithContext(ctx, encryptor)
if err != nil {
logger.Error("Failed to create NAS session", "fileId", fileID.String(), "error", err)
return fmt.Errorf("failed to create NAS session: %w", err)
@@ -121,7 +136,7 @@ func (n *NASStorage) SaveFile(
}()
logger.Debug("Copying file data to NAS", "fileId", fileID.String())
_, err = io.Copy(nasFile, file)
_, err = copyWithContext(ctx, nasFile, file)
if err != nil {
logger.Error("Failed to write file to NAS", "fileId", fileID.String(), "error", err)
return fmt.Errorf("failed to write file to NAS: %w", err)
@@ -290,20 +305,24 @@ func (n *NASStorage) Update(incoming *NASStorage) {
}
func (n *NASStorage) createSession(encryptor encryption.FieldEncryptor) (*smb2.Session, error) {
// Create connection with timeout
conn, err := n.createConnection()
return n.createSessionWithContext(context.Background(), encryptor)
}
func (n *NASStorage) createSessionWithContext(
ctx context.Context,
encryptor encryption.FieldEncryptor,
) (*smb2.Session, error) {
conn, err := n.createConnectionWithContext(ctx)
if err != nil {
return nil, err
}
// Decrypt password before use
password, err := encryptor.Decrypt(n.StorageID, n.Password)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("failed to decrypt NAS password: %w", err)
}
// Create SMB2 dialer
d := &smb2.Dialer{
Initiator: &smb2.NTLMInitiator{
User: n.Username,
@@ -312,7 +331,6 @@ func (n *NASStorage) createSession(encryptor encryption.FieldEncryptor) (*smb2.S
},
}
// Create session
session, err := d.Dial(conn)
if err != nil {
_ = conn.Close()
@@ -322,34 +340,30 @@ func (n *NASStorage) createSession(encryptor encryption.FieldEncryptor) (*smb2.S
return session, nil
}
func (n *NASStorage) createConnection() (net.Conn, error) {
func (n *NASStorage) createConnectionWithContext(ctx context.Context) (net.Conn, error) {
address := net.JoinHostPort(n.Host, fmt.Sprintf("%d", n.Port))
// Create connection with timeout
dialer := &net.Dialer{
Timeout: 10 * time.Second,
Timeout: 30 * time.Second,
}
if n.UseSSL {
// Use TLS connection
tlsConfig := &tls.Config{
ServerName: n.Host,
InsecureSkipVerify: false, // Change to true if you want to skip cert verification
InsecureSkipVerify: false,
}
conn, err := tls.DialWithDialer(dialer, "tcp", address, tlsConfig)
if err != nil {
return nil, fmt.Errorf("failed to create SSL connection to %s: %w", address, err)
}
return conn, nil
} else {
// Use regular TCP connection
conn, err := dialer.Dial("tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to create connection to %s: %w", address, err)
}
return conn, nil
}
conn, err := dialer.DialContext(ctx, "tcp", address)
if err != nil {
return nil, fmt.Errorf("failed to create connection to %s: %w", address, err)
}
return conn, nil
}
func (n *NASStorage) ensureDirectory(fs *smb2.Share, path string) error {
@@ -444,3 +458,71 @@ func (r *nasFileReader) Close() error {
return nil
}
type writeResult struct {
bytesWritten int
writeErr error
}
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
buf := make([]byte, nasChunkSize)
var written int64
for {
select {
case <-ctx.Done():
return written, ctx.Err()
default:
}
nr, readErr := io.ReadFull(src, buf)
if nr == 0 && readErr == io.EOF {
break
}
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
return written, readErr
}
writeResultCh := make(chan writeResult, 1)
go func() {
nw, writeErr := dst.Write(buf[0:nr])
writeResultCh <- writeResult{nw, writeErr}
}()
var nw int
var writeErr error
select {
case <-ctx.Done():
return written, ctx.Err()
case result := <-writeResultCh:
nw = result.bytesWritten
writeErr = result.writeErr
}
if nw < 0 || nr < nw {
nw = 0
if writeErr == nil {
writeErr = errors.New("invalid write result")
}
}
if writeErr != nil {
return written, writeErr
}
if nr != nw {
return written, io.ErrShortWrite
}
written += int64(nw)
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
break
}
}
return written, nil
}

View File

@@ -7,6 +7,8 @@ import (
"fmt"
"io"
"log/slog"
"net"
"net/http"
"postgresus-backend/internal/util/encryption"
"strings"
"time"
@@ -16,6 +18,18 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
)
const (
s3ConnectTimeout = 30 * time.Second
s3ResponseTimeout = 30 * time.Second
s3IdleConnTimeout = 90 * time.Second
s3TLSHandshakeTimeout = 30 * time.Second
// Chunk size for multipart uploads - 16MB provides good balance between
// memory usage and upload efficiency. This creates backpressure to pg_dump
// by only reading one chunk at a time and waiting for S3 to confirm receipt.
multipartChunkSize = 16 * 1024 * 1024
)
type S3Storage struct {
StorageID uuid.UUID `json:"storageId" gorm:"primaryKey;type:uuid;column:storage_id"`
S3Bucket string `json:"s3Bucket" gorm:"not null;type:text;column:s3_bucket"`
@@ -33,29 +47,123 @@ func (s *S3Storage) TableName() string {
}
func (s *S3Storage) SaveFile(
ctx context.Context,
encryptor encryption.FieldEncryptor,
logger *slog.Logger,
fileID uuid.UUID,
file io.Reader,
) error {
client, err := s.getClient(encryptor)
select {
case <-ctx.Done():
return fmt.Errorf("upload cancelled before start: %w", ctx.Err())
default:
}
coreClient, err := s.getCoreClient(encryptor)
if err != nil {
return err
}
objectKey := s.buildObjectKey(fileID.String())
// Upload the file using MinIO client with streaming (size = -1 for unknown size)
_, err = client.PutObject(
context.TODO(),
uploadID, err := coreClient.NewMultipartUpload(
ctx,
s.S3Bucket,
objectKey,
file,
-1,
minio.PutObjectOptions{},
)
if err != nil {
return fmt.Errorf("failed to upload file to S3: %w", err)
return fmt.Errorf("failed to initiate multipart upload: %w", err)
}
var parts []minio.CompletePart
partNumber := 1
buf := make([]byte, multipartChunkSize)
for {
select {
case <-ctx.Done():
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
return fmt.Errorf("upload cancelled: %w", ctx.Err())
default:
}
n, readErr := io.ReadFull(file, buf)
if n == 0 && readErr == io.EOF {
break
}
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
return fmt.Errorf("read error: %w", readErr)
}
part, err := coreClient.PutObjectPart(
ctx,
s.S3Bucket,
objectKey,
uploadID,
partNumber,
bytes.NewReader(buf[:n]),
int64(n),
minio.PutObjectPartOptions{},
)
if err != nil {
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
select {
case <-ctx.Done():
return fmt.Errorf("upload cancelled: %w", ctx.Err())
default:
return fmt.Errorf("failed to upload part %d: %w", partNumber, err)
}
}
parts = append(parts, minio.CompletePart{
PartNumber: partNumber,
ETag: part.ETag,
})
partNumber++
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
break
}
}
if len(parts) == 0 {
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
client, err := s.getClient(encryptor)
if err != nil {
return err
}
_, err = client.PutObject(
ctx,
s.S3Bucket,
objectKey,
bytes.NewReader([]byte{}),
0,
minio.PutObjectOptions{},
)
if err != nil {
return fmt.Errorf("failed to upload empty file: %w", err)
}
return nil
}
_, err = coreClient.CompleteMultipartUpload(
ctx,
s.S3Bucket,
objectKey,
uploadID,
parts,
minio.PutObjectOptions{},
)
if err != nil {
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
return fmt.Errorf("failed to complete multipart upload: %w", err)
}
return nil
@@ -252,8 +360,54 @@ func (s *S3Storage) buildObjectKey(fileName string) string {
}
func (s *S3Storage) getClient(encryptor encryption.FieldEncryptor) (*minio.Client, error) {
endpoint := s.S3Endpoint
useSSL := true
endpoint, useSSL, accessKey, secretKey, bucketLookup, transport, err := s.getClientParams(
encryptor,
)
if err != nil {
return nil, err
}
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
Secure: useSSL,
Region: s.S3Region,
BucketLookup: bucketLookup,
Transport: transport,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize MinIO client: %w", err)
}
return minioClient, nil
}
func (s *S3Storage) getCoreClient(encryptor encryption.FieldEncryptor) (*minio.Core, error) {
endpoint, useSSL, accessKey, secretKey, bucketLookup, transport, err := s.getClientParams(
encryptor,
)
if err != nil {
return nil, err
}
coreClient, err := minio.NewCore(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
Secure: useSSL,
Region: s.S3Region,
BucketLookup: bucketLookup,
Transport: transport,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize MinIO Core client: %w", err)
}
return coreClient, nil
}
func (s *S3Storage) getClientParams(
encryptor encryption.FieldEncryptor,
) (endpoint string, useSSL bool, accessKey string, secretKey string, bucketLookup minio.BucketLookupType, transport *http.Transport, err error) {
endpoint = s.S3Endpoint
useSSL = true
if strings.HasPrefix(endpoint, "http://") {
useSSL = false
@@ -262,38 +416,33 @@ func (s *S3Storage) getClient(encryptor encryption.FieldEncryptor) (*minio.Clien
endpoint = strings.TrimPrefix(endpoint, "https://")
}
// If no endpoint is provided, use the AWS S3 endpoint for the region
if endpoint == "" {
endpoint = fmt.Sprintf("s3.%s.amazonaws.com", s.S3Region)
}
// Decrypt credentials before use
accessKey, err := encryptor.Decrypt(s.StorageID, s.S3AccessKey)
accessKey, err = encryptor.Decrypt(s.StorageID, s.S3AccessKey)
if err != nil {
return nil, fmt.Errorf("failed to decrypt S3 access key: %w", err)
return "", false, "", "", 0, nil, fmt.Errorf("failed to decrypt S3 access key: %w", err)
}
secretKey, err := encryptor.Decrypt(s.StorageID, s.S3SecretKey)
secretKey, err = encryptor.Decrypt(s.StorageID, s.S3SecretKey)
if err != nil {
return nil, fmt.Errorf("failed to decrypt S3 secret key: %w", err)
return "", false, "", "", 0, nil, fmt.Errorf("failed to decrypt S3 secret key: %w", err)
}
// Configure bucket lookup strategy
bucketLookup := minio.BucketLookupAuto
bucketLookup = minio.BucketLookupAuto
if s.S3UseVirtualHostedStyle {
bucketLookup = minio.BucketLookupDNS
}
// Initialize the MinIO client
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
Secure: useSSL,
Region: s.S3Region,
BucketLookup: bucketLookup,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize MinIO client: %w", err)
transport = &http.Transport{
DialContext: (&net.Dialer{
Timeout: s3ConnectTimeout,
}).DialContext,
TLSHandshakeTimeout: s3TLSHandshakeTimeout,
ResponseHeaderTimeout: s3ResponseTimeout,
IdleConnTimeout: s3IdleConnTimeout,
}
return minioClient, nil
return endpoint, useSSL, accessKey, secretKey, bucketLookup, transport, nil
}

View File

@@ -55,9 +55,17 @@ Access Postgresus at `http://<EXTERNAL-IP>` (port 80).
| `service.targetPort` | Container port | `4005` |
| `service.headless.enabled` | Enable headless service | `true` |
### Ingress (Optional)
### Traffic Exposure (3 Options)
Ingress is disabled by default. The chart uses LoadBalancer service for direct IP access.
The chart supports 3 ways to expose Postgresus:
| Method | Use Case | Default |
| ------ | -------- | ------- |
| **LoadBalancer/NodePort** | Simple cloud clusters | Enabled |
| **Ingress** | Traditional nginx/traefik ingress controllers | Disabled |
| **HTTPRoute (Gateway API)** | Modern gateways (Istio, Envoy, Cilium) | Disabled |
#### Ingress
| Parameter | Description | Default Value |
| ----------------------- | ----------------- | ------------------------ |
@@ -66,6 +74,16 @@ Ingress is disabled by default. The chart uses LoadBalancer service for direct I
| `ingress.hosts[0].host` | Hostname | `postgresus.example.com` |
| `ingress.tls` | TLS configuration | `[]` |
#### HTTPRoute (Gateway API)
| Parameter | Description | Default Value |
| --------------------- | -------------------------- | ---------------------------------- |
| `route.enabled` | Enable HTTPRoute | `false` |
| `route.apiVersion` | Gateway API version | `gateway.networking.k8s.io/v1` |
| `route.hostnames` | Hostnames for the route | `["postgresus.example.com"]` |
| `route.parentRefs` | Gateway references | `[]` |
| `route.annotations` | Route annotations | `{}` |
### Health Checks
| Parameter | Description | Default Value |
@@ -136,6 +154,28 @@ ingress:
helm install postgresus ./deploy/helm -n postgresus --create-namespace -f ingress-values.yaml
```
### HTTPRoute (Gateway API)
For clusters using Istio, Envoy Gateway, Cilium, or other Gateway API implementations:
```yaml
# httproute-values.yaml
service:
type: ClusterIP
route:
enabled: true
hostnames:
- backup.example.com
parentRefs:
- name: my-gateway
namespace: istio-system
```
```bash
helm install postgresus ./deploy/helm -n postgresus --create-namespace -f httproute-values.yaml
```
### Custom Storage Size
```yaml

View File

@@ -0,0 +1,35 @@
{{- if .Values.route.enabled -}}
apiVersion: {{ .Values.route.apiVersion}}
kind: {{ .Values.route.kind}}
metadata:
name: {{ template "postgresus.fullname" . }}
annotations: {{ toYaml .Values.route.annotations | nindent 4 }}
labels:
app.kubernetes.io/component: "app"
{{- include "postgresus.labels" . | nindent 4 }}
spec:
{{- with .Values.route.parentRefs }}
parentRefs:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.route.hostnames }}
hostnames:
{{- toYaml . | nindent 4 }}
{{- end }}
rules:
- backendRefs:
- name: {{ template "postgresus.fullname" . }}-service
port: {{ .Values.service.port }}
{{- with .Values.route.filters }}
filters:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.route.matches }}
matches:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.route.timeouts }}
timeouts:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- end }}

View File

@@ -55,6 +55,19 @@ ingress:
pathType: Prefix
tls: []
# HTTPRoute configuration for Gateway API
route:
enabled: false
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
annotations: {}
hostnames:
- postgresus.example.com
parentRefs: []
filters: []
matches: []
timeouts: {}
# Health checks configuration
# Note: The application only has /api/v1/system/health endpoint
livenessProbe:

View File

@@ -79,9 +79,12 @@ export const HealthckeckAttemptsComponent = ({ database }: Props) => {
useEffect(() => {
let interval: number | null = null;
let isCancelled = false;
setIsHealthcheckConfigLoading(true);
healthcheckConfigApi.getHealthcheckConfig(database.id).then((healthcheckConfig) => {
if (isCancelled) return;
setIsHealthcheckConfigLoading(false);
if (healthcheckConfig.isHealthcheckEnabled) {
@@ -93,17 +96,18 @@ export const HealthckeckAttemptsComponent = ({ database }: Props) => {
if (period === 'today') {
interval = setInterval(() => {
loadHealthcheckAttempts(false);
}, 60_000); // 5 seconds
}, 60_000);
}
}
});
return () => {
isCancelled = true;
if (interval) {
clearInterval(interval);
}
};
}, [period]);
}, [database.id, period]);
if (isHealthcheckConfigLoading) {
return (