Merge pull request #361 from databasus/develop

Develop
This commit is contained in:
Rostislav Dugin
2026-02-13 16:57:25 +03:00
committed by GitHub
6 changed files with 725 additions and 51 deletions

View File

@@ -115,7 +115,8 @@ func (uc *CreateMariadbBackupUsecase) buildMariadbDumpArgs(
if mdb.HasPrivilege("TRIGGER") {
args = append(args, "--triggers")
}
if mdb.HasPrivilege("EVENT") {
if mdb.HasPrivilege("EVENT") && !mdb.IsExcludeEvents {
args = append(args, "--events")
}

View File

@@ -25,13 +25,14 @@ type MariadbDatabase struct {
Version tools.MariadbVersion `json:"version" gorm:"type:text;not null"`
Host string `json:"host" gorm:"type:text;not null"`
Port int `json:"port" gorm:"type:int;not null"`
Username string `json:"username" gorm:"type:text;not null"`
Password string `json:"password" gorm:"type:text;not null"`
Database *string `json:"database" gorm:"type:text"`
IsHttps bool `json:"isHttps" gorm:"type:boolean;default:false"`
Privileges string `json:"privileges" gorm:"column:privileges;type:text;not null;default:''"`
Host string `json:"host" gorm:"type:text;not null"`
Port int `json:"port" gorm:"type:int;not null"`
Username string `json:"username" gorm:"type:text;not null"`
Password string `json:"password" gorm:"type:text;not null"`
Database *string `json:"database" gorm:"type:text"`
IsHttps bool `json:"isHttps" gorm:"type:boolean;default:false"`
IsExcludeEvents bool `json:"isExcludeEvents" gorm:"type:boolean;default:false"`
Privileges string `json:"privileges" gorm:"column:privileges;type:text;not null;default:''"`
}
func (m *MariadbDatabase) TableName() string {
@@ -124,6 +125,7 @@ func (m *MariadbDatabase) Update(incoming *MariadbDatabase) {
m.Username = incoming.Username
m.Database = incoming.Database
m.IsHttps = incoming.IsHttps
m.IsExcludeEvents = incoming.IsExcludeEvents
m.Privileges = incoming.Privileges
if incoming.Password != "" {

View File

@@ -564,12 +564,23 @@ func (p *PostgresqlDatabase) CreateReadOnlyUser(
logger.Warn("Failed to revoke TEMP privilege", "error", err, "username", baseUsername)
}
// Step 4: Discover all user-created schemas
rows, err := tx.Query(ctx, `
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
`)
// Step 4: Discover schemas to grant privileges on
// If IncludeSchemas is specified, only use those schemas; otherwise use all non-system schemas
var rows pgx.Rows
if len(p.IncludeSchemas) > 0 {
rows, err = tx.Query(ctx, `
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
AND schema_name = ANY($1::text[])
`, p.IncludeSchemas)
} else {
rows, err = tx.Query(ctx, `
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
`)
}
if err != nil {
return "", "", fmt.Errorf("failed to get schemas: %w", err)
}
@@ -619,50 +630,197 @@ func (p *PostgresqlDatabase) CreateReadOnlyUser(
}
// Step 6: Grant SELECT on ALL existing tables and sequences
grantSelectSQL := fmt.Sprintf(`
DO $$
DECLARE
schema_rec RECORD;
BEGIN
FOR schema_rec IN
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
LOOP
EXECUTE format('GRANT SELECT ON ALL TABLES IN SCHEMA %%I TO "%s"', schema_rec.schema_name);
EXECUTE format('GRANT SELECT ON ALL SEQUENCES IN SCHEMA %%I TO "%s"', schema_rec.schema_name);
END LOOP;
END $$;
`, baseUsername, baseUsername)
// Use the already-filtered schemas list from Step 4
for _, schema := range schemas {
_, err = tx.Exec(
ctx,
fmt.Sprintf(
`GRANT SELECT ON ALL TABLES IN SCHEMA "%s" TO "%s"`,
schema,
baseUsername,
),
)
if err != nil {
return "", "", fmt.Errorf(
"failed to grant select on tables in schema %s: %w",
schema,
err,
)
}
_, err = tx.Exec(ctx, grantSelectSQL)
if err != nil {
return "", "", fmt.Errorf("failed to grant select on tables: %w", err)
_, err = tx.Exec(
ctx,
fmt.Sprintf(
`GRANT SELECT ON ALL SEQUENCES IN SCHEMA "%s" TO "%s"`,
schema,
baseUsername,
),
)
if err != nil {
return "", "", fmt.Errorf(
"failed to grant select on sequences in schema %s: %w",
schema,
err,
)
}
}
// Step 7: Set default privileges for FUTURE tables and sequences
defaultPrivilegesSQL := fmt.Sprintf(`
DO $$
DECLARE
schema_rec RECORD;
BEGIN
FOR schema_rec IN
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name NOT IN ('pg_catalog', 'information_schema')
LOOP
EXECUTE format('ALTER DEFAULT PRIVILEGES IN SCHEMA %%I GRANT SELECT ON TABLES TO "%s"', schema_rec.schema_name);
EXECUTE format('ALTER DEFAULT PRIVILEGES IN SCHEMA %%I GRANT SELECT ON SEQUENCES TO "%s"', schema_rec.schema_name);
END LOOP;
END $$;
`, baseUsername, baseUsername)
// First, set default privileges for objects created by the current user
// Use the already-filtered schemas list from Step 4
for _, schema := range schemas {
_, err = tx.Exec(
ctx,
fmt.Sprintf(
`ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT SELECT ON TABLES TO "%s"`,
schema,
baseUsername,
),
)
if err != nil {
return "", "", fmt.Errorf(
"failed to set default privileges for tables in schema %s: %w",
schema,
err,
)
}
_, err = tx.Exec(ctx, defaultPrivilegesSQL)
if err != nil {
return "", "", fmt.Errorf("failed to set default privileges: %w", err)
_, err = tx.Exec(
ctx,
fmt.Sprintf(
`ALTER DEFAULT PRIVILEGES IN SCHEMA "%s" GRANT SELECT ON SEQUENCES TO "%s"`,
schema,
baseUsername,
),
)
if err != nil {
return "", "", fmt.Errorf(
"failed to set default privileges for sequences in schema %s: %w",
schema,
err,
)
}
}
// Step 8: Verify user creation before committing
// Step 8: Discover all roles that own objects in each schema
// This is needed because ALTER DEFAULT PRIVILEGES only applies to objects created by the current role.
// To handle tables created by OTHER users (like the GitHub issue with partitioned tables),
// we need to set "ALTER DEFAULT PRIVILEGES FOR ROLE <owner>" for each object owner.
// Filter by IncludeSchemas if specified.
type SchemaOwner struct {
SchemaName string
RoleName string
}
var ownerRows pgx.Rows
if len(p.IncludeSchemas) > 0 {
ownerRows, err = tx.Query(ctx, `
SELECT DISTINCT n.nspname as schema_name, pg_get_userbyid(c.relowner) as role_name
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND n.nspname = ANY($1::text[])
AND c.relkind IN ('r', 'p', 'v', 'm', 'f')
AND pg_get_userbyid(c.relowner) != current_user
ORDER BY n.nspname, role_name
`, p.IncludeSchemas)
} else {
ownerRows, err = tx.Query(ctx, `
SELECT DISTINCT n.nspname as schema_name, pg_get_userbyid(c.relowner) as role_name
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND c.relkind IN ('r', 'p', 'v', 'm', 'f')
AND pg_get_userbyid(c.relowner) != current_user
ORDER BY n.nspname, role_name
`)
}
if err != nil {
// Log warning but continue - this is a best-effort enhancement
logger.Warn("Failed to query object owners for default privileges", "error", err)
} else {
var schemaOwners []SchemaOwner
for ownerRows.Next() {
var so SchemaOwner
if err := ownerRows.Scan(&so.SchemaName, &so.RoleName); err != nil {
ownerRows.Close()
logger.Warn("Failed to scan schema owner", "error", err)
break
}
schemaOwners = append(schemaOwners, so)
}
ownerRows.Close()
if err := ownerRows.Err(); err != nil {
logger.Warn("Error iterating schema owners", "error", err)
}
// Step 9: Set default privileges FOR ROLE for each object owner
// Note: This may fail for some roles due to permission issues (e.g., roles owned by other superusers)
// We log warnings but continue - user creation should succeed even if some roles can't be configured
for _, so := range schemaOwners {
// Try to set default privileges for tables
_, err = tx.Exec(
ctx,
fmt.Sprintf(
`ALTER DEFAULT PRIVILEGES FOR ROLE "%s" IN SCHEMA "%s" GRANT SELECT ON TABLES TO "%s"`,
so.RoleName,
so.SchemaName,
baseUsername,
),
)
if err != nil {
logger.Warn(
"Failed to set default privileges for role (tables)",
"error",
err,
"role",
so.RoleName,
"schema",
so.SchemaName,
"readonly_user",
baseUsername,
)
}
// Try to set default privileges for sequences
_, err = tx.Exec(
ctx,
fmt.Sprintf(
`ALTER DEFAULT PRIVILEGES FOR ROLE "%s" IN SCHEMA "%s" GRANT SELECT ON SEQUENCES TO "%s"`,
so.RoleName,
so.SchemaName,
baseUsername,
),
)
if err != nil {
logger.Warn(
"Failed to set default privileges for role (sequences)",
"error",
err,
"role",
so.RoleName,
"schema",
so.SchemaName,
"readonly_user",
baseUsername,
)
}
}
if len(schemaOwners) > 0 {
logger.Info(
"Set default privileges for existing object owners",
"readonly_user",
baseUsername,
"owner_count",
len(schemaOwners),
)
}
}
// Step 10: Verify user creation before committing
var verifyUsername string
err = tx.QueryRow(ctx, fmt.Sprintf(`SELECT rolname FROM pg_roles WHERE rolname = '%s'`, baseUsername)).
Scan(&verifyUsername)

View File

@@ -1319,6 +1319,346 @@ type PostgresContainer struct {
DB *sqlx.DB
}
func Test_CreateReadOnlyUser_TablesCreatedByDifferentUser_ReadOnlyUserCanRead(t *testing.T) {
env := config.GetEnv()
container := connectToPostgresContainer(t, env.TestPostgres16Port)
defer container.DB.Close()
// Step 1: Create a second database user who will create tables
userCreatorUsername := fmt.Sprintf("user_creator_%s", uuid.New().String()[:8])
userCreatorPassword := "creator_password_123"
_, err := container.DB.Exec(fmt.Sprintf(
`CREATE USER "%s" WITH PASSWORD '%s' LOGIN`,
userCreatorUsername,
userCreatorPassword,
))
assert.NoError(t, err)
defer func() {
_, _ = container.DB.Exec(fmt.Sprintf(`DROP OWNED BY "%s" CASCADE`, userCreatorUsername))
_, _ = container.DB.Exec(fmt.Sprintf(`DROP USER IF EXISTS "%s"`, userCreatorUsername))
}()
// Step 2: Grant the user_creator privileges to connect and create tables
_, err = container.DB.Exec(fmt.Sprintf(
`GRANT CONNECT ON DATABASE "%s" TO "%s"`,
container.Database,
userCreatorUsername,
))
assert.NoError(t, err)
_, err = container.DB.Exec(fmt.Sprintf(
`GRANT USAGE ON SCHEMA public TO "%s"`,
userCreatorUsername,
))
assert.NoError(t, err)
_, err = container.DB.Exec(fmt.Sprintf(
`GRANT CREATE ON SCHEMA public TO "%s"`,
userCreatorUsername,
))
assert.NoError(t, err)
// Step 2b: Create an initial table by user_creator so they become an object owner
// This is important because our fix discovers existing object owners
userCreatorDSN := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
container.Host,
container.Port,
userCreatorUsername,
userCreatorPassword,
container.Database,
)
userCreatorConn, err := sqlx.Connect("postgres", userCreatorDSN)
assert.NoError(t, err)
defer userCreatorConn.Close()
initialTableName := fmt.Sprintf(
"public.initial_table_%s",
strings.ReplaceAll(uuid.New().String()[:8], "-", ""),
)
_, err = userCreatorConn.Exec(fmt.Sprintf(`
CREATE TABLE %s (
id SERIAL PRIMARY KEY,
data TEXT NOT NULL
);
INSERT INTO %s (data) VALUES ('initial_data');
`, initialTableName, initialTableName))
assert.NoError(t, err)
defer func() {
_, _ = container.DB.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS %s CASCADE`, initialTableName))
}()
// Step 3: NOW create read-only user via Databasus (as admin)
// At this point, user_creator already owns objects, so ALTER DEFAULT PRIVILEGES FOR ROLE should apply
pgModel := createPostgresModel(container)
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
ctx := context.Background()
readonlyUsername, readonlyPassword, err := pgModel.CreateReadOnlyUser(
ctx,
logger,
nil,
uuid.New(),
)
assert.NoError(t, err)
assert.NotEmpty(t, readonlyUsername)
assert.NotEmpty(t, readonlyPassword)
defer func() {
_, _ = container.DB.Exec(fmt.Sprintf(`DROP OWNED BY "%s" CASCADE`, readonlyUsername))
_, _ = container.DB.Exec(fmt.Sprintf(`DROP USER IF EXISTS "%s"`, readonlyUsername))
}()
// Step 4: user_creator creates a NEW table AFTER the read-only user was created
// This table should automatically grant SELECT to the read-only user via ALTER DEFAULT PRIVILEGES FOR ROLE
tableName := fmt.Sprintf(
"public.future_table_%s",
strings.ReplaceAll(uuid.New().String()[:8], "-", ""),
)
_, err = userCreatorConn.Exec(fmt.Sprintf(`
CREATE TABLE %s (
id SERIAL PRIMARY KEY,
data TEXT NOT NULL
);
INSERT INTO %s (data) VALUES ('test_data_1'), ('test_data_2');
`, tableName, tableName))
assert.NoError(t, err)
defer func() {
_, _ = container.DB.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS %s CASCADE`, tableName))
}()
// Step 5: Connect as read-only user and verify it can SELECT from the new table
readonlyDSN := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
container.Host,
container.Port,
readonlyUsername,
readonlyPassword,
container.Database,
)
readonlyConn, err := sqlx.Connect("postgres", readonlyDSN)
assert.NoError(t, err)
defer readonlyConn.Close()
var count int
err = readonlyConn.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName))
assert.NoError(t, err)
assert.Equal(
t,
2,
count,
"Read-only user should be able to SELECT from table created by different user",
)
// Step 6: Verify read-only user cannot write to the table
_, err = readonlyConn.Exec(
fmt.Sprintf("INSERT INTO %s (data) VALUES ('should-fail')", tableName),
)
assert.Error(t, err)
assert.Contains(t, err.Error(), "permission denied")
// Step 7: Verify pg_dump operations (LOCK TABLE) work
// pg_dump needs to lock tables in ACCESS SHARE MODE for consistent backup
tx, err := readonlyConn.Begin()
assert.NoError(t, err)
defer tx.Rollback()
_, err = tx.Exec(fmt.Sprintf("LOCK TABLE %s IN ACCESS SHARE MODE", tableName))
assert.NoError(t, err, "Read-only user should be able to LOCK TABLE (needed for pg_dump)")
err = tx.Commit()
assert.NoError(t, err)
}
func Test_CreateReadOnlyUser_WithIncludeSchemas_OnlyGrantsAccessToSpecifiedSchemas(t *testing.T) {
env := config.GetEnv()
container := connectToPostgresContainer(t, env.TestPostgres16Port)
defer container.DB.Close()
// Step 1: Create multiple schemas and tables
_, err := container.DB.Exec(`
DROP SCHEMA IF EXISTS included_schema CASCADE;
DROP SCHEMA IF EXISTS excluded_schema CASCADE;
CREATE SCHEMA included_schema;
CREATE SCHEMA excluded_schema;
CREATE TABLE public.public_table (id INT, data TEXT);
INSERT INTO public.public_table VALUES (1, 'public_data');
CREATE TABLE included_schema.included_table (id INT, data TEXT);
INSERT INTO included_schema.included_table VALUES (2, 'included_data');
CREATE TABLE excluded_schema.excluded_table (id INT, data TEXT);
INSERT INTO excluded_schema.excluded_table VALUES (3, 'excluded_data');
`)
assert.NoError(t, err)
defer func() {
_, _ = container.DB.Exec(`DROP SCHEMA IF EXISTS included_schema CASCADE`)
_, _ = container.DB.Exec(`DROP SCHEMA IF EXISTS excluded_schema CASCADE`)
}()
// Step 2: Create a second user who owns tables in both included and excluded schemas
userCreatorUsername := fmt.Sprintf("user_creator_%s", uuid.New().String()[:8])
userCreatorPassword := "creator_password_123"
_, err = container.DB.Exec(fmt.Sprintf(
`CREATE USER "%s" WITH PASSWORD '%s' LOGIN`,
userCreatorUsername,
userCreatorPassword,
))
assert.NoError(t, err)
defer func() {
_, _ = container.DB.Exec(fmt.Sprintf(`DROP OWNED BY "%s" CASCADE`, userCreatorUsername))
_, _ = container.DB.Exec(fmt.Sprintf(`DROP USER IF EXISTS "%s"`, userCreatorUsername))
}()
// Grant privileges to user_creator
_, err = container.DB.Exec(fmt.Sprintf(
`GRANT CONNECT ON DATABASE "%s" TO "%s"`,
container.Database,
userCreatorUsername,
))
assert.NoError(t, err)
for _, schema := range []string{"public", "included_schema", "excluded_schema"} {
_, err = container.DB.Exec(fmt.Sprintf(
`GRANT USAGE, CREATE ON SCHEMA %s TO "%s"`,
schema,
userCreatorUsername,
))
assert.NoError(t, err)
}
// User_creator creates tables in included and excluded schemas
userCreatorDSN := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
container.Host,
container.Port,
userCreatorUsername,
userCreatorPassword,
container.Database,
)
userCreatorConn, err := sqlx.Connect("postgres", userCreatorDSN)
assert.NoError(t, err)
defer userCreatorConn.Close()
_, err = userCreatorConn.Exec(`
CREATE TABLE included_schema.user_table (id INT, data TEXT);
INSERT INTO included_schema.user_table VALUES (4, 'user_included_data');
CREATE TABLE excluded_schema.user_excluded_table (id INT, data TEXT);
INSERT INTO excluded_schema.user_excluded_table VALUES (5, 'user_excluded_data');
`)
assert.NoError(t, err)
// Step 3: Create read-only user with IncludeSchemas = ["public", "included_schema"]
pgModel := createPostgresModel(container)
pgModel.IncludeSchemas = []string{"public", "included_schema"}
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
ctx := context.Background()
readonlyUsername, readonlyPassword, err := pgModel.CreateReadOnlyUser(
ctx,
logger,
nil,
uuid.New(),
)
assert.NoError(t, err)
assert.NotEmpty(t, readonlyUsername)
assert.NotEmpty(t, readonlyPassword)
defer func() {
_, _ = container.DB.Exec(fmt.Sprintf(`DROP OWNED BY "%s" CASCADE`, readonlyUsername))
_, _ = container.DB.Exec(fmt.Sprintf(`DROP USER IF EXISTS "%s"`, readonlyUsername))
}()
// Step 4: Connect as read-only user
readonlyDSN := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
container.Host,
container.Port,
readonlyUsername,
readonlyPassword,
container.Database,
)
readonlyConn, err := sqlx.Connect("postgres", readonlyDSN)
assert.NoError(t, err)
defer readonlyConn.Close()
// Step 5: Verify read-only user CAN access included schemas
var publicData string
err = readonlyConn.Get(&publicData, "SELECT data FROM public.public_table LIMIT 1")
assert.NoError(t, err)
assert.Equal(t, "public_data", publicData)
var includedData string
err = readonlyConn.Get(&includedData, "SELECT data FROM included_schema.included_table LIMIT 1")
assert.NoError(t, err)
assert.Equal(t, "included_data", includedData)
var userIncludedData string
err = readonlyConn.Get(&userIncludedData, "SELECT data FROM included_schema.user_table LIMIT 1")
assert.NoError(t, err)
assert.Equal(t, "user_included_data", userIncludedData)
// Step 6: Verify read-only user CANNOT access excluded schema
var excludedData string
err = readonlyConn.Get(&excludedData, "SELECT data FROM excluded_schema.excluded_table LIMIT 1")
assert.Error(t, err)
assert.Contains(t, err.Error(), "permission denied")
err = readonlyConn.Get(
&excludedData,
"SELECT data FROM excluded_schema.user_excluded_table LIMIT 1",
)
assert.Error(t, err)
assert.Contains(t, err.Error(), "permission denied")
// Step 7: Verify future tables in included schemas are accessible
_, err = userCreatorConn.Exec(`
CREATE TABLE included_schema.future_table (id INT, data TEXT);
INSERT INTO included_schema.future_table VALUES (6, 'future_data');
`)
assert.NoError(t, err)
var futureData string
err = readonlyConn.Get(&futureData, "SELECT data FROM included_schema.future_table LIMIT 1")
assert.NoError(t, err)
assert.Equal(
t,
"future_data",
futureData,
"Read-only user should access future tables in included schemas via ALTER DEFAULT PRIVILEGES FOR ROLE",
)
// Step 8: Verify future tables in excluded schema are NOT accessible
_, err = userCreatorConn.Exec(`
CREATE TABLE excluded_schema.future_excluded_table (id INT, data TEXT);
INSERT INTO excluded_schema.future_excluded_table VALUES (7, 'future_excluded_data');
`)
assert.NoError(t, err)
var futureExcludedData string
err = readonlyConn.Get(
&futureExcludedData,
"SELECT data FROM excluded_schema.future_excluded_table LIMIT 1",
)
assert.Error(t, err)
assert.Contains(
t,
err.Error(),
"permission denied",
"Read-only user should NOT access tables in excluded schemas",
)
}
func connectToPostgresContainer(t *testing.T, port string) *PostgresContainer {
dbName := "testdb"
password := "testpassword"

View File

@@ -147,6 +147,26 @@ func Test_BackupAndRestoreMariadb_WithReadOnlyUser_RestoreIsSuccessful(t *testin
}
}
func Test_BackupAndRestoreMariadb_WithExcludeEvents_EventsNotRestored(t *testing.T) {
env := config.GetEnv()
cases := []struct {
name string
version tools.MariadbVersion
port string
}{
{"MariaDB 10.5", tools.MariadbVersion105, env.TestMariadb105Port},
{"MariaDB 10.11", tools.MariadbVersion1011, env.TestMariadb1011Port},
{"MariaDB 11.4", tools.MariadbVersion114, env.TestMariadb114Port},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
testMariadbBackupRestoreWithExcludeEventsForVersion(t, tc.version, tc.port)
})
}
}
func testMariadbBackupRestoreForVersion(
t *testing.T,
mariadbVersion tools.MariadbVersion,
@@ -702,3 +722,145 @@ func updateMariadbDatabaseCredentialsViaAPI(
return &updatedDatabase
}
func testMariadbBackupRestoreWithExcludeEventsForVersion(
t *testing.T,
mariadbVersion tools.MariadbVersion,
port string,
) {
container, err := connectToMariadbContainer(mariadbVersion, port)
if err != nil {
t.Skipf("Skipping MariaDB %s test: %v", mariadbVersion, err)
return
}
defer func() {
if container.DB != nil {
container.DB.Close()
}
}()
setupMariadbTestData(t, container.DB)
_, err = container.DB.Exec(`
CREATE EVENT IF NOT EXISTS test_event
ON SCHEDULE EVERY 1 DAY
DO BEGIN
INSERT INTO test_data (name, value) VALUES ('event_test', 999);
END
`)
if err != nil {
t.Skipf(
"Skipping test: MariaDB version doesn't support events or event scheduler disabled: %v",
err,
)
return
}
router := createTestRouter()
user := users_testing.CreateTestUser(users_enums.UserRoleMember)
workspace := workspaces_testing.CreateTestWorkspace(
"MariaDB Exclude Events Test Workspace",
user,
router,
)
storage := storages.CreateTestStorage(workspace.ID)
database := createMariadbDatabaseViaAPI(
t, router, "MariaDB Exclude Events Test Database", workspace.ID,
container.Host, container.Port,
container.Username, container.Password, container.Database,
container.Version,
user.Token,
)
database.Mariadb.IsExcludeEvents = true
w := workspaces_testing.MakeAPIRequest(
router,
"POST",
"/api/v1/databases/update",
"Bearer "+user.Token,
database,
)
if w.Code != http.StatusOK {
t.Fatalf(
"Failed to update database with IsExcludeEvents. Status: %d, Body: %s",
w.Code,
w.Body.String(),
)
}
enableBackupsViaAPI(
t, router, database.ID, storage.ID,
backups_config.BackupEncryptionNone, user.Token,
)
createBackupViaAPI(t, router, database.ID, user.Token)
backup := waitForBackupCompletion(t, router, database.ID, user.Token, 5*time.Minute)
assert.Equal(t, backups_core.BackupStatusCompleted, backup.Status)
newDBName := "restoreddb_mariadb_no_events"
_, err = container.DB.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s;", newDBName))
assert.NoError(t, err)
_, err = container.DB.Exec(fmt.Sprintf("CREATE DATABASE %s;", newDBName))
assert.NoError(t, err)
newDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true",
container.Username, container.Password, container.Host, container.Port, newDBName)
newDB, err := sqlx.Connect("mysql", newDSN)
assert.NoError(t, err)
defer newDB.Close()
createMariadbRestoreViaAPI(
t, router, backup.ID,
container.Host, container.Port,
container.Username, container.Password, newDBName,
container.Version,
user.Token,
)
restore := waitForMariadbRestoreCompletion(t, router, backup.ID, user.Token, 5*time.Minute)
assert.Equal(t, restores_core.RestoreStatusCompleted, restore.Status)
var tableExists int
err = newDB.Get(
&tableExists,
"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = ? AND table_name = 'test_data'",
newDBName,
)
assert.NoError(t, err)
assert.Equal(t, 1, tableExists, "Table 'test_data' should exist in restored database")
verifyMariadbDataIntegrity(t, container.DB, newDB)
var eventCount int
err = newDB.Get(
&eventCount,
"SELECT COUNT(*) FROM information_schema.events WHERE event_schema = ? AND event_name = 'test_event'",
newDBName,
)
assert.NoError(t, err)
assert.Equal(
t,
0,
eventCount,
"Event 'test_event' should NOT exist in restored database when IsExcludeEvents is true",
)
err = os.Remove(filepath.Join(config.GetEnv().DataFolder, backup.ID.String()))
if err != nil {
t.Logf("Warning: Failed to delete backup file: %v", err)
}
test_utils.MakeDeleteRequest(
t,
router,
"/api/v1/databases/"+database.ID.String(),
"Bearer "+user.Token,
http.StatusNoContent,
)
storages.RemoveTestStorage(storage.ID)
workspaces_testing.RemoveTestWorkspace(workspace, router)
}

View File

@@ -0,0 +1,11 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE mariadb_databases
ADD COLUMN IF NOT EXISTS is_exclude_events BOOLEAN NOT NULL DEFAULT FALSE;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE mariadb_databases
DROP COLUMN IF EXISTS is_exclude_events;
-- +goose StatementEnd