FEATURE (upgrader): Add background upgrading of the agent

This commit is contained in:
Rostislav Dugin
2026-03-17 14:38:32 +03:00
parent 6355301903
commit 4b5478e60a
16 changed files with 340 additions and 35 deletions

View File

@@ -1,12 +1,14 @@
package main
import (
"errors"
"flag"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"syscall"
"databasus-agent/internal/config"
"databasus-agent/internal/features/api"
@@ -60,7 +62,11 @@ func runStart(args []string) {
isDev := checkIsDevelopment()
runUpdateCheck(cfg.DatabasusHost, *isSkipUpdate, isDev, log)
if err := start.Start(cfg, log); err != nil {
if err := start.Start(cfg, Version, isDev, log); err != nil {
if errors.Is(err, upgrade.ErrUpgradeRestart) {
reexecAfterUpgrade(log)
}
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
@@ -78,7 +84,11 @@ func runDaemon(args []string) {
cfg := &config.Config{}
cfg.LoadFromJSON()
if err := start.RunDaemon(cfg, log); err != nil {
if err := start.RunDaemon(cfg, Version, checkIsDevelopment(), log); err != nil {
if errors.Is(err, upgrade.ErrUpgradeRestart) {
reexecAfterUpgrade(log)
}
log.Error("Agent exited with error", "error", err)
os.Exit(1)
}
@@ -153,10 +163,15 @@ func runUpdateCheck(host string, isSkipUpdate, isDev bool, log *slog.Logger) {
apiClient := api.NewClient(host, "", log)
if err := upgrade.CheckAndUpdate(apiClient, Version, isDev, log); err != nil {
isUpgraded, err := upgrade.CheckAndUpdate(apiClient, Version, isDev, log)
if err != nil {
log.Error("Auto-update failed", "error", err)
os.Exit(1)
}
if isUpgraded {
reexecAfterUpgrade(log)
}
}
func checkIsDevelopment() bool {
@@ -195,3 +210,18 @@ func parseEnvMode(data []byte) bool {
return false
}
func reexecAfterUpgrade(log *slog.Logger) {
selfPath, err := os.Executable()
if err != nil {
log.Error("Failed to resolve executable for re-exec", "error", err)
os.Exit(1)
}
log.Info("Re-executing after upgrade...")
if err := syscall.Exec(selfPath, os.Args, os.Environ()); err != nil {
log.Error("Failed to re-exec after upgrade", "error", err)
os.Exit(1)
}
}

View File

@@ -24,6 +24,7 @@ func main() {
http.HandleFunc("/api/v1/system/version", s.handleVersion)
http.HandleFunc("/api/v1/system/agent", s.handleAgentDownload)
http.HandleFunc("/mock/set-version", s.handleSetVersion)
http.HandleFunc("/mock/set-binary-path", s.handleSetBinaryPath)
http.HandleFunc("/health", s.handleHealth)
addr := ":" + port
@@ -78,6 +79,29 @@ func (s *server) handleSetVersion(w http.ResponseWriter, r *http.Request) {
_, _ = fmt.Fprintf(w, "version set to %s", body.Version)
}
func (s *server) handleSetBinaryPath(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
var body struct {
BinaryPath string `json:"binaryPath"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.mu.Lock()
s.binaryPath = body.BinaryPath
s.mu.Unlock()
log.Printf("POST /mock/set-binary-path -> %s", body.BinaryPath)
_, _ = fmt.Fprintf(w, "binary path set to %s", body.BinaryPath)
}
func (s *server) handleHealth(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))

View File

@@ -27,11 +27,12 @@ run_test() {
if [ "$MODE" = "host" ]; then
run_test "Test 1: Upgrade success (v1 -> v2)" "$SCRIPT_DIR/test-upgrade-success.sh"
run_test "Test 2: Upgrade skip (version matches)" "$SCRIPT_DIR/test-upgrade-skip.sh"
run_test "Test 3: pg_basebackup in PATH" "$SCRIPT_DIR/test-pg-host-path.sh"
run_test "Test 4: pg_basebackup via bindir" "$SCRIPT_DIR/test-pg-host-bindir.sh"
run_test "Test 3: Background upgrade (v1 -> v2 while running)" "$SCRIPT_DIR/test-upgrade-background.sh"
run_test "Test 4: pg_basebackup in PATH" "$SCRIPT_DIR/test-pg-host-path.sh"
run_test "Test 5: pg_basebackup via bindir" "$SCRIPT_DIR/test-pg-host-bindir.sh"
elif [ "$MODE" = "docker" ]; then
run_test "Test 5: pg_basebackup via docker exec" "$SCRIPT_DIR/test-pg-docker-exec.sh"
run_test "Test 6: pg_basebackup via docker exec" "$SCRIPT_DIR/test-pg-docker-exec.sh"
else
echo "Unknown mode: $MODE (expected 'host' or 'docker')"

View File

@@ -5,6 +5,16 @@ ARTIFACTS="/opt/agent/artifacts"
AGENT="/tmp/test-agent"
PG_CONTAINER="e2e-agent-postgres"
# Cleanup from previous runs
pkill -f "test-agent" 2>/dev/null || true
for i in $(seq 1 20); do
pgrep -f "test-agent" > /dev/null 2>&1 || break
sleep 0.5
done
pkill -9 -f "test-agent" 2>/dev/null || true
sleep 0.5
rm -f "$AGENT" "$AGENT.update" databasus.lock databasus.log databasus.log.old databasus.json 2>/dev/null || true
# Copy agent binary
cp "$ARTIFACTS/agent-v1" "$AGENT"
chmod +x "$AGENT"
@@ -26,7 +36,7 @@ OUTPUT=$("$AGENT" start \
--pg-port 5432 \
--pg-user testuser \
--pg-password testpassword \
--wal-dir /tmp/wal \
--pg-wal-dir /tmp/wal \
--pg-type docker \
--pg-docker-container-name "$PG_CONTAINER" 2>&1)

View File

@@ -5,6 +5,16 @@ ARTIFACTS="/opt/agent/artifacts"
AGENT="/tmp/test-agent"
CUSTOM_BIN_DIR="/opt/pg/bin"
# Cleanup from previous runs
pkill -f "test-agent" 2>/dev/null || true
for i in $(seq 1 20); do
pgrep -f "test-agent" > /dev/null 2>&1 || break
sleep 0.5
done
pkill -9 -f "test-agent" 2>/dev/null || true
sleep 0.5
rm -f "$AGENT" "$AGENT.update" databasus.lock databasus.log databasus.log.old databasus.json 2>/dev/null || true
# Copy agent binary
cp "$ARTIFACTS/agent-v1" "$AGENT"
chmod +x "$AGENT"
@@ -32,7 +42,7 @@ OUTPUT=$("$AGENT" start \
--pg-port 5432 \
--pg-user testuser \
--pg-password testpassword \
--wal-dir /tmp/wal \
--pg-wal-dir /tmp/wal \
--pg-type host \
--pg-host-bin-dir "$CUSTOM_BIN_DIR" 2>&1)

View File

@@ -4,6 +4,16 @@ set -euo pipefail
ARTIFACTS="/opt/agent/artifacts"
AGENT="/tmp/test-agent"
# Cleanup from previous runs
pkill -f "test-agent" 2>/dev/null || true
for i in $(seq 1 20); do
pgrep -f "test-agent" > /dev/null 2>&1 || break
sleep 0.5
done
pkill -9 -f "test-agent" 2>/dev/null || true
sleep 0.5
rm -f "$AGENT" "$AGENT.update" databasus.lock databasus.log databasus.log.old databasus.json 2>/dev/null || true
# Copy agent binary
cp "$ARTIFACTS/agent-v1" "$AGENT"
chmod +x "$AGENT"
@@ -25,7 +35,7 @@ OUTPUT=$("$AGENT" start \
--pg-port 5432 \
--pg-user testuser \
--pg-password testpassword \
--wal-dir /tmp/wal \
--pg-wal-dir /tmp/wal \
--pg-type host 2>&1)
EXIT_CODE=$?

View File

@@ -0,0 +1,90 @@
#!/bin/bash
set -euo pipefail
ARTIFACTS="/opt/agent/artifacts"
AGENT="/tmp/test-agent"
# Cleanup from previous runs
pkill -f "test-agent" 2>/dev/null || true
for i in $(seq 1 20); do
pgrep -f "test-agent" > /dev/null 2>&1 || break
sleep 0.5
done
pkill -9 -f "test-agent" 2>/dev/null || true
sleep 0.5
rm -f "$AGENT" "$AGENT.update" databasus.lock databasus.log databasus.log.old databasus.json 2>/dev/null || true
# Set mock server to v1.0.0 (same as agent — no sync upgrade on start)
curl -sf -X POST http://e2e-mock-server:4050/mock/set-version \
-H "Content-Type: application/json" \
-d '{"version":"v1.0.0"}'
curl -sf -X POST http://e2e-mock-server:4050/mock/set-binary-path \
-H "Content-Type: application/json" \
-d '{"binaryPath":"/artifacts/agent-v1"}'
# Copy v1 binary to writable location
cp "$ARTIFACTS/agent-v1" "$AGENT"
chmod +x "$AGENT"
# Verify initial version
VERSION=$("$AGENT" version)
if [ "$VERSION" != "v1.0.0" ]; then
echo "FAIL: Expected initial version v1.0.0, got $VERSION"
exit 1
fi
echo "Initial version: $VERSION"
# Start agent as daemon (versions match → no sync upgrade)
mkdir -p /tmp/wal
"$AGENT" start \
--databasus-host http://e2e-mock-server:4050 \
--db-id test-db-id \
--token test-token \
--pg-host e2e-postgres \
--pg-port 5432 \
--pg-user testuser \
--pg-password testpassword \
--pg-wal-dir /tmp/wal \
--pg-type host
echo "Agent started as daemon, waiting for stabilization..."
sleep 2
# Change mock server to v2.0.0 and point to v2 binary
curl -sf -X POST http://e2e-mock-server:4050/mock/set-version \
-H "Content-Type: application/json" \
-d '{"version":"v2.0.0"}'
curl -sf -X POST http://e2e-mock-server:4050/mock/set-binary-path \
-H "Content-Type: application/json" \
-d '{"binaryPath":"/artifacts/agent-v2"}'
echo "Mock server updated to v2.0.0, waiting for background upgrade..."
# Poll for upgrade (timeout 60s, poll every 3s)
DEADLINE=$((SECONDS + 60))
while [ $SECONDS -lt $DEADLINE ]; do
VERSION=$("$AGENT" version)
if [ "$VERSION" = "v2.0.0" ]; then
echo "Binary upgraded to $VERSION"
break
fi
sleep 3
done
VERSION=$("$AGENT" version)
if [ "$VERSION" != "v2.0.0" ]; then
echo "FAIL: Expected v2.0.0 after background upgrade, got $VERSION"
cat databasus.log 2>/dev/null || true
exit 1
fi
# Verify agent is still running after restart
sleep 2
"$AGENT" status || true
# Cleanup
"$AGENT" stop || true
echo "Background upgrade test passed"

View File

@@ -4,6 +4,16 @@ set -euo pipefail
ARTIFACTS="/opt/agent/artifacts"
AGENT="/tmp/test-agent"
# Cleanup from previous runs
pkill -f "test-agent" 2>/dev/null || true
for i in $(seq 1 20); do
pgrep -f "test-agent" > /dev/null 2>&1 || break
sleep 0.5
done
pkill -9 -f "test-agent" 2>/dev/null || true
sleep 0.5
rm -f "$AGENT" "$AGENT.update" databasus.lock databasus.log databasus.log.old databasus.json 2>/dev/null || true
# Set mock server to return v1.0.0 (same as agent)
curl -sf -X POST http://e2e-mock-server:4050/mock/set-version \
-H "Content-Type: application/json" \
@@ -30,7 +40,7 @@ OUTPUT=$("$AGENT" start \
--pg-port 5432 \
--pg-user testuser \
--pg-password testpassword \
--wal-dir /tmp/wal \
--pg-wal-dir /tmp/wal \
--pg-type host 2>&1) || true
echo "$OUTPUT"

View File

@@ -4,11 +4,25 @@ set -euo pipefail
ARTIFACTS="/opt/agent/artifacts"
AGENT="/tmp/test-agent"
# Ensure mock server returns v2.0.0
# Cleanup from previous runs
pkill -f "test-agent" 2>/dev/null || true
for i in $(seq 1 20); do
pgrep -f "test-agent" > /dev/null 2>&1 || break
sleep 0.5
done
pkill -9 -f "test-agent" 2>/dev/null || true
sleep 0.5
rm -f "$AGENT" "$AGENT.update" databasus.lock databasus.log databasus.log.old databasus.json 2>/dev/null || true
# Ensure mock server returns v2.0.0 and serves v2 binary
curl -sf -X POST http://e2e-mock-server:4050/mock/set-version \
-H "Content-Type: application/json" \
-d '{"version":"v2.0.0"}'
curl -sf -X POST http://e2e-mock-server:4050/mock/set-binary-path \
-H "Content-Type: application/json" \
-d '{"binaryPath":"/artifacts/agent-v2"}'
# Copy v1 binary to writable location
cp "$ARTIFACTS/agent-v1" "$AGENT"
chmod +x "$AGENT"
@@ -37,7 +51,7 @@ OUTPUT=$("$AGENT" start \
--pg-port 5432 \
--pg-user testuser \
--pg-password testpassword \
--wal-dir /tmp/wal \
--pg-wal-dir /tmp/wal \
--pg-type host 2>&1) || true
echo "$OUTPUT"

View File

@@ -374,7 +374,7 @@ func Test_RunFullBackup_WhenStderrParsingFails_FinalizesWithErrorAndRetries(t *t
var mu sync.Mutex
var errorReported bool
var finalizeWithErrorReceived bool
var finalizeBody map[string]interface{}
var finalizeBody map[string]any
server := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {

View File

@@ -95,7 +95,6 @@ func spawnDaemon(log *slog.Logger) (int, error) {
cmd := exec.Command(execPath, args...)
cmd.Dir = cwd
cmd.Stdout = logFile
cmd.Stderr = logFile
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}

View File

@@ -20,6 +20,7 @@ import (
"databasus-agent/internal/config"
"databasus-agent/internal/features/api"
full_backup "databasus-agent/internal/features/full_backup"
"databasus-agent/internal/features/upgrade"
"databasus-agent/internal/features/wal"
)
@@ -29,7 +30,7 @@ const (
minPgMajorVersion = 15
)
func Start(cfg *config.Config, log *slog.Logger) error {
func Start(cfg *config.Config, agentVersion string, isDev bool, log *slog.Logger) error {
if err := validateConfig(cfg); err != nil {
return err
}
@@ -43,7 +44,7 @@ func Start(cfg *config.Config, log *slog.Logger) error {
}
if runtime.GOOS == "windows" {
return RunDaemon(cfg, log)
return RunDaemon(cfg, agentVersion, isDev, log)
}
pid, err := spawnDaemon(log)
@@ -56,7 +57,7 @@ func Start(cfg *config.Config, log *slog.Logger) error {
return nil
}
func RunDaemon(cfg *config.Config, log *slog.Logger) error {
func RunDaemon(cfg *config.Config, agentVersion string, isDev bool, log *slog.Logger) error {
lockFile, err := AcquireLock(log)
if err != nil {
return err
@@ -74,12 +75,26 @@ func RunDaemon(cfg *config.Config, log *slog.Logger) error {
apiClient := api.NewClient(cfg.DatabasusHost, cfg.Token, log)
var backgroundUpgrader *upgrade.BackgroundUpgrader
if agentVersion != "dev" && runtime.GOOS != "windows" {
backgroundUpgrader = upgrade.NewBackgroundUpgrader(apiClient, agentVersion, isDev, cancel, log)
go backgroundUpgrader.Run(ctx)
}
fullBackuper := full_backup.NewFullBackuper(cfg, apiClient, log)
go fullBackuper.Run(ctx)
streamer := wal.NewStreamer(cfg, apiClient, log)
streamer.Run(ctx)
if backgroundUpgrader != nil {
backgroundUpgrader.WaitForCompletion(30 * time.Second)
if backgroundUpgrader.IsUpgraded() {
return upgrade.ErrUpgradeRestart
}
}
log.Info("Agent stopped")
return nil

View File

@@ -0,0 +1,88 @@
package upgrade
import (
"context"
"log/slog"
"sync/atomic"
"time"
"databasus-agent/internal/features/api"
)
const backgroundCheckInterval = 5 * time.Second
type BackgroundUpgrader struct {
apiClient *api.Client
currentVersion string
isDev bool
cancel context.CancelFunc
isUpgraded atomic.Bool
log *slog.Logger
done chan struct{}
}
func NewBackgroundUpgrader(
apiClient *api.Client,
currentVersion string,
isDev bool,
cancel context.CancelFunc,
log *slog.Logger,
) *BackgroundUpgrader {
return &BackgroundUpgrader{
apiClient,
currentVersion,
isDev,
cancel,
atomic.Bool{},
log,
make(chan struct{}),
}
}
func (u *BackgroundUpgrader) Run(ctx context.Context) {
defer close(u.done)
ticker := time.NewTicker(backgroundCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if u.checkAndUpgrade() {
return
}
}
}
}
func (u *BackgroundUpgrader) IsUpgraded() bool {
return u.isUpgraded.Load()
}
func (u *BackgroundUpgrader) WaitForCompletion(timeout time.Duration) {
select {
case <-u.done:
case <-time.After(timeout):
}
}
func (u *BackgroundUpgrader) checkAndUpgrade() bool {
isUpgraded, err := CheckAndUpdate(u.apiClient, u.currentVersion, u.isDev, u.log)
if err != nil {
u.log.Warn("Background update check failed", "error", err)
return false
}
if !isUpgraded {
return false
}
u.log.Info("Background upgrade complete, restarting...")
u.isUpgraded.Store(true)
u.cancel()
return true
}

View File

@@ -0,0 +1,5 @@
package upgrade
import "errors"
var ErrUpgradeRestart = errors.New("agent upgraded, restart required")

View File

@@ -8,26 +8,25 @@ import (
"os/exec"
"runtime"
"strings"
"syscall"
"databasus-agent/internal/features/api"
)
// CheckAndUpdate ensures the agent binary matches the server's expected version.
// It fetches the server version, downloads the new binary if different, verifies it,
// replaces the current executable, and re-execs the process with the same arguments.
// Skipped in development mode. Runs once on startup before the main agent loop.
func CheckAndUpdate(apiClient *api.Client, currentVersion string, isDev bool, log *slog.Logger) error {
// CheckAndUpdate checks if a new version is available and upgrades the binary on disk.
// Returns (true, nil) if the binary was upgraded, (false, nil) if already up to date,
// or (false, err) on failure. Callers are responsible for re-exec or restart signaling.
func CheckAndUpdate(apiClient *api.Client, currentVersion string, isDev bool, log *slog.Logger) (bool, error) {
if isDev {
log.Info("Skipping update check (development mode)")
return nil
return false, nil
}
serverVersion, err := apiClient.FetchServerVersion(context.Background())
if err != nil {
log.Warn("Could not reach server for update check, continuing", "error", err)
log.Warn("Could not reach server for update check", "error", err)
return fmt.Errorf(
return false, fmt.Errorf(
"unable to check version, please verify Databasus server is available: %w",
err,
)
@@ -35,14 +34,15 @@ func CheckAndUpdate(apiClient *api.Client, currentVersion string, isDev bool, lo
if serverVersion == currentVersion {
log.Info("Agent version is up to date", "version", currentVersion)
return nil
return false, nil
}
log.Info("Updating agent...", "current", currentVersion, "target", serverVersion)
selfPath, err := os.Executable()
if err != nil {
return fmt.Errorf("failed to determine executable path: %w", err)
return false, fmt.Errorf("failed to determine executable path: %w", err)
}
tempPath := selfPath + ".update"
@@ -52,24 +52,24 @@ func CheckAndUpdate(apiClient *api.Client, currentVersion string, isDev bool, lo
}()
if err := apiClient.DownloadAgentBinary(context.Background(), runtime.GOARCH, tempPath); err != nil {
return fmt.Errorf("failed to download update: %w", err)
return false, fmt.Errorf("failed to download update: %w", err)
}
if err := os.Chmod(tempPath, 0o755); err != nil {
return fmt.Errorf("failed to set permissions on update: %w", err)
return false, fmt.Errorf("failed to set permissions on update: %w", err)
}
if err := verifyBinary(tempPath, serverVersion); err != nil {
return fmt.Errorf("update verification failed: %w", err)
return false, fmt.Errorf("update verification failed: %w", err)
}
if err := os.Rename(tempPath, selfPath); err != nil {
return fmt.Errorf("failed to replace binary (try --skip-update if this persists): %w", err)
return false, fmt.Errorf("failed to replace binary (try --skip-update if this persists): %w", err)
}
log.Info("Update complete, re-executing...")
log.Info("Agent binary updated", "version", serverVersion)
return syscall.Exec(selfPath, os.Args, os.Environ())
return true, nil
}
func verifyBinary(binaryPath, expectedVersion string) error {

View File

@@ -52,7 +52,6 @@ func Test_UploadSegment_SingleSegment_ServerReceivesCorrectHeadersAndBody(t *tes
require.NotNil(t, receivedHeaders)
assert.Equal(t, "test-token", receivedHeaders.Get("Authorization"))
assert.Equal(t, "application/octet-stream", receivedHeaders.Get("Content-Type"))
assert.Equal(t, "wal", receivedHeaders.Get("X-Upload-Type"))
assert.Equal(t, "000000010000000100000001", receivedHeaders.Get("X-Wal-Segment-Name"))
decompressed := decompressZstd(t, receivedBody)