feat: Add ProfilesService and ingestion service for OpenTelemetry profiles

- Introduced `profiles_service.proto` to define the ProfilesService for exporting resource profiles.
- Implemented `OtelProfilesIngestService` to handle ingestion of profiles, including processing and flushing to the database.
- Created `ProfilesQueueService` to manage profile ingestion jobs.
- Added comprehensive tests for `OtelProfilesIngestService`, covering stack frame resolution, timestamp parsing, and row building for profiles and samples.
This commit is contained in:
Nawaz Dhandala
2026-03-27 10:15:55 +00:00
parent a1122ed241
commit 147ff47aa2
20 changed files with 3022 additions and 0 deletions

View File

@@ -4,6 +4,8 @@ import Metric from "./Metric";
import Span from "./Span";
import ExceptionInstance from "./ExceptionInstance";
import MonitorLog from "./MonitorLog";
import Profile from "./Profile";
import ProfileSample from "./ProfileSample";
const AnalyticsModels: Array<{ new (): AnalyticsBaseModel }> = [
Log,
@@ -11,6 +13,8 @@ const AnalyticsModels: Array<{ new (): AnalyticsBaseModel }> = [
Metric,
ExceptionInstance,
MonitorLog,
Profile,
ProfileSample,
];
const modelTypeMap: { [key: string]: { new (): AnalyticsBaseModel } } = {};

View File

@@ -0,0 +1,687 @@
import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel";
import Route from "../../Types/API/Route";
import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine";
import AnalyticsTableName from "../../Types/AnalyticsDatabase/AnalyticsTableName";
import AnalyticsTableColumn, {
SkipIndexType,
} from "../../Types/AnalyticsDatabase/TableColumn";
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
import { JSONObject } from "../../Types/JSON";
import ObjectID from "../../Types/ObjectID";
import Permission from "../../Types/Permission";
export default class Profile extends AnalyticsBaseModel {
public constructor() {
const projectIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "projectId",
title: "Project ID",
description: "ID of project",
required: true,
type: TableColumnType.ObjectID,
isTenantId: true,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const serviceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "serviceId",
title: "Service ID",
description: "ID of the Service which created the profile",
required: true,
type: TableColumnType.ObjectID,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const profileIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "profileId",
title: "Profile ID",
description: "Unique identifier for the profile",
required: true,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_profile_id",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const traceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "traceId",
title: "Trace ID",
description: "Correlation with traces",
required: false,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_trace_id",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const spanIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "spanId",
title: "Span ID",
description: "Correlation with spans",
required: false,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_span_id",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const startTimeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "startTime",
title: "Start Time",
description: "Profile start timestamp",
required: true,
type: TableColumnType.DateTime64,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const endTimeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "endTime",
title: "End Time",
description: "Profile end timestamp",
required: true,
type: TableColumnType.DateTime64,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const startTimeUnixNanoColumn: AnalyticsTableColumn =
new AnalyticsTableColumn({
key: "startTimeUnixNano",
title: "Start Time in Unix Nano",
description: "Profile start timestamp in unix nanoseconds",
required: true,
type: TableColumnType.LongNumber,
codec: { codec: "ZSTD", level: 1 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const endTimeUnixNanoColumn: AnalyticsTableColumn =
new AnalyticsTableColumn({
key: "endTimeUnixNano",
title: "End Time in Unix Nano",
description: "Profile end timestamp in unix nanoseconds",
required: true,
type: TableColumnType.LongNumber,
codec: { codec: "ZSTD", level: 1 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const durationNanoColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "durationNano",
title: "Duration in Nanoseconds",
description: "Duration of the profile in nanoseconds",
required: true,
type: TableColumnType.LongNumber,
codec: { codec: "ZSTD", level: 1 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const profileTypeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "profileType",
title: "Profile Type",
description:
"Type of profile (e.g., cpu, wall, alloc_objects, alloc_space, goroutine)",
required: true,
type: TableColumnType.Text,
skipIndex: {
name: "idx_profile_type",
type: SkipIndexType.Set,
params: [20],
granularity: 4,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const unitColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "unit",
title: "Unit",
description: "Unit of the profile values (e.g., nanoseconds, bytes)",
required: true,
type: TableColumnType.Text,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const periodTypeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "periodType",
title: "Period Type",
description: "Sampling period type",
required: false,
type: TableColumnType.Text,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const periodColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "period",
title: "Period",
description: "Sampling period value",
required: false,
type: TableColumnType.LongNumber,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const attributesColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "attributes",
title: "Attributes",
description: "Profile-level attributes",
required: true,
defaultValue: {},
type: TableColumnType.MapStringString,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const attributeKeysColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "attributeKeys",
title: "Attribute Keys",
description: "Attribute keys extracted from attributes",
required: true,
defaultValue: [],
type: TableColumnType.ArrayText,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const sampleCountColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "sampleCount",
title: "Sample Count",
description: "Number of samples in this profile",
required: true,
type: TableColumnType.Number,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const originalPayloadFormatColumn: AnalyticsTableColumn =
new AnalyticsTableColumn({
key: "originalPayloadFormat",
title: "Original Payload Format",
description: "Format of the original payload (e.g., pprofext)",
required: false,
type: TableColumnType.Text,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "retentionDate",
title: "Retention Date",
description:
"Date after which this row is eligible for TTL deletion, computed at ingest time as startTime + service.retainTelemetryDataForDays",
required: true,
type: TableColumnType.Date,
defaultValue: undefined,
});
super({
tableName: AnalyticsTableName.Profile,
tableEngine: AnalyticsTableEngine.MergeTree,
singularName: "Profile",
pluralName: "Profiles",
crudApiPath: new Route("/profile"),
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.EditTelemetryServiceProfiles,
],
delete: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.DeleteTelemetryServiceProfiles,
],
},
tableColumns: [
projectIdColumn,
serviceIdColumn,
profileIdColumn,
traceIdColumn,
spanIdColumn,
startTimeColumn,
endTimeColumn,
startTimeUnixNanoColumn,
endTimeUnixNanoColumn,
durationNanoColumn,
profileTypeColumn,
unitColumn,
periodTypeColumn,
periodColumn,
attributesColumn,
attributeKeysColumn,
sampleCountColumn,
originalPayloadFormatColumn,
retentionDateColumn,
],
sortKeys: ["projectId", "startTime", "serviceId", "profileType"],
primaryKeys: ["projectId", "startTime", "serviceId", "profileType"],
partitionKey: "sipHash64(projectId) % 16",
ttlExpression: "retentionDate DELETE",
});
}
public get projectId(): ObjectID | undefined {
return this.getColumnValue("projectId") as ObjectID | undefined;
}
public set projectId(v: ObjectID | undefined) {
this.setColumnValue("projectId", v);
}
public get serviceId(): ObjectID | undefined {
return this.getColumnValue("serviceId") as ObjectID | undefined;
}
public set serviceId(v: ObjectID | undefined) {
this.setColumnValue("serviceId", v);
}
public get profileId(): string | undefined {
return this.getColumnValue("profileId") as string | undefined;
}
public set profileId(v: string | undefined) {
this.setColumnValue("profileId", v);
}
public get traceId(): string | undefined {
return this.getColumnValue("traceId") as string | undefined;
}
public set traceId(v: string | undefined) {
this.setColumnValue("traceId", v);
}
public get spanId(): string | undefined {
return this.getColumnValue("spanId") as string | undefined;
}
public set spanId(v: string | undefined) {
this.setColumnValue("spanId", v);
}
public get startTime(): Date | undefined {
return this.getColumnValue("startTime") as Date | undefined;
}
public set startTime(v: Date | undefined) {
this.setColumnValue("startTime", v);
}
public get endTime(): Date | undefined {
return this.getColumnValue("endTime") as Date | undefined;
}
public set endTime(v: Date | undefined) {
this.setColumnValue("endTime", v);
}
public get startTimeUnixNano(): number | undefined {
return this.getColumnValue("startTimeUnixNano") as number | undefined;
}
public set startTimeUnixNano(v: number | undefined) {
this.setColumnValue("startTimeUnixNano", v);
}
public get endTimeUnixNano(): number | undefined {
return this.getColumnValue("endTimeUnixNano") as number | undefined;
}
public set endTimeUnixNano(v: number | undefined) {
this.setColumnValue("endTimeUnixNano", v);
}
public get durationNano(): number | undefined {
return this.getColumnValue("durationNano") as number | undefined;
}
public set durationNano(v: number | undefined) {
this.setColumnValue("durationNano", v);
}
public get profileType(): string | undefined {
return this.getColumnValue("profileType") as string | undefined;
}
public set profileType(v: string | undefined) {
this.setColumnValue("profileType", v);
}
public get unit(): string | undefined {
return this.getColumnValue("unit") as string | undefined;
}
public set unit(v: string | undefined) {
this.setColumnValue("unit", v);
}
public get periodType(): string | undefined {
return this.getColumnValue("periodType") as string | undefined;
}
public set periodType(v: string | undefined) {
this.setColumnValue("periodType", v);
}
public get period(): number | undefined {
return this.getColumnValue("period") as number | undefined;
}
public set period(v: number | undefined) {
this.setColumnValue("period", v);
}
public get attributes(): JSONObject | undefined {
return this.getColumnValue("attributes") as JSONObject | undefined;
}
public set attributes(v: JSONObject | undefined) {
this.setColumnValue("attributes", v);
}
public get attributeKeys(): Array<string> | undefined {
return this.getColumnValue("attributeKeys") as Array<string> | undefined;
}
public set attributeKeys(v: Array<string> | undefined) {
this.setColumnValue("attributeKeys", v);
}
public get sampleCount(): number | undefined {
return this.getColumnValue("sampleCount") as number | undefined;
}
public set sampleCount(v: number | undefined) {
this.setColumnValue("sampleCount", v);
}
public get originalPayloadFormat(): string | undefined {
return this.getColumnValue("originalPayloadFormat") as string | undefined;
}
public set originalPayloadFormat(v: string | undefined) {
this.setColumnValue("originalPayloadFormat", v);
}
public get retentionDate(): Date | undefined {
return this.getColumnValue("retentionDate") as Date | undefined;
}
public set retentionDate(v: Date | undefined) {
this.setColumnValue("retentionDate", v);
}
}

View File

@@ -0,0 +1,546 @@
import AnalyticsBaseModel from "./AnalyticsBaseModel/AnalyticsBaseModel";
import Route from "../../Types/API/Route";
import AnalyticsTableEngine from "../../Types/AnalyticsDatabase/AnalyticsTableEngine";
import AnalyticsTableName from "../../Types/AnalyticsDatabase/AnalyticsTableName";
import AnalyticsTableColumn, {
SkipIndexType,
} from "../../Types/AnalyticsDatabase/TableColumn";
import TableColumnType from "../../Types/AnalyticsDatabase/TableColumnType";
import { JSONObject } from "../../Types/JSON";
import ObjectID from "../../Types/ObjectID";
import Permission from "../../Types/Permission";
export default class ProfileSample extends AnalyticsBaseModel {
public constructor() {
const projectIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "projectId",
title: "Project ID",
description: "ID of project",
required: true,
type: TableColumnType.ObjectID,
isTenantId: true,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const serviceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "serviceId",
title: "Service ID",
description: "ID of the Service which created the profile",
required: true,
type: TableColumnType.ObjectID,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const profileIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "profileId",
title: "Profile ID",
description: "FK to profile table",
required: true,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_profile_id",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const traceIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "traceId",
title: "Trace ID",
description: "Trace correlation (from Link table)",
required: false,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_trace_id",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const spanIdColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "spanId",
title: "Span ID",
description: "Span correlation (from Link table)",
required: false,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_span_id",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const timeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "time",
title: "Time",
description: "Sample timestamp",
required: true,
type: TableColumnType.DateTime64,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const timeUnixNanoColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "timeUnixNano",
title: "Time (in Unix Nano)",
description: "Sample timestamp in unix nanoseconds",
required: true,
type: TableColumnType.LongNumber,
codec: { codec: "ZSTD", level: 1 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const stacktraceColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "stacktrace",
title: "Stacktrace",
description: "Fully-resolved stack frames (function@file:line)",
required: true,
defaultValue: [],
type: TableColumnType.ArrayText,
codec: { codec: "ZSTD", level: 3 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const stacktraceHashColumn: AnalyticsTableColumn =
new AnalyticsTableColumn({
key: "stacktraceHash",
title: "Stacktrace Hash",
description: "Hash of stacktrace for grouping",
required: true,
type: TableColumnType.Text,
codec: { codec: "ZSTD", level: 1 },
skipIndex: {
name: "idx_stacktrace_hash",
type: SkipIndexType.BloomFilter,
params: [0.01],
granularity: 1,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const frameTypesColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "frameTypes",
title: "Frame Types",
description:
"Per-frame runtime type (kernel, native, jvm, cpython, go, v8js, etc.)",
required: true,
defaultValue: [],
type: TableColumnType.ArrayText,
codec: { codec: "ZSTD", level: 3 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const valueColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "value",
title: "Value",
description: "Sample value (CPU time, bytes, count)",
required: true,
type: TableColumnType.LongNumber,
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const profileTypeColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "profileType",
title: "Profile Type",
description: "Denormalized profile type for filtering",
required: true,
type: TableColumnType.Text,
skipIndex: {
name: "idx_profile_type",
type: SkipIndexType.Set,
params: [20],
granularity: 4,
},
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const labelsColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "labels",
title: "Labels",
description: "Sample-level labels",
required: true,
defaultValue: {},
type: TableColumnType.MapStringString,
codec: { codec: "ZSTD", level: 3 },
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [],
},
});
const retentionDateColumn: AnalyticsTableColumn = new AnalyticsTableColumn({
key: "retentionDate",
title: "Retention Date",
description:
"Date after which this row is eligible for TTL deletion, computed at ingest time as time + service.retainTelemetryDataForDays",
required: true,
type: TableColumnType.Date,
defaultValue: undefined,
});
super({
tableName: AnalyticsTableName.ProfileSample,
tableEngine: AnalyticsTableEngine.MergeTree,
singularName: "ProfileSample",
pluralName: "ProfileSamples",
crudApiPath: new Route("/profile-sample"),
accessControl: {
read: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.ReadTelemetryServiceProfiles,
],
create: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.CreateTelemetryServiceProfiles,
],
update: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.EditTelemetryServiceProfiles,
],
delete: [
Permission.ProjectOwner,
Permission.ProjectAdmin,
Permission.ProjectMember,
Permission.DeleteTelemetryServiceProfiles,
],
},
tableColumns: [
projectIdColumn,
serviceIdColumn,
profileIdColumn,
traceIdColumn,
spanIdColumn,
timeColumn,
timeUnixNanoColumn,
stacktraceColumn,
stacktraceHashColumn,
frameTypesColumn,
valueColumn,
profileTypeColumn,
labelsColumn,
retentionDateColumn,
],
sortKeys: [
"projectId",
"time",
"serviceId",
"profileType",
"stacktraceHash",
],
primaryKeys: [
"projectId",
"time",
"serviceId",
"profileType",
"stacktraceHash",
],
partitionKey: "sipHash64(projectId) % 16",
ttlExpression: "retentionDate DELETE",
});
}
public get projectId(): ObjectID | undefined {
return this.getColumnValue("projectId") as ObjectID | undefined;
}
public set projectId(v: ObjectID | undefined) {
this.setColumnValue("projectId", v);
}
public get serviceId(): ObjectID | undefined {
return this.getColumnValue("serviceId") as ObjectID | undefined;
}
public set serviceId(v: ObjectID | undefined) {
this.setColumnValue("serviceId", v);
}
public get profileId(): string | undefined {
return this.getColumnValue("profileId") as string | undefined;
}
public set profileId(v: string | undefined) {
this.setColumnValue("profileId", v);
}
public get traceId(): string | undefined {
return this.getColumnValue("traceId") as string | undefined;
}
public set traceId(v: string | undefined) {
this.setColumnValue("traceId", v);
}
public get spanId(): string | undefined {
return this.getColumnValue("spanId") as string | undefined;
}
public set spanId(v: string | undefined) {
this.setColumnValue("spanId", v);
}
public get time(): Date | undefined {
return this.getColumnValue("time") as Date | undefined;
}
public set time(v: Date | undefined) {
this.setColumnValue("time", v);
}
public get timeUnixNano(): number | undefined {
return this.getColumnValue("timeUnixNano") as number | undefined;
}
public set timeUnixNano(v: number | undefined) {
this.setColumnValue("timeUnixNano", v);
}
public get stacktrace(): Array<string> | undefined {
return this.getColumnValue("stacktrace") as Array<string> | undefined;
}
public set stacktrace(v: Array<string> | undefined) {
this.setColumnValue("stacktrace", v);
}
public get stacktraceHash(): string | undefined {
return this.getColumnValue("stacktraceHash") as string | undefined;
}
public set stacktraceHash(v: string | undefined) {
this.setColumnValue("stacktraceHash", v);
}
public get frameTypes(): Array<string> | undefined {
return this.getColumnValue("frameTypes") as Array<string> | undefined;
}
public set frameTypes(v: Array<string> | undefined) {
this.setColumnValue("frameTypes", v);
}
public get value(): number | undefined {
return this.getColumnValue("value") as number | undefined;
}
public set value(v: number | undefined) {
this.setColumnValue("value", v);
}
public get profileType(): string | undefined {
return this.getColumnValue("profileType") as string | undefined;
}
public set profileType(v: string | undefined) {
this.setColumnValue("profileType", v);
}
public get labels(): JSONObject | undefined {
return this.getColumnValue("labels") as JSONObject | undefined;
}
public set labels(v: JSONObject | undefined) {
this.setColumnValue("labels", v);
}
public get retentionDate(): Date | undefined {
return this.getColumnValue("retentionDate") as Date | undefined;
}
public set retentionDate(v: Date | undefined) {
this.setColumnValue("retentionDate", v);
}
}

View File

@@ -0,0 +1,11 @@
import ClickhouseDatabase from "../Infrastructure/ClickhouseDatabase";
import AnalyticsDatabaseService from "./AnalyticsDatabaseService";
import ProfileSample from "../../Models/AnalyticsModels/ProfileSample";
/**
 * Database service for the ProfileSample analytics model. Exposes the
 * generic AnalyticsDatabaseService CRUD/query surface bound to the
 * ProfileSample table.
 */
export class ProfileSampleService extends AnalyticsDatabaseService<ProfileSample> {
  public constructor(clickhouseDatabase?: ClickhouseDatabase | undefined) {
    // An explicit database handle may be injected (e.g. by tests);
    // otherwise the base service resolves its default connection.
    super({
      modelType: ProfileSample,
      database: clickhouseDatabase,
    });
  }
}

// Shared singleton instance used across the application.
export default new ProfileSampleService();

View File

@@ -0,0 +1,11 @@
import ClickhouseDatabase from "../Infrastructure/ClickhouseDatabase";
import AnalyticsDatabaseService from "./AnalyticsDatabaseService";
import Profile from "../../Models/AnalyticsModels/Profile";
/**
 * Database service for the Profile analytics model. Exposes the generic
 * AnalyticsDatabaseService CRUD/query surface bound to the Profile table.
 */
export class ProfileService extends AnalyticsDatabaseService<Profile> {
  public constructor(clickhouseDatabase?: ClickhouseDatabase | undefined) {
    // An explicit database handle may be injected (e.g. by tests);
    // otherwise the base service resolves its default connection.
    super({
      modelType: Profile,
      database: clickhouseDatabase,
    });
  }
}

// Shared singleton instance used across the application.
export default new ProfileService();

View File

@@ -4,6 +4,8 @@ enum AnalyticsTableName {
ExceptionInstance = "ExceptionItemV2",
Span = "SpanItemV2",
MonitorLog = "MonitorLogV2",
Profile = "ProfileItemV2",
ProfileSample = "ProfileSampleItemV2",
}
export default AnalyticsTableName;

View File

@@ -2,6 +2,7 @@ enum ProductType {
Logs = "Logs",
Traces = "Traces",
Metrics = "Metrics",
Profiles = "Profiles",
ActiveMonitoring = "Active Monitoring",
}

View File

@@ -130,6 +130,12 @@ enum Permission {
EditTelemetryServiceMetrics = "EditTelemetryServiceMetrics",
ReadTelemetryServiceMetrics = "ReadTelemetryServiceMetrics",
// Profiles
CreateTelemetryServiceProfiles = "CreateTelemetryServiceProfiles",
DeleteTelemetryServiceProfiles = "DeleteTelemetryServiceProfiles",
EditTelemetryServiceProfiles = "EditTelemetryServiceProfiles",
ReadTelemetryServiceProfiles = "ReadTelemetryServiceProfiles",
// Billing Permissions (Owner Permission)
ManageProjectBilling = "ManageProjectBilling",
@@ -4745,6 +4751,43 @@ export class PermissionHelper {
group: PermissionGroup.Telemetry,
},
{
permission: Permission.CreateTelemetryServiceProfiles,
title: "Create Telemetry Service Profiles",
description:
"This permission can create Telemetry Service Profiles this project.",
isAssignableToTenant: true,
isAccessControlPermission: false,
group: PermissionGroup.Telemetry,
},
{
permission: Permission.DeleteTelemetryServiceProfiles,
title: "Delete Telemetry Service Profiles",
description:
"This permission can delete Telemetry Service Profiles of this project.",
isAssignableToTenant: true,
isAccessControlPermission: false,
group: PermissionGroup.Telemetry,
},
{
permission: Permission.EditTelemetryServiceProfiles,
title: "Edit Telemetry Service Profiles",
description:
"This permission can edit Telemetry Service Profiles of this project.",
isAssignableToTenant: true,
isAccessControlPermission: false,
group: PermissionGroup.Telemetry,
},
{
permission: Permission.ReadTelemetryServiceProfiles,
title: "Read Telemetry Service Profiles",
description:
"This permission can read Telemetry Service Profiles of this project.",
isAssignableToTenant: true,
isAccessControlPermission: false,
group: PermissionGroup.Telemetry,
},
{
permission: Permission.CreateScheduledMaintenanceOwnerTeam,
title: "Create Scheduled Maintenance Team Owner",

View File

@@ -3,6 +3,7 @@ enum TelemetryType {
Trace = "Trace",
Log = "Log",
Exception = "Exception",
Profile = "Profile",
}
export default TelemetryType;

View File

@@ -10,6 +10,7 @@ import OpenTelemetryRequestMiddleware from "../Middleware/OtelRequestMiddleware"
import OtelTracesIngestService from "../Services/OtelTracesIngestService";
import OtelMetricsIngestService from "../Services/OtelMetricsIngestService";
import OtelLogsIngestService from "../Services/OtelLogsIngestService";
import OtelProfilesIngestService from "../Services/OtelProfilesIngestService";
import TelemetryQueueService from "../Services/Queue/TelemetryQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import { JSONObject } from "Common/Types/JSON";
@@ -61,6 +62,19 @@ router.post(
},
);
router.post(
"/otlp/v1/profiles",
OpenTelemetryRequestMiddleware.getProductType,
TelemetryIngest.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
return OtelProfilesIngestService.ingestProfiles(req, res, next);
},
);
// Queue stats endpoint
router.get(
"/otlp/queue/stats",

View File

@@ -48,6 +48,16 @@ export const TELEMETRY_EXCEPTION_FLUSH_BATCH_SIZE: number = parseBatchSize(
500,
);
// Maximum number of profile rows buffered in memory before a flush to the
// database. Presumably overridable via an environment variable of the same
// name (see parseBatchSize) — defaults to 500.
export const TELEMETRY_PROFILE_FLUSH_BATCH_SIZE: number = parseBatchSize(
  "TELEMETRY_PROFILE_FLUSH_BATCH_SIZE",
  500,
);

// Maximum number of profile-sample rows buffered before a flush; samples are
// far more numerous than profiles, so this is tuned independently. Defaults to 500.
export const TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE: number = parseBatchSize(
  "TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE",
  500,
);
/*
* Some telemetry batches can be large and take >30s (BullMQ default lock) to process.
* Allow configuring a longer lock duration (in ms) to avoid premature stall detection.

View File

@@ -10,6 +10,7 @@ import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import TracesQueueService from "./Services/Queue/TracesQueueService";
import LogsQueueService from "./Services/Queue/LogsQueueService";
import MetricsQueueService from "./Services/Queue/MetricsQueueService";
import ProfilesQueueService from "./Services/Queue/ProfilesQueueService";
const GRPC_PORT: number = 4317;
@@ -162,11 +163,23 @@ export function startGrpcServer(): void {
},
);
const profilesServiceDef: protoLoader.PackageDefinition =
protoLoader.loadSync(path.join(PROTO_DIR, "profiles_service.proto"), {
keepCase: false,
longs: String,
enums: String,
defaults: true,
oneofs: true,
includeDirs: [PROTO_DIR],
});
const traceProto: grpc.GrpcObject =
grpc.loadPackageDefinition(traceServiceDef);
const logsProto: grpc.GrpcObject = grpc.loadPackageDefinition(logsServiceDef);
const metricsProto: grpc.GrpcObject =
grpc.loadPackageDefinition(metricsServiceDef);
const profilesProto: grpc.GrpcObject =
grpc.loadPackageDefinition(profilesServiceDef);
type ProtoServiceDef = {
service: grpc.ServiceDefinition;
@@ -213,6 +226,17 @@ export function startGrpcServer(): void {
"MetricsService",
);
const profilesServiceDefinition: grpc.ServiceDefinition =
getServiceDefinition(
profilesProto,
"opentelemetry",
"proto",
"collector",
"profiles",
"v1development",
"ProfilesService",
);
const server: grpc.Server = new grpc.Server({
"grpc.max_receive_message_length": 50 * 1024 * 1024, // 50MB
});
@@ -250,6 +274,17 @@ export function startGrpcServer(): void {
},
});
server.addService(profilesServiceDefinition, {
Export: (call: GrpcCall, callback: GrpcCallback): void => {
handleExport(
call,
callback,
ProductType.Profiles,
ProfilesQueueService.addProfileIngestJob.bind(ProfilesQueueService),
);
},
});
server.bindAsync(
`0.0.0.0:${GRPC_PORT}`,
grpc.ServerCredentials.createInsecure(),

View File

@@ -5,6 +5,7 @@ import {
import OtelLogsIngestService from "../../Services/OtelLogsIngestService";
import OtelTracesIngestService from "../../Services/OtelTracesIngestService";
import OtelMetricsIngestService from "../../Services/OtelMetricsIngestService";
import OtelProfilesIngestService from "../../Services/OtelProfilesIngestService";
import SyslogIngestService from "../../Services/SyslogIngestService";
import FluentLogsIngestService from "../../Services/FluentLogsIngestService";
import {
@@ -80,6 +81,20 @@ QueueWorker.getWorker(
break;
}
case TelemetryType.Profiles: {
const mockRequest: TelemetryRequest = {
projectId: new ObjectID(jobData.projectId!.toString()),
body: jobData.requestBody!,
headers: jobData.requestHeaders!,
} as TelemetryRequest;
await OtelProfilesIngestService.processProfilesFromQueue(mockRequest);
logger.debug(
`Successfully processed profiles for project: ${jobData.projectId}`,
);
break;
}
case TelemetryType.Syslog: {
const mockRequest: TelemetryRequest = {
projectId: new ObjectID(jobData.projectId!.toString()),

View File

@@ -25,10 +25,16 @@ const MetricsProto: protobuf.Root = protobuf.loadSync(
"/usr/src/app/ProtoFiles/OTel/v1/metrics.proto",
);
const ProfilesProto: protobuf.Root = protobuf.loadSync(
"/usr/src/app/ProtoFiles/OTel/v1/profiles.proto",
);
// Lookup the message type
const LogsData: protobuf.Type = LogsProto.lookupType("LogsData");
const TracesData: protobuf.Type = TracesProto.lookupType("TracesData");
const MetricsData: protobuf.Type = MetricsProto.lookupType("MetricsData");
const ProfilesData: protobuf.Type =
ProfilesProto.lookupType("ProfilesData");
export default class OpenTelemetryRequestMiddleware {
@CaptureSpan()
@@ -69,6 +75,13 @@ export default class OpenTelemetryRequestMiddleware {
req.body = JSON.parse(Buffer.from(req.body).toString("utf-8"));
}
productType = ProductType.Metrics;
} else if (req.url.includes("/otlp/v1/profiles")) {
if (isProtobuf) {
req.body = ProfilesData.decode(req.body);
} else if (req.body instanceof Uint8Array) {
req.body = JSON.parse(Buffer.from(req.body).toString("utf-8"));
}
productType = ProductType.Profiles;
} else {
throw new BadRequestException("Invalid URL: " + req.baseUrl);
}

View File

@@ -0,0 +1,275 @@
// Copyright 2024, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package opentelemetry.proto.profiles.v1development;
import "./common.proto";
import "./resource.proto";
option csharp_namespace = "OpenTelemetry.Proto.Profiles.V1Development";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.profiles.v1development";
option java_outer_classname = "ProfilesProto";
option go_package = "go.opentelemetry.io/proto/otlp/profiles/v1development";
// ProfilesData represents the profiles data that can be stored in a persistent storage,
// OR can be embedded by other protocols that transfer OTLP profiles data but do
// not implement the OTLP protocol.
message ProfilesData {
// An array of ResourceProfiles.
repeated ResourceProfiles resource_profiles = 1;
}
// A collection of ScopeProfiles from a Resource.
message ResourceProfiles {
reserved 1000;
// The resource for the profiles in this message.
opentelemetry.proto.resource.v1.Resource resource = 1;
// A list of ScopeProfiles that originate from a resource.
repeated ScopeProfiles scope_profiles = 2;
// The Schema URL.
string schema_url = 3;
}
// A collection of ProfileContainers produced by an InstrumentationScope.
message ScopeProfiles {
// The instrumentation scope information for the profiles in this message.
opentelemetry.proto.common.v1.InstrumentationScope scope = 1;
// A list of ProfileContainers that originate from an instrumentation scope.
repeated ProfileContainer profiles = 2;
// The Schema URL.
string schema_url = 3;
}
// A ProfileContainer represents a single profile along with metadata.
message ProfileContainer {
// A unique identifier for a profile.
bytes profile_id = 1;
// start_time_unix_nano is the start time of the profile.
fixed64 start_time_unix_nano = 2;
// end_time_unix_nano is the end time of the profile.
fixed64 end_time_unix_nano = 3;
// attributes is a collection of key/value pairs.
repeated opentelemetry.proto.common.v1.KeyValue attributes = 4;
// dropped_attributes_count is the number of attributes that were discarded.
uint32 dropped_attributes_count = 5;
// Specifies format of the original payload. Common values are defined in semantic conventions.
string original_payload_format = 6;
// Original payload can be stored in this field.
bytes original_payload = 7;
// The profile data.
Profile profile = 8;
}
// Provides additional context for a profile.
message Profile {
// A description of the profile.
repeated ValueType sample_type = 1;
// The set of samples recorded in this profile.
repeated Sample sample = 2;
// Time of collection (UTC) represented as nanoseconds past the epoch.
fixed64 time_unix_nano = 3;
// Duration of the profile, if a duration makes sense.
fixed64 duration_nano = 4;
// The kind of events between sampled occurrences.
ValueType period_type = 5;
// The number of events between sampled occurrences.
int64 period = 6;
// Free-form text associated to the profile.
repeated int64 comment = 7; // Indices into string table.
// A common table for strings referenced by various messages.
repeated string string_table = 8;
// Index into the string table of the type of the preferred sample value.
int64 default_sample_type = 9;
// References to locations in the location_table.
repeated Location location_table = 10;
// Useful program location
repeated Function function_table = 11;
// Mapping from address to the mapped entry.
repeated Mapping mapping_table = 12;
// References to links in the link_table.
repeated Link link_table = 13;
// References to stacks in the stack_table.
repeated Stack stack_table = 14;
// Lookup table for attributes.
repeated opentelemetry.proto.common.v1.KeyValue attribute_table = 15;
// Represents a mapping between Attribute Keys and Units.
repeated AttributeUnit attribute_units = 16;
// dropped_attributes_count is the number of attributes that were discarded.
uint32 dropped_attributes_count = 17;
}
// ValueType describes the semantics and measurement units of a value.
message ValueType {
int64 type = 1; // Index into string table.
int64 unit = 2; // Index into string table.
AggregationTemporality aggregation_temporality = 3;
}
// Specifies the method of aggregating metric values.
enum AggregationTemporality {
AGGREGATION_TEMPORALITY_UNSPECIFIED = 0;
AGGREGATION_TEMPORALITY_DELTA = 1;
AGGREGATION_TEMPORALITY_CUMULATIVE = 2;
}
// Each Sample records values encountered in some program context.
message Sample {
// The index into the stack_table for the callstack of this sample.
int32 locations_start_index = 1;
// locations_length limits the number of locations to use from the stack starting from locations_start_index.
int32 locations_length = 2;
// The index into the stack_table for the stack of this sample.
int32 stack_index = 3;
// The type and unit of each value is defined by the corresponding
// entry in Profile.sample_type.
repeated int64 value = 4;
// References to attributes in Profile.attribute_table.
repeated int32 attribute_indices = 5;
// Reference to link in Profile.link_table.
int32 link_index = 6;
// Timestamps associated with the sample.
repeated fixed64 timestamps_unix_nano = 7;
}
// Provides the mapping between a function name and its address/binary.
message Mapping {
  // Address at which the binary (or DLL) is loaded into memory.
  uint64 memory_start = 1;
  // The limit of the address range occupied by this mapping.
  uint64 memory_limit = 2;
  // Offset in the binary that corresponds to the first mapped address.
  uint64 file_offset = 3;
  // The object this entry is loaded from.
  int64 filename = 4; // Index into string table
  // References to attributes in Profile.attribute_table.
  repeated int32 attribute_indices = 5;
  // The following flags indicate which kinds of symbolic information were
  // resolved for this mapping.
  bool has_functions = 6;
  bool has_filenames = 7;
  bool has_line_numbers = 8;
  bool has_inline_frames = 9;
}
// Describes a program location: an instruction address, its mapping, and the
// source line(s) it resolves to (multiple lines when frames are inlined).
message Location {
  // Reference to mapping in Profile.mapping_table.
  int32 mapping_index = 1;
  // The instruction address for this location.
  uint64 address = 2;
  // Multiple line indicates this location has inlined functions.
  repeated Line line = 3;
  // If true, this location address has NOT been resolved to a function and source.
  bool is_folded = 4;
  // Type of frame (kernel, native, jvm, cpython, go, v8js, etc.).
  int32 type_index = 5;
  // References to attributes in Profile.attribute_table.
  repeated int32 attribute_indices = 6;
}
// Details a specific line in a source code, linked to a function.
message Line {
// Reference to function in Profile.function_table.
int32 function_index = 1;
// Line number in source code.
int64 line = 2;
// Column number in source code.
int64 column = 3;
}
// Describes a function, including its human-readable name.
message Function {
// Name of the function, in human-readable form.
int64 name = 1; // Index into string table
// Name of the function, as identified by the system.
int64 system_name = 2; // Index into string table
// Source file containing the function.
int64 filename = 3; // Index into string table
// Line number in source file.
int64 start_line = 4;
}
// A pointer from a profile Sample to a trace Span.
message Link {
// A unique identifier of a trace.
bytes trace_id = 1;
// A unique identifier for the linked span.
bytes span_id = 2;
}
// A Stack is a collection of location references.
message Stack {
// References to locations in the location_table.
repeated int32 location_indices = 1;
}
// Defines the mapping between an attribute key and its unit.
message AttributeUnit {
// Index into string table for the attribute key.
int64 attribute_key = 1;
// Index into string table for the unit.
int64 unit = 2;
}

View File

@@ -0,0 +1,53 @@
// Copyright 2024, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package opentelemetry.proto.collector.profiles.v1development;
import "./profiles.proto";
option csharp_namespace = "OpenTelemetry.Proto.Collector.Profiles.V1Development";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.collector.profiles.v1development";
option java_outer_classname = "ProfilesServiceProto";
option go_package = "go.opentelemetry.io/proto/otlp/collector/profiles/v1development";
// Service that can be used to push profiles between one Application
// instrumented with OpenTelemetry and a collector, or between a collector and a
// central collector.
service ProfilesService {
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
rpc Export(ExportProfilesServiceRequest) returns (ExportProfilesServiceResponse) {}
}
message ExportProfilesServiceRequest {
// An array of ResourceProfiles.
repeated opentelemetry.proto.profiles.v1development.ResourceProfiles resource_profiles = 1;
}
message ExportProfilesServiceResponse {
// The details of a partially successful export request.
ExportProfilesPartialSuccess partial_success = 1;
}
// ExportProfilesPartialSuccess is used when some profiles in the request could not be processed.
message ExportProfilesPartialSuccess {
// The number of rejected profiles.
int64 rejected_profiles = 1;
// A developer-facing human-readable message.
string error_message = 2;
}

View File

@@ -0,0 +1,834 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import OTelIngestService, {
TelemetryServiceMetadata,
} from "Common/Server/Services/OpenTelemetryIngestService";
import OneUptimeDate from "Common/Types/Date";
import BadRequestException from "Common/Types/Exception/BadRequestException";
import {
ExpressRequest,
ExpressResponse,
NextFunction,
} from "Common/Server/Utils/Express";
import Response from "Common/Server/Utils/Response";
import Dictionary from "Common/Types/Dictionary";
import ObjectID from "Common/Types/ObjectID";
import TelemetryUtil, {
AttributeType,
} from "Common/Server/Utils/Telemetry/Telemetry";
import { JSONArray, JSONObject } from "Common/Types/JSON";
import logger from "Common/Server/Utils/Logger";
import ProfileService from "Common/Server/Services/ProfileService";
import ProfileSampleService from "Common/Server/Services/ProfileSampleService";
import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan";
import Text from "Common/Types/Text";
import ProfilesQueueService from "./Queue/ProfilesQueueService";
import OtelIngestBaseService from "./OtelIngestBaseService";
import {
TELEMETRY_PROFILE_FLUSH_BATCH_SIZE,
TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE,
} from "../Config";
import crypto from "crypto";
type ParsedUnixNano = {
unixNano: number;
nano: string;
iso: string;
date: Date;
};
export default class OtelProfilesIngestService extends OtelIngestBaseService {
private static async flushProfilesBuffer(
profiles: Array<JSONObject>,
force: boolean = false,
): Promise<void> {
while (
profiles.length >= TELEMETRY_PROFILE_FLUSH_BATCH_SIZE ||
(force && profiles.length > 0)
) {
const batchSize: number = Math.min(
profiles.length,
TELEMETRY_PROFILE_FLUSH_BATCH_SIZE,
);
const batch: Array<JSONObject> = profiles.splice(0, batchSize);
if (batch.length === 0) {
continue;
}
await ProfileService.insertJsonRows(batch);
}
}
private static async flushSamplesBuffer(
samples: Array<JSONObject>,
force: boolean = false,
): Promise<void> {
while (
samples.length >= TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE ||
(force && samples.length > 0)
) {
const batchSize: number = Math.min(
samples.length,
TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE,
);
const batch: Array<JSONObject> = samples.splice(0, batchSize);
if (batch.length === 0) {
continue;
}
await ProfileSampleService.insertJsonRows(batch);
}
}
@CaptureSpan()
public static async ingestProfiles(
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> {
try {
if (!(req as TelemetryRequest).projectId) {
throw new BadRequestException(
"Invalid request - projectId not found in request.",
);
}
req.body = req.body.toJSON ? req.body.toJSON() : req.body;
Response.sendEmptySuccessResponse(req, res);
await ProfilesQueueService.addProfileIngestJob(req as TelemetryRequest);
return;
} catch (err) {
return next(err);
}
}
  /**
   * Queue-worker entry point: processes a profiles payload that was previously
   * enqueued by ingestProfiles. Thin wrapper that delegates to
   * processProfilesAsync so the queue path and any direct path share one
   * implementation.
   */
  @CaptureSpan()
  public static async processProfilesFromQueue(
    req: ExpressRequest,
  ): Promise<void> {
    await this.processProfilesAsync(req);
  }
  /**
   * Parses an OTLP `resourceProfiles` payload from `req.body` and writes
   * denormalized rows to two tables: one profile-level row per
   * ProfileContainer (via buildProfileRow) and one row per sample
   * (via buildSampleRow). Rows are buffered and flushed in batches.
   *
   * Error-handling strategy: each nesting level (resource / scope / profile /
   * sample) has its own try/catch that logs and skips the failed item, so one
   * malformed entry does not abort the whole payload. Only a failure outside
   * those loops (or the final flush) rethrows.
   */
  @CaptureSpan()
  private static async processProfilesAsync(
    req: ExpressRequest,
  ): Promise<void> {
    try {
      const resourceProfiles: JSONArray = req.body[
        "resourceProfiles"
      ] as JSONArray;
      if (!resourceProfiles || !Array.isArray(resourceProfiles)) {
        logger.error("Invalid resourceProfiles format in request body");
        throw new BadRequestException("Invalid resourceProfiles format");
      }
      // Buffers flushed in batches (see flushProfilesBuffer/flushSamplesBuffer).
      const dbProfiles: Array<JSONObject> = [];
      const dbSamples: Array<JSONObject> = [];
      // Cache of service metadata keyed by service name, so the service lookup
      // runs at most once per distinct service in this payload.
      // NOTE: "dataRententionInDays" spelling is inherited from the shared
      // TelemetryServiceMetadata type.
      const serviceDictionary: Dictionary<TelemetryServiceMetadata> = {};
      let totalProfilesProcessed: number = 0;
      let resourceProfileCounter: number = 0;
      for (const resourceProfile of resourceProfiles) {
        try {
          // NOTE(review): `await Promise.resolve()` yields only to the
          // microtask queue, not to the event loop's I/O phase — confirm this
          // is the intended cooperative-yield mechanism (vs. setImmediate).
          if (resourceProfileCounter % 25 === 0) {
            await Promise.resolve();
          }
          resourceProfileCounter++;
          // Service name comes from the resource attributes (with a fallback
          // decided inside getServiceNameFromAttributes).
          const serviceName: string = this.getServiceNameFromAttributes(
            req,
            ((resourceProfile["resource"] as JSONObject)?.[
              "attributes"
            ] as JSONArray) || [],
          );
          if (!serviceDictionary[serviceName]) {
            const service: {
              serviceId: ObjectID;
              dataRententionInDays: number;
            } = await OTelIngestService.telemetryServiceFromName({
              serviceName: serviceName,
              projectId: (req as TelemetryRequest).projectId,
            });
            serviceDictionary[serviceName] = {
              serviceName: serviceName,
              serviceId: service.serviceId,
              dataRententionInDays: service.dataRententionInDays,
            };
          }
          // Resource-level attributes, prefixed "resource.*", merged with the
          // synthetic service-id/name attributes.
          const resourceAttributes: Dictionary<
            AttributeType | Array<AttributeType>
          > = {
            ...TelemetryUtil.getAttributesForServiceIdAndServiceName({
              serviceId: serviceDictionary[serviceName]!.serviceId!,
              serviceName: serviceName,
            }),
            ...TelemetryUtil.getAttributes({
              items:
                ((resourceProfile["resource"] as JSONObject)?.[
                  "attributes"
                ] as JSONArray) || [],
              prefixKeysWithString: "resource",
            }),
          };
          const scopeProfiles: JSONArray = resourceProfile[
            "scopeProfiles"
          ] as JSONArray;
          if (!scopeProfiles || !Array.isArray(scopeProfiles)) {
            logger.warn(
              "Invalid scopeProfiles format, skipping resource profile",
            );
            continue;
          }
          let scopeProfileCounter: number = 0;
          for (const scopeProfile of scopeProfiles) {
            try {
              if (scopeProfileCounter % 50 === 0) {
                await Promise.resolve();
              }
              scopeProfileCounter++;
              const profileContainers: JSONArray = scopeProfile[
                "profiles"
              ] as JSONArray;
              if (!profileContainers || !Array.isArray(profileContainers)) {
                logger.warn(
                  "Invalid profiles format, skipping scope profile",
                );
                continue;
              }
              let profileCounter: number = 0;
              for (const profileContainer of profileContainers) {
                try {
                  if (profileCounter % 100 === 0) {
                    await Promise.resolve();
                  }
                  profileCounter++;
                  const projectId: ObjectID = (req as TelemetryRequest)
                    .projectId;
                  const serviceId: ObjectID =
                    serviceDictionary[serviceName]!.serviceId!;
                  const dataRetentionInDays: number =
                    serviceDictionary[serviceName]!.dataRententionInDays;
                  // OTLP profile_id is base64 bytes; fall back to a generated
                  // id when absent/undecodable.
                  const profileId: string = this.convertBase64ToHexSafe(
                    (profileContainer as JSONObject)["profileId"] as
                      | string
                      | undefined,
                  ) || ObjectID.generate().toString();
                  const startTime: ParsedUnixNano = this.safeParseUnixNano(
                    (profileContainer as JSONObject)[
                      "startTimeUnixNano"
                    ] as string | number | undefined,
                    "profile startTimeUnixNano",
                  );
                  const endTime: ParsedUnixNano = this.safeParseUnixNano(
                    (profileContainer as JSONObject)[
                      "endTimeUnixNano"
                    ] as string | number | undefined,
                    "profile endTimeUnixNano",
                  );
                  // Clamp to 0 so a malformed end < start never yields a
                  // negative duration.
                  const durationNano: number = Math.max(
                    0,
                    Math.trunc(endTime.unixNano - startTime.unixNano),
                  );
                  // Container-level attributes
                  const containerAttributes: Dictionary<
                    AttributeType | Array<AttributeType>
                  > = {
                    ...resourceAttributes,
                    ...TelemetryUtil.getAttributes({
                      items:
                        ((profileContainer as JSONObject)[
                          "attributes"
                        ] as JSONArray) || [],
                      prefixKeysWithString: "profileAttributes",
                    }),
                  };
                  // Flatten instrumentation-scope fields (name, version, …)
                  // into "scope.*" attribute keys.
                  if (
                    scopeProfile["scope"] &&
                    Object.keys(scopeProfile["scope"]).length > 0
                  ) {
                    const scopeAttributes: JSONObject = scopeProfile[
                      "scope"
                    ] as JSONObject;
                    for (const key of Object.keys(scopeAttributes)) {
                      containerAttributes[`scope.${key}`] = scopeAttributes[
                        key
                      ] as AttributeType;
                    }
                  }
                  const attributeKeys: Array<string> =
                    TelemetryUtil.getAttributeKeys(containerAttributes);
                  const profile: JSONObject | undefined = (
                    profileContainer as JSONObject
                  )["profile"] as JSONObject | undefined;
                  const originalPayloadFormat: string =
                    ((profileContainer as JSONObject)[
                      "originalPayloadFormat"
                    ] as string) || "";
                  // Extract sample types from the profile
                  let profileType: string = "unknown";
                  let unit: string = "unknown";
                  let periodType: string = "";
                  let period: number = 0;
                  let sampleCount: number = 0;
                  // Note: when `profile` is missing, no profile-level row is
                  // written either — the whole container is effectively
                  // skipped (push happens inside this branch).
                  if (profile) {
                    // string_table backs all name/unit indices below.
                    const stringTable: Array<string> =
                      (profile["stringTable"] as Array<string>) || [];
                    // Extract sample type from first sample_type entry
                    const sampleTypes: JSONArray =
                      (profile["sampleType"] as JSONArray) || [];
                    if (sampleTypes.length > 0) {
                      const firstSampleType: JSONObject =
                        sampleTypes[0] as JSONObject;
                      const typeIndex: number =
                        (firstSampleType["type"] as number) || 0;
                      const unitIndex: number =
                        (firstSampleType["unit"] as number) || 0;
                      if (stringTable[typeIndex]) {
                        profileType = stringTable[typeIndex]!;
                      }
                      if (stringTable[unitIndex]) {
                        unit = stringTable[unitIndex]!;
                      }
                    }
                    // Extract period type
                    const periodTypeObj: JSONObject | undefined =
                      profile["periodType"] as JSONObject | undefined;
                    if (periodTypeObj) {
                      const periodTypeIndex: number =
                        (periodTypeObj["type"] as number) || 0;
                      if (stringTable[periodTypeIndex]) {
                        periodType = stringTable[periodTypeIndex]!;
                      }
                    }
                    period = (profile["period"] as number) || 0;
                    // Process samples
                    const samples: JSONArray =
                      (profile["sample"] as JSONArray) || [];
                    sampleCount = samples.length;
                    // Build dictionary tables for denormalization
                    const functionTable: JSONArray =
                      (profile["functionTable"] as JSONArray) || [];
                    const locationTable: JSONArray =
                      (profile["locationTable"] as JSONArray) || [];
                    const linkTable: JSONArray =
                      (profile["linkTable"] as JSONArray) || [];
                    const stackTable: JSONArray =
                      (profile["stackTable"] as JSONArray) || [];
                    const attributeTable: JSONArray =
                      (profile["attributeTable"] as JSONArray) || [];
                    let sampleCounter: number = 0;
                    for (const sample of samples) {
                      try {
                        if (sampleCounter % 200 === 0) {
                          await Promise.resolve();
                        }
                        sampleCounter++;
                        const sampleObj: JSONObject = sample as JSONObject;
                        // Resolve the stack frames
                        const resolvedStack: {
                          frames: Array<string>;
                          frameTypes: Array<string>;
                        } = this.resolveStackFrames({
                          sample: sampleObj,
                          stackTable: stackTable,
                          locationTable: locationTable,
                          functionTable: functionTable,
                          stringTable: stringTable,
                          attributeTable: attributeTable,
                        });
                        // Compute stacktrace hash
                        const stacktraceHash: string = crypto
                          .createHash("sha256")
                          .update(resolvedStack.frames.join("|"))
                          .digest("hex");
                        // Resolve trace/span correlation from link table
                        // NOTE(review): link_index defaults to 0 here, so a
                        // sample with no link still picks up linkTable[0]'s
                        // trace/span when the table is non-empty — confirm
                        // against the OTLP v1development link semantics.
                        let traceId: string = "";
                        let spanId: string = "";
                        const linkIndex: number =
                          (sampleObj["linkIndex"] as number) || 0;
                        if (
                          linkTable.length > 0 &&
                          linkIndex >= 0 &&
                          linkIndex < linkTable.length
                        ) {
                          const link: JSONObject =
                            linkTable[linkIndex] as JSONObject;
                          traceId = this.convertBase64ToHexSafe(
                            link["traceId"] as string | undefined,
                          );
                          spanId = this.convertBase64ToHexSafe(
                            link["spanId"] as string | undefined,
                          );
                        }
                        // Extract sample value (first value from values array)
                        const values: Array<number | string> =
                          (sampleObj["value"] as Array<number | string>) || [];
                        let sampleValue: number = 0;
                        if (values.length > 0) {
                          sampleValue =
                            typeof values[0] === "string"
                              ? parseInt(values[0]!, 10) || 0
                              : (values[0] as number) || 0;
                        }
                        // Extract sample timestamp; falls back to the
                        // container's start time when the sample carries none.
                        const timestamps: Array<number | string> =
                          (sampleObj["timestampsUnixNano"] as Array<
                            number | string
                          >) || [];
                        let sampleTime: ParsedUnixNano = startTime;
                        if (timestamps.length > 0) {
                          sampleTime = this.safeParseUnixNano(
                            timestamps[0] as string | number,
                            "sample timestampsUnixNano",
                          );
                        }
                        // Resolve sample-level labels from attribute_indices
                        const sampleLabels: Dictionary<string> = {};
                        const sampleAttributeIndices: Array<number> =
                          (sampleObj["attributeIndices"] as Array<number>) || [];
                        for (const attrIdx of sampleAttributeIndices) {
                          if (attrIdx >= 0 && attrIdx < attributeTable.length) {
                            const attr: JSONObject =
                              attributeTable[attrIdx] as JSONObject;
                            const key: string =
                              (attr["key"] as string) || "";
                            const val: JSONObject =
                              (attr["value"] as JSONObject) || {};
                            // First non-empty representation wins across the
                            // AnyValue variants.
                            sampleLabels[key] =
                              (val["stringValue"] as string) ||
                              (val["intValue"]?.toString() as string) ||
                              (val["doubleValue"]?.toString() as string) ||
                              (val["boolValue"]?.toString() as string) ||
                              "";
                          }
                        }
                        const sampleRow: JSONObject = this.buildSampleRow({
                          projectId: projectId,
                          serviceId: serviceId,
                          profileId: profileId,
                          traceId: traceId,
                          spanId: spanId,
                          time: sampleTime,
                          stacktrace: resolvedStack.frames,
                          stacktraceHash: stacktraceHash,
                          frameTypes: resolvedStack.frameTypes,
                          value: sampleValue,
                          profileType: profileType,
                          labels: sampleLabels,
                          dataRetentionInDays: dataRetentionInDays,
                        });
                        dbSamples.push(sampleRow);
                        if (
                          dbSamples.length >=
                          TELEMETRY_PROFILE_SAMPLE_FLUSH_BATCH_SIZE
                        ) {
                          await this.flushSamplesBuffer(dbSamples);
                        }
                      } catch (sampleError) {
                        logger.error("Error processing individual sample:");
                        logger.error(sampleError);
                      }
                    }
                    // Also extract trace/span from first link if available for the profile-level record
                    let profileTraceId: string = "";
                    let profileSpanId: string = "";
                    if (linkTable.length > 0) {
                      const firstLink: JSONObject =
                        linkTable[0] as JSONObject;
                      profileTraceId = this.convertBase64ToHexSafe(
                        firstLink["traceId"] as string | undefined,
                      );
                      profileSpanId = this.convertBase64ToHexSafe(
                        firstLink["spanId"] as string | undefined,
                      );
                    }
                    const profileRow: JSONObject = this.buildProfileRow({
                      projectId: projectId,
                      serviceId: serviceId,
                      profileId: profileId,
                      traceId: profileTraceId,
                      spanId: profileSpanId,
                      startTime: startTime,
                      endTime: endTime,
                      durationNano: durationNano,
                      profileType: profileType,
                      unit: unit,
                      periodType: periodType,
                      period: period,
                      attributes: containerAttributes,
                      attributeKeys: attributeKeys,
                      sampleCount: sampleCount,
                      originalPayloadFormat: originalPayloadFormat,
                      dataRetentionInDays: dataRetentionInDays,
                    });
                    dbProfiles.push(profileRow);
                    totalProfilesProcessed++;
                    if (
                      dbProfiles.length >= TELEMETRY_PROFILE_FLUSH_BATCH_SIZE
                    ) {
                      await this.flushProfilesBuffer(dbProfiles);
                    }
                  }
                } catch (profileError) {
                  logger.error("Error processing individual profile:");
                  logger.error(profileError);
                }
              }
            } catch (scopeError) {
              logger.error("Error processing scope profile:");
              logger.error(scopeError);
            }
          }
        } catch (resourceError) {
          logger.error("Error processing resource profile:");
          logger.error(resourceError);
        }
      }
      // Force-flush whatever is left in both buffers.
      await Promise.all([
        this.flushProfilesBuffer(dbProfiles, true),
        this.flushSamplesBuffer(dbSamples, true),
      ]);
      if (totalProfilesProcessed === 0) {
        // NOTE(review): this early return skips the req.body cleanup below —
        // confirm that is acceptable for the zero-profile case.
        logger.warn("No valid profiles were processed from the request");
        return;
      }
      logger.debug(
        `Successfully processed ${totalProfilesProcessed} profiles for project: ${(req as TelemetryRequest).projectId}`,
      );
      // Best-effort memory release for large payloads.
      try {
        dbProfiles.length = 0;
        dbSamples.length = 0;
        if (req.body) {
          req.body = null;
        }
      } catch (cleanupError) {
        logger.error("Error during memory cleanup:");
        logger.error(cleanupError);
      }
    } catch (error) {
      logger.error("Critical error in processProfilesAsync:");
      logger.error(error);
      throw error;
    }
  }
private static resolveStackFrames(data: {
sample: JSONObject;
stackTable: JSONArray;
locationTable: JSONArray;
functionTable: JSONArray;
stringTable: Array<string>;
attributeTable: JSONArray;
}): { frames: Array<string>; frameTypes: Array<string> } {
const frames: Array<string> = [];
const frameTypes: Array<string> = [];
// Try stack_index first (newer format)
const stackIndex: number | undefined = data.sample[
"stackIndex"
] as number | undefined;
let locationIndices: Array<number> = [];
if (
stackIndex !== undefined &&
stackIndex >= 0 &&
stackIndex < data.stackTable.length
) {
const stack: JSONObject = data.stackTable[stackIndex] as JSONObject;
locationIndices =
(stack["locationIndices"] as Array<number>) || [];
} else {
// Fall back to locations_start_index + locations_length (older format)
const startIndex: number =
(data.sample["locationsStartIndex"] as number) || 0;
const length: number =
(data.sample["locationsLength"] as number) || 0;
if (length > 0) {
for (let i: number = startIndex; i < startIndex + length; i++) {
locationIndices.push(i);
}
}
}
for (const locationIndex of locationIndices) {
if (locationIndex < 0 || locationIndex >= data.locationTable.length) {
frames.push("<unknown>");
frameTypes.push("unknown");
continue;
}
const location: JSONObject =
data.locationTable[locationIndex] as JSONObject;
const lines: JSONArray = (location["line"] as JSONArray) || [];
// Resolve frame type from location type_index
let locFrameType: string = "unknown";
const typeIndex: number | undefined = location[
"typeIndex"
] as number | undefined;
if (typeIndex !== undefined && typeIndex >= 0) {
// type_index refers to attribute_table entry with key "profile.frame.type"
if (typeIndex < data.attributeTable.length) {
const attr: JSONObject =
data.attributeTable[typeIndex] as JSONObject;
const val: JSONObject =
(attr["value"] as JSONObject) || {};
locFrameType =
(val["stringValue"] as string) || "unknown";
}
}
if (lines.length === 0) {
// No line info - use address
const address: number | string =
(location["address"] as number | string) || 0;
frames.push(`0x${address.toString(16)}`);
frameTypes.push(locFrameType);
} else {
// Handle inline frames: each line in location.lines expands to a frame
for (const lineObj of lines) {
const line: JSONObject = lineObj as JSONObject;
const functionIndex: number =
(line["functionIndex"] as number) || 0;
let functionName: string = "<unknown>";
let fileName: string = "";
let lineNumber: number = 0;
if (
functionIndex >= 0 &&
functionIndex < data.functionTable.length
) {
const func: JSONObject =
data.functionTable[functionIndex] as JSONObject;
const nameIndex: number = (func["name"] as number) || 0;
const fileIndex: number = (func["filename"] as number) || 0;
if (nameIndex >= 0 && nameIndex < data.stringTable.length) {
functionName = data.stringTable[nameIndex] || "<unknown>";
}
if (fileIndex >= 0 && fileIndex < data.stringTable.length) {
fileName = data.stringTable[fileIndex] || "";
}
}
lineNumber = (line["line"] as number) || 0;
let frame: string = functionName;
if (fileName) {
frame += `@${fileName}`;
}
if (lineNumber > 0) {
frame += `:${lineNumber}`;
}
frames.push(frame);
frameTypes.push(locFrameType);
}
}
}
return { frames, frameTypes };
}
private static buildProfileRow(data: {
projectId: ObjectID;
serviceId: ObjectID;
profileId: string;
traceId: string;
spanId: string;
startTime: ParsedUnixNano;
endTime: ParsedUnixNano;
durationNano: number;
profileType: string;
unit: string;
periodType: string;
period: number;
attributes: Dictionary<AttributeType | Array<AttributeType>>;
attributeKeys: Array<string>;
sampleCount: number;
originalPayloadFormat: string;
dataRetentionInDays: number;
}): JSONObject {
const ingestionDate: Date = OneUptimeDate.getCurrentDate();
const ingestionTimestamp: string =
OneUptimeDate.toClickhouseDateTime(ingestionDate);
const retentionDate: Date = OneUptimeDate.addRemoveDays(
ingestionDate,
data.dataRetentionInDays || 15,
);
return {
_id: ObjectID.generate().toString(),
createdAt: ingestionTimestamp,
updatedAt: ingestionTimestamp,
projectId: data.projectId.toString(),
serviceId: data.serviceId.toString(),
profileId: data.profileId,
traceId: data.traceId || "",
spanId: data.spanId || "",
startTime: OneUptimeDate.toClickhouseDateTime(data.startTime.date),
endTime: OneUptimeDate.toClickhouseDateTime(data.endTime.date),
startTimeUnixNano: data.startTime.nano,
endTimeUnixNano: data.endTime.nano,
durationNano: data.durationNano.toString(),
profileType: data.profileType,
unit: data.unit,
periodType: data.periodType || "",
period: data.period ? data.period.toString() : "0",
attributes: data.attributes,
attributeKeys: data.attributeKeys,
sampleCount: data.sampleCount,
originalPayloadFormat: data.originalPayloadFormat || "",
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
};
}
private static buildSampleRow(data: {
projectId: ObjectID;
serviceId: ObjectID;
profileId: string;
traceId: string;
spanId: string;
time: ParsedUnixNano;
stacktrace: Array<string>;
stacktraceHash: string;
frameTypes: Array<string>;
value: number;
profileType: string;
labels: Dictionary<string>;
dataRetentionInDays: number;
}): JSONObject {
const ingestionDate: Date = OneUptimeDate.getCurrentDate();
const ingestionTimestamp: string =
OneUptimeDate.toClickhouseDateTime(ingestionDate);
const retentionDate: Date = OneUptimeDate.addRemoveDays(
ingestionDate,
data.dataRetentionInDays || 15,
);
return {
_id: ObjectID.generate().toString(),
createdAt: ingestionTimestamp,
updatedAt: ingestionTimestamp,
projectId: data.projectId.toString(),
serviceId: data.serviceId.toString(),
profileId: data.profileId,
traceId: data.traceId || "",
spanId: data.spanId || "",
time: OneUptimeDate.toClickhouseDateTime(data.time.date),
timeUnixNano: data.time.nano,
stacktrace: data.stacktrace,
stacktraceHash: data.stacktraceHash,
frameTypes: data.frameTypes,
value: data.value.toString(),
profileType: data.profileType,
labels: data.labels,
retentionDate: OneUptimeDate.toClickhouseDateTime(retentionDate),
};
}
private static safeParseUnixNano(
value: string | number | undefined,
context: string,
): ParsedUnixNano {
let numericValue: number = OneUptimeDate.getCurrentDateAsUnixNano();
if (value !== undefined && value !== null) {
try {
if (typeof value === "string") {
const parsed: number = Number.parseFloat(value);
if (!Number.isNaN(parsed)) {
numericValue = parsed;
} else {
throw new Error(`Invalid timestamp string: ${value}`);
}
} else if (typeof value === "number") {
if (!Number.isFinite(value)) {
throw new Error(`Invalid timestamp number: ${value}`);
}
numericValue = value;
}
} catch (error) {
logger.warn(
`Error processing ${context}: ${error instanceof Error ? error.message : String(error)}, using current time`,
);
numericValue = OneUptimeDate.getCurrentDateAsUnixNano();
}
}
numericValue = Math.trunc(numericValue);
const date: Date = OneUptimeDate.fromUnixNano(numericValue);
const iso: string = OneUptimeDate.toString(date);
return {
unixNano: numericValue,
nano: numericValue.toString(),
iso: iso,
date: date,
};
}
private static convertBase64ToHexSafe(value: string | undefined): string {
if (!value) {
return "";
}
try {
return Text.convertBase64ToHex(value);
} catch {
return "";
}
}
}

View File

@@ -0,0 +1,15 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import TelemetryQueueService, {
ProfilesIngestJobData,
} from "./TelemetryQueueService";
// Export the interface for backward compatibility
export { ProfilesIngestJobData };
/**
 * Facade that forwards profile ingestion jobs to the shared
 * TelemetryQueueService. Kept as a dedicated class for backward
 * compatibility with callers importing a profiles-specific queue service.
 */
export default class ProfilesQueueService {
  /**
   * Enqueues an OTLP profiles export request for asynchronous ingestion.
   */
  public static async addProfileIngestJob(
    req: TelemetryRequest,
  ): Promise<void> {
    await TelemetryQueueService.addProfileIngestJob(req);
  }
}

View File

@@ -9,6 +9,7 @@ export enum TelemetryType {
Logs = "logs",
Traces = "traces",
Metrics = "metrics",
Profiles = "profiles",
Syslog = "syslog",
FluentLogs = "fluentlogs",
ProbeIngest = "probe-ingest",
@@ -89,6 +90,10 @@ export interface MetricsIngestJobData extends TelemetryIngestJobData {
type: TelemetryType.Metrics;
}
// Queue payload for an OTLP profiles export request; the literal `type`
// discriminant routes the job to the profiles ingest worker.
export interface ProfilesIngestJobData extends TelemetryIngestJobData {
  type: TelemetryType.Profiles;
}
// Queue payload for a syslog ingestion request, discriminated by `type`.
export interface SyslogIngestJobData extends TelemetryIngestJobData {
  type: TelemetryType.Syslog;
}
@@ -136,6 +141,12 @@ export default class TelemetryQueueService {
return this.addTelemetryIngestJob(req, TelemetryType.Metrics);
}
  /**
   * Enqueues an OTLP profiles export request for asynchronous ingestion.
   *
   * @param req - Telemetry request prepared by the telemetry ingest
   *   middleware.
   */
  public static async addProfileIngestJob(
    req: TelemetryRequest,
  ): Promise<void> {
    return this.addTelemetryIngestJob(req, TelemetryType.Profiles);
  }
public static async addFluentLogIngestJob(
req: TelemetryRequest,
): Promise<void> {

View File

@@ -0,0 +1,441 @@
import OtelProfilesIngestService from "../../Services/OtelProfilesIngestService";
import { JSONObject, JSONArray } from "Common/Types/JSON";
// Cast to `any` so the tests can reach the service's private static
// helpers (resolveStackFrames, safeParseUnixNano, ...) via bracket access.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const service: any = OtelProfilesIngestService as any;
describe("OtelProfilesIngestService", () => {
  // Stack-frame resolution tests: walk the OTLP indexed tables
  // (sample -> stack -> location -> function -> string table) and render
  // frames as "function@file:line".
  describe("resolveStackFrames", () => {
    // Shared string table; index 0 is conventionally the empty string.
    const baseStringTable: Array<string> = [
      "", // 0 - empty
      "main", // 1
      "app.go", // 2
      "runtime.main", // 3
      "runtime/proc.go", // 4
      "handleRequest", // 5
      "server.go", // 6
      "kernel", // 7
      "native", // 8
      "go", // 9
    ];
    // Functions reference their name/filename by string-table index.
    const baseFunctionTable: JSONArray = [
      { name: 1, systemName: 1, filename: 2, startLine: 10 }, // 0: main@app.go
      { name: 3, systemName: 3, filename: 4, startLine: 100 }, // 1: runtime.main@runtime/proc.go
      { name: 5, systemName: 5, filename: 6, startLine: 50 }, // 2: handleRequest@server.go
    ];
    const baseLocationTable: JSONArray = [
      {
        mappingIndex: 0,
        address: 4096,
        line: [{ functionIndex: 0, line: 15, column: 0 }],
        isFolded: false,
        typeIndex: 0,
        attributeIndices: [],
      }, // 0: main@app.go:15
      {
        mappingIndex: 0,
        address: 8192,
        line: [{ functionIndex: 1, line: 120, column: 0 }],
        isFolded: false,
        typeIndex: 0,
        attributeIndices: [],
      }, // 1: runtime.main@runtime/proc.go:120
      {
        mappingIndex: 0,
        address: 12288,
        line: [{ functionIndex: 2, line: 55, column: 0 }],
        isFolded: false,
        typeIndex: 0,
        attributeIndices: [],
      }, // 2: handleRequest@server.go:55
    ];
    // typeIndex 0 in every location resolves the frame type to "go".
    const baseAttributeTable: JSONArray = [
      { key: "profile.frame.type", value: { stringValue: "go" } },
    ];
    test("resolves simple stack via stack_table", () => {
      const stackTable: JSONArray = [
        { locationIndices: [0, 1] }, // stack 0: main, runtime.main
      ];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: baseLocationTable,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      expect(result.frames).toHaveLength(2);
      expect(result.frames[0]).toBe("main@app.go:15");
      expect(result.frames[1]).toBe("runtime.main@runtime/proc.go:120");
      expect(result.frameTypes[0]).toBe("go");
      expect(result.frameTypes[1]).toBe("go");
    });
    test("resolves stack with three frames", () => {
      const stackTable: JSONArray = [
        { locationIndices: [2, 0, 1] }, // handleRequest -> main -> runtime.main
      ];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: baseLocationTable,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      // Frame order must follow the stack's locationIndices order.
      expect(result.frames).toHaveLength(3);
      expect(result.frames[0]).toBe("handleRequest@server.go:55");
      expect(result.frames[1]).toBe("main@app.go:15");
      expect(result.frames[2]).toBe("runtime.main@runtime/proc.go:120");
    });
    test("handles inline frames (multiple lines per location)", () => {
      const locationTableWithInline: JSONArray = [
        {
          mappingIndex: 0,
          address: 4096,
          line: [
            { functionIndex: 0, line: 15, column: 0 }, // main@app.go:15
            { functionIndex: 2, line: 55, column: 0 }, // handleRequest@server.go:55 (inlined)
          ],
          isFolded: false,
          typeIndex: 0,
          attributeIndices: [],
        },
      ];
      const stackTable: JSONArray = [{ locationIndices: [0] }];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: locationTableWithInline,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      // Inline frames should expand into separate frames
      expect(result.frames).toHaveLength(2);
      expect(result.frames[0]).toBe("main@app.go:15");
      expect(result.frames[1]).toBe("handleRequest@server.go:55");
    });
    test("handles location without line info (uses hex address)", () => {
      const locationTableNoLine: JSONArray = [
        {
          mappingIndex: 0,
          address: 65535,
          line: [],
          isFolded: false,
          typeIndex: 0,
          attributeIndices: [],
        },
      ];
      const stackTable: JSONArray = [{ locationIndices: [0] }];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: locationTableNoLine,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      // 65535 rendered as lowercase hex with "0x" prefix.
      expect(result.frames).toHaveLength(1);
      expect(result.frames[0]).toBe("0xffff");
    });
    test("handles empty stack", () => {
      const stackTable: JSONArray = [{ locationIndices: [] }];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: baseLocationTable,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      expect(result.frames).toHaveLength(0);
      expect(result.frameTypes).toHaveLength(0);
    });
    test("handles out-of-bounds location index gracefully", () => {
      const stackTable: JSONArray = [{ locationIndices: [999] }];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: baseLocationTable,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      // Invalid index degrades to a placeholder frame, never throws.
      expect(result.frames).toHaveLength(1);
      expect(result.frames[0]).toBe("<unknown>");
      expect(result.frameTypes[0]).toBe("unknown");
    });
    test("falls back to locationsStartIndex/locationsLength when no stackIndex", () => {
      // Older OTLP encoding: a contiguous range into the location table.
      const sample: JSONObject = {
        locationsStartIndex: 0,
        locationsLength: 2,
      };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable: [],
          locationTable: baseLocationTable,
          functionTable: baseFunctionTable,
          stringTable: baseStringTable,
          attributeTable: baseAttributeTable,
        });
      expect(result.frames).toHaveLength(2);
      expect(result.frames[0]).toBe("main@app.go:15");
      expect(result.frames[1]).toBe("runtime.main@runtime/proc.go:120");
    });
    test("handles function without filename", () => {
      const stringTableNoFile: Array<string> = [
        "", // 0
        "anonymous", // 1
      ];
      const functionTableNoFile: JSONArray = [
        { name: 1, systemName: 1, filename: 0, startLine: 0 }, // anonymous (no file)
      ];
      const locationTableNoFile: JSONArray = [
        {
          mappingIndex: 0,
          address: 4096,
          line: [{ functionIndex: 0, line: 0, column: 0 }],
          isFolded: false,
          typeIndex: 0,
          attributeIndices: [],
        },
      ];
      const stackTable: JSONArray = [{ locationIndices: [0] }];
      const sample: JSONObject = { stackIndex: 0 };
      const result: { frames: Array<string>; frameTypes: Array<string> } =
        service["resolveStackFrames"]({
          sample,
          stackTable,
          locationTable: locationTableNoFile,
          functionTable: functionTableNoFile,
          stringTable: stringTableNoFile,
          attributeTable: [],
        });
      expect(result.frames).toHaveLength(1);
      // Should just be function name without file or line
      expect(result.frames[0]).toBe("anonymous");
    });
  });
describe("safeParseUnixNano", () => {
test("parses numeric value correctly", () => {
const nanos: number = 1700000000000000000;
const result: {
unixNano: number;
nano: string;
iso: string;
date: Date;
} = service["safeParseUnixNano"](nanos, "test");
expect(result.unixNano).toBe(nanos);
expect(result.nano).toBe(nanos.toString());
expect(result.date).toBeInstanceOf(Date);
});
test("parses string value correctly", () => {
const nanos: string = "1700000000000000000";
const result: {
unixNano: number;
nano: string;
iso: string;
date: Date;
} = service["safeParseUnixNano"](nanos, "test");
expect(result.unixNano).toBe(1700000000000000000);
expect(result.date).toBeInstanceOf(Date);
});
test("falls back to current time for undefined", () => {
const result: {
unixNano: number;
nano: string;
iso: string;
date: Date;
} = service["safeParseUnixNano"](undefined, "test");
expect(result.unixNano).toBeGreaterThan(0);
expect(result.date).toBeInstanceOf(Date);
});
test("falls back to current time for NaN string", () => {
const result: {
unixNano: number;
nano: string;
iso: string;
date: Date;
} = service["safeParseUnixNano"]("not-a-number", "test");
expect(result.unixNano).toBeGreaterThan(0);
expect(result.date).toBeInstanceOf(Date);
});
test("falls back to current time for Infinity", () => {
const result: {
unixNano: number;
nano: string;
iso: string;
date: Date;
} = service["safeParseUnixNano"](Infinity, "test");
expect(result.unixNano).toBeGreaterThan(0);
expect(result.date).toBeInstanceOf(Date);
});
});
describe("convertBase64ToHexSafe", () => {
test("returns empty string for undefined", () => {
const result: string = service["convertBase64ToHexSafe"](undefined);
expect(result).toBe("");
});
test("returns empty string for empty string", () => {
const result: string = service["convertBase64ToHexSafe"]("");
expect(result).toBe("");
});
test("converts valid base64 to hex", () => {
// "AQID" is base64 for bytes [1, 2, 3] which is hex "010203"
const result: string = service["convertBase64ToHexSafe"]("AQID");
expect(result).toBe("010203");
});
});
  // Row building: verifies OTLP profile fields map onto the ClickHouse
  // Profile row. ObjectIDs are stubbed with minimal { toString } objects
  // because only their string form is consumed.
  describe("buildProfileRow", () => {
    test("builds profile row with all fields", () => {
      const row: JSONObject = service["buildProfileRow"]({
        projectId: { toString: () => "proj-123" },
        serviceId: { toString: () => "svc-456" },
        profileId: "profile-789",
        traceId: "trace-abc",
        spanId: "span-def",
        startTime: {
          unixNano: 1700000000000000000,
          nano: "1700000000000000000",
          iso: "2023-11-14T22:13:20.000Z",
          date: new Date("2023-11-14T22:13:20.000Z"),
        },
        endTime: {
          unixNano: 1700000001000000000,
          nano: "1700000001000000000",
          iso: "2023-11-14T22:13:21.000Z",
          date: new Date("2023-11-14T22:13:21.000Z"),
        },
        durationNano: 1000000000,
        profileType: "cpu",
        unit: "nanoseconds",
        periodType: "cpu",
        period: 10000000,
        attributes: { "resource.service.name": "my-service" },
        attributeKeys: ["resource.service.name"],
        sampleCount: 100,
        originalPayloadFormat: "pprofext",
        dataRetentionInDays: 15,
      });
      expect(row["projectId"]).toBe("proj-123");
      expect(row["serviceId"]).toBe("svc-456");
      expect(row["profileId"]).toBe("profile-789");
      expect(row["traceId"]).toBe("trace-abc");
      expect(row["spanId"]).toBe("span-def");
      expect(row["profileType"]).toBe("cpu");
      expect(row["unit"]).toBe("nanoseconds");
      expect(row["periodType"]).toBe("cpu");
      expect(row["sampleCount"]).toBe(100);
      expect(row["originalPayloadFormat"]).toBe("pprofext");
      // Generated/derived fields only need to exist.
      expect(row["_id"]).toBeDefined();
      expect(row["retentionDate"]).toBeDefined();
    });
  });
  // Row building: verifies a single sample (stack + value) maps onto the
  // ClickHouse ProfileSample row; note the 64-bit value is stringified.
  describe("buildSampleRow", () => {
    test("builds sample row with all fields", () => {
      const row: JSONObject = service["buildSampleRow"]({
        projectId: { toString: () => "proj-123" },
        serviceId: { toString: () => "svc-456" },
        profileId: "profile-789",
        traceId: "trace-abc",
        spanId: "span-def",
        time: {
          unixNano: 1700000000000000000,
          nano: "1700000000000000000",
          iso: "2023-11-14T22:13:20.000Z",
          date: new Date("2023-11-14T22:13:20.000Z"),
        },
        stacktrace: ["main@app.go:15", "runtime.main@runtime/proc.go:120"],
        stacktraceHash: "abc123",
        frameTypes: ["go", "go"],
        value: 50000,
        profileType: "cpu",
        labels: { "thread.name": "main" },
        dataRetentionInDays: 15,
      });
      expect(row["projectId"]).toBe("proj-123");
      expect(row["serviceId"]).toBe("svc-456");
      expect(row["profileId"]).toBe("profile-789");
      expect(row["traceId"]).toBe("trace-abc");
      expect(row["stacktrace"]).toEqual([
        "main@app.go:15",
        "runtime.main@runtime/proc.go:120",
      ]);
      expect(row["stacktraceHash"]).toBe("abc123");
      expect(row["frameTypes"]).toEqual(["go", "go"]);
      // Numeric sample value is serialized as a string.
      expect(row["value"]).toBe("50000");
      expect(row["profileType"]).toBe("cpu");
      expect(row["labels"]).toEqual({ "thread.name": "main" });
      expect(row["_id"]).toBeDefined();
      expect(row["retentionDate"]).toBeDefined();
    });
  });
});