mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 00:32:12 +02:00
feat: integrate Pyroscope for performance profiling
- Added @pyroscope/nodejs dependency to package.json and package-lock.json. - Implemented Pyroscope API routes in Telemetry/Index.ts. - Created new Pyroscope API handler in Telemetry/API/Pyroscope.ts to manage profile ingestion. - Defined pprof protocol buffer schema in Telemetry/ProtoFiles/pprof/profile.proto. - Developed PyroscopeIngestService for processing and converting pprof profiles to OTLP format. - Enhanced error handling and request validation in the Pyroscope ingestion process.
This commit is contained in:
@@ -1,51 +1,11 @@
|
||||
import inspector from "inspector";
|
||||
import http from "http";
|
||||
import https from "https";
|
||||
import zlib from "zlib";
|
||||
import { URL as NodeURL } from "url";
|
||||
import Dictionary from "../../Types/Dictionary";
|
||||
import Pyroscope from "@pyroscope/nodejs";
|
||||
import {
|
||||
AppVersion,
|
||||
Env,
|
||||
DisableTelemetry,
|
||||
EnableProfiling,
|
||||
} from "../EnvironmentConfig";
|
||||
import logger from "./Logger";
|
||||
|
||||
// V8 CPU Profile types from the inspector module
|
||||
interface V8CallFrame {
|
||||
functionName: string;
|
||||
scriptId: string;
|
||||
url: string;
|
||||
lineNumber: number;
|
||||
columnNumber: number;
|
||||
}
|
||||
|
||||
interface V8CpuProfileNode {
|
||||
id: number;
|
||||
callFrame: V8CallFrame;
|
||||
hitCount: number;
|
||||
children?: Array<number>;
|
||||
}
|
||||
|
||||
interface V8CpuProfile {
|
||||
nodes: Array<V8CpuProfileNode>;
|
||||
startTime: number; // microseconds (monotonic clock)
|
||||
endTime: number; // microseconds (monotonic clock)
|
||||
samples: Array<number>; // node IDs
|
||||
timeDeltas: Array<number>; // microseconds between samples
|
||||
}
|
||||
|
||||
export default class Profiling {
|
||||
private static session: inspector.Session | null = null;
|
||||
private static intervalId: ReturnType<typeof setInterval> | null = null;
|
||||
private static serviceName: string = "";
|
||||
private static isCollecting: boolean = false;
|
||||
|
||||
// Profile every 60 seconds, sample for 10 seconds each time
|
||||
private static readonly PROFILING_INTERVAL_MS: number = 60_000;
|
||||
private static readonly PROFILING_DURATION_MS: number = 10_000;
|
||||
|
||||
public static init(data: { serviceName: string }): void {
|
||||
if (!EnableProfiling) {
|
||||
return;
|
||||
@@ -55,527 +15,87 @@ export default class Profiling {
|
||||
return;
|
||||
}
|
||||
|
||||
const endpoint: string | null = this.getOtlpProfilesEndpoint();
|
||||
const headers: Dictionary<string> = this.getHeaders();
|
||||
const serverAddress: string | undefined = this.getServerAddress();
|
||||
const authToken: string | undefined = this.getAuthToken();
|
||||
|
||||
if (!endpoint || Object.keys(headers).length === 0) {
|
||||
if (!serverAddress) {
|
||||
logger.warn(
|
||||
"Profiling enabled but OTLP endpoint or headers not configured. Skipping profiling initialization.",
|
||||
"Profiling enabled but OPENTELEMETRY_EXPORTER_OTLP_ENDPOINT not configured. Skipping profiling initialization.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
this.serviceName = data.serviceName;
|
||||
|
||||
try {
|
||||
this.session = new inspector.Session();
|
||||
this.session.connect();
|
||||
Pyroscope.init({
|
||||
appName: data.serviceName,
|
||||
serverAddress: serverAddress,
|
||||
authToken: authToken,
|
||||
wall: {
|
||||
collectCpuTime: true,
|
||||
},
|
||||
});
|
||||
|
||||
this.postToSession("Profiler.enable")
|
||||
.then(() => {
|
||||
logger.info(
|
||||
`CPU profiling initialized for service: ${data.serviceName}`,
|
||||
);
|
||||
this.startProfilingLoop();
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
logger.error("Failed to enable V8 profiler:");
|
||||
logger.error(err);
|
||||
});
|
||||
Pyroscope.start();
|
||||
|
||||
logger.info(
|
||||
`Profiling initialized for service: ${data.serviceName} -> ${serverAddress}`,
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error("Failed to initialize profiling session:");
|
||||
logger.error("Failed to initialize profiling:");
|
||||
logger.error(err);
|
||||
}
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
this.stop();
|
||||
Pyroscope.stop().catch((err: unknown) => {
|
||||
logger.error("Error stopping profiler:");
|
||||
logger.error(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public static stop(): void {
|
||||
if (this.intervalId) {
|
||||
clearInterval(this.intervalId);
|
||||
this.intervalId = null;
|
||||
}
|
||||
|
||||
if (this.session) {
|
||||
try {
|
||||
this.session.post("Profiler.disable");
|
||||
this.session.disconnect();
|
||||
} catch {
|
||||
// Ignore errors during cleanup
|
||||
}
|
||||
this.session = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static getOtlpProfilesEndpoint(): string | null {
|
||||
const base: string | undefined =
|
||||
private static getServerAddress(): string | undefined {
|
||||
// Use the OTLP endpoint base URL as the Pyroscope server address.
|
||||
// The Pyroscope SDK will append /ingest to this URL.
|
||||
// The Telemetry service has a Pyroscope-compatible /ingest endpoint.
|
||||
const endpoint: string | undefined =
|
||||
process.env["OPENTELEMETRY_EXPORTER_OTLP_ENDPOINT"];
|
||||
if (!base) {
|
||||
return null;
|
||||
}
|
||||
return `${base}/v1/profiles`;
|
||||
}
|
||||
|
||||
private static getHeaders(): Dictionary<string> {
|
||||
if (!process.env["OPENTELEMETRY_EXPORTER_OTLP_HEADERS"]) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const headersStrings: Array<string> =
|
||||
process.env["OPENTELEMETRY_EXPORTER_OTLP_HEADERS"].split(";");
|
||||
|
||||
const headers: Dictionary<string> = {};
|
||||
|
||||
for (const headerString of headersStrings) {
|
||||
const parts: Array<string> = headerString.split("=");
|
||||
if (parts.length === 2) {
|
||||
headers[parts[0]!.toString()] = parts[1]!.toString();
|
||||
}
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
private static startProfilingLoop(): void {
|
||||
// Start the first collection after a short delay
|
||||
setTimeout(() => {
|
||||
this.collectAndSendProfile().catch((err: unknown) => {
|
||||
logger.error("Error in initial profile collection:");
|
||||
logger.error(err);
|
||||
});
|
||||
}, 5000);
|
||||
|
||||
this.intervalId = setInterval(() => {
|
||||
this.collectAndSendProfile().catch((err: unknown) => {
|
||||
logger.error("Error in profile collection:");
|
||||
logger.error(err);
|
||||
});
|
||||
}, this.PROFILING_INTERVAL_MS);
|
||||
}
|
||||
|
||||
private static async collectAndSendProfile(): Promise<void> {
|
||||
if (!this.session || this.isCollecting) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isCollecting = true;
|
||||
const wallClockStartMs: number = Date.now();
|
||||
|
||||
try {
|
||||
await this.postToSession("Profiler.start");
|
||||
|
||||
await new Promise<void>((resolve: () => void) => {
|
||||
return setTimeout(resolve, this.PROFILING_DURATION_MS);
|
||||
});
|
||||
|
||||
const wallClockEndMs: number = Date.now();
|
||||
const result: unknown = await this.postToSession("Profiler.stop");
|
||||
const profile: V8CpuProfile | undefined = (
|
||||
result as { profile?: V8CpuProfile }
|
||||
)?.profile;
|
||||
|
||||
if (!profile || !profile.samples || profile.samples.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const otlpPayload: object = this.convertV8ProfileToOTLP(
|
||||
profile,
|
||||
wallClockStartMs,
|
||||
wallClockEndMs,
|
||||
);
|
||||
|
||||
await this.sendProfile(otlpPayload);
|
||||
} catch (err) {
|
||||
logger.error("Error collecting/sending profile:");
|
||||
logger.error(err);
|
||||
} finally {
|
||||
this.isCollecting = false;
|
||||
}
|
||||
}
|
||||
|
||||
private static postToSession(
|
||||
method: string,
|
||||
params?: object,
|
||||
): Promise<unknown> {
|
||||
return new Promise<unknown>(
|
||||
(resolve: (value: unknown) => void, reject: (reason: Error) => void) => {
|
||||
if (!this.session) {
|
||||
reject(new Error("Inspector session not available"));
|
||||
return;
|
||||
}
|
||||
|
||||
this.session.post(
|
||||
method,
|
||||
params || {},
|
||||
(err: Error | null, result?: object) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private static convertV8ProfileToOTLP(
|
||||
v8Profile: V8CpuProfile,
|
||||
wallClockStartMs: number,
|
||||
wallClockEndMs: number,
|
||||
): object {
|
||||
// Build node lookup and parent maps
|
||||
const nodeMap: Map<number, V8CpuProfileNode> = new Map<
|
||||
number,
|
||||
V8CpuProfileNode
|
||||
>();
|
||||
const parentMap: Map<number, number> = new Map<number, number>();
|
||||
|
||||
for (const node of v8Profile.nodes) {
|
||||
nodeMap.set(node.id, node);
|
||||
if (node.children) {
|
||||
for (const childId of node.children) {
|
||||
parentMap.set(childId, node.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// String table with deduplication
|
||||
const stringTable: Array<string> = [""];
|
||||
const stringIndexMap: Map<string, number> = new Map<string, number>();
|
||||
stringIndexMap.set("", 0);
|
||||
|
||||
const getStringIndex: (s: string) => number = (s: string): number => {
|
||||
let idx: number | undefined = stringIndexMap.get(s);
|
||||
if (idx === undefined) {
|
||||
idx = stringTable.length;
|
||||
stringTable.push(s);
|
||||
stringIndexMap.set(s, idx);
|
||||
}
|
||||
return idx;
|
||||
};
|
||||
|
||||
// Predefined string indices for sample types
|
||||
const cpuTypeIdx: number = getStringIndex("cpu");
|
||||
const nanosecondsIdx: number = getStringIndex("nanoseconds");
|
||||
const samplesTypeIdx: number = getStringIndex("samples");
|
||||
const countIdx: number = getStringIndex("count");
|
||||
|
||||
// Build function and location tables
|
||||
const functionTable: Array<{ name: number; filename: number }> = [];
|
||||
const locationTable: Array<{
|
||||
line: Array<{ functionIndex: number; line: number }>;
|
||||
}> = [];
|
||||
|
||||
const funcIndexMap: Map<string, number> = new Map<string, number>();
|
||||
const locationIndexMap: Map<string, number> = new Map<string, number>();
|
||||
|
||||
const getLocationIndex: (node: V8CpuProfileNode) => number = (
|
||||
node: V8CpuProfileNode,
|
||||
): number => {
|
||||
const locKey: string = `${node.callFrame.functionName}|${node.callFrame.url}|${node.callFrame.lineNumber}`;
|
||||
let locIdx: number | undefined = locationIndexMap.get(locKey);
|
||||
if (locIdx !== undefined) {
|
||||
return locIdx;
|
||||
}
|
||||
|
||||
// Ensure function entry exists
|
||||
const fKey: string = `${node.callFrame.functionName}|${node.callFrame.url}`;
|
||||
let fIdx: number | undefined = funcIndexMap.get(fKey);
|
||||
if (fIdx === undefined) {
|
||||
fIdx = functionTable.length;
|
||||
functionTable.push({
|
||||
name: getStringIndex(node.callFrame.functionName || "(anonymous)"),
|
||||
filename: getStringIndex(node.callFrame.url || ""),
|
||||
});
|
||||
funcIndexMap.set(fKey, fIdx);
|
||||
}
|
||||
|
||||
locIdx = locationTable.length;
|
||||
locationTable.push({
|
||||
line: [
|
||||
{
|
||||
functionIndex: fIdx,
|
||||
line: Math.max(0, node.callFrame.lineNumber + 1), // V8 uses 0-based line numbers
|
||||
},
|
||||
],
|
||||
});
|
||||
locationIndexMap.set(locKey, locIdx);
|
||||
|
||||
return locIdx;
|
||||
};
|
||||
|
||||
// Build stack table from samples
|
||||
const stackTable: Array<{ locationIndices: Array<number> }> = [];
|
||||
const stackKeyMap: Map<string, number> = new Map<string, number>();
|
||||
|
||||
const getStackIndex: (leafNodeId: number) => number = (
|
||||
leafNodeId: number,
|
||||
): number => {
|
||||
const locationIndices: Array<number> = [];
|
||||
let currentId: number | undefined = leafNodeId;
|
||||
|
||||
while (currentId !== undefined) {
|
||||
const node: V8CpuProfileNode | undefined = nodeMap.get(currentId);
|
||||
if (!node) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Skip V8 internal nodes
|
||||
const fnName: string = node.callFrame.functionName;
|
||||
if (
|
||||
fnName !== "(root)" &&
|
||||
fnName !== "(program)" &&
|
||||
fnName !== "(idle)" &&
|
||||
fnName !== "(garbage collector)"
|
||||
) {
|
||||
locationIndices.push(getLocationIndex(node));
|
||||
}
|
||||
|
||||
currentId = parentMap.get(currentId);
|
||||
}
|
||||
|
||||
const key: string = locationIndices.join(",");
|
||||
let stackIdx: number | undefined = stackKeyMap.get(key);
|
||||
if (stackIdx === undefined) {
|
||||
stackIdx = stackTable.length;
|
||||
stackTable.push({ locationIndices });
|
||||
stackKeyMap.set(key, stackIdx);
|
||||
}
|
||||
|
||||
return stackIdx;
|
||||
};
|
||||
|
||||
// Use wall clock for absolute timestamps (V8 uses monotonic clock)
|
||||
const NANOS_PER_MS: bigint = BigInt(1000000);
|
||||
const NANOS_PER_US: bigint = BigInt(1000);
|
||||
const ZERO: bigint = BigInt(0);
|
||||
|
||||
const startTimeNano: bigint = BigInt(wallClockStartMs) * NANOS_PER_MS;
|
||||
const endTimeNano: bigint = BigInt(wallClockEndMs) * NANOS_PER_MS;
|
||||
|
||||
// Build sample entries
|
||||
const samples: Array<{
|
||||
stackIndex: number;
|
||||
value: Array<string>;
|
||||
timestampsUnixNano: Array<string>;
|
||||
}> = [];
|
||||
|
||||
let cumulativeDeltaNano: bigint = ZERO;
|
||||
const totalV8DurationUs: bigint = BigInt(
|
||||
v8Profile.endTime - v8Profile.startTime,
|
||||
);
|
||||
const totalWallDurationNano: bigint = endTimeNano - startTimeNano;
|
||||
|
||||
for (let i: number = 0; i < v8Profile.samples.length; i++) {
|
||||
const nodeId: number = v8Profile.samples[i]!;
|
||||
const node: V8CpuProfileNode | undefined = nodeMap.get(nodeId);
|
||||
|
||||
// Accumulate time delta
|
||||
const deltaUs: bigint = BigInt(v8Profile.timeDeltas[i] || 0);
|
||||
cumulativeDeltaNano = cumulativeDeltaNano + deltaUs * NANOS_PER_US;
|
||||
|
||||
if (!node) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip idle/root/program/gc samples
|
||||
const fnName: string = node.callFrame.functionName;
|
||||
if (
|
||||
fnName === "(idle)" ||
|
||||
fnName === "(root)" ||
|
||||
fnName === "(program)" ||
|
||||
fnName === "(garbage collector)"
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Map V8 monotonic time to wall clock time proportionally
|
||||
const sampleTimeNano: bigint =
|
||||
totalV8DurationUs > ZERO
|
||||
? startTimeNano +
|
||||
(cumulativeDeltaNano * totalWallDurationNano) /
|
||||
(totalV8DurationUs * NANOS_PER_US)
|
||||
: startTimeNano + cumulativeDeltaNano;
|
||||
|
||||
const timeDeltaNano: bigint = deltaUs * NANOS_PER_US;
|
||||
|
||||
const stackIndex: number = getStackIndex(nodeId);
|
||||
|
||||
samples.push({
|
||||
stackIndex,
|
||||
value: [timeDeltaNano.toString(), "1"],
|
||||
timestampsUnixNano: [sampleTimeNano.toString()],
|
||||
});
|
||||
}
|
||||
|
||||
// If no meaningful samples were collected, return an empty payload
|
||||
if (samples.length === 0) {
|
||||
return { resourceProfiles: [] };
|
||||
}
|
||||
|
||||
// Compute average sampling period in nanoseconds
|
||||
const avgPeriodNs: number =
|
||||
v8Profile.samples.length > 0
|
||||
? Math.trunc(
|
||||
((v8Profile.endTime - v8Profile.startTime) * 1000) /
|
||||
v8Profile.samples.length,
|
||||
)
|
||||
: 1_000_000; // default 1ms
|
||||
|
||||
// Generate a random profile ID (16 bytes as base64)
|
||||
const profileIdBytes: Buffer = Buffer.alloc(16);
|
||||
for (let i: number = 0; i < 16; i++) {
|
||||
profileIdBytes[i] = Math.floor(Math.random() * 256);
|
||||
}
|
||||
const profileId: string = profileIdBytes.toString("base64");
|
||||
|
||||
return {
|
||||
resourceProfiles: [
|
||||
{
|
||||
resource: {
|
||||
attributes: [
|
||||
{
|
||||
key: "service.name",
|
||||
value: { stringValue: this.serviceName },
|
||||
},
|
||||
{
|
||||
key: "service.version",
|
||||
value: { stringValue: AppVersion },
|
||||
},
|
||||
{
|
||||
key: "deployment.environment",
|
||||
value: { stringValue: Env },
|
||||
},
|
||||
],
|
||||
},
|
||||
scopeProfiles: [
|
||||
{
|
||||
scope: {
|
||||
name: "oneuptime-node-profiler",
|
||||
version: "1.0.0",
|
||||
},
|
||||
profiles: [
|
||||
{
|
||||
profileId: profileId,
|
||||
startTimeUnixNano: startTimeNano.toString(),
|
||||
endTimeUnixNano: endTimeNano.toString(),
|
||||
attributes: [
|
||||
{
|
||||
key: "profiler.name",
|
||||
value: { stringValue: "v8-cpu-profiler" },
|
||||
},
|
||||
{
|
||||
key: "runtime.name",
|
||||
value: { stringValue: "nodejs" },
|
||||
},
|
||||
{
|
||||
key: "runtime.version",
|
||||
value: { stringValue: process.version },
|
||||
},
|
||||
],
|
||||
profile: {
|
||||
stringTable,
|
||||
sampleType: [
|
||||
{ type: cpuTypeIdx, unit: nanosecondsIdx },
|
||||
{ type: samplesTypeIdx, unit: countIdx },
|
||||
],
|
||||
sample: samples,
|
||||
locationTable,
|
||||
functionTable,
|
||||
stackTable,
|
||||
linkTable: [],
|
||||
attributeTable: [],
|
||||
periodType: { type: cpuTypeIdx, unit: nanosecondsIdx },
|
||||
period: avgPeriodNs.toString(),
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
private static async sendProfile(payload: object): Promise<void> {
|
||||
const endpoint: string | null = this.getOtlpProfilesEndpoint();
|
||||
if (!endpoint) {
|
||||
return;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const resourceProfiles: Array<unknown> = (
|
||||
payload as { resourceProfiles: Array<unknown> }
|
||||
).resourceProfiles;
|
||||
if (!resourceProfiles || resourceProfiles.length === 0) {
|
||||
return;
|
||||
// Strip /otlp suffix if present, since Pyroscope SDK appends /ingest
|
||||
let baseUrl: string = endpoint;
|
||||
if (baseUrl.endsWith("/otlp")) {
|
||||
baseUrl = baseUrl.substring(0, baseUrl.length - 5);
|
||||
}
|
||||
if (baseUrl.endsWith("/")) {
|
||||
baseUrl = baseUrl.substring(0, baseUrl.length - 1);
|
||||
}
|
||||
|
||||
const headers: Dictionary<string> = this.getHeaders();
|
||||
const jsonData: string = JSON.stringify(payload);
|
||||
return baseUrl;
|
||||
}
|
||||
|
||||
const compressed: Buffer = await new Promise<Buffer>(
|
||||
(resolve: (value: Buffer) => void, reject: (reason: Error) => void) => {
|
||||
zlib.gzip(jsonData, (err: Error | null, result: Buffer) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
private static getAuthToken(): string | undefined {
|
||||
// Extract the OneUptime token from OTLP headers
|
||||
// Format: "x-oneuptime-token=<value>;other-header=value"
|
||||
const headersStr: string | undefined =
|
||||
process.env["OPENTELEMETRY_EXPORTER_OTLP_HEADERS"];
|
||||
|
||||
const url: NodeURL = new NodeURL(endpoint);
|
||||
const isHttps: boolean = url.protocol === "https:";
|
||||
const httpModule: typeof http | typeof https = isHttps ? https : http;
|
||||
if (!headersStr) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve: () => void) => {
|
||||
const req: http.ClientRequest = httpModule.request(
|
||||
{
|
||||
hostname: url.hostname,
|
||||
port: url.port || (isHttps ? 443 : 80),
|
||||
path: url.pathname,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"Content-Encoding": "gzip",
|
||||
...headers,
|
||||
},
|
||||
},
|
||||
(res: http.IncomingMessage) => {
|
||||
let data: string = "";
|
||||
res.on("data", (chunk: Buffer) => {
|
||||
data += chunk.toString();
|
||||
});
|
||||
res.on("end", () => {
|
||||
if (
|
||||
res.statusCode &&
|
||||
res.statusCode >= 200 &&
|
||||
res.statusCode < 300
|
||||
) {
|
||||
logger.debug(
|
||||
`Profile sent successfully for service: ${this.serviceName}`,
|
||||
);
|
||||
} else {
|
||||
logger.warn(
|
||||
`Profile export failed with status ${res.statusCode}: ${data}`,
|
||||
);
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
},
|
||||
);
|
||||
const parts: Array<string> = headersStr.split(";");
|
||||
for (const part of parts) {
|
||||
const [key, value]: Array<string | undefined> = part.split("=") as Array<
|
||||
string | undefined
|
||||
>;
|
||||
if (key === "x-oneuptime-token" && value) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
req.on("error", (err: Error) => {
|
||||
logger.warn(`Profile export error: ${err.message}`);
|
||||
resolve(); // Don't throw - profiling failures should not crash the service
|
||||
});
|
||||
|
||||
req.write(compressed);
|
||||
req.end();
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
145
Common/package-lock.json
generated
145
Common/package-lock.json
generated
@@ -32,6 +32,7 @@
|
||||
"@opentelemetry/sdk-node": "^0.207.0",
|
||||
"@opentelemetry/sdk-trace-web": "^1.25.1",
|
||||
"@opentelemetry/semantic-conventions": "^1.37.0",
|
||||
"@pyroscope/nodejs": "^0.4.11",
|
||||
"@remixicon/react": "^4.2.0",
|
||||
"@simplewebauthn/server": "^13.2.2",
|
||||
"@tippyjs/react": "^4.2.6",
|
||||
@@ -989,6 +990,32 @@
|
||||
"integrity": "sha512-Duh3cign2ChvXABpjVj9Hkz5y20Zf48OE0Y50S4qBVPdhI81S4Rh4MI/bEwvwMnzHubSkiEQ+VhC5HzV8ybnpg==",
|
||||
"license": "Apache-2.0"
|
||||
},
|
||||
"node_modules/@datadog/pprof": {
|
||||
"version": "5.13.3",
|
||||
"resolved": "https://registry.npmjs.org/@datadog/pprof/-/pprof-5.13.3.tgz",
|
||||
"integrity": "sha512-G25IicP7pc5CXmAfVz7nrIERsKK9hvPz6p7xsLTUwG4Qs+Zgd5KFedKCVsnvNasLc7l7OXQ6839ajowgQLWTyw==",
|
||||
"hasInstallScript": true,
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"delay": "^5.0.0",
|
||||
"node-gyp-build": "<4.0",
|
||||
"p-limit": "^3.1.0",
|
||||
"pprof-format": "^2.2.1",
|
||||
"source-map": "^0.7.4"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16"
|
||||
}
|
||||
},
|
||||
"node_modules/@datadog/pprof/node_modules/source-map": {
|
||||
"version": "0.7.6",
|
||||
"resolved": "https://registry.npmjs.org/source-map/-/source-map-0.7.6.tgz",
|
||||
"integrity": "sha512-i5uvt8C3ikiWeNZSVZNWcfZPItFQOsYTUAOkcUPGd8DqDy1uOUikjt5dG+uRlwyvR108Fb9DOd4GvXfT0N2/uQ==",
|
||||
"license": "BSD-3-Clause",
|
||||
"engines": {
|
||||
"node": ">= 12"
|
||||
}
|
||||
},
|
||||
"node_modules/@elastic/elasticsearch": {
|
||||
"version": "8.16.1",
|
||||
"resolved": "https://registry.npmjs.org/@elastic/elasticsearch/-/elasticsearch-8.16.1.tgz",
|
||||
@@ -4472,6 +4499,87 @@
|
||||
"integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==",
|
||||
"license": "BSD-3-Clause"
|
||||
},
|
||||
"node_modules/@pyroscope/nodejs": {
|
||||
"version": "0.4.11",
|
||||
"resolved": "https://registry.npmjs.org/@pyroscope/nodejs/-/nodejs-0.4.11.tgz",
|
||||
"integrity": "sha512-hfLE72zc8toxC4UPgSPHxglHfFF9+62YqhqUC6LsK5pdaBQBKjqvIW9EFL6vp5c3lSs7ctjnLy2Rki9RCsqQ8g==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@datadog/pprof": "5.13.3",
|
||||
"debug": "^4.4.3",
|
||||
"p-limit": "^7.3.0",
|
||||
"regenerator-runtime": "^0.14.1",
|
||||
"source-map": "^0.7.6"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=20"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"express": "^4.0.0 || ^5.0.0",
|
||||
"fastify": "^5.7.4"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"express": {
|
||||
"optional": true
|
||||
},
|
||||
"fastify": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/@pyroscope/nodejs/node_modules/debug": {
|
||||
"version": "4.4.3",
|
||||
"resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz",
|
||||
"integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"ms": "^2.1.3"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=6.0"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"supports-color": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/@pyroscope/nodejs/node_modules/p-limit": {
|
||||
"version": "7.3.0",
|
||||
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-7.3.0.tgz",
|
||||
"integrity": "sha512-7cIXg/Z0M5WZRblrsOla88S4wAK+zOQQWeBYfV3qJuJXMr+LnbYjaadrFaS0JILfEDPVqHyKnZ1Z/1d6J9VVUw==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"yocto-queue": "^1.2.1"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=20"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"node_modules/@pyroscope/nodejs/node_modules/source-map": {
|
||||
"version": "0.7.6",
|
||||
"resolved": "https://registry.npmjs.org/source-map/-/source-map-0.7.6.tgz",
|
||||
"integrity": "sha512-i5uvt8C3ikiWeNZSVZNWcfZPItFQOsYTUAOkcUPGd8DqDy1uOUikjt5dG+uRlwyvR108Fb9DOd4GvXfT0N2/uQ==",
|
||||
"license": "BSD-3-Clause",
|
||||
"engines": {
|
||||
"node": ">= 12"
|
||||
}
|
||||
},
|
||||
"node_modules/@pyroscope/nodejs/node_modules/yocto-queue": {
|
||||
"version": "1.2.2",
|
||||
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.2.tgz",
|
||||
"integrity": "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=12.20"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"node_modules/@reactflow/background": {
|
||||
"version": "11.3.14",
|
||||
"resolved": "https://registry.npmjs.org/@reactflow/background/-/background-11.3.14.tgz",
|
||||
@@ -8397,6 +8505,18 @@
|
||||
"robust-predicates": "^3.0.2"
|
||||
}
|
||||
},
|
||||
"node_modules/delay": {
|
||||
"version": "5.0.0",
|
||||
"resolved": "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz",
|
||||
"integrity": "sha512-ReEBKkIfe4ya47wlPYf/gu5ib6yUG0/Aez0JQZQz94kiWtRQvZIQbTiehsnwHvLSWJnQdhVeqYue7Id1dKr0qw==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"node_modules/delayed-stream": {
|
||||
"version": "1.0.0",
|
||||
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz",
|
||||
@@ -14191,6 +14311,17 @@
|
||||
"node": ">= 6.13.0"
|
||||
}
|
||||
},
|
||||
"node_modules/node-gyp-build": {
|
||||
"version": "3.9.0",
|
||||
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.9.0.tgz",
|
||||
"integrity": "sha512-zLcTg6P4AbcHPq465ZMFNXx7XpKKJh+7kkN699NiQWisR2uWYOWNWqRHAmbnmKiL4e9aLSlmy5U7rEMUXV59+A==",
|
||||
"license": "MIT",
|
||||
"bin": {
|
||||
"node-gyp-build": "bin.js",
|
||||
"node-gyp-build-optional": "optional.js",
|
||||
"node-gyp-build-test": "build-test.js"
|
||||
}
|
||||
},
|
||||
"node_modules/node-gyp-build-optional-packages": {
|
||||
"version": "5.2.2",
|
||||
"resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz",
|
||||
@@ -14423,7 +14554,6 @@
|
||||
"version": "3.1.0",
|
||||
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz",
|
||||
"integrity": "sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"yocto-queue": "^0.1.0"
|
||||
@@ -14917,6 +15047,12 @@
|
||||
"web-vitals": "^4.2.4"
|
||||
}
|
||||
},
|
||||
"node_modules/pprof-format": {
|
||||
"version": "2.2.1",
|
||||
"resolved": "https://registry.npmjs.org/pprof-format/-/pprof-format-2.2.1.tgz",
|
||||
"integrity": "sha512-p4tVN7iK19ccDqQv8heyobzUmbHyds4N2FI6aBMcXz6y99MglTWDxIyhFkNaLeEXs6IFUEzT0zya0icbSLLY0g==",
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/preact": {
|
||||
"version": "10.24.3",
|
||||
"resolved": "https://registry.npmjs.org/preact/-/preact-10.24.3.tgz",
|
||||
@@ -16101,6 +16237,12 @@
|
||||
"url": "https://github.com/sponsors/wooorm"
|
||||
}
|
||||
},
|
||||
"node_modules/regenerator-runtime": {
|
||||
"version": "0.14.1",
|
||||
"resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.14.1.tgz",
|
||||
"integrity": "sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==",
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/regexp.prototype.flags": {
|
||||
"version": "1.5.3",
|
||||
"resolved": "https://registry.npmjs.org/regexp.prototype.flags/-/regexp.prototype.flags-1.5.3.tgz",
|
||||
@@ -19367,7 +19509,6 @@
|
||||
"version": "0.1.0",
|
||||
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",
|
||||
"integrity": "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10"
|
||||
|
||||
@@ -71,6 +71,7 @@
|
||||
"@opentelemetry/sdk-node": "^0.207.0",
|
||||
"@opentelemetry/sdk-trace-web": "^1.25.1",
|
||||
"@opentelemetry/semantic-conventions": "^1.37.0",
|
||||
"@pyroscope/nodejs": "^0.4.11",
|
||||
"@remixicon/react": "^4.2.0",
|
||||
"@simplewebauthn/server": "^13.2.2",
|
||||
"@tippyjs/react": "^4.2.6",
|
||||
|
||||
76
Telemetry/API/Pyroscope.ts
Normal file
76
Telemetry/API/Pyroscope.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import TelemetryIngest, {
|
||||
TelemetryRequest,
|
||||
} from "Common/Server/Middleware/TelemetryIngest";
|
||||
import ProductType from "Common/Types/MeteredPlan/ProductType";
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
RequestHandler,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import PyroscopeIngestService from "../Services/PyroscopeIngestService";
|
||||
import MultipartFormDataMiddleware from "Common/Server/Middleware/MultipartFormData";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
// Set product type to Profiles for metering
|
||||
const setProfilesProductType: RequestHandler = (
|
||||
req: ExpressRequest,
|
||||
_res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): void => {
|
||||
(req as TelemetryRequest).productType = ProductType.Profiles;
|
||||
next();
|
||||
};
|
||||
|
||||
// Map Authorization: Bearer <token> to x-oneuptime-token header
|
||||
// Pyroscope SDKs use authToken which sends Authorization: Bearer
|
||||
const mapBearerTokenMiddleware: RequestHandler = (
|
||||
req: ExpressRequest,
|
||||
_res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): void => {
|
||||
if (!req.headers["x-oneuptime-token"]) {
|
||||
const authHeader: string | undefined = req.headers[
|
||||
"authorization"
|
||||
] as string;
|
||||
if (authHeader && authHeader.startsWith("Bearer ")) {
|
||||
req.headers["x-oneuptime-token"] = authHeader.substring(7);
|
||||
}
|
||||
}
|
||||
next();
|
||||
};
|
||||
|
||||
router.post(
|
||||
"/pyroscope/ingest",
|
||||
MultipartFormDataMiddleware,
|
||||
mapBearerTokenMiddleware,
|
||||
setProfilesProductType,
|
||||
TelemetryIngest.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return PyroscopeIngestService.ingestPyroscopeProfile(req, res, next);
|
||||
},
|
||||
);
|
||||
|
||||
// Also mount at /ingest for Pyroscope SDKs that use serverAddress without a subpath
|
||||
router.post(
|
||||
"/ingest",
|
||||
MultipartFormDataMiddleware,
|
||||
mapBearerTokenMiddleware,
|
||||
setProfilesProductType,
|
||||
TelemetryIngest.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return PyroscopeIngestService.ingestPyroscopeProfile(req, res, next);
|
||||
},
|
||||
);
|
||||
|
||||
export default router;
|
||||
@@ -2,6 +2,7 @@ import OTelIngestAPI from "./API/OTelIngest";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import SyslogAPI from "./API/Syslog";
|
||||
import FluentAPI from "./API/Fluent";
|
||||
import PyroscopeAPI from "./API/Pyroscope";
|
||||
// ProbeIngest routes
|
||||
import ProbeIngestRegisterAPI from "./API/ProbeIngest/Register";
|
||||
import ProbeIngestMonitorAPI from "./API/ProbeIngest/Monitor";
|
||||
@@ -38,6 +39,7 @@ app.use(TELEMETRY_PREFIXES, OTelIngestAPI);
|
||||
app.use(TELEMETRY_PREFIXES, MetricsAPI);
|
||||
app.use(TELEMETRY_PREFIXES, SyslogAPI);
|
||||
app.use(TELEMETRY_PREFIXES, FluentAPI);
|
||||
app.use(TELEMETRY_PREFIXES, PyroscopeAPI);
|
||||
|
||||
/*
|
||||
* ProbeIngest routes under ["/probe-ingest", "/ingestor", "/"]
|
||||
|
||||
78
Telemetry/ProtoFiles/pprof/profile.proto
Normal file
78
Telemetry/ProtoFiles/pprof/profile.proto
Normal file
@@ -0,0 +1,78 @@
|
||||
// Protocol buffer definition for pprof profiles.
|
||||
// Based on https://github.com/google/pprof/blob/main/proto/profile.proto
|
||||
// Copyright 2016 Google Inc. All Rights Reserved.
|
||||
// Licensed under the Apache License, Version 2.0
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package perftools.profiles;
|
||||
|
||||
message Profile {
|
||||
repeated ValueType sample_type = 1;
|
||||
repeated Sample sample = 2;
|
||||
repeated Mapping mapping = 3;
|
||||
repeated Location location = 4;
|
||||
repeated Function function = 5;
|
||||
repeated string string_table = 6;
|
||||
int64 drop_frames = 7;
|
||||
int64 keep_frames = 8;
|
||||
int64 time_nanos = 9;
|
||||
int64 duration_nanos = 10;
|
||||
ValueType period_type = 11;
|
||||
int64 period = 12;
|
||||
repeated int64 comment = 13;
|
||||
int64 default_sample_type = 15;
|
||||
}
|
||||
|
||||
message ValueType {
|
||||
int64 type = 1;
|
||||
int64 unit = 2;
|
||||
int64 aggregation_temporality = 3;
|
||||
}
|
||||
|
||||
message Sample {
|
||||
repeated uint64 location_id = 1;
|
||||
repeated int64 value = 2;
|
||||
repeated Label label = 3;
|
||||
}
|
||||
|
||||
message Label {
|
||||
int64 key = 1;
|
||||
int64 str = 2;
|
||||
int64 num = 3;
|
||||
int64 num_unit = 4;
|
||||
}
|
||||
|
||||
message Mapping {
|
||||
uint64 id = 1;
|
||||
uint64 memory_start = 2;
|
||||
uint64 memory_limit = 3;
|
||||
uint64 file_offset = 4;
|
||||
int64 filename = 5;
|
||||
int64 build_id = 6;
|
||||
bool has_functions = 7;
|
||||
bool has_filenames = 8;
|
||||
bool has_line_numbers = 9;
|
||||
bool has_inline_frames = 10;
|
||||
}
|
||||
|
||||
message Location {
|
||||
uint64 id = 1;
|
||||
uint64 mapping_id = 2;
|
||||
uint64 address = 3;
|
||||
repeated Line line = 4;
|
||||
bool is_folded = 5;
|
||||
}
|
||||
|
||||
message Line {
|
||||
uint64 function_id = 1;
|
||||
int64 line = 2;
|
||||
}
|
||||
|
||||
message Function {
|
||||
uint64 id = 1;
|
||||
int64 name = 2;
|
||||
int64 system_name = 3;
|
||||
int64 filename = 4;
|
||||
int64 start_line = 5;
|
||||
}
|
||||
383
Telemetry/Services/PyroscopeIngestService.ts
Normal file
383
Telemetry/Services/PyroscopeIngestService.ts
Normal file
@@ -0,0 +1,383 @@
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan";
|
||||
import BadRequestException from "Common/Types/Exception/BadRequestException";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import protobuf from "protobufjs";
|
||||
import zlib from "zlib";
|
||||
import ProfilesQueueService from "./Queue/ProfilesQueueService";
|
||||
|
||||
// Load pprof proto schema
|
||||
const PprofProto: protobuf.Root = protobuf.loadSync(
|
||||
"/usr/src/app/ProtoFiles/pprof/profile.proto",
|
||||
);
|
||||
const PprofProfile: protobuf.Type = PprofProto.lookupType(
|
||||
"perftools.profiles.Profile",
|
||||
);
|
||||
|
||||
// Interfaces for parsed pprof data
|
||||
interface PprofValueType {
|
||||
type: number;
|
||||
unit: number;
|
||||
}
|
||||
|
||||
interface PprofSample {
|
||||
locationId: Array<number | string>;
|
||||
value: Array<number | string>;
|
||||
label?: Array<{ key: number; str?: number; num?: number; numUnit?: number }>;
|
||||
}
|
||||
|
||||
interface PprofLocation {
|
||||
id: number | string;
|
||||
line: Array<{ functionId: number | string; line: number }>;
|
||||
address?: number | string;
|
||||
}
|
||||
|
||||
interface PprofFunction {
|
||||
id: number | string;
|
||||
name: number;
|
||||
systemName?: number;
|
||||
filename: number;
|
||||
startLine?: number;
|
||||
}
|
||||
|
||||
interface PprofProfileData {
|
||||
stringTable: Array<string>;
|
||||
sampleType: Array<PprofValueType>;
|
||||
sample: Array<PprofSample>;
|
||||
location: Array<PprofLocation>;
|
||||
function: Array<PprofFunction>;
|
||||
timeNanos: number | string;
|
||||
durationNanos: number | string;
|
||||
periodType?: PprofValueType;
|
||||
period: number | string;
|
||||
}
|
||||
|
||||
export default class PyroscopeIngestService {
|
||||
@CaptureSpan()
|
||||
public static async ingestPyroscopeProfile(
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> {
|
||||
try {
|
||||
if (!(req as TelemetryRequest).projectId) {
|
||||
throw new BadRequestException(
|
||||
"Invalid request - projectId not found in request.",
|
||||
);
|
||||
}
|
||||
|
||||
// Extract query params
|
||||
const appName: string = this.parseAppName(
|
||||
(req.query["name"] as string) || "unknown",
|
||||
);
|
||||
const fromSeconds: number = parseInt(
|
||||
(req.query["from"] as string) || "0",
|
||||
10,
|
||||
);
|
||||
const untilSeconds: number = parseInt(
|
||||
(req.query["until"] as string) || "0",
|
||||
10,
|
||||
);
|
||||
|
||||
// Extract pprof data from request
|
||||
const pprofBuffer: Buffer | null =
|
||||
this.extractPprofFromRequest(req);
|
||||
|
||||
if (!pprofBuffer || pprofBuffer.length === 0) {
|
||||
throw new BadRequestException(
|
||||
"No profile data found in request body.",
|
||||
);
|
||||
}
|
||||
|
||||
// Decompress if gzipped
|
||||
const decompressed: Buffer = await this.decompressIfNeeded(pprofBuffer);
|
||||
|
||||
// Parse pprof protobuf
|
||||
const pprofData: PprofProfileData = this.parsePprof(decompressed);
|
||||
|
||||
// Convert to OTLP profiles format
|
||||
const otlpBody: JSONObject = this.convertPprofToOTLP({
|
||||
pprofData,
|
||||
appName,
|
||||
fromSeconds,
|
||||
untilSeconds,
|
||||
});
|
||||
|
||||
// Set the converted body on the request for the queue processor
|
||||
req.body = otlpBody;
|
||||
|
||||
// Respond immediately and queue for async processing
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
await ProfilesQueueService.addProfileIngestJob(
|
||||
req as TelemetryRequest,
|
||||
);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
}
|
||||
|
||||
private static parseAppName(name: string): string {
|
||||
// Pyroscope name format: "appName.profileType{label1=value1,label2=value2}"
|
||||
// Extract just the app name part (before the first '{' or '.')
|
||||
const braceIndex: number = name.indexOf("{");
|
||||
if (braceIndex >= 0) {
|
||||
name = name.substring(0, braceIndex);
|
||||
}
|
||||
|
||||
// Remove profile type suffix (e.g., ".cpu", ".wall", ".alloc_objects")
|
||||
const knownSuffixes: Array<string> = [
|
||||
".cpu",
|
||||
".wall",
|
||||
".alloc_objects",
|
||||
".alloc_space",
|
||||
".inuse_objects",
|
||||
".inuse_space",
|
||||
".goroutine",
|
||||
".mutex_count",
|
||||
".mutex_duration",
|
||||
".block_count",
|
||||
".block_duration",
|
||||
".contention",
|
||||
".itimer",
|
||||
];
|
||||
|
||||
for (const suffix of knownSuffixes) {
|
||||
if (name.endsWith(suffix)) {
|
||||
return name.substring(0, name.length - suffix.length);
|
||||
}
|
||||
}
|
||||
|
||||
return name;
|
||||
}
|
||||
|
||||
private static extractPprofFromRequest(req: ExpressRequest): Buffer | null {
|
||||
// Check multer files (multipart/form-data)
|
||||
const files: Array<{ fieldname: string; buffer: Buffer }> | undefined =
|
||||
req.files as Array<{ fieldname: string; buffer: Buffer }> | undefined;
|
||||
|
||||
if (files && files.length > 0) {
|
||||
// Find the 'profile' field
|
||||
const profileFile: { fieldname: string; buffer: Buffer } | undefined =
|
||||
files.find(
|
||||
(f: { fieldname: string; buffer: Buffer }) =>
|
||||
f.fieldname === "profile",
|
||||
);
|
||||
|
||||
if (profileFile) {
|
||||
return profileFile.buffer;
|
||||
}
|
||||
|
||||
// If no 'profile' field, use the first file
|
||||
return files[0]!.buffer;
|
||||
}
|
||||
|
||||
// Check raw body (application/octet-stream)
|
||||
if (Buffer.isBuffer(req.body)) {
|
||||
return req.body as Buffer;
|
||||
}
|
||||
|
||||
if (req.body instanceof Uint8Array) {
|
||||
return Buffer.from(req.body);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static async decompressIfNeeded(data: Buffer): Promise<Buffer> {
|
||||
// Check for gzip magic bytes (0x1f, 0x8b)
|
||||
if (data.length >= 2 && data[0] === 0x1f && data[1] === 0x8b) {
|
||||
return new Promise<Buffer>(
|
||||
(
|
||||
resolve: (value: Buffer) => void,
|
||||
reject: (reason: Error) => void,
|
||||
) => {
|
||||
zlib.gunzip(data as unknown as Uint8Array, (err: Error | null, result: Buffer) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(result);
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
private static parsePprof(data: Buffer): PprofProfileData {
|
||||
const message: protobuf.Message = PprofProfile.decode(
|
||||
new Uint8Array(data.buffer, data.byteOffset, data.byteLength),
|
||||
);
|
||||
const obj: Record<string, unknown> = PprofProfile.toObject(message, {
|
||||
longs: Number,
|
||||
defaults: true,
|
||||
arrays: true,
|
||||
}) as Record<string, unknown>;
|
||||
|
||||
return obj as unknown as PprofProfileData;
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
private static convertPprofToOTLP(data: {
|
||||
pprofData: PprofProfileData;
|
||||
appName: string;
|
||||
fromSeconds: number;
|
||||
untilSeconds: number;
|
||||
}): JSONObject {
|
||||
const { pprofData, appName, fromSeconds, untilSeconds } = data;
|
||||
|
||||
const stringTable: Array<string> = pprofData.stringTable || [];
|
||||
|
||||
// Build function ID → index map
|
||||
const functionIdToIndex: Map<string, number> = new Map<string, number>();
|
||||
const functionTable: Array<JSONObject> = [];
|
||||
|
||||
for (let i: number = 0; i < (pprofData.function || []).length; i++) {
|
||||
const fn: PprofFunction = pprofData.function[i]!;
|
||||
functionIdToIndex.set(fn.id.toString(), i);
|
||||
functionTable.push({
|
||||
name: fn.name,
|
||||
filename: fn.filename,
|
||||
});
|
||||
}
|
||||
|
||||
// Build location ID → index map
|
||||
const locationIdToIndex: Map<string, number> = new Map<string, number>();
|
||||
const locationTable: Array<JSONObject> = [];
|
||||
|
||||
for (let i: number = 0; i < (pprofData.location || []).length; i++) {
|
||||
const loc: PprofLocation = pprofData.location[i]!;
|
||||
locationIdToIndex.set(loc.id.toString(), i);
|
||||
|
||||
const lines: Array<JSONObject> = (loc.line || []).map(
|
||||
(line: { functionId: number | string; line: number }) => {
|
||||
const fnIndex: number =
|
||||
functionIdToIndex.get(line.functionId.toString()) || 0;
|
||||
return {
|
||||
functionIndex: fnIndex,
|
||||
line: line.line || 0,
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
locationTable.push({ line: lines });
|
||||
}
|
||||
|
||||
// Build stack table and samples from pprof samples
|
||||
const stackTable: Array<JSONObject> = [];
|
||||
const stackKeyMap: Map<string, number> = new Map<string, number>();
|
||||
const otlpSamples: Array<JSONObject> = [];
|
||||
|
||||
// Compute timestamps
|
||||
const startTimeNanos: string = pprofData.timeNanos
|
||||
? pprofData.timeNanos.toString()
|
||||
: (fromSeconds * 1_000_000_000).toString();
|
||||
const endTimeNanos: string = pprofData.durationNanos
|
||||
? (
|
||||
Number(pprofData.timeNanos) + Number(pprofData.durationNanos)
|
||||
).toString()
|
||||
: (untilSeconds * 1_000_000_000).toString();
|
||||
|
||||
for (const sample of pprofData.sample || []) {
|
||||
// Convert location IDs to location indices
|
||||
const locationIndices: Array<number> = (sample.locationId || []).map(
|
||||
(locId: number | string) => {
|
||||
return locationIdToIndex.get(locId.toString()) || 0;
|
||||
},
|
||||
);
|
||||
|
||||
// Deduplicate stacks
|
||||
const stackKey: string = locationIndices.join(",");
|
||||
let stackIndex: number | undefined = stackKeyMap.get(stackKey);
|
||||
if (stackIndex === undefined) {
|
||||
stackIndex = stackTable.length;
|
||||
stackTable.push({ locationIndices });
|
||||
stackKeyMap.set(stackKey, stackIndex);
|
||||
}
|
||||
|
||||
// Convert values to strings
|
||||
const values: Array<string> = (sample.value || []).map(
|
||||
(v: number | string) => v.toString(),
|
||||
);
|
||||
|
||||
otlpSamples.push({
|
||||
stackIndex,
|
||||
value: values,
|
||||
timestampsUnixNano: [startTimeNanos],
|
||||
});
|
||||
}
|
||||
|
||||
// Build sample types
|
||||
const sampleType: Array<JSONObject> = (pprofData.sampleType || []).map(
|
||||
(st: PprofValueType) => ({
|
||||
type: st.type,
|
||||
unit: st.unit,
|
||||
}),
|
||||
);
|
||||
|
||||
// Build period type
|
||||
const periodType: JSONObject = pprofData.periodType
|
||||
? { type: pprofData.periodType.type, unit: pprofData.periodType.unit }
|
||||
: { type: 0, unit: 0 };
|
||||
|
||||
// Generate profile ID
|
||||
const profileId: string = ObjectID.generate().toString();
|
||||
const profileIdBase64: string =
|
||||
Buffer.from(profileId, "hex").toString("base64");
|
||||
|
||||
return {
|
||||
resourceProfiles: [
|
||||
{
|
||||
resource: {
|
||||
attributes: [
|
||||
{
|
||||
key: "service.name",
|
||||
value: { stringValue: appName },
|
||||
},
|
||||
{
|
||||
key: "telemetry.sdk.name",
|
||||
value: { stringValue: "pyroscope" },
|
||||
},
|
||||
],
|
||||
},
|
||||
scopeProfiles: [
|
||||
{
|
||||
scope: {
|
||||
name: "pyroscope",
|
||||
version: "1.0.0",
|
||||
},
|
||||
profiles: [
|
||||
{
|
||||
profileId: profileIdBase64,
|
||||
startTimeUnixNano: startTimeNanos,
|
||||
endTimeUnixNano: endTimeNanos,
|
||||
attributes: [],
|
||||
profile: {
|
||||
stringTable,
|
||||
sampleType,
|
||||
sample: otlpSamples,
|
||||
locationTable,
|
||||
functionTable,
|
||||
stackTable,
|
||||
linkTable: [],
|
||||
attributeTable: [],
|
||||
periodType,
|
||||
period: (pprofData.period || 0).toString(),
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
} as unknown as JSONObject;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user