From 68f15f766168e61bb79065a0ebadb1b99ba26344 Mon Sep 17 00:00:00 2001 From: Rostislav Dugin Date: Sun, 15 Mar 2026 14:04:54 +0300 Subject: [PATCH] FEATURE (agent): Add WAL streaming --- agent/.gitignore | 3 +- agent/go.mod | 1 + agent/go.sum | 2 + agent/internal/features/start/start.go | 15 +- agent/internal/features/wal/streamer.go | 219 ++++++++++++ agent/internal/features/wal/streamer_test.go | 346 +++++++++++++++++++ 6 files changed, 581 insertions(+), 5 deletions(-) create mode 100644 agent/internal/features/wal/streamer.go create mode 100644 agent/internal/features/wal/streamer_test.go diff --git a/agent/.gitignore b/agent/.gitignore index da0f211..3b479da 100644 --- a/agent/.gitignore +++ b/agent/.gitignore @@ -21,4 +21,5 @@ cmd.exe temp/ valkey-data/ victoria-logs-data/ -databasus.json \ No newline at end of file +databasus.json +.test-tmp/ \ No newline at end of file diff --git a/agent/go.mod b/agent/go.mod index ccec2af..a4259cb 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -4,6 +4,7 @@ go 1.26.1 require ( github.com/jackc/pgx/v5 v5.8.0 + github.com/klauspost/compress v1.18.4 github.com/stretchr/testify v1.11.1 ) diff --git a/agent/go.sum b/agent/go.sum index 6fe46bd..fc4a5c2 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -10,6 +10,8 @@ github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/agent/internal/features/start/start.go b/agent/internal/features/start/start.go index 584b78c..1f32275 100644 --- a/agent/internal/features/start/start.go +++ b/agent/internal/features/start/start.go @@ -5,14 +5,18 @@ import ( "errors" "fmt" "log/slog" + "os" "os/exec" + "os/signal" "path/filepath" "strings" + "syscall" "time" "github.com/jackc/pgx/v5" "databasus-agent/internal/config" + "databasus-agent/internal/features/wal" ) const ( @@ -39,10 +43,13 @@ func Run(cfg *config.Config, log *slog.Logger) error { return err } - log.Info("start: stub — not yet implemented", - "dbId", cfg.DbID, - "hasToken", cfg.Token != "", - ) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + streamer := wal.NewStreamer(cfg, log) + streamer.Run(ctx) + + log.Info("Agent stopped") return nil } diff --git a/agent/internal/features/wal/streamer.go b/agent/internal/features/wal/streamer.go new file mode 100644 index 0000000..105d760 --- /dev/null +++ b/agent/internal/features/wal/streamer.go @@ -0,0 +1,219 @@ +package wal + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + "github.com/klauspost/compress/zstd" + + "databasus-agent/internal/config" +) + +const ( + pollInterval = 2 * time.Second + uploadTimeout = 5 * time.Minute + uploadPath = "/api/v1/backups/postgres/wal/upload" +) + +var segmentNameRegex = regexp.MustCompile(`^[0-9A-Fa-f]{24}$`) + +type Streamer struct { + cfg *config.Config + httpClient *http.Client + log *slog.Logger +} + +type uploadErrorResponse struct { + Error string `json:"error"` + ExpectedSegmentName string `json:"expectedSegmentName"` + ReceivedSegmentName string `json:"receivedSegmentName"` +} + +func NewStreamer(cfg *config.Config, log *slog.Logger) *Streamer { + return &Streamer{ + cfg: cfg, + httpClient: &http.Client{}, + log: log, + } +} + +func (s *Streamer) Run(ctx context.Context) { + s.log.Info("WAL streamer started", "walDir", s.cfg.WalDir) + + s.processQueue(ctx) + + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + s.log.Info("WAL streamer stopping") + return + case <-ticker.C: + s.processQueue(ctx) + } + } +} + +func (s *Streamer) processQueue(ctx context.Context) { + segments, err := s.listSegments() + if err != nil { + s.log.Error("Failed to list WAL segments", "error", err) + return + } + + for _, segmentName := range segments { + if ctx.Err() != nil { + return + } + + if err := s.uploadSegment(ctx, segmentName); err != nil { + s.log.Error("Failed to upload WAL segment", + "segment", segmentName, + "error", err, + ) + return + } + } +} + +func (s *Streamer) listSegments() ([]string, error) { + entries, err := os.ReadDir(s.cfg.WalDir) + if err != nil { + return nil, fmt.Errorf("read wal dir: %w", err) + } + + var segments []string + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + name := entry.Name() + + if strings.HasSuffix(name, ".tmp") { + continue + } + + if !segmentNameRegex.MatchString(name) { + continue + } + + segments = append(segments, name) + } + + sort.Strings(segments) + + return segments, nil +} + +func (s *Streamer) uploadSegment(ctx context.Context, segmentName string) error { + filePath := filepath.Join(s.cfg.WalDir, segmentName) + + pr, pw := io.Pipe() + + go s.compressAndStream(pw, filePath) + + uploadCtx, cancel := context.WithTimeout(ctx, uploadTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(uploadCtx, http.MethodPost, s.buildUploadURL(), pr) + if err != nil { + _ = pr.Close() + return fmt.Errorf("create request: %w", err) + } + + req.Header.Set("Authorization", s.cfg.Token) + req.Header.Set("Content-Type", "application/octet-stream") + req.Header.Set("X-Upload-Type", "wal") + req.Header.Set("X-Wal-Segment-Name", segmentName) + + resp, err := s.httpClient.Do(req) + if err != nil { + return fmt.Errorf("upload request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + switch resp.StatusCode { + case http.StatusNoContent: + s.log.Debug("WAL segment uploaded", "segment", segmentName) + + if *s.cfg.IsDeleteWalAfterUpload { + if err := os.Remove(filePath); err != nil { + s.log.Warn("Failed to delete uploaded WAL segment", + "segment", segmentName, + "error", err, + ) + } + } + + return nil + + case http.StatusConflict: + var errResp uploadErrorResponse + + if err := json.NewDecoder(resp.Body).Decode(&errResp); err == nil { + s.log.Warn("WAL chain gap detected", + "segment", segmentName, + "expected", errResp.ExpectedSegmentName, + "received", errResp.ReceivedSegmentName, + ) + } else { + s.log.Warn("WAL chain gap detected", "segment", segmentName) + } + + return fmt.Errorf("gap detected for segment %s", segmentName) + + default: + body, _ := io.ReadAll(resp.Body) + + return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body)) + } +} + +func (s *Streamer) compressAndStream(pw *io.PipeWriter, filePath string) { + f, err := os.Open(filePath) + if err != nil { + _ = pw.CloseWithError(fmt.Errorf("open file: %w", err)) + return + } + defer func() { _ = f.Close() }() + + encoder, err := zstd.NewWriter(pw, + zstd.WithEncoderLevel(zstd.SpeedDefault), + zstd.WithEncoderCRC(true), + ) + if err != nil { + _ = pw.CloseWithError(fmt.Errorf("create zstd encoder: %w", err)) + return + } + + if _, err := io.Copy(encoder, f); err != nil { + _ = encoder.Close() + _ = pw.CloseWithError(fmt.Errorf("compress: %w", err)) + return + } + + if err := encoder.Close(); err != nil { + _ = pw.CloseWithError(fmt.Errorf("close encoder: %w", err)) + return + } + + _ = pw.Close() +} + +func (s *Streamer) buildUploadURL() string { + return s.cfg.DatabasusHost + uploadPath +} diff --git a/agent/internal/features/wal/streamer_test.go b/agent/internal/features/wal/streamer_test.go new file mode 100644 index 0000000..073a633 --- /dev/null +++ b/agent/internal/features/wal/streamer_test.go @@ -0,0 +1,346 @@ +package wal + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "databasus-agent/internal/config" + "databasus-agent/internal/logger" +) + +func Test_UploadSegment_SingleSegment_ServerReceivesCorrectHeadersAndBody(t *testing.T) { + walDir := createTestWalDir(t) + segmentContent := []byte("test-wal-segment-data-for-upload") + writeTestSegment(t, walDir, "000000010000000100000001", segmentContent) + + var receivedHeaders http.Header + var receivedBody []byte + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedHeaders = r.Header.Clone() + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + receivedBody = body + + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + cfg := createTestConfig(walDir, server.URL) + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + require.NotNil(t, receivedHeaders) + assert.Equal(t, "test-token", receivedHeaders.Get("Authorization")) + assert.Equal(t, "application/octet-stream", receivedHeaders.Get("Content-Type")) + assert.Equal(t, "wal", receivedHeaders.Get("X-Upload-Type")) + assert.Equal(t, "000000010000000100000001", receivedHeaders.Get("X-Wal-Segment-Name")) + + decompressed := decompressZstd(t, receivedBody) + assert.Equal(t, segmentContent, decompressed) +} + +func Test_UploadSegments_MultipleSegmentsOutOfOrder_UploadedInAscendingOrder(t *testing.T) { + walDir := createTestWalDir(t) + writeTestSegment(t, walDir, "000000010000000100000003", []byte("third")) + writeTestSegment(t, walDir, "000000010000000100000001", []byte("first")) + writeTestSegment(t, walDir, "000000010000000100000002", []byte("second")) + + var mu sync.Mutex + var uploadOrder []string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + uploadOrder = append(uploadOrder, r.Header.Get("X-Wal-Segment-Name")) + mu.Unlock() + + _, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + cfg := createTestConfig(walDir, server.URL) + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + mu.Lock() + defer mu.Unlock() + + require.Len(t, uploadOrder, 3) + assert.Equal(t, "000000010000000100000001", uploadOrder[0]) + assert.Equal(t, "000000010000000100000002", uploadOrder[1]) + assert.Equal(t, "000000010000000100000003", uploadOrder[2]) +} + +func Test_UploadSegments_DirectoryHasTmpFiles_TmpFilesIgnored(t *testing.T) { + walDir := createTestWalDir(t) + writeTestSegment(t, walDir, "000000010000000100000001", []byte("real segment")) + writeTestSegment(t, walDir, "000000010000000100000002.tmp", []byte("partial copy")) + + var mu sync.Mutex + var uploadedSegments []string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + uploadedSegments = append(uploadedSegments, r.Header.Get("X-Wal-Segment-Name")) + mu.Unlock() + + _, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + cfg := createTestConfig(walDir, server.URL) + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + mu.Lock() + defer mu.Unlock() + + require.Len(t, uploadedSegments, 1) + assert.Equal(t, "000000010000000100000001", uploadedSegments[0]) +} + +func Test_UploadSegment_DeleteEnabled_FileRemovedAfterUpload(t *testing.T) { + walDir := createTestWalDir(t) + segmentName := "000000010000000100000001" + writeTestSegment(t, walDir, segmentName, []byte("segment data")) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + isDeleteEnabled := true + cfg := createTestConfig(walDir, server.URL) + cfg.IsDeleteWalAfterUpload = &isDeleteEnabled + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + _, err := os.Stat(filepath.Join(walDir, segmentName)) + assert.True(t, os.IsNotExist(err), "segment file should be deleted after successful upload") +} + +func Test_UploadSegment_DeleteDisabled_FileKeptAfterUpload(t *testing.T) { + walDir := createTestWalDir(t) + segmentName := "000000010000000100000001" + writeTestSegment(t, walDir, segmentName, []byte("segment data")) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + isDeleteDisabled := false + cfg := createTestConfig(walDir, server.URL) + cfg.IsDeleteWalAfterUpload = &isDeleteDisabled + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + _, err := os.Stat(filepath.Join(walDir, segmentName)) + assert.NoError(t, err, "segment file should be kept when delete is disabled") +} + +func Test_UploadSegment_ServerReturns500_FileKeptInQueue(t *testing.T) { + walDir := createTestWalDir(t) + segmentName := "000000010000000100000001" + writeTestSegment(t, walDir, segmentName, []byte("segment data")) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"internal server error"}`)) + })) + defer server.Close() + + cfg := createTestConfig(walDir, server.URL) + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + _, err := os.Stat(filepath.Join(walDir, segmentName)) + assert.NoError(t, err, "segment file should remain in queue after server error") +} + +func Test_ProcessQueue_EmptyDirectory_NoUploads(t *testing.T) { + walDir := createTestWalDir(t) + + uploadCount := 0 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + uploadCount++ + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + cfg := createTestConfig(walDir, server.URL) + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + assert.Equal(t, 0, uploadCount, "no uploads should occur for empty directory") +} + +func Test_Run_ContextCancelled_StopsImmediately(t *testing.T) { + walDir := createTestWalDir(t) + + cfg := createTestConfig(walDir, "http://localhost:0") + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + done := make(chan struct{}) + go func() { + streamer.Run(ctx) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Run should have stopped immediately when context is already cancelled") + } +} + +func Test_UploadSegment_ServerReturns409_FileNotDeleted(t *testing.T) { + walDir := createTestWalDir(t) + segmentName := "000000010000000100000005" + writeTestSegment(t, walDir, segmentName, []byte("gap segment")) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusConflict) + + resp := uploadErrorResponse{ + Error: "gap_detected", + ExpectedSegmentName: "000000010000000100000003", + ReceivedSegmentName: segmentName, + } + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + cfg := createTestConfig(walDir, server.URL) + streamer := NewStreamer(cfg, logger.GetLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go streamer.Run(ctx) + time.Sleep(500 * time.Millisecond) + cancel() + + _, err := os.Stat(filepath.Join(walDir, segmentName)) + assert.NoError(t, err, "segment file should not be deleted on gap detection") +} + +func createTestWalDir(t *testing.T) string { + t.Helper() + + baseDir := filepath.Join(".", ".test-tmp") + if err := os.MkdirAll(baseDir, 0o755); err != nil { + t.Fatalf("failed to create base test dir: %v", err) + } + + dir, err := os.MkdirTemp(baseDir, t.Name()+"-*") + if err != nil { + t.Fatalf("failed to create test wal dir: %v", err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(dir) + }) + + return dir +} + +func writeTestSegment(t *testing.T, dir, name string, content []byte) { + t.Helper() + + if err := os.WriteFile(filepath.Join(dir, name), content, 0o644); err != nil { + t.Fatalf("failed to write test segment %s: %v", name, err) + } +} + +func createTestConfig(walDir, serverURL string) *config.Config { + isDeleteEnabled := true + + return &config.Config{ + DatabasusHost: serverURL, + DbID: "test-db-id", + Token: "test-token", + WalDir: walDir, + IsDeleteWalAfterUpload: &isDeleteEnabled, + } +} + +func decompressZstd(t *testing.T, data []byte) []byte { + t.Helper() + + decoder, err := zstd.NewReader(nil) + require.NoError(t, err) + defer decoder.Close() + + decoded, err := decoder.DecodeAll(data, nil) + require.NoError(t, err) + + return decoded +}