mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 00:32:12 +02:00
feat: Add retentionDate column and TTL support to telemetry models
- Introduced retentionDate column to Log, Metric, Span, and MonitorLog models for per-service data retention. - Implemented TTL configuration using retentionDate to enable automatic data deletion in ClickHouse. - Added skip indexes for improved query performance on relevant columns in Log, Span, Metric, and Exception models. - Updated ingestion services to compute and store retentionDate based on service-specific retention settings. - Created a data migration to add retentionDate columns and skip indexes to existing telemetry tables. - Deprecated the cron job for data deletion, transitioning to ClickHouse's native TTL mechanism.
This commit is contained in:
@@ -45,6 +45,7 @@ export default class AnalyticsBaseModel extends CommonModel {
|
||||
projections?: Array<Projection> | undefined;
|
||||
materializedViews?: Array<MaterializedView> | undefined;
|
||||
enableMCP?: boolean | undefined;
|
||||
ttlExpression?: string | undefined; // e.g. "retentionDate DELETE"
|
||||
}) {
|
||||
super({
|
||||
tableColumns: data.tableColumns,
|
||||
@@ -148,6 +149,7 @@ export default class AnalyticsBaseModel extends CommonModel {
|
||||
this.projections = data.projections || [];
|
||||
this.materializedViews = data.materializedViews || [];
|
||||
this.enableMCP = data.enableMCP || false;
|
||||
this.ttlExpression = data.ttlExpression || "";
|
||||
}
|
||||
|
||||
private _enableWorkflowOn: EnableWorkflowOn | undefined;
|
||||
@@ -282,6 +284,14 @@ export default class AnalyticsBaseModel extends CommonModel {
|
||||
this._enableMCP = v;
|
||||
}
|
||||
|
||||
private _ttlExpression: string = "";
|
||||
public get ttlExpression(): string {
|
||||
return this._ttlExpression;
|
||||
}
|
||||
public set ttlExpression(v: string) {
|
||||
this._ttlExpression = v;
|
||||
}
|
||||
|
||||
public getTenantColumn(): AnalyticsTableColumn | null {
|
||||
const column: AnalyticsTableColumn | undefined = this.tableColumns.find(
|
||||
(column: AnalyticsTableColumn) => {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel";
|
||||
import Route from "../../Types/API/Route";
|
||||
import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine";
|
||||
import AnalyticsTableColumn from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import AnalyticsTableColumn, {
|
||||
SkipIndexType,
|
||||
} from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
|
||||
import ObjectID from "../../Types/ObjectID";
|
||||
import Permission from "../../Types/Permission";
|
||||
@@ -108,6 +110,12 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
description: "Exception Type", // Examples: java.net.ConnectException; OSError; etc.
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_exception_type",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -131,6 +139,10 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
description: "Exception Stack Trace", // Examples: Division by zero; Can't convert 'int' object to str implicitly
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
codec: {
|
||||
codec: "ZSTD",
|
||||
level: 3,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -154,6 +166,10 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
description: "Exception Message", // Examples: Division by zero; Can't convert 'int' object to str implicitly
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
codec: {
|
||||
codec: "ZSTD",
|
||||
level: 3,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -225,6 +241,12 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
description: "ID of the trace",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_trace_id",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -248,6 +270,12 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
description: "ID of the span",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_span_id",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -271,6 +299,12 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
description: "Fingerprint of the exception",
|
||||
required: true,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_fingerprint",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -406,6 +440,16 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
},
|
||||
});
|
||||
|
||||
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
|
||||
key: "retentionDate",
|
||||
title: "Retention Date",
|
||||
description:
|
||||
"Date after which this row is eligible for TTL deletion, computed at ingest time as time + service.retainTelemetryDataForDays",
|
||||
required: true,
|
||||
type: TableColumnType.Date,
|
||||
defaultValue: undefined,
|
||||
});
|
||||
|
||||
super({
|
||||
tableName: "ExceptionItem",
|
||||
tableEngine: AnalyticsTableEngine.MergeTree,
|
||||
@@ -459,11 +503,13 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
environmentColumn,
|
||||
parsedFramesColumn,
|
||||
attributesColumn,
|
||||
retentionDateColumn,
|
||||
],
|
||||
projections: [],
|
||||
sortKeys: ["projectId", "time", "serviceId", "fingerprint"],
|
||||
primaryKeys: ["projectId", "time", "serviceId", "fingerprint"],
|
||||
partitionKey: "sipHash64(projectId) % 16",
|
||||
ttlExpression: "retentionDate DELETE",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -602,4 +648,12 @@ export default class ExceptionInstance extends AnalyticsBaseModel {
|
||||
public set parsedFrames(v: string | undefined) {
|
||||
this.setColumnValue("parsedFrames", v);
|
||||
}
|
||||
|
||||
public get retentionDate(): Date | undefined {
|
||||
return this.getColumnValue("retentionDate") as Date | undefined;
|
||||
}
|
||||
|
||||
public set retentionDate(v: Date | undefined) {
|
||||
this.setColumnValue("retentionDate", v);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel";
|
||||
import Route from "../../Types/API/Route";
|
||||
import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine";
|
||||
import AnalyticsTableColumn from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import AnalyticsTableColumn, {
|
||||
SkipIndexType,
|
||||
} from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
|
||||
import { JSONObject } from "../../Types/JSON";
|
||||
import ObjectID from "../../Types/ObjectID";
|
||||
@@ -109,6 +111,12 @@ export default class Log extends AnalyticsBaseModel {
|
||||
description: "Log Severity Text",
|
||||
required: true,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_severity",
|
||||
type: SkipIndexType.Set,
|
||||
params: [10],
|
||||
granularity: 4,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -205,6 +213,12 @@ export default class Log extends AnalyticsBaseModel {
|
||||
description: "ID of the trace",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_trace_id",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -228,6 +242,12 @@ export default class Log extends AnalyticsBaseModel {
|
||||
description: "ID of the span",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_span_id",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -251,6 +271,16 @@ export default class Log extends AnalyticsBaseModel {
|
||||
description: "Body of the Log",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_body",
|
||||
type: SkipIndexType.TokenBF,
|
||||
params: [10240, 3, 0],
|
||||
granularity: 4,
|
||||
},
|
||||
codec: {
|
||||
codec: "ZSTD",
|
||||
level: 3,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -268,6 +298,16 @@ export default class Log extends AnalyticsBaseModel {
|
||||
},
|
||||
});
|
||||
|
||||
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
|
||||
key: "retentionDate",
|
||||
title: "Retention Date",
|
||||
description:
|
||||
"Date after which this row is eligible for TTL deletion, computed at ingest time as time + service.retainTelemetryDataForDays",
|
||||
required: true,
|
||||
type: TableColumnType.Date,
|
||||
defaultValue: undefined,
|
||||
});
|
||||
|
||||
super({
|
||||
tableName: "LogItem",
|
||||
tableEngine: AnalyticsTableEngine.MergeTree,
|
||||
@@ -312,11 +352,13 @@ export default class Log extends AnalyticsBaseModel {
|
||||
traceIdColumn,
|
||||
spanIdColumn,
|
||||
bodyColumn,
|
||||
retentionDateColumn,
|
||||
],
|
||||
projections: [],
|
||||
sortKeys: ["projectId", "time", "serviceId"],
|
||||
primaryKeys: ["projectId", "time", "serviceId"],
|
||||
partitionKey: "sipHash64(projectId) % 16",
|
||||
ttlExpression: "retentionDate DELETE",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -407,4 +449,12 @@ export default class Log extends AnalyticsBaseModel {
|
||||
public set spanId(v: string | undefined) {
|
||||
this.setColumnValue("spanId", v);
|
||||
}
|
||||
|
||||
public get retentionDate(): Date | undefined {
|
||||
return this.getColumnValue("retentionDate") as Date | undefined;
|
||||
}
|
||||
|
||||
public set retentionDate(v: Date | undefined) {
|
||||
this.setColumnValue("retentionDate", v);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel";
|
||||
import Route from "../../Types/API/Route";
|
||||
import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine";
|
||||
import AnalyticsTableColumn from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import AnalyticsTableColumn, {
|
||||
SkipIndexType,
|
||||
} from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
|
||||
import { JSONObject } from "../../Types/JSON";
|
||||
import ObjectID from "../../Types/ObjectID";
|
||||
@@ -83,6 +85,12 @@ export default class Metric extends AnalyticsBaseModel {
|
||||
description: "Type of the service that this telemetry belongs to",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_service_type",
|
||||
type: SkipIndexType.Set,
|
||||
params: [5],
|
||||
granularity: 4,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -107,6 +115,12 @@ export default class Metric extends AnalyticsBaseModel {
|
||||
description: "Name of the Metric",
|
||||
required: true,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_name",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -503,6 +517,16 @@ export default class Metric extends AnalyticsBaseModel {
|
||||
},
|
||||
);
|
||||
|
||||
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
|
||||
key: "retentionDate",
|
||||
title: "Retention Date",
|
||||
description:
|
||||
"Date after which this row is eligible for TTL deletion, computed at ingest time as time + service.retainTelemetryDataForDays",
|
||||
required: true,
|
||||
type: TableColumnType.Date,
|
||||
defaultValue: undefined,
|
||||
});
|
||||
|
||||
super({
|
||||
tableName: "MetricItem",
|
||||
tableEngine: AnalyticsTableEngine.MergeTree,
|
||||
@@ -556,11 +580,13 @@ export default class Metric extends AnalyticsBaseModel {
|
||||
maxColumn,
|
||||
bucketCountsColumn,
|
||||
explicitBoundsColumn,
|
||||
retentionDateColumn,
|
||||
],
|
||||
projections: [],
|
||||
sortKeys: ["projectId", "time", "serviceId"],
|
||||
primaryKeys: ["projectId", "time", "serviceId"],
|
||||
partitionKey: "sipHash64(projectId) % 16",
|
||||
ttlExpression: "retentionDate DELETE",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -727,4 +753,12 @@ export default class Metric extends AnalyticsBaseModel {
|
||||
public set explicitBounds(v: Array<number> | undefined) {
|
||||
this.setColumnValue("explicitBounds", v);
|
||||
}
|
||||
|
||||
public get retentionDate(): Date | undefined {
|
||||
return this.getColumnValue("retentionDate") as Date | undefined;
|
||||
}
|
||||
|
||||
public set retentionDate(v: Date | undefined) {
|
||||
this.setColumnValue("retentionDate", v);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,6 +103,16 @@ export default class MonitorLog extends AnalyticsBaseModel {
|
||||
},
|
||||
});
|
||||
|
||||
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
|
||||
key: "retentionDate",
|
||||
title: "Retention Date",
|
||||
description:
|
||||
"Date after which this row is eligible for TTL deletion, computed at ingest time as time + service.retainTelemetryDataForDays",
|
||||
required: true,
|
||||
type: TableColumnType.Date,
|
||||
defaultValue: undefined,
|
||||
});
|
||||
|
||||
super({
|
||||
tableName: "MonitorLog",
|
||||
tableEngine: AnalyticsTableEngine.MergeTree,
|
||||
@@ -140,11 +150,13 @@ export default class MonitorLog extends AnalyticsBaseModel {
|
||||
monitorIdColumn,
|
||||
timeColumn,
|
||||
logBodyColumn,
|
||||
retentionDateColumn,
|
||||
],
|
||||
projections: [],
|
||||
sortKeys: ["projectId", "time", "monitorId"],
|
||||
primaryKeys: ["projectId", "time", "monitorId"],
|
||||
partitionKey: "sipHash64(projectId) % 16",
|
||||
ttlExpression: "retentionDate DELETE",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -176,4 +188,12 @@ export default class MonitorLog extends AnalyticsBaseModel {
|
||||
public set logBody(v: JSONObject | undefined) {
|
||||
this.setColumnValue("logBody", v);
|
||||
}
|
||||
|
||||
public get retentionDate(): Date | undefined {
|
||||
return this.getColumnValue("retentionDate") as Date | undefined;
|
||||
}
|
||||
|
||||
public set retentionDate(v: Date | undefined) {
|
||||
this.setColumnValue("retentionDate", v);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel";
|
||||
import Route from "../../Types/API/Route";
|
||||
import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine";
|
||||
import AnalyticsTableColumn from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import AnalyticsTableColumn, {
|
||||
SkipIndexType,
|
||||
} from "../../Types/AnalyticsDatabase/TableColumn";
|
||||
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
|
||||
import { JSONObject } from "../../Types/JSON";
|
||||
import ObjectID from "../../Types/ObjectID";
|
||||
@@ -212,6 +214,12 @@ export default class Span extends AnalyticsBaseModel {
|
||||
description: "ID of the trace",
|
||||
required: true,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_trace_id",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -235,6 +243,12 @@ export default class Span extends AnalyticsBaseModel {
|
||||
description: "ID of the span",
|
||||
required: true,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_span_id",
|
||||
type: SkipIndexType.BloomFilter,
|
||||
params: [0.01],
|
||||
granularity: 1,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -400,6 +414,12 @@ export default class Span extends AnalyticsBaseModel {
|
||||
description: "Status Code",
|
||||
required: false,
|
||||
type: TableColumnType.Number,
|
||||
skipIndex: {
|
||||
name: "idx_status_code",
|
||||
type: SkipIndexType.Set,
|
||||
params: [5],
|
||||
granularity: 4,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -446,6 +466,12 @@ export default class Span extends AnalyticsBaseModel {
|
||||
description: "Name of the span",
|
||||
required: false,
|
||||
type: TableColumnType.Text,
|
||||
skipIndex: {
|
||||
name: "idx_name",
|
||||
type: SkipIndexType.TokenBF,
|
||||
params: [10240, 3, 0],
|
||||
granularity: 4,
|
||||
},
|
||||
accessControl: {
|
||||
read: [
|
||||
Permission.ProjectOwner,
|
||||
@@ -486,6 +512,16 @@ export default class Span extends AnalyticsBaseModel {
|
||||
},
|
||||
});
|
||||
|
||||
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
|
||||
key: "retentionDate",
|
||||
title: "Retention Date",
|
||||
description:
|
||||
"Date after which this row is eligible for TTL deletion, computed at ingest time as startTime + service.retainTelemetryDataForDays",
|
||||
required: true,
|
||||
type: TableColumnType.Date,
|
||||
defaultValue: undefined,
|
||||
});
|
||||
|
||||
super({
|
||||
tableName: "SpanItem",
|
||||
tableEngine: AnalyticsTableEngine.MergeTree,
|
||||
@@ -538,11 +574,13 @@ export default class Span extends AnalyticsBaseModel {
|
||||
statusMessageColumn,
|
||||
nameColumn,
|
||||
kindColumn,
|
||||
retentionDateColumn,
|
||||
],
|
||||
projections: [],
|
||||
sortKeys: ["projectId", "startTime", "serviceId", "traceId"],
|
||||
primaryKeys: ["projectId", "startTime", "serviceId", "traceId"],
|
||||
partitionKey: "sipHash64(projectId) % 16",
|
||||
ttlExpression: "retentionDate DELETE",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -697,4 +735,12 @@ export default class Span extends AnalyticsBaseModel {
|
||||
public set statusMessage(v: string | undefined) {
|
||||
this.setColumnValue("statusMessage", v);
|
||||
}
|
||||
|
||||
public get retentionDate(): Date | undefined {
|
||||
return this.getColumnValue("retentionDate") as Date | undefined;
|
||||
}
|
||||
|
||||
public set retentionDate(v: Date | undefined) {
|
||||
this.setColumnValue("retentionDate", v);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
|
||||
import { JSONObject } from "../../Types/JSON";
|
||||
import ObjectID from "../../Types/ObjectID";
|
||||
import BadDataException from "../../Types/Exception/BadDataException";
|
||||
import Includes from "../../Types/BaseDatabase/Includes";
|
||||
import CaptureSpan from "../Utils/Telemetry/CaptureSpan";
|
||||
import { DbJSONResponse, Results } from "./AnalyticsDatabaseService";
|
||||
|
||||
@@ -204,39 +205,43 @@ export class LogAggregationService {
|
||||
>,
|
||||
): void {
|
||||
if (request.serviceIds && request.serviceIds.length > 0) {
|
||||
const idStrings: Array<string> = request.serviceIds.map(
|
||||
(id: ObjectID): string => {
|
||||
return `'${id.toString()}'`;
|
||||
},
|
||||
statement.append(
|
||||
SQL` AND serviceId IN (${{
|
||||
type: TableColumnType.ObjectID,
|
||||
value: new Includes(
|
||||
request.serviceIds.map((id: ObjectID) => {
|
||||
return id.toString();
|
||||
}),
|
||||
),
|
||||
}})`,
|
||||
);
|
||||
statement.append(` AND serviceId IN (${idStrings.join(",")})`);
|
||||
}
|
||||
|
||||
if (request.severityTexts && request.severityTexts.length > 0) {
|
||||
const sevStrings: Array<string> = request.severityTexts.map(
|
||||
(s: string): string => {
|
||||
return `'${LogAggregationService.escapeSingleQuotes(s)}'`;
|
||||
},
|
||||
statement.append(
|
||||
SQL` AND severityText IN (${{
|
||||
type: TableColumnType.Text,
|
||||
value: new Includes(request.severityTexts),
|
||||
}})`,
|
||||
);
|
||||
statement.append(` AND severityText IN (${sevStrings.join(",")})`);
|
||||
}
|
||||
|
||||
if (request.traceIds && request.traceIds.length > 0) {
|
||||
const traceStrings: Array<string> = request.traceIds.map(
|
||||
(s: string): string => {
|
||||
return `'${LogAggregationService.escapeSingleQuotes(s)}'`;
|
||||
},
|
||||
statement.append(
|
||||
SQL` AND traceId IN (${{
|
||||
type: TableColumnType.Text,
|
||||
value: new Includes(request.traceIds),
|
||||
}})`,
|
||||
);
|
||||
statement.append(` AND traceId IN (${traceStrings.join(",")})`);
|
||||
}
|
||||
|
||||
if (request.spanIds && request.spanIds.length > 0) {
|
||||
const spanStrings: Array<string> = request.spanIds.map(
|
||||
(s: string): string => {
|
||||
return `'${LogAggregationService.escapeSingleQuotes(s)}'`;
|
||||
},
|
||||
statement.append(
|
||||
SQL` AND spanId IN (${{
|
||||
type: TableColumnType.Text,
|
||||
value: new Includes(request.spanIds),
|
||||
}})`,
|
||||
);
|
||||
statement.append(` AND spanId IN (${spanStrings.join(",")})`);
|
||||
}
|
||||
|
||||
if (request.bodySearchText && request.bodySearchText.trim().length > 0) {
|
||||
@@ -276,9 +281,6 @@ export class LogAggregationService {
|
||||
}
|
||||
}
|
||||
|
||||
private static escapeSingleQuotes(value: string): string {
|
||||
return value.replace(/'/g, "\\'");
|
||||
}
|
||||
}
|
||||
|
||||
export default LogAggregationService;
|
||||
|
||||
@@ -600,6 +600,31 @@ export default class StatementGenerator<TBaseModel extends AnalyticsBaseModel> {
|
||||
.append(this.toColumnType(column.type))
|
||||
.append(SQL`)`),
|
||||
);
|
||||
|
||||
// Append CODEC if specified
|
||||
if (column.codec) {
|
||||
const codecStr: string = column.codec.level !== undefined
|
||||
? `${column.codec.codec}(${column.codec.level})`
|
||||
: column.codec.codec;
|
||||
columns.append(` CODEC(${codecStr})`);
|
||||
}
|
||||
}
|
||||
|
||||
// Append skip indexes after column definitions
|
||||
const skipIndexColumns: Array<AnalyticsTableColumn> = tableColumns.filter(
|
||||
(col: AnalyticsTableColumn) => {
|
||||
return col.skipIndex !== undefined;
|
||||
},
|
||||
);
|
||||
|
||||
for (const col of skipIndexColumns) {
|
||||
const idx: AnalyticsTableColumn["skipIndex"] = col.skipIndex!;
|
||||
const paramsStr: string = idx.params && idx.params.length > 0
|
||||
? `(${idx.params.join(", ")})`
|
||||
: "";
|
||||
columns.append(
|
||||
`, INDEX ${idx.name} ${col.key} TYPE ${idx.type}${paramsStr} GRANULARITY ${idx.granularity}`,
|
||||
);
|
||||
}
|
||||
|
||||
return columns;
|
||||
@@ -734,6 +759,11 @@ export default class StatementGenerator<TBaseModel extends AnalyticsBaseModel> {
|
||||
|
||||
statement.append(SQL`)`);
|
||||
|
||||
// Append TTL if specified
|
||||
if (this.model.ttlExpression) {
|
||||
statement.append(`\nTTL ${this.model.ttlExpression}`);
|
||||
}
|
||||
|
||||
/* eslint-enable prettier/prettier */
|
||||
|
||||
logger.debug(`${this.model.tableName} Table Create Statement`);
|
||||
|
||||
@@ -27,6 +27,11 @@ export default class MonitorLogUtil {
|
||||
const logTimestamp: string =
|
||||
OneUptimeDate.toClickhouseDateTime(logIngestionDate);
|
||||
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
logIngestionDate,
|
||||
15,
|
||||
);
|
||||
|
||||
const monitorLogRow: JSONObject = {
|
||||
_id: ObjectID.generate().toString(),
|
||||
createdAt: logTimestamp,
|
||||
@@ -35,6 +40,7 @@ export default class MonitorLogUtil {
|
||||
monitorId: data.monitorId.toString(),
|
||||
time: logTimestamp,
|
||||
logBody: JSON.parse(JSON.stringify(data.dataToProcess)),
|
||||
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
};
|
||||
|
||||
MonitorLogService.insertJsonRows([monitorLogRow]).catch((err: Error) => {
|
||||
|
||||
@@ -65,6 +65,11 @@ export default class MonitorMetricUtil {
|
||||
const attributeKeys: Array<string> =
|
||||
TelemetryUtil.getAttributeKeys(attributes);
|
||||
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
15,
|
||||
);
|
||||
|
||||
return {
|
||||
_id: ObjectID.generate().toString(),
|
||||
createdAt: ingestionTimestamp,
|
||||
@@ -89,6 +94,7 @@ export default class MonitorMetricUtil {
|
||||
bucketCounts: [],
|
||||
explicitBounds: [],
|
||||
value: data.value ?? null,
|
||||
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
} as JSONObject;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,29 @@ import { ColumnAccessControl } from "../BaseDatabase/AccessControl";
|
||||
import ColumnBillingAccessControl from "../BaseDatabase/ColumnBillingAccessControl";
|
||||
import { JSONValue } from "../JSON";
|
||||
|
||||
export enum SkipIndexType {
|
||||
BloomFilter = "bloom_filter",
|
||||
Set = "set",
|
||||
TokenBF = "tokenbf_v1",
|
||||
NgramBF = "ngrambf_v1",
|
||||
MinMax = "minmax",
|
||||
}
|
||||
|
||||
export interface SkipIndex {
|
||||
name: string;
|
||||
type: SkipIndexType;
|
||||
// e.g. 0.01 for bloom_filter, 10 for set, or [10240, 3, 0] for tokenbf_v1
|
||||
params?: Array<number> | undefined;
|
||||
granularity: number;
|
||||
}
|
||||
|
||||
export type ColumnCodec = "ZSTD" | "LZ4" | "LZ4HC" | "NONE";
|
||||
|
||||
export interface ColumnCodecConfig {
|
||||
codec: ColumnCodec;
|
||||
level?: number | undefined; // e.g. 3 for ZSTD(3)
|
||||
}
|
||||
|
||||
export default class AnalyticsTableColumn {
|
||||
private _key: string = "id";
|
||||
|
||||
@@ -103,6 +126,22 @@ export default class AnalyticsTableColumn {
|
||||
this._accessControl = v;
|
||||
}
|
||||
|
||||
private _skipIndex: SkipIndex | undefined;
|
||||
public get skipIndex(): SkipIndex | undefined {
|
||||
return this._skipIndex;
|
||||
}
|
||||
public set skipIndex(v: SkipIndex | undefined) {
|
||||
this._skipIndex = v;
|
||||
}
|
||||
|
||||
private _codec: ColumnCodecConfig | undefined;
|
||||
public get codec(): ColumnCodecConfig | undefined {
|
||||
return this._codec;
|
||||
}
|
||||
public set codec(v: ColumnCodecConfig | undefined) {
|
||||
this._codec = v;
|
||||
}
|
||||
|
||||
public constructor(data: {
|
||||
key: string;
|
||||
title: string;
|
||||
@@ -117,6 +156,8 @@ export default class AnalyticsTableColumn {
|
||||
forceGetDefaultValueOnCreate?:
|
||||
| (() => Date | string | number | boolean)
|
||||
| undefined;
|
||||
skipIndex?: SkipIndex | undefined;
|
||||
codec?: ColumnCodecConfig | undefined;
|
||||
}) {
|
||||
this.accessControl = data.accessControl;
|
||||
this.key = data.key;
|
||||
@@ -130,5 +171,7 @@ export default class AnalyticsTableColumn {
|
||||
this.billingAccessControl = data.billingAccessControl;
|
||||
this.allowAccessIfSubscriptionIsUnpaid =
|
||||
data.allowAccessIfSubscriptionIsUnpaid || false;
|
||||
this.skipIndex = data.skipIndex;
|
||||
this.codec = data.codec;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -407,31 +407,98 @@ INDEX idx_severity severityText TYPE set(10) GRANULARITY 4
|
||||
- `Telemetry/Services/OtelLogsIngestService.ts` (write DateTime64 timestamps)
|
||||
- `Worker/DataMigrations/` (new migration)
|
||||
|
||||
### 5.4 Add TTL for Automatic Data Retention (High)
|
||||
### 5.4 Add TTL for Per-Service Automatic Data Retention (High)
|
||||
|
||||
**Current**: The ingestion service tracks `dataRetentionInDays` per service but there is **no TTL clause** on the ClickHouse table. Data is never automatically deleted, leading to unbounded storage growth.
|
||||
**Current**: Each `Service` (PostgreSQL) has a `retainTelemetryDataForDays` field (default 15 days). Retention is enforced by an hourly cron job at `Worker/Jobs/TelemetryService/DeleteOldData.ts` that iterates over **every project → every service** and issues `ALTER TABLE DELETE` mutations per service. This is problematic because:
|
||||
- `ALTER TABLE DELETE` creates ClickHouse **mutations** — expensive async background operations that rewrite entire data parts
|
||||
- Running this for every service every hour can pile up hundreds of pending mutations
|
||||
- Mutations compete with ingestion for disk I/O and can degrade cluster performance
|
||||
- If the cron job fails or falls behind, data accumulates unboundedly
|
||||
|
||||
**Target**: ClickHouse-native TTL so old data is automatically dropped.
|
||||
**Target**: ClickHouse-native TTL with per-service retention, eliminating the mutation-based cron job.
|
||||
|
||||
**Approach: `retentionDate` column with row-level TTL**
|
||||
|
||||
Since retention is per-service (not per-table), a simple `TTL time + INTERVAL N DAY` won't work — different rows in the same table need different expiry times. The solution is to compute the expiry date at ingest time and store it:
|
||||
|
||||
1. Add a `retentionDate DateTime` column to `LogItem`
|
||||
2. At ingest time, compute `retentionDate = time + service.retainTelemetryDataForDays`
|
||||
3. Set the table TTL to `TTL retentionDate DELETE`
|
||||
4. ClickHouse automatically drops expired rows during background merges — no mutations, no cron job
|
||||
|
||||
**Implementation**:
|
||||
|
||||
- Add a TTL clause to the LogItem table definition:
|
||||
```sql
|
||||
TTL time + INTERVAL 30 DAY DELETE
|
||||
- Add `retentionDate` column to the Log model at `Common/Models/AnalyticsModels/Log.ts`:
|
||||
```typescript
|
||||
const retentionDateColumn = new AnalyticsTableColumn({
|
||||
key: "retentionDate",
|
||||
title: "Retention Date",
|
||||
description: "Date after which this row is eligible for TTL deletion",
|
||||
required: true,
|
||||
type: TableColumnType.Date,
|
||||
// no read/create access needed — internal-only column
|
||||
});
|
||||
```
|
||||
- For per-tenant or per-service retention, options include:
|
||||
- A single global TTL (simplest) with a background job that deletes data beyond the service-specific retention
|
||||
- A `retentionDate` column populated at ingest time (`time + dataRetentionInDays`) with `TTL retentionDate DELETE`
|
||||
- Extend `AnalyticsBaseModel` to support TTL configuration
|
||||
- Migration to add TTL to existing table: `ALTER TABLE LogItem MODIFY TTL time + INTERVAL 30 DAY DELETE`
|
||||
|
||||
**Impact**: Prevents disk exhaustion. Without TTL, the only way to remove old data is manual `ALTER TABLE DELETE` which is expensive and must be scheduled externally.
|
||||
- Add TTL clause to table definition:
|
||||
```sql
|
||||
TTL retentionDate DELETE
|
||||
```
|
||||
|
||||
- Update ingestion in `OtelLogsIngestService.processLogsAsync()` to compute and store the retention date:
|
||||
```typescript
|
||||
const retentionDays = serviceDictionary[serviceName]!.dataRententionInDays;
|
||||
const retentionDate = OneUptimeDate.addRemoveDays(timeDate, retentionDays);
|
||||
|
||||
const logRow: JSONObject = {
|
||||
// ... existing fields ...
|
||||
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
};
|
||||
```
|
||||
|
||||
- Also update `FluentLogsIngestService` and `SyslogIngestService` (same pattern)
|
||||
|
||||
- Data migration for existing data:
|
||||
```sql
|
||||
-- Add the column
|
||||
ALTER TABLE LogItem ADD COLUMN retentionDate DateTime DEFAULT time + INTERVAL 15 DAY;
|
||||
|
||||
-- Set the TTL
|
||||
ALTER TABLE LogItem MODIFY TTL retentionDate DELETE;
|
||||
```
|
||||
Existing rows without an explicit `retentionDate` will use the DEFAULT expression (`time + 15 days`) which provides a safe fallback.
|
||||
|
||||
- Deprecate the cron job at `Worker/Jobs/TelemetryService/DeleteOldData.ts` — can be kept temporarily as a safety net for the transition period, then removed
|
||||
|
||||
**Edge cases**:
|
||||
|
||||
- **Retention policy changes**: If a user changes `retainTelemetryDataForDays` from 30 to 7, already-ingested rows still have the old `retentionDate`. Options:
|
||||
- Accept that the change only applies to newly ingested data (simplest, recommended)
|
||||
- Run a one-time `ALTER TABLE UPDATE retentionDate = time + INTERVAL 7 DAY WHERE serviceId = {sid}` mutation (expensive but correct). This could be triggered from the Service settings UI.
|
||||
- **Default retention**: If `retainTelemetryDataForDays` is not set on a service, default to 15 days (matching current behavior)
|
||||
- **Same approach for Span and Metric tables**: This pattern should be applied to `SpanItem` and `MetricItem` tables as well since they use the same cron-based deletion today
|
||||
|
||||
**Why this is better than the current approach**:
|
||||
|
||||
| | Current (cron + mutations) | Proposed (TTL + retentionDate) |
|
||||
|---|---|---|
|
||||
| Mechanism | Hourly `ALTER TABLE DELETE` per service | ClickHouse background merges |
|
||||
| Cost | Creates mutations that rewrite data parts | Free — part of normal merge cycle |
|
||||
| Reliability | Depends on cron job running | Built into ClickHouse engine |
|
||||
| Scale | O(projects × services) mutations/hour | Zero external operations |
|
||||
| Disk I/O | Heavy (mutation rewrites) | Minimal (parts dropped during merge) |
|
||||
|
||||
**Files to modify**:
|
||||
- `Common/Models/AnalyticsModels/Log.ts` (add TTL config)
|
||||
- `Common/Models/AnalyticsModels/AnalyticsBaseModel/AnalyticsBaseModel.ts` (TTL support)
|
||||
- `Common/Server/Utils/AnalyticsDatabase/StatementGenerator.ts` (emit TTL clause)
|
||||
- `Worker/DataMigrations/` (new migration)
|
||||
- `Common/Models/AnalyticsModels/Log.ts` (add `retentionDate` column + TTL config)
|
||||
- `Common/Models/AnalyticsModels/AnalyticsBaseModel/AnalyticsBaseModel.ts` (TTL support in base model)
|
||||
- `Common/Server/Utils/AnalyticsDatabase/StatementGenerator.ts` (emit TTL clause in CREATE TABLE)
|
||||
- `Telemetry/Services/OtelLogsIngestService.ts` (compute and store `retentionDate`)
|
||||
- `Telemetry/Services/FluentLogsIngestService.ts` (same)
|
||||
- `Telemetry/Services/SyslogIngestService.ts` (same)
|
||||
- `Common/Server/Services/OpenTelemetryIngestService.ts` (ensure `dataRetentionInDays` is passed through)
|
||||
- `Worker/DataMigrations/` (new migration to add column + set TTL)
|
||||
- `Worker/Jobs/TelemetryService/DeleteOldData.ts` (deprecate after transition)
|
||||
- Apply same pattern to `Common/Models/AnalyticsModels/Span.ts` and `Common/Models/AnalyticsModels/Metric.ts`
|
||||
|
||||
### 5.5 Fix SQL Construction in `LogAggregationService` (High)
|
||||
|
||||
|
||||
@@ -219,6 +219,11 @@ export default class FluentLogsIngestService extends OtelIngestBaseService {
|
||||
...entryAttributes,
|
||||
};
|
||||
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
serviceMetadata.dataRententionInDays || 15,
|
||||
);
|
||||
|
||||
const logRow: JSONObject = {
|
||||
_id: ObjectID.generate().toString(),
|
||||
createdAt: ingestionDateTime,
|
||||
@@ -234,6 +239,8 @@ export default class FluentLogsIngestService extends OtelIngestBaseService {
|
||||
traceId,
|
||||
spanId,
|
||||
body,
|
||||
retentionDate:
|
||||
OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
} satisfies JSONObject;
|
||||
|
||||
dbLogs.push(logRow);
|
||||
|
||||
@@ -286,6 +286,11 @@ export default class OtelLogsIngestService extends OtelIngestBaseService {
|
||||
const logTimestamp: string =
|
||||
OneUptimeDate.toClickhouseDateTime(timeDate);
|
||||
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
serviceDictionary[serviceName]!.dataRententionInDays || 15,
|
||||
);
|
||||
|
||||
const logRow: JSONObject = {
|
||||
_id: ObjectID.generate().toString(),
|
||||
createdAt: ingestionTimestamp,
|
||||
@@ -301,6 +306,8 @@ export default class OtelLogsIngestService extends OtelIngestBaseService {
|
||||
traceId: traceId,
|
||||
spanId: spanId,
|
||||
body: body,
|
||||
retentionDate:
|
||||
OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
};
|
||||
|
||||
dbLogs.push(logRow);
|
||||
|
||||
@@ -299,6 +299,8 @@ export default class OtelMetricsIngestService extends OtelIngestBaseService {
|
||||
metricName: metricName,
|
||||
metricPointType: metricPointType,
|
||||
aggregationTemporality: aggregationTemporality,
|
||||
dataRententionInDays:
|
||||
serviceMetadata.dataRententionInDays,
|
||||
...(typeof isMonotonic === "boolean"
|
||||
? { isMonotonic: isMonotonic }
|
||||
: {}),
|
||||
@@ -389,6 +391,7 @@ export default class OtelMetricsIngestService extends OtelIngestBaseService {
|
||||
metricPointType: MetricPointType;
|
||||
aggregationTemporality?: OtelAggregationTemporality;
|
||||
isMonotonic?: boolean;
|
||||
dataRententionInDays: number;
|
||||
}): JSONObject {
|
||||
const ingestionDate: Date = OneUptimeDate.getCurrentDate();
|
||||
const ingestionTimestamp: string =
|
||||
@@ -465,6 +468,11 @@ export default class OtelMetricsIngestService extends OtelIngestBaseService {
|
||||
})
|
||||
: [];
|
||||
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
data.dataRententionInDays || 15,
|
||||
);
|
||||
|
||||
const row: JSONObject = {
|
||||
_id: ObjectID.generate().toString(),
|
||||
createdAt: ingestionTimestamp,
|
||||
@@ -495,6 +503,7 @@ export default class OtelMetricsIngestService extends OtelIngestBaseService {
|
||||
max: max,
|
||||
bucketCounts: bucketCounts,
|
||||
explicitBounds: explicitBounds,
|
||||
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
};
|
||||
|
||||
if (startTimeFields) {
|
||||
|
||||
@@ -59,6 +59,7 @@ type ExceptionEventPayload = {
|
||||
release: string;
|
||||
environment: string;
|
||||
parsedFrames: string;
|
||||
dataRententionInDays: number;
|
||||
};
|
||||
|
||||
export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
@@ -323,6 +324,8 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
spanStatusCode: statusCode,
|
||||
spanName: spanName,
|
||||
resourceAttributes: resourceAttributes,
|
||||
dataRententionInDays:
|
||||
serviceDictionary[serviceName]!.dataRententionInDays,
|
||||
},
|
||||
dbExceptions,
|
||||
);
|
||||
@@ -361,6 +364,8 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
durationUnixNano: durationUnixNano,
|
||||
events: spanEvents,
|
||||
links: spanLinks,
|
||||
dataRententionInDays:
|
||||
serviceDictionary[serviceName]!.dataRententionInDays,
|
||||
});
|
||||
|
||||
dbSpans.push(spanRow);
|
||||
@@ -453,6 +458,7 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
spanStatusCode: SpanStatus;
|
||||
spanName: string;
|
||||
resourceAttributes: Dictionary<AttributeType | Array<AttributeType>>;
|
||||
dataRententionInDays: number;
|
||||
},
|
||||
dbExceptions: Array<JSONObject>,
|
||||
): Array<JSONObject> {
|
||||
@@ -550,6 +556,7 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
release: release,
|
||||
environment: environment,
|
||||
parsedFrames: parsedFramesJson,
|
||||
dataRententionInDays: spanContext.dataRententionInDays,
|
||||
};
|
||||
|
||||
dbExceptions.push(this.buildExceptionRow(exceptionData));
|
||||
@@ -652,10 +659,15 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
durationUnixNano: string;
|
||||
events: Array<JSONObject>;
|
||||
links: Array<JSONObject>;
|
||||
dataRententionInDays: number;
|
||||
}): JSONObject {
|
||||
const ingestionDate: Date = OneUptimeDate.getCurrentDate();
|
||||
const ingestionTimestamp: string =
|
||||
OneUptimeDate.toClickhouseDateTime(ingestionDate);
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
data.dataRententionInDays || 15,
|
||||
);
|
||||
|
||||
return {
|
||||
_id: ObjectID.generate().toString(),
|
||||
@@ -680,6 +692,7 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
kind: data.kind,
|
||||
events: data.events,
|
||||
links: data.links,
|
||||
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -687,6 +700,10 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
const ingestionDate: Date = OneUptimeDate.getCurrentDate();
|
||||
const ingestionTimestamp: string =
|
||||
OneUptimeDate.toClickhouseDateTime(ingestionDate);
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
data.dataRententionInDays || 15,
|
||||
);
|
||||
|
||||
return {
|
||||
_id: ObjectID.generate().toString(),
|
||||
@@ -712,6 +729,7 @@ export default class OtelTracesIngestService extends OtelIngestBaseService {
|
||||
environment: data.environment || "",
|
||||
parsedFrames: data.parsedFrames || "[]",
|
||||
attributes: data.attributes || {},
|
||||
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -194,6 +194,11 @@ export default class SyslogIngestService extends OtelIngestBaseService {
|
||||
serviceName,
|
||||
});
|
||||
|
||||
const retentionDate: Date = OneUptimeDate.addRemoveDays(
|
||||
ingestionDate,
|
||||
serviceMetadata.dataRententionInDays || 15,
|
||||
);
|
||||
|
||||
const logRow: JSONObject = {
|
||||
_id: ObjectID.generate().toString(),
|
||||
createdAt: OneUptimeDate.toClickhouseDateTime(ingestionDate),
|
||||
@@ -211,6 +216,8 @@ export default class SyslogIngestService extends OtelIngestBaseService {
|
||||
traceId: "",
|
||||
spanId: "",
|
||||
body: parsed.message,
|
||||
retentionDate:
|
||||
OneUptimeDate.toClickhouseDateTime(retentionDate),
|
||||
} satisfies JSONObject;
|
||||
|
||||
dbLogs.push(logRow);
|
||||
|
||||
@@ -0,0 +1,255 @@
|
||||
import DataMigrationBase from "./DataMigrationBase";
|
||||
import AnalyticsTableColumn from "Common/Types/AnalyticsDatabase/TableColumn";
|
||||
import TableColumnType from "Common/Types/AnalyticsDatabase/TableColumnType";
|
||||
import Log from "Common/Models/AnalyticsModels/Log";
|
||||
import Span from "Common/Models/AnalyticsModels/Span";
|
||||
import Metric from "Common/Models/AnalyticsModels/Metric";
|
||||
import ExceptionInstance from "Common/Models/AnalyticsModels/ExceptionInstance";
|
||||
import MonitorLog from "Common/Models/AnalyticsModels/MonitorLog";
|
||||
import LogService from "Common/Server/Services/LogService";
|
||||
import SpanService from "Common/Server/Services/SpanService";
|
||||
import MetricService from "Common/Server/Services/MetricService";
|
||||
import ExceptionInstanceService from "Common/Server/Services/ExceptionInstanceService";
|
||||
import MonitorLogService from "Common/Server/Services/MonitorLogService";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
|
||||
export default class AddRetentionDateAndSkipIndexesToTelemetryTables extends DataMigrationBase {
|
||||
public constructor() {
|
||||
super("AddRetentionDateAndSkipIndexesToTelemetryTables");
|
||||
}
|
||||
|
||||
public override async migrate(): Promise<void> {
|
||||
// Add retentionDate column to all telemetry tables
|
||||
await this.addRetentionDateColumn(
|
||||
new Log(),
|
||||
LogService,
|
||||
"LogItem",
|
||||
);
|
||||
await this.addRetentionDateColumn(
|
||||
new Span(),
|
||||
SpanService,
|
||||
"SpanItem",
|
||||
);
|
||||
await this.addRetentionDateColumn(
|
||||
new Metric(),
|
||||
MetricService,
|
||||
"MetricItem",
|
||||
);
|
||||
await this.addRetentionDateColumn(
|
||||
new ExceptionInstance(),
|
||||
ExceptionInstanceService,
|
||||
"ExceptionItem",
|
||||
);
|
||||
await this.addRetentionDateColumn(
|
||||
new MonitorLog(),
|
||||
MonitorLogService,
|
||||
"MonitorLog",
|
||||
);
|
||||
|
||||
// Add skip indexes to Log table
|
||||
await this.addSkipIndex(
|
||||
LogService,
|
||||
"LogItem",
|
||||
"idx_severity",
|
||||
"severityText",
|
||||
"set(10)",
|
||||
4,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
LogService,
|
||||
"LogItem",
|
||||
"idx_trace_id",
|
||||
"traceId",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
LogService,
|
||||
"LogItem",
|
||||
"idx_span_id",
|
||||
"spanId",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
LogService,
|
||||
"LogItem",
|
||||
"idx_body",
|
||||
"body",
|
||||
"tokenbf_v1(10240, 3, 0)",
|
||||
4,
|
||||
);
|
||||
|
||||
// Add skip indexes to Span table
|
||||
await this.addSkipIndex(
|
||||
SpanService,
|
||||
"SpanItem",
|
||||
"idx_trace_id",
|
||||
"traceId",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
SpanService,
|
||||
"SpanItem",
|
||||
"idx_span_id",
|
||||
"spanId",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
SpanService,
|
||||
"SpanItem",
|
||||
"idx_status_code",
|
||||
"statusCode",
|
||||
"set(5)",
|
||||
4,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
SpanService,
|
||||
"SpanItem",
|
||||
"idx_name",
|
||||
"name",
|
||||
"tokenbf_v1(10240, 3, 0)",
|
||||
4,
|
||||
);
|
||||
|
||||
// Add skip indexes to Metric table
|
||||
await this.addSkipIndex(
|
||||
MetricService,
|
||||
"MetricItem",
|
||||
"idx_name",
|
||||
"name",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
MetricService,
|
||||
"MetricItem",
|
||||
"idx_service_type",
|
||||
"serviceType",
|
||||
"set(5)",
|
||||
4,
|
||||
);
|
||||
|
||||
// Add skip indexes to Exception table
|
||||
await this.addSkipIndex(
|
||||
ExceptionInstanceService,
|
||||
"ExceptionItem",
|
||||
"idx_exception_type",
|
||||
"exceptionType",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
ExceptionInstanceService,
|
||||
"ExceptionItem",
|
||||
"idx_trace_id",
|
||||
"traceId",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
ExceptionInstanceService,
|
||||
"ExceptionItem",
|
||||
"idx_span_id",
|
||||
"spanId",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
await this.addSkipIndex(
|
||||
ExceptionInstanceService,
|
||||
"ExceptionItem",
|
||||
"idx_fingerprint",
|
||||
"fingerprint",
|
||||
"bloom_filter(0.01)",
|
||||
1,
|
||||
);
|
||||
|
||||
// Set TTL on all tables
|
||||
await this.setTTL(LogService, "LogItem");
|
||||
await this.setTTL(SpanService, "SpanItem");
|
||||
await this.setTTL(MetricService, "MetricItem");
|
||||
await this.setTTL(ExceptionInstanceService, "ExceptionItem");
|
||||
await this.setTTL(MonitorLogService, "MonitorLog");
|
||||
}
|
||||
|
||||
private async addRetentionDateColumn(
|
||||
model: { tableColumns: Array<AnalyticsTableColumn> },
|
||||
service: { addColumnInDatabase: (column: AnalyticsTableColumn) => Promise<void>; getColumnTypeInDatabase: (column: AnalyticsTableColumn) => Promise<TableColumnType | null> },
|
||||
tableName: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const column: AnalyticsTableColumn | undefined =
|
||||
model.tableColumns.find((item: AnalyticsTableColumn) => {
|
||||
return item.key === "retentionDate";
|
||||
});
|
||||
|
||||
if (!column) {
|
||||
logger.warn(
|
||||
`retentionDate column not found in model for ${tableName}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const columnType: TableColumnType | null =
|
||||
await service.getColumnTypeInDatabase(column);
|
||||
|
||||
if (!columnType) {
|
||||
await service.addColumnInDatabase(column);
|
||||
logger.info(
|
||||
`Added retentionDate column to ${tableName}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Error adding retentionDate column to ${tableName}`,
|
||||
);
|
||||
logger.error(err);
|
||||
}
|
||||
}
|
||||
|
||||
private async addSkipIndex(
|
||||
service: { execute: (statement: string) => Promise<unknown> },
|
||||
tableName: string,
|
||||
indexName: string,
|
||||
columnName: string,
|
||||
indexType: string,
|
||||
granularity: number,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await service.execute(
|
||||
`ALTER TABLE ${tableName} ADD INDEX IF NOT EXISTS ${indexName} ${columnName} TYPE ${indexType} GRANULARITY ${granularity}`,
|
||||
);
|
||||
logger.info(
|
||||
`Added skip index ${indexName} on ${tableName}.${columnName}`,
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Error adding skip index ${indexName} to ${tableName}: ${err}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async setTTL(
|
||||
service: { execute: (statement: string) => Promise<unknown> },
|
||||
tableName: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await service.execute(
|
||||
`ALTER TABLE ${tableName} MODIFY TTL retentionDate DELETE`,
|
||||
);
|
||||
logger.info(
|
||||
`Set TTL on ${tableName} using retentionDate column`,
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Error setting TTL on ${tableName}: ${err}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public override async rollback(): Promise<void> {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -57,6 +57,7 @@ import AddDefaultIncidentRolesToExistingProjects from "./AddDefaultIncidentRoles
|
||||
import AddDefaultIconsToIncidentRoles from "./AddDefaultIconsToIncidentRoles";
|
||||
import UpdateObserverRoleToAllowMultipleUsers from "./UpdateObserverRoleToAllowMultipleUsers";
|
||||
import AddColumnsToExceptionInstance from "./AddColumnsToExceptionInstance";
|
||||
import AddRetentionDateAndSkipIndexesToTelemetryTables from "./AddRetentionDateAndSkipIndexesToTelemetryTables";
|
||||
|
||||
// This is the order in which the migrations will be run. Add new migrations to the end of the array.
|
||||
|
||||
@@ -118,6 +119,7 @@ const DataMigrations: Array<DataMigrationBase> = [
|
||||
new AddDefaultIconsToIncidentRoles(),
|
||||
new UpdateObserverRoleToAllowMultipleUsers(),
|
||||
new AddColumnsToExceptionInstance(),
|
||||
new AddRetentionDateAndSkipIndexesToTelemetryTables(),
|
||||
];
|
||||
|
||||
export default DataMigrations;
|
||||
|
||||
Reference in New Issue
Block a user