feat: add methods for column existence check and codec management in SpanItem

This commit is contained in:
Nawaz Dhandala
2026-03-15 00:03:46 +00:00
parent dde1e89c34
commit 58024398cf
2 changed files with 132 additions and 44 deletions

View File

@@ -254,6 +254,60 @@ export default class AnalyticsDatabaseService<
);
}
/**
 * Reports whether a column with the given name is listed in `system.columns`
 * for this model's table in the current database.
 *
 * Both the table name and `columnName` are interpolated directly into the
 * SQL text, so callers should only pass trusted identifiers.
 *
 * @param columnName - Name of the column to look for.
 * @returns `true` when the count reported by the query is greater than zero,
 * `false` when no row comes back or the count is zero.
 */
public async doesColumnExist(columnName: string): Promise<boolean> {
  const tableName: string = this.model.tableName;
  const countQuery: string = `SELECT count() as cnt FROM system.columns WHERE database = currentDatabase() AND table = '${tableName}' AND name = '${columnName}'`;

  const response: { data: Array<JSONObject> } = await (
    await this.executeQuery(countQuery)
  ).json();

  // A missing `data` payload is treated the same as an empty result set.
  const matches: Array<JSONObject> = response.data || [];
  if (matches.length === 0) {
    return false;
  }

  // `cnt` is coerced with Number() before the comparison.
  return Number(matches[0]!.cnt) > 0;
}
/**
 * Reads the `compression_codec` value that `system.columns` reports for a
 * column of this model's table in the current database.
 *
 * Both the table name and `columnName` are interpolated directly into the
 * SQL text, so callers should only pass trusted identifiers.
 *
 * @param columnName - Name of the column whose codec should be looked up.
 * @returns The `compression_codec` of the first returned row, or an empty
 * string when no row is returned or the value is empty/falsy.
 */
public async getColumnCodec(columnName: string): Promise<string> {
  const tableName: string = this.model.tableName;
  const codecQuery: string = `SELECT compression_codec FROM system.columns WHERE database = currentDatabase() AND table = '${tableName}' AND name = '${columnName}'`;

  const response: { data: Array<JSONObject> } = await (
    await this.executeQuery(codecQuery)
  ).json();

  // A missing `data` payload is treated the same as an empty result set.
  const matches: Array<JSONObject> = response.data || [];

  return matches.length > 0
    ? (matches[0]!.compression_codec as string) || ""
    : "";
}
/**
 * Applies a compression codec to a column of this model's table unless the
 * codec currently reported for that column already equals the expected value.
 *
 * The current codec is obtained via `getColumnCodec` and compared to
 * `expectedCodecValue` with strict string equality. When they differ, an
 * `ALTER TABLE ... MODIFY COLUMN ... CODEC(...)` statement is issued with
 * `SETTINGS mutations_sync=0`.
 *
 * @param data.columnName - Column to modify.
 * @param data.columnType - Column type restated in the MODIFY COLUMN clause.
 * @param data.codec - Codec expression placed inside `CODEC(...)`.
 * @param data.expectedCodecValue - Exact codec string that means "already
 * applied" (callers in this commit pass values such as `CODEC(ZSTD(1))`).
 */
public async setColumnCodecIfNotSet(data: {
  columnName: string;
  columnType: string;
  codec: string;
  expectedCodecValue: string;
}): Promise<void> {
  const table: string = this.model.tableName;
  const existingCodec: string = await this.getColumnCodec(data.columnName);
  const needsCodecChange: boolean = existingCodec !== data.expectedCodecValue;

  if (needsCodecChange) {
    const alterStatement: string = `ALTER TABLE ${table} MODIFY COLUMN ${data.columnName} ${data.columnType} CODEC(${data.codec}) SETTINGS mutations_sync=0`;
    await this.execute(alterStatement);

    logger.info(
      `Applied ${data.codec} codec to ${table}.${data.columnName} (async)`,
    );
    return;
  }

  // Codec already matches: nothing to alter, only record the skip.
  logger.info(
    `${table}.${data.columnName} already has ${data.expectedCodecValue}, skipping`,
  );
}
@CaptureSpan()
public async findBy(findBy: FindBy<TBaseModel>): Promise<Array<TBaseModel>> {
return await this._findBy(findBy);

View File

@@ -10,19 +10,28 @@ export default class AddSpanTableOptimizations extends DataMigrationBase {
}
public override async migrate(): Promise<void> {
// Step 1: Add hasException column (with its skip index added separately via addColumnInDatabase)
const model: Span = new Span();
const hasExceptionColumn: AnalyticsTableColumn | undefined =
model.tableColumns.find((item: AnalyticsTableColumn) => {
return item.key === "hasException";
});
// Step 1: Add hasException column if it doesn't exist
const hasColumnAlready: boolean =
await SpanService.doesColumnExist("hasException");
if (hasExceptionColumn) {
await SpanService.addColumnInDatabase(hasExceptionColumn);
logger.info("Added hasException column to SpanItem");
if (!hasColumnAlready) {
const model: Span = new Span();
const hasExceptionColumn: AnalyticsTableColumn | undefined =
model.tableColumns.find((item: AnalyticsTableColumn) => {
return item.key === "hasException";
});
if (hasExceptionColumn) {
await SpanService.addColumnInDatabase(hasExceptionColumn);
logger.info("Added hasException column to SpanItem");
}
} else {
logger.info(
"hasException column already exists on SpanItem, skipping add",
);
}
// Step 2: Add skip indexes on kind and parentSpanId
// Step 2: Add skip indexes on kind and parentSpanId (IF NOT EXISTS makes these safe to re-run)
await SpanService.execute(
`ALTER TABLE SpanItem ADD INDEX IF NOT EXISTS idx_kind assumeNotNull(kind) TYPE set(5) GRANULARITY 4`,
);
@@ -34,30 +43,35 @@ export default class AddSpanTableOptimizations extends DataMigrationBase {
logger.info("Added skip index idx_parent_span_id on SpanItem");
/*
* Step 3: Apply compression codecs
* Step 3: Apply compression codecs (only if not already applied)
* Use mutations_sync=0 so these operations return immediately and complete asynchronously.
* On large tables (76GB+), synchronous MODIFY COLUMN CODEC would time out.
*/
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN startTimeUnixNano Int128 CODEC(ZSTD(1)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(1) codec to SpanItem.startTimeUnixNano (async)");
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN endTimeUnixNano Int128 CODEC(ZSTD(1)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(1) codec to SpanItem.endTimeUnixNano (async)");
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN durationUnixNano Int128 CODEC(ZSTD(1)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(1) codec to SpanItem.durationUnixNano (async)");
await SpanService.setColumnCodecIfNotSet({
columnName: "startTimeUnixNano",
columnType: "Int128",
codec: "ZSTD(1)",
expectedCodecValue: "CODEC(ZSTD(1))",
});
await SpanService.setColumnCodecIfNotSet({
columnName: "endTimeUnixNano",
columnType: "Int128",
codec: "ZSTD(1)",
expectedCodecValue: "CODEC(ZSTD(1))",
});
await SpanService.setColumnCodecIfNotSet({
columnName: "durationUnixNano",
columnType: "Int128",
codec: "ZSTD(1)",
expectedCodecValue: "CODEC(ZSTD(1))",
});
// traceId and spanId have bloom_filter indexes — must drop index, apply codec, re-add index
await this.applyCodecWithIndex(
"traceId",
"String",
"ZSTD(1)",
"CODEC(ZSTD(1))",
"idx_trace_id",
"traceId",
"bloom_filter(0.01)",
@@ -68,31 +82,37 @@ export default class AddSpanTableOptimizations extends DataMigrationBase {
"spanId",
"String",
"ZSTD(1)",
"CODEC(ZSTD(1))",
"idx_span_id",
"spanId",
"bloom_filter(0.01)",
1,
);
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN parentSpanId Nullable(String) CODEC(ZSTD(1)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(1) codec to SpanItem.parentSpanId (async)");
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN attributes String CODEC(ZSTD(3)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(3) codec to SpanItem.attributes (async)");
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN events String CODEC(ZSTD(3)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(3) codec to SpanItem.events (async)");
await SpanService.execute(
`ALTER TABLE SpanItem MODIFY COLUMN links String CODEC(ZSTD(3)) SETTINGS mutations_sync=0`,
);
logger.info("Applied ZSTD(3) codec to SpanItem.links (async)");
await SpanService.setColumnCodecIfNotSet({
columnName: "parentSpanId",
columnType: "Nullable(String)",
codec: "ZSTD(1)",
expectedCodecValue: "CODEC(ZSTD(1))",
});
await SpanService.setColumnCodecIfNotSet({
columnName: "attributes",
columnType: "String",
codec: "ZSTD(3)",
expectedCodecValue: "CODEC(ZSTD(3))",
});
await SpanService.setColumnCodecIfNotSet({
columnName: "events",
columnType: "String",
codec: "ZSTD(3)",
expectedCodecValue: "CODEC(ZSTD(3))",
});
await SpanService.setColumnCodecIfNotSet({
columnName: "links",
columnType: "String",
codec: "ZSTD(3)",
expectedCodecValue: "CODEC(ZSTD(3))",
});
// Step 4: Backfill hasException for existing rows containing exception events
await SpanService.execute(
@@ -107,11 +127,25 @@ export default class AddSpanTableOptimizations extends DataMigrationBase {
columnName: string,
columnType: string,
codec: string,
expectedCodecValue: string,
indexName: string,
indexExpr: string,
indexType: string,
granularity: number,
): Promise<void> {
const currentCodec: string = await SpanService.getColumnCodec(columnName);
if (currentCodec === expectedCodecValue) {
logger.info(
`SpanItem.${columnName} already has ${expectedCodecValue}, skipping codec change`,
);
// Still ensure the index exists (IF NOT EXISTS is safe to re-run)
await SpanService.execute(
`ALTER TABLE SpanItem ADD INDEX IF NOT EXISTS ${indexName} ${indexExpr} TYPE ${indexType} GRANULARITY ${granularity}`,
);
return;
}
// Drop the index first so the column can be modified (async to avoid timeout)
await SpanService.execute(
`ALTER TABLE SpanItem DROP INDEX IF EXISTS ${indexName} SETTINGS mutations_sync=0`,