FIX (permissions check): Check permissions only in schemas selected for backup

This commit is contained in:
Rostislav Dugin
2026-01-12 14:22:12 +03:00
parent 245a81897f
commit 54b9e67656

View File

@@ -671,7 +671,7 @@ func testSingleDatabaseConnection(
postgresDb.Version = detectedVersion
// Verify user has sufficient permissions for backup operations
if err := checkBackupPermissions(ctx, conn, *postgresDb.Database); err != nil {
if err := checkBackupPermissions(ctx, conn, postgresDb.IncludeSchemas); err != nil {
return err
}
@@ -707,7 +707,12 @@ func detectDatabaseVersion(ctx context.Context, conn *pgx.Conn) (tools.Postgresq
// checkBackupPermissions verifies the user has sufficient privileges for pg_dump backup.
// Required privileges: CONNECT on database, USAGE on schemas, SELECT on tables.
func checkBackupPermissions(ctx context.Context, conn *pgx.Conn, dbName string) error {
// If includeSchemas is specified, only checks permissions on those schemas.
func checkBackupPermissions(
ctx context.Context,
conn *pgx.Conn,
includeSchemas []string,
) error {
var missingPrivileges []string
// Check CONNECT privilege on database
@@ -723,14 +728,29 @@ func checkBackupPermissions(ctx context.Context, conn *pgx.Conn, dbName string)
// Check USAGE privilege on at least one non-system schema
var schemaCount int
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_namespace n
WHERE has_schema_privilege(current_user, n.nspname, 'USAGE')
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND n.nspname NOT LIKE 'pg_temp_%'
AND n.nspname NOT LIKE 'pg_toast_temp_%'
`).Scan(&schemaCount)
if len(includeSchemas) > 0 {
// Check only the specified schemas
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_namespace n
WHERE has_schema_privilege(current_user, n.nspname, 'USAGE')
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND n.nspname NOT LIKE 'pg_temp_%'
AND n.nspname NOT LIKE 'pg_toast_temp_%'
AND n.nspname = ANY($1::text[])
`, includeSchemas).Scan(&schemaCount)
} else {
// Check all non-system schemas
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_namespace n
WHERE has_schema_privilege(current_user, n.nspname, 'USAGE')
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND n.nspname NOT LIKE 'pg_temp_%'
AND n.nspname NOT LIKE 'pg_toast_temp_%'
`).Scan(&schemaCount)
}
if err != nil {
return fmt.Errorf("cannot check schema privileges: %w", err)
}
@@ -741,11 +761,23 @@ func checkBackupPermissions(ctx context.Context, conn *pgx.Conn, dbName string)
// Check SELECT privilege on at least one table (if tables exist)
// Use pg_tables from pg_catalog which shows all tables regardless of user privileges
var tableCount int
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_catalog.pg_tables t
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
`).Scan(&tableCount)
if len(includeSchemas) > 0 {
// Check only tables in the specified schemas
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_catalog.pg_tables t
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
AND t.schemaname = ANY($1::text[])
`, includeSchemas).Scan(&tableCount)
} else {
// Check all tables in non-system schemas
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_catalog.pg_tables t
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
`).Scan(&tableCount)
}
if err != nil {
return fmt.Errorf("cannot check table count: %w", err)
}
@@ -753,12 +785,25 @@ func checkBackupPermissions(ctx context.Context, conn *pgx.Conn, dbName string)
if tableCount > 0 {
// Check if user has SELECT on at least one of these tables
var selectableTableCount int
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_catalog.pg_tables t
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
AND has_table_privilege(current_user, quote_ident(t.schemaname) || '.' || quote_ident(t.tablename), 'SELECT')
`).Scan(&selectableTableCount)
if len(includeSchemas) > 0 {
// Check only tables in the specified schemas
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_catalog.pg_tables t
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
AND t.schemaname = ANY($1::text[])
AND has_table_privilege(current_user, quote_ident(t.schemaname) || '.' || quote_ident(t.tablename), 'SELECT')
`, includeSchemas).Scan(&selectableTableCount)
} else {
// Check all tables in non-system schemas
err = conn.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_catalog.pg_tables t
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
AND has_table_privilege(current_user, quote_ident(t.schemaname) || '.' || quote_ident(t.tablename), 'SELECT')
`).Scan(&selectableTableCount)
}
if err != nil {
return fmt.Errorf("cannot check SELECT privileges: %w", err)
}