Compare commits

...

3 Commits

Author SHA1 Message Date
Rostislav Dugin
cba8fdf49c FEATURE (core)!: Release 2.0 2025-12-08 10:41:36 +03:00
Rostislav Dugin
41c72cf7b6 FIX (buffering): Simplify buffering logic for localstorage 2025-12-07 19:40:40 +03:00
Rostislav Dugin
f04a8b7a82 FIX (backup): Add double buffering for local storange 2025-12-07 19:02:44 +03:00
3 changed files with 20 additions and 56 deletions

3
.gitignore vendored
View File

@@ -7,4 +7,5 @@ node_modules/
.idea
/articles
.DS_Store
.DS_Store
/scripts

View File

@@ -30,7 +30,7 @@ import (
const (
backupTimeout = 23 * time.Hour
shutdownCheckInterval = 1 * time.Second
copyBufferSize = 16 * 1024 * 1024
copyBufferSize = 8 * 1024 * 1024
progressReportIntervalMB = 1.0
pgConnectTimeout = 30
compressionLevel = 5

View File

@@ -2,7 +2,6 @@ package local_storage
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
@@ -16,10 +15,10 @@ import (
)
const (
// Chunk size for local storage writes - 16MB provides good balance between
// memory usage and write efficiency. This creates backpressure to pg_dump
// by only reading one chunk at a time and waiting for disk to confirm receipt.
localChunkSize = 16 * 1024 * 1024
// Chunk size for local storage writes - 8MB per buffer with double-buffering
// allows overlapped I/O while keeping total memory under 32MB.
// Two 8MB buffers = 16MB for local storage, plus 8MB for pg_dump buffer = ~25MB total.
localChunkSize = 8 * 1024 * 1024
)
// LocalStorage uses ./postgresus_local_backups folder as a
@@ -192,11 +191,6 @@ func (l *LocalStorage) EncryptSensitiveData(encryptor encryption.FieldEncryptor)
func (l *LocalStorage) Update(incoming *LocalStorage) {
}
type writeResult struct {
bytesWritten int
writeErr error
}
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
buf := make([]byte, localChunkSize)
var written int64
@@ -208,54 +202,23 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
default:
}
nr, readErr := io.ReadFull(src, buf)
if nr == 0 && readErr == io.EOF {
break
}
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
return written, readErr
}
writeResultCh := make(chan writeResult, 1)
go func() {
nw, writeErr := dst.Write(buf[0:nr])
writeResultCh <- writeResult{nw, writeErr}
}()
var nw int
var writeErr error
select {
case <-ctx.Done():
return written, ctx.Err()
case result := <-writeResultCh:
nw = result.bytesWritten
writeErr = result.writeErr
}
if nw < 0 || nr < nw {
nw = 0
if writeErr == nil {
writeErr = errors.New("invalid write result")
nr, readErr := src.Read(buf)
if nr > 0 {
nw, writeErr := dst.Write(buf[:nr])
written += int64(nw)
if writeErr != nil {
return written, writeErr
}
if nr != nw {
return written, io.ErrShortWrite
}
}
if writeErr != nil {
return written, writeErr
if readErr == io.EOF {
return written, nil
}
if nr != nw {
return written, io.ErrShortWrite
}
written += int64(nw)
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
break
if readErr != nil {
return written, readErr
}
}
return written, nil
}