From 58024398cf831b0a98da36fc999545805d91041e Mon Sep 17 00:00:00 2001 From: Nawaz Dhandala Date: Sun, 15 Mar 2026 00:03:46 +0000 Subject: [PATCH] feat: add methods for column existence check and codec management in SpanItem --- .../Services/AnalyticsDatabaseService.ts | 54 ++++++++ .../AddSpanTableOptimizations.ts | 122 +++++++++++------- 2 files changed, 132 insertions(+), 44 deletions(-) diff --git a/Common/Server/Services/AnalyticsDatabaseService.ts b/Common/Server/Services/AnalyticsDatabaseService.ts index 5c885d3c1c..c85a8e5d86 100644 --- a/Common/Server/Services/AnalyticsDatabaseService.ts +++ b/Common/Server/Services/AnalyticsDatabaseService.ts @@ -254,6 +254,60 @@ export default class AnalyticsDatabaseService< ); } + public async doesColumnExist(columnName: string): Promise { + const tableName: string = this.model.tableName; + const result: { data: Array } = await ( + await this.executeQuery( + `SELECT count() as cnt FROM system.columns WHERE database = currentDatabase() AND table = '${tableName}' AND name = '${columnName}'`, + ) + ).json(); + + const rows: Array = result.data || []; + + return rows.length > 0 && Number(rows[0]!.cnt) > 0; + } + + public async getColumnCodec(columnName: string): Promise { + const tableName: string = this.model.tableName; + const result: { data: Array } = await ( + await this.executeQuery( + `SELECT compression_codec FROM system.columns WHERE database = currentDatabase() AND table = '${tableName}' AND name = '${columnName}'`, + ) + ).json(); + + const rows: Array = result.data || []; + + if (rows.length === 0) { + return ""; + } + + return (rows[0]!.compression_codec as string) || ""; + } + + public async setColumnCodecIfNotSet(data: { + columnName: string; + columnType: string; + codec: string; + expectedCodecValue: string; + }): Promise { + const tableName: string = this.model.tableName; + const currentCodec: string = await this.getColumnCodec(data.columnName); + + if (currentCodec === data.expectedCodecValue) { + logger.info( + `${tableName}.${data.columnName} already has ${data.expectedCodecValue}, skipping`, + ); + return; + } + + await this.execute( + `ALTER TABLE ${tableName} MODIFY COLUMN ${data.columnName} ${data.columnType} CODEC(${data.codec}) SETTINGS mutations_sync=0`, + ); + logger.info( + `Applied ${data.codec} codec to ${tableName}.${data.columnName} (async)`, + ); + } + @CaptureSpan() public async findBy(findBy: FindBy): Promise> { return await this._findBy(findBy); diff --git a/Worker/DataMigrations/AddSpanTableOptimizations.ts b/Worker/DataMigrations/AddSpanTableOptimizations.ts index 388127f435..342d6e627d 100644 --- a/Worker/DataMigrations/AddSpanTableOptimizations.ts +++ b/Worker/DataMigrations/AddSpanTableOptimizations.ts @@ -10,19 +10,28 @@ export default class AddSpanTableOptimizations extends DataMigrationBase { } public override async migrate(): Promise { - // Step 1: Add hasException column (with its skip index added separately via addColumnInDatabase) - const model: Span = new Span(); - const hasExceptionColumn: AnalyticsTableColumn | undefined = - model.tableColumns.find((item: AnalyticsTableColumn) => { - return item.key === "hasException"; - }); + // Step 1: Add hasException column if it doesn't exist + const hasColumnAlready: boolean = + await SpanService.doesColumnExist("hasException"); - if (hasExceptionColumn) { - await SpanService.addColumnInDatabase(hasExceptionColumn); - logger.info("Added hasException column to SpanItem"); + if (!hasColumnAlready) { + const model: Span = new Span(); + const hasExceptionColumn: AnalyticsTableColumn | undefined = + model.tableColumns.find((item: AnalyticsTableColumn) => { + return item.key === "hasException"; + }); + + if (hasExceptionColumn) { + await SpanService.addColumnInDatabase(hasExceptionColumn); + logger.info("Added hasException column to SpanItem"); + } + } else { + logger.info( + "hasException column already exists on SpanItem, skipping add", + ); } - // Step 2: Add skip indexes on kind and parentSpanId + // Step 2: Add skip indexes on kind and parentSpanId (IF NOT EXISTS makes these safe to re-run) await SpanService.execute( `ALTER TABLE SpanItem ADD INDEX IF NOT EXISTS idx_kind assumeNotNull(kind) TYPE set(5) GRANULARITY 4`, ); @@ -34,30 +43,35 @@ export default class AddSpanTableOptimizations extends DataMigrationBase { logger.info("Added skip index idx_parent_span_id on SpanItem"); /* - * Step 3: Apply compression codecs + * Step 3: Apply compression codecs (only if not already applied) * Use mutations_sync=0 so these operations return immediately and complete asynchronously. * On large tables (76GB+), synchronous MODIFY COLUMN CODEC would time out. */ - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN startTimeUnixNano Int128 CODEC(ZSTD(1)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(1) codec to SpanItem.startTimeUnixNano (async)"); - - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN endTimeUnixNano Int128 CODEC(ZSTD(1)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(1) codec to SpanItem.endTimeUnixNano (async)"); - - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN durationUnixNano Int128 CODEC(ZSTD(1)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(1) codec to SpanItem.durationUnixNano (async)"); + await SpanService.setColumnCodecIfNotSet({ + columnName: "startTimeUnixNano", + columnType: "Int128", + codec: "ZSTD(1)", + expectedCodecValue: "CODEC(ZSTD(1))", + }); + await SpanService.setColumnCodecIfNotSet({ + columnName: "endTimeUnixNano", + columnType: "Int128", + codec: "ZSTD(1)", + expectedCodecValue: "CODEC(ZSTD(1))", + }); + await SpanService.setColumnCodecIfNotSet({ + columnName: "durationUnixNano", + columnType: "Int128", + codec: "ZSTD(1)", + expectedCodecValue: "CODEC(ZSTD(1))", + }); // traceId and spanId have bloom_filter indexes — must drop index, apply codec, re-add index await this.applyCodecWithIndex( "traceId", "String", "ZSTD(1)", + "CODEC(ZSTD(1))", "idx_trace_id", "traceId", "bloom_filter(0.01)", @@ -68,31 +82,37 @@ export default class AddSpanTableOptimizations extends DataMigrationBase { "spanId", "String", "ZSTD(1)", + "CODEC(ZSTD(1))", "idx_span_id", "spanId", "bloom_filter(0.01)", 1, ); - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN parentSpanId Nullable(String) CODEC(ZSTD(1)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(1) codec to SpanItem.parentSpanId (async)"); - - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN attributes String CODEC(ZSTD(3)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(3) codec to SpanItem.attributes (async)"); - - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN events String CODEC(ZSTD(3)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(3) codec to SpanItem.events (async)"); - - await SpanService.execute( - `ALTER TABLE SpanItem MODIFY COLUMN links String CODEC(ZSTD(3)) SETTINGS mutations_sync=0`, - ); - logger.info("Applied ZSTD(3) codec to SpanItem.links (async)"); + await SpanService.setColumnCodecIfNotSet({ + columnName: "parentSpanId", + columnType: "Nullable(String)", + codec: "ZSTD(1)", + expectedCodecValue: "CODEC(ZSTD(1))", + }); + await SpanService.setColumnCodecIfNotSet({ + columnName: "attributes", + columnType: "String", + codec: "ZSTD(3)", + expectedCodecValue: "CODEC(ZSTD(3))", + }); + await SpanService.setColumnCodecIfNotSet({ + columnName: "events", + columnType: "String", + codec: "ZSTD(3)", + expectedCodecValue: "CODEC(ZSTD(3))", + }); + await SpanService.setColumnCodecIfNotSet({ + columnName: "links", + columnType: "String", + codec: "ZSTD(3)", + expectedCodecValue: "CODEC(ZSTD(3))", + }); // Step 4: Backfill hasException for existing rows containing exception events await SpanService.execute( @@ -107,11 +127,25 @@ export default class AddSpanTableOptimizations extends DataMigrationBase { columnName: string, columnType: string, codec: string, + expectedCodecValue: string, indexName: string, indexExpr: string, indexType: string, granularity: number, ): Promise { + const currentCodec: string = await SpanService.getColumnCodec(columnName); + + if (currentCodec === expectedCodecValue) { + logger.info( + `SpanItem.${columnName} already has ${expectedCodecValue}, skipping codec change`, + ); + // Still ensure the index exists (IF NOT EXISTS is safe to re-run) + await SpanService.execute( + `ALTER TABLE SpanItem ADD INDEX IF NOT EXISTS ${indexName} ${indexExpr} TYPE ${indexType} GRANULARITY ${granularity}`, + ); + return; + } + // Drop the index first so the column can be modified (async to avoid timeout) await SpanService.execute( `ALTER TABLE SpanItem DROP INDEX IF EXISTS ${indexName} SETTINGS mutations_sync=0`,