mirror of
https://github.com/databasus/databasus.git
synced 2026-04-06 00:32:03 +02:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cba8fdf49c | ||
|
|
41c72cf7b6 | ||
|
|
f04a8b7a82 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -7,4 +7,5 @@ node_modules/
|
||||
.idea
|
||||
/articles
|
||||
|
||||
.DS_Store
|
||||
.DS_Store
|
||||
/scripts
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
const (
|
||||
backupTimeout = 23 * time.Hour
|
||||
shutdownCheckInterval = 1 * time.Second
|
||||
copyBufferSize = 16 * 1024 * 1024
|
||||
copyBufferSize = 8 * 1024 * 1024
|
||||
progressReportIntervalMB = 1.0
|
||||
pgConnectTimeout = 30
|
||||
compressionLevel = 5
|
||||
|
||||
@@ -2,7 +2,6 @@ package local_storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
@@ -16,10 +15,10 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Chunk size for local storage writes - 16MB provides good balance between
|
||||
// memory usage and write efficiency. This creates backpressure to pg_dump
|
||||
// by only reading one chunk at a time and waiting for disk to confirm receipt.
|
||||
localChunkSize = 16 * 1024 * 1024
|
||||
// Chunk size for local storage writes - 8MB per buffer with double-buffering
|
||||
// allows overlapped I/O while keeping total memory under 32MB.
|
||||
// Two 8MB buffers = 16MB for local storage, plus 8MB for pg_dump buffer = ~25MB total.
|
||||
localChunkSize = 8 * 1024 * 1024
|
||||
)
|
||||
|
||||
// LocalStorage uses ./postgresus_local_backups folder as a
|
||||
@@ -192,11 +191,6 @@ func (l *LocalStorage) EncryptSensitiveData(encryptor encryption.FieldEncryptor)
|
||||
func (l *LocalStorage) Update(incoming *LocalStorage) {
|
||||
}
|
||||
|
||||
type writeResult struct {
|
||||
bytesWritten int
|
||||
writeErr error
|
||||
}
|
||||
|
||||
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
|
||||
buf := make([]byte, localChunkSize)
|
||||
var written int64
|
||||
@@ -208,54 +202,23 @@ func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64,
|
||||
default:
|
||||
}
|
||||
|
||||
nr, readErr := io.ReadFull(src, buf)
|
||||
|
||||
if nr == 0 && readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
|
||||
return written, readErr
|
||||
}
|
||||
|
||||
writeResultCh := make(chan writeResult, 1)
|
||||
go func() {
|
||||
nw, writeErr := dst.Write(buf[0:nr])
|
||||
writeResultCh <- writeResult{nw, writeErr}
|
||||
}()
|
||||
|
||||
var nw int
|
||||
var writeErr error
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return written, ctx.Err()
|
||||
case result := <-writeResultCh:
|
||||
nw = result.bytesWritten
|
||||
writeErr = result.writeErr
|
||||
}
|
||||
|
||||
if nw < 0 || nr < nw {
|
||||
nw = 0
|
||||
if writeErr == nil {
|
||||
writeErr = errors.New("invalid write result")
|
||||
nr, readErr := src.Read(buf)
|
||||
if nr > 0 {
|
||||
nw, writeErr := dst.Write(buf[:nr])
|
||||
written += int64(nw)
|
||||
if writeErr != nil {
|
||||
return written, writeErr
|
||||
}
|
||||
if nr != nw {
|
||||
return written, io.ErrShortWrite
|
||||
}
|
||||
}
|
||||
|
||||
if writeErr != nil {
|
||||
return written, writeErr
|
||||
if readErr == io.EOF {
|
||||
return written, nil
|
||||
}
|
||||
|
||||
if nr != nw {
|
||||
return written, io.ErrShortWrite
|
||||
}
|
||||
|
||||
written += int64(nw)
|
||||
|
||||
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
|
||||
break
|
||||
if readErr != nil {
|
||||
return written, readErr
|
||||
}
|
||||
}
|
||||
|
||||
return written, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user