diff --git a/Common/Models/AnalyticsModels/Index.ts b/Common/Models/AnalyticsModels/Index.ts index 622d36b5a2..1575576650 100644 --- a/Common/Models/AnalyticsModels/Index.ts +++ b/Common/Models/AnalyticsModels/Index.ts @@ -4,6 +4,8 @@ import Metric from "./Metric"; import Span from "./Span"; import ExceptionInstance from "./ExceptionInstance"; import MonitorLog from "./MonitorLog"; +import Profile from "./Profile"; +import ProfileSample from "./ProfileSample"; const AnalyticsModels: Array<{ new (): AnalyticsBaseModel }> = [ Log, @@ -11,6 +13,8 @@ const AnalyticsModels: Array<{ new (): AnalyticsBaseModel }> = [ Metric, ExceptionInstance, MonitorLog, + Profile, + ProfileSample, ]; const modelTypeMap: { [key: string]: { new (): AnalyticsBaseModel } } = {}; diff --git a/Common/Models/AnalyticsModels/Profile.ts b/Common/Models/AnalyticsModels/Profile.ts new file mode 100644 index 0000000000..9a1a96d4b4 --- /dev/null +++ b/Common/Models/AnalyticsModels/Profile.ts @@ -0,0 +1,687 @@ +import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel"; +import Route from "../../Types/API/Route"; +import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine"; +import AnalyticsTableName from "../../Types/AnalyticsDatabase/AnalyticsTableName"; +import AnalyticsTableColumn, { + SkipIndexType, +} from "../../Types/AnalyticsDatabase/TableColumn"; +import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType"; +import { JSONObject } from "../../Types/JSON"; +import ObjectID from "../../Types/ObjectID"; +import Permission from "../../Types/Permission"; + +export default class Profile extends AnalyticsBaseModel { + public constructor() { + const projectIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "projectId", + title: "Project ID", + description: "ID of project", + required: true, + type: TableColumnType.ObjectID, + isTenantId: true, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const serviceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "serviceId", + title: "Service ID", + description: "ID of the Service which created the profile", + required: true, + type: TableColumnType.ObjectID, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const profileIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "profileId", + title: "Profile ID", + description: "Unique identifier for the profile", + required: true, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_profile_id", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const traceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "traceId", + title: "Trace ID", + description: "Correlation with traces", + required: false, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_trace_id", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const spanIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "spanId", + title: "Span ID", + description: "Correlation with spans", + required: false, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_span_id", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const startTimeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "startTime", + title: "Start Time", + description: "Profile start timestamp", + required: true, + type: TableColumnType.DateTime64, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const endTimeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "endTime", + title: "End Time", + description: "Profile end timestamp", + required: true, + type: TableColumnType.DateTime64, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const startTimeUnixNanoColumn: AnalyticsTableColumn = + new AnalyticsTableColumn({ + key: "startTimeUnixNano", + title: "Start Time in Unix Nano", + description: "Profile start timestamp in unix nanoseconds", + required: true, + type: TableColumnType.LongNumber, + codec: { codec: "ZSTD", level: 1 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const endTimeUnixNanoColumn: AnalyticsTableColumn = + new AnalyticsTableColumn({ + key: "endTimeUnixNano", + title: "End Time in Unix Nano", + description: "Profile end timestamp in unix nanoseconds", + required: true, + type: TableColumnType.LongNumber, + codec: { codec: "ZSTD", level: 1 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const durationNanoColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "durationNano", + title: "Duration in Nanoseconds", + description: "Duration of the profile in nanoseconds", + required: true, + type: TableColumnType.LongNumber, + codec: { codec: "ZSTD", level: 1 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const profileTypeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "profileType", + title: "Profile Type", + description: + "Type of profile (e.g., cpu, wall, alloc_objects, alloc_space, goroutine)", + required: true, + type: TableColumnType.Text, + skipIndex: { + name: "idx_profile_type", + type: SkipIndexType.Set, + params: [20], + granularity: 4, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const unitColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "unit", + title: "Unit", + description: "Unit of the profile values (e.g., nanoseconds, bytes)", + required: true, + type: TableColumnType.Text, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const periodTypeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "periodType", + title: "Period Type", + description: "Sampling period type", + required: false, + type: TableColumnType.Text, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const periodColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "period", + title: "Period", + description: "Sampling period value", + required: false, + type: TableColumnType.LongNumber, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const attributesColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "attributes", + title: "Attributes", + description: "Profile-level attributes", + required: true, + defaultValue: {}, + type: TableColumnType.MapStringString, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const attributeKeysColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "attributeKeys", + title: "Attribute Keys", + description: "Attribute keys extracted from attributes", + required: true, + defaultValue: [], + type: TableColumnType.ArrayText, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const sampleCountColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "sampleCount", + title: "Sample Count", + description: "Number of samples in this profile", + required: true, + type: TableColumnType.Number, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const originalPayloadFormatColumn: AnalyticsTableColumn = + new AnalyticsTableColumn({ + key: "originalPayloadFormat", + title: "Original Payload Format", + description: "Format of the original payload (e.g., pprofext)", + required: false, + type: TableColumnType.Text, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "retentionDate", + title: "Retention Date", + description: + "Date after which this row is eligible for TTL deletion, computed at ingest time as startTime + service.retainTelemetryDataForDays", + required: true, + type: TableColumnType.Date, + defaultValue: undefined, + }); + + super({ + tableName: AnalyticsTableName.Profile, + tableEngine: AnalyticsTableEngine.MergeTree, + singularName: "Profile", + pluralName: "Profiles", + crudApiPath: new Route("/profile"), + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.EditTelemetryServiceProfiles, + ], + delete: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.DeleteTelemetryServiceProfiles, + ], + }, + tableColumns: [ + projectIdColumn, + serviceIdColumn, + profileIdColumn, + traceIdColumn, + spanIdColumn, + startTimeColumn, + endTimeColumn, + startTimeUnixNanoColumn, + endTimeUnixNanoColumn, + durationNanoColumn, + profileTypeColumn, + unitColumn, + periodTypeColumn, + periodColumn, + attributesColumn, + attributeKeysColumn, + sampleCountColumn, + originalPayloadFormatColumn, + retentionDateColumn, + ], + sortKeys: ["projectId", "startTime", "serviceId", "profileType"], + primaryKeys: ["projectId", "startTime", "serviceId", "profileType"], + partitionKey: "sipHash64(projectId) % 16", + ttlExpression: "retentionDate DELETE", + }); + } + + public get projectId(): ObjectID | undefined { + return this.getColumnValue("projectId") as ObjectID | undefined; + } + + public set projectId(v: ObjectID | undefined) { + this.setColumnValue("projectId", v); + } + + public get serviceId(): ObjectID | undefined { + return this.getColumnValue("serviceId") as ObjectID | undefined; + } + + public set serviceId(v: ObjectID | undefined) { + this.setColumnValue("serviceId", v); + } + + public get profileId(): string | undefined { + return this.getColumnValue("profileId") as string | undefined; + } + + public set profileId(v: string | undefined) { + this.setColumnValue("profileId", v); + } + + public get traceId(): string | undefined { + return this.getColumnValue("traceId") as string | undefined; + } + + public set traceId(v: string | undefined) { + this.setColumnValue("traceId", v); + } + + public get spanId(): string | undefined { + return this.getColumnValue("spanId") as string | undefined; + } + + public set spanId(v: string | undefined) { + this.setColumnValue("spanId", v); + } + + public get startTime(): Date | undefined { + return this.getColumnValue("startTime") as Date | undefined; + } + + public set startTime(v: Date | undefined) { + this.setColumnValue("startTime", v); + } + + public get endTime(): Date | undefined { + return this.getColumnValue("endTime") as Date | undefined; + } + + public set endTime(v: Date | undefined) { + this.setColumnValue("endTime", v); + } + + public get startTimeUnixNano(): number | undefined { + return this.getColumnValue("startTimeUnixNano") as number | undefined; + } + + public set startTimeUnixNano(v: number | undefined) { + this.setColumnValue("startTimeUnixNano", v); + } + + public get endTimeUnixNano(): number | undefined { + return this.getColumnValue("endTimeUnixNano") as number | undefined; + } + + public set endTimeUnixNano(v: number | undefined) { + this.setColumnValue("endTimeUnixNano", v); + } + + public get durationNano(): number | undefined { + return this.getColumnValue("durationNano") as number | undefined; + } + + public set durationNano(v: number | undefined) { + this.setColumnValue("durationNano", v); + } + + public get profileType(): string | undefined { + return this.getColumnValue("profileType") as string | undefined; + } + + public set profileType(v: string | undefined) { + this.setColumnValue("profileType", v); + } + + public get unit(): string | undefined { + return this.getColumnValue("unit") as string | undefined; + } + + public set unit(v: string | undefined) { + this.setColumnValue("unit", v); + } + + public get periodType(): string | undefined { + return this.getColumnValue("periodType") as string | undefined; + } + + public set periodType(v: string | undefined) { + this.setColumnValue("periodType", v); + } + + public get period(): number | undefined { + return this.getColumnValue("period") as number | undefined; + } + + public set period(v: number | undefined) { + this.setColumnValue("period", v); + } + + public get attributes(): JSONObject | undefined { + return this.getColumnValue("attributes") as JSONObject | undefined; + } + + public set attributes(v: JSONObject | undefined) { + this.setColumnValue("attributes", v); + } + + public get attributeKeys(): Array | undefined { + return this.getColumnValue("attributeKeys") as Array | undefined; + } + + public set attributeKeys(v: Array | undefined) { + this.setColumnValue("attributeKeys", v); + } + + public get sampleCount(): number | undefined { + return this.getColumnValue("sampleCount") as number | undefined; + } + + public set sampleCount(v: number | undefined) { + this.setColumnValue("sampleCount", v); + } + + public get originalPayloadFormat(): string | undefined { + return this.getColumnValue("originalPayloadFormat") as string | undefined; + } + + public set originalPayloadFormat(v: string | undefined) { + this.setColumnValue("originalPayloadFormat", v); + } + + public get retentionDate(): Date | undefined { + return this.getColumnValue("retentionDate") as Date | undefined; + } + + public set retentionDate(v: Date | undefined) { + this.setColumnValue("retentionDate", v); + } +} diff --git a/Common/Models/AnalyticsModels/ProfileSample.ts b/Common/Models/AnalyticsModels/ProfileSample.ts new file mode 100644 index 0000000000..effb3c9dbc --- /dev/null +++ b/Common/Models/AnalyticsModels/ProfileSample.ts @@ -0,0 +1,546 @@ +import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel"; +import Route from "../../Types/API/Route"; +import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine"; +import AnalyticsTableName from "../../Types/AnalyticsDatabase/AnalyticsTableName"; +import AnalyticsTableColumn, { + SkipIndexType, +} from "../../Types/AnalyticsDatabase/TableColumn"; +import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType"; +import { JSONObject } from "../../Types/JSON"; +import ObjectID from "../../Types/ObjectID"; +import Permission from "../../Types/Permission"; + +export default class ProfileSample extends AnalyticsBaseModel { + public constructor() { + const projectIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "projectId", + title: "Project ID", + description: "ID of project", + required: true, + type: TableColumnType.ObjectID, + isTenantId: true, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const serviceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "serviceId", + title: "Service ID", + description: "ID of the Service which created the profile", + required: true, + type: TableColumnType.ObjectID, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const profileIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "profileId", + title: "Profile ID", + description: "FK to profile table", + required: true, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_profile_id", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const traceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "traceId", + title: "Trace ID", + description: "Trace correlation (from Link table)", + required: false, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_trace_id", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const spanIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "spanId", + title: "Span ID", + description: "Span correlation (from Link table)", + required: false, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_span_id", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const timeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "time", + title: "Time", + description: "Sample timestamp", + required: true, + type: TableColumnType.DateTime64, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const timeUnixNanoColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "timeUnixNano", + title: "Time (in Unix Nano)", + description: "Sample timestamp in unix nanoseconds", + required: true, + type: TableColumnType.LongNumber, + codec: { codec: "ZSTD", level: 1 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const stacktraceColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "stacktrace", + title: "Stacktrace", + description: "Fully-resolved stack frames (function@file:line)", + required: true, + defaultValue: [], + type: TableColumnType.ArrayText, + codec: { codec: "ZSTD", level: 3 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const stacktraceHashColumn: AnalyticsTableColumn = + new AnalyticsTableColumn({ + key: "stacktraceHash", + title: "Stacktrace Hash", + description: "Hash of stacktrace for grouping", + required: true, + type: TableColumnType.Text, + codec: { codec: "ZSTD", level: 1 }, + skipIndex: { + name: "idx_stacktrace_hash", + type: SkipIndexType.BloomFilter, + params: [0.01], + granularity: 1, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const frameTypesColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "frameTypes", + title: "Frame Types", + description: + "Per-frame runtime type (kernel, native, jvm, cpython, go, v8js, etc.)", + required: true, + defaultValue: [], + type: TableColumnType.ArrayText, + codec: { codec: "ZSTD", level: 3 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const valueColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "value", + title: "Value", + description: "Sample value (CPU time, bytes, count)", + required: true, + type: TableColumnType.LongNumber, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const profileTypeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "profileType", + title: "Profile Type", + description: "Denormalized profile type for filtering", + required: true, + type: TableColumnType.Text, + skipIndex: { + name: "idx_profile_type", + type: SkipIndexType.Set, + params: [20], + granularity: 4, + }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const labelsColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "labels", + title: "Labels", + description: "Sample-level labels", + required: true, + defaultValue: {}, + type: TableColumnType.MapStringString, + codec: { codec: "ZSTD", level: 3 }, + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [], + }, + }); + + const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({ + key: "retentionDate", + title: "Retention Date", + description: + "Date after which this row is eligible for TTL deletion, computed at ingest time as time + service.retainTelemetryDataForDays", + required: true, + type: TableColumnType.Date, + defaultValue: undefined, + }); + + super({ + tableName: AnalyticsTableName.ProfileSample, + tableEngine: AnalyticsTableEngine.MergeTree, + singularName: "ProfileSample", + pluralName: "ProfileSamples", + crudApiPath: new Route("/profile-sample"), + accessControl: { + read: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.ReadTelemetryServiceProfiles, + ], + create: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.CreateTelemetryServiceProfiles, + ], + update: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.EditTelemetryServiceProfiles, + ], + delete: [ + Permission.ProjectOwner, + Permission.ProjectAdmin, + Permission.ProjectMember, + Permission.DeleteTelemetryServiceProfiles, + ], + }, + tableColumns: [ + projectIdColumn, + serviceIdColumn, + profileIdColumn, + traceIdColumn, + spanIdColumn, + timeColumn, + timeUnixNanoColumn, + stacktraceColumn, + stacktraceHashColumn, + frameTypesColumn, + valueColumn, + profileTypeColumn, + labelsColumn, + retentionDateColumn, + ], + sortKeys: [ + "projectId", + "time", + "serviceId", + "profileType", + "stacktraceHash", + ], + primaryKeys: [ + "projectId", + "time", + "serviceId", + "profileType", + "stacktraceHash", + ], + partitionKey: "sipHash64(projectId) % 16", + ttlExpression: "retentionDate DELETE", + }); + } + + public get projectId(): ObjectID | undefined { + return this.getColumnValue("projectId") as ObjectID | undefined; + } + + public set projectId(v: ObjectID | undefined) { + this.setColumnValue("projectId", v); + } + + public get serviceId(): ObjectID | undefined { + return this.getColumnValue("serviceId") as ObjectID | undefined; + } + + public set serviceId(v: ObjectID | undefined) { + this.setColumnValue("serviceId", v); + } + + public get profileId(): string | undefined { + return this.getColumnValue("profileId") as string | undefined; + } + + public set profileId(v: string | undefined) { + this.setColumnValue("profileId", v); + } + + public get traceId(): string | undefined { + return this.getColumnValue("traceId") as string | undefined; + } + + public set traceId(v: string | undefined) { + this.setColumnValue("traceId", v); + } + + public get spanId(): string | undefined { + return this.getColumnValue("spanId") as string | undefined; + } + + public set spanId(v: string | undefined) { + this.setColumnValue("spanId", v); + } + + public get time(): Date | undefined { + return this.getColumnValue("time") as Date | undefined; + } + + public set time(v: Date | undefined) { + this.setColumnValue("time", v); + } + + public get timeUnixNano(): number | undefined { + return this.getColumnValue("timeUnixNano") as number | undefined; + } + + public set timeUnixNano(v: number | undefined) { + this.setColumnValue("timeUnixNano", v); + } + + public get stacktrace(): Array | undefined { + return this.getColumnValue("stacktrace") as Array | undefined; + } + + public set stacktrace(v: Array | undefined) { + this.setColumnValue("stacktrace", v); + } + + public get stacktraceHash(): string | undefined { + return this.getColumnValue("stacktraceHash") as string | undefined; + } + + public set stacktraceHash(v: string | undefined) { + this.setColumnValue("stacktraceHash", v); + } + + public get frameTypes(): Array | undefined { + return this.getColumnValue("frameTypes") as Array | undefined; + } + + public set frameTypes(v: Array | undefined) { + this.setColumnValue("frameTypes", v); + } + + public get value(): number | undefined { + return this.getColumnValue("value") as number | undefined; + } + + public set value(v: number | undefined) { + this.setColumnValue("value", v); + } + + public get profileType(): string | undefined { + return this.getColumnValue("profileType") as string | undefined; + } + + public set profileType(v: string | undefined) { + this.setColumnValue("profileType", v); + } + + public get labels(): JSONObject | undefined { + return this.getColumnValue("labels") as JSONObject | undefined; + } + + public set labels(v: JSONObject | undefined) { + this.setColumnValue("labels", v); + } + + public get retentionDate(): Date | undefined { + return this.getColumnValue("retentionDate") as Date | undefined; + } + + public set retentionDate(v: Date | undefined) { + this.setColumnValue("retentionDate", v); + } +} diff --git a/Common/Server/Services/ProfileSampleService.ts b/Common/Server/Services/ProfileSampleService.ts new file mode 100644 index 0000000000..b47796ef76 --- /dev/null +++ b/Common/Server/Services/ProfileSampleService.ts @@ -0,0 +1,11 @@ +import ClickhouseDatabase from "../Infrastructure/ClickhouseDatabase"; +import AnalyticsDatabaseService from "./AnalyticsDatabaseService"; +import ProfileSample from "../../Models/AnalyticsModels/ProfileSample"; + +export class ProfileSampleService extends AnalyticsDatabaseService { + public constructor(clickhouseDatabase?: ClickhouseDatabase | undefined) { + super({ modelType: ProfileSample, database: clickhouseDatabase }); + } +} + +export default new ProfileSampleService(); diff --git a/Common/Server/Services/ProfileService.ts b/Common/Server/Services/ProfileService.ts new file mode 100644 index 0000000000..7a5c1c9b39 --- /dev/null +++ b/Common/Server/Services/ProfileService.ts @@ -0,0 +1,11 @@ +import ClickhouseDatabase from "../Infrastructure/ClickhouseDatabase"; +import AnalyticsDatabaseService from "./AnalyticsDatabaseService"; +import Profile from "../../Models/AnalyticsModels/Profile"; + +export class ProfileService extends AnalyticsDatabaseService { + public constructor(clickhouseDatabase?: ClickhouseDatabase | undefined) { + super({ modelType: Profile, database: clickhouseDatabase }); + } +} + +export default new ProfileService(); diff --git a/Common/Types/AnalyticsDatabase/AnalyticsTableName.ts b/Common/Types/AnalyticsDatabase/AnalyticsTableName.ts index 4c43b9e4de..86d817e7c4 100644 --- a/Common/Types/AnalyticsDatabase/AnalyticsTableName.ts +++ b/Common/Types/AnalyticsDatabase/AnalyticsTableName.ts @@ -4,6 +4,8 @@ enum AnalyticsTableName { ExceptionInstance = "ExceptionItemV2", Span = "SpanItemV2", MonitorLog = "MonitorLogV2", + Profile = "ProfileItemV2", + ProfileSample = "ProfileSampleItemV2", } export default AnalyticsTableName; diff --git a/Common/Types/MeteredPlan/ProductType.ts b/Common/Types/MeteredPlan/ProductType.ts index a09b0674a7..102cbbf3ae 100644 --- a/Common/Types/MeteredPlan/ProductType.ts +++ b/Common/Types/MeteredPlan/ProductType.ts @@ -2,6 +2,7 @@ enum ProductType { Logs = "Logs", Traces = "Traces", Metrics = "Metrics", + Profiles = "Profiles", ActiveMonitoring = "Active Monitoring", } diff --git a/Common/Types/Permission.ts b/Common/Types/Permission.ts index 1b7367c429..7c05116108 100644 --- a/Common/Types/Permission.ts +++ b/Common/Types/Permission.ts @@ -130,6 +130,12 @@ enum Permission { EditTelemetryServiceMetrics = "EditTelemetryServiceMetrics", ReadTelemetryServiceMetrics = "ReadTelemetryServiceMetrics", + // Profiles + CreateTelemetryServiceProfiles = "CreateTelemetryServiceProfiles", + DeleteTelemetryServiceProfiles = "DeleteTelemetryServiceProfiles", + EditTelemetryServiceProfiles = "EditTelemetryServiceProfiles", + ReadTelemetryServiceProfiles = "ReadTelemetryServiceProfiles", + // Billing Permissions (Owner Permission) ManageProjectBilling = "ManageProjectBilling", @@ -4745,6 +4751,43 @@ export class PermissionHelper { group: PermissionGroup.Telemetry, }, + { + permission: Permission.CreateTelemetryServiceProfiles, + title: "Create Telemetry Service Profiles", + description: + "This permission can create Telemetry Service Profiles this project.", + isAssignableToTenant: true, + isAccessControlPermission: false, + group: PermissionGroup.Telemetry, + }, + { + permission: Permission.DeleteTelemetryServiceProfiles, + title: "Delete Telemetry Service Profiles", + description: + "This permission can delete Telemetry Service Profiles of this project.", + isAssignableToTenant: true, + isAccessControlPermission: false, + group: PermissionGroup.Telemetry, + }, + { + permission: Permission.EditTelemetryServiceProfiles, + title: "Edit Telemetry Service Profiles", + description: + "This permission can edit Telemetry Service Profiles of this project.", + isAssignableToTenant: true, + isAccessControlPermission: false, + group: PermissionGroup.Telemetry, + }, + { + permission: Permission.ReadTelemetryServiceProfiles, + title: "Read Telemetry Service Profiles", + description: + "This permission can read Telemetry Service Profiles of this project.", + isAssignableToTenant: true, + isAccessControlPermission: false, + group: PermissionGroup.Telemetry, + }, + { permission: Permission.CreateScheduledMaintenanceOwnerTeam, title: "Create Scheduled Maintenance Team Owner", diff --git a/Common/Types/Telemetry/TelemetryType.ts b/Common/Types/Telemetry/TelemetryType.ts index d5aa68fd9d..f0e98b7a31 100644 --- a/Common/Types/Telemetry/TelemetryType.ts +++ b/Common/Types/Telemetry/TelemetryType.ts @@ -3,6 +3,7 @@ enum TelemetryType { Trace = "Trace", Log = "Log", Exception = "Exception", + Profile = "Profile", } export default TelemetryType; diff --git a/Telemetry/API/OTelIngest.ts b/Telemetry/API/OTelIngest.ts index c80c8ce085..351a1ca80d 100644 --- a/Telemetry/API/OTelIngest.ts +++ b/Telemetry/API/OTelIngest.ts @@ -10,6 +10,7 @@ import OpenTelemetryRequestMiddleware from "../Middleware/OtelRequestMiddleware" import OtelTracesIngestService from "../Services/OtelTracesIngestService"; import OtelMetricsIngestService from "../Services/OtelMetricsIngestService"; import OtelLogsIngestService from "../Services/OtelLogsIngestService"; +import OtelProfilesIngestService from "../Services/OtelProfilesIngestService"; import TelemetryQueueService from "../Services/Queue/TelemetryQueueService"; import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; import { JSONObject } from "Common/Types/JSON"; @@ -61,6 +62,19 @@ router.post( }, ); +router.post( + "/otlp/v1/profiles", + OpenTelemetryRequestMiddleware.getProductType, + TelemetryIngest.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + return OtelProfilesIngestService.ingestProfiles(req, res, next); + }, +); + // Queue stats endpoint router.get( "/otlp/queue/stats", diff --git a/Telemetry/Config.ts b/Telemetry/Config.ts index 44d31abbfe..5deb9a6f6a 100644 --- a/Telemetry/Config.ts +++ b/Telemetry/Config.ts @@ -48,6 +48,16 @@ export const TELEMETRY_EXCEPTION_FLUSH_BATCH_SIZE: number = parseBatchSize( 500, ); +export const TELEMETRY_PROFILE_FLUSH_BATCH_SIZE: number = parseBatchSize( + "TELEMETRY_PROFILE_FLUSH_BATCH_SIZE", + 500, +); + +export const TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE: number = parseBatchSize( + "TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE", + 500, +); + /* * Some telemetry batches can be large and take >30s (BullMQ default lock) to process. * Allow configuring a longer lock duration (in ms) to avoid premature stall detection. diff --git a/Telemetry/GrpcServer.ts b/Telemetry/GrpcServer.ts index ce04c153af..9bd3c42d81 100644 --- a/Telemetry/GrpcServer.ts +++ b/Telemetry/GrpcServer.ts @@ -10,6 +10,7 @@ import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest"; import TracesQueueService from "./Services/Queue/TracesQueueService"; import LogsQueueService from "./Services/Queue/LogsQueueService"; import MetricsQueueService from "./Services/Queue/MetricsQueueService"; +import ProfilesQueueService from "./Services/Queue/ProfilesQueueService"; const GRPC_PORT: number = 4317; @@ -162,11 +163,23 @@ export function startGrpcServer(): void { }, ); + const profilesServiceDef: protoLoader.PackageDefinition = + protoLoader.loadSync(path.join(PROTO_DIR, "profiles_service.proto"), { + keepCase: false, + longs: String, + enums: String, + defaults: true, + oneofs: true, + includeDirs: [PROTO_DIR], + }); + const traceProto: grpc.GrpcObject = grpc.loadPackageDefinition(traceServiceDef); const logsProto: grpc.GrpcObject = grpc.loadPackageDefinition(logsServiceDef); const metricsProto: grpc.GrpcObject = grpc.loadPackageDefinition(metricsServiceDef); + const profilesProto: grpc.GrpcObject = + grpc.loadPackageDefinition(profilesServiceDef); type ProtoServiceDef = { service: grpc.ServiceDefinition; @@ -213,6 +226,17 @@ export function startGrpcServer(): void { "MetricsService", ); + const profilesServiceDefinition: grpc.ServiceDefinition = + getServiceDefinition( + profilesProto, + "opentelemetry", + "proto", + "collector", + "profiles", + "v1development", + "ProfilesService", + ); + const server: grpc.Server = new grpc.Server({ "grpc.max_receive_message_length": 50 * 1024 * 1024, // 50MB }); @@ -250,6 +274,17 @@ export function startGrpcServer(): void { }, }); + server.addService(profilesServiceDefinition, { + Export: (call: GrpcCall, callback: GrpcCallback): void => { + handleExport( + call, + callback, + ProductType.Profiles, + ProfilesQueueService.addProfileIngestJob.bind(ProfilesQueueService), + ); + }, + }); + server.bindAsync( `0.0.0.0:${GRPC_PORT}`, grpc.ServerCredentials.createInsecure(), diff --git a/Telemetry/Jobs/TelemetryIngest/ProcessTelemetry.ts b/Telemetry/Jobs/TelemetryIngest/ProcessTelemetry.ts index 8117dc6445..fd83aad4f2 100644 --- a/Telemetry/Jobs/TelemetryIngest/ProcessTelemetry.ts +++ b/Telemetry/Jobs/TelemetryIngest/ProcessTelemetry.ts @@ -5,6 +5,7 @@ import { import OtelLogsIngestService from "../../Services/OtelLogsIngestService"; import OtelTracesIngestService from "../../Services/OtelTracesIngestService"; import OtelMetricsIngestService from "../../Services/OtelMetricsIngestService"; +import OtelProfilesIngestService from "../../Services/OtelProfilesIngestService"; import SyslogIngestService from "../../Services/SyslogIngestService"; import FluentLogsIngestService from "../../Services/FluentLogsIngestService"; import { @@ -80,6 +81,20 @@ QueueWorker.getWorker( break; } + case TelemetryType.Profiles: { + const mockRequest: TelemetryRequest = { + projectId: new ObjectID(jobData.projectId!.toString()), + body: jobData.requestBody!, + headers: jobData.requestHeaders!, + } as TelemetryRequest; + + await OtelProfilesIngestService.processProfilesFromQueue(mockRequest); + logger.debug( + `Successfully processed profiles for project: ${jobData.projectId}`, + ); + break; + } + case TelemetryType.Syslog: { const mockRequest: TelemetryRequest = { projectId: new ObjectID(jobData.projectId!.toString()), diff --git a/Telemetry/Middleware/OtelRequestMiddleware.ts b/Telemetry/Middleware/OtelRequestMiddleware.ts index ff532453a5..9b65b6f116 100644 --- a/Telemetry/Middleware/OtelRequestMiddleware.ts +++ b/Telemetry/Middleware/OtelRequestMiddleware.ts @@ -25,10 +25,16 @@ const MetricsProto: protobuf.Root = protobuf.loadSync( "/usr/src/app/ProtoFiles/OTel/v1/metrics.proto", ); +const ProfilesProto: protobuf.Root = protobuf.loadSync( + "/usr/src/app/ProtoFiles/OTel/v1/profiles.proto", +); + // Lookup the message type const LogsData: protobuf.Type = LogsProto.lookupType("LogsData"); const TracesData: protobuf.Type = TracesProto.lookupType("TracesData"); const MetricsData: protobuf.Type = MetricsProto.lookupType("MetricsData"); +const ProfilesData: protobuf.Type = + ProfilesProto.lookupType("ProfilesData"); export default class OpenTelemetryRequestMiddleware { @CaptureSpan() @@ -69,6 +75,13 @@ export default class OpenTelemetryRequestMiddleware { req.body = JSON.parse(Buffer.from(req.body).toString("utf-8")); } productType = ProductType.Metrics; + } else if (req.url.includes("/otlp/v1/profiles")) { + if (isProtobuf) { + req.body = ProfilesData.decode(req.body); + } else if (req.body instanceof Uint8Array) { + req.body = JSON.parse(Buffer.from(req.body).toString("utf-8")); + } + productType = ProductType.Profiles; } else { throw new BadRequestException("Invalid URL: " + req.baseUrl); } diff --git a/Telemetry/ProtoFiles/OTel/v1/profiles.proto b/Telemetry/ProtoFiles/OTel/v1/profiles.proto new file mode 100644 index 0000000000..0abbbd540d --- /dev/null +++ b/Telemetry/ProtoFiles/OTel/v1/profiles.proto @@ -0,0 +1,275 @@ +// Copyright 2024, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.profiles.v1development; + +import "./common.proto"; +import "./resource.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Profiles.V1Development"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.profiles.v1development"; +option java_outer_classname = "ProfilesProto"; +option go_package = "go.opentelemetry.io/proto/otlp/profiles/v1development"; + +// ProfilesData represents the profiles data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP profiles data but do +// not implement the OTLP protocol. +message ProfilesData { + // An array of ResourceProfiles. + repeated ResourceProfiles resource_profiles = 1; +} + +// A collection of ScopeProfiles from a Resource. +message ResourceProfiles { + reserved 1000; + + // The resource for the profiles in this message. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeProfiles that originate from a resource. + repeated ScopeProfiles scope_profiles = 2; + + // The Schema URL. + string schema_url = 3; +} + +// A collection of ProfileContainers produced by an InstrumentationScope. +message ScopeProfiles { + // The instrumentation scope information for the profiles in this message. + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of ProfileContainers that originate from an instrumentation scope. + repeated ProfileContainer profiles = 2; + + // The Schema URL. + string schema_url = 3; +} + +// A ProfileContainer represents a single profile along with metadata. +message ProfileContainer { + // A unique identifier for a profile. + bytes profile_id = 1; + + // start_time_unix_nano is the start time of the profile. + fixed64 start_time_unix_nano = 2; + + // end_time_unix_nano is the end time of the profile. + fixed64 end_time_unix_nano = 3; + + // attributes is a collection of key/value pairs. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // dropped_attributes_count is the number of attributes that were discarded. + uint32 dropped_attributes_count = 5; + + // Specifies format of the original payload. Common values are defined in semantic conventions. + string original_payload_format = 6; + + // Original payload can be stored in this field. + bytes original_payload = 7; + + // The profile data. + Profile profile = 8; +} + +// Provides additional context for a profile. +message Profile { + // A description of the profile. + repeated ValueType sample_type = 1; + + // The set of samples recorded in this profile. + repeated Sample sample = 2; + + // Time of collection (UTC) represented as nanoseconds past the epoch. + fixed64 time_unix_nano = 3; + + // Duration of the profile, if a duration makes sense. + fixed64 duration_nano = 4; + + // The kind of events between sampled occurrences. + ValueType period_type = 5; + + // The number of events between sampled occurrences. + int64 period = 6; + + // Free-form text associated to the profile. + repeated int64 comment = 7; // Indices into string table. + + // A common table for strings referenced by various messages. + repeated string string_table = 8; + + // Index into the string table of the type of the preferred sample value. + int64 default_sample_type = 9; + + // References to locations in the location_table. + repeated Location location_table = 10; + + // Useful program location + repeated Function function_table = 11; + + // Mapping from address to the mapped entry. + repeated Mapping mapping_table = 12; + + // References to links in the link_table. + repeated Link link_table = 13; + + // References to stacks in the stack_table. + repeated Stack stack_table = 14; + + // Lookup table for attributes. + repeated opentelemetry.proto.common.v1.KeyValue attribute_table = 15; + + // Represents a mapping between Attribute Keys and Units. + repeated AttributeUnit attribute_units = 16; + + // dropped_attributes_count is the number of attributes that were discarded. + uint32 dropped_attributes_count = 17; +} + +// ValueType describes the semantics and measurement units of a value. +message ValueType { + int64 type = 1; // Index into string table. + int64 unit = 2; // Index into string table. + AggregationTemporality aggregation_temporality = 3; +} + +// Specifies the method of aggregating metric values. +enum AggregationTemporality { + AGGREGATION_TEMPORALITY_UNSPECIFIED = 0; + AGGREGATION_TEMPORALITY_DELTA = 1; + AGGREGATION_TEMPORALITY_CUMULATIVE = 2; +} + +// Each Sample records values encountered in some program context. +message Sample { + // The index into the stack_table for the callstack of this sample. + int32 locations_start_index = 1; + + // locations_length limits the number of locations to use from the stack starting from locations_start_index. + int32 locations_length = 2; + + // The index into the stack_table for the stack of this sample. + int32 stack_index = 3; + + // The type and unit of each value is defined by the corresponding + // entry in Profile.sample_type. + repeated int64 value = 4; + + // References to attributes in Profile.attribute_table. + repeated int32 attribute_indices = 5; + + // Reference to link in Profile.link_table. + int32 link_index = 6; + + // Timestamps associated with the sample. + repeated fixed64 timestamps_unix_nano = 7; +} + +// Provides the mapping between a function name and its address/binary. +message Mapping { + // Address at which the binary (or DLL) is loaded into memory. + uint64 memory_start = 1; + + // The limit of the address range occupied by this mapping. + uint64 memory_limit = 2; + + // Offset in the binary that corresponds to the first mapped address. + uint64 file_offset = 3; + + // The object this entry is loaded from. + int64 filename = 4; // Index into string table + + // References to attributes in Profile.attribute_table. + repeated int32 attribute_indices = 5; + + // If true, the mapping was not able to be identified. + bool has_functions = 6; + bool has_filenames = 7; + bool has_line_numbers = 8; + bool has_inline_frames = 9; +} + +// Describes function and line number information. +message Location { + // Reference to mapping in Profile.mapping_table. + int32 mapping_index = 1; + + // The instruction address for this location. + uint64 address = 2; + + // Multiple line indicates this location has inlined functions. + repeated Line line = 3; + + // If true, this location address has NOT been resolved to a function and source. + bool is_folded = 4; + + // Type of frame (kernel, native, jvm, cpython, go, v8js, etc.). + int32 type_index = 5; + + // References to attributes in Profile.attribute_table. + repeated int32 attribute_indices = 6; +} + +// Details a specific line in a source code, linked to a function. +message Line { + // Reference to function in Profile.function_table. + int32 function_index = 1; + + // Line number in source code. + int64 line = 2; + + // Column number in source code. + int64 column = 3; +} + +// Describes a function, including its human-readable name. +message Function { + // Name of the function, in human-readable form. + int64 name = 1; // Index into string table + + // Name of the function, as identified by the system. + int64 system_name = 2; // Index into string table + + // Source file containing the function. + int64 filename = 3; // Index into string table + + // Line number in source file. + int64 start_line = 4; +} + +// A pointer from a profile Sample to a trace Span. +message Link { + // A unique identifier of a trace. + bytes trace_id = 1; + + // A unique identifier for the linked span. + bytes span_id = 2; +} + +// A Stack is a collection of location references. +message Stack { + // References to locations in the location_table. + repeated int32 location_indices = 1; +} + +// Defines the mapping between an attribute key and its unit. +message AttributeUnit { + // Index into string table for the attribute key. + int64 attribute_key = 1; + // Index into string table for the unit. + int64 unit = 2; +} diff --git a/Telemetry/ProtoFiles/OTel/v1/profiles_service.proto b/Telemetry/ProtoFiles/OTel/v1/profiles_service.proto new file mode 100644 index 0000000000..c5b46010f8 --- /dev/null +++ b/Telemetry/ProtoFiles/OTel/v1/profiles_service.proto @@ -0,0 +1,53 @@ +// Copyright 2024, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.collector.profiles.v1development; + +import "./profiles.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Collector.Profiles.V1Development"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.collector.profiles.v1development"; +option java_outer_classname = "ProfilesServiceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"; + +// Service that can be used to push profiles between one Application +// instrumented with OpenTelemetry and a collector, or between a collector and a +// central collector. +service ProfilesService { + // For performance reasons, it is recommended to keep this RPC + // alive for the entire life of the application. + rpc Export(ExportProfilesServiceRequest) returns (ExportProfilesServiceResponse) {} +} + +message ExportProfilesServiceRequest { + // An array of ResourceProfiles. + repeated opentelemetry.proto.profiles.v1development.ResourceProfiles resource_profiles = 1; +} + +message ExportProfilesServiceResponse { + // The details of a partially successful export request. + ExportProfilesPartialSuccess partial_success = 1; +} + +// ExportProfilesPartialSuccess is used when some profiles in the request could not be processed. +message ExportProfilesPartialSuccess { + // The number of rejected profiles. + int64 rejected_profiles = 1; + + // A developer-facing human-readable message. + string error_message = 2; +} diff --git a/Telemetry/Services/OtelProfilesIngestService.ts b/Telemetry/Services/OtelProfilesIngestService.ts new file mode 100644 index 0000000000..d92ff81720 --- /dev/null +++ b/Telemetry/Services/OtelProfilesIngestService.ts @@ -0,0 +1,834 @@ +import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest"; +import OTelIngestService, { + TelemetryServiceMetadata, +} from "Common/Server/Services/OpenTelemetryIngestService"; +import OneUptimeDate from "Common/Types/Date"; +import BadRequestException from "Common/Types/Exception/BadRequestException"; +import { + ExpressRequest, + ExpressResponse, + NextFunction, +} from "Common/Server/Utils/Express"; +import Response from "Common/Server/Utils/Response"; +import Dictionary from "Common/Types/Dictionary"; +import ObjectID from "Common/Types/ObjectID"; +import TelemetryUtil, { + AttributeType, +} from "Common/Server/Utils/Telemetry/Telemetry"; +import { JSONArray, JSONObject } from "Common/Types/JSON"; +import logger from "Common/Server/Utils/Logger"; +import ProfileService from "Common/Server/Services/ProfileService"; +import ProfileSampleService from "Common/Server/Services/ProfileSampleService"; +import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan"; +import Text from "Common/Types/Text"; +import ProfilesQueueService from "./Queue/ProfilesQueueService"; +import OtelIngestBaseService from "./OtelIngestBaseService"; +import { + TELEMETRY_PROFILE_FLUSH_BATCH_SIZE, + TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE, +} from "../Config"; +import crypto from "crypto"; + +type ParsedUnixNano = { + unixNano: number; + nano: string; + iso: string; + date: Date; +}; + +export default class OtelProfilesIngestService extends OtelIngestBaseService { + private static async flushProfilesBuffer( + profiles: Array, + force: boolean = false, + ): Promise { + while ( + profiles.length >= TELEMETRY_PROFILE_FLUSH_BATCH_SIZE || + (force && profiles.length > 0) + ) { + const batchSize: number = Math.min( + profiles.length, + TELEMETRY_PROFILE_FLUSH_BATCH_SIZE, + ); + const batch: Array = profiles.splice(0, batchSize); + + if (batch.length === 0) { + continue; + } + + await ProfileService.insertJsonRows(batch); + } + } + + private static async flushSamplesBuffer( + samples: Array, + force: boolean = false, + ): Promise { + while ( + samples.length >= TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE || + (force && samples.length > 0) + ) { + const batchSize: number = Math.min( + samples.length, + TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE, + ); + const batch: Array = samples.splice(0, batchSize); + + if (batch.length === 0) { + continue; + } + + await ProfileSampleService.insertJsonRows(batch); + } + } + + @CaptureSpan() + public static async ingestProfiles( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise { + try { + if (!(req as TelemetryRequest).projectId) { + throw new BadRequestException( + "Invalid request - projectId not found in request.", + ); + } + + req.body = req.body.toJSON ? req.body.toJSON() : req.body; + + Response.sendEmptySuccessResponse(req, res); + + await ProfilesQueueService.addProfileIngestJob(req as TelemetryRequest); + + return; + } catch (err) { + return next(err); + } + } + + @CaptureSpan() + public static async processProfilesFromQueue( + req: ExpressRequest, + ): Promise { + await this.processProfilesAsync(req); + } + + @CaptureSpan() + private static async processProfilesAsync( + req: ExpressRequest, + ): Promise { + try { + const resourceProfiles: JSONArray = req.body[ + "resourceProfiles" + ] as JSONArray; + + if (!resourceProfiles || !Array.isArray(resourceProfiles)) { + logger.error("Invalid resourceProfiles format in request body"); + throw new BadRequestException("Invalid resourceProfiles format"); + } + + const dbProfiles: Array = []; + const dbSamples: Array = []; + const serviceDictionary: Dictionary = {}; + let totalProfilesProcessed: number = 0; + + let resourceProfileCounter: number = 0; + for (const resourceProfile of resourceProfiles) { + try { + if (resourceProfileCounter % 25 === 0) { + await Promise.resolve(); + } + resourceProfileCounter++; + + const serviceName: string = this.getServiceNameFromAttributes( + req, + ((resourceProfile["resource"] as JSONObject)?.[ + "attributes" + ] as JSONArray) || [], + ); + + if (!serviceDictionary[serviceName]) { + const service: { + serviceId: ObjectID; + dataRententionInDays: number; + } = await OTelIngestService.telemetryServiceFromName({ + serviceName: serviceName, + projectId: (req as TelemetryRequest).projectId, + }); + + serviceDictionary[serviceName] = { + serviceName: serviceName, + serviceId: service.serviceId, + dataRententionInDays: service.dataRententionInDays, + }; + } + + const resourceAttributes: Dictionary< + AttributeType | Array + > = { + ...TelemetryUtil.getAttributesForServiceIdAndServiceName({ + serviceId: serviceDictionary[serviceName]!.serviceId!, + serviceName: serviceName, + }), + ...TelemetryUtil.getAttributes({ + items: + ((resourceProfile["resource"] as JSONObject)?.[ + "attributes" + ] as JSONArray) || [], + prefixKeysWithString: "resource", + }), + }; + + const scopeProfiles: JSONArray = resourceProfile[ + "scopeProfiles" + ] as JSONArray; + + if (!scopeProfiles || !Array.isArray(scopeProfiles)) { + logger.warn( + "Invalid scopeProfiles format, skipping resource profile", + ); + continue; + } + + let scopeProfileCounter: number = 0; + for (const scopeProfile of scopeProfiles) { + try { + if (scopeProfileCounter % 50 === 0) { + await Promise.resolve(); + } + scopeProfileCounter++; + + const profileContainers: JSONArray = scopeProfile[ + "profiles" + ] as JSONArray; + + if (!profileContainers || !Array.isArray(profileContainers)) { + logger.warn( + "Invalid profiles format, skipping scope profile", + ); + continue; + } + + let profileCounter: number = 0; + for (const profileContainer of profileContainers) { + try { + if (profileCounter % 100 === 0) { + await Promise.resolve(); + } + profileCounter++; + + const projectId: ObjectID = (req as TelemetryRequest) + .projectId; + const serviceId: ObjectID = + serviceDictionary[serviceName]!.serviceId!; + const dataRetentionInDays: number = + serviceDictionary[serviceName]!.dataRententionInDays; + + const profileId: string = this.convertBase64ToHexSafe( + (profileContainer as JSONObject)["profileId"] as + | string + | undefined, + ) || ObjectID.generate().toString(); + + const startTime: ParsedUnixNano = this.safeParseUnixNano( + (profileContainer as JSONObject)[ + "startTimeUnixNano" + ] as string | number | undefined, + "profile startTimeUnixNano", + ); + + const endTime: ParsedUnixNano = this.safeParseUnixNano( + (profileContainer as JSONObject)[ + "endTimeUnixNano" + ] as string | number | undefined, + "profile endTimeUnixNano", + ); + + const durationNano: number = Math.max( + 0, + Math.trunc(endTime.unixNano - startTime.unixNano), + ); + + // Container-level attributes + const containerAttributes: Dictionary< + AttributeType | Array + > = { + ...resourceAttributes, + ...TelemetryUtil.getAttributes({ + items: + ((profileContainer as JSONObject)[ + "attributes" + ] as JSONArray) || [], + prefixKeysWithString: "profileAttributes", + }), + }; + + if ( + scopeProfile["scope"] && + Object.keys(scopeProfile["scope"]).length > 0 + ) { + const scopeAttributes: JSONObject = scopeProfile[ + "scope" + ] as JSONObject; + for (const key of Object.keys(scopeAttributes)) { + containerAttributes[`scope.${key}`] = scopeAttributes[ + key + ] as AttributeType; + } + } + + const attributeKeys: Array = + TelemetryUtil.getAttributeKeys(containerAttributes); + + const profile: JSONObject | undefined = ( + profileContainer as JSONObject + )["profile"] as JSONObject | undefined; + + const originalPayloadFormat: string = + ((profileContainer as JSONObject)[ + "originalPayloadFormat" + ] as string) || ""; + + // Extract sample types from the profile + let profileType: string = "unknown"; + let unit: string = "unknown"; + let periodType: string = ""; + let period: number = 0; + let sampleCount: number = 0; + + if (profile) { + const stringTable: Array = + (profile["stringTable"] as Array) || []; + + // Extract sample type from first sample_type entry + const sampleTypes: JSONArray = + (profile["sampleType"] as JSONArray) || []; + if (sampleTypes.length > 0) { + const firstSampleType: JSONObject = + sampleTypes[0] as JSONObject; + const typeIndex: number = + (firstSampleType["type"] as number) || 0; + const unitIndex: number = + (firstSampleType["unit"] as number) || 0; + + if (stringTable[typeIndex]) { + profileType = stringTable[typeIndex]!; + } + if (stringTable[unitIndex]) { + unit = stringTable[unitIndex]!; + } + } + + // Extract period type + const periodTypeObj: JSONObject | undefined = + profile["periodType"] as JSONObject | undefined; + if (periodTypeObj) { + const periodTypeIndex: number = + (periodTypeObj["type"] as number) || 0; + if (stringTable[periodTypeIndex]) { + periodType = stringTable[periodTypeIndex]!; + } + } + period = (profile["period"] as number) || 0; + + // Process samples + const samples: JSONArray = + (profile["sample"] as JSONArray) || []; + sampleCount = samples.length; + + // Build dictionary tables for denormalization + const functionTable: JSONArray = + (profile["functionTable"] as JSONArray) || []; + const locationTable: JSONArray = + (profile["locationTable"] as JSONArray) || []; + const linkTable: JSONArray = + (profile["linkTable"] as JSONArray) || []; + const stackTable: JSONArray = + (profile["stackTable"] as JSONArray) || []; + const attributeTable: JSONArray = + (profile["attributeTable"] as JSONArray) || []; + + let sampleCounter: number = 0; + for (const sample of samples) { + try { + if (sampleCounter % 200 === 0) { + await Promise.resolve(); + } + sampleCounter++; + + const sampleObj: JSONObject = sample as JSONObject; + + // Resolve the stack frames + const resolvedStack: { + frames: Array; + frameTypes: Array; + } = this.resolveStackFrames({ + sample: sampleObj, + stackTable: stackTable, + locationTable: locationTable, + functionTable: functionTable, + stringTable: stringTable, + attributeTable: attributeTable, + }); + + // Compute stacktrace hash + const stacktraceHash: string = crypto + .createHash("sha256") + .update(resolvedStack.frames.join("|")) + .digest("hex"); + + // Resolve trace/span correlation from link table + let traceId: string = ""; + let spanId: string = ""; + const linkIndex: number = + (sampleObj["linkIndex"] as number) || 0; + if ( + linkTable.length > 0 && + linkIndex >= 0 && + linkIndex < linkTable.length + ) { + const link: JSONObject = + linkTable[linkIndex] as JSONObject; + traceId = this.convertBase64ToHexSafe( + link["traceId"] as string | undefined, + ); + spanId = this.convertBase64ToHexSafe( + link["spanId"] as string | undefined, + ); + } + + // Extract sample value (first value from values array) + const values: Array = + (sampleObj["value"] as Array) || []; + let sampleValue: number = 0; + if (values.length > 0) { + sampleValue = + typeof values[0] === "string" + ? parseInt(values[0]!, 10) || 0 + : (values[0] as number) || 0; + } + + // Extract sample timestamp + const timestamps: Array = + (sampleObj["timestampsUnixNano"] as Array< + number | string + >) || []; + let sampleTime: ParsedUnixNano = startTime; + if (timestamps.length > 0) { + sampleTime = this.safeParseUnixNano( + timestamps[0] as string | number, + "sample timestampsUnixNano", + ); + } + + // Resolve sample-level labels from attribute_indices + const sampleLabels: Dictionary = {}; + const sampleAttributeIndices: Array = + (sampleObj["attributeIndices"] as Array) || []; + for (const attrIdx of sampleAttributeIndices) { + if (attrIdx >= 0 && attrIdx < attributeTable.length) { + const attr: JSONObject = + attributeTable[attrIdx] as JSONObject; + const key: string = + (attr["key"] as string) || ""; + const val: JSONObject = + (attr["value"] as JSONObject) || {}; + sampleLabels[key] = + (val["stringValue"] as string) || + (val["intValue"]?.toString() as string) || + (val["doubleValue"]?.toString() as string) || + (val["boolValue"]?.toString() as string) || + ""; + } + } + + const sampleRow: JSONObject = this.buildSampleRow({ + projectId: projectId, + serviceId: serviceId, + profileId: profileId, + traceId: traceId, + spanId: spanId, + time: sampleTime, + stacktrace: resolvedStack.frames, + stacktraceHash: stacktraceHash, + frameTypes: resolvedStack.frameTypes, + value: sampleValue, + profileType: profileType, + labels: sampleLabels, + dataRetentionInDays: dataRetentionInDays, + }); + + dbSamples.push(sampleRow); + + if ( + dbSamples.length >= + TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE + ) { + await this.flushSamplesBuffer(dbSamples); + } + } catch (sampleError) { + logger.error("Error processing individual sample:"); + logger.error(sampleError); + } + } + + // Also extract trace/span from first link if available for the profile-level record + let profileTraceId: string = ""; + let profileSpanId: string = ""; + if (linkTable.length > 0) { + const firstLink: JSONObject = + linkTable[0] as JSONObject; + profileTraceId = this.convertBase64ToHexSafe( + firstLink["traceId"] as string | undefined, + ); + profileSpanId = this.convertBase64ToHexSafe( + firstLink["spanId"] as string | undefined, + ); + } + + const profileRow: JSONObject = this.buildProfileRow({ + projectId: projectId, + serviceId: serviceId, + profileId: profileId, + traceId: profileTraceId, + spanId: profileSpanId, + startTime: startTime, + endTime: endTime, + durationNano: durationNano, + profileType: profileType, + unit: unit, + periodType: periodType, + period: period, + attributes: containerAttributes, + attributeKeys: attributeKeys, + sampleCount: sampleCount, + originalPayloadFormat: originalPayloadFormat, + dataRetentionInDays: dataRetentionInDays, + }); + + dbProfiles.push(profileRow); + totalProfilesProcessed++; + + if ( + dbProfiles.length >= TELEMETRY_PROFILE_FLUSH_BATCH_SIZE + ) { + await this.flushProfilesBuffer(dbProfiles); + } + } + } catch (profileError) { + logger.error("Error processing individual profile:"); + logger.error(profileError); + } + } + } catch (scopeError) { + logger.error("Error processing scope profile:"); + logger.error(scopeError); + } + } + } catch (resourceError) { + logger.error("Error processing resource profile:"); + logger.error(resourceError); + } + } + + await Promise.all([ + this.flushProfilesBuffer(dbProfiles, true), + this.flushSamplesBuffer(dbSamples, true), + ]); + + if (totalProfilesProcessed === 0) { + logger.warn("No valid profiles were processed from the request"); + return; + } + + logger.debug( + `Successfully processed ${totalProfilesProcessed} profiles for project: ${(req as TelemetryRequest).projectId}`, + ); + + try { + dbProfiles.length = 0; + dbSamples.length = 0; + if (req.body) { + req.body = null; + } + } catch (cleanupError) { + logger.error("Error during memory cleanup:"); + logger.error(cleanupError); + } + } catch (error) { + logger.error("Critical error in processProfilesAsync:"); + logger.error(error); + throw error; + } + } + + private static resolveStackFrames(data: { + sample: JSONObject; + stackTable: JSONArray; + locationTable: JSONArray; + functionTable: JSONArray; + stringTable: Array; + attributeTable: JSONArray; + }): { frames: Array; frameTypes: Array } { + const frames: Array = []; + const frameTypes: Array = []; + + // Try stack_index first (newer format) + const stackIndex: number | undefined = data.sample[ + "stackIndex" + ] as number | undefined; + + let locationIndices: Array = []; + + if ( + stackIndex !== undefined && + stackIndex >= 0 && + stackIndex < data.stackTable.length + ) { + const stack: JSONObject = data.stackTable[stackIndex] as JSONObject; + locationIndices = + (stack["locationIndices"] as Array) || []; + } else { + // Fall back to locations_start_index + locations_length (older format) + const startIndex: number = + (data.sample["locationsStartIndex"] as number) || 0; + const length: number = + (data.sample["locationsLength"] as number) || 0; + if (length > 0) { + for (let i: number = startIndex; i < startIndex + length; i++) { + locationIndices.push(i); + } + } + } + + for (const locationIndex of locationIndices) { + if (locationIndex < 0 || locationIndex >= data.locationTable.length) { + frames.push(""); + frameTypes.push("unknown"); + continue; + } + + const location: JSONObject = + data.locationTable[locationIndex] as JSONObject; + const lines: JSONArray = (location["line"] as JSONArray) || []; + + // Resolve frame type from location type_index + let locFrameType: string = "unknown"; + const typeIndex: number | undefined = location[ + "typeIndex" + ] as number | undefined; + if (typeIndex !== undefined && typeIndex >= 0) { + // type_index refers to attribute_table entry with key "profile.frame.type" + if (typeIndex < data.attributeTable.length) { + const attr: JSONObject = + data.attributeTable[typeIndex] as JSONObject; + const val: JSONObject = + (attr["value"] as JSONObject) || {}; + locFrameType = + (val["stringValue"] as string) || "unknown"; + } + } + + if (lines.length === 0) { + // No line info - use address + const address: number | string = + (location["address"] as number | string) || 0; + frames.push(`0x${address.toString(16)}`); + frameTypes.push(locFrameType); + } else { + // Handle inline frames: each line in location.lines expands to a frame + for (const lineObj of lines) { + const line: JSONObject = lineObj as JSONObject; + const functionIndex: number = + (line["functionIndex"] as number) || 0; + + let functionName: string = ""; + let fileName: string = ""; + let lineNumber: number = 0; + + if ( + functionIndex >= 0 && + functionIndex < data.functionTable.length + ) { + const func: JSONObject = + data.functionTable[functionIndex] as JSONObject; + const nameIndex: number = (func["name"] as number) || 0; + const fileIndex: number = (func["filename"] as number) || 0; + + if (nameIndex >= 0 && nameIndex < data.stringTable.length) { + functionName = data.stringTable[nameIndex] || ""; + } + if (fileIndex >= 0 && fileIndex < data.stringTable.length) { + fileName = data.stringTable[fileIndex] || ""; + } + } + + lineNumber = (line["line"] as number) || 0; + + let frame: string = functionName; + if (fileName) { + frame += `@${fileName}`; + } + if (lineNumber > 0) { + frame += `:${lineNumber}`; + } + + frames.push(frame); + frameTypes.push(locFrameType); + } + } + } + + return { frames, frameTypes }; + } + + private static buildProfileRow(data: { + projectId: ObjectID; + serviceId: ObjectID; + profileId: string; + traceId: string; + spanId: string; + startTime: ParsedUnixNano; + endTime: ParsedUnixNano; + durationNano: number; + profileType: string; + unit: string; + periodType: string; + period: number; + attributes: Dictionary>; + attributeKeys: Array; + sampleCount: number; + originalPayloadFormat: string; + dataRetentionInDays: number; + }): JSONObject { + const ingestionDate: Date = OneUptimeDate.getCurrentDate(); + const ingestionTimestamp: string = + OneUptimeDate.toClickhouseDateTime(ingestionDate); + const retentionDate: Date = OneUptimeDate.addRemoveDays( + ingestionDate, + data.dataRetentionInDays || 15, + ); + + return { + _id: ObjectID.generate().toString(), + createdAt: ingestionTimestamp, + updatedAt: ingestionTimestamp, + projectId: data.projectId.toString(), + serviceId: data.serviceId.toString(), + profileId: data.profileId, + traceId: data.traceId || "", + spanId: data.spanId || "", + startTime: OneUptimeDate.toClickhouseDateTime(data.startTime.date), + endTime: OneUptimeDate.toClickhouseDateTime(data.endTime.date), + startTimeUnixNano: data.startTime.nano, + endTimeUnixNano: data.endTime.nano, + durationNano: data.durationNano.toString(), + profileType: data.profileType, + unit: data.unit, + periodType: data.periodType || "", + period: data.period ? data.period.toString() : "0", + attributes: data.attributes, + attributeKeys: data.attributeKeys, + sampleCount: data.sampleCount, + originalPayloadFormat: data.originalPayloadFormat || "", + retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate), + }; + } + + private static buildSampleRow(data: { + projectId: ObjectID; + serviceId: ObjectID; + profileId: string; + traceId: string; + spanId: string; + time: ParsedUnixNano; + stacktrace: Array; + stacktraceHash: string; + frameTypes: Array; + value: number; + profileType: string; + labels: Dictionary; + dataRetentionInDays: number; + }): JSONObject { + const ingestionDate: Date = OneUptimeDate.getCurrentDate(); + const ingestionTimestamp: string = + OneUptimeDate.toClickhouseDateTime(ingestionDate); + const retentionDate: Date = OneUptimeDate.addRemoveDays( + ingestionDate, + data.dataRetentionInDays || 15, + ); + + return { + _id: ObjectID.generate().toString(), + createdAt: ingestionTimestamp, + updatedAt: ingestionTimestamp, + projectId: data.projectId.toString(), + serviceId: data.serviceId.toString(), + profileId: data.profileId, + traceId: data.traceId || "", + spanId: data.spanId || "", + time: OneUptimeDate.toClickhouseDateTime(data.time.date), + timeUnixNano: data.time.nano, + stacktrace: data.stacktrace, + stacktraceHash: data.stacktraceHash, + frameTypes: data.frameTypes, + value: data.value.toString(), + profileType: data.profileType, + labels: data.labels, + retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate), + }; + } + + private static safeParseUnixNano( + value: string | number | undefined, + context: string, + ): ParsedUnixNano { + let numericValue: number = OneUptimeDate.getCurrentDateAsUnixNano(); + + if (value !== undefined && value !== null) { + try { + if (typeof value === "string") { + const parsed: number = Number.parseFloat(value); + if (!Number.isNaN(parsed)) { + numericValue = parsed; + } else { + throw new Error(`Invalid timestamp string: ${value}`); + } + } else if (typeof value === "number") { + if (!Number.isFinite(value)) { + throw new Error(`Invalid timestamp number: ${value}`); + } + numericValue = value; + } + } catch (error) { + logger.warn( + `Error processing ${context}: ${error instanceof Error ? error.message : String(error)}, using current time`, + ); + numericValue = OneUptimeDate.getCurrentDateAsUnixNano(); + } + } + + numericValue = Math.trunc(numericValue); + const date: Date = OneUptimeDate.fromUnixNano(numericValue); + const iso: string = OneUptimeDate.toString(date); + + return { + unixNano: numericValue, + nano: numericValue.toString(), + iso: iso, + date: date, + }; + } + + private static convertBase64ToHexSafe(value: string | undefined): string { + if (!value) { + return ""; + } + + try { + return Text.convertBase64ToHex(value); + } catch { + return ""; + } + } +} diff --git a/Telemetry/Services/Queue/ProfilesQueueService.ts b/Telemetry/Services/Queue/ProfilesQueueService.ts new file mode 100644 index 0000000000..54f0f758dc --- /dev/null +++ b/Telemetry/Services/Queue/ProfilesQueueService.ts @@ -0,0 +1,15 @@ +import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest"; +import TelemetryQueueService, { + ProfilesIngestJobData, +} from "./TelemetryQueueService"; + +// Export the interface for backward compatibility +export { ProfilesIngestJobData }; + +export default class ProfilesQueueService { + public static async addProfileIngestJob( + req: TelemetryRequest, + ): Promise { + return TelemetryQueueService.addProfileIngestJob(req); + } +} diff --git a/Telemetry/Services/Queue/TelemetryQueueService.ts b/Telemetry/Services/Queue/TelemetryQueueService.ts index 107e026c9b..5cba359199 100644 --- a/Telemetry/Services/Queue/TelemetryQueueService.ts +++ b/Telemetry/Services/Queue/TelemetryQueueService.ts @@ -9,6 +9,7 @@ export enum TelemetryType { Logs = "logs", Traces = "traces", Metrics = "metrics", + Profiles = "profiles", Syslog = "syslog", FluentLogs = "fluentlogs", ProbeIngest = "probe-ingest", @@ -89,6 +90,10 @@ export interface MetricsIngestJobData extends TelemetryIngestJobData { type: TelemetryType.Metrics; } +export interface ProfilesIngestJobData extends TelemetryIngestJobData { + type: TelemetryType.Profiles; +} + export interface SyslogIngestJobData extends TelemetryIngestJobData { type: TelemetryType.Syslog; } @@ -136,6 +141,12 @@ export default class TelemetryQueueService { return this.addTelemetryIngestJob(req, TelemetryType.Metrics); } + public static async addProfileIngestJob( + req: TelemetryRequest, + ): Promise { + return this.addTelemetryIngestJob(req, TelemetryType.Profiles); + } + public static async addFluentLogIngestJob( req: TelemetryRequest, ): Promise { diff --git a/Telemetry/Tests/Services/OtelProfilesIngestService.test.ts b/Telemetry/Tests/Services/OtelProfilesIngestService.test.ts new file mode 100644 index 0000000000..5eecc8fff3 --- /dev/null +++ b/Telemetry/Tests/Services/OtelProfilesIngestService.test.ts @@ -0,0 +1,441 @@ +import OtelProfilesIngestService from "../../Services/OtelProfilesIngestService"; +import { JSONObject, JSONArray } from "Common/Types/JSON"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const service: any = OtelProfilesIngestService as any; + +describe("OtelProfilesIngestService", () => { + describe("resolveStackFrames", () => { + const baseStringTable: Array = [ + "", // 0 - empty + "main", // 1 + "app.go", // 2 + "runtime.main", // 3 + "runtime/proc.go", // 4 + "handleRequest", // 5 + "server.go", // 6 + "kernel", // 7 + "native", // 8 + "go", // 9 + ]; + + const baseFunctionTable: JSONArray = [ + { name: 1, systemName: 1, filename: 2, startLine: 10 }, // 0: main@app.go + { name: 3, systemName: 3, filename: 4, startLine: 100 }, // 1: runtime.main@runtime/proc.go + { name: 5, systemName: 5, filename: 6, startLine: 50 }, // 2: handleRequest@server.go + ]; + + const baseLocationTable: JSONArray = [ + { + mappingIndex: 0, + address: 4096, + line: [{ functionIndex: 0, line: 15, column: 0 }], + isFolded: false, + typeIndex: 0, + attributeIndices: [], + }, // 0: main@app.go:15 + { + mappingIndex: 0, + address: 8192, + line: [{ functionIndex: 1, line: 120, column: 0 }], + isFolded: false, + typeIndex: 0, + attributeIndices: [], + }, // 1: runtime.main@runtime/proc.go:120 + { + mappingIndex: 0, + address: 12288, + line: [{ functionIndex: 2, line: 55, column: 0 }], + isFolded: false, + typeIndex: 0, + attributeIndices: [], + }, // 2: handleRequest@server.go:55 + ]; + + const baseAttributeTable: JSONArray = [ + { key: "profile.frame.type", value: { stringValue: "go" } }, + ]; + + test("resolves simple stack via stack_table", () => { + const stackTable: JSONArray = [ + { locationIndices: [0, 1] }, // stack 0: main, runtime.main + ]; + + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: baseLocationTable, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + expect(result.frames).toHaveLength(2); + expect(result.frames[0]).toBe("main@app.go:15"); + expect(result.frames[1]).toBe("runtime.main@runtime/proc.go:120"); + expect(result.frameTypes[0]).toBe("go"); + expect(result.frameTypes[1]).toBe("go"); + }); + + test("resolves stack with three frames", () => { + const stackTable: JSONArray = [ + { locationIndices: [2, 0, 1] }, // handleRequest -> main -> runtime.main + ]; + + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: baseLocationTable, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + expect(result.frames).toHaveLength(3); + expect(result.frames[0]).toBe("handleRequest@server.go:55"); + expect(result.frames[1]).toBe("main@app.go:15"); + expect(result.frames[2]).toBe("runtime.main@runtime/proc.go:120"); + }); + + test("handles inline frames (multiple lines per location)", () => { + const locationTableWithInline: JSONArray = [ + { + mappingIndex: 0, + address: 4096, + line: [ + { functionIndex: 0, line: 15, column: 0 }, // main@app.go:15 + { functionIndex: 2, line: 55, column: 0 }, // handleRequest@server.go:55 (inlined) + ], + isFolded: false, + typeIndex: 0, + attributeIndices: [], + }, + ]; + + const stackTable: JSONArray = [{ locationIndices: [0] }]; + + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: locationTableWithInline, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + // Inline frames should expand into separate frames + expect(result.frames).toHaveLength(2); + expect(result.frames[0]).toBe("main@app.go:15"); + expect(result.frames[1]).toBe("handleRequest@server.go:55"); + }); + + test("handles location without line info (uses hex address)", () => { + const locationTableNoLine: JSONArray = [ + { + mappingIndex: 0, + address: 65535, + line: [], + isFolded: false, + typeIndex: 0, + attributeIndices: [], + }, + ]; + + const stackTable: JSONArray = [{ locationIndices: [0] }]; + + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: locationTableNoLine, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + expect(result.frames).toHaveLength(1); + expect(result.frames[0]).toBe("0xffff"); + }); + + test("handles empty stack", () => { + const stackTable: JSONArray = [{ locationIndices: [] }]; + + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: baseLocationTable, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + expect(result.frames).toHaveLength(0); + expect(result.frameTypes).toHaveLength(0); + }); + + test("handles out-of-bounds location index gracefully", () => { + const stackTable: JSONArray = [{ locationIndices: [999] }]; + + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: baseLocationTable, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + expect(result.frames).toHaveLength(1); + expect(result.frames[0]).toBe(""); + expect(result.frameTypes[0]).toBe("unknown"); + }); + + test("falls back to locationsStartIndex/locationsLength when no stackIndex", () => { + const sample: JSONObject = { + locationsStartIndex: 0, + locationsLength: 2, + }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable: [], + locationTable: baseLocationTable, + functionTable: baseFunctionTable, + stringTable: baseStringTable, + attributeTable: baseAttributeTable, + }); + + expect(result.frames).toHaveLength(2); + expect(result.frames[0]).toBe("main@app.go:15"); + expect(result.frames[1]).toBe("runtime.main@runtime/proc.go:120"); + }); + + test("handles function without filename", () => { + const stringTableNoFile: Array = [ + "", // 0 + "anonymous", // 1 + ]; + + const functionTableNoFile: JSONArray = [ + { name: 1, systemName: 1, filename: 0, startLine: 0 }, // anonymous (no file) + ]; + + const locationTableNoFile: JSONArray = [ + { + mappingIndex: 0, + address: 4096, + line: [{ functionIndex: 0, line: 0, column: 0 }], + isFolded: false, + typeIndex: 0, + attributeIndices: [], + }, + ]; + + const stackTable: JSONArray = [{ locationIndices: [0] }]; + const sample: JSONObject = { stackIndex: 0 }; + + const result: { frames: Array; frameTypes: Array } = + service["resolveStackFrames"]({ + sample, + stackTable, + locationTable: locationTableNoFile, + functionTable: functionTableNoFile, + stringTable: stringTableNoFile, + attributeTable: [], + }); + + expect(result.frames).toHaveLength(1); + // Should just be function name without file or line + expect(result.frames[0]).toBe("anonymous"); + }); + }); + + describe("safeParseUnixNano", () => { + test("parses numeric value correctly", () => { + const nanos: number = 1700000000000000000; + const result: { + unixNano: number; + nano: string; + iso: string; + date: Date; + } = service["safeParseUnixNano"](nanos, "test"); + + expect(result.unixNano).toBe(nanos); + expect(result.nano).toBe(nanos.toString()); + expect(result.date).toBeInstanceOf(Date); + }); + + test("parses string value correctly", () => { + const nanos: string = "1700000000000000000"; + const result: { + unixNano: number; + nano: string; + iso: string; + date: Date; + } = service["safeParseUnixNano"](nanos, "test"); + + expect(result.unixNano).toBe(1700000000000000000); + expect(result.date).toBeInstanceOf(Date); + }); + + test("falls back to current time for undefined", () => { + const result: { + unixNano: number; + nano: string; + iso: string; + date: Date; + } = service["safeParseUnixNano"](undefined, "test"); + + expect(result.unixNano).toBeGreaterThan(0); + expect(result.date).toBeInstanceOf(Date); + }); + + test("falls back to current time for NaN string", () => { + const result: { + unixNano: number; + nano: string; + iso: string; + date: Date; + } = service["safeParseUnixNano"]("not-a-number", "test"); + + expect(result.unixNano).toBeGreaterThan(0); + expect(result.date).toBeInstanceOf(Date); + }); + + test("falls back to current time for Infinity", () => { + const result: { + unixNano: number; + nano: string; + iso: string; + date: Date; + } = service["safeParseUnixNano"](Infinity, "test"); + + expect(result.unixNano).toBeGreaterThan(0); + expect(result.date).toBeInstanceOf(Date); + }); + }); + + describe("convertBase64ToHexSafe", () => { + test("returns empty string for undefined", () => { + const result: string = service["convertBase64ToHexSafe"](undefined); + expect(result).toBe(""); + }); + + test("returns empty string for empty string", () => { + const result: string = service["convertBase64ToHexSafe"](""); + expect(result).toBe(""); + }); + + test("converts valid base64 to hex", () => { + // "AQID" is base64 for bytes [1, 2, 3] which is hex "010203" + const result: string = service["convertBase64ToHexSafe"]("AQID"); + expect(result).toBe("010203"); + }); + }); + + describe("buildProfileRow", () => { + test("builds profile row with all fields", () => { + const row: JSONObject = service["buildProfileRow"]({ + projectId: { toString: () => "proj-123" }, + serviceId: { toString: () => "svc-456" }, + profileId: "profile-789", + traceId: "trace-abc", + spanId: "span-def", + startTime: { + unixNano: 1700000000000000000, + nano: "1700000000000000000", + iso: "2023-11-14T22:13:20.000Z", + date: new Date("2023-11-14T22:13:20.000Z"), + }, + endTime: { + unixNano: 1700000001000000000, + nano: "1700000001000000000", + iso: "2023-11-14T22:13:21.000Z", + date: new Date("2023-11-14T22:13:21.000Z"), + }, + durationNano: 1000000000, + profileType: "cpu", + unit: "nanoseconds", + periodType: "cpu", + period: 10000000, + attributes: { "resource.service.name": "my-service" }, + attributeKeys: ["resource.service.name"], + sampleCount: 100, + originalPayloadFormat: "pprofext", + dataRetentionInDays: 15, + }); + + expect(row["projectId"]).toBe("proj-123"); + expect(row["serviceId"]).toBe("svc-456"); + expect(row["profileId"]).toBe("profile-789"); + expect(row["traceId"]).toBe("trace-abc"); + expect(row["spanId"]).toBe("span-def"); + expect(row["profileType"]).toBe("cpu"); + expect(row["unit"]).toBe("nanoseconds"); + expect(row["periodType"]).toBe("cpu"); + expect(row["sampleCount"]).toBe(100); + expect(row["originalPayloadFormat"]).toBe("pprofext"); + expect(row["_id"]).toBeDefined(); + expect(row["retentionDate"]).toBeDefined(); + }); + }); + + describe("buildSampleRow", () => { + test("builds sample row with all fields", () => { + const row: JSONObject = service["buildSampleRow"]({ + projectId: { toString: () => "proj-123" }, + serviceId: { toString: () => "svc-456" }, + profileId: "profile-789", + traceId: "trace-abc", + spanId: "span-def", + time: { + unixNano: 1700000000000000000, + nano: "1700000000000000000", + iso: "2023-11-14T22:13:20.000Z", + date: new Date("2023-11-14T22:13:20.000Z"), + }, + stacktrace: ["main@app.go:15", "runtime.main@runtime/proc.go:120"], + stacktraceHash: "abc123", + frameTypes: ["go", "go"], + value: 50000, + profileType: "cpu", + labels: { "thread.name": "main" }, + dataRetentionInDays: 15, + }); + + expect(row["projectId"]).toBe("proj-123"); + expect(row["serviceId"]).toBe("svc-456"); + expect(row["profileId"]).toBe("profile-789"); + expect(row["traceId"]).toBe("trace-abc"); + expect(row["stacktrace"]).toEqual([ + "main@app.go:15", + "runtime.main@runtime/proc.go:120", + ]); + expect(row["stacktraceHash"]).toBe("abc123"); + expect(row["frameTypes"]).toEqual(["go", "go"]); + expect(row["value"]).toBe("50000"); + expect(row["profileType"]).toBe("cpu"); + expect(row["labels"]).toEqual({ "thread.name": "main" }); + expect(row["_id"]).toBeDefined(); + expect(row["retentionDate"]).toBeDefined(); + }); + }); +});