mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 00:32:12 +02:00
301 lines
8.3 KiB
TypeScript
301 lines
8.3 KiB
TypeScript
import { SQL, Statement } from "../Utils/AnalyticsDatabase/Statement";
|
|
import TelemetryType from "../../Types/Telemetry/TelemetryType";
|
|
import LogDatabaseService from "./LogService";
|
|
import MetricDatabaseService from "./MetricService";
|
|
import SpanDatabaseService from "./SpanService";
|
|
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
|
|
import { JSONObject } from "../../Types/JSON";
|
|
import ObjectID from "../../Types/ObjectID";
|
|
import OneUptimeDate from "../../Types/Date";
|
|
import GlobalCache from "../Infrastructure/GlobalCache";
|
|
import CaptureSpan from "../Utils/Telemetry/CaptureSpan";
|
|
import AnalyticsDatabaseService, {
|
|
DbJSONResponse,
|
|
Results,
|
|
} from "./AnalyticsDatabaseService";
|
|
|
|
type TelemetrySource = {
|
|
service: AnalyticsDatabaseService<any>;
|
|
tableName: string;
|
|
attributesColumn: string;
|
|
attributeKeysColumn: string;
|
|
timeColumn: string;
|
|
};
|
|
|
|
type TelemetryAttributesCacheEntry = {
|
|
attributes: Array<string>;
|
|
refreshedAt: Date;
|
|
};
|
|
|
|
export class TelemetryAttributeService {
|
|
private static readonly ATTRIBUTES_LIMIT: number = 5000;
|
|
private static readonly ROW_SCAN_LIMIT: number = 10000;
|
|
private static readonly CACHE_NAMESPACE: string = "telemetry-attributes";
|
|
private static readonly CACHE_STALE_AFTER_MINUTES: number = 5;
|
|
private static readonly LOOKBACK_WINDOW_IN_DAYS: number = 30;
|
|
|
|
private getTelemetrySource(
|
|
telemetryType: TelemetryType,
|
|
): TelemetrySource | null {
|
|
switch (telemetryType) {
|
|
case TelemetryType.Log:
|
|
return {
|
|
service: LogDatabaseService,
|
|
tableName: LogDatabaseService.model.tableName,
|
|
attributesColumn: "attributes",
|
|
attributeKeysColumn: "attributeKeys",
|
|
timeColumn: "time",
|
|
};
|
|
case TelemetryType.Metric:
|
|
return {
|
|
service: MetricDatabaseService,
|
|
tableName: MetricDatabaseService.model.tableName,
|
|
attributesColumn: "attributes",
|
|
attributeKeysColumn: "attributeKeys",
|
|
timeColumn: "time",
|
|
};
|
|
case TelemetryType.Trace:
|
|
return {
|
|
service: SpanDatabaseService,
|
|
tableName: SpanDatabaseService.model.tableName,
|
|
attributesColumn: "attributes",
|
|
attributeKeysColumn: "attributeKeys",
|
|
timeColumn: "startTime",
|
|
};
|
|
default:
|
|
return null;
|
|
}
|
|
}
|
|
|
|
@CaptureSpan()
|
|
public async fetchAttributes(data: {
|
|
projectId: ObjectID;
|
|
telemetryType: TelemetryType;
|
|
}): Promise<string[]> {
|
|
const source: TelemetrySource | null = this.getTelemetrySource(
|
|
data.telemetryType,
|
|
);
|
|
|
|
if (!source) {
|
|
return [];
|
|
}
|
|
|
|
const cacheKey: string = TelemetryAttributeService.getCacheKey(
|
|
data.projectId,
|
|
data.telemetryType,
|
|
);
|
|
|
|
const cachedEntry: TelemetryAttributesCacheEntry | null =
|
|
await TelemetryAttributeService.getCachedAttributes(cacheKey);
|
|
|
|
if (cachedEntry && TelemetryAttributeService.isCacheFresh(cachedEntry)) {
|
|
return cachedEntry.attributes;
|
|
}
|
|
|
|
let attributes: Array<string> = [];
|
|
|
|
try {
|
|
attributes = await TelemetryAttributeService.fetchAttributesFromDatabase({
|
|
projectId: data.projectId,
|
|
source,
|
|
});
|
|
} catch (error) {
|
|
if (cachedEntry) {
|
|
return cachedEntry.attributes;
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
|
|
await TelemetryAttributeService.storeAttributesInCache(
|
|
cacheKey,
|
|
attributes,
|
|
);
|
|
|
|
if (attributes.length === 0 && cachedEntry) {
|
|
return cachedEntry.attributes;
|
|
}
|
|
|
|
return attributes;
|
|
}
|
|
|
|
private static getCacheKey(
|
|
projectId: ObjectID,
|
|
telemetryType: TelemetryType,
|
|
): string {
|
|
return `${projectId.toString()}:${telemetryType}`;
|
|
}
|
|
|
|
private static getLookbackStartDate(): Date {
|
|
return OneUptimeDate.addRemoveDays(
|
|
OneUptimeDate.getCurrentDate(),
|
|
-TelemetryAttributeService.LOOKBACK_WINDOW_IN_DAYS,
|
|
);
|
|
}
|
|
|
|
private static async getCachedAttributes(
|
|
cacheKey: string,
|
|
): Promise<TelemetryAttributesCacheEntry | null> {
|
|
let payload: JSONObject | null = null;
|
|
|
|
try {
|
|
payload = await GlobalCache.getJSONObject(
|
|
TelemetryAttributeService.CACHE_NAMESPACE,
|
|
cacheKey,
|
|
);
|
|
} catch {
|
|
return null;
|
|
}
|
|
|
|
if (!payload) {
|
|
return null;
|
|
}
|
|
|
|
const attributesValue: JSONObject["attributes"] = payload["attributes"];
|
|
const refreshedAtValue: JSONObject["refreshedAt"] = payload["refreshedAt"];
|
|
|
|
if (
|
|
!Array.isArray(attributesValue) ||
|
|
typeof refreshedAtValue !== "string"
|
|
) {
|
|
return null;
|
|
}
|
|
|
|
const attributeCandidates: Array<unknown> =
|
|
attributesValue as Array<unknown>;
|
|
|
|
const attributes: Array<string> = attributeCandidates.filter(
|
|
(attribute: unknown): attribute is string => {
|
|
return typeof attribute === "string";
|
|
},
|
|
);
|
|
|
|
return {
|
|
attributes,
|
|
refreshedAt: OneUptimeDate.fromString(refreshedAtValue),
|
|
};
|
|
}
|
|
|
|
private static isCacheFresh(
|
|
cacheEntry: TelemetryAttributesCacheEntry,
|
|
): boolean {
|
|
const now: Date = OneUptimeDate.getCurrentDate();
|
|
const minutesSinceRefresh: number = Math.abs(
|
|
OneUptimeDate.getNumberOfMinutesBetweenDates(cacheEntry.refreshedAt, now),
|
|
);
|
|
|
|
return (
|
|
minutesSinceRefresh <= TelemetryAttributeService.CACHE_STALE_AFTER_MINUTES
|
|
);
|
|
}
|
|
|
|
private static async storeAttributesInCache(
|
|
cacheKey: string,
|
|
attributes: Array<string>,
|
|
): Promise<void> {
|
|
const payload: JSONObject = {
|
|
attributes,
|
|
refreshedAt: OneUptimeDate.getCurrentDate().toISOString(),
|
|
};
|
|
|
|
try {
|
|
await GlobalCache.setJSON(
|
|
TelemetryAttributeService.CACHE_NAMESPACE,
|
|
cacheKey,
|
|
payload,
|
|
{
|
|
expiresInSeconds:
|
|
TelemetryAttributeService.CACHE_STALE_AFTER_MINUTES * 60,
|
|
},
|
|
);
|
|
} catch {
|
|
return;
|
|
}
|
|
}
|
|
|
|
private static buildAttributesStatement(data: {
|
|
projectId: ObjectID;
|
|
tableName: string;
|
|
attributesColumn: string;
|
|
attributeKeysColumn: string;
|
|
timeColumn: string;
|
|
}): Statement {
|
|
const lookbackStartDate: Date =
|
|
TelemetryAttributeService.getLookbackStartDate();
|
|
|
|
const statement: Statement = SQL`
|
|
WITH filtered AS (
|
|
SELECT arrayJoin(
|
|
if(
|
|
empty(${data.attributeKeysColumn}),
|
|
mapKeys(${data.attributesColumn}),
|
|
${data.attributeKeysColumn}
|
|
)
|
|
) AS attribute
|
|
FROM ${data.tableName}
|
|
WHERE projectId = ${{
|
|
type: TableColumnType.ObjectID,
|
|
value: data.projectId,
|
|
}}
|
|
AND (
|
|
NOT empty(${data.attributeKeysColumn}) OR
|
|
NOT empty(${data.attributesColumn})
|
|
)
|
|
AND ${data.timeColumn} >= ${{
|
|
type: TableColumnType.Date,
|
|
value: lookbackStartDate,
|
|
}}
|
|
ORDER BY ${data.timeColumn} DESC
|
|
LIMIT ${{
|
|
type: TableColumnType.Number,
|
|
value: TelemetryAttributeService.ROW_SCAN_LIMIT,
|
|
}}
|
|
)
|
|
SELECT DISTINCT attribute
|
|
FROM filtered
|
|
WHERE attribute IS NOT NULL AND attribute != ''
|
|
ORDER BY attribute ASC
|
|
LIMIT ${{
|
|
type: TableColumnType.Number,
|
|
value: TelemetryAttributeService.ATTRIBUTES_LIMIT,
|
|
}}
|
|
`;
|
|
|
|
return statement;
|
|
}
|
|
|
|
private static async fetchAttributesFromDatabase(data: {
|
|
projectId: ObjectID;
|
|
source: TelemetrySource;
|
|
}): Promise<Array<string>> {
|
|
const statement: Statement =
|
|
TelemetryAttributeService.buildAttributesStatement({
|
|
projectId: data.projectId,
|
|
tableName: data.source.tableName,
|
|
attributesColumn: data.source.attributesColumn,
|
|
attributeKeysColumn: data.source.attributeKeysColumn,
|
|
timeColumn: data.source.timeColumn,
|
|
});
|
|
|
|
const dbResult: Results = await data.source.service.executeQuery(statement);
|
|
const response: DbJSONResponse = await dbResult.json<{
|
|
data?: Array<JSONObject>;
|
|
}>();
|
|
|
|
const rows: Array<JSONObject> = response.data || [];
|
|
|
|
const attributeKeys: Array<string> = rows
|
|
.map((row: JSONObject) => {
|
|
const attribute: unknown = row["attribute"];
|
|
return typeof attribute === "string" ? attribute.trim() : null;
|
|
})
|
|
.filter((attribute: string | null): attribute is string => {
|
|
return Boolean(attribute);
|
|
});
|
|
|
|
return Array.from(new Set(attributeKeys));
|
|
}
|
|
}
|
|
|
|
export default new TelemetryAttributeService();
|