mirror of
https://github.com/databasus/databasus.git
synced 2026-04-06 00:32:03 +02:00
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97ee4b55c2 | ||
|
|
12eea72392 | ||
|
|
75c88bac50 | ||
|
|
ff1b6536bf | ||
|
|
06197f986d | ||
|
|
fe72e9e0a6 | ||
|
|
640cceadbd | ||
|
|
80e573fcb3 | ||
|
|
35498d83f1 | ||
|
|
77ae8d1ac7 | ||
|
|
2f20845b3d | ||
|
|
a3d3df4093 | ||
|
|
8db83d40d5 | ||
|
|
065ded37bd | ||
|
|
71e801debb | ||
|
|
ffd4e3a27b | ||
|
|
d2a9085591 | ||
|
|
6f0152b60c | ||
|
|
7007236f2f | ||
|
|
db55cad310 | ||
|
|
25bd096c81 | ||
|
|
7e98dd578c | ||
|
|
ba37b30e83 | ||
|
|
34b3f822e3 | ||
|
|
14700130b7 | ||
|
|
de11ab8d8a |
34
.github/workflows/ci-release.yml
vendored
34
.github/workflows/ci-release.yml
vendored
@@ -465,3 +465,37 @@ jobs:
|
||||
body: ${{ steps.changelog.outputs.changelog }}
|
||||
draft: false
|
||||
prerelease: false
|
||||
|
||||
publish-helm-chart:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [determine-version, build-and-push]
|
||||
if: ${{ needs.determine-version.outputs.should_release == 'true' }}
|
||||
permissions:
|
||||
contents: read
|
||||
packages: write
|
||||
steps:
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Helm
|
||||
uses: azure/setup-helm@v4
|
||||
with:
|
||||
version: v3.14.0
|
||||
|
||||
- name: Log in to GHCR
|
||||
run: echo "${{ secrets.GITHUB_TOKEN }}" | helm registry login ghcr.io -u ${{ github.actor }} --password-stdin
|
||||
|
||||
- name: Update Chart.yaml with release version
|
||||
run: |
|
||||
VERSION="${{ needs.determine-version.outputs.new_version }}"
|
||||
sed -i "s/^version: .*/version: ${VERSION}/" deploy/helm/Chart.yaml
|
||||
sed -i "s/^appVersion: .*/appVersion: \"v${VERSION}\"/" deploy/helm/Chart.yaml
|
||||
cat deploy/helm/Chart.yaml
|
||||
|
||||
- name: Package Helm chart
|
||||
run: helm package deploy/helm --destination .
|
||||
|
||||
- name: Push Helm chart to GHCR
|
||||
run: |
|
||||
VERSION="${{ needs.determine-version.outputs.new_version }}"
|
||||
helm push postgresus-${VERSION}.tgz oci://ghcr.io/rostislavdugin/charts
|
||||
|
||||
40
README.md
40
README.md
@@ -157,6 +157,46 @@ Then run:
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
### Option 4: Kubernetes with Helm
|
||||
|
||||
For Kubernetes deployments, install directly from the OCI registry.
|
||||
|
||||
**With ClusterIP + port-forward (development/testing):**
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace
|
||||
```
|
||||
|
||||
```bash
|
||||
kubectl port-forward svc/postgresus-service 4005:4005 -n postgresus
|
||||
# Access at http://localhost:4005
|
||||
```
|
||||
|
||||
**With LoadBalancer (cloud environments):**
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
--set service.type=LoadBalancer
|
||||
```
|
||||
|
||||
```bash
|
||||
kubectl get svc postgresus-service -n postgresus
|
||||
# Access at http://<EXTERNAL-IP>:4005
|
||||
```
|
||||
|
||||
**With Ingress (domain-based access):**
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
--set ingress.enabled=true \
|
||||
--set ingress.hosts[0].host=backup.example.com
|
||||
```
|
||||
|
||||
For more options (NodePort, TLS, HTTPRoute for Gateway API), see the [Helm chart README](deploy/helm/README.md).
|
||||
|
||||
---
|
||||
|
||||
## 🚀 Usage
|
||||
|
||||
@@ -2,20 +2,21 @@ package backups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type BackupContextManager struct {
|
||||
mu sync.RWMutex
|
||||
cancelFuncs map[uuid.UUID]context.CancelFunc
|
||||
mu sync.RWMutex
|
||||
cancelFuncs map[uuid.UUID]context.CancelFunc
|
||||
cancelledBackups map[uuid.UUID]bool
|
||||
}
|
||||
|
||||
func NewBackupContextManager() *BackupContextManager {
|
||||
return &BackupContextManager{
|
||||
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
|
||||
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
|
||||
cancelledBackups: make(map[uuid.UUID]bool),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,25 +24,37 @@ func (m *BackupContextManager) RegisterBackup(backupID uuid.UUID, cancelFunc con
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.cancelFuncs[backupID] = cancelFunc
|
||||
delete(m.cancelledBackups, backupID)
|
||||
}
|
||||
|
||||
func (m *BackupContextManager) CancelBackup(backupID uuid.UUID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
cancelFunc, exists := m.cancelFuncs[backupID]
|
||||
if !exists {
|
||||
return errors.New("backup is not in progress or already completed")
|
||||
if m.cancelledBackups[backupID] {
|
||||
return nil
|
||||
}
|
||||
|
||||
cancelFunc()
|
||||
delete(m.cancelFuncs, backupID)
|
||||
cancelFunc, exists := m.cancelFuncs[backupID]
|
||||
if exists {
|
||||
cancelFunc()
|
||||
delete(m.cancelFuncs, backupID)
|
||||
}
|
||||
|
||||
m.cancelledBackups[backupID] = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *BackupContextManager) IsCancelled(backupID uuid.UUID) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.cancelledBackups[backupID]
|
||||
}
|
||||
|
||||
func (m *BackupContextManager) UnregisterBackup(backupID uuid.UUID) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.cancelFuncs, backupID)
|
||||
delete(m.cancelledBackups, backupID)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package backups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -701,7 +702,7 @@ func createTestBackup(
|
||||
dummyContent := []byte("dummy backup content for testing")
|
||||
reader := strings.NewReader(string(dummyContent))
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
if err := storages[0].SaveFile(encryption.GetFieldEncryptor(), logger, backup.ID, reader); err != nil {
|
||||
if err := storages[0].SaveFile(context.Background(), encryption.GetFieldEncryptor(), logger, backup.ID, reader); err != nil {
|
||||
panic(fmt.Sprintf("Failed to create test backup file: %v", err))
|
||||
}
|
||||
|
||||
|
||||
@@ -275,7 +275,12 @@ func (s *BackupService) MakeBackup(databaseID uuid.UUID, isLastTry bool) {
|
||||
errMsg := err.Error()
|
||||
|
||||
// Check if backup was cancelled (not due to shutdown)
|
||||
if strings.Contains(errMsg, "backup cancelled") && !strings.Contains(errMsg, "shutdown") {
|
||||
isCancelled := strings.Contains(errMsg, "backup cancelled") ||
|
||||
strings.Contains(errMsg, "context canceled") ||
|
||||
errors.Is(err, context.Canceled)
|
||||
isShutdown := strings.Contains(errMsg, "shutdown")
|
||||
|
||||
if isCancelled && !isShutdown {
|
||||
backup.Status = BackupStatusCanceled
|
||||
backup.BackupDurationMs = time.Since(start).Milliseconds()
|
||||
backup.BackupSizeMb = 0
|
||||
|
||||
@@ -45,6 +45,11 @@ type CreatePostgresqlBackupUsecase struct {
|
||||
fieldEncryptor encryption.FieldEncryptor
|
||||
}
|
||||
|
||||
type writeResult struct {
|
||||
bytesWritten int
|
||||
writeErr error
|
||||
}
|
||||
|
||||
// Execute creates a backup of the database
|
||||
func (uc *CreatePostgresqlBackupUsecase) Execute(
|
||||
ctx context.Context,
|
||||
@@ -172,7 +177,7 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
|
||||
// Start streaming into storage in its own goroutine
|
||||
saveErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
saveErr := storage.SaveFile(uc.fieldEncryptor, uc.logger, backupID, storageReader)
|
||||
saveErr := storage.SaveFile(ctx, uc.fieldEncryptor, uc.logger, backupID, storageReader)
|
||||
saveErrCh <- saveErr
|
||||
}()
|
||||
|
||||
@@ -195,12 +200,10 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
|
||||
copyResultCh <- err
|
||||
}()
|
||||
|
||||
// Wait for the copy to finish first, then the dump process
|
||||
copyErr := <-copyResultCh
|
||||
bytesWritten := <-bytesWrittenCh
|
||||
waitErr := cmd.Wait()
|
||||
|
||||
// Check for shutdown or cancellation before finalizing
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
uc.cleanupOnCancellation(encryptionWriter, storageWriter, saveErrCh)
|
||||
@@ -213,7 +216,6 @@ func (uc *CreatePostgresqlBackupUsecase) streamToStorage(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wait until storage ends reading
|
||||
saveErr := <-saveErrCh
|
||||
stderrOutput := <-stderrCh
|
||||
|
||||
@@ -267,7 +269,23 @@ func (uc *CreatePostgresqlBackupUsecase) copyWithShutdownCheck(
|
||||
|
||||
bytesRead, readErr := src.Read(buf)
|
||||
if bytesRead > 0 {
|
||||
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
|
||||
writeResultCh := make(chan writeResult, 1)
|
||||
go func() {
|
||||
bytesWritten, writeErr := dst.Write(buf[0:bytesRead])
|
||||
writeResultCh <- writeResult{bytesWritten, writeErr}
|
||||
}()
|
||||
|
||||
var bytesWritten int
|
||||
var writeErr error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return totalBytesWritten, fmt.Errorf("copy cancelled during write: %w", ctx.Err())
|
||||
case result := <-writeResultCh:
|
||||
bytesWritten = result.bytesWritten
|
||||
writeErr = result.writeErr
|
||||
}
|
||||
|
||||
if bytesWritten < 0 || bytesRead < bytesWritten {
|
||||
bytesWritten = 0
|
||||
if writeErr == nil {
|
||||
@@ -354,6 +372,9 @@ func (uc *CreatePostgresqlBackupUsecase) createBackupContext(
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-parentCtx.Done():
|
||||
cancel()
|
||||
return
|
||||
case <-ticker.C:
|
||||
if config.IsShouldShutdown() {
|
||||
cancel()
|
||||
@@ -417,7 +438,6 @@ func (uc *CreatePostgresqlBackupUsecase) setupPgEnvironment(
|
||||
"PGCONNECT_TIMEOUT="+strconv.Itoa(pgConnectTimeout),
|
||||
"LC_ALL=C.UTF-8",
|
||||
"LANG=C.UTF-8",
|
||||
"PGOPTIONS=--client-encoding=UTF8",
|
||||
)
|
||||
|
||||
if shouldRequireSSL {
|
||||
@@ -611,7 +631,6 @@ func (uc *CreatePostgresqlBackupUsecase) handleExitCode1NoStderr(
|
||||
"PGCONNECT_TIMEOUT=" + strconv.Itoa(pgConnectTimeout),
|
||||
"LC_ALL=C.UTF-8",
|
||||
"LANG=C.UTF-8",
|
||||
"PGOPTIONS=--client-encoding=UTF8",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -594,7 +594,7 @@ func buildConnectionStringForDB(p *PostgresqlDatabase, dbName string, password s
|
||||
}
|
||||
|
||||
return fmt.Sprintf(
|
||||
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s default_query_exec_mode=simple_protocol",
|
||||
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s default_query_exec_mode=simple_protocol standard_conforming_strings=on",
|
||||
p.Host,
|
||||
p.Port,
|
||||
p.Username,
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package email_notifier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/smtp"
|
||||
)
|
||||
|
||||
type loginAuth struct {
|
||||
username, password string
|
||||
}
|
||||
|
||||
func (a *loginAuth) Start(server *smtp.ServerInfo) (string, []byte, error) {
|
||||
return "LOGIN", []byte{}, nil
|
||||
}
|
||||
|
||||
func (a *loginAuth) Next(fromServer []byte, more bool) ([]byte, error) {
|
||||
if more {
|
||||
switch string(fromServer) {
|
||||
case "Username:":
|
||||
return []byte(a.username), nil
|
||||
case "Password:":
|
||||
return []byte(a.password), nil
|
||||
default:
|
||||
return nil, errors.New("unknown LOGIN challenge: " + string(fromServer))
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
@@ -58,11 +58,10 @@ func (e *EmailNotifier) Validate(encryptor encryption.FieldEncryptor) error {
|
||||
|
||||
func (e *EmailNotifier) Send(
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
_ *slog.Logger,
|
||||
heading string,
|
||||
message string,
|
||||
) error {
|
||||
// Decrypt SMTP password if provided
|
||||
var smtpPassword string
|
||||
if e.SMTPPassword != "" {
|
||||
decrypted, err := encryptor.Decrypt(e.NotifierID, e.SMTPPassword)
|
||||
@@ -72,7 +71,6 @@ func (e *EmailNotifier) Send(
|
||||
smtpPassword = decrypted
|
||||
}
|
||||
|
||||
// Compose email
|
||||
from := e.From
|
||||
if from == "" {
|
||||
from = e.SMTPUser
|
||||
@@ -81,153 +79,13 @@ func (e *EmailNotifier) Send(
|
||||
}
|
||||
}
|
||||
|
||||
to := []string{e.TargetEmail}
|
||||
|
||||
// Format the email content
|
||||
subject := fmt.Sprintf("Subject: %s\r\n", heading)
|
||||
mime := fmt.Sprintf(
|
||||
"MIME-version: 1.0;\nContent-Type: %s; charset=\"%s\";\n\n",
|
||||
MIMETypeHTML,
|
||||
MIMECharsetUTF8,
|
||||
)
|
||||
body := message
|
||||
fromHeader := fmt.Sprintf("From: %s\r\n", from)
|
||||
toHeader := fmt.Sprintf("To: %s\r\n", e.TargetEmail)
|
||||
|
||||
// Combine all parts of the email
|
||||
emailContent := []byte(fromHeader + toHeader + subject + mime + body)
|
||||
|
||||
addr := net.JoinHostPort(e.SMTPHost, fmt.Sprintf("%d", e.SMTPPort))
|
||||
timeout := DefaultTimeout
|
||||
|
||||
// Determine if authentication is required
|
||||
emailContent := e.buildEmailContent(heading, message, from)
|
||||
isAuthRequired := e.SMTPUser != "" && smtpPassword != ""
|
||||
|
||||
// Handle different port scenarios
|
||||
if e.SMTPPort == ImplicitTLSPort {
|
||||
// Implicit TLS (port 465)
|
||||
// Set up TLS config
|
||||
tlsConfig := &tls.Config{
|
||||
ServerName: e.SMTPHost,
|
||||
}
|
||||
|
||||
// Dial with timeout
|
||||
dialer := &net.Dialer{Timeout: timeout}
|
||||
conn, err := tls.DialWithDialer(dialer, "tcp", addr, tlsConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to SMTP server: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Create SMTP client
|
||||
client, err := smtp.NewClient(conn, e.SMTPHost)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create SMTP client: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.Quit()
|
||||
}()
|
||||
|
||||
// Set up authentication only if credentials are provided
|
||||
if isAuthRequired {
|
||||
auth := smtp.PlainAuth("", e.SMTPUser, smtpPassword, e.SMTPHost)
|
||||
if err := client.Auth(auth); err != nil {
|
||||
return fmt.Errorf("SMTP authentication failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Set sender and recipients
|
||||
if err := client.Mail(from); err != nil {
|
||||
return fmt.Errorf("failed to set sender: %w", err)
|
||||
}
|
||||
for _, recipient := range to {
|
||||
if err := client.Rcpt(recipient); err != nil {
|
||||
return fmt.Errorf("failed to set recipient: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Send the email body
|
||||
writer, err := client.Data()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get data writer: %w", err)
|
||||
}
|
||||
_, err = writer.Write(emailContent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write email content: %w", err)
|
||||
}
|
||||
err = writer.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to close data writer: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
} else {
|
||||
// STARTTLS (port 587) or other ports
|
||||
// Create a custom dialer with timeout
|
||||
dialer := &net.Dialer{Timeout: timeout}
|
||||
conn, err := dialer.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to SMTP server: %w", err)
|
||||
}
|
||||
|
||||
// Create client from connection
|
||||
client, err := smtp.NewClient(conn, e.SMTPHost)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create SMTP client: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.Quit()
|
||||
}()
|
||||
|
||||
// Send email using the client
|
||||
if err := client.Hello(DefaultHelloName); err != nil {
|
||||
return fmt.Errorf("SMTP hello failed: %w", err)
|
||||
}
|
||||
|
||||
// Start TLS if available
|
||||
if ok, _ := client.Extension("STARTTLS"); ok {
|
||||
if err := client.StartTLS(&tls.Config{ServerName: e.SMTPHost}); err != nil {
|
||||
return fmt.Errorf("STARTTLS failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Authenticate only if credentials are provided
|
||||
if isAuthRequired {
|
||||
auth := smtp.PlainAuth("", e.SMTPUser, smtpPassword, e.SMTPHost)
|
||||
if err := client.Auth(auth); err != nil {
|
||||
return fmt.Errorf("SMTP authentication failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := client.Mail(from); err != nil {
|
||||
return fmt.Errorf("failed to set sender: %w", err)
|
||||
}
|
||||
|
||||
for _, recipient := range to {
|
||||
if err := client.Rcpt(recipient); err != nil {
|
||||
return fmt.Errorf("failed to set recipient: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
writer, err := client.Data()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get data writer: %w", err)
|
||||
}
|
||||
|
||||
_, err = writer.Write(emailContent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write email content: %w", err)
|
||||
}
|
||||
|
||||
err = writer.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to close data writer: %w", err)
|
||||
}
|
||||
|
||||
return client.Quit()
|
||||
return e.sendImplicitTLS(emailContent, from, smtpPassword, isAuthRequired)
|
||||
}
|
||||
return e.sendStartTLS(emailContent, from, smtpPassword, isAuthRequired)
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) HideSensitiveData() {
|
||||
@@ -256,3 +114,166 @@ func (e *EmailNotifier) EncryptSensitiveData(encryptor encryption.FieldEncryptor
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) buildEmailContent(heading, message, from string) []byte {
|
||||
subject := fmt.Sprintf("Subject: %s\r\n", heading)
|
||||
mime := fmt.Sprintf(
|
||||
"MIME-version: 1.0;\nContent-Type: %s; charset=\"%s\";\n\n",
|
||||
MIMETypeHTML,
|
||||
MIMECharsetUTF8,
|
||||
)
|
||||
fromHeader := fmt.Sprintf("From: %s\r\n", from)
|
||||
toHeader := fmt.Sprintf("To: %s\r\n", e.TargetEmail)
|
||||
return []byte(fromHeader + toHeader + subject + mime + message)
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) sendImplicitTLS(
|
||||
emailContent []byte,
|
||||
from string,
|
||||
password string,
|
||||
isAuthRequired bool,
|
||||
) error {
|
||||
createClient := func() (*smtp.Client, func(), error) {
|
||||
return e.createImplicitTLSClient()
|
||||
}
|
||||
|
||||
client, cleanup, err := e.authenticateWithRetry(createClient, password, isAuthRequired)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
return e.sendEmail(client, from, emailContent)
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) sendStartTLS(
|
||||
emailContent []byte,
|
||||
from string,
|
||||
password string,
|
||||
isAuthRequired bool,
|
||||
) error {
|
||||
createClient := func() (*smtp.Client, func(), error) {
|
||||
return e.createStartTLSClient()
|
||||
}
|
||||
|
||||
client, cleanup, err := e.authenticateWithRetry(createClient, password, isAuthRequired)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
return e.sendEmail(client, from, emailContent)
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) createImplicitTLSClient() (*smtp.Client, func(), error) {
|
||||
addr := net.JoinHostPort(e.SMTPHost, fmt.Sprintf("%d", e.SMTPPort))
|
||||
tlsConfig := &tls.Config{ServerName: e.SMTPHost}
|
||||
dialer := &net.Dialer{Timeout: DefaultTimeout}
|
||||
|
||||
conn, err := tls.DialWithDialer(dialer, "tcp", addr, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to connect to SMTP server: %w", err)
|
||||
}
|
||||
|
||||
client, err := smtp.NewClient(conn, e.SMTPHost)
|
||||
if err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, nil, fmt.Errorf("failed to create SMTP client: %w", err)
|
||||
}
|
||||
|
||||
return client, func() { _ = client.Quit() }, nil
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) createStartTLSClient() (*smtp.Client, func(), error) {
|
||||
addr := net.JoinHostPort(e.SMTPHost, fmt.Sprintf("%d", e.SMTPPort))
|
||||
dialer := &net.Dialer{Timeout: DefaultTimeout}
|
||||
|
||||
conn, err := dialer.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to connect to SMTP server: %w", err)
|
||||
}
|
||||
|
||||
client, err := smtp.NewClient(conn, e.SMTPHost)
|
||||
if err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, nil, fmt.Errorf("failed to create SMTP client: %w", err)
|
||||
}
|
||||
|
||||
if err := client.Hello(DefaultHelloName); err != nil {
|
||||
_ = client.Quit()
|
||||
_ = conn.Close()
|
||||
return nil, nil, fmt.Errorf("SMTP hello failed: %w", err)
|
||||
}
|
||||
|
||||
if ok, _ := client.Extension("STARTTLS"); ok {
|
||||
if err := client.StartTLS(&tls.Config{ServerName: e.SMTPHost}); err != nil {
|
||||
_ = client.Quit()
|
||||
_ = conn.Close()
|
||||
return nil, nil, fmt.Errorf("STARTTLS failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return client, func() { _ = client.Quit() }, nil
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) authenticateWithRetry(
|
||||
createClient func() (*smtp.Client, func(), error),
|
||||
password string,
|
||||
isAuthRequired bool,
|
||||
) (*smtp.Client, func(), error) {
|
||||
client, cleanup, err := createClient()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if !isAuthRequired {
|
||||
return client, cleanup, nil
|
||||
}
|
||||
|
||||
// Try PLAIN auth first
|
||||
plainAuth := smtp.PlainAuth("", e.SMTPUser, password, e.SMTPHost)
|
||||
if err := client.Auth(plainAuth); err == nil {
|
||||
return client, cleanup, nil
|
||||
}
|
||||
|
||||
// PLAIN auth failed, connection may be closed - recreate and try LOGIN auth
|
||||
cleanup()
|
||||
|
||||
client, cleanup, err = createClient()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
loginAuth := &loginAuth{username: e.SMTPUser, password: password}
|
||||
if err := client.Auth(loginAuth); err != nil {
|
||||
cleanup()
|
||||
return nil, nil, fmt.Errorf("SMTP authentication failed: %w", err)
|
||||
}
|
||||
|
||||
return client, cleanup, nil
|
||||
}
|
||||
|
||||
func (e *EmailNotifier) sendEmail(client *smtp.Client, from string, content []byte) error {
|
||||
if err := client.Mail(from); err != nil {
|
||||
return fmt.Errorf("failed to set sender: %w", err)
|
||||
}
|
||||
|
||||
if err := client.Rcpt(e.TargetEmail); err != nil {
|
||||
return fmt.Errorf("failed to set recipient: %w", err)
|
||||
}
|
||||
|
||||
writer, err := client.Data()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get data writer: %w", err)
|
||||
}
|
||||
|
||||
if _, err = writer.Write(content); err != nil {
|
||||
return fmt.Errorf("failed to write email content: %w", err)
|
||||
}
|
||||
|
||||
if err = writer.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close data writer: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -206,8 +206,8 @@ func (t *WebhookNotifier) sendPOST(webhookURL, heading, message string, logger *
|
||||
func (t *WebhookNotifier) buildRequestBody(heading, message string) []byte {
|
||||
if t.BodyTemplate != nil && *t.BodyTemplate != "" {
|
||||
result := *t.BodyTemplate
|
||||
result = strings.ReplaceAll(result, "{{heading}}", heading)
|
||||
result = strings.ReplaceAll(result, "{{message}}", message)
|
||||
result = strings.ReplaceAll(result, "{{heading}}", escapeJSONString(heading))
|
||||
result = strings.ReplaceAll(result, "{{message}}", escapeJSONString(message))
|
||||
return []byte(result)
|
||||
}
|
||||
|
||||
@@ -227,3 +227,17 @@ func (t *WebhookNotifier) applyHeaders(req *http.Request) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func escapeJSONString(s string) string {
|
||||
b, err := json.Marshal(s)
|
||||
if err != nil || len(b) < 2 {
|
||||
escaped := strings.ReplaceAll(s, `\`, `\\`)
|
||||
escaped = strings.ReplaceAll(escaped, `"`, `\"`)
|
||||
escaped = strings.ReplaceAll(escaped, "\n", `\n`)
|
||||
escaped = strings.ReplaceAll(escaped, "\r", `\r`)
|
||||
escaped = strings.ReplaceAll(escaped, "\t", `\t`)
|
||||
return escaped
|
||||
}
|
||||
|
||||
return string(b[1 : len(b)-1])
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package restores
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -340,7 +341,7 @@ func createTestBackup(
|
||||
dummyContent := []byte("dummy backup content for testing")
|
||||
reader := strings.NewReader(string(dummyContent))
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
if err := storages[0].SaveFile(fieldEncryptor, logger, backup.ID, reader); err != nil {
|
||||
if err := storages[0].SaveFile(context.Background(), fieldEncryptor, logger, backup.ID, reader); err != nil {
|
||||
panic(fmt.Sprintf("Failed to create test backup file: %v", err))
|
||||
}
|
||||
|
||||
|
||||
@@ -378,7 +378,6 @@ func (uc *RestorePostgresqlBackupUsecase) setupPgRestoreEnvironment(
|
||||
// Add encoding-related environment variables
|
||||
cmd.Env = append(cmd.Env, "LC_ALL=C.UTF-8")
|
||||
cmd.Env = append(cmd.Env, "LANG=C.UTF-8")
|
||||
cmd.Env = append(cmd.Env, "PGOPTIONS=--client-encoding=UTF8")
|
||||
|
||||
shouldRequireSSL := pgConfig.IsHttps
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package storages
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"postgresus-backend/internal/util/encryption"
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
|
||||
type StorageFileSaver interface {
|
||||
SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package storages
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
@@ -30,12 +31,13 @@ type Storage struct {
|
||||
}
|
||||
|
||||
func (s *Storage) SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
file io.Reader,
|
||||
) error {
|
||||
err := s.getSpecificStorage().SaveFile(encryptor, logger, fileID, file)
|
||||
err := s.getSpecificStorage().SaveFile(ctx, encryptor, logger, fileID, file)
|
||||
if err != nil {
|
||||
lastSaveError := err.Error()
|
||||
s.LastSaveError = &lastSaveError
|
||||
|
||||
@@ -167,6 +167,7 @@ func Test_Storage_BasicOperations(t *testing.T) {
|
||||
fileID := uuid.New()
|
||||
|
||||
err = tc.storage.SaveFile(
|
||||
context.Background(),
|
||||
encryptor,
|
||||
logger.GetLogger(),
|
||||
fileID,
|
||||
@@ -189,6 +190,7 @@ func Test_Storage_BasicOperations(t *testing.T) {
|
||||
|
||||
fileID := uuid.New()
|
||||
err = tc.storage.SaveFile(
|
||||
context.Background(),
|
||||
encryptor,
|
||||
logger.GetLogger(),
|
||||
fileID,
|
||||
@@ -238,7 +240,7 @@ func setupS3Container(ctx context.Context) (*S3Container, error) {
|
||||
secretKey := "testpassword"
|
||||
bucketName := "test-bucket"
|
||||
region := "us-east-1"
|
||||
endpoint := fmt.Sprintf("localhost:%s", env.TestMinioPort)
|
||||
endpoint := fmt.Sprintf("127.0.0.1:%s", env.TestMinioPort)
|
||||
|
||||
// Create MinIO client and ensure bucket exists
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{
|
||||
|
||||
@@ -3,19 +3,44 @@ package azure_blob_storage
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"postgresus-backend/internal/util/encryption"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
azureConnectTimeout = 30 * time.Second
|
||||
azureResponseTimeout = 30 * time.Second
|
||||
azureIdleConnTimeout = 90 * time.Second
|
||||
azureTLSHandshakeTimeout = 30 * time.Second
|
||||
|
||||
// Chunk size for block blob uploads - 16MB provides good balance between
|
||||
// memory usage and upload efficiency. This creates backpressure to pg_dump
|
||||
// by only reading one chunk at a time and waiting for Azure to confirm receipt.
|
||||
azureChunkSize = 16 * 1024 * 1024
|
||||
)
|
||||
|
||||
type readSeekCloser struct {
|
||||
*bytes.Reader
|
||||
}
|
||||
|
||||
func (r *readSeekCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type AuthMethod string
|
||||
|
||||
const (
|
||||
@@ -39,27 +64,91 @@ func (s *AzureBlobStorage) TableName() string {
|
||||
}
|
||||
|
||||
func (s *AzureBlobStorage) SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
file io.Reader,
|
||||
) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("upload cancelled before start: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
client, err := s.getClient(encryptor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blobName := s.buildBlobName(fileID.String())
|
||||
blockBlobClient := client.ServiceClient().
|
||||
NewContainerClient(s.ContainerName).
|
||||
NewBlockBlobClient(blobName)
|
||||
|
||||
_, err = client.UploadStream(
|
||||
context.TODO(),
|
||||
s.ContainerName,
|
||||
blobName,
|
||||
file,
|
||||
nil,
|
||||
)
|
||||
var blockIDs []string
|
||||
blockNumber := 0
|
||||
buf := make([]byte, azureChunkSize)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("upload cancelled: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
n, readErr := io.ReadFull(file, buf)
|
||||
|
||||
if n == 0 && readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
|
||||
return fmt.Errorf("read error: %w", readErr)
|
||||
}
|
||||
|
||||
blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%06d", blockNumber)))
|
||||
|
||||
_, err := blockBlobClient.StageBlock(
|
||||
ctx,
|
||||
blockID,
|
||||
&readSeekCloser{bytes.NewReader(buf[:n])},
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("upload cancelled: %w", ctx.Err())
|
||||
default:
|
||||
return fmt.Errorf("failed to stage block %d: %w", blockNumber, err)
|
||||
}
|
||||
}
|
||||
|
||||
blockIDs = append(blockIDs, blockID)
|
||||
blockNumber++
|
||||
|
||||
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(blockIDs) == 0 {
|
||||
_, err = client.UploadStream(
|
||||
ctx,
|
||||
s.ContainerName,
|
||||
blobName,
|
||||
bytes.NewReader([]byte{}),
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload empty blob: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = blockBlobClient.CommitBlockList(ctx, blockIDs, &blockblob.CommitBlockListOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload blob to Azure: %w", err)
|
||||
return fmt.Errorf("failed to commit block list: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -253,6 +342,8 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
|
||||
var client *azblob.Client
|
||||
var err error
|
||||
|
||||
clientOptions := s.buildClientOptions()
|
||||
|
||||
switch s.AuthMethod {
|
||||
case AuthMethodConnectionString:
|
||||
connectionString, decryptErr := encryptor.Decrypt(s.StorageID, s.ConnectionString)
|
||||
@@ -260,7 +351,7 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
|
||||
return nil, fmt.Errorf("failed to decrypt Azure connection string: %w", decryptErr)
|
||||
}
|
||||
|
||||
client, err = azblob.NewClientFromConnectionString(connectionString, nil)
|
||||
client, err = azblob.NewClientFromConnectionString(connectionString, clientOptions)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"failed to create Azure Blob client from connection string: %w",
|
||||
@@ -279,7 +370,7 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
|
||||
return nil, fmt.Errorf("failed to create Azure shared key credential: %w", credErr)
|
||||
}
|
||||
|
||||
client, err = azblob.NewClientWithSharedKeyCredential(accountURL, credential, nil)
|
||||
client, err = azblob.NewClientWithSharedKeyCredential(accountURL, credential, clientOptions)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create Azure Blob client with shared key: %w", err)
|
||||
}
|
||||
@@ -290,6 +381,26 @@ func (s *AzureBlobStorage) getClient(encryptor encryption.FieldEncryptor) (*azbl
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (s *AzureBlobStorage) buildClientOptions() *azblob.ClientOptions {
|
||||
transport := &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: azureConnectTimeout,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: azureTLSHandshakeTimeout,
|
||||
ResponseHeaderTimeout: azureResponseTimeout,
|
||||
IdleConnTimeout: azureIdleConnTimeout,
|
||||
}
|
||||
|
||||
return &azblob.ClientOptions{
|
||||
ClientOptions: azcore.ClientOptions{
|
||||
Transport: &http.Client{Transport: transport},
|
||||
Retry: policy.RetryOptions{
|
||||
MaxRetries: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AzureBlobStorage) buildAccountURL() string {
|
||||
if s.Endpoint != "" {
|
||||
endpoint := s.Endpoint
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"postgresus-backend/internal/util/encryption"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -16,9 +18,22 @@ import (
|
||||
"golang.org/x/oauth2/google"
|
||||
|
||||
drive "google.golang.org/api/drive/v3"
|
||||
"google.golang.org/api/googleapi"
|
||||
"google.golang.org/api/option"
|
||||
)
|
||||
|
||||
const (
|
||||
gdConnectTimeout = 30 * time.Second
|
||||
gdResponseTimeout = 30 * time.Second
|
||||
gdIdleConnTimeout = 90 * time.Second
|
||||
gdTLSHandshakeTimeout = 30 * time.Second
|
||||
|
||||
// Chunk size for Google Drive resumable uploads - 16MB provides good balance
|
||||
// between memory usage and upload efficiency. Google Drive requires chunks
|
||||
// to be multiples of 256KB for resumable uploads.
|
||||
gdChunkSize = 16 * 1024 * 1024
|
||||
)
|
||||
|
||||
type GoogleDriveStorage struct {
|
||||
StorageID uuid.UUID `json:"storageId" gorm:"primaryKey;type:uuid;column:storage_id"`
|
||||
ClientID string `json:"clientId" gorm:"not null;type:text;column:client_id"`
|
||||
@@ -31,31 +46,44 @@ func (s *GoogleDriveStorage) TableName() string {
|
||||
}
|
||||
|
||||
func (s *GoogleDriveStorage) SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
file io.Reader,
|
||||
) error {
|
||||
return s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
|
||||
ctx := context.Background()
|
||||
return s.withRetryOnAuth(ctx, encryptor, func(driveService *drive.Service) error {
|
||||
filename := fileID.String()
|
||||
|
||||
// Ensure the postgresus_backups folder exists
|
||||
folderID, err := s.ensureBackupsFolderExists(ctx, driveService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create/find backups folder: %w", err)
|
||||
}
|
||||
|
||||
// Delete any previous copy so we keep at most one object per logical file.
|
||||
_ = s.deleteByName(ctx, driveService, filename, folderID) // ignore "not found"
|
||||
_ = s.deleteByName(ctx, driveService, filename, folderID)
|
||||
|
||||
fileMeta := &drive.File{
|
||||
Name: filename,
|
||||
Parents: []string{folderID},
|
||||
}
|
||||
|
||||
_, err = driveService.Files.Create(fileMeta).Media(file).Context(ctx).Do()
|
||||
backpressureReader := &backpressureReader{
|
||||
reader: file,
|
||||
ctx: ctx,
|
||||
chunkSize: gdChunkSize,
|
||||
buf: make([]byte, gdChunkSize),
|
||||
}
|
||||
|
||||
_, err = driveService.Files.Create(fileMeta).
|
||||
Media(backpressureReader, googleapi.ChunkSize(gdChunkSize)).
|
||||
Context(ctx).
|
||||
Do()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("upload cancelled: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
return fmt.Errorf("failed to upload file to Google Drive: %w", err)
|
||||
}
|
||||
|
||||
@@ -70,30 +98,85 @@ func (s *GoogleDriveStorage) SaveFile(
|
||||
})
|
||||
}
|
||||
|
||||
type backpressureReader struct {
|
||||
reader io.Reader
|
||||
ctx context.Context
|
||||
chunkSize int
|
||||
buf []byte
|
||||
bufStart int
|
||||
bufEnd int
|
||||
totalBytes int64
|
||||
chunkCount int
|
||||
}
|
||||
|
||||
func (r *backpressureReader) Read(p []byte) (n int, err error) {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return 0, r.ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if r.bufStart >= r.bufEnd {
|
||||
r.chunkCount++
|
||||
|
||||
bytesRead, readErr := io.ReadFull(r.reader, r.buf)
|
||||
if bytesRead > 0 {
|
||||
r.bufStart = 0
|
||||
r.bufEnd = bytesRead
|
||||
}
|
||||
|
||||
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
|
||||
return 0, readErr
|
||||
}
|
||||
|
||||
if bytesRead == 0 && readErr == io.EOF {
|
||||
return 0, io.EOF
|
||||
}
|
||||
}
|
||||
|
||||
n = copy(p, r.buf[r.bufStart:r.bufEnd])
|
||||
r.bufStart += n
|
||||
r.totalBytes += int64(n)
|
||||
|
||||
if r.bufStart >= r.bufEnd {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return n, r.ctx.Err()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (s *GoogleDriveStorage) GetFile(
|
||||
encryptor encryption.FieldEncryptor,
|
||||
fileID uuid.UUID,
|
||||
) (io.ReadCloser, error) {
|
||||
var result io.ReadCloser
|
||||
err := s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
|
||||
folderID, err := s.findBackupsFolder(driveService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to find backups folder: %w", err)
|
||||
}
|
||||
err := s.withRetryOnAuth(
|
||||
context.Background(),
|
||||
encryptor,
|
||||
func(driveService *drive.Service) error {
|
||||
folderID, err := s.findBackupsFolder(driveService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to find backups folder: %w", err)
|
||||
}
|
||||
|
||||
fileIDGoogle, err := s.lookupFileID(driveService, fileID.String(), folderID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fileIDGoogle, err := s.lookupFileID(driveService, fileID.String(), folderID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := driveService.Files.Get(fileIDGoogle).Download()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download file from Google Drive: %w", err)
|
||||
}
|
||||
resp, err := driveService.Files.Get(fileIDGoogle).Download()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download file from Google Drive: %w", err)
|
||||
}
|
||||
|
||||
result = resp.Body
|
||||
return nil
|
||||
})
|
||||
result = resp.Body
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
return result, err
|
||||
}
|
||||
@@ -102,8 +185,8 @@ func (s *GoogleDriveStorage) DeleteFile(
|
||||
encryptor encryption.FieldEncryptor,
|
||||
fileID uuid.UUID,
|
||||
) error {
|
||||
return s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
|
||||
ctx := context.Background()
|
||||
ctx := context.Background()
|
||||
return s.withRetryOnAuth(ctx, encryptor, func(driveService *drive.Service) error {
|
||||
folderID, err := s.findBackupsFolder(driveService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to find backups folder: %w", err)
|
||||
@@ -142,8 +225,8 @@ func (s *GoogleDriveStorage) Validate(encryptor encryption.FieldEncryptor) error
|
||||
}
|
||||
|
||||
func (s *GoogleDriveStorage) TestConnection(encryptor encryption.FieldEncryptor) error {
|
||||
return s.withRetryOnAuth(encryptor, func(driveService *drive.Service) error {
|
||||
ctx := context.Background()
|
||||
ctx := context.Background()
|
||||
return s.withRetryOnAuth(ctx, encryptor, func(driveService *drive.Service) error {
|
||||
testFilename := "test-connection-" + uuid.New().String()
|
||||
testData := []byte("test")
|
||||
|
||||
@@ -243,9 +326,16 @@ func (s *GoogleDriveStorage) Update(incoming *GoogleDriveStorage) {
|
||||
|
||||
// withRetryOnAuth executes the provided function with retry logic for authentication errors
|
||||
func (s *GoogleDriveStorage) withRetryOnAuth(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
fn func(*drive.Service) error,
|
||||
) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
driveService, err := s.getDriveService(encryptor)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -253,6 +343,12 @@ func (s *GoogleDriveStorage) withRetryOnAuth(
|
||||
|
||||
err = fn(driveService)
|
||||
if err != nil && s.isAuthError(err) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Try to refresh token and retry once
|
||||
fmt.Printf("Google Drive auth error detected, attempting token refresh: %v\n", err)
|
||||
|
||||
@@ -422,7 +518,6 @@ func (s *GoogleDriveStorage) getDriveService(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Decrypt credentials before use
|
||||
clientSecret, err := encryptor.Decrypt(s.StorageID, s.ClientSecret)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt Google Drive client secret: %w", err)
|
||||
@@ -449,16 +544,16 @@ func (s *GoogleDriveStorage) getDriveService(
|
||||
|
||||
tokenSource := cfg.TokenSource(ctx, &token)
|
||||
|
||||
// Force token validation to ensure we're using the current token
|
||||
currentToken, err := tokenSource.Token()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get current token: %w", err)
|
||||
}
|
||||
|
||||
// Create a new token source with the validated token
|
||||
validatedTokenSource := oauth2.StaticTokenSource(currentToken)
|
||||
|
||||
driveService, err := drive.NewService(ctx, option.WithTokenSource(validatedTokenSource))
|
||||
httpClient := s.buildHTTPClient(validatedTokenSource)
|
||||
|
||||
driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create Drive client: %w", err)
|
||||
}
|
||||
@@ -466,6 +561,24 @@ func (s *GoogleDriveStorage) getDriveService(
|
||||
return driveService, nil
|
||||
}
|
||||
|
||||
func (s *GoogleDriveStorage) buildHTTPClient(tokenSource oauth2.TokenSource) *http.Client {
|
||||
transport := &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: gdConnectTimeout,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: gdTLSHandshakeTimeout,
|
||||
ResponseHeaderTimeout: gdResponseTimeout,
|
||||
IdleConnTimeout: gdIdleConnTimeout,
|
||||
}
|
||||
|
||||
return &http.Client{
|
||||
Transport: &oauth2.Transport{
|
||||
Source: tokenSource,
|
||||
Base: transport,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *GoogleDriveStorage) lookupFileID(
|
||||
driveService *drive.Service,
|
||||
name string,
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package local_storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
@@ -13,6 +15,13 @@ import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
// Chunk size for local storage writes - 16MB provides good balance between
|
||||
// memory usage and write efficiency. This creates backpressure to pg_dump
|
||||
// by only reading one chunk at a time and waiting for disk to confirm receipt.
|
||||
localChunkSize = 16 * 1024 * 1024
|
||||
)
|
||||
|
||||
// LocalStorage uses ./postgresus_local_backups folder as a
|
||||
// directory for backups and ./postgresus_local_temp folder as a
|
||||
// directory for temp files
|
||||
@@ -25,11 +34,18 @@ func (l *LocalStorage) TableName() string {
|
||||
}
|
||||
|
||||
func (l *LocalStorage) SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
file io.Reader,
|
||||
) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
logger.Info("Starting to save file to local storage", "fileId", fileID.String())
|
||||
|
||||
err := files_utils.EnsureDirectories([]string{
|
||||
@@ -60,7 +76,7 @@ func (l *LocalStorage) SaveFile(
|
||||
}()
|
||||
|
||||
logger.Debug("Copying file data to temp file", "fileId", fileID.String())
|
||||
_, err = io.Copy(tempFile, file)
|
||||
_, err = copyWithContext(ctx, tempFile, file)
|
||||
if err != nil {
|
||||
logger.Error("Failed to write to temp file", "fileId", fileID.String(), "error", err)
|
||||
return fmt.Errorf("failed to write to temp file: %w", err)
|
||||
@@ -175,3 +191,71 @@ func (l *LocalStorage) EncryptSensitiveData(encryptor encryption.FieldEncryptor)
|
||||
|
||||
func (l *LocalStorage) Update(incoming *LocalStorage) {
|
||||
}
|
||||
|
||||
type writeResult struct {
|
||||
bytesWritten int
|
||||
writeErr error
|
||||
}
|
||||
|
||||
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
|
||||
buf := make([]byte, localChunkSize)
|
||||
var written int64
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return written, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
nr, readErr := io.ReadFull(src, buf)
|
||||
|
||||
if nr == 0 && readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
|
||||
return written, readErr
|
||||
}
|
||||
|
||||
writeResultCh := make(chan writeResult, 1)
|
||||
go func() {
|
||||
nw, writeErr := dst.Write(buf[0:nr])
|
||||
writeResultCh <- writeResult{nw, writeErr}
|
||||
}()
|
||||
|
||||
var nw int
|
||||
var writeErr error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return written, ctx.Err()
|
||||
case result := <-writeResultCh:
|
||||
nw = result.bytesWritten
|
||||
writeErr = result.writeErr
|
||||
}
|
||||
|
||||
if nw < 0 || nr < nw {
|
||||
nw = 0
|
||||
if writeErr == nil {
|
||||
writeErr = errors.New("invalid write result")
|
||||
}
|
||||
}
|
||||
|
||||
if writeErr != nil {
|
||||
return written, writeErr
|
||||
}
|
||||
|
||||
if nr != nw {
|
||||
return written, io.ErrShortWrite
|
||||
}
|
||||
|
||||
written += int64(nw)
|
||||
|
||||
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package nas_storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -16,6 +17,13 @@ import (
|
||||
"github.com/hirochachacha/go-smb2"
|
||||
)
|
||||
|
||||
const (
|
||||
// Chunk size for NAS uploads - 16MB provides good balance between
|
||||
// memory usage and upload efficiency. This creates backpressure to pg_dump
|
||||
// by only reading one chunk at a time and waiting for NAS to confirm receipt.
|
||||
nasChunkSize = 16 * 1024 * 1024
|
||||
)
|
||||
|
||||
type NASStorage struct {
|
||||
StorageID uuid.UUID `json:"storageId" gorm:"primaryKey;type:uuid;column:storage_id"`
|
||||
Host string `json:"host" gorm:"not null;type:text;column:host"`
|
||||
@@ -33,14 +41,21 @@ func (n *NASStorage) TableName() string {
|
||||
}
|
||||
|
||||
func (n *NASStorage) SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
file io.Reader,
|
||||
) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
logger.Info("Starting to save file to NAS storage", "fileId", fileID.String(), "host", n.Host)
|
||||
|
||||
session, err := n.createSession(encryptor)
|
||||
session, err := n.createSessionWithContext(ctx, encryptor)
|
||||
if err != nil {
|
||||
logger.Error("Failed to create NAS session", "fileId", fileID.String(), "error", err)
|
||||
return fmt.Errorf("failed to create NAS session: %w", err)
|
||||
@@ -121,7 +136,7 @@ func (n *NASStorage) SaveFile(
|
||||
}()
|
||||
|
||||
logger.Debug("Copying file data to NAS", "fileId", fileID.String())
|
||||
_, err = io.Copy(nasFile, file)
|
||||
_, err = copyWithContext(ctx, nasFile, file)
|
||||
if err != nil {
|
||||
logger.Error("Failed to write file to NAS", "fileId", fileID.String(), "error", err)
|
||||
return fmt.Errorf("failed to write file to NAS: %w", err)
|
||||
@@ -290,20 +305,24 @@ func (n *NASStorage) Update(incoming *NASStorage) {
|
||||
}
|
||||
|
||||
func (n *NASStorage) createSession(encryptor encryption.FieldEncryptor) (*smb2.Session, error) {
|
||||
// Create connection with timeout
|
||||
conn, err := n.createConnection()
|
||||
return n.createSessionWithContext(context.Background(), encryptor)
|
||||
}
|
||||
|
||||
func (n *NASStorage) createSessionWithContext(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
) (*smb2.Session, error) {
|
||||
conn, err := n.createConnectionWithContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Decrypt password before use
|
||||
password, err := encryptor.Decrypt(n.StorageID, n.Password)
|
||||
if err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, fmt.Errorf("failed to decrypt NAS password: %w", err)
|
||||
}
|
||||
|
||||
// Create SMB2 dialer
|
||||
d := &smb2.Dialer{
|
||||
Initiator: &smb2.NTLMInitiator{
|
||||
User: n.Username,
|
||||
@@ -312,7 +331,6 @@ func (n *NASStorage) createSession(encryptor encryption.FieldEncryptor) (*smb2.S
|
||||
},
|
||||
}
|
||||
|
||||
// Create session
|
||||
session, err := d.Dial(conn)
|
||||
if err != nil {
|
||||
_ = conn.Close()
|
||||
@@ -322,34 +340,30 @@ func (n *NASStorage) createSession(encryptor encryption.FieldEncryptor) (*smb2.S
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (n *NASStorage) createConnection() (net.Conn, error) {
|
||||
func (n *NASStorage) createConnectionWithContext(ctx context.Context) (net.Conn, error) {
|
||||
address := net.JoinHostPort(n.Host, fmt.Sprintf("%d", n.Port))
|
||||
|
||||
// Create connection with timeout
|
||||
dialer := &net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
if n.UseSSL {
|
||||
// Use TLS connection
|
||||
tlsConfig := &tls.Config{
|
||||
ServerName: n.Host,
|
||||
InsecureSkipVerify: false, // Change to true if you want to skip cert verification
|
||||
InsecureSkipVerify: false,
|
||||
}
|
||||
|
||||
conn, err := tls.DialWithDialer(dialer, "tcp", address, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create SSL connection to %s: %w", address, err)
|
||||
}
|
||||
return conn, nil
|
||||
} else {
|
||||
// Use regular TCP connection
|
||||
conn, err := dialer.Dial("tcp", address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create connection to %s: %w", address, err)
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
conn, err := dialer.DialContext(ctx, "tcp", address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create connection to %s: %w", address, err)
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (n *NASStorage) ensureDirectory(fs *smb2.Share, path string) error {
|
||||
@@ -444,3 +458,71 @@ func (r *nasFileReader) Close() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type writeResult struct {
|
||||
bytesWritten int
|
||||
writeErr error
|
||||
}
|
||||
|
||||
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
|
||||
buf := make([]byte, nasChunkSize)
|
||||
var written int64
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return written, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
nr, readErr := io.ReadFull(src, buf)
|
||||
|
||||
if nr == 0 && readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
|
||||
return written, readErr
|
||||
}
|
||||
|
||||
writeResultCh := make(chan writeResult, 1)
|
||||
go func() {
|
||||
nw, writeErr := dst.Write(buf[0:nr])
|
||||
writeResultCh <- writeResult{nw, writeErr}
|
||||
}()
|
||||
|
||||
var nw int
|
||||
var writeErr error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return written, ctx.Err()
|
||||
case result := <-writeResultCh:
|
||||
nw = result.bytesWritten
|
||||
writeErr = result.writeErr
|
||||
}
|
||||
|
||||
if nw < 0 || nr < nw {
|
||||
nw = 0
|
||||
if writeErr == nil {
|
||||
writeErr = errors.New("invalid write result")
|
||||
}
|
||||
}
|
||||
|
||||
if writeErr != nil {
|
||||
return written, writeErr
|
||||
}
|
||||
|
||||
if nr != nw {
|
||||
return written, io.ErrShortWrite
|
||||
}
|
||||
|
||||
written += int64(nw)
|
||||
|
||||
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"postgresus-backend/internal/util/encryption"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -16,6 +18,18 @@ import (
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
)
|
||||
|
||||
const (
|
||||
s3ConnectTimeout = 30 * time.Second
|
||||
s3ResponseTimeout = 30 * time.Second
|
||||
s3IdleConnTimeout = 90 * time.Second
|
||||
s3TLSHandshakeTimeout = 30 * time.Second
|
||||
|
||||
// Chunk size for multipart uploads - 16MB provides good balance between
|
||||
// memory usage and upload efficiency. This creates backpressure to pg_dump
|
||||
// by only reading one chunk at a time and waiting for S3 to confirm receipt.
|
||||
multipartChunkSize = 16 * 1024 * 1024
|
||||
)
|
||||
|
||||
type S3Storage struct {
|
||||
StorageID uuid.UUID `json:"storageId" gorm:"primaryKey;type:uuid;column:storage_id"`
|
||||
S3Bucket string `json:"s3Bucket" gorm:"not null;type:text;column:s3_bucket"`
|
||||
@@ -33,29 +47,123 @@ func (s *S3Storage) TableName() string {
|
||||
}
|
||||
|
||||
func (s *S3Storage) SaveFile(
|
||||
ctx context.Context,
|
||||
encryptor encryption.FieldEncryptor,
|
||||
logger *slog.Logger,
|
||||
fileID uuid.UUID,
|
||||
file io.Reader,
|
||||
) error {
|
||||
client, err := s.getClient(encryptor)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("upload cancelled before start: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
coreClient, err := s.getCoreClient(encryptor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objectKey := s.buildObjectKey(fileID.String())
|
||||
|
||||
// Upload the file using MinIO client with streaming (size = -1 for unknown size)
|
||||
_, err = client.PutObject(
|
||||
context.TODO(),
|
||||
uploadID, err := coreClient.NewMultipartUpload(
|
||||
ctx,
|
||||
s.S3Bucket,
|
||||
objectKey,
|
||||
file,
|
||||
-1,
|
||||
minio.PutObjectOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload file to S3: %w", err)
|
||||
return fmt.Errorf("failed to initiate multipart upload: %w", err)
|
||||
}
|
||||
|
||||
var parts []minio.CompletePart
|
||||
partNumber := 1
|
||||
buf := make([]byte, multipartChunkSize)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
|
||||
return fmt.Errorf("upload cancelled: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
n, readErr := io.ReadFull(file, buf)
|
||||
|
||||
if n == 0 && readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
|
||||
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
|
||||
return fmt.Errorf("read error: %w", readErr)
|
||||
}
|
||||
|
||||
part, err := coreClient.PutObjectPart(
|
||||
ctx,
|
||||
s.S3Bucket,
|
||||
objectKey,
|
||||
uploadID,
|
||||
partNumber,
|
||||
bytes.NewReader(buf[:n]),
|
||||
int64(n),
|
||||
minio.PutObjectPartOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("upload cancelled: %w", ctx.Err())
|
||||
default:
|
||||
return fmt.Errorf("failed to upload part %d: %w", partNumber, err)
|
||||
}
|
||||
}
|
||||
|
||||
parts = append(parts, minio.CompletePart{
|
||||
PartNumber: partNumber,
|
||||
ETag: part.ETag,
|
||||
})
|
||||
|
||||
partNumber++
|
||||
|
||||
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(parts) == 0 {
|
||||
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
|
||||
|
||||
client, err := s.getClient(encryptor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = client.PutObject(
|
||||
ctx,
|
||||
s.S3Bucket,
|
||||
objectKey,
|
||||
bytes.NewReader([]byte{}),
|
||||
0,
|
||||
minio.PutObjectOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload empty file: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = coreClient.CompleteMultipartUpload(
|
||||
ctx,
|
||||
s.S3Bucket,
|
||||
objectKey,
|
||||
uploadID,
|
||||
parts,
|
||||
minio.PutObjectOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
_ = coreClient.AbortMultipartUpload(ctx, s.S3Bucket, objectKey, uploadID)
|
||||
return fmt.Errorf("failed to complete multipart upload: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -252,8 +360,54 @@ func (s *S3Storage) buildObjectKey(fileName string) string {
|
||||
}
|
||||
|
||||
func (s *S3Storage) getClient(encryptor encryption.FieldEncryptor) (*minio.Client, error) {
|
||||
endpoint := s.S3Endpoint
|
||||
useSSL := true
|
||||
endpoint, useSSL, accessKey, secretKey, bucketLookup, transport, err := s.getClientParams(
|
||||
encryptor,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
|
||||
Secure: useSSL,
|
||||
Region: s.S3Region,
|
||||
BucketLookup: bucketLookup,
|
||||
Transport: transport,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize MinIO client: %w", err)
|
||||
}
|
||||
|
||||
return minioClient, nil
|
||||
}
|
||||
|
||||
func (s *S3Storage) getCoreClient(encryptor encryption.FieldEncryptor) (*minio.Core, error) {
|
||||
endpoint, useSSL, accessKey, secretKey, bucketLookup, transport, err := s.getClientParams(
|
||||
encryptor,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
coreClient, err := minio.NewCore(endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
|
||||
Secure: useSSL,
|
||||
Region: s.S3Region,
|
||||
BucketLookup: bucketLookup,
|
||||
Transport: transport,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize MinIO Core client: %w", err)
|
||||
}
|
||||
|
||||
return coreClient, nil
|
||||
}
|
||||
|
||||
func (s *S3Storage) getClientParams(
|
||||
encryptor encryption.FieldEncryptor,
|
||||
) (endpoint string, useSSL bool, accessKey string, secretKey string, bucketLookup minio.BucketLookupType, transport *http.Transport, err error) {
|
||||
endpoint = s.S3Endpoint
|
||||
useSSL = true
|
||||
|
||||
if strings.HasPrefix(endpoint, "http://") {
|
||||
useSSL = false
|
||||
@@ -262,38 +416,33 @@ func (s *S3Storage) getClient(encryptor encryption.FieldEncryptor) (*minio.Clien
|
||||
endpoint = strings.TrimPrefix(endpoint, "https://")
|
||||
}
|
||||
|
||||
// If no endpoint is provided, use the AWS S3 endpoint for the region
|
||||
if endpoint == "" {
|
||||
endpoint = fmt.Sprintf("s3.%s.amazonaws.com", s.S3Region)
|
||||
}
|
||||
|
||||
// Decrypt credentials before use
|
||||
accessKey, err := encryptor.Decrypt(s.StorageID, s.S3AccessKey)
|
||||
accessKey, err = encryptor.Decrypt(s.StorageID, s.S3AccessKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt S3 access key: %w", err)
|
||||
return "", false, "", "", 0, nil, fmt.Errorf("failed to decrypt S3 access key: %w", err)
|
||||
}
|
||||
|
||||
secretKey, err := encryptor.Decrypt(s.StorageID, s.S3SecretKey)
|
||||
secretKey, err = encryptor.Decrypt(s.StorageID, s.S3SecretKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt S3 secret key: %w", err)
|
||||
return "", false, "", "", 0, nil, fmt.Errorf("failed to decrypt S3 secret key: %w", err)
|
||||
}
|
||||
|
||||
// Configure bucket lookup strategy
|
||||
bucketLookup := minio.BucketLookupAuto
|
||||
bucketLookup = minio.BucketLookupAuto
|
||||
if s.S3UseVirtualHostedStyle {
|
||||
bucketLookup = minio.BucketLookupDNS
|
||||
}
|
||||
|
||||
// Initialize the MinIO client
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
|
||||
Secure: useSSL,
|
||||
Region: s.S3Region,
|
||||
BucketLookup: bucketLookup,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize MinIO client: %w", err)
|
||||
transport = &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: s3ConnectTimeout,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: s3TLSHandshakeTimeout,
|
||||
ResponseHeaderTimeout: s3ResponseTimeout,
|
||||
IdleConnTimeout: s3IdleConnTimeout,
|
||||
}
|
||||
|
||||
return minioClient, nil
|
||||
return endpoint, useSSL, accessKey, secretKey, bucketLookup, transport, nil
|
||||
}
|
||||
|
||||
23
deploy/helm/.helmignore
Normal file
23
deploy/helm/.helmignore
Normal file
@@ -0,0 +1,23 @@
|
||||
# Patterns to ignore when building packages.
|
||||
# This supports shell glob matching, relative path matching, and
|
||||
# negation (prefixed with !). Only one pattern per line.
|
||||
.DS_Store
|
||||
# Common VCS dirs
|
||||
.git/
|
||||
.gitignore
|
||||
.bzr/
|
||||
.bzrignore
|
||||
.hg/
|
||||
.hgignore
|
||||
.svn/
|
||||
# Common backup files
|
||||
*.swp
|
||||
*.bak
|
||||
*.tmp
|
||||
*.orig
|
||||
*~
|
||||
# Various IDEs
|
||||
.project
|
||||
.idea/
|
||||
*.tmproj
|
||||
.vscode/
|
||||
22
deploy/helm/Chart.yaml
Normal file
22
deploy/helm/Chart.yaml
Normal file
@@ -0,0 +1,22 @@
|
||||
apiVersion: v2
|
||||
name: postgresus
|
||||
description: A Helm chart for Postgresus - PostgreSQL backup and management system
|
||||
type: application
|
||||
version: 0.0.0
|
||||
appVersion: "latest"
|
||||
keywords:
|
||||
- postgresql
|
||||
- backup
|
||||
- database
|
||||
- restore
|
||||
home: https://github.com/RostislavDugin/postgresus
|
||||
|
||||
sources:
|
||||
- https://github.com/RostislavDugin/postgresus
|
||||
- https://github.com/RostislavDugin/postgresus/tree/main/deploy/helm
|
||||
|
||||
maintainers:
|
||||
- name: Rostislav Dugin
|
||||
url: https://github.com/RostislavDugin
|
||||
|
||||
icon: https://raw.githubusercontent.com/RostislavDugin/postgresus/main/frontend/public/logo.svg
|
||||
222
deploy/helm/README.md
Normal file
222
deploy/helm/README.md
Normal file
@@ -0,0 +1,222 @@
|
||||
# Postgresus Helm Chart
|
||||
|
||||
## Installation
|
||||
|
||||
Install directly from the OCI registry (no need to clone the repository):
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace
|
||||
```
|
||||
|
||||
The `-n postgresus --create-namespace` flags control which namespace the chart is installed into. You can use any namespace name you prefer.
|
||||
|
||||
## Accessing Postgresus
|
||||
|
||||
By default, the chart creates a ClusterIP service. Use port-forward to access:
|
||||
|
||||
```bash
|
||||
kubectl port-forward svc/postgresus-service 4005:4005 -n postgresus
|
||||
```
|
||||
|
||||
Then open `http://localhost:4005` in your browser.
|
||||
|
||||
## Configuration
|
||||
|
||||
### Main Parameters
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| ------------------ | ------------------ | --------------------------- |
|
||||
| `image.repository` | Docker image | `rostislavdugin/postgresus` |
|
||||
| `image.tag` | Image tag | `latest` |
|
||||
| `image.pullPolicy` | Image pull policy | `Always` |
|
||||
| `replicaCount` | Number of replicas | `1` |
|
||||
|
||||
### Service
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| -------------------------- | ----------------------- | ------------- |
|
||||
| `service.type` | Service type | `ClusterIP` |
|
||||
| `service.port` | Service port | `4005` |
|
||||
| `service.targetPort` | Container port | `4005` |
|
||||
| `service.headless.enabled` | Enable headless service | `true` |
|
||||
|
||||
### Storage
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| ------------------------------ | ------------------------- | ---------------------- |
|
||||
| `persistence.enabled` | Enable persistent storage | `true` |
|
||||
| `persistence.storageClassName` | Storage class | `""` (cluster default) |
|
||||
| `persistence.accessMode` | Access mode | `ReadWriteOnce` |
|
||||
| `persistence.size` | Storage size | `10Gi` |
|
||||
| `persistence.mountPath` | Mount path | `/postgresus-data` |
|
||||
|
||||
### Resources
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| --------------------------- | -------------- | ------------- |
|
||||
| `resources.requests.memory` | Memory request | `1Gi` |
|
||||
| `resources.requests.cpu` | CPU request | `500m` |
|
||||
| `resources.limits.memory` | Memory limit | `1Gi` |
|
||||
| `resources.limits.cpu` | CPU limit | `500m` |
|
||||
|
||||
## External Access Options
|
||||
|
||||
### Option 1: Port Forward (Default)
|
||||
|
||||
Best for development or quick access:
|
||||
|
||||
```bash
|
||||
kubectl port-forward svc/postgresus-service 4005:4005 -n postgresus
|
||||
```
|
||||
|
||||
Access at `http://localhost:4005`
|
||||
|
||||
### Option 2: NodePort
|
||||
|
||||
For direct access via node IP:
|
||||
|
||||
```yaml
|
||||
# nodeport-values.yaml
|
||||
service:
|
||||
type: NodePort
|
||||
port: 4005
|
||||
targetPort: 4005
|
||||
nodePort: 30080
|
||||
```
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
-f nodeport-values.yaml
|
||||
```
|
||||
|
||||
Access at `http://<NODE-IP>:30080`
|
||||
|
||||
### Option 3: LoadBalancer
|
||||
|
||||
For cloud environments with load balancer support:
|
||||
|
||||
```yaml
|
||||
# loadbalancer-values.yaml
|
||||
service:
|
||||
type: LoadBalancer
|
||||
port: 80
|
||||
targetPort: 4005
|
||||
```
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
-f loadbalancer-values.yaml
|
||||
```
|
||||
|
||||
Get the external IP:
|
||||
|
||||
```bash
|
||||
kubectl get svc -n postgresus
|
||||
```
|
||||
|
||||
Access at `http://<EXTERNAL-IP>`
|
||||
|
||||
### Option 4: Ingress
|
||||
|
||||
For domain-based access with TLS:
|
||||
|
||||
```yaml
|
||||
# ingress-values.yaml
|
||||
ingress:
|
||||
enabled: true
|
||||
className: nginx
|
||||
annotations:
|
||||
nginx.ingress.kubernetes.io/ssl-redirect: "true"
|
||||
cert-manager.io/cluster-issuer: "letsencrypt-prod"
|
||||
hosts:
|
||||
- host: backup.example.com
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- secretName: backup-example-com-tls
|
||||
hosts:
|
||||
- backup.example.com
|
||||
```
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
-f ingress-values.yaml
|
||||
```
|
||||
|
||||
### Option 5: HTTPRoute (Gateway API)
|
||||
|
||||
For clusters using Istio, Envoy Gateway, Cilium, or other Gateway API implementations:
|
||||
|
||||
```yaml
|
||||
# httproute-values.yaml
|
||||
route:
|
||||
enabled: true
|
||||
hostnames:
|
||||
- backup.example.com
|
||||
parentRefs:
|
||||
- name: my-gateway
|
||||
namespace: istio-system
|
||||
```
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
-f httproute-values.yaml
|
||||
```
|
||||
|
||||
## Ingress Configuration
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| ----------------------- | ----------------- | ------------------------ |
|
||||
| `ingress.enabled` | Enable Ingress | `false` |
|
||||
| `ingress.className` | Ingress class | `nginx` |
|
||||
| `ingress.hosts[0].host` | Hostname | `postgresus.example.com` |
|
||||
| `ingress.tls` | TLS configuration | `[]` |
|
||||
|
||||
## HTTPRoute Configuration
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| ------------------ | ----------------------- | ------------------------------ |
|
||||
| `route.enabled` | Enable HTTPRoute | `false` |
|
||||
| `route.apiVersion` | Gateway API version | `gateway.networking.k8s.io/v1` |
|
||||
| `route.hostnames` | Hostnames for the route | `["postgresus.example.com"]` |
|
||||
| `route.parentRefs` | Gateway references | `[]` |
|
||||
|
||||
## Health Checks
|
||||
|
||||
| Parameter | Description | Default Value |
|
||||
| ------------------------ | ---------------------- | ------------- |
|
||||
| `livenessProbe.enabled` | Enable liveness probe | `true` |
|
||||
| `readinessProbe.enabled` | Enable readiness probe | `true` |
|
||||
|
||||
## Custom Storage Size
|
||||
|
||||
```yaml
|
||||
# storage-values.yaml
|
||||
persistence:
|
||||
size: 50Gi
|
||||
storageClassName: "fast-ssd"
|
||||
```
|
||||
|
||||
```bash
|
||||
helm install postgresus oci://ghcr.io/rostislavdugin/charts/postgresus \
|
||||
-n postgresus --create-namespace \
|
||||
-f storage-values.yaml
|
||||
```
|
||||
|
||||
## Upgrade
|
||||
|
||||
```bash
|
||||
helm upgrade postgresus oci://ghcr.io/rostislavdugin/charts/postgresus -n postgresus
|
||||
```
|
||||
|
||||
## Uninstall
|
||||
|
||||
```bash
|
||||
helm uninstall postgresus -n postgresus
|
||||
```
|
||||
68
deploy/helm/templates/_helpers.tpl
Normal file
68
deploy/helm/templates/_helpers.tpl
Normal file
@@ -0,0 +1,68 @@
|
||||
{{/*
|
||||
Expand the name of the chart.
|
||||
*/}}
|
||||
{{- define "postgresus.name" -}}
|
||||
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create a default fully qualified app name.
|
||||
*/}}
|
||||
{{- define "postgresus.fullname" -}}
|
||||
{{- if .Values.fullnameOverride }}
|
||||
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
|
||||
{{- else }}
|
||||
{{- $name := default .Chart.Name .Values.nameOverride }}
|
||||
{{- if contains $name .Release.Name }}
|
||||
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
|
||||
{{- else }}
|
||||
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create chart name and version as used by the chart label.
|
||||
*/}}
|
||||
{{- define "postgresus.chart" -}}
|
||||
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Common labels
|
||||
*/}}
|
||||
{{- define "postgresus.labels" -}}
|
||||
helm.sh/chart: {{ include "postgresus.chart" . }}
|
||||
{{ include "postgresus.selectorLabels" . }}
|
||||
{{- if .Chart.AppVersion }}
|
||||
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
||||
{{- end }}
|
||||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Selector labels
|
||||
*/}}
|
||||
{{- define "postgresus.selectorLabels" -}}
|
||||
app.kubernetes.io/name: {{ include "postgresus.name" . }}
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
app: postgresus
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create the name of the service account to use
|
||||
*/}}
|
||||
{{- define "postgresus.serviceAccountName" -}}
|
||||
{{- if .Values.serviceAccount.create }}
|
||||
{{- default (include "postgresus.fullname" .) .Values.serviceAccount.name }}
|
||||
{{- else }}
|
||||
{{- default "default" .Values.serviceAccount.name }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Namespace - uses the release namespace from helm install -n <namespace>
|
||||
*/}}
|
||||
{{- define "postgresus.namespace" -}}
|
||||
{{- .Release.Namespace }}
|
||||
{{- end }}
|
||||
35
deploy/helm/templates/httproute.yaml
Normal file
35
deploy/helm/templates/httproute.yaml
Normal file
@@ -0,0 +1,35 @@
|
||||
{{- if .Values.route.enabled -}}
|
||||
apiVersion: {{ .Values.route.apiVersion}}
|
||||
kind: {{ .Values.route.kind}}
|
||||
metadata:
|
||||
name: {{ template "postgresus.fullname" . }}
|
||||
annotations: {{ toYaml .Values.route.annotations | nindent 4 }}
|
||||
labels:
|
||||
app.kubernetes.io/component: "app"
|
||||
{{- include "postgresus.labels" . | nindent 4 }}
|
||||
spec:
|
||||
{{- with .Values.route.parentRefs }}
|
||||
parentRefs:
|
||||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- with .Values.route.hostnames }}
|
||||
hostnames:
|
||||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
rules:
|
||||
- backendRefs:
|
||||
- name: {{ template "postgresus.fullname" . }}-service
|
||||
port: {{ .Values.service.port }}
|
||||
{{- with .Values.route.filters }}
|
||||
filters:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.route.matches }}
|
||||
matches:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.route.timeouts }}
|
||||
timeouts:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
42
deploy/helm/templates/ingress.yaml
Normal file
42
deploy/helm/templates/ingress.yaml
Normal file
@@ -0,0 +1,42 @@
|
||||
{{- if .Values.ingress.enabled -}}
|
||||
apiVersion: networking.k8s.io/v1
|
||||
kind: Ingress
|
||||
metadata:
|
||||
name: {{ include "postgresus.fullname" . }}-ingress
|
||||
namespace: {{ include "postgresus.namespace" . }}
|
||||
labels:
|
||||
{{- include "postgresus.labels" . | nindent 4 }}
|
||||
{{- with .Values.ingress.annotations }}
|
||||
annotations:
|
||||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
spec:
|
||||
{{- if .Values.ingress.className }}
|
||||
ingressClassName: {{ .Values.ingress.className }}
|
||||
{{- end }}
|
||||
{{- if .Values.ingress.tls }}
|
||||
tls:
|
||||
{{- range .Values.ingress.tls }}
|
||||
- hosts:
|
||||
{{- range .hosts }}
|
||||
- {{ . | quote }}
|
||||
{{- end }}
|
||||
secretName: {{ .secretName }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
rules:
|
||||
{{- range .Values.ingress.hosts }}
|
||||
- host: {{ .host | quote }}
|
||||
http:
|
||||
paths:
|
||||
{{- range .paths }}
|
||||
- path: {{ .path }}
|
||||
pathType: {{ .pathType }}
|
||||
backend:
|
||||
service:
|
||||
name: {{ include "postgresus.fullname" $ }}-service
|
||||
port:
|
||||
number: {{ $.Values.service.port }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
36
deploy/helm/templates/service.yaml
Normal file
36
deploy/helm/templates/service.yaml
Normal file
@@ -0,0 +1,36 @@
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ include "postgresus.fullname" . }}-service
|
||||
namespace: {{ include "postgresus.namespace" . }}
|
||||
labels:
|
||||
{{- include "postgresus.labels" . | nindent 4 }}
|
||||
spec:
|
||||
type: {{ .Values.service.type }}
|
||||
ports:
|
||||
- port: {{ .Values.service.port }}
|
||||
targetPort: {{ .Values.service.targetPort }}
|
||||
protocol: TCP
|
||||
name: http
|
||||
selector:
|
||||
{{- include "postgresus.selectorLabels" . | nindent 4 }}
|
||||
---
|
||||
{{- if .Values.service.headless.enabled }}
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ include "postgresus.fullname" . }}-headless
|
||||
namespace: {{ include "postgresus.namespace" . }}
|
||||
labels:
|
||||
{{- include "postgresus.labels" . | nindent 4 }}
|
||||
spec:
|
||||
type: ClusterIP
|
||||
clusterIP: None
|
||||
ports:
|
||||
- port: {{ .Values.service.port }}
|
||||
targetPort: {{ .Values.service.targetPort }}
|
||||
protocol: TCP
|
||||
name: http
|
||||
selector:
|
||||
{{- include "postgresus.selectorLabels" . | nindent 4 }}
|
||||
{{- end }}
|
||||
84
deploy/helm/templates/statefulset.yaml
Normal file
84
deploy/helm/templates/statefulset.yaml
Normal file
@@ -0,0 +1,84 @@
|
||||
apiVersion: apps/v1
|
||||
kind: StatefulSet
|
||||
metadata:
|
||||
name: {{ include "postgresus.fullname" . }}
|
||||
namespace: {{ include "postgresus.namespace" . }}
|
||||
labels:
|
||||
{{- include "postgresus.labels" . | nindent 4 }}
|
||||
spec:
|
||||
serviceName: {{ include "postgresus.fullname" . }}-headless
|
||||
replicas: {{ .Values.replicaCount }}
|
||||
selector:
|
||||
matchLabels:
|
||||
{{- include "postgresus.selectorLabels" . | nindent 6 }}
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
{{- with .Values.podAnnotations }}
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
labels:
|
||||
{{- include "postgresus.selectorLabels" . | nindent 8 }}
|
||||
{{- with .Values.podLabels }}
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
spec:
|
||||
{{- with .Values.nodeSelector }}
|
||||
nodeSelector:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.affinity }}
|
||||
affinity:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
{{- with .Values.tolerations }}
|
||||
tolerations:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
{{- end }}
|
||||
containers:
|
||||
- name: {{ .Chart.Name }}
|
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
||||
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: {{ .Values.service.targetPort }}
|
||||
protocol: TCP
|
||||
volumeMounts:
|
||||
- name: postgresus-storage
|
||||
mountPath: {{ .Values.persistence.mountPath }}
|
||||
resources:
|
||||
{{- toYaml .Values.resources | nindent 12 }}
|
||||
{{- if .Values.livenessProbe.enabled }}
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
{{- toYaml .Values.livenessProbe.httpGet | nindent 14 }}
|
||||
initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }}
|
||||
periodSeconds: {{ .Values.livenessProbe.periodSeconds }}
|
||||
timeoutSeconds: {{ .Values.livenessProbe.timeoutSeconds }}
|
||||
failureThreshold: {{ .Values.livenessProbe.failureThreshold }}
|
||||
{{- end }}
|
||||
{{- if .Values.readinessProbe.enabled }}
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
{{- toYaml .Values.readinessProbe.httpGet | nindent 14 }}
|
||||
initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }}
|
||||
periodSeconds: {{ .Values.readinessProbe.periodSeconds }}
|
||||
timeoutSeconds: {{ .Values.readinessProbe.timeoutSeconds }}
|
||||
failureThreshold: {{ .Values.readinessProbe.failureThreshold }}
|
||||
{{- end }}
|
||||
{{- if .Values.persistence.enabled }}
|
||||
volumeClaimTemplates:
|
||||
- metadata:
|
||||
name: postgresus-storage
|
||||
spec:
|
||||
accessModes:
|
||||
- {{ .Values.persistence.accessMode }}
|
||||
{{- if .Values.persistence.storageClassName }}
|
||||
storageClassName: {{ .Values.persistence.storageClassName }}
|
||||
{{- end }}
|
||||
resources:
|
||||
requests:
|
||||
storage: {{ .Values.persistence.size }}
|
||||
{{- end }}
|
||||
updateStrategy:
|
||||
{{- toYaml .Values.updateStrategy | nindent 4 }}
|
||||
101
deploy/helm/values.yaml
Normal file
101
deploy/helm/values.yaml
Normal file
@@ -0,0 +1,101 @@
|
||||
# Default values for postgresus
|
||||
|
||||
# Image configuration
|
||||
image:
|
||||
repository: rostislavdugin/postgresus
|
||||
tag: latest
|
||||
pullPolicy: Always
|
||||
|
||||
# StatefulSet configuration
|
||||
replicaCount: 1
|
||||
|
||||
# Service configuration
|
||||
service:
|
||||
type: ClusterIP
|
||||
port: 4005 # Service port
|
||||
targetPort: 4005 # Internal container port
|
||||
# Headless service for StatefulSet
|
||||
headless:
|
||||
enabled: true
|
||||
|
||||
# Resource limits and requests
|
||||
resources:
|
||||
requests:
|
||||
memory: "1Gi"
|
||||
cpu: "500m"
|
||||
limits:
|
||||
memory: "1Gi"
|
||||
cpu: "500m"
|
||||
|
||||
# Persistent storage configuration
|
||||
persistence:
|
||||
enabled: true
|
||||
# Storage class name. Leave empty to use cluster default.
|
||||
# Examples: "longhorn", "standard", "gp2", etc.
|
||||
storageClassName: ""
|
||||
accessMode: ReadWriteOnce
|
||||
size: 10Gi
|
||||
# Mount path in container
|
||||
mountPath: /postgresus-data
|
||||
|
||||
# Ingress configuration (disabled by default - using LoadBalancer instead)
|
||||
ingress:
|
||||
enabled: false
|
||||
className: nginx
|
||||
annotations: {}
|
||||
hosts:
|
||||
- host: postgresus.example.com
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls: []
|
||||
|
||||
# HTTPRoute configuration for Gateway API
|
||||
route:
|
||||
enabled: false
|
||||
apiVersion: gateway.networking.k8s.io/v1
|
||||
kind: HTTPRoute
|
||||
annotations: {}
|
||||
hostnames:
|
||||
- postgresus.example.com
|
||||
parentRefs: []
|
||||
filters: []
|
||||
matches: []
|
||||
timeouts: {}
|
||||
|
||||
# Health checks configuration
|
||||
# Note: The application only has /api/v1/system/health endpoint
|
||||
livenessProbe:
|
||||
enabled: true
|
||||
httpGet:
|
||||
path: /api/v1/system/health
|
||||
port: 4005
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 10
|
||||
timeoutSeconds: 5
|
||||
failureThreshold: 3
|
||||
|
||||
readinessProbe:
|
||||
enabled: true
|
||||
httpGet:
|
||||
path: /api/v1/system/health
|
||||
port: 4005
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 5
|
||||
timeoutSeconds: 3
|
||||
failureThreshold: 3
|
||||
|
||||
# StatefulSet update strategy
|
||||
updateStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
partition: 0
|
||||
|
||||
# Pod labels and annotations
|
||||
podLabels: {}
|
||||
podAnnotations: {}
|
||||
|
||||
# Node selector, tolerations and affinity
|
||||
nodeSelector: {}
|
||||
tolerations: []
|
||||
affinity: {}
|
||||
@@ -3,7 +3,10 @@
|
||||
<head>
|
||||
<meta charset="UTF-8" />
|
||||
<link rel="icon" type="image/svg+xml" href="/logo.svg" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||
<meta
|
||||
name="viewport"
|
||||
content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no"
|
||||
/>
|
||||
<meta name="robots" content="noindex" />
|
||||
<title>Postgresus - PostgreSQL backups</title>
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ export const DatabasesComponent = ({ contentHeight, workspace, isCanManageDBs }:
|
||||
if (selectDatabaseId) {
|
||||
updateSelectedDatabaseId(selectDatabaseId);
|
||||
} else if (!selectedDatabaseId && !isSilent && !isMobile) {
|
||||
// On desktop, auto-select a database; on mobile, keep it unselected
|
||||
// On desktop, auto-select a database; on mobile, keep it unselected to show the list first
|
||||
const savedDatabaseId = localStorage.getItem(
|
||||
`${SELECTED_DATABASE_STORAGE_KEY}_${workspace.id}`,
|
||||
);
|
||||
|
||||
@@ -79,9 +79,12 @@ export const HealthckeckAttemptsComponent = ({ database }: Props) => {
|
||||
|
||||
useEffect(() => {
|
||||
let interval: number | null = null;
|
||||
let isCancelled = false;
|
||||
|
||||
setIsHealthcheckConfigLoading(true);
|
||||
healthcheckConfigApi.getHealthcheckConfig(database.id).then((healthcheckConfig) => {
|
||||
if (isCancelled) return;
|
||||
|
||||
setIsHealthcheckConfigLoading(false);
|
||||
|
||||
if (healthcheckConfig.isHealthcheckEnabled) {
|
||||
@@ -93,17 +96,18 @@ export const HealthckeckAttemptsComponent = ({ database }: Props) => {
|
||||
if (period === 'today') {
|
||||
interval = setInterval(() => {
|
||||
loadHealthcheckAttempts(false);
|
||||
}, 60_000); // 5 seconds
|
||||
}, 60_000);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return () => {
|
||||
isCancelled = true;
|
||||
if (interval) {
|
||||
clearInterval(interval);
|
||||
}
|
||||
};
|
||||
}, [period]);
|
||||
}, [database.id, period]);
|
||||
|
||||
if (isHealthcheckConfigLoading) {
|
||||
return (
|
||||
|
||||
@@ -47,7 +47,7 @@ export const NotifiersComponent = ({ contentHeight, workspace, isCanManageNotifi
|
||||
if (selectNotifierId) {
|
||||
updateSelectedNotifierId(selectNotifierId);
|
||||
} else if (!selectedNotifierId && !isSilent && !isMobile) {
|
||||
// On desktop, auto-select a notifier; on mobile, keep it unselected
|
||||
// On desktop, auto-select a notifier; on mobile, keep it unselected to show the list first
|
||||
const savedNotifierId = localStorage.getItem(
|
||||
`${SELECTED_NOTIFIER_STORAGE_KEY}_${workspace.id}`,
|
||||
);
|
||||
|
||||
@@ -34,15 +34,19 @@ export const StoragesComponent = ({ contentHeight, workspace, isCanManageStorage
|
||||
}
|
||||
};
|
||||
|
||||
const loadStorages = () => {
|
||||
setIsLoading(true);
|
||||
const loadStorages = (isSilent = false, selectStorageId?: string) => {
|
||||
if (!isSilent) {
|
||||
setIsLoading(true);
|
||||
}
|
||||
|
||||
storageApi
|
||||
.getStorages(workspace.id)
|
||||
.then((storages: Storage[]) => {
|
||||
setStorages(storages);
|
||||
if (!selectedStorageId && !isMobile) {
|
||||
// On desktop, auto-select a storage; on mobile, keep it unselected
|
||||
if (selectStorageId) {
|
||||
updateSelectedStorageId(selectStorageId);
|
||||
} else if (!selectedStorageId && !isSilent && !isMobile) {
|
||||
// On desktop, auto-select a storage; on mobile, keep it unselected to show the list first
|
||||
const savedStorageId = localStorage.getItem(
|
||||
`${SELECTED_STORAGE_STORAGE_KEY}_${workspace.id}`,
|
||||
);
|
||||
@@ -154,8 +158,8 @@ export const StoragesComponent = ({ contentHeight, workspace, isCanManageStorage
|
||||
isShowName
|
||||
isShowClose={false}
|
||||
onClose={() => setIsShowAddStorage(false)}
|
||||
onChanged={() => {
|
||||
loadStorages();
|
||||
onChanged={(storage) => {
|
||||
loadStorages(false, storage.id);
|
||||
setIsShowAddStorage(false);
|
||||
}}
|
||||
/>
|
||||
|
||||
@@ -7,14 +7,14 @@ import { useEffect, useState } from 'react';
|
||||
* @returns isMobile boolean
|
||||
*/
|
||||
export function useIsMobile(): boolean {
|
||||
const [isMobile, setIsMobile] = useState<boolean>(false);
|
||||
// Initialize with actual value to avoid race conditions
|
||||
const [isMobile, setIsMobile] = useState<boolean>(() => window.innerWidth <= 768);
|
||||
|
||||
useEffect(() => {
|
||||
const updateIsMobile = () => {
|
||||
setIsMobile(window.innerWidth <= 768);
|
||||
};
|
||||
|
||||
updateIsMobile(); // Set initial value
|
||||
window.addEventListener('resize', updateIsMobile);
|
||||
|
||||
return () => {
|
||||
|
||||
Reference in New Issue
Block a user