FEATURE (backups): Move backup cancellation to Valkey pub/sub

This commit is contained in:
Rostislav Dugin
2026-01-11 14:26:48 +03:00
parent 011985d723
commit 997fc01442
17 changed files with 687 additions and 37 deletions

View File

@@ -17,7 +17,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.24.4"
go-version: "1.24.9"
- name: Cache Go modules
uses: actions/cache@v4
@@ -134,7 +134,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.24.4"
go-version: "1.24.9"
- name: Cache Go modules
uses: actions/cache@v4
@@ -221,6 +221,12 @@ jobs:
TEST_MONGODB_60_PORT=27060
TEST_MONGODB_70_PORT=27070
TEST_MONGODB_82_PORT=27082
# Valkey (cache)
VALKEY_HOST=localhost
VALKEY_PORT=6379
VALKEY_USERNAME=
VALKEY_PASSWORD=
VALKEY_IS_SSL=false
EOF
- name: Start test containers
@@ -233,6 +239,11 @@ jobs:
# Wait for main dev database
timeout 60 bash -c 'until docker exec dev-db pg_isready -h localhost -p 5437 -U postgres; do sleep 2; done'
# Wait for Valkey (cache)
echo "Waiting for Valkey..."
timeout 60 bash -c 'until docker exec dev-valkey valkey-cli ping 2>/dev/null | grep -q PONG; do sleep 2; done'
echo "Valkey is ready!"
# Wait for test databases
timeout 60 bash -c 'until nc -z localhost 5000; do sleep 2; done'
timeout 60 bash -c 'until nc -z localhost 5001; do sleep 2; done'

View File

@@ -22,7 +22,7 @@ RUN npm run build
# ========= BUILD BACKEND =========
# Backend build stage
FROM --platform=$BUILDPLATFORM golang:1.24.4 AS backend-build
FROM --platform=$BUILDPLATFORM golang:1.24.9 AS backend-build
# Make TARGET args available early so tools built here match the final image arch
ARG TARGETOS
@@ -123,6 +123,15 @@ RUN wget -qO- https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add -
apt-get install -y --no-install-recommends postgresql-17 && \
rm -rf /var/lib/apt/lists/*
# Install Valkey server from debian repository
# Valkey is only accessible internally (localhost) - not exposed outside container
RUN wget -O /usr/share/keyrings/greensec.github.io-valkey-debian.key https://greensec.github.io/valkey-debian/public.key && \
echo "deb [signed-by=/usr/share/keyrings/greensec.github.io-valkey-debian.key] https://greensec.github.io/valkey-debian/repo $(lsb_release -cs) main" \
> /etc/apt/sources.list.d/valkey-debian.list && \
apt-get update && \
apt-get install -y --no-install-recommends valkey && \
rm -rf /var/lib/apt/lists/*
# ========= Install rclone =========
RUN apt-get update && \
apt-get install -y --no-install-recommends rclone && \
@@ -250,6 +259,30 @@ mkdir -p /databasus-data/backups
chown -R postgres:postgres /databasus-data
chmod 700 /databasus-data/temp
# ========= Start Valkey (internal cache) =========
echo "Configuring Valkey cache..."
cat > /tmp/valkey.conf << 'VALKEY_CONFIG'
port 6379
bind 127.0.0.1
protected-mode yes
save ""
maxmemory 256mb
maxmemory-policy allkeys-lru
VALKEY_CONFIG
echo "Starting Valkey..."
valkey-server /tmp/valkey.conf &
VALKEY_PID=\$!
echo "Waiting for Valkey to be ready..."
for i in {1..30}; do
if valkey-cli ping >/dev/null 2>&1; then
echo "Valkey is ready!"
break
fi
sleep 1
done
# Initialize PostgreSQL if not already initialized
if [ ! -s "/databasus-data/pgdata/PG_VERSION" ]; then
echo "Initializing PostgreSQL database..."

View File

@@ -11,6 +11,12 @@ DATABASE_URL=postgres://postgres:Q1234567@dev-db:5437/databasus?sslmode=disable
GOOSE_DRIVER=postgres
GOOSE_DBSTRING=postgres://postgres:Q1234567@dev-db:5437/databasus?sslmode=disable
GOOSE_MIGRATION_DIR=./migrations
# valkey
VALKEY_HOST=127.0.0.1
VALKEY_PORT=6379
VALKEY_USERNAME=
VALKEY_PASSWORD=
VALKEY_IS_SSL=false
# testing
# to get Google Drive env variables: add storage in UI and copy data from added storage here
TEST_GOOGLE_DRIVE_CLIENT_ID=

View File

@@ -10,4 +10,10 @@ DATABASE_URL=postgres://postgres:Q1234567@localhost:5437/databasus?sslmode=disab
# migrations
GOOSE_DRIVER=postgres
GOOSE_DBSTRING=postgres://postgres:Q1234567@localhost:5437/databasus?sslmode=disable
GOOSE_MIGRATION_DIR=./migrations
GOOSE_MIGRATION_DIR=./migrations
# valkey
VALKEY_HOST=127.0.0.1
VALKEY_PORT=6379
VALKEY_USERNAME=
VALKEY_PASSWORD=
VALKEY_IS_SSL=false

3
backend/.gitignore vendored
View File

@@ -17,4 +17,5 @@ ui/build/*
pgdata-for-restore/
temp/
cmd.exe
temp/
temp/
valkey-data/

View File

@@ -29,6 +29,7 @@ import (
users_middleware "databasus-backend/internal/features/users/middleware"
users_services "databasus-backend/internal/features/users/services"
workspaces_controllers "databasus-backend/internal/features/workspaces/controllers"
cache_utils "databasus-backend/internal/util/cache"
env_utils "databasus-backend/internal/util/env"
files_utils "databasus-backend/internal/util/files"
"databasus-backend/internal/util/logger"
@@ -52,10 +53,17 @@ import (
func main() {
log := logger.GetLogger()
cache_utils.TestCacheConnection()
err := cache_utils.ClearAllCache()
if err != nil {
log.Error("Failed to clear cache", "error", err)
os.Exit(1)
}
runMigrations(log)
// create directories that used for backups and restore
err := files_utils.EnsureDirectories([]string{
err = files_utils.EnsureDirectories([]string{
config.GetEnv().TempFolder,
config.GetEnv().DataFolder,
})
@@ -290,16 +298,13 @@ func generateSwaggerDocs(log *slog.Logger) {
func runMigrations(log *slog.Logger) {
log.Info("Running database migrations...")
cmd := exec.Command("goose", "up")
cmd := exec.Command("goose", "-dir", "./migrations", "up")
cmd.Env = append(
os.Environ(),
"GOOSE_DRIVER=postgres",
"GOOSE_DBSTRING="+config.GetEnv().DatabaseDsn,
)
// Set the working directory to where migrations are located
cmd.Dir = "./migrations"
output, err := cmd.CombinedOutput()
if err != nil {
log.Error("Failed to run migrations", "error", err, "output", string(output))

View File

@@ -19,6 +19,21 @@ services:
command: -p 5437
shm_size: 10gb
# Valkey for caching
dev-valkey:
image: valkey/valkey:9.0.1-alpine
ports:
- "${VALKEY_PORT:-6379}:6379"
volumes:
- ./valkey-data:/data
container_name: dev-valkey
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
# Test MinIO container
test-minio:
image: minio/minio:latest

View File

@@ -1,6 +1,6 @@
module databasus-backend
go 1.24.4
go 1.24.9
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0
@@ -171,6 +171,7 @@ require (
github.com/tklauser/go-sysconf v0.3.15 // indirect
github.com/tklauser/numcpus v0.10.0 // indirect
github.com/ulikunitz/xz v0.5.15 // indirect
github.com/valkey-io/valkey-go v1.0.70 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
@@ -269,7 +270,7 @@ require (
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
golang.org/x/arch v0.17.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/oauth2 v0.33.0
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.39.0 // indirect

View File

@@ -541,6 +541,7 @@ github.com/onsi/ginkgo/v2 v2.17.3 h1:oJcvKpIb7/8uLpDDtnQuf18xVnwKp8DTD7DQ6gTd/MU
github.com/onsi/ginkgo/v2 v2.17.3/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc=
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM=
github.com/oracle/oci-go-sdk/v65 v65.104.0 h1:l9awEvzWvxmYhy/97A0hZ87pa7BncYXmcO/S8+rvgK0=
github.com/oracle/oci-go-sdk/v65 v65.104.0/go.mod h1:oB8jFGVc/7/zJ+DbleE8MzGHjhs2ioCz5stRTdZdIcY=
github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg=
@@ -660,6 +661,8 @@ github.com/ulikunitz/xz v0.5.15 h1:9DNdB5s+SgV3bQ2ApL10xRc35ck0DuIX/isZvIk+ubY=
github.com/ulikunitz/xz v0.5.15/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/unknwon/goconfig v1.0.0 h1:rS7O+CmUdli1T+oDm7fYj1MwqNWtEJfNj+FqcUHML8U=
github.com/unknwon/goconfig v1.0.0/go.mod h1:qu2ZQ/wcC/if2u32263HTVC39PeOQRSmidQk3DuDFQ8=
github.com/valkey-io/valkey-go v1.0.70 h1:mjYNT8qiazxDAJ0QNQ8twWT/YFOkOoRd40ERV2mB49Y=
github.com/valkey-io/valkey-go v1.0.70/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
@@ -820,6 +823,8 @@ golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@@ -79,6 +79,13 @@ type EnvVariables struct {
TestMongodb70Port string `env:"TEST_MONGODB_70_PORT"`
TestMongodb82Port string `env:"TEST_MONGODB_82_PORT"`
// Valkey
ValkeyHost string `env:"VALKEY_HOST" required:"true"`
ValkeyPort string `env:"VALKEY_PORT" required:"true"`
ValkeyUsername string `env:"VALKEY_USERNAME"`
ValkeyPassword string `env:"VALKEY_PASSWORD"`
ValkeyIsSsl bool `env:"VALKEY_IS_SSL" required:"true"`
// oauth
GitHubClientID string `env:"GITHUB_CLIENT_ID"`
GitHubClientSecret string `env:"GITHUB_CLIENT_SECRET"`
@@ -189,6 +196,16 @@ func loadEnvVariables() {
env.MongodbInstallDir = filepath.Join(backendRoot, "tools", "mongodb")
tools.VerifyMongodbInstallation(log, env.EnvMode, env.MongodbInstallDir)
// Valkey
if env.ValkeyHost == "" {
log.Error("VALKEY_HOST is empty")
os.Exit(1)
}
if env.ValkeyPort == "" {
log.Error("VALKEY_PORT is empty")
os.Exit(1)
}
// Store the data and temp folders one level below the root
// (projectRoot/databasus-data -> /databasus-data)
env.DataFolder = filepath.Join(filepath.Dir(backendRoot), "databasus-data", "backups")

View File

@@ -75,6 +75,9 @@ func Test_MakeBackupForDbHavingBackupDayAgo_BackupCreated(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 2)
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupForDbHavingHourAgoBackup_BackupSkipped(t *testing.T) {
@@ -135,6 +138,9 @@ func Test_MakeBackupForDbHavingHourAgoBackup_BackupSkipped(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 1) // Should still be 1 backup, no new backup created
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupHavingFailedBackupWithoutRetries_BackupSkipped(t *testing.T) {
@@ -199,6 +205,9 @@ func Test_MakeBackupHavingFailedBackupWithoutRetries_BackupSkipped(t *testing.T)
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 1) // Should still be 1 backup, no retry attempted
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupHavingFailedBackupWithRetries_BackupCreated(t *testing.T) {
@@ -264,6 +273,9 @@ func Test_MakeBackupHavingFailedBackupWithRetries_BackupCreated(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 2) // Should have 2 backups, retry was attempted
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupHavingFailedBackupWithRetries_RetriesCountNotExceeded(t *testing.T) {
@@ -330,6 +342,9 @@ func Test_MakeBackupHavingFailedBackupWithRetries_RetriesCountNotExceeded(t *tes
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 3) // Should have 3 backups, not more than max
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackgroundBackupWhenBakupsDisabled_BackupSkipped(t *testing.T) {
@@ -386,4 +401,7 @@ func Test_MakeBackgroundBackupWhenBakupsDisabled_BackupSkipped(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 1)
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}

View File

@@ -2,21 +2,57 @@ package backups
import (
"context"
cache_utils "databasus-backend/internal/util/cache"
"databasus-backend/internal/util/logger"
"log/slog"
"sync"
"github.com/google/uuid"
)
const backupCancelChannel = "backup:cancel"
type BackupContextManager struct {
mu sync.RWMutex
cancelFuncs map[uuid.UUID]context.CancelFunc
cancelledBackups map[uuid.UUID]bool
mu sync.RWMutex
cancelFuncs map[uuid.UUID]context.CancelFunc
pubsub *cache_utils.PubSubManager
logger *slog.Logger
}
func NewBackupContextManager() *BackupContextManager {
return &BackupContextManager{
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
cancelledBackups: make(map[uuid.UUID]bool),
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
pubsub: cache_utils.NewPubSubManager(),
logger: logger.GetLogger(),
}
}
func (m *BackupContextManager) StartSubscription() {
ctx := context.Background()
handler := func(message string) {
backupID, err := uuid.Parse(message)
if err != nil {
m.logger.Error("Invalid backup ID in cancel message", "message", message, "error", err)
return
}
m.mu.Lock()
defer m.mu.Unlock()
cancelFunc, exists := m.cancelFuncs[backupID]
if exists {
cancelFunc()
delete(m.cancelFuncs, backupID)
m.logger.Info("Cancelled backup via Pub/Sub", "backupID", backupID)
}
}
err := m.pubsub.Subscribe(ctx, backupCancelChannel, handler)
if err != nil {
m.logger.Error("Failed to subscribe to backup cancel channel", "error", err)
} else {
m.logger.Info("Successfully subscribed to backup cancel channel")
}
}
@@ -24,37 +60,25 @@ func (m *BackupContextManager) RegisterBackup(backupID uuid.UUID, cancelFunc con
m.mu.Lock()
defer m.mu.Unlock()
m.cancelFuncs[backupID] = cancelFunc
delete(m.cancelledBackups, backupID)
m.logger.Debug("Registered backup", "backupID", backupID)
}
func (m *BackupContextManager) CancelBackup(backupID uuid.UUID) error {
m.mu.Lock()
defer m.mu.Unlock()
ctx := context.Background()
if m.cancelledBackups[backupID] {
return nil
err := m.pubsub.Publish(ctx, backupCancelChannel, backupID.String())
if err != nil {
m.logger.Error("Failed to publish cancel message", "backupID", backupID, "error", err)
return err
}
cancelFunc, exists := m.cancelFuncs[backupID]
if exists {
cancelFunc()
delete(m.cancelFuncs, backupID)
}
m.cancelledBackups[backupID] = true
m.logger.Info("Published backup cancel message", "backupID", backupID)
return nil
}
func (m *BackupContextManager) IsCancelled(backupID uuid.UUID) bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.cancelledBackups[backupID]
}
func (m *BackupContextManager) UnregisterBackup(backupID uuid.UUID) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.cancelFuncs, backupID)
delete(m.cancelledBackups, backupID)
m.logger.Debug("Unregistered backup", "backupID", backupID)
}

View File

@@ -0,0 +1,200 @@
package backups
import (
"context"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
// Verifies that RegisterBackup stores the cancel function under the
// backup's ID. White-box test: inspects the manager's internal map
// directly (same package).
func Test_RegisterBackup_BackupRegisteredSuccessfully(t *testing.T) {
	manager := NewBackupContextManager()
	backupID := uuid.New()
	_, cancel := context.WithCancel(context.Background())
	defer cancel()
	manager.RegisterBackup(backupID, cancel)
	manager.mu.RLock()
	_, exists := manager.cancelFuncs[backupID]
	manager.mu.RUnlock()
	assert.True(t, exists, "Backup should be registered")
}
// Verifies that UnregisterBackup removes a previously registered
// cancel function from the manager's internal map.
func Test_UnregisterBackup_BackupUnregisteredSuccessfully(t *testing.T) {
	manager := NewBackupContextManager()
	backupID := uuid.New()
	_, cancel := context.WithCancel(context.Background())
	defer cancel()
	manager.RegisterBackup(backupID, cancel)
	manager.UnregisterBackup(backupID)
	manager.mu.RLock()
	_, exists := manager.cancelFuncs[backupID]
	manager.mu.RUnlock()
	assert.False(t, exists, "Backup should be unregistered")
}
// Integration test (requires a reachable Valkey): CancelBackup
// publishes the backup ID over Pub/Sub, and the same instance's
// subscription invokes the registered cancel function.
// The sleeps give the subscription time to establish and the message
// time to round-trip through Valkey.
func Test_CancelBackup_OnSameInstance_BackupCancelledViaPubSub(t *testing.T) {
	manager := NewBackupContextManager()
	backupID := uuid.New()
	ctx, cancel := context.WithCancel(context.Background())
	cancelled := false
	var mu sync.Mutex // guards cancelled (written from the subscription goroutine)
	wrappedCancel := func() {
		mu.Lock()
		cancelled = true
		mu.Unlock()
		cancel()
	}
	manager.RegisterBackup(backupID, wrappedCancel)
	manager.StartSubscription()
	time.Sleep(100 * time.Millisecond)
	err := manager.CancelBackup(backupID)
	assert.NoError(t, err, "Cancel should not return error")
	time.Sleep(500 * time.Millisecond)
	mu.Lock()
	wasCancelled := cancelled
	mu.Unlock()
	assert.True(t, wasCancelled, "Cancel function should have been called")
	assert.Error(t, ctx.Err(), "Context should be cancelled")
}
// Integration test (requires a reachable Valkey): the core multi-node
// scenario — a cancel published by one manager instance reaches,
// via Pub/Sub, the instance that actually holds the running backup.
func Test_CancelBackup_FromDifferentInstance_BackupCancelledOnRunningInstance(t *testing.T) {
	manager1 := NewBackupContextManager()
	manager2 := NewBackupContextManager()
	backupID := uuid.New()
	ctx, cancel := context.WithCancel(context.Background())
	cancelled := false
	var mu sync.Mutex // guards cancelled
	wrappedCancel := func() {
		mu.Lock()
		cancelled = true
		mu.Unlock()
		cancel()
	}
	// The backup is registered only on instance 1...
	manager1.RegisterBackup(backupID, wrappedCancel)
	manager1.StartSubscription()
	manager2.StartSubscription()
	time.Sleep(100 * time.Millisecond)
	// ...but cancelled through instance 2.
	err := manager2.CancelBackup(backupID)
	assert.NoError(t, err, "Cancel should not return error")
	time.Sleep(500 * time.Millisecond)
	mu.Lock()
	wasCancelled := cancelled
	mu.Unlock()
	assert.True(t, wasCancelled, "Cancel function should have been called on instance 1")
	assert.Error(t, ctx.Err(), "Context should be cancelled")
}
// Integration test (requires a reachable Valkey): publishing a cancel
// for an unknown backup ID is a no-op and must not return an error.
func Test_CancelBackup_WhenBackupDoesNotExist_NoErrorReturned(t *testing.T) {
	manager := NewBackupContextManager()
	manager.StartSubscription()
	time.Sleep(100 * time.Millisecond)
	nonExistentID := uuid.New()
	err := manager.CancelBackup(nonExistentID)
	assert.NoError(t, err, "Cancelling non-existent backup should not error")
}
// Integration test (requires a reachable Valkey): registers several
// backups on one manager and cancels each via Pub/Sub, asserting every
// registered cancel function fired and every context was cancelled.
func Test_CancelBackup_WithMultipleBackups_AllBackupsCancelled(t *testing.T) {
	manager := NewBackupContextManager()
	numBackups := 5
	backupIDs := make([]uuid.UUID, numBackups)
	contexts := make([]context.Context, numBackups)
	cancels := make([]context.CancelFunc, numBackups)
	cancelledFlags := make([]bool, numBackups)
	var mu sync.Mutex // guards cancelledFlags
	for i := 0; i < numBackups; i++ {
		backupIDs[i] = uuid.New()
		contexts[i], cancels[i] = context.WithCancel(context.Background())
		// Capture the loop index by value for the closure below.
		idx := i
		wrappedCancel := func() {
			mu.Lock()
			cancelledFlags[idx] = true
			mu.Unlock()
			cancels[idx]()
		}
		manager.RegisterBackup(backupIDs[i], wrappedCancel)
	}
	manager.StartSubscription()
	time.Sleep(100 * time.Millisecond)
	for i := 0; i < numBackups; i++ {
		err := manager.CancelBackup(backupIDs[i])
		assert.NoError(t, err, "Cancel should not return error")
	}
	// Allow all published cancel messages to be delivered and handled.
	time.Sleep(1 * time.Second)
	mu.Lock()
	for i := 0; i < numBackups; i++ {
		assert.True(t, cancelledFlags[i], "Backup %d should be cancelled", i)
		assert.Error(t, contexts[i].Err(), "Context %d should be cancelled", i)
	}
	mu.Unlock()
}
// Integration test (requires a reachable Valkey): once a backup has
// been unregistered, a subsequent cancel message must not invoke its
// (former) cancel function.
func Test_CancelBackup_AfterUnregister_BackupNotCancelled(t *testing.T) {
	manager := NewBackupContextManager()
	backupID := uuid.New()
	_, cancel := context.WithCancel(context.Background())
	defer cancel()
	cancelled := false
	var mu sync.Mutex // guards cancelled
	wrappedCancel := func() {
		mu.Lock()
		cancelled = true
		mu.Unlock()
		cancel()
	}
	manager.RegisterBackup(backupID, wrappedCancel)
	manager.StartSubscription()
	time.Sleep(100 * time.Millisecond)
	// Unregister before cancelling: the Pub/Sub message should find no entry.
	manager.UnregisterBackup(backupID)
	err := manager.CancelBackup(backupID)
	assert.NoError(t, err, "Cancel should not return error")
	time.Sleep(500 * time.Millisecond)
	mu.Lock()
	wasCancelled := cancelled
	mu.Unlock()
	assert.False(t, wasCancelled, "Cancel function should not be called after unregister")
}

View File

@@ -58,6 +58,8 @@ func SetupDependencies() {
databases.GetDatabaseService().AddDbRemoveListener(backupService)
databases.GetDatabaseService().AddDbCopyListener(backups_config.GetBackupConfigService())
backupContextManager.StartSubscription()
}
func GetBackupService() *BackupService {

121
backend/internal/util/cache/cache.go vendored Normal file
View File

@@ -0,0 +1,121 @@
package cache_utils
import (
"context"
"crypto/tls"
"databasus-backend/internal/config"
"sync"
"github.com/valkey-io/valkey-go"
)
var (
once sync.Once
valkeyClient valkey.Client
)
// getCache lazily initializes the process-wide Valkey client on first
// call and returns the same instance thereafter (guarded by sync.Once).
// Connection settings come from the environment config; TLS is enabled
// when VALKEY_IS_SSL is set. A connection failure panics, since the
// cache is mandatory at startup.
func getCache() valkey.Client {
	once.Do(func() {
		env := config.GetEnv()

		opts := valkey.ClientOption{
			InitAddress: []string{env.ValkeyHost + ":" + env.ValkeyPort},
			Username:    env.ValkeyUsername,
			Password:    env.ValkeyPassword,
		}
		// ServerName must match the host for certificate verification.
		if env.ValkeyIsSsl {
			opts.TLSConfig = &tls.Config{ServerName: env.ValkeyHost}
		}

		c, err := valkey.NewClient(opts)
		if err != nil {
			panic(err)
		}
		valkeyClient = c
	})
	return valkeyClient
}
func TestCacheConnection() {
// Get Valkey client from cache package
client := getCache()
// Create a simple test cache util for strings
cacheUtil := NewCacheUtil[string](client, "test:")
// Test data
testKey := "connection_test"
testValue := "valkey_is_working"
// Test Set operation
cacheUtil.Set(testKey, &testValue)
// Test Get operation
retrievedValue := cacheUtil.Get(testKey)
// Verify the value was retrieved correctly
if retrievedValue == nil {
panic("Cache test failed: could not retrieve cached value")
}
if *retrievedValue != testValue {
panic("Cache test failed: retrieved value does not match expected")
}
// Clean up - remove test key
cacheUtil.Invalidate(testKey)
// Verify cleanup worked
cleanupCheck := cacheUtil.Get(testKey)
if cleanupCheck != nil {
panic("Cache test failed: test key was not properly invalidated")
}
}
// ClearAllCache removes every key from the Valkey instance by walking
// the keyspace with cursor-based SCAN (which, unlike KEYS, does not
// block the server) and deleting each batch. It is called once at
// startup so stale cache entries never survive a deploy.
// Returns the first error encountered; nil on success.
func ClearAllCache() error {
	pattern := "*"
	cursor := uint64(0)
	batchSize := int64(100)
	cacheUtil := NewCacheUtil[string](getCache(), "")
	for {
		ctx, cancel := context.WithTimeout(context.Background(), DefaultQueueTimeout)
		result := cacheUtil.client.Do(
			ctx,
			cacheUtil.client.B().Scan().Cursor(cursor).Match(pattern).Count(batchSize).Build(),
		)
		cancel()
		if result.Error() != nil {
			return result.Error()
		}
		scanResult, err := result.AsScanEntry()
		if err != nil {
			return err
		}
		if len(scanResult.Elements) > 0 {
			delCtx, delCancel := context.WithTimeout(context.Background(), cacheUtil.timeout)
			delResult := cacheUtil.client.Do(
				delCtx,
				cacheUtil.client.B().Del().Key(scanResult.Elements...).Build(),
			)
			delCancel()
			// Fix: the DEL result was previously discarded, so a failed
			// delete left stale keys behind while the function still
			// reported success.
			if delResult.Error() != nil {
				return delResult.Error()
			}
		}
		// Cursor 0 signals that the scan has covered the whole keyspace.
		cursor = scanResult.Cursor
		if cursor == 0 {
			break
		}
	}
	return nil
}

109
backend/internal/util/cache/pubsub.go vendored Normal file
View File

@@ -0,0 +1,109 @@
package cache_utils
import (
"context"
"fmt"
"log/slog"
"sync"
"databasus-backend/internal/util/logger"
"github.com/valkey-io/valkey-go"
)
// PubSubManager provides channel-based publish/subscribe on top of the
// shared Valkey client. It keeps one cancel function per subscribed
// channel so individual subscriptions (or all of them, via Close) can
// be torn down.
type PubSubManager struct {
	client        valkey.Client
	subscriptions map[string]context.CancelFunc // channel name -> cancel for its subscription loop
	mu            sync.RWMutex                  // guards subscriptions
	logger        *slog.Logger
}
// NewPubSubManager returns a manager backed by the process-wide Valkey
// client, with no active subscriptions.
func NewPubSubManager() *PubSubManager {
	manager := &PubSubManager{
		client:        getCache(),
		logger:        logger.GetLogger(),
		subscriptions: map[string]context.CancelFunc{},
	}
	return manager
}
// Subscribe launches a background goroutine that listens on channel and
// invokes handler for every message received. At most one subscription
// per channel is allowed; a duplicate call returns an error. The loop
// stops when ctx is cancelled or Close is called.
func (m *PubSubManager) Subscribe(
	ctx context.Context,
	channel string,
	handler func(message string),
) error {
	m.mu.Lock()
	_, alreadySubscribed := m.subscriptions[channel]
	if alreadySubscribed {
		m.mu.Unlock()
		return fmt.Errorf("already subscribed to channel: %s", channel)
	}
	// Derive a cancellable context and register it before unlocking so a
	// concurrent Subscribe for the same channel is rejected.
	subCtx, cancel := context.WithCancel(ctx)
	m.subscriptions[channel] = cancel
	m.mu.Unlock()

	go m.subscriptionLoop(subCtx, channel, handler)
	return nil
}
// Publish sends message to the given Pub/Sub channel. On failure the
// wrapped error is returned for the caller to handle — previously the
// method both logged and returned the same error (duplicate handling),
// and the log text referred to "Redis" even though the backend is
// Valkey.
func (m *PubSubManager) Publish(ctx context.Context, channel string, message string) error {
	cmd := m.client.B().Publish().Channel(channel).Message(message).Build()
	if err := m.client.Do(ctx, cmd).Error(); err != nil {
		return fmt.Errorf("failed to publish message: %w", err)
	}
	return nil
}
// Close cancels every active subscription loop and empties the
// registry. It always returns nil; the error return exists to fit the
// conventional closer signature.
func (m *PubSubManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Cancel each loop, then drop the whole registry in one go.
	for _, cancel := range m.subscriptions {
		cancel()
	}
	m.subscriptions = make(map[string]context.CancelFunc)
	return nil
}
// subscriptionLoop blocks on the Valkey SUBSCRIBE command for a single
// channel, dispatching each incoming message to handler, until ctx is
// cancelled or the connection fails. On exit it removes the channel
// from the manager's registry so a later Subscribe can re-register it.
// Runs on its own goroutine (started by Subscribe).
func (m *PubSubManager) subscriptionLoop(
	ctx context.Context,
	channel string,
	handler func(message string),
) {
	// Goroutine-level recover: a panic here would otherwise crash the
	// whole process.
	defer func() {
		if r := recover(); r != nil {
			m.logger.Error("Panic in subscription handler", "channel", channel, "panic", r)
		}
	}()
	m.logger.Info("Starting subscription", "channel", channel)
	// Receive blocks until ctx is done or the subscription errors out.
	err := m.client.Receive(
		ctx,
		m.client.B().Subscribe().Channel(channel).Build(),
		func(msg valkey.PubSubMessage) {
			// Per-message recover so one bad handler invocation does not
			// tear down the whole subscription.
			defer func() {
				if r := recover(); r != nil {
					m.logger.Error("Panic in message handler", "channel", channel, "panic", r)
				}
			}()
			handler(msg.Message)
		},
	)
	// A non-nil ctx.Err() means an intentional shutdown, not a failure,
	// so only log an error when the context is still live.
	if err != nil && ctx.Err() == nil {
		m.logger.Error("Subscription error", "channel", channel, "error", err)
	} else if ctx.Err() != nil {
		m.logger.Info("Subscription cancelled", "channel", channel)
	}
	// Deregister so the channel can be subscribed to again later.
	m.mu.Lock()
	delete(m.subscriptions, channel)
	m.mu.Unlock()
}

76
backend/internal/util/cache/utils.go vendored Normal file
View File

@@ -0,0 +1,76 @@
package cache_utils
import (
"context"
"encoding/json"
"time"
"github.com/valkey-io/valkey-go"
)
// Default time budgets for cache operations.
const (
	DefaultCacheTimeout = 10 * time.Second // per-command deadline for Get/Set/Invalidate
	DefaultCacheExpiry  = 10 * time.Minute // TTL applied to every Set
	DefaultQueueTimeout = 30 * time.Second // deadline for batch operations (e.g. SCAN)
)

// CacheUtil is a typed, JSON-encoded cache over a Valkey client.
// All keys are stored under prefix; values of type T are marshalled
// with encoding/json.
type CacheUtil[T any] struct {
	client  valkey.Client
	prefix  string        // prepended to every key
	timeout time.Duration // per-command deadline
	expiry  time.Duration // TTL used by Set
}
// NewCacheUtil builds a CacheUtil for value type T that stores all
// entries under prefix, using the default timeout and expiry.
func NewCacheUtil[T any](client valkey.Client, prefix string) *CacheUtil[T] {
	util := CacheUtil[T]{
		client:  client,
		prefix:  prefix,
		expiry:  DefaultCacheExpiry,
		timeout: DefaultCacheTimeout,
	}
	return &util
}
// Get fetches and JSON-decodes the value stored under prefix+key.
// Every failure mode (missing key, network error, decode error) yields
// nil, which callers treat as a cache miss.
func (c *CacheUtil[T]) Get(key string) *T {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	resp := c.client.Do(ctx, c.client.B().Get().Key(c.prefix+key).Build())
	if resp.Error() != nil {
		return nil
	}
	raw, err := resp.AsBytes()
	if err != nil {
		return nil
	}

	var value T
	if json.Unmarshal(raw, &value) != nil {
		return nil
	}
	return &value
}
// Set JSON-encodes item and stores it under prefix+key with the default
// TTL. Errors (marshal or network) are silently dropped: the cache is
// best-effort and a write failure must never break the caller.
func (c *CacheUtil[T]) Set(key string, item *T) {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	payload, err := json.Marshal(item)
	if err != nil {
		return
	}

	cmd := c.client.B().Set().Key(c.prefix + key).Value(string(payload)).Ex(c.expiry).Build()
	c.client.Do(ctx, cmd)
}
func (c *CacheUtil[T]) Invalidate(key string) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
fullKey := c.prefix + key
c.client.Do(ctx, c.client.B().Del().Key(fullKey).Build())
}