Compare commits

...

15 Commits

Author SHA1 Message Date
Rostislav Dugin
3782846872 Merge pull request #251 from databasus/develop
FIX (tidy): Run go mod tidy
2026-01-12 11:32:25 +03:00
Rostislav Dugin
245a81897f FIX (tidy): Run go mod tidy 2026-01-12 11:31:52 +03:00
Rostislav Dugin
5cbc0773b6 Merge pull request #250 from databasus/develop
FEATURE (backups): Move backups cancellation to Valkey pub/sub
2026-01-12 11:26:29 +03:00
Rostislav Dugin
997fc01442 FEATURE (backups): Move backups cancellation to Valkey pub/sub 2026-01-12 11:24:25 +03:00
Rostislav Dugin
6d0ae32d0c Merge pull request #240 from databasus/develop
FIX (oauth): Enable GitHub and Google OAuth
2026-01-10 20:15:43 +03:00
Rostislav Dugin
011985d723 FIX (oauth): Enable GitHub and Google OAuth 2026-01-10 19:19:37 +03:00
Rostislav Dugin
d677ee61de Merge pull request #239 from databasus/develop
FIX (mariadb): --skip-ssl-verify-server-cert for mariadb / mysql
2026-01-10 18:34:58 +03:00
Rostislav Dugin
c6b8f6e87a Merge pull request #237 from wzzrd/bugfix/disable_mariadb_mysql_ssl_verify
--skip-ssl-verify-server-cert for mariadb
2026-01-10 18:33:45 +03:00
Maxim Burgerhout
2bb5f93d00 --skip-ssl-verify-server-cert for mariadb / mysql
This change adds the --skip-ssl-verify-server-cert flag to mariadb
database connections for both backups and restores. Without this flag,
those procedures error out when trying to verify server certificates.
2026-01-10 15:50:09 +01:00
Rostislav Dugin
b91c150300 Merge pull request #236 from databasus/develop
Develop
2026-01-10 15:19:19 +03:00
Rostislav Dugin
12b119ce40 FIX (readme): Update readme 2026-01-10 15:16:25 +03:00
Rostislav Dugin
7c6f0ab4ba FIX (mysql\mariadb): Use custom TLS handler to skip verification instead of build-in 2026-01-10 15:13:47 +03:00
Rostislav Dugin
6d2db4b298 Merge pull request #232 from databasus/develop
Develop
2026-01-09 11:12:27 +03:00
Rostislav Dugin
6397423298 FIX (temp folder): Ensure permissions 0700 for temp folders to meet PG requirements for .pgpass ownership 2026-01-09 11:10:50 +03:00
Rostislav Dugin
3470aae8e3 FIX (mysql\mariadb): Remove PROCESS permission check before backup, because it is not mandatory for backup 2026-01-09 11:02:14 +03:00
32 changed files with 912 additions and 166 deletions

View File

@@ -17,7 +17,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.24.4"
go-version: "1.24.9"
- name: Cache Go modules
uses: actions/cache@v4
@@ -134,7 +134,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.24.4"
go-version: "1.24.9"
- name: Cache Go modules
uses: actions/cache@v4
@@ -221,6 +221,12 @@ jobs:
TEST_MONGODB_60_PORT=27060
TEST_MONGODB_70_PORT=27070
TEST_MONGODB_82_PORT=27082
# Valkey (cache)
VALKEY_HOST=localhost
VALKEY_PORT=6379
VALKEY_USERNAME=
VALKEY_PASSWORD=
VALKEY_IS_SSL=false
EOF
- name: Start test containers
@@ -233,6 +239,11 @@ jobs:
# Wait for main dev database
timeout 60 bash -c 'until docker exec dev-db pg_isready -h localhost -p 5437 -U postgres; do sleep 2; done'
# Wait for Valkey (cache)
echo "Waiting for Valkey..."
timeout 60 bash -c 'until docker exec dev-valkey valkey-cli ping 2>/dev/null | grep -q PONG; do sleep 2; done'
echo "Valkey is ready!"
# Wait for test databases
timeout 60 bash -c 'until nc -z localhost 5000; do sleep 2; done'
timeout 60 bash -c 'until nc -z localhost 5001; do sleep 2; done'

View File

@@ -22,7 +22,7 @@ RUN npm run build
# ========= BUILD BACKEND =========
# Backend build stage
FROM --platform=$BUILDPLATFORM golang:1.24.4 AS backend-build
FROM --platform=$BUILDPLATFORM golang:1.24.9 AS backend-build
# Make TARGET args available early so tools built here match the final image arch
ARG TARGETOS
@@ -123,6 +123,15 @@ RUN wget -qO- https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add -
apt-get install -y --no-install-recommends postgresql-17 && \
rm -rf /var/lib/apt/lists/*
# Install Valkey server from debian repository
# Valkey is only accessible internally (localhost) - not exposed outside container
RUN wget -O /usr/share/keyrings/greensec.github.io-valkey-debian.key https://greensec.github.io/valkey-debian/public.key && \
echo "deb [signed-by=/usr/share/keyrings/greensec.github.io-valkey-debian.key] https://greensec.github.io/valkey-debian/repo $(lsb_release -cs) main" \
> /etc/apt/sources.list.d/valkey-debian.list && \
apt-get update && \
apt-get install -y --no-install-recommends valkey && \
rm -rf /var/lib/apt/lists/*
# ========= Install rclone =========
RUN apt-get update && \
apt-get install -y --no-install-recommends rclone && \
@@ -245,7 +254,34 @@ PG_BIN="/usr/lib/postgresql/17/bin"
# Ensure proper ownership of data directory
echo "Setting up data directory permissions..."
mkdir -p /databasus-data/pgdata
mkdir -p /databasus-data/temp
mkdir -p /databasus-data/backups
chown -R postgres:postgres /databasus-data
chmod 700 /databasus-data/temp
# ========= Start Valkey (internal cache) =========
echo "Configuring Valkey cache..."
cat > /tmp/valkey.conf << 'VALKEY_CONFIG'
port 6379
bind 127.0.0.1
protected-mode yes
save ""
maxmemory 256mb
maxmemory-policy allkeys-lru
VALKEY_CONFIG
echo "Starting Valkey..."
valkey-server /tmp/valkey.conf &
VALKEY_PID=\$!
echo "Waiting for Valkey to be ready..."
for i in {1..30}; do
if valkey-cli ping >/dev/null 2>&1; then
echo "Valkey is ready!"
break
fi
sleep 1
done
# Initialize PostgreSQL if not already initialized
if [ ! -s "/databasus-data/pgdata/PG_VERSION" ]; then

View File

@@ -2,7 +2,7 @@
<img src="assets/logo.svg" alt="Databasus Logo" width="250"/>
<h3>Backup tool for PostgreSQL, MySQL and MongoDB</h3>
<p>Databasus is a free, open source and self-hosted tool to backup databases. Make backups with different storages (S3, Google Drive, FTP, etc.) and notifications about progress (Slack, Discord, Telegram, etc.). Previously known as Postgresus (see migration guide).</p>
<p>Databasus is a free, open source and self-hosted tool to backup databases (with focus on PostgreSQL). Make backups with different storages (S3, Google Drive, FTP, etc.) and notifications about progress (Slack, Discord, Telegram, etc.). Previously known as Postgresus (see migration guide).</p>
<!-- Badges -->
[![PostgreSQL](https://img.shields.io/badge/PostgreSQL-336791?logo=postgresql&logoColor=white)](https://www.postgresql.org/)

View File

@@ -11,6 +11,12 @@ DATABASE_URL=postgres://postgres:Q1234567@dev-db:5437/databasus?sslmode=disable
GOOSE_DRIVER=postgres
GOOSE_DBSTRING=postgres://postgres:Q1234567@dev-db:5437/databasus?sslmode=disable
GOOSE_MIGRATION_DIR=./migrations
# valkey
VALKEY_HOST=127.0.0.1
VALKEY_PORT=6379
VALKEY_USERNAME=
VALKEY_PASSWORD=
VALKEY_IS_SSL=false
# testing
# to get Google Drive env variables: add storage in UI and copy data from added storage here
TEST_GOOGLE_DRIVE_CLIENT_ID=

View File

@@ -10,4 +10,10 @@ DATABASE_URL=postgres://postgres:Q1234567@localhost:5437/databasus?sslmode=disab
# migrations
GOOSE_DRIVER=postgres
GOOSE_DBSTRING=postgres://postgres:Q1234567@localhost:5437/databasus?sslmode=disable
GOOSE_MIGRATION_DIR=./migrations
GOOSE_MIGRATION_DIR=./migrations
# valkey
VALKEY_HOST=127.0.0.1
VALKEY_PORT=6379
VALKEY_USERNAME=
VALKEY_PASSWORD=
VALKEY_IS_SSL=false

3
backend/.gitignore vendored
View File

@@ -17,4 +17,5 @@ ui/build/*
pgdata-for-restore/
temp/
cmd.exe
temp/
temp/
valkey-data/

View File

@@ -29,6 +29,7 @@ import (
users_middleware "databasus-backend/internal/features/users/middleware"
users_services "databasus-backend/internal/features/users/services"
workspaces_controllers "databasus-backend/internal/features/workspaces/controllers"
cache_utils "databasus-backend/internal/util/cache"
env_utils "databasus-backend/internal/util/env"
files_utils "databasus-backend/internal/util/files"
"databasus-backend/internal/util/logger"
@@ -52,10 +53,17 @@ import (
func main() {
log := logger.GetLogger()
cache_utils.TestCacheConnection()
err := cache_utils.ClearAllCache()
if err != nil {
log.Error("Failed to clear cache", "error", err)
os.Exit(1)
}
runMigrations(log)
// create directories that used for backups and restore
err := files_utils.EnsureDirectories([]string{
err = files_utils.EnsureDirectories([]string{
config.GetEnv().TempFolder,
config.GetEnv().DataFolder,
})
@@ -290,16 +298,13 @@ func generateSwaggerDocs(log *slog.Logger) {
func runMigrations(log *slog.Logger) {
log.Info("Running database migrations...")
cmd := exec.Command("goose", "up")
cmd := exec.Command("goose", "-dir", "./migrations", "up")
cmd.Env = append(
os.Environ(),
"GOOSE_DRIVER=postgres",
"GOOSE_DBSTRING="+config.GetEnv().DatabaseDsn,
)
// Set the working directory to where migrations are located
cmd.Dir = "./migrations"
output, err := cmd.CombinedOutput()
if err != nil {
log.Error("Failed to run migrations", "error", err, "output", string(output))

View File

@@ -19,6 +19,21 @@ services:
command: -p 5437
shm_size: 10gb
# Valkey for caching
dev-valkey:
image: valkey/valkey:9.0.1-alpine
ports:
- "${VALKEY_PORT:-6379}:6379"
volumes:
- ./valkey-data:/data
container_name: dev-valkey
healthcheck:
test: ["CMD", "valkey-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s
# Test MinIO container
test-minio:
image: minio/minio:latest

View File

@@ -1,6 +1,6 @@
module databasus-backend
go 1.24.4
go 1.24.9
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0
@@ -25,6 +25,7 @@ require (
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.4
github.com/valkey-io/valkey-go v1.0.70
go.mongodb.org/mongo-driver v1.17.6
golang.org/x/crypto v0.46.0
golang.org/x/time v0.14.0
@@ -269,7 +270,7 @@ require (
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
golang.org/x/arch v0.17.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/oauth2 v0.33.0
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.39.0 // indirect

View File

@@ -539,8 +539,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.17.3 h1:oJcvKpIb7/8uLpDDtnQuf18xVnwKp8DTD7DQ6gTd/MU=
github.com/onsi/ginkgo/v2 v2.17.3/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc=
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM=
github.com/onsi/gomega v1.38.3/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
github.com/oracle/oci-go-sdk/v65 v65.104.0 h1:l9awEvzWvxmYhy/97A0hZ87pa7BncYXmcO/S8+rvgK0=
github.com/oracle/oci-go-sdk/v65 v65.104.0/go.mod h1:oB8jFGVc/7/zJ+DbleE8MzGHjhs2ioCz5stRTdZdIcY=
github.com/panjf2000/ants/v2 v2.11.3 h1:AfI0ngBoXJmYOpDh9m516vjqoUu2sLrIVgppI9TZVpg=
@@ -660,6 +660,8 @@ github.com/ulikunitz/xz v0.5.15 h1:9DNdB5s+SgV3bQ2ApL10xRc35ck0DuIX/isZvIk+ubY=
github.com/ulikunitz/xz v0.5.15/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/unknwon/goconfig v1.0.0 h1:rS7O+CmUdli1T+oDm7fYj1MwqNWtEJfNj+FqcUHML8U=
github.com/unknwon/goconfig v1.0.0/go.mod h1:qu2ZQ/wcC/if2u32263HTVC39PeOQRSmidQk3DuDFQ8=
github.com/valkey-io/valkey-go v1.0.70 h1:mjYNT8qiazxDAJ0QNQ8twWT/YFOkOoRd40ERV2mB49Y=
github.com/valkey-io/valkey-go v1.0.70/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
@@ -720,6 +722,8 @@ go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/arch v0.17.0 h1:4O3dfLzd+lQewptAHqjewQZQDyEdejz3VwgeYwkZneU=
golang.org/x/arch v0.17.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -818,8 +822,8 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@@ -79,6 +79,13 @@ type EnvVariables struct {
TestMongodb70Port string `env:"TEST_MONGODB_70_PORT"`
TestMongodb82Port string `env:"TEST_MONGODB_82_PORT"`
// Valkey
ValkeyHost string `env:"VALKEY_HOST" required:"true"`
ValkeyPort string `env:"VALKEY_PORT" required:"true"`
ValkeyUsername string `env:"VALKEY_USERNAME"`
ValkeyPassword string `env:"VALKEY_PASSWORD"`
ValkeyIsSsl bool `env:"VALKEY_IS_SSL" required:"true"`
// oauth
GitHubClientID string `env:"GITHUB_CLIENT_ID"`
GitHubClientSecret string `env:"GITHUB_CLIENT_SECRET"`
@@ -189,6 +196,16 @@ func loadEnvVariables() {
env.MongodbInstallDir = filepath.Join(backendRoot, "tools", "mongodb")
tools.VerifyMongodbInstallation(log, env.EnvMode, env.MongodbInstallDir)
// Valkey
if env.ValkeyHost == "" {
log.Error("VALKEY_HOST is empty")
os.Exit(1)
}
if env.ValkeyPort == "" {
log.Error("VALKEY_PORT is empty")
os.Exit(1)
}
// Store the data and temp folders one level below the root
// (projectRoot/databasus-data -> /databasus-data)
env.DataFolder = filepath.Join(filepath.Dir(backendRoot), "databasus-data", "backups")

View File

@@ -75,6 +75,9 @@ func Test_MakeBackupForDbHavingBackupDayAgo_BackupCreated(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 2)
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupForDbHavingHourAgoBackup_BackupSkipped(t *testing.T) {
@@ -135,6 +138,9 @@ func Test_MakeBackupForDbHavingHourAgoBackup_BackupSkipped(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 1) // Should still be 1 backup, no new backup created
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupHavingFailedBackupWithoutRetries_BackupSkipped(t *testing.T) {
@@ -199,6 +205,9 @@ func Test_MakeBackupHavingFailedBackupWithoutRetries_BackupSkipped(t *testing.T)
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 1) // Should still be 1 backup, no retry attempted
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupHavingFailedBackupWithRetries_BackupCreated(t *testing.T) {
@@ -264,6 +273,9 @@ func Test_MakeBackupHavingFailedBackupWithRetries_BackupCreated(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 2) // Should have 2 backups, retry was attempted
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackupHavingFailedBackupWithRetries_RetriesCountNotExceeded(t *testing.T) {
@@ -330,6 +342,9 @@ func Test_MakeBackupHavingFailedBackupWithRetries_RetriesCountNotExceeded(t *tes
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 3) // Should have 3 backups, not more than max
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}
func Test_MakeBackgroundBackupWhenBakupsDisabled_BackupSkipped(t *testing.T) {
@@ -386,4 +401,7 @@ func Test_MakeBackgroundBackupWhenBakupsDisabled_BackupSkipped(t *testing.T) {
backups, err := backupRepository.FindByDatabaseID(database.ID)
assert.NoError(t, err)
assert.Len(t, backups, 1)
// Wait for any cleanup operations to complete before defer cleanup runs
time.Sleep(200 * time.Millisecond)
}

View File

@@ -2,21 +2,57 @@ package backups
import (
"context"
cache_utils "databasus-backend/internal/util/cache"
"databasus-backend/internal/util/logger"
"log/slog"
"sync"
"github.com/google/uuid"
)
const backupCancelChannel = "backup:cancel"
type BackupContextManager struct {
mu sync.RWMutex
cancelFuncs map[uuid.UUID]context.CancelFunc
cancelledBackups map[uuid.UUID]bool
mu sync.RWMutex
cancelFuncs map[uuid.UUID]context.CancelFunc
pubsub *cache_utils.PubSubManager
logger *slog.Logger
}
func NewBackupContextManager() *BackupContextManager {
return &BackupContextManager{
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
cancelledBackups: make(map[uuid.UUID]bool),
cancelFuncs: make(map[uuid.UUID]context.CancelFunc),
pubsub: cache_utils.NewPubSubManager(),
logger: logger.GetLogger(),
}
}
func (m *BackupContextManager) StartSubscription() {
ctx := context.Background()
handler := func(message string) {
backupID, err := uuid.Parse(message)
if err != nil {
m.logger.Error("Invalid backup ID in cancel message", "message", message, "error", err)
return
}
m.mu.Lock()
defer m.mu.Unlock()
cancelFunc, exists := m.cancelFuncs[backupID]
if exists {
cancelFunc()
delete(m.cancelFuncs, backupID)
m.logger.Info("Cancelled backup via Pub/Sub", "backupID", backupID)
}
}
err := m.pubsub.Subscribe(ctx, backupCancelChannel, handler)
if err != nil {
m.logger.Error("Failed to subscribe to backup cancel channel", "error", err)
} else {
m.logger.Info("Successfully subscribed to backup cancel channel")
}
}
@@ -24,37 +60,25 @@ func (m *BackupContextManager) RegisterBackup(backupID uuid.UUID, cancelFunc con
m.mu.Lock()
defer m.mu.Unlock()
m.cancelFuncs[backupID] = cancelFunc
delete(m.cancelledBackups, backupID)
m.logger.Debug("Registered backup", "backupID", backupID)
}
func (m *BackupContextManager) CancelBackup(backupID uuid.UUID) error {
m.mu.Lock()
defer m.mu.Unlock()
ctx := context.Background()
if m.cancelledBackups[backupID] {
return nil
err := m.pubsub.Publish(ctx, backupCancelChannel, backupID.String())
if err != nil {
m.logger.Error("Failed to publish cancel message", "backupID", backupID, "error", err)
return err
}
cancelFunc, exists := m.cancelFuncs[backupID]
if exists {
cancelFunc()
delete(m.cancelFuncs, backupID)
}
m.cancelledBackups[backupID] = true
m.logger.Info("Published backup cancel message", "backupID", backupID)
return nil
}
func (m *BackupContextManager) IsCancelled(backupID uuid.UUID) bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.cancelledBackups[backupID]
}
func (m *BackupContextManager) UnregisterBackup(backupID uuid.UUID) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.cancelFuncs, backupID)
delete(m.cancelledBackups, backupID)
m.logger.Debug("Unregistered backup", "backupID", backupID)
}

View File

@@ -0,0 +1,200 @@
package backups
import (
"context"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func Test_RegisterBackup_BackupRegisteredSuccessfully(t *testing.T) {
manager := NewBackupContextManager()
backupID := uuid.New()
_, cancel := context.WithCancel(context.Background())
defer cancel()
manager.RegisterBackup(backupID, cancel)
manager.mu.RLock()
_, exists := manager.cancelFuncs[backupID]
manager.mu.RUnlock()
assert.True(t, exists, "Backup should be registered")
}
func Test_UnregisterBackup_BackupUnregisteredSuccessfully(t *testing.T) {
manager := NewBackupContextManager()
backupID := uuid.New()
_, cancel := context.WithCancel(context.Background())
defer cancel()
manager.RegisterBackup(backupID, cancel)
manager.UnregisterBackup(backupID)
manager.mu.RLock()
_, exists := manager.cancelFuncs[backupID]
manager.mu.RUnlock()
assert.False(t, exists, "Backup should be unregistered")
}
func Test_CancelBackup_OnSameInstance_BackupCancelledViaPubSub(t *testing.T) {
manager := NewBackupContextManager()
backupID := uuid.New()
ctx, cancel := context.WithCancel(context.Background())
cancelled := false
var mu sync.Mutex
wrappedCancel := func() {
mu.Lock()
cancelled = true
mu.Unlock()
cancel()
}
manager.RegisterBackup(backupID, wrappedCancel)
manager.StartSubscription()
time.Sleep(100 * time.Millisecond)
err := manager.CancelBackup(backupID)
assert.NoError(t, err, "Cancel should not return error")
time.Sleep(500 * time.Millisecond)
mu.Lock()
wasCancelled := cancelled
mu.Unlock()
assert.True(t, wasCancelled, "Cancel function should have been called")
assert.Error(t, ctx.Err(), "Context should be cancelled")
}
func Test_CancelBackup_FromDifferentInstance_BackupCancelledOnRunningInstance(t *testing.T) {
manager1 := NewBackupContextManager()
manager2 := NewBackupContextManager()
backupID := uuid.New()
ctx, cancel := context.WithCancel(context.Background())
cancelled := false
var mu sync.Mutex
wrappedCancel := func() {
mu.Lock()
cancelled = true
mu.Unlock()
cancel()
}
manager1.RegisterBackup(backupID, wrappedCancel)
manager1.StartSubscription()
manager2.StartSubscription()
time.Sleep(100 * time.Millisecond)
err := manager2.CancelBackup(backupID)
assert.NoError(t, err, "Cancel should not return error")
time.Sleep(500 * time.Millisecond)
mu.Lock()
wasCancelled := cancelled
mu.Unlock()
assert.True(t, wasCancelled, "Cancel function should have been called on instance 1")
assert.Error(t, ctx.Err(), "Context should be cancelled")
}
func Test_CancelBackup_WhenBackupDoesNotExist_NoErrorReturned(t *testing.T) {
manager := NewBackupContextManager()
manager.StartSubscription()
time.Sleep(100 * time.Millisecond)
nonExistentID := uuid.New()
err := manager.CancelBackup(nonExistentID)
assert.NoError(t, err, "Cancelling non-existent backup should not error")
}
func Test_CancelBackup_WithMultipleBackups_AllBackupsCancelled(t *testing.T) {
manager := NewBackupContextManager()
numBackups := 5
backupIDs := make([]uuid.UUID, numBackups)
contexts := make([]context.Context, numBackups)
cancels := make([]context.CancelFunc, numBackups)
cancelledFlags := make([]bool, numBackups)
var mu sync.Mutex
for i := 0; i < numBackups; i++ {
backupIDs[i] = uuid.New()
contexts[i], cancels[i] = context.WithCancel(context.Background())
idx := i
wrappedCancel := func() {
mu.Lock()
cancelledFlags[idx] = true
mu.Unlock()
cancels[idx]()
}
manager.RegisterBackup(backupIDs[i], wrappedCancel)
}
manager.StartSubscription()
time.Sleep(100 * time.Millisecond)
for i := 0; i < numBackups; i++ {
err := manager.CancelBackup(backupIDs[i])
assert.NoError(t, err, "Cancel should not return error")
}
time.Sleep(1 * time.Second)
mu.Lock()
for i := 0; i < numBackups; i++ {
assert.True(t, cancelledFlags[i], "Backup %d should be cancelled", i)
assert.Error(t, contexts[i].Err(), "Context %d should be cancelled", i)
}
mu.Unlock()
}
func Test_CancelBackup_AfterUnregister_BackupNotCancelled(t *testing.T) {
manager := NewBackupContextManager()
backupID := uuid.New()
_, cancel := context.WithCancel(context.Background())
defer cancel()
cancelled := false
var mu sync.Mutex
wrappedCancel := func() {
mu.Lock()
cancelled = true
mu.Unlock()
cancel()
}
manager.RegisterBackup(backupID, wrappedCancel)
manager.StartSubscription()
time.Sleep(100 * time.Millisecond)
manager.UnregisterBackup(backupID)
err := manager.CancelBackup(backupID)
assert.NoError(t, err, "Cancel should not return error")
time.Sleep(500 * time.Millisecond)
mu.Lock()
wasCancelled := cancelled
mu.Unlock()
assert.False(t, wasCancelled, "Cancel function should not be called after unregister")
}

View File

@@ -58,6 +58,8 @@ func SetupDependencies() {
databases.GetDatabaseService().AddDbRemoveListener(backupService)
databases.GetDatabaseService().AddDbCopyListener(backups_config.GetBackupConfigService())
backupContextManager.StartSubscription()
}
func GetBackupService() *BackupService {

View File

@@ -122,6 +122,7 @@ func (uc *CreateMariadbBackupUsecase) buildMariadbDumpArgs(
if mdb.IsHttps {
args = append(args, "--ssl")
args = append(args, "--skip-ssl-verify-server-cert")
}
if mdb.Database != nil && *mdb.Database != "" {
@@ -265,11 +266,24 @@ func (uc *CreateMariadbBackupUsecase) createTempMyCnfFile(
mdbConfig *mariadbtypes.MariadbDatabase,
password string,
) (string, error) {
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
tempFolder := config.GetEnv().TempFolder
if err := os.MkdirAll(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to ensure temp folder exists: %w", err)
}
if err := os.Chmod(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to set temp folder permissions: %w", err)
}
tempDir, err := os.MkdirTemp(tempFolder, "mycnf_"+uuid.New().String())
if err != nil {
return "", fmt.Errorf("failed to create temp directory: %w", err)
}
if err := os.Chmod(tempDir, 0700); err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to set temp directory permissions: %w", err)
}
myCnfFile := filepath.Join(tempDir, ".my.cnf")
content := fmt.Sprintf(`[client]
@@ -287,6 +301,7 @@ port=%d
err = os.WriteFile(myCnfFile, []byte(content), 0600)
if err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to write .my.cnf: %w", err)
}

View File

@@ -280,11 +280,24 @@ func (uc *CreateMysqlBackupUsecase) createTempMyCnfFile(
myConfig *mysqltypes.MysqlDatabase,
password string,
) (string, error) {
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
tempFolder := config.GetEnv().TempFolder
if err := os.MkdirAll(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to ensure temp folder exists: %w", err)
}
if err := os.Chmod(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to set temp folder permissions: %w", err)
}
tempDir, err := os.MkdirTemp(tempFolder, "mycnf_"+uuid.New().String())
if err != nil {
return "", fmt.Errorf("failed to create temp directory: %w", err)
}
if err := os.Chmod(tempDir, 0700); err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to set temp directory permissions: %w", err)
}
myCnfFile := filepath.Join(tempDir, ".my.cnf")
content := fmt.Sprintf(`[client]
@@ -300,6 +313,7 @@ port=%d
err = os.WriteFile(myCnfFile, []byte(content), 0600)
if err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to write .my.cnf: %w", err)
}

View File

@@ -757,14 +757,28 @@ func (uc *CreatePostgresqlBackupUsecase) createTempPgpassFile(
escapedPassword,
)
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgpass_"+uuid.New().String())
tempFolder := config.GetEnv().TempFolder
if err := os.MkdirAll(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to ensure temp folder exists: %w", err)
}
if err := os.Chmod(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to set temp folder permissions: %w", err)
}
tempDir, err := os.MkdirTemp(tempFolder, "pgpass_"+uuid.New().String())
if err != nil {
return "", fmt.Errorf("failed to create temporary directory: %w", err)
}
if err := os.Chmod(tempDir, 0700); err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to set temporary directory permissions: %w", err)
}
pgpassFile := filepath.Join(tempDir, ".pgpass")
err = os.WriteFile(pgpassFile, []byte(pgpassContent), 0600)
if err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to write temporary .pgpass file: %w", err)
}

View File

@@ -2,6 +2,7 @@ package mariadb
import (
"context"
"crypto/tls"
"database/sql"
"errors"
"fmt"
@@ -14,7 +15,7 @@ import (
"databasus-backend/internal/util/encryption"
"databasus-backend/internal/util/tools"
_ "github.com/go-sql-driver/mysql"
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
)
@@ -398,8 +399,16 @@ func HasPrivilege(privileges, priv string) bool {
func (m *MariadbDatabase) buildDSN(password string, database string) string {
tlsConfig := "false"
if m.IsHttps {
tlsConfig = "skip-verify"
err := mysql.RegisterTLSConfig("mariadb-skip-verify", &tls.Config{
InsecureSkipVerify: true,
})
if err != nil {
// Config might already be registered, which is fine
_ = err
}
tlsConfig = "mariadb-skip-verify"
}
return fmt.Sprintf(
@@ -562,9 +571,9 @@ func detectPrivileges(ctx context.Context, db *sql.DB, database string) (string,
}
// checkBackupPermissions verifies the user has sufficient privileges for mariadb-dump backup.
// Required: SELECT, SHOW VIEW, PROCESS. Optional: LOCK TABLES, TRIGGER, EVENT.
// Required: SELECT, SHOW VIEW
func checkBackupPermissions(privileges string) error {
requiredPrivileges := []string{"SELECT", "SHOW VIEW", "PROCESS"}
requiredPrivileges := []string{"SELECT", "SHOW VIEW"}
var missingPrivileges []string
for _, priv := range requiredPrivileges {
@@ -575,7 +584,7 @@ func checkBackupPermissions(privileges string) error {
if len(missingPrivileges) > 0 {
return fmt.Errorf(
"insufficient permissions for backup. Missing: %s. Required: SELECT, SHOW VIEW, PROCESS",
"insufficient permissions for backup. Missing: %s. Required: SELECT, SHOW VIEW",
strings.Join(missingPrivileges, ", "),
)
}

View File

@@ -2,6 +2,7 @@ package mysql
import (
"context"
"crypto/tls"
"database/sql"
"errors"
"fmt"
@@ -14,7 +15,7 @@ import (
"databasus-backend/internal/util/encryption"
"databasus-backend/internal/util/tools"
_ "github.com/go-sql-driver/mysql"
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
)
@@ -399,8 +400,17 @@ func HasPrivilege(privileges, priv string) bool {
func (m *MysqlDatabase) buildDSN(password string, database string) string {
tlsConfig := "false"
if m.IsHttps {
tlsConfig = "skip-verify"
err := mysql.RegisterTLSConfig("mysql-skip-verify", &tls.Config{
InsecureSkipVerify: true,
})
if err != nil {
// Config might already be registered, which is fine
_ = err
}
tlsConfig = "mysql-skip-verify"
}
return fmt.Sprintf(
@@ -532,9 +542,9 @@ func detectPrivileges(ctx context.Context, db *sql.DB, database string) (string,
}
// checkBackupPermissions verifies the user has sufficient privileges for mysqldump backup.
// Required: SELECT, SHOW VIEW, PROCESS. Optional: LOCK TABLES, TRIGGER, EVENT.
// Required: SELECT, SHOW VIEW
func checkBackupPermissions(privileges string) error {
requiredPrivileges := []string{"SELECT", "SHOW VIEW", "PROCESS"}
requiredPrivileges := []string{"SELECT", "SHOW VIEW"}
var missingPrivileges []string
for _, priv := range requiredPrivileges {
@@ -545,7 +555,7 @@ func checkBackupPermissions(privileges string) error {
if len(missingPrivileges) > 0 {
return fmt.Errorf(
"insufficient permissions for backup. Missing: %s. Required: SELECT, SHOW VIEW, PROCESS",
"insufficient permissions for backup. Missing: %s. Required: SELECT, SHOW VIEW",
strings.Join(missingPrivileges, ", "),
)
}

View File

@@ -71,6 +71,7 @@ func (uc *RestoreMariadbBackupUsecase) Execute(
if mdb.IsHttps {
args = append(args, "--ssl")
args = append(args, "--skip-ssl-verify-server-cert")
}
if mdb.Database != nil && *mdb.Database != "" {
@@ -265,11 +266,24 @@ func (uc *RestoreMariadbBackupUsecase) createTempMyCnfFile(
mdbConfig *mariadbtypes.MariadbDatabase,
password string,
) (string, error) {
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
tempFolder := config.GetEnv().TempFolder
if err := os.MkdirAll(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to ensure temp folder exists: %w", err)
}
if err := os.Chmod(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to set temp folder permissions: %w", err)
}
tempDir, err := os.MkdirTemp(tempFolder, "mycnf_"+uuid.New().String())
if err != nil {
return "", fmt.Errorf("failed to create temp directory: %w", err)
}
if err := os.Chmod(tempDir, 0700); err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to set temp directory permissions: %w", err)
}
myCnfFile := filepath.Join(tempDir, ".my.cnf")
content := fmt.Sprintf(`[client]
@@ -287,6 +301,7 @@ port=%d
err = os.WriteFile(myCnfFile, []byte(content), 0600)
if err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to write .my.cnf: %w", err)
}

View File

@@ -257,11 +257,24 @@ func (uc *RestoreMysqlBackupUsecase) createTempMyCnfFile(
myConfig *mysqltypes.MysqlDatabase,
password string,
) (string, error) {
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "mycnf_"+uuid.New().String())
tempFolder := config.GetEnv().TempFolder
if err := os.MkdirAll(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to ensure temp folder exists: %w", err)
}
if err := os.Chmod(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to set temp folder permissions: %w", err)
}
tempDir, err := os.MkdirTemp(tempFolder, "mycnf_"+uuid.New().String())
if err != nil {
return "", fmt.Errorf("failed to create temp directory: %w", err)
}
if err := os.Chmod(tempDir, 0700); err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to set temp directory permissions: %w", err)
}
myCnfFile := filepath.Join(tempDir, ".my.cnf")
content := fmt.Sprintf(`[client]
@@ -277,6 +290,7 @@ port=%d
err = os.WriteFile(myCnfFile, []byte(content), 0600)
if err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to write .my.cnf: %w", err)
}

View File

@@ -925,14 +925,28 @@ func (uc *RestorePostgresqlBackupUsecase) createTempPgpassFile(
escapedPassword,
)
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "pgpass_"+uuid.New().String())
tempFolder := config.GetEnv().TempFolder
if err := os.MkdirAll(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to ensure temp folder exists: %w", err)
}
if err := os.Chmod(tempFolder, 0700); err != nil {
return "", fmt.Errorf("failed to set temp folder permissions: %w", err)
}
tempDir, err := os.MkdirTemp(tempFolder, "pgpass_"+uuid.New().String())
if err != nil {
return "", fmt.Errorf("failed to create temporary directory: %w", err)
}
if err := os.Chmod(tempDir, 0700); err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to set temporary directory permissions: %w", err)
}
pgpassFile := filepath.Join(tempDir, ".pgpass")
err = os.WriteFile(pgpassFile, []byte(pgpassContent), 0600)
if err != nil {
_ = os.RemoveAll(tempDir)
return "", fmt.Errorf("failed to write temporary .pgpass file: %w", err)
}

121
backend/internal/util/cache/cache.go vendored Normal file
View File

@@ -0,0 +1,121 @@
package cache_utils
import (
"context"
"crypto/tls"
"databasus-backend/internal/config"
"sync"
"github.com/valkey-io/valkey-go"
)
var (
once sync.Once
valkeyClient valkey.Client
)
func getCache() valkey.Client {
once.Do(func() {
env := config.GetEnv()
options := valkey.ClientOption{
InitAddress: []string{env.ValkeyHost + ":" + env.ValkeyPort},
Password: env.ValkeyPassword,
Username: env.ValkeyUsername,
}
if env.ValkeyIsSsl {
options.TLSConfig = &tls.Config{
ServerName: env.ValkeyHost,
}
}
client, err := valkey.NewClient(options)
if err != nil {
panic(err)
}
valkeyClient = client
})
return valkeyClient
}
func TestCacheConnection() {
// Get Valkey client from cache package
client := getCache()
// Create a simple test cache util for strings
cacheUtil := NewCacheUtil[string](client, "test:")
// Test data
testKey := "connection_test"
testValue := "valkey_is_working"
// Test Set operation
cacheUtil.Set(testKey, &testValue)
// Test Get operation
retrievedValue := cacheUtil.Get(testKey)
// Verify the value was retrieved correctly
if retrievedValue == nil {
panic("Cache test failed: could not retrieve cached value")
}
if *retrievedValue != testValue {
panic("Cache test failed: retrieved value does not match expected")
}
// Clean up - remove test key
cacheUtil.Invalidate(testKey)
// Verify cleanup worked
cleanupCheck := cacheUtil.Get(testKey)
if cleanupCheck != nil {
panic("Cache test failed: test key was not properly invalidated")
}
}
func ClearAllCache() error {
pattern := "*"
cursor := uint64(0)
batchSize := int64(100)
cacheUtil := NewCacheUtil[string](getCache(), "")
for {
ctx, cancel := context.WithTimeout(context.Background(), DefaultQueueTimeout)
result := cacheUtil.client.Do(
ctx,
cacheUtil.client.B().Scan().Cursor(cursor).Match(pattern).Count(batchSize).Build(),
)
cancel()
if result.Error() != nil {
return result.Error()
}
scanResult, err := result.AsScanEntry()
if err != nil {
return err
}
if len(scanResult.Elements) > 0 {
delCtx, delCancel := context.WithTimeout(context.Background(), cacheUtil.timeout)
cacheUtil.client.Do(
delCtx,
cacheUtil.client.B().Del().Key(scanResult.Elements...).Build(),
)
delCancel()
}
cursor = scanResult.Cursor
if cursor == 0 {
break
}
}
return nil
}

109
backend/internal/util/cache/pubsub.go vendored Normal file
View File

@@ -0,0 +1,109 @@
package cache_utils
import (
"context"
"fmt"
"log/slog"
"sync"
"databasus-backend/internal/util/logger"
"github.com/valkey-io/valkey-go"
)
type PubSubManager struct {
client valkey.Client
subscriptions map[string]context.CancelFunc
mu sync.RWMutex
logger *slog.Logger
}
func NewPubSubManager() *PubSubManager {
return &PubSubManager{
client: getCache(),
subscriptions: make(map[string]context.CancelFunc),
logger: logger.GetLogger(),
}
}
func (m *PubSubManager) Subscribe(
ctx context.Context,
channel string,
handler func(message string),
) error {
m.mu.Lock()
if _, exists := m.subscriptions[channel]; exists {
m.mu.Unlock()
return fmt.Errorf("already subscribed to channel: %s", channel)
}
subCtx, cancel := context.WithCancel(ctx)
m.subscriptions[channel] = cancel
m.mu.Unlock()
go m.subscriptionLoop(subCtx, channel, handler)
return nil
}
func (m *PubSubManager) Publish(ctx context.Context, channel string, message string) error {
cmd := m.client.B().Publish().Channel(channel).Message(message).Build()
result := m.client.Do(ctx, cmd)
if err := result.Error(); err != nil {
m.logger.Error("Failed to publish message to Redis", "channel", channel, "error", err)
return fmt.Errorf("failed to publish message: %w", err)
}
return nil
}
func (m *PubSubManager) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
for channel, cancel := range m.subscriptions {
cancel()
delete(m.subscriptions, channel)
}
return nil
}
func (m *PubSubManager) subscriptionLoop(
ctx context.Context,
channel string,
handler func(message string),
) {
defer func() {
if r := recover(); r != nil {
m.logger.Error("Panic in subscription handler", "channel", channel, "panic", r)
}
}()
m.logger.Info("Starting subscription", "channel", channel)
err := m.client.Receive(
ctx,
m.client.B().Subscribe().Channel(channel).Build(),
func(msg valkey.PubSubMessage) {
defer func() {
if r := recover(); r != nil {
m.logger.Error("Panic in message handler", "channel", channel, "panic", r)
}
}()
handler(msg.Message)
},
)
if err != nil && ctx.Err() == nil {
m.logger.Error("Subscription error", "channel", channel, "error", err)
} else if ctx.Err() != nil {
m.logger.Info("Subscription cancelled", "channel", channel)
}
m.mu.Lock()
delete(m.subscriptions, channel)
m.mu.Unlock()
}

76
backend/internal/util/cache/utils.go vendored Normal file
View File

@@ -0,0 +1,76 @@
package cache_utils
import (
"context"
"encoding/json"
"time"
"github.com/valkey-io/valkey-go"
)
const (
DefaultCacheTimeout = 10 * time.Second
DefaultCacheExpiry = 10 * time.Minute
DefaultQueueTimeout = 30 * time.Second
)
type CacheUtil[T any] struct {
client valkey.Client
prefix string
timeout time.Duration
expiry time.Duration
}
func NewCacheUtil[T any](client valkey.Client, prefix string) *CacheUtil[T] {
return &CacheUtil[T]{
client: client,
prefix: prefix,
timeout: DefaultCacheTimeout,
expiry: DefaultCacheExpiry,
}
}
func (c *CacheUtil[T]) Get(key string) *T {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
fullKey := c.prefix + key
result := c.client.Do(ctx, c.client.B().Get().Key(fullKey).Build())
if result.Error() != nil {
return nil
}
data, err := result.AsBytes()
if err != nil {
return nil
}
var item T
if err := json.Unmarshal(data, &item); err != nil {
return nil
}
return &item
}
func (c *CacheUtil[T]) Set(key string, item *T) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
data, err := json.Marshal(item)
if err != nil {
return
}
fullKey := c.prefix + key
c.client.Do(ctx, c.client.B().Set().Key(fullKey).Value(string(data)).Ex(c.expiry).Build())
}
func (c *CacheUtil[T]) Invalidate(key string) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
fullKey := c.prefix + key
c.client.Do(ctx, c.client.B().Del().Key(fullKey).Build())
}

View File

@@ -40,7 +40,3 @@ export const GOOGLE_CLIENT_ID =
export function getOAuthRedirectUri(): string {
return `${window.location.origin}/auth/callback`;
}
export function isOAuthEnabled(): boolean {
return IS_CLOUD && (!!GITHUB_CLIENT_ID || !!GOOGLE_CLIENT_ID);
}

View File

@@ -1,96 +0,0 @@
import { GithubOutlined, GoogleOutlined } from '@ant-design/icons';
import { Button, message } from 'antd';
import {
GITHUB_CLIENT_ID,
GOOGLE_CLIENT_ID,
getOAuthRedirectUri,
isOAuthEnabled,
} from '../../../constants';
export function OauthComponent() {
if (!isOAuthEnabled()) {
return null;
}
const redirectUri = getOAuthRedirectUri();
const handleGitHubLogin = () => {
if (!GITHUB_CLIENT_ID) {
message.error('GitHub OAuth is not configured');
return;
}
try {
const params = new URLSearchParams({
client_id: GITHUB_CLIENT_ID,
redirect_uri: redirectUri,
state: 'github',
scope: 'user:email',
});
const githubAuthUrl = `https://github.com/login/oauth/authorize?${params.toString()}`;
// Validate URL is properly formed
new URL(githubAuthUrl);
window.location.href = githubAuthUrl;
} catch (error) {
message.error('Invalid OAuth configuration');
console.error('GitHub OAuth URL error:', error);
}
};
const handleGoogleLogin = () => {
if (!GOOGLE_CLIENT_ID) {
message.error('Google OAuth is not configured');
return;
}
try {
const params = new URLSearchParams({
client_id: GOOGLE_CLIENT_ID,
redirect_uri: redirectUri,
response_type: 'code',
scope: 'openid email profile',
state: 'google',
});
const googleAuthUrl = `https://accounts.google.com/o/oauth2/v2/auth?${params.toString()}`;
// Validate URL is properly formed
new URL(googleAuthUrl);
window.location.href = googleAuthUrl;
} catch (error) {
message.error('Invalid OAuth configuration');
console.error('Google OAuth URL error:', error);
}
};
return (
<div className="mt-4">
<div className="space-y-2">
{GITHUB_CLIENT_ID && (
<Button
icon={<GithubOutlined />}
onClick={handleGitHubLogin}
className="w-full"
size="large"
>
Continue with GitHub
</Button>
)}
{GOOGLE_CLIENT_ID && (
<Button
icon={<GoogleOutlined />}
onClick={handleGoogleLogin}
className="w-full"
size="large"
>
Continue with Google
</Button>
)}
</div>
</div>
);
}

View File

@@ -2,11 +2,12 @@ import { EyeInvisibleOutlined, EyeTwoTone } from '@ant-design/icons';
import { Button, Input } from 'antd';
import { type JSX, useState } from 'react';
import { IS_CLOUD } from '../../../constants';
import { GITHUB_CLIENT_ID, GOOGLE_CLIENT_ID } from '../../../constants';
import { userApi } from '../../../entity/users';
import { StringUtils } from '../../../shared/lib';
import { FormValidator } from '../../../shared/lib/FormValidator';
import { OauthComponent } from './OauthComponent';
import { GithubOAuthComponent } from './oauth/GithubOAuthComponent';
import { GoogleOAuthComponent } from './oauth/GoogleOAuthComponent';
interface SignInComponentProps {
onSwitchToSignUp?: () => void;
@@ -67,9 +68,14 @@ export function SignInComponent({ onSwitchToSignUp }: SignInComponentProps): JSX
<div className="w-full max-w-[300px]">
<div className="mb-5 text-center text-2xl font-bold">Sign in</div>
<OauthComponent />
<div className="mt-4">
<div className="space-y-2">
<GithubOAuthComponent />
<GoogleOAuthComponent />
</div>
</div>
{IS_CLOUD && (
{(GOOGLE_CLIENT_ID || GITHUB_CLIENT_ID) && (
<div className="relative my-6">
<div className="absolute inset-0 flex items-center">
<div className="w-full border-t border-gray-300"></div>

View File

@@ -2,11 +2,12 @@ import { EyeInvisibleOutlined, EyeTwoTone } from '@ant-design/icons';
import { App, Button, Input } from 'antd';
import { type JSX, useState } from 'react';
import { IS_CLOUD } from '../../../constants';
import { GITHUB_CLIENT_ID, GOOGLE_CLIENT_ID } from '../../../constants';
import { userApi } from '../../../entity/users';
import { StringUtils } from '../../../shared/lib';
import { FormValidator } from '../../../shared/lib/FormValidator';
import { OauthComponent } from './OauthComponent';
import { GithubOAuthComponent } from './oauth/GithubOAuthComponent';
import { GoogleOAuthComponent } from './oauth/GoogleOAuthComponent';
interface SignUpComponentProps {
onSwitchToSignIn?: () => void;
@@ -98,9 +99,14 @@ export function SignUpComponent({ onSwitchToSignIn }: SignUpComponentProps): JSX
<div className="w-full max-w-[300px]">
<div className="mb-5 text-center text-2xl font-bold">Sign up</div>
<OauthComponent />
<div className="mt-4">
<div className="space-y-2">
<GithubOAuthComponent />
<GoogleOAuthComponent />
</div>
</div>
{IS_CLOUD && (
{(GOOGLE_CLIENT_ID || GITHUB_CLIENT_ID) && (
<div className="relative my-6">
<div className="absolute inset-0 flex items-center">
<div className="w-full border-t border-gray-300"></div>

View File

@@ -0,0 +1,38 @@
import { GithubOutlined } from '@ant-design/icons';
import { Button, message } from 'antd';
import { GITHUB_CLIENT_ID, getOAuthRedirectUri } from '../../../../constants';
export function GithubOAuthComponent() {
if (!GITHUB_CLIENT_ID) {
return null;
}
const redirectUri = getOAuthRedirectUri();
const handleGitHubLogin = () => {
try {
const params = new URLSearchParams({
client_id: GITHUB_CLIENT_ID,
redirect_uri: redirectUri,
state: 'github',
scope: 'user:email',
});
const githubAuthUrl = `https://github.com/login/oauth/authorize?${params.toString()}`;
// Validate URL is properly formed
new URL(githubAuthUrl);
window.location.href = githubAuthUrl;
} catch (error) {
message.error('Invalid OAuth configuration');
console.error('GitHub OAuth URL error:', error);
}
};
return (
<Button icon={<GithubOutlined />} onClick={handleGitHubLogin} className="w-full" size="large">
Continue with GitHub
</Button>
);
}

View File

@@ -0,0 +1,39 @@
import { GoogleOutlined } from '@ant-design/icons';
import { Button, message } from 'antd';
import { GOOGLE_CLIENT_ID, getOAuthRedirectUri } from '../../../../constants';
export function GoogleOAuthComponent() {
if (!GOOGLE_CLIENT_ID) {
return null;
}
const redirectUri = getOAuthRedirectUri();
const handleGoogleLogin = () => {
try {
const params = new URLSearchParams({
client_id: GOOGLE_CLIENT_ID,
redirect_uri: redirectUri,
response_type: 'code',
scope: 'openid email profile',
state: 'google',
});
const googleAuthUrl = `https://accounts.google.com/o/oauth2/v2/auth?${params.toString()}`;
// Validate URL is properly formed
new URL(googleAuthUrl);
window.location.href = googleAuthUrl;
} catch (error) {
message.error('Invalid OAuth configuration');
console.error('Google OAuth URL error:', error);
}
};
return (
<Button icon={<GoogleOutlined />} onClick={handleGoogleLogin} className="w-full" size="large">
Continue with Google
</Button>
);
}