Compare commits

...

9 Commits

Author SHA1 Message Date
Rostislav Dugin
4602dc3f88 Merge pull request #267 from databasus/develop
FIX (mysql): Enable allowCleartextPasswords over SSL
2026-01-14 18:13:46 +03:00
Rostislav Dugin
cbbfc5ea8f FIX (mysql): Enable allowCleartextPasswords over SSL 2026-01-14 18:11:49 +03:00
Rostislav Dugin
dd1072e230 Merge pull request #265 from databasus/develop
FIX (pre-commit): Add running go mod tidy in pre-commit
2026-01-14 15:18:35 +03:00
Rostislav Dugin
a495e5317a FIX (pre-commit): Add running go mod tidy in pre-commit 2026-01-14 15:18:06 +03:00
Rostislav Dugin
7eed647038 Merge pull request #264 from databasus/develop
Develop
2026-01-14 15:14:05 +03:00
Rostislav Dugin
6973241e25 FIX (backups): Throw error on parallel download token generation 2026-01-14 14:40:22 +03:00
Rostislav Dugin
ab181f5b81 FEATURE (bandwidth): Limit download throughput for backups to not exhaust more than 75% of server network bandwidth 2026-01-14 14:40:22 +03:00
Rostislav Dugin
b60a0cc170 FEATURE (backups): Allow single backup download to avoid exhausting of server throughput 2026-01-14 14:40:22 +03:00
Rostislav Dugin
f319a497b3 FEATURE (auth): Add rate limiting for sign in via email using sliding window 2026-01-14 14:40:22 +03:00
17 changed files with 1156 additions and 50 deletions

View File

@@ -27,3 +27,10 @@ repos:
language: system
files: ^backend/.*\.go$
pass_filenames: false
- id: backend-go-mod-tidy
name: Backend Go Mod Tidy
entry: bash -c "cd backend && go mod tidy"
language: system
files: ^backend/.*\.go$
pass_filenames: false

View File

@@ -28,7 +28,6 @@ require (
github.com/valkey-io/valkey-go v1.0.70
go.mongodb.org/mongo-driver v1.17.6
golang.org/x/crypto v0.46.0
golang.org/x/time v0.14.0
gorm.io/driver/postgres v1.5.11
gorm.io/gorm v1.26.1
)
@@ -186,6 +185,7 @@ require (
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
golang.org/x/term v0.38.0 // indirect
golang.org/x/time v0.14.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/validator.v2 v2.0.1 // indirect
moul.io/http2curl/v2 v2.3.0 // indirect

View File

@@ -1,12 +1,15 @@
package backups
import (
"context"
backups_core "databasus-backend/internal/features/backups/backups/core"
backups_download "databasus-backend/internal/features/backups/backups/download"
"databasus-backend/internal/features/databases"
users_middleware "databasus-backend/internal/features/users/middleware"
"fmt"
"io"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
@@ -174,6 +177,7 @@ func (c *BackupController) CancelBackup(ctx *gin.Context) {
// @Success 200 {object} backups_download.GenerateDownloadTokenResponse
// @Failure 400
// @Failure 401
// @Failure 409 {object} map[string]string "Download already in progress"
// @Router /backups/{id}/download-token [post]
func (c *BackupController) GenerateDownloadToken(ctx *gin.Context) {
user, ok := users_middleware.GetUserFromContext(ctx)
@@ -190,6 +194,15 @@ func (c *BackupController) GenerateDownloadToken(ctx *gin.Context) {
response, err := c.backupService.GenerateDownloadToken(user, id)
if err != nil {
if err == backups_download.ErrDownloadAlreadyInProgress {
ctx.JSON(
http.StatusConflict,
gin.H{
"error": "Download already in progress for some of backups. Please wait until previous download completed or cancel it",
},
)
return
}
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
@@ -199,14 +212,22 @@ func (c *BackupController) GenerateDownloadToken(ctx *gin.Context) {
// GetFile
// @Summary Download a backup file
// @Description Download the backup file for the specified backup using a download token
// @Description Download the backup file for the specified backup using a download token.
// @Description
// @Description **Download Concurrency Control:**
// @Description - Only one download per user is allowed at a time
// @Description - If a download is already in progress, returns 409 Conflict
// @Description - Downloads are tracked using cache with 5-second TTL and 3-second heartbeat
// @Description - Browser cancellations automatically release the download lock
// @Description - Server crashes are handled via automatic cache expiry (5 seconds)
// @Tags backups
// @Param id path string true "Backup ID"
// @Param token query string true "Download token"
// @Success 200 {file} file
// @Failure 400
// @Failure 401
// @Failure 500
// @Failure 400 {object} map[string]string
// @Failure 401 {object} map[string]string
// @Failure 409 {object} map[string]string "Download already in progress"
// @Failure 500 {object} map[string]string
// @Router /backups/{id}/file [get]
func (c *BackupController) GetFile(ctx *gin.Context) {
token := ctx.Query("token")
@@ -215,7 +236,6 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
return
}
// Get backup ID from URL
backupIDParam := ctx.Param("id")
backupID, err := uuid.Parse(backupIDParam)
if err != nil {
@@ -223,13 +243,22 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
return
}
downloadToken, err := c.backupService.ValidateDownloadToken(token)
downloadToken, rateLimiter, err := c.backupService.ValidateDownloadToken(token)
if err != nil {
if err == backups_download.ErrDownloadAlreadyInProgress {
ctx.JSON(
http.StatusConflict,
gin.H{
"error": "download already in progress for this user. Please wait until previous download completed or cancel it",
},
)
return
}
ctx.JSON(http.StatusUnauthorized, gin.H{"error": "invalid or expired download token"})
return
}
// Verify token is for the requested backup
if downloadToken.BackupID != backupID {
ctx.JSON(http.StatusUnauthorized, gin.H{"error": "invalid or expired download token"})
return
@@ -239,18 +268,28 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
downloadToken.BackupID,
)
if err != nil {
c.backupService.UnregisterDownload(downloadToken.UserID)
c.backupService.ReleaseDownloadLock(downloadToken.UserID)
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
rateLimitedReader := backups_download.NewRateLimitedReader(fileReader, rateLimiter)
heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background())
defer func() {
if err := fileReader.Close(); err != nil {
cancelHeartbeat()
c.backupService.UnregisterDownload(downloadToken.UserID)
c.backupService.ReleaseDownloadLock(downloadToken.UserID)
if err := rateLimitedReader.Close(); err != nil {
fmt.Printf("Error closing file reader: %v\n", err)
}
}()
go c.startDownloadHeartbeat(heartbeatCtx, downloadToken.UserID)
filename := c.generateBackupFilename(backup, database)
// Set Content-Length for progress tracking
if backup.BackupSizeMb > 0 {
sizeBytes := int64(backup.BackupSizeMb * 1024 * 1024)
ctx.Header("Content-Length", fmt.Sprintf("%d", sizeBytes))
@@ -262,13 +301,12 @@ func (c *BackupController) GetFile(ctx *gin.Context) {
fmt.Sprintf("attachment; filename=\"%s\"", filename),
)
_, err = io.Copy(ctx.Writer, fileReader)
_, err = io.Copy(ctx.Writer, rateLimitedReader)
if err != nil {
fmt.Printf("Error streaming file: %v\n", err)
return
}
// Write audit log after successful download
c.backupService.WriteAuditLogForDownload(downloadToken.UserID, backup, database)
}
@@ -334,3 +372,17 @@ func sanitizeFilename(name string) string {
return string(result)
}
// startDownloadHeartbeat refreshes the user's download lock on every heartbeat
// tick for as long as the download is streaming. Cancelling ctx stops the
// refresh, after which the cache entry expires on its own TTL — this is how
// the lock is released even if the server crashes mid-download.
func (c *BackupController) startDownloadHeartbeat(ctx context.Context, userID uuid.UUID) {
	ticker := time.NewTicker(backups_download.GetDownloadHeartbeatInterval())
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Download finished or aborted: stop refreshing so the lock can expire.
			return
		case <-ticker.C:
			c.backupService.RefreshDownloadLock(userID)
		}
	}
}

View File

@@ -950,6 +950,189 @@ func Test_CancelBackup_InProgressBackup_SuccessfullyCancelled(t *testing.T) {
assert.True(t, foundCancelLog, "Cancel audit log should be created")
}
// Test_ConcurrentDownloadPrevention verifies the one-download-per-user rule:
// while one file download is streaming, a second request by the same user must
// get 409 Conflict, and once the first download finishes a new token + download
// must succeed again.
func Test_ConcurrentDownloadPrevention(t *testing.T) {
	router := createTestRouter()
	owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
	workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router)
	database, backup := createTestDatabaseWithBackups(workspace, owner, router)
	// Two tokens are minted up front; generation is allowed because no
	// download is running yet.
	var token1Response backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusOK,
		&token1Response,
	)
	var token2Response backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusOK,
		&token2Response,
	)
	downloadInProgress := make(chan bool, 1)
	downloadComplete := make(chan bool, 1)
	// First download streams in the background while we probe concurrency.
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf(
				"/api/v1/backups/%s/file?token=%s",
				backup.ID.String(),
				token1Response.Token,
			),
			"",
			http.StatusOK,
		)
		downloadComplete <- true
	}()
	// NOTE(review): timing-sensitive — 50ms is assumed enough for the download
	// goroutine to acquire the lock; the guard below bails out if it was not.
	time.Sleep(50 * time.Millisecond)
	service := GetBackupService()
	if !service.IsDownloadInProgress(owner.UserID) {
		t.Log("Warning: First download completed before we could test concurrency")
		<-downloadComplete
		return
	}
	downloadInProgress <- true
	// Second download with a distinct valid token must be rejected with 409.
	resp := test_utils.MakeGetRequest(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup.ID.String(), token2Response.Token),
		"",
		http.StatusConflict,
	)
	var errorResponse map[string]string
	err := json.Unmarshal(resp.Body, &errorResponse)
	assert.NoError(t, err)
	assert.Contains(t, errorResponse["error"], "download already in progress")
	<-downloadComplete
	<-downloadInProgress
	// Give the lock release / bandwidth unregistration time to propagate.
	time.Sleep(100 * time.Millisecond)
	var token3Response backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusOK,
		&token3Response,
	)
	// After completion a fresh token and download must work again.
	test_utils.MakeGetRequest(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup.ID.String(), token3Response.Token),
		"",
		http.StatusOK,
	)
	t.Log("Database:", database.Name)
	t.Log(
		"Successfully prevented concurrent downloads and allowed subsequent downloads after completion",
	)
}
// Test_GenerateDownloadToken_BlockedWhenDownloadInProgress verifies that token
// GENERATION (not just the file endpoint) returns 409 while the same user has
// a download streaming, and works again once the download completes.
func Test_GenerateDownloadToken_BlockedWhenDownloadInProgress(t *testing.T) {
	router := createTestRouter()
	owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
	workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router)
	database, backup := createTestDatabaseWithBackups(workspace, owner, router)
	var token1Response backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusOK,
		&token1Response,
	)
	downloadComplete := make(chan bool, 1)
	// Stream the file in the background so the lock is held during the probe.
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf(
				"/api/v1/backups/%s/file?token=%s",
				backup.ID.String(),
				token1Response.Token,
			),
			"",
			http.StatusOK,
		)
		downloadComplete <- true
	}()
	// NOTE(review): timing-sensitive; the guard below skips gracefully if the
	// download finished before the lock could be observed.
	time.Sleep(50 * time.Millisecond)
	service := GetBackupService()
	if !service.IsDownloadInProgress(owner.UserID) {
		t.Log("Warning: First download completed before we could test token generation blocking")
		<-downloadComplete
		return
	}
	// Token generation must be rejected while the download lock is held.
	resp := test_utils.MakePostRequest(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusConflict,
	)
	var errorResponse map[string]string
	err := json.Unmarshal(resp.Body, &errorResponse)
	assert.NoError(t, err)
	assert.Contains(t, errorResponse["error"], "download already in progress")
	<-downloadComplete
	time.Sleep(100 * time.Millisecond)
	// After the download ends, generation succeeds and yields a new token.
	var token2Response backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusOK,
		&token2Response,
	)
	assert.NotEmpty(t, token2Response.Token)
	assert.NotEqual(t, token1Response.Token, token2Response.Token)
	t.Log("Database:", database.Name)
	t.Log(
		"Successfully blocked token generation during download and allowed generation after completion",
	)
}
// createTestRouter is a thin package-local wrapper around CreateTestRouter.
func createTestRouter() *gin.Engine {
	return CreateTestRouter()
}
@@ -1131,3 +1314,267 @@ func createExpiredDownloadToken(backupID, userID uuid.UUID) string {
return token
}
// Test_BandwidthThrottling_SingleDownload_Uses75Percent checks that a download
// registers with the bandwidth manager while streaming and is unregistered
// once it completes (active count returns to its starting value).
func Test_BandwidthThrottling_SingleDownload_Uses75Percent(t *testing.T) {
	router := createTestRouter()
	owner := users_testing.CreateTestUser(users_enums.UserRoleMember)
	workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner, router)
	_, backup := createTestDatabaseWithBackups(workspace, owner, router)
	bandwidthManager := backups_download.GetBandwidthManager()
	// Baseline, in case other tests left downloads registered.
	initialCount := bandwidthManager.GetActiveDownloadCount()
	var tokenResponse backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup.ID.String()),
		"Bearer "+owner.Token,
		nil,
		http.StatusOK,
		&tokenResponse,
	)
	downloadStarted := make(chan bool, 1)
	downloadComplete := make(chan bool, 1)
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf(
				"/api/v1/backups/%s/file?token=%s",
				backup.ID.String(),
				tokenResponse.Token,
			),
			"",
			http.StatusOK,
		)
		downloadComplete <- true
	}()
	// NOTE(review): timing-sensitive — the count assertion only fires if the
	// download was still active after 50ms.
	time.Sleep(50 * time.Millisecond)
	activeCount := bandwidthManager.GetActiveDownloadCount()
	if activeCount > initialCount {
		downloadStarted <- true
		assert.Equal(t, initialCount+1, activeCount, "Should have one active download")
	}
	<-downloadComplete
	if len(downloadStarted) > 0 {
		<-downloadStarted
	}
	time.Sleep(50 * time.Millisecond)
	finalCount := bandwidthManager.GetActiveDownloadCount()
	assert.Equal(t, initialCount, finalCount, "Download should be unregistered after completion")
}
// Test_BandwidthThrottling_MultipleDownloads_ShareBandwidth runs three
// concurrent downloads by three different users (the per-user lock does not
// block cross-user concurrency) and asserts the bandwidth manager ends up back
// at its starting active-download count after all complete.
func Test_BandwidthThrottling_MultipleDownloads_ShareBandwidth(t *testing.T) {
	router := createTestRouter()
	owner1 := users_testing.CreateTestUser(users_enums.UserRoleMember)
	owner2 := users_testing.CreateTestUser(users_enums.UserRoleMember)
	owner3 := users_testing.CreateTestUser(users_enums.UserRoleMember)
	workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner1, router)
	workspaces_testing.AddMemberToWorkspace(
		workspace,
		owner2,
		users_enums.WorkspaceRoleMember,
		owner1.Token,
		router,
	)
	workspaces_testing.AddMemberToWorkspace(
		workspace,
		owner3,
		users_enums.WorkspaceRoleMember,
		owner1.Token,
		router,
	)
	database := createTestDatabase("Test Database", workspace.ID, owner1.Token, router)
	storage := createTestStorage(workspace.ID)
	configService := backups_config.GetBackupConfigService()
	config, err := configService.GetBackupConfigByDbId(database.ID)
	assert.NoError(t, err)
	config.IsBackupsEnabled = true
	config.StorageID = &storage.ID
	config.Storage = storage
	_, err = configService.SaveBackupConfig(config)
	assert.NoError(t, err)
	// One backup per user so each user downloads their own file.
	backup1 := createTestBackup(database, owner1)
	backup2 := createTestBackup(database, owner2)
	backup3 := createTestBackup(database, owner3)
	var token1, token2, token3 backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup1.ID.String()),
		"Bearer "+owner1.Token,
		nil,
		http.StatusOK,
		&token1,
	)
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup2.ID.String()),
		"Bearer "+owner2.Token,
		nil,
		http.StatusOK,
		&token2,
	)
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup3.ID.String()),
		"Bearer "+owner3.Token,
		nil,
		http.StatusOK,
		&token3,
	)
	bandwidthManager := backups_download.GetBandwidthManager()
	initialCount := bandwidthManager.GetActiveDownloadCount()
	complete1 := make(chan bool, 1)
	complete2 := make(chan bool, 1)
	complete3 := make(chan bool, 1)
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup1.ID.String(), token1.Token),
			"",
			http.StatusOK,
		)
		complete1 <- true
	}()
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup2.ID.String(), token2.Token),
			"",
			http.StatusOK,
		)
		complete2 <- true
	}()
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup3.ID.String(), token3.Token),
			"",
			http.StatusOK,
		)
		complete3 <- true
	}()
	time.Sleep(100 * time.Millisecond)
	<-complete1
	<-complete2
	<-complete3
	// Allow deferred unregistration in the handlers to run.
	time.Sleep(100 * time.Millisecond)
	finalCount := bandwidthManager.GetActiveDownloadCount()
	assert.Equal(t, initialCount, finalCount, "All downloads should be unregistered")
}
// Test_BandwidthThrottling_DynamicAdjustment starts a second user's download
// 50ms after the first (forcing a mid-stream rate rebalance) and asserts both
// downloads complete and unregister from the bandwidth manager.
func Test_BandwidthThrottling_DynamicAdjustment(t *testing.T) {
	router := createTestRouter()
	owner1 := users_testing.CreateTestUser(users_enums.UserRoleMember)
	owner2 := users_testing.CreateTestUser(users_enums.UserRoleMember)
	workspace := workspaces_testing.CreateTestWorkspace("Test Workspace", owner1, router)
	workspaces_testing.AddMemberToWorkspace(
		workspace,
		owner2,
		users_enums.WorkspaceRoleMember,
		owner1.Token,
		router,
	)
	database := createTestDatabase("Test Database", workspace.ID, owner1.Token, router)
	storage := createTestStorage(workspace.ID)
	configService := backups_config.GetBackupConfigService()
	config, err := configService.GetBackupConfigByDbId(database.ID)
	assert.NoError(t, err)
	config.IsBackupsEnabled = true
	config.StorageID = &storage.ID
	config.Storage = storage
	_, err = configService.SaveBackupConfig(config)
	assert.NoError(t, err)
	backup1 := createTestBackup(database, owner1)
	backup2 := createTestBackup(database, owner2)
	var token1, token2 backups_download.GenerateDownloadTokenResponse
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup1.ID.String()),
		"Bearer "+owner1.Token,
		nil,
		http.StatusOK,
		&token1,
	)
	test_utils.MakePostRequestAndUnmarshal(
		t,
		router,
		fmt.Sprintf("/api/v1/backups/%s/download-token", backup2.ID.String()),
		"Bearer "+owner2.Token,
		nil,
		http.StatusOK,
		&token2,
	)
	bandwidthManager := backups_download.GetBandwidthManager()
	initialCount := bandwidthManager.GetActiveDownloadCount()
	complete1 := make(chan bool, 1)
	complete2 := make(chan bool, 1)
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup1.ID.String(), token1.Token),
			"",
			http.StatusOK,
		)
		complete1 <- true
	}()
	// Staggered start: the second download should trigger a rate recalculation
	// while the first is (possibly) still streaming.
	time.Sleep(50 * time.Millisecond)
	go func() {
		test_utils.MakeGetRequest(
			t,
			router,
			fmt.Sprintf("/api/v1/backups/%s/file?token=%s", backup2.ID.String(), token2.Token),
			"",
			http.StatusOK,
		)
		complete2 <- true
	}()
	<-complete1
	<-complete2
	time.Sleep(100 * time.Millisecond)
	finalCount := bandwidthManager.GetActiveDownloadCount()
	assert.Equal(t, initialCount, finalCount, "All downloads completed and unregistered")
}

View File

@@ -0,0 +1,81 @@
package backups_download
import (
"fmt"
"sync"
"github.com/google/uuid"
)
// BandwidthManager caps the combined throughput of all backup downloads at 75%
// of the node's network capacity and divides that budget evenly across the
// currently active downloads (one download at most per user).
type BandwidthManager struct {
	mu                        sync.RWMutex
	activeDownloads           map[uuid.UUID]*activeDownload
	maxTotalBytesPerSecond    int64 // 75% of the configured throughput, in bytes/s
	bytesPerSecondPerDownload int64 // current even share granted to each download
}

// activeDownload ties a downloading user to their per-download rate limiter.
type activeDownload struct {
	userID      uuid.UUID
	rateLimiter *RateLimiter
}

// NewBandwidthManager builds a manager for a node with the given network
// throughput in MB/s. Non-positive values fall back to 125 MB/s (~1 Gbps),
// mirroring the fallback used when the service is wired up; previously a
// negative value would have produced a negative bandwidth budget.
func NewBandwidthManager(throughputMBs int) *BandwidthManager {
	if throughputMBs <= 0 {
		throughputMBs = 125
	}
	// Use 75% of total throughput; the remaining 25% is left for other traffic.
	maxBytes := int64(throughputMBs) * 1024 * 1024 * 75 / 100
	return &BandwidthManager{
		activeDownloads:           make(map[uuid.UUID]*activeDownload),
		maxTotalBytesPerSecond:    maxBytes,
		bytesPerSecondPerDownload: maxBytes,
	}
}
// RegisterDownload adds userID to the active set, builds a rate limiter seeded
// with the current per-download share, and rebalances every live limiter.
// It fails when the user already has a registered download.
func (bm *BandwidthManager) RegisterDownload(userID uuid.UUID) (*RateLimiter, error) {
	bm.mu.Lock()
	defer bm.mu.Unlock()

	if _, active := bm.activeDownloads[userID]; active {
		return nil, fmt.Errorf("download already registered for user %s", userID)
	}

	limiter := NewRateLimiter(bm.bytesPerSecondPerDownload)
	bm.activeDownloads[userID] = &activeDownload{
		userID:      userID,
		rateLimiter: limiter,
	}
	bm.recalculateRates()

	return limiter, nil
}
// UnregisterDownload removes the user's download from the active set (no-op if
// absent) and rebalances the remaining downloads' rates.
func (bm *BandwidthManager) UnregisterDownload(userID uuid.UUID) {
	bm.mu.Lock()
	defer bm.mu.Unlock()
	delete(bm.activeDownloads, userID)
	bm.recalculateRates()
}

// GetActiveDownloadCount reports how many downloads are currently registered.
func (bm *BandwidthManager) GetActiveDownloadCount() int {
	bm.mu.RLock()
	defer bm.mu.RUnlock()
	return len(bm.activeDownloads)
}
// recalculateRates splits the total bandwidth budget evenly across active
// downloads and pushes the new per-download rate into every live limiter.
// With no active downloads the next download gets the full budget.
// Callers must hold bm.mu.
func (bm *BandwidthManager) recalculateRates() {
	count := int64(len(bm.activeDownloads))
	if count == 0 {
		bm.bytesPerSecondPerDownload = bm.maxTotalBytesPerSecond
		return
	}

	perDownload := bm.maxTotalBytesPerSecond / count
	bm.bytesPerSecondPerDownload = perDownload
	for _, d := range bm.activeDownloads {
		d.rateLimiter.UpdateRate(perDownload)
	}
}

View File

@@ -0,0 +1,150 @@
package backups_download
import (
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
// Test_BandwidthManager_RegisterSingleDownload: a lone download gets the full
// 75%-of-throughput budget, both in the manager and in its limiter.
func Test_BandwidthManager_RegisterSingleDownload(t *testing.T) {
	throughputMBs := 100
	manager := NewBandwidthManager(throughputMBs)
	// 75% of 100 MB/s, expressed in bytes per second.
	expectedBytesPerSec := int64(100 * 1024 * 1024 * 75 / 100)
	assert.Equal(t, expectedBytesPerSec, manager.maxTotalBytesPerSecond)
	assert.Equal(t, expectedBytesPerSec, manager.bytesPerSecondPerDownload)
	userID := uuid.New()
	rateLimiter, err := manager.RegisterDownload(userID)
	assert.NoError(t, err)
	assert.NotNil(t, rateLimiter)
	assert.Equal(t, 1, manager.GetActiveDownloadCount())
	assert.Equal(t, expectedBytesPerSec, manager.bytesPerSecondPerDownload)
	assert.Equal(t, expectedBytesPerSec, rateLimiter.bytesPerSecond)
}
// Test_BandwidthManager_RegisterMultipleDownloads_BandwidthShared verifies that
// the bandwidth budget is divided evenly as the second and third downloads are
// registered, and that already-issued limiters are retroactively adjusted.
func Test_BandwidthManager_RegisterMultipleDownloads_BandwidthShared(t *testing.T) {
	throughputMBs := 100
	manager := NewBandwidthManager(throughputMBs)
	maxBytes := int64(100 * 1024 * 1024 * 75 / 100)

	// Single download: full budget.
	user1 := uuid.New()
	rateLimiter1, err := manager.RegisterDownload(user1)
	assert.NoError(t, err)
	assert.Equal(t, maxBytes, rateLimiter1.bytesPerSecond)

	// Second download: budget halves for both limiters.
	user2 := uuid.New()
	rateLimiter2, err := manager.RegisterDownload(user2)
	assert.NoError(t, err)
	expectedPerDownload := maxBytes / 2
	assert.Equal(t, expectedPerDownload, manager.bytesPerSecondPerDownload)
	assert.Equal(t, expectedPerDownload, rateLimiter1.bytesPerSecond)
	// Was accidentally asserted twice in the original; one assertion suffices.
	assert.Equal(t, expectedPerDownload, rateLimiter2.bytesPerSecond)

	// Third download: budget splits three ways across all limiters.
	user3 := uuid.New()
	rateLimiter3, err := manager.RegisterDownload(user3)
	assert.NoError(t, err)
	expectedPerDownload = maxBytes / 3
	assert.Equal(t, expectedPerDownload, manager.bytesPerSecondPerDownload)
	assert.Equal(t, expectedPerDownload, rateLimiter1.bytesPerSecond)
	assert.Equal(t, expectedPerDownload, rateLimiter2.bytesPerSecond)
	assert.Equal(t, expectedPerDownload, rateLimiter3.bytesPerSecond)
	assert.Equal(t, 3, manager.GetActiveDownloadCount())
}
// Test_BandwidthManager_UnregisterDownload_BandwidthRebalanced: removing
// downloads redistributes the budget among survivors, and an empty manager
// resets the per-download share to the full budget.
func Test_BandwidthManager_UnregisterDownload_BandwidthRebalanced(t *testing.T) {
	throughputMBs := 100
	manager := NewBandwidthManager(throughputMBs)
	maxBytes := int64(100 * 1024 * 1024 * 75 / 100)
	user1 := uuid.New()
	rateLimiter1, _ := manager.RegisterDownload(user1)
	user2 := uuid.New()
	_, _ = manager.RegisterDownload(user2)
	user3 := uuid.New()
	rateLimiter3, _ := manager.RegisterDownload(user3)
	assert.Equal(t, 3, manager.GetActiveDownloadCount())
	expectedPerDownload := maxBytes / 3
	assert.Equal(t, expectedPerDownload, rateLimiter1.bytesPerSecond)
	// Drop the middle download: the two survivors split the budget.
	manager.UnregisterDownload(user2)
	assert.Equal(t, 2, manager.GetActiveDownloadCount())
	expectedPerDownload = maxBytes / 2
	assert.Equal(t, expectedPerDownload, manager.bytesPerSecondPerDownload)
	assert.Equal(t, expectedPerDownload, rateLimiter1.bytesPerSecond)
	assert.Equal(t, expectedPerDownload, rateLimiter3.bytesPerSecond)
	manager.UnregisterDownload(user1)
	assert.Equal(t, 1, manager.GetActiveDownloadCount())
	assert.Equal(t, maxBytes, manager.bytesPerSecondPerDownload)
	assert.Equal(t, maxBytes, rateLimiter3.bytesPerSecond)
	manager.UnregisterDownload(user3)
	assert.Equal(t, 0, manager.GetActiveDownloadCount())
	assert.Equal(t, maxBytes, manager.bytesPerSecondPerDownload)
}

// Test_BandwidthManager_RegisterDuplicateUser_ReturnsError: a second
// registration for the same user must fail (one download per user).
func Test_BandwidthManager_RegisterDuplicateUser_ReturnsError(t *testing.T) {
	manager := NewBandwidthManager(100)
	userID := uuid.New()
	_, err := manager.RegisterDownload(userID)
	assert.NoError(t, err)
	_, err = manager.RegisterDownload(userID)
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "download already registered")
}

// Test_RateLimiter_TokenBucketBasic: the bucket starts full (2x the rate), so
// a request within capacity returns without a noticeable wait.
func Test_RateLimiter_TokenBucketBasic(t *testing.T) {
	bytesPerSec := int64(1024 * 1024)
	limiter := NewRateLimiter(bytesPerSec)
	assert.Equal(t, bytesPerSec, limiter.bytesPerSecond)
	assert.Equal(t, bytesPerSec*2, limiter.bucketSize)
	start := time.Now()
	limiter.Wait(512 * 1024)
	elapsed := time.Since(start)
	assert.Less(t, elapsed, 100*time.Millisecond)
}

// Test_RateLimiter_UpdateRate: updating the rate rescales the bucket to 2x the
// new rate.
func Test_RateLimiter_UpdateRate(t *testing.T) {
	limiter := NewRateLimiter(1024 * 1024)
	assert.Equal(t, int64(1024*1024), limiter.bytesPerSecond)
	newRate := int64(2 * 1024 * 1024)
	limiter.UpdateRate(newRate)
	assert.Equal(t, newRate, limiter.bytesPerSecond)
	assert.Equal(t, newRate*2, limiter.bucketSize)
}

// Test_RateLimiter_ThrottlesCorrectly: with an empty bucket, waiting for half
// a second's worth of bytes should block roughly half a second.
// NOTE(review): wall-clock bounds (400-700ms) may be flaky on loaded CI hosts.
func Test_RateLimiter_ThrottlesCorrectly(t *testing.T) {
	bytesPerSec := int64(1024 * 1024)
	limiter := NewRateLimiter(bytesPerSec)
	limiter.availableTokens = 0
	start := time.Now()
	limiter.Wait(bytesPerSec / 2)
	elapsed := time.Since(start)
	assert.GreaterOrEqual(t, elapsed, 400*time.Millisecond)
	assert.LessOrEqual(t, elapsed, 700*time.Millisecond)
}

View File

@@ -1,19 +1,38 @@
package backups_download
import (
"databasus-backend/internal/config"
cache_utils "databasus-backend/internal/util/cache"
"databasus-backend/internal/util/logger"
)
var downloadTokenRepository = &DownloadTokenRepository{}
var downloadTokenService = &DownloadTokenService{
downloadTokenRepository,
logger.GetLogger(),
}
var downloadTracker = NewDownloadTracker(cache_utils.GetValkeyClient())
var downloadTokenBackgroundService = &DownloadTokenBackgroundService{
downloadTokenService,
logger.GetLogger(),
var bandwidthManager *BandwidthManager
var downloadTokenService *DownloadTokenService
var downloadTokenBackgroundService *DownloadTokenBackgroundService
func init() {
env := config.GetEnv()
throughputMBs := env.NodeNetworkThroughputMBs
if throughputMBs == 0 {
throughputMBs = 125
}
bandwidthManager = NewBandwidthManager(throughputMBs)
downloadTokenService = &DownloadTokenService{
downloadTokenRepository,
logger.GetLogger(),
downloadTracker,
bandwidthManager,
}
downloadTokenBackgroundService = &DownloadTokenBackgroundService{
downloadTokenService,
logger.GetLogger(),
}
}
func GetDownloadTokenService() *DownloadTokenService {
@@ -23,3 +42,7 @@ func GetDownloadTokenService() *DownloadTokenService {
func GetDownloadTokenBackgroundService() *DownloadTokenBackgroundService {
return downloadTokenBackgroundService
}
func GetBandwidthManager() *BandwidthManager {
return bandwidthManager
}

View File

@@ -0,0 +1,101 @@
package backups_download
import (
"io"
"sync"
"time"
)
// RateLimiter is a token-bucket throttle measured in bytes per second. The
// bucket holds up to two seconds' worth of tokens, allowing short bursts while
// keeping the sustained rate at bytesPerSecond.
type RateLimiter struct {
	mu              sync.Mutex
	bytesPerSecond  int64
	bucketSize      int64
	availableTokens float64
	lastRefill      time.Time
}

// NewRateLimiter creates a limiter with a full bucket. Non-positive rates fall
// back to 100 MB/s.
func NewRateLimiter(bytesPerSecond int64) *RateLimiter {
	rate := bytesPerSecond
	if rate <= 0 {
		rate = 1024 * 1024 * 100
	}
	bucket := rate * 2
	return &RateLimiter{
		bytesPerSecond:  rate,
		bucketSize:      bucket,
		availableTokens: float64(bucket),
		lastRefill:      time.Now().UTC(),
	}
}
// UpdateRate changes the sustained rate (bytes/s) and rescales the bucket to
// two seconds' worth of the new rate. Non-positive rates fall back to 100 MB/s.
// Safe to call while another goroutine is blocked in Wait.
func (rl *RateLimiter) UpdateRate(bytesPerSecond int64) {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	rate := bytesPerSecond
	if rate <= 0 {
		rate = 1024 * 1024 * 100
	}
	rl.bytesPerSecond = rate
	rl.bucketSize = 2 * rate
	// Discard tokens that no longer fit the (possibly smaller) bucket.
	if limit := float64(rl.bucketSize); rl.availableTokens > limit {
		rl.availableTokens = limit
	}
}
// Wait blocks until the bucket holds enough tokens for the requested number of
// bytes, then consumes them. Tokens refill continuously at bytesPerSecond and
// are capped at bucketSize.
func (rl *RateLimiter) Wait(bytes int64) {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	for {
		// FIX: clamp the request to the bucket capacity. availableTokens can
		// never exceed bucketSize, so a request larger than the bucket would
		// otherwise spin in this loop forever. Re-clamped each iteration since
		// UpdateRate may shrink the bucket while we sleep.
		if bytes > rl.bucketSize {
			bytes = rl.bucketSize
		}
		// Refill proportionally to the time elapsed since the last refill.
		now := time.Now().UTC()
		elapsed := now.Sub(rl.lastRefill).Seconds()
		tokensToAdd := elapsed * float64(rl.bytesPerSecond)
		rl.availableTokens += tokensToAdd
		if rl.availableTokens > float64(rl.bucketSize) {
			rl.availableTokens = float64(rl.bucketSize)
		}
		rl.lastRefill = now
		if rl.availableTokens >= float64(bytes) {
			rl.availableTokens -= float64(bytes)
			return
		}
		// Not enough tokens: sleep just long enough for the deficit to refill.
		tokensNeeded := float64(bytes) - rl.availableTokens
		waitTime := time.Duration(tokensNeeded/float64(rl.bytesPerSecond)*1000) * time.Millisecond
		if waitTime < time.Millisecond {
			waitTime = time.Millisecond
		}
		// Release the lock while sleeping so UpdateRate and other waiters can
		// make progress; the deferred Unlock pairs with this re-Lock.
		rl.mu.Unlock()
		time.Sleep(waitTime)
		rl.mu.Lock()
	}
}
// RateLimitedReader wraps an io.ReadCloser and throttles reads through a
// shared RateLimiter, limiting the effective streaming bandwidth.
type RateLimitedReader struct {
	reader      io.ReadCloser
	rateLimiter *RateLimiter
}

// NewRateLimitedReader wraps reader so every read is charged against limiter.
func NewRateLimitedReader(reader io.ReadCloser, limiter *RateLimiter) *RateLimitedReader {
	return &RateLimitedReader{
		reader:      reader,
		rateLimiter: limiter,
	}
}
// Read delegates to the wrapped reader and then blocks until the rate limiter
// grants tokens for the bytes actually read, throttling after the fact.
func (r *RateLimitedReader) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	if n <= 0 {
		return n, err
	}
	r.rateLimiter.Wait(int64(n))
	return n, err
}
// Close closes the underlying reader; the limiter itself needs no cleanup.
func (r *RateLimitedReader) Close() error {
	return r.reader.Close()
}

View File

@@ -9,11 +9,17 @@ import (
)
type DownloadTokenService struct {
repository *DownloadTokenRepository
logger *slog.Logger
repository *DownloadTokenRepository
logger *slog.Logger
downloadTracker *DownloadTracker
bandwidthManager *BandwidthManager
}
func (s *DownloadTokenService) Generate(backupID, userID uuid.UUID) (string, error) {
if s.downloadTracker.IsDownloadInProgress(userID) {
return "", ErrDownloadAlreadyInProgress
}
token := GenerateSecureToken()
downloadToken := &DownloadToken{
@@ -32,22 +38,34 @@ func (s *DownloadTokenService) Generate(backupID, userID uuid.UUID) (string, err
return token, nil
}
func (s *DownloadTokenService) ValidateAndConsume(token string) (*DownloadToken, error) {
func (s *DownloadTokenService) ValidateAndConsume(
token string,
) (*DownloadToken, *RateLimiter, error) {
dt, err := s.repository.FindByToken(token)
if err != nil {
return nil, err
return nil, nil, err
}
if dt == nil {
return nil, errors.New("invalid token")
return nil, nil, errors.New("invalid token")
}
if dt.Used {
return nil, errors.New("token already used")
return nil, nil, errors.New("token already used")
}
if time.Now().UTC().After(dt.ExpiresAt) {
return nil, errors.New("token expired")
return nil, nil, errors.New("token expired")
}
if err := s.downloadTracker.AcquireDownloadLock(dt.UserID); err != nil {
return nil, nil, err
}
rateLimiter, err := s.bandwidthManager.RegisterDownload(dt.UserID)
if err != nil {
s.downloadTracker.ReleaseDownloadLock(dt.UserID)
return nil, nil, err
}
dt.Used = true
@@ -55,8 +73,26 @@ func (s *DownloadTokenService) ValidateAndConsume(token string) (*DownloadToken,
s.logger.Error("Failed to mark token as used", "error", err)
}
s.logger.Info("Token validated and consumed", "backupId", dt.BackupID)
return dt, nil
s.logger.Info("Token validated and consumed", "backupId", dt.BackupID, "userId", dt.UserID)
return dt, rateLimiter, nil
}
// RefreshDownloadLock extends the TTL of the user's active-download lock
// (called by the controller's heartbeat while a file is streaming).
func (s *DownloadTokenService) RefreshDownloadLock(userID uuid.UUID) {
	s.downloadTracker.RefreshDownloadLock(userID)
}

// ReleaseDownloadLock frees the per-user download lock once streaming ends or
// fails, allowing the user to start a new download immediately.
func (s *DownloadTokenService) ReleaseDownloadLock(userID uuid.UUID) {
	s.downloadTracker.ReleaseDownloadLock(userID)
	s.logger.Info("Released download lock", "userId", userID)
}

// IsDownloadInProgress reports whether the user currently holds a download lock.
func (s *DownloadTokenService) IsDownloadInProgress(userID uuid.UUID) bool {
	return s.downloadTracker.IsDownloadInProgress(userID)
}

// UnregisterDownload removes the user's download from the bandwidth manager so
// the freed bandwidth is redistributed to the remaining downloads.
func (s *DownloadTokenService) UnregisterDownload(userID uuid.UUID) {
	s.bandwidthManager.UnregisterDownload(userID)
	s.logger.Info("Unregistered from bandwidth manager", "userId", userID)
}
func (s *DownloadTokenService) CleanExpiredTokens() error {

View File

@@ -0,0 +1,66 @@
package backups_download
import (
cache_utils "databasus-backend/internal/util/cache"
"errors"
"time"
"github.com/google/uuid"
"github.com/valkey-io/valkey-go"
)
// Cache key layout and timing for the per-user download lock.
const (
	downloadLockPrefix = "backup_download_lock:"
	// NOTE(review): downloadLockTTL is not referenced in this file — confirm
	// that the cache util applies a 5s expiry to Set, otherwise a crashed
	// server would never release the lock.
	downloadLockTTL   = 5 * time.Second
	downloadLockValue = "1"
	// Heartbeat period; kept below the TTL so an active download's refreshes
	// always land before the lock expires.
	downloadHeartbeatDelay = 3 * time.Second
)

var (
	ErrDownloadAlreadyInProgress = errors.New("download already in progress for this user")
)

// DownloadTracker records which users currently have a download streaming,
// using a shared (Valkey-backed) cache so the lock works across instances.
type DownloadTracker struct {
	cache *cache_utils.CacheUtil[string]
}

// NewDownloadTracker builds a tracker over the given Valkey client.
func NewDownloadTracker(client valkey.Client) *DownloadTracker {
	return &DownloadTracker{
		cache: cache_utils.NewCacheUtil[string](client, downloadLockPrefix),
	}
}

// AcquireDownloadLock marks a download as in progress for userID, or returns
// ErrDownloadAlreadyInProgress if one is already recorded.
// NOTE(review): Get-then-Set is not atomic — two concurrent requests can both
// pass the existence check and both "acquire" the lock. An atomic SetNX on the
// cache would close this race; confirm whether the cache util offers one.
func (t *DownloadTracker) AcquireDownloadLock(userID uuid.UUID) error {
	key := userID.String()
	existingLock := t.cache.Get(key)
	if existingLock != nil {
		return ErrDownloadAlreadyInProgress
	}
	value := downloadLockValue
	t.cache.Set(key, &value)
	return nil
}
func (t *DownloadTracker) RefreshDownloadLock(userID uuid.UUID) {
key := userID.String()
value := downloadLockValue
t.cache.Set(key, &value)
}
func (t *DownloadTracker) ReleaseDownloadLock(userID uuid.UUID) {
key := userID.String()
t.cache.Invalidate(key)
}
// IsDownloadInProgress reports whether a lock entry currently exists for
// the user.
func (t *DownloadTracker) IsDownloadInProgress(userID uuid.UUID) bool {
	return t.cache.Get(userID.String()) != nil
}
// GetDownloadHeartbeatInterval returns how often callers should refresh
// the download lock; the interval (3s) is shorter than the lock TTL (5s)
// so the lock cannot expire between consecutive heartbeats.
func GetDownloadHeartbeatInterval() time.Duration {
	return downloadHeartbeatDelay
}

View File

@@ -481,7 +481,7 @@ func (s *BackupService) GenerateDownloadToken(
func (s *BackupService) ValidateDownloadToken(
token string,
) (*backups_download.DownloadToken, error) {
) (*backups_download.DownloadToken, *backups_download.RateLimiter, error) {
return s.downloadTokenService.ValidateAndConsume(token)
}
@@ -522,6 +522,22 @@ func (s *BackupService) WriteAuditLogForDownload(
)
}
// RefreshDownloadLock forwards the lock refresh to the download token service.
func (s *BackupService) RefreshDownloadLock(userID uuid.UUID) {
	s.downloadTokenService.RefreshDownloadLock(userID)
}
// ReleaseDownloadLock forwards the lock release to the download token service.
func (s *BackupService) ReleaseDownloadLock(userID uuid.UUID) {
	s.downloadTokenService.ReleaseDownloadLock(userID)
}
// IsDownloadInProgress reports, via the download token service, whether the
// user already has a backup download running.
func (s *BackupService) IsDownloadInProgress(userID uuid.UUID) bool {
	return s.downloadTokenService.IsDownloadInProgress(userID)
}
// UnregisterDownload forwards bandwidth-manager deregistration to the
// download token service.
func (s *BackupService) UnregisterDownload(userID uuid.UUID) {
	s.downloadTokenService.UnregisterDownload(userID)
}
func (s *BackupService) generateBackupFilename(
backup *backups_core.Backup,
database *databases.Database,

View File

@@ -400,6 +400,7 @@ func HasPrivilege(privileges, priv string) bool {
func (m *MysqlDatabase) buildDSN(password string, database string) string {
tlsConfig := "false"
allowCleartext := ""
if m.IsHttps {
err := mysql.RegisterTLSConfig("mysql-skip-verify", &tls.Config{
@@ -411,16 +412,18 @@ func (m *MysqlDatabase) buildDSN(password string, database string) string {
}
tlsConfig = "mysql-skip-verify"
allowCleartext = "&allowCleartextPasswords=1"
}
return fmt.Sprintf(
"%s:%s@tcp(%s:%d)/%s?parseTime=true&timeout=15s&tls=%s&charset=utf8mb4",
"%s:%s@tcp(%s:%d)/%s?parseTime=true&timeout=15s&tls=%s&charset=utf8mb4%s",
m.Username,
password,
m.Host,
m.Port,
database,
tlsConfig,
allowCleartext,
)
}

View File

@@ -2,13 +2,12 @@ package users_controllers
import (
users_services "databasus-backend/internal/features/users/services"
"golang.org/x/time/rate"
cache_utils "databasus-backend/internal/util/cache"
)
var userController = &UserController{
users_services.GetUserService(),
rate.NewLimiter(rate.Limit(3), 3), // 3 rps with 3 burst
cache_utils.NewRateLimiter(cache_utils.GetValkeyClient()),
}
var settingsController = &SettingsController{

View File

@@ -14,7 +14,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"
)
func Test_AdminLifecycleE2E_CompletesSuccessfully(t *testing.T) {
@@ -185,7 +184,6 @@ func createUserTestRouter() *gin.Engine {
// Register protected routes with auth middleware
protected := v1.Group("").Use(users_middleware.AuthMiddleware(users_services.GetUserService()))
GetUserController().RegisterProtectedRoutes(protected.(*gin.RouterGroup))
GetUserController().SetSignInLimiter(rate.NewLimiter(rate.Limit(100), 100))
// Setup audit log service
users_services.GetUserService().SetAuditLogWriter(&AuditLogWriterStub{})

View File

@@ -3,20 +3,21 @@ package users_controllers
import (
"errors"
"net/http"
"time"
"databasus-backend/internal/config"
user_dto "databasus-backend/internal/features/users/dto"
users_errors "databasus-backend/internal/features/users/errors"
user_middleware "databasus-backend/internal/features/users/middleware"
users_services "databasus-backend/internal/features/users/services"
cache_utils "databasus-backend/internal/util/cache"
"github.com/gin-gonic/gin"
"golang.org/x/time/rate"
)
type UserController struct {
userService *users_services.UserService
signinLimiter *rate.Limiter
userService *users_services.UserService
rateLimiter *cache_utils.RateLimiter
}
func (c *UserController) RegisterRoutes(router *gin.RouterGroup) {
@@ -39,10 +40,6 @@ func (c *UserController) RegisterProtectedRoutes(router *gin.RouterGroup) {
router.POST("/users/invite", c.InviteUser)
}
func (c *UserController) SetSignInLimiter(limiter *rate.Limiter) {
c.signinLimiter = limiter
}
// SignUp
// @Summary Register a new user
// @Description Register a new user with email and password
@@ -81,8 +78,14 @@ func (c *UserController) SignUp(ctx *gin.Context) {
// @Failure 429 {object} map[string]string "Rate limit exceeded"
// @Router /users/signin [post]
func (c *UserController) SignIn(ctx *gin.Context) {
// We use rate limiter to prevent brute force attacks
if !c.signinLimiter.Allow() {
var request user_dto.SignInRequestDTO
if err := ctx.ShouldBindJSON(&request); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request format"})
return
}
allowed, _ := c.rateLimiter.CheckLimit(request.Email, "signin", 10, 1*time.Minute)
if !allowed {
ctx.JSON(
http.StatusTooManyRequests,
gin.H{"error": "Rate limit exceeded. Please try again later."},
@@ -90,12 +93,6 @@ func (c *UserController) SignIn(ctx *gin.Context) {
return
}
var request user_dto.SignInRequestDTO
if err := ctx.ShouldBindJSON(&request); err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request format"})
return
}
response, err := c.userService.SignIn(&request)
if err != nil {
ctx.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})

View File

@@ -1146,3 +1146,48 @@ func Test_GoogleOAuth_WithInvitedUser_ActivatesUser(t *testing.T) {
assert.Equal(t, email, response.Email)
assert.False(t, response.IsNewUser)
}
// Verifies the email-keyed sign-in rate limit: 10 attempts inside the
// window succeed, and the 11th is rejected with HTTP 429.
func Test_SignIn_WithExcessiveAttempts_RateLimitEnforced(t *testing.T) {
	router := createUserTestRouter()

	email := "ratelimit" + uuid.New().String() + "@example.com"
	password := "testpassword123"

	// Register the account the sign-in attempts will target.
	signupRequest := users_dto.SignUpRequestDTO{
		Email:    email,
		Password: password,
		Name:     "Rate Limit Test User",
	}
	test_utils.MakePostRequest(t, router, "/api/v1/users/signup", "", signupRequest, http.StatusOK)

	signinRequest := users_dto.SignInRequestDTO{
		Email:    email,
		Password: password,
	}

	// The first 10 attempts fall within the limit and must succeed.
	for i := 0; i < 10; i++ {
		test_utils.MakePostRequest(
			t,
			router,
			"/api/v1/users/signin",
			"",
			signinRequest,
			http.StatusOK,
		)
	}

	// The 11th attempt exceeds the window and must be rejected.
	resp := test_utils.MakePostRequest(
		t,
		router,
		"/api/v1/users/signin",
		"",
		signinRequest,
		http.StatusTooManyRequests,
	)
	assert.Contains(t, string(resp.Body), "Rate limit exceeded")
}

View File

@@ -0,0 +1,85 @@
package cache_utils
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/valkey-io/valkey-go"
)
// RateLimiter enforces per-identifier request limits using TTL-bound
// entries in a Valkey-backed sliding window.
type RateLimiter struct {
	client valkey.Client
}
// NewRateLimiter returns a RateLimiter backed by the given Valkey client.
func NewRateLimiter(client valkey.Client) *RateLimiter {
	return &RateLimiter{
		client: client,
	}
}
// CheckLimit implements a sliding-window-log limit: each call records a
// uniquely-keyed entry that expires after windowDuration, then counts all
// live entries for the (endpoint, identifier) pair. It returns true while
// the live count (including this attempt) is within maxRequests.
//
// Notes:
//   - The attempt is recorded before the limit check, so rejected requests
//     also keep the window populated — sustained hammering stays blocked.
//   - On cache errors it fails open (returns true alongside the error)
//     rather than locking out legitimate users.
func (r *RateLimiter) CheckLimit(
	identifier string,
	endpoint string,
	maxRequests int,
	windowDuration time.Duration,
) (bool, error) {
	requestID := uuid.New().String()
	keyPrefix := fmt.Sprintf("ratelimit:%s:%s", endpoint, identifier)
	fullKey := fmt.Sprintf("%s:%s", keyPrefix, requestID)
	ctx, cancel := context.WithTimeout(context.Background(), DefaultCacheTimeout)
	defer cancel()
	// Set the key with TTL
	setCmd := r.client.B().
		Set().
		Key(fullKey).
		Value("1").
		ExSeconds(int64(windowDuration.Seconds())).
		Build()
	if err := r.client.Do(ctx, setCmd).Error(); err != nil {
		return true, fmt.Errorf("failed to set rate limit key: %w", err)
	}
	// Count keys matching the pattern
	count, err := r.countKeys(keyPrefix)
	if err != nil {
		return true, fmt.Errorf("failed to count rate limit keys: %w", err)
	}
	return count <= maxRequests, nil
}
// countKeys counts the live keys under keyPrefix by cursor-scanning the
// keyspace with a MATCH pattern.
//
// Fix: SCAN guarantees each element present for the full iteration is
// returned at least once, but it MAY return the same element multiple
// times. Summing len(Elements) per batch can therefore over-count and
// rate-limit users earlier than intended; deduplicate keys in a set and
// report the set's size instead.
func (r *RateLimiter) countKeys(keyPrefix string) (int, error) {
	pattern := keyPrefix + ":*"
	cursor := uint64(0)
	// Deduplicates keys across SCAN batches.
	seen := make(map[string]struct{})
	for {
		ctx, cancel := context.WithTimeout(context.Background(), DefaultCacheTimeout)
		scanCmd := r.client.B().Scan().Cursor(cursor).Match(pattern).Count(100).Build()
		result := r.client.Do(ctx, scanCmd)
		cancel()
		if result.Error() != nil {
			return 0, result.Error()
		}
		scanResult, err := result.AsScanEntry()
		if err != nil {
			return 0, err
		}
		for _, key := range scanResult.Elements {
			seen[key] = struct{}{}
		}
		cursor = scanResult.Cursor
		if cursor == 0 {
			break
		}
	}
	return len(seen), nil
}