From e30f2587e8066a5eba6c296208838e93a4d50390 Mon Sep 17 00:00:00 2001 From: Nawaz Dhandala Date: Tue, 31 Mar 2026 13:55:14 +0100 Subject: [PATCH] feat: integrate Pyroscope for performance profiling - Added @pyroscope/nodejs dependency to package.json and package-lock.json. - Implemented Pyroscope API routes in Telemetry/Index.ts. - Created new Pyroscope API handler in Telemetry/API/Pyroscope.ts to manage profile ingestion. - Defined pprof protocol buffer schema in Telemetry/ProtoFiles/pprof/profile.proto. - Developed PyroscopeIngestService for processing and converting pprof profiles to OTLP format. - Enhanced error handling and request validation in the Pyroscope ingestion process. --- Common/Server/Utils/Profiling.ts | 592 ++----------------- Common/package-lock.json | 145 ++++- Common/package.json | 1 + Telemetry/API/Pyroscope.ts | 76 +++ Telemetry/Index.ts | 2 + Telemetry/ProtoFiles/pprof/profile.proto | 78 +++ Telemetry/Services/PyroscopeIngestService.ts | 383 ++++++++++++ 7 files changed, 739 insertions(+), 538 deletions(-) create mode 100644 Telemetry/API/Pyroscope.ts create mode 100644 Telemetry/ProtoFiles/pprof/profile.proto create mode 100644 Telemetry/Services/PyroscopeIngestService.ts diff --git a/Common/Server/Utils/Profiling.ts b/Common/Server/Utils/Profiling.ts index bff5f0134e..8d75a956df 100644 --- a/Common/Server/Utils/Profiling.ts +++ b/Common/Server/Utils/Profiling.ts @@ -1,51 +1,11 @@ -import inspector from "inspector"; -import http from "http"; -import https from "https"; -import zlib from "zlib"; -import { URL as NodeURL } from "url"; -import Dictionary from "../../Types/Dictionary"; +import Pyroscope from "@pyroscope/nodejs"; import { - AppVersion, - Env, DisableTelemetry, EnableProfiling, } from "../EnvironmentConfig"; import logger from "./Logger"; -// V8 CPU Profile types from the inspector module -interface V8CallFrame { - functionName: string; - scriptId: string; - url: string; - lineNumber: number; - columnNumber: number; -} - 
-interface V8CpuProfileNode { - id: number; - callFrame: V8CallFrame; - hitCount: number; - children?: Array; -} - -interface V8CpuProfile { - nodes: Array; - startTime: number; // microseconds (monotonic clock) - endTime: number; // microseconds (monotonic clock) - samples: Array; // node IDs - timeDeltas: Array; // microseconds between samples -} - export default class Profiling { - private static session: inspector.Session | null = null; - private static intervalId: ReturnType | null = null; - private static serviceName: string = ""; - private static isCollecting: boolean = false; - - // Profile every 60 seconds, sample for 10 seconds each time - private static readonly PROFILING_INTERVAL_MS: number = 60_000; - private static readonly PROFILING_DURATION_MS: number = 10_000; - public static init(data: { serviceName: string }): void { if (!EnableProfiling) { return; @@ -55,527 +15,87 @@ export default class Profiling { return; } - const endpoint: string | null = this.getOtlpProfilesEndpoint(); - const headers: Dictionary = this.getHeaders(); + const serverAddress: string | undefined = this.getServerAddress(); + const authToken: string | undefined = this.getAuthToken(); - if (!endpoint || Object.keys(headers).length === 0) { + if (!serverAddress) { logger.warn( - "Profiling enabled but OTLP endpoint or headers not configured. Skipping profiling initialization.", + "Profiling enabled but OPENTELEMETRY_EXPORTER_OTLP_ENDPOINT not configured. 
Skipping profiling initialization.", ); return; } - this.serviceName = data.serviceName; - try { - this.session = new inspector.Session(); - this.session.connect(); + Pyroscope.init({ + appName: data.serviceName, + serverAddress: serverAddress, + authToken: authToken, + wall: { + collectCpuTime: true, + }, + }); - this.postToSession("Profiler.enable") - .then(() => { - logger.info( - `CPU profiling initialized for service: ${data.serviceName}`, - ); - this.startProfilingLoop(); - }) - .catch((err: unknown) => { - logger.error("Failed to enable V8 profiler:"); - logger.error(err); - }); + Pyroscope.start(); + + logger.info( + `Profiling initialized for service: ${data.serviceName} -> ${serverAddress}`, + ); } catch (err) { - logger.error("Failed to initialize profiling session:"); + logger.error("Failed to initialize profiling:"); logger.error(err); } process.on("SIGTERM", () => { - this.stop(); + Pyroscope.stop().catch((err: unknown) => { + logger.error("Error stopping profiler:"); + logger.error(err); + }); }); } - public static stop(): void { - if (this.intervalId) { - clearInterval(this.intervalId); - this.intervalId = null; - } - - if (this.session) { - try { - this.session.post("Profiler.disable"); - this.session.disconnect(); - } catch { - // Ignore errors during cleanup - } - this.session = null; - } - } - - private static getOtlpProfilesEndpoint(): string | null { - const base: string | undefined = + private static getServerAddress(): string | undefined { + // Use the OTLP endpoint base URL as the Pyroscope server address. + // The Pyroscope SDK will append /ingest to this URL. + // The Telemetry service has a Pyroscope-compatible /ingest endpoint. 
+ const endpoint: string | undefined = process.env["OPENTELEMETRY_EXPORTER_OTLP_ENDPOINT"]; - if (!base) { - return null; - } - return `${base}/v1/profiles`; - } - private static getHeaders(): Dictionary { - if (!process.env["OPENTELEMETRY_EXPORTER_OTLP_HEADERS"]) { - return {}; - } - - const headersStrings: Array = - process.env["OPENTELEMETRY_EXPORTER_OTLP_HEADERS"].split(";"); - - const headers: Dictionary = {}; - - for (const headerString of headersStrings) { - const parts: Array = headerString.split("="); - if (parts.length === 2) { - headers[parts[0]!.toString()] = parts[1]!.toString(); - } - } - - return headers; - } - - private static startProfilingLoop(): void { - // Start the first collection after a short delay - setTimeout(() => { - this.collectAndSendProfile().catch((err: unknown) => { - logger.error("Error in initial profile collection:"); - logger.error(err); - }); - }, 5000); - - this.intervalId = setInterval(() => { - this.collectAndSendProfile().catch((err: unknown) => { - logger.error("Error in profile collection:"); - logger.error(err); - }); - }, this.PROFILING_INTERVAL_MS); - } - - private static async collectAndSendProfile(): Promise { - if (!this.session || this.isCollecting) { - return; - } - - this.isCollecting = true; - const wallClockStartMs: number = Date.now(); - - try { - await this.postToSession("Profiler.start"); - - await new Promise((resolve: () => void) => { - return setTimeout(resolve, this.PROFILING_DURATION_MS); - }); - - const wallClockEndMs: number = Date.now(); - const result: unknown = await this.postToSession("Profiler.stop"); - const profile: V8CpuProfile | undefined = ( - result as { profile?: V8CpuProfile } - )?.profile; - - if (!profile || !profile.samples || profile.samples.length === 0) { - return; - } - - const otlpPayload: object = this.convertV8ProfileToOTLP( - profile, - wallClockStartMs, - wallClockEndMs, - ); - - await this.sendProfile(otlpPayload); - } catch (err) { - logger.error("Error collecting/sending 
profile:"); - logger.error(err); - } finally { - this.isCollecting = false; - } - } - - private static postToSession( - method: string, - params?: object, - ): Promise { - return new Promise( - (resolve: (value: unknown) => void, reject: (reason: Error) => void) => { - if (!this.session) { - reject(new Error("Inspector session not available")); - return; - } - - this.session.post( - method, - params || {}, - (err: Error | null, result?: object) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }, - ); - }, - ); - } - - private static convertV8ProfileToOTLP( - v8Profile: V8CpuProfile, - wallClockStartMs: number, - wallClockEndMs: number, - ): object { - // Build node lookup and parent maps - const nodeMap: Map = new Map< - number, - V8CpuProfileNode - >(); - const parentMap: Map = new Map(); - - for (const node of v8Profile.nodes) { - nodeMap.set(node.id, node); - if (node.children) { - for (const childId of node.children) { - parentMap.set(childId, node.id); - } - } - } - - // String table with deduplication - const stringTable: Array = [""]; - const stringIndexMap: Map = new Map(); - stringIndexMap.set("", 0); - - const getStringIndex: (s: string) => number = (s: string): number => { - let idx: number | undefined = stringIndexMap.get(s); - if (idx === undefined) { - idx = stringTable.length; - stringTable.push(s); - stringIndexMap.set(s, idx); - } - return idx; - }; - - // Predefined string indices for sample types - const cpuTypeIdx: number = getStringIndex("cpu"); - const nanosecondsIdx: number = getStringIndex("nanoseconds"); - const samplesTypeIdx: number = getStringIndex("samples"); - const countIdx: number = getStringIndex("count"); - - // Build function and location tables - const functionTable: Array<{ name: number; filename: number }> = []; - const locationTable: Array<{ - line: Array<{ functionIndex: number; line: number }>; - }> = []; - - const funcIndexMap: Map = new Map(); - const locationIndexMap: Map = new Map(); - - const 
getLocationIndex: (node: V8CpuProfileNode) => number = ( - node: V8CpuProfileNode, - ): number => { - const locKey: string = `${node.callFrame.functionName}|${node.callFrame.url}|${node.callFrame.lineNumber}`; - let locIdx: number | undefined = locationIndexMap.get(locKey); - if (locIdx !== undefined) { - return locIdx; - } - - // Ensure function entry exists - const fKey: string = `${node.callFrame.functionName}|${node.callFrame.url}`; - let fIdx: number | undefined = funcIndexMap.get(fKey); - if (fIdx === undefined) { - fIdx = functionTable.length; - functionTable.push({ - name: getStringIndex(node.callFrame.functionName || "(anonymous)"), - filename: getStringIndex(node.callFrame.url || ""), - }); - funcIndexMap.set(fKey, fIdx); - } - - locIdx = locationTable.length; - locationTable.push({ - line: [ - { - functionIndex: fIdx, - line: Math.max(0, node.callFrame.lineNumber + 1), // V8 uses 0-based line numbers - }, - ], - }); - locationIndexMap.set(locKey, locIdx); - - return locIdx; - }; - - // Build stack table from samples - const stackTable: Array<{ locationIndices: Array }> = []; - const stackKeyMap: Map = new Map(); - - const getStackIndex: (leafNodeId: number) => number = ( - leafNodeId: number, - ): number => { - const locationIndices: Array = []; - let currentId: number | undefined = leafNodeId; - - while (currentId !== undefined) { - const node: V8CpuProfileNode | undefined = nodeMap.get(currentId); - if (!node) { - break; - } - - // Skip V8 internal nodes - const fnName: string = node.callFrame.functionName; - if ( - fnName !== "(root)" && - fnName !== "(program)" && - fnName !== "(idle)" && - fnName !== "(garbage collector)" - ) { - locationIndices.push(getLocationIndex(node)); - } - - currentId = parentMap.get(currentId); - } - - const key: string = locationIndices.join(","); - let stackIdx: number | undefined = stackKeyMap.get(key); - if (stackIdx === undefined) { - stackIdx = stackTable.length; - stackTable.push({ locationIndices }); - 
stackKeyMap.set(key, stackIdx); - } - - return stackIdx; - }; - - // Use wall clock for absolute timestamps (V8 uses monotonic clock) - const NANOS_PER_MS: bigint = BigInt(1000000); - const NANOS_PER_US: bigint = BigInt(1000); - const ZERO: bigint = BigInt(0); - - const startTimeNano: bigint = BigInt(wallClockStartMs) * NANOS_PER_MS; - const endTimeNano: bigint = BigInt(wallClockEndMs) * NANOS_PER_MS; - - // Build sample entries - const samples: Array<{ - stackIndex: number; - value: Array; - timestampsUnixNano: Array; - }> = []; - - let cumulativeDeltaNano: bigint = ZERO; - const totalV8DurationUs: bigint = BigInt( - v8Profile.endTime - v8Profile.startTime, - ); - const totalWallDurationNano: bigint = endTimeNano - startTimeNano; - - for (let i: number = 0; i < v8Profile.samples.length; i++) { - const nodeId: number = v8Profile.samples[i]!; - const node: V8CpuProfileNode | undefined = nodeMap.get(nodeId); - - // Accumulate time delta - const deltaUs: bigint = BigInt(v8Profile.timeDeltas[i] || 0); - cumulativeDeltaNano = cumulativeDeltaNano + deltaUs * NANOS_PER_US; - - if (!node) { - continue; - } - - // Skip idle/root/program/gc samples - const fnName: string = node.callFrame.functionName; - if ( - fnName === "(idle)" || - fnName === "(root)" || - fnName === "(program)" || - fnName === "(garbage collector)" - ) { - continue; - } - - // Map V8 monotonic time to wall clock time proportionally - const sampleTimeNano: bigint = - totalV8DurationUs > ZERO - ? 
startTimeNano + - (cumulativeDeltaNano * totalWallDurationNano) / - (totalV8DurationUs * NANOS_PER_US) - : startTimeNano + cumulativeDeltaNano; - - const timeDeltaNano: bigint = deltaUs * NANOS_PER_US; - - const stackIndex: number = getStackIndex(nodeId); - - samples.push({ - stackIndex, - value: [timeDeltaNano.toString(), "1"], - timestampsUnixNano: [sampleTimeNano.toString()], - }); - } - - // If no meaningful samples were collected, return an empty payload - if (samples.length === 0) { - return { resourceProfiles: [] }; - } - - // Compute average sampling period in nanoseconds - const avgPeriodNs: number = - v8Profile.samples.length > 0 - ? Math.trunc( - ((v8Profile.endTime - v8Profile.startTime) * 1000) / - v8Profile.samples.length, - ) - : 1_000_000; // default 1ms - - // Generate a random profile ID (16 bytes as base64) - const profileIdBytes: Buffer = Buffer.alloc(16); - for (let i: number = 0; i < 16; i++) { - profileIdBytes[i] = Math.floor(Math.random() * 256); - } - const profileId: string = profileIdBytes.toString("base64"); - - return { - resourceProfiles: [ - { - resource: { - attributes: [ - { - key: "service.name", - value: { stringValue: this.serviceName }, - }, - { - key: "service.version", - value: { stringValue: AppVersion }, - }, - { - key: "deployment.environment", - value: { stringValue: Env }, - }, - ], - }, - scopeProfiles: [ - { - scope: { - name: "oneuptime-node-profiler", - version: "1.0.0", - }, - profiles: [ - { - profileId: profileId, - startTimeUnixNano: startTimeNano.toString(), - endTimeUnixNano: endTimeNano.toString(), - attributes: [ - { - key: "profiler.name", - value: { stringValue: "v8-cpu-profiler" }, - }, - { - key: "runtime.name", - value: { stringValue: "nodejs" }, - }, - { - key: "runtime.version", - value: { stringValue: process.version }, - }, - ], - profile: { - stringTable, - sampleType: [ - { type: cpuTypeIdx, unit: nanosecondsIdx }, - { type: samplesTypeIdx, unit: countIdx }, - ], - sample: samples, - locationTable, 
- functionTable, - stackTable, - linkTable: [], - attributeTable: [], - periodType: { type: cpuTypeIdx, unit: nanosecondsIdx }, - period: avgPeriodNs.toString(), - }, - }, - ], - }, - ], - }, - ], - }; - } - - private static async sendProfile(payload: object): Promise { - const endpoint: string | null = this.getOtlpProfilesEndpoint(); if (!endpoint) { - return; + return undefined; } - const resourceProfiles: Array = ( - payload as { resourceProfiles: Array } - ).resourceProfiles; - if (!resourceProfiles || resourceProfiles.length === 0) { - return; + // Strip /otlp suffix if present, since Pyroscope SDK appends /ingest + let baseUrl: string = endpoint; + if (baseUrl.endsWith("/otlp")) { + baseUrl = baseUrl.substring(0, baseUrl.length - 5); + } + if (baseUrl.endsWith("/")) { + baseUrl = baseUrl.substring(0, baseUrl.length - 1); } - const headers: Dictionary = this.getHeaders(); - const jsonData: string = JSON.stringify(payload); + return baseUrl; + } - const compressed: Buffer = await new Promise( - (resolve: (value: Buffer) => void, reject: (reason: Error) => void) => { - zlib.gzip(jsonData, (err: Error | null, result: Buffer) => { - if (err) { - reject(err); - } else { - resolve(result); - } - }); - }, - ); + private static getAuthToken(): string | undefined { + // Extract the OneUptime token from OTLP headers + // Format: "x-oneuptime-token=;other-header=value" + const headersStr: string | undefined = + process.env["OPENTELEMETRY_EXPORTER_OTLP_HEADERS"]; - const url: NodeURL = new NodeURL(endpoint); - const isHttps: boolean = url.protocol === "https:"; - const httpModule: typeof http | typeof https = isHttps ? https : http; + if (!headersStr) { + return undefined; + } - return new Promise((resolve: () => void) => { - const req: http.ClientRequest = httpModule.request( - { - hostname: url.hostname, - port: url.port || (isHttps ? 
443 : 80), - path: url.pathname, - method: "POST", - headers: { - "Content-Type": "application/json", - "Content-Encoding": "gzip", - ...headers, - }, - }, - (res: http.IncomingMessage) => { - let data: string = ""; - res.on("data", (chunk: Buffer) => { - data += chunk.toString(); - }); - res.on("end", () => { - if ( - res.statusCode && - res.statusCode >= 200 && - res.statusCode < 300 - ) { - logger.debug( - `Profile sent successfully for service: ${this.serviceName}`, - ); - } else { - logger.warn( - `Profile export failed with status ${res.statusCode}: ${data}`, - ); - } - resolve(); - }); - }, - ); + const parts: Array = headersStr.split(";"); + for (const part of parts) { + const [key, value]: Array = part.split("=") as Array< + string | undefined + >; + if (key === "x-oneuptime-token" && value) { + return value; + } + } - req.on("error", (err: Error) => { - logger.warn(`Profile export error: ${err.message}`); - resolve(); // Don't throw - profiling failures should not crash the service - }); - - req.write(compressed); - req.end(); - }); + return undefined; } } diff --git a/Common/package-lock.json b/Common/package-lock.json index 3b0d938575..764e0e663d 100644 --- a/Common/package-lock.json +++ b/Common/package-lock.json @@ -32,6 +32,7 @@ "@opentelemetry/sdk-node": "^0.207.0", "@opentelemetry/sdk-trace-web": "^1.25.1", "@opentelemetry/semantic-conventions": "^1.37.0", + "@pyroscope/nodejs": "^0.4.11", "@remixicon/react": "^4.2.0", "@simplewebauthn/server": "^13.2.2", "@tippyjs/react": "^4.2.6", @@ -989,6 +990,32 @@ "integrity": "sha512-Duh3cign2ChvXABpjVj9Hkz5y20Zf48OE0Y50S4qBVPdhI81S4Rh4MI/bEwvwMnzHubSkiEQ+VhC5HzV8ybnpg==", "license": "Apache-2.0" }, + "node_modules/@datadog/pprof": { + "version": "5.13.3", + "resolved": "https://registry.npmjs.org/@datadog/pprof/-/pprof-5.13.3.tgz", + "integrity": "sha512-G25IicP7pc5CXmAfVz7nrIERsKK9hvPz6p7xsLTUwG4Qs+Zgd5KFedKCVsnvNasLc7l7OXQ6839ajowgQLWTyw==", + "hasInstallScript": true, + "license": "Apache-2.0", + 
"dependencies": { + "delay": "^5.0.0", + "node-gyp-build": "<4.0", + "p-limit": "^3.1.0", + "pprof-format": "^2.2.1", + "source-map": "^0.7.4" + }, + "engines": { + "node": ">=16" + } + }, + "node_modules/@datadog/pprof/node_modules/source-map": { + "version": "0.7.6", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.7.6.tgz", + "integrity": "sha512-i5uvt8C3ikiWeNZSVZNWcfZPItFQOsYTUAOkcUPGd8DqDy1uOUikjt5dG+uRlwyvR108Fb9DOd4GvXfT0N2/uQ==", + "license": "BSD-3-Clause", + "engines": { + "node": ">= 12" + } + }, "node_modules/@elastic/elasticsearch": { "version": "8.16.1", "resolved": "https://registry.npmjs.org/@elastic/elasticsearch/-/elasticsearch-8.16.1.tgz", @@ -4472,6 +4499,87 @@ "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==", "license": "BSD-3-Clause" }, + "node_modules/@pyroscope/nodejs": { + "version": "0.4.11", + "resolved": "https://registry.npmjs.org/@pyroscope/nodejs/-/nodejs-0.4.11.tgz", + "integrity": "sha512-hfLE72zc8toxC4UPgSPHxglHfFF9+62YqhqUC6LsK5pdaBQBKjqvIW9EFL6vp5c3lSs7ctjnLy2Rki9RCsqQ8g==", + "license": "Apache-2.0", + "dependencies": { + "@datadog/pprof": "5.13.3", + "debug": "^4.4.3", + "p-limit": "^7.3.0", + "regenerator-runtime": "^0.14.1", + "source-map": "^0.7.6" + }, + "engines": { + "node": ">=20" + }, + "peerDependencies": { + "express": "^4.0.0 || ^5.0.0", + "fastify": "^5.7.4" + }, + "peerDependenciesMeta": { + "express": { + "optional": true + }, + "fastify": { + "optional": true + } + } + }, + "node_modules/@pyroscope/nodejs/node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + 
"node_modules/@pyroscope/nodejs/node_modules/p-limit": { + "version": "7.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-7.3.0.tgz", + "integrity": "sha512-7cIXg/Z0M5WZRblrsOla88S4wAK+zOQQWeBYfV3qJuJXMr+LnbYjaadrFaS0JILfEDPVqHyKnZ1Z/1d6J9VVUw==", + "license": "MIT", + "dependencies": { + "yocto-queue": "^1.2.1" + }, + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/@pyroscope/nodejs/node_modules/source-map": { + "version": "0.7.6", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.7.6.tgz", + "integrity": "sha512-i5uvt8C3ikiWeNZSVZNWcfZPItFQOsYTUAOkcUPGd8DqDy1uOUikjt5dG+uRlwyvR108Fb9DOd4GvXfT0N2/uQ==", + "license": "BSD-3-Clause", + "engines": { + "node": ">= 12" + } + }, + "node_modules/@pyroscope/nodejs/node_modules/yocto-queue": { + "version": "1.2.2", + "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.2.tgz", + "integrity": "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==", + "license": "MIT", + "engines": { + "node": ">=12.20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/@reactflow/background": { "version": "11.3.14", "resolved": "https://registry.npmjs.org/@reactflow/background/-/background-11.3.14.tgz", @@ -8397,6 +8505,18 @@ "robust-predicates": "^3.0.2" } }, + "node_modules/delay": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz", + "integrity": "sha512-ReEBKkIfe4ya47wlPYf/gu5ib6yUG0/Aez0JQZQz94kiWtRQvZIQbTiehsnwHvLSWJnQdhVeqYue7Id1dKr0qw==", + "license": "MIT", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/delayed-stream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", @@ -14191,6 +14311,17 @@ "node": ">= 6.13.0" } }, + 
"node_modules/node-gyp-build": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.9.0.tgz", + "integrity": "sha512-zLcTg6P4AbcHPq465ZMFNXx7XpKKJh+7kkN699NiQWisR2uWYOWNWqRHAmbnmKiL4e9aLSlmy5U7rEMUXV59+A==", + "license": "MIT", + "bin": { + "node-gyp-build": "bin.js", + "node-gyp-build-optional": "optional.js", + "node-gyp-build-test": "build-test.js" + } + }, "node_modules/node-gyp-build-optional-packages": { "version": "5.2.2", "resolved": "https://registry.npmjs.org/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz", @@ -14423,7 +14554,6 @@ "version": "3.1.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz", "integrity": "sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==", - "dev": true, "license": "MIT", "dependencies": { "yocto-queue": "^0.1.0" @@ -14917,6 +15047,12 @@ "web-vitals": "^4.2.4" } }, + "node_modules/pprof-format": { + "version": "2.2.1", + "resolved": "https://registry.npmjs.org/pprof-format/-/pprof-format-2.2.1.tgz", + "integrity": "sha512-p4tVN7iK19ccDqQv8heyobzUmbHyds4N2FI6aBMcXz6y99MglTWDxIyhFkNaLeEXs6IFUEzT0zya0icbSLLY0g==", + "license": "MIT" + }, "node_modules/preact": { "version": "10.24.3", "resolved": "https://registry.npmjs.org/preact/-/preact-10.24.3.tgz", @@ -16101,6 +16237,12 @@ "url": "https://github.com/sponsors/wooorm" } }, + "node_modules/regenerator-runtime": { + "version": "0.14.1", + "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.14.1.tgz", + "integrity": "sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==", + "license": "MIT" + }, "node_modules/regexp.prototype.flags": { "version": "1.5.3", "resolved": "https://registry.npmjs.org/regexp.prototype.flags/-/regexp.prototype.flags-1.5.3.tgz", @@ -19367,7 +19509,6 @@ "version": "0.1.0", "resolved": 
"https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz", "integrity": "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==", - "dev": true, "license": "MIT", "engines": { "node": ">=10" diff --git a/Common/package.json b/Common/package.json index f908c07fc9..cf132ee6c5 100644 --- a/Common/package.json +++ b/Common/package.json @@ -71,6 +71,7 @@ "@opentelemetry/sdk-node": "^0.207.0", "@opentelemetry/sdk-trace-web": "^1.25.1", "@opentelemetry/semantic-conventions": "^1.37.0", + "@pyroscope/nodejs": "^0.4.11", "@remixicon/react": "^4.2.0", "@simplewebauthn/server": "^13.2.2", "@tippyjs/react": "^4.2.6", diff --git a/Telemetry/API/Pyroscope.ts b/Telemetry/API/Pyroscope.ts new file mode 100644 index 0000000000..77ce0a61c5 --- /dev/null +++ b/Telemetry/API/Pyroscope.ts @@ -0,0 +1,76 @@ +import TelemetryIngest, { + TelemetryRequest, +} from "Common/Server/Middleware/TelemetryIngest"; +import ProductType from "Common/Types/MeteredPlan/ProductType"; +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, + RequestHandler, +} from "Common/Server/Utils/Express"; +import PyroscopeIngestService from "../Services/PyroscopeIngestService"; +import MultipartFormDataMiddleware from "Common/Server/Middleware/MultipartFormData"; + +const router: ExpressRouter = Express.getRouter(); + +// Set product type to Profiles for metering +const setProfilesProductType: RequestHandler = ( + req: ExpressRequest, + _res: ExpressResponse, + next: NextFunction, +): void => { + (req as TelemetryRequest).productType = ProductType.Profiles; + next(); +}; + +// Map Authorization: Bearer to x-oneuptime-token header +// Pyroscope SDKs use authToken which sends Authorization: Bearer +const mapBearerTokenMiddleware: RequestHandler = ( + req: ExpressRequest, + _res: ExpressResponse, + next: NextFunction, +): void => { + if (!req.headers["x-oneuptime-token"]) { + const authHeader: string | undefined = req.headers[ + 
"authorization" + ] as string; + if (authHeader && authHeader.startsWith("Bearer ")) { + req.headers["x-oneuptime-token"] = authHeader.substring(7); + } + } + next(); +}; + +router.post( + "/pyroscope/ingest", + MultipartFormDataMiddleware, + mapBearerTokenMiddleware, + setProfilesProductType, + TelemetryIngest.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + return PyroscopeIngestService.ingestPyroscopeProfile(req, res, next); + }, +); + +// Also mount at /ingest for Pyroscope SDKs that use serverAddress without a subpath +router.post( + "/ingest", + MultipartFormDataMiddleware, + mapBearerTokenMiddleware, + setProfilesProductType, + TelemetryIngest.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + return PyroscopeIngestService.ingestPyroscopeProfile(req, res, next); + }, +); + +export default router; diff --git a/Telemetry/Index.ts b/Telemetry/Index.ts index eaa58c0cb8..43f2231440 100644 --- a/Telemetry/Index.ts +++ b/Telemetry/Index.ts @@ -2,6 +2,7 @@ import OTelIngestAPI from "./API/OTelIngest"; import MetricsAPI from "./API/Metrics"; import SyslogAPI from "./API/Syslog"; import FluentAPI from "./API/Fluent"; +import PyroscopeAPI from "./API/Pyroscope"; // ProbeIngest routes import ProbeIngestRegisterAPI from "./API/ProbeIngest/Register"; import ProbeIngestMonitorAPI from "./API/ProbeIngest/Monitor"; @@ -38,6 +39,7 @@ app.use(TELEMETRY_PREFIXES, OTelIngestAPI); app.use(TELEMETRY_PREFIXES, MetricsAPI); app.use(TELEMETRY_PREFIXES, SyslogAPI); app.use(TELEMETRY_PREFIXES, FluentAPI); +app.use(TELEMETRY_PREFIXES, PyroscopeAPI); /* * ProbeIngest routes under ["/probe-ingest", "/ingestor", "/"] diff --git a/Telemetry/ProtoFiles/pprof/profile.proto b/Telemetry/ProtoFiles/pprof/profile.proto new file mode 100644 index 0000000000..1936c77d2b --- /dev/null +++ b/Telemetry/ProtoFiles/pprof/profile.proto @@ -0,0 
+1,78 @@ +// Protocol buffer definition for pprof profiles. +// Based on https://github.com/google/pprof/blob/main/proto/profile.proto +// Copyright 2016 Google Inc. All Rights Reserved. +// Licensed under the Apache License, Version 2.0 + +syntax = "proto3"; + +package perftools.profiles; + +message Profile { + repeated ValueType sample_type = 1; + repeated Sample sample = 2; + repeated Mapping mapping = 3; + repeated Location location = 4; + repeated Function function = 5; + repeated string string_table = 6; + int64 drop_frames = 7; + int64 keep_frames = 8; + int64 time_nanos = 9; + int64 duration_nanos = 10; + ValueType period_type = 11; + int64 period = 12; + repeated int64 comment = 13; + int64 default_sample_type = 15; +} + +message ValueType { + int64 type = 1; + int64 unit = 2; + int64 aggregation_temporality = 3; +} + +message Sample { + repeated uint64 location_id = 1; + repeated int64 value = 2; + repeated Label label = 3; +} + +message Label { + int64 key = 1; + int64 str = 2; + int64 num = 3; + int64 num_unit = 4; +} + +message Mapping { + uint64 id = 1; + uint64 memory_start = 2; + uint64 memory_limit = 3; + uint64 file_offset = 4; + int64 filename = 5; + int64 build_id = 6; + bool has_functions = 7; + bool has_filenames = 8; + bool has_line_numbers = 9; + bool has_inline_frames = 10; +} + +message Location { + uint64 id = 1; + uint64 mapping_id = 2; + uint64 address = 3; + repeated Line line = 4; + bool is_folded = 5; +} + +message Line { + uint64 function_id = 1; + int64 line = 2; +} + +message Function { + uint64 id = 1; + int64 name = 2; + int64 system_name = 3; + int64 filename = 4; + int64 start_line = 5; +} diff --git a/Telemetry/Services/PyroscopeIngestService.ts b/Telemetry/Services/PyroscopeIngestService.ts new file mode 100644 index 0000000000..ec974a9d67 --- /dev/null +++ b/Telemetry/Services/PyroscopeIngestService.ts @@ -0,0 +1,383 @@ +import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest"; +import { + ExpressRequest, 
ExpressResponse,
NextFunction,
} from "Common/Server/Utils/Express";
import Response from "Common/Server/Utils/Response";
import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan";
import BadRequestException from "Common/Types/Exception/BadRequestException";
import { JSONObject } from "Common/Types/JSON";
import ObjectID from "Common/Types/ObjectID";
import protobuf from "protobufjs";
import zlib from "zlib";
import ProfilesQueueService from "./Queue/ProfilesQueueService";

// Load the pprof protobuf schema once, at module load time.
const PprofProto: protobuf.Root = protobuf.loadSync(
  "/usr/src/app/ProtoFiles/pprof/profile.proto",
);
const PprofProfile: protobuf.Type = PprofProto.lookupType(
  "perftools.profiles.Profile",
);

/*
 * Shapes of the decoded pprof message as consumed by this file.
 * Fields typed `number` on value types and functions are used as-is in the
 * output tables; fields typed `number | string` are normalised with
 * toString()/Number() before use.
 */
interface PprofValueType {
  type: number;
  unit: number;
}

interface PprofSample {
  locationId: Array<number | string>;
  value: Array<number | string>;
  label?: Array<{ key: number; str?: number; num?: number; numUnit?: number }>;
}

interface PprofLocation {
  id: number | string;
  line: Array<{ functionId: number | string; line: number }>;
  address?: number | string;
}

interface PprofFunction {
  id: number | string;
  name: number;
  systemName?: number;
  filename: number;
  startLine?: number;
}

interface PprofProfileData {
  stringTable: Array<string>;
  sampleType: Array<PprofValueType>;
  sample: Array<PprofSample>;
  location: Array<PprofLocation>;
  function: Array<PprofFunction>;
  timeNanos: number | string;
  durationNanos: number | string;
  periodType?: PprofValueType;
  period: number | string;
}

/**
 * Accepts Pyroscope-style pprof uploads, converts them to an OTLP-profiles
 * shaped JSON body and hands the request to the profiles ingest queue.
 */
export default class PyroscopeIngestService {
  /**
   * Express handler for a Pyroscope ingest request.
   *
   * Reads `name`, `from` and `until` from the query string, extracts the
   * pprof payload (multipart file or raw body), gunzips it when needed,
   * decodes it, replaces `req.body` with the converted OTLP body, sends an
   * empty success response and then enqueues the request.
   *
   * Any error is forwarded to `next`. Note that the success response is sent
   * before the queue job is added, so a queueing failure reaches `next` after
   * the response has already been written.
   */
  @CaptureSpan()
  public static async ingestPyroscopeProfile(
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> {
    try {
      if (!(req as TelemetryRequest).projectId) {
        throw new BadRequestException(
          "Invalid request - projectId not found in request.",
        );
      }

      /*
       * Query values may be arrays/objects when a parameter is repeated;
       * only a non-empty string is accepted as the application name.
       */
      const rawName: unknown = req.query["name"];
      const appName: string = this.parseAppName(
        typeof rawName === "string" && rawName ? rawName : "unknown",
      );
      const fromSeconds: number = this.parseUnixSeconds(req.query["from"]);
      const untilSeconds: number = this.parseUnixSeconds(req.query["until"]);

      // Extract pprof data from request
      const pprofBuffer: Buffer | null = this.extractPprofFromRequest(req);

      if (!pprofBuffer || pprofBuffer.length === 0) {
        throw new BadRequestException(
          "No profile data found in request body.",
        );
      }

      // Decompress if gzipped
      const decompressed: Buffer = await this.decompressIfNeeded(pprofBuffer);

      // Parse pprof protobuf
      const pprofData: PprofProfileData = this.parsePprof(decompressed);

      // Convert to OTLP profiles format
      const otlpBody: JSONObject = this.convertPprofToOTLP({
        pprofData,
        appName,
        fromSeconds,
        untilSeconds,
      });

      // Set the converted body on the request for the queue processor
      req.body = otlpBody;

      // Respond immediately and queue for async processing
      Response.sendEmptySuccessResponse(req, res);

      await ProfilesQueueService.addProfileIngestJob(req as TelemetryRequest);
    } catch (err) {
      return next(err);
    }
  }

  /**
   * Parses a unix-seconds query value.
   *
   * @returns the parsed integer, or 0 when the value is missing, not a
   * string, or not numeric (so "NaN" never ends up in a timestamp).
   */
  private static parseUnixSeconds(raw: unknown): number {
    const parsed: number = parseInt(
      typeof raw === "string" && raw ? raw : "0",
      10,
    );
    return Number.isFinite(parsed) ? parsed : 0;
  }

  /**
   * Extracts the application name from a Pyroscope `name` value of the form
   * "appName.profileType{label1=value1,label2=value2}".
   *
   * The label block (from the first '{') is dropped, then at most one known
   * profile-type suffix is removed. Falls back to "unknown" when nothing is
   * left, matching the default used for a missing name.
   */
  private static parseAppName(name: string): string {
    const braceIndex: number = name.indexOf("{");
    const withoutLabels: string =
      braceIndex >= 0 ? name.substring(0, braceIndex) : name;

    // Profile type suffixes (e.g., ".cpu", ".wall", ".alloc_objects")
    const knownSuffixes: Array<string> = [
      ".cpu",
      ".wall",
      ".alloc_objects",
      ".alloc_space",
      ".inuse_objects",
      ".inuse_space",
      ".goroutine",
      ".mutex_count",
      ".mutex_duration",
      ".block_count",
      ".block_duration",
      ".contention",
      ".itimer",
    ];

    const matchedSuffix: string | undefined = knownSuffixes.find(
      (suffix: string) => {
        return withoutLabels.endsWith(suffix);
      },
    );

    const appName: string = matchedSuffix
      ? withoutLabels.substring(0, withoutLabels.length - matchedSuffix.length)
      : withoutLabels;

    return appName || "unknown";
  }

  /**
   * Locates the raw pprof bytes on the request.
   *
   * Order of precedence: the uploaded file whose field name is "profile",
   * then the first uploaded file, then a Buffer/Uint8Array request body.
   *
   * @returns the payload, or null when none of the above is present.
   */
  private static extractPprofFromRequest(req: ExpressRequest): Buffer | null {
    // Check multer files (multipart/form-data)
    const files: Array<{ fieldname: string; buffer: Buffer }> | undefined =
      req.files as Array<{ fieldname: string; buffer: Buffer }> | undefined;

    if (files && files.length > 0) {
      const profileFile: { fieldname: string; buffer: Buffer } | undefined =
        files.find((f: { fieldname: string; buffer: Buffer }) => {
          return f.fieldname === "profile";
        });

      // If there is no 'profile' field, use the first file
      return (profileFile || files[0]!).buffer;
    }

    // Check raw body (application/octet-stream)
    if (Buffer.isBuffer(req.body)) {
      return req.body as Buffer;
    }

    if (req.body instanceof Uint8Array) {
      return Buffer.from(req.body);
    }

    return null;
  }

  /**
   * Gunzips the payload when it starts with the gzip magic bytes
   * (0x1f, 0x8b); otherwise returns it untouched.
   *
   * @throws BadRequestException (via rejection) when the data carries the
   * gzip magic bytes but cannot be inflated.
   */
  private static async decompressIfNeeded(data: Buffer): Promise<Buffer> {
    const isGzip: boolean =
      data.length >= 2 && data[0] === 0x1f && data[1] === 0x8b;

    if (!isGzip) {
      return data;
    }

    return new Promise<Buffer>(
      (resolve: (value: Buffer) => void, reject: (reason: Error) => void) => {
        zlib.gunzip(
          data as unknown as Uint8Array,
          (err: Error | null, result: Buffer) => {
            if (err) {
              // The payload comes from the client, so report it as a bad request.
              reject(
                new BadRequestException(
                  "Profile data could not be decompressed: " + err.message,
                ),
              );
              return;
            }
            resolve(result);
          },
        );
      },
    );
  }

  /**
   * Decodes a pprof protobuf payload into a plain object.
   *
   * 64-bit fields are converted to JS numbers (`longs: Number`), so values
   * above 2^53 - including nanosecond unix timestamps - are rounded.
   *
   * @throws BadRequestException when the bytes are not a decodable profile.
   */
  private static parsePprof(data: Buffer): PprofProfileData {
    let message: protobuf.Message;

    try {
      message = PprofProfile.decode(
        new Uint8Array(data.buffer, data.byteOffset, data.byteLength),
      );
    } catch (err) {
      throw new BadRequestException(
        "Profile data is not a valid pprof message: " +
          (err instanceof Error ? err.message : "decode failed"),
      );
    }

    const obj: Record<string, unknown> = PprofProfile.toObject(message, {
      longs: Number,
      defaults: true,
      arrays: true,
    }) as Record<string, unknown>;

    return obj as unknown as PprofProfileData;
  }

  /**
   * Converts a decoded pprof profile into an OTLP-profiles shaped JSON body.
   *
   * - Function and location IDs are remapped to array indices; an ID that
   *   cannot be resolved maps to index 0.
   * - Identical location stacks are de-duplicated into `stackTable`.
   * - Start time is the profile's `timeNanos`, else `fromSeconds`.
   * - End time is start + `durationNanos`, else `untilSeconds`, and is never
   *   earlier than the start time.
   */
  @CaptureSpan()
  private static convertPprofToOTLP(data: {
    pprofData: PprofProfileData;
    appName: string;
    fromSeconds: number;
    untilSeconds: number;
  }): JSONObject {
    const { pprofData, appName, fromSeconds, untilSeconds } = data;

    const stringTable: Array<string> = pprofData.stringTable || [];

    // Build function ID -> index map
    const functionIdToIndex: Map<string, number> = new Map();
    const functionTable: Array<JSONObject> = [];

    (pprofData.function || []).forEach((fn: PprofFunction, index: number) => {
      functionIdToIndex.set(fn.id.toString(), index);
      functionTable.push({
        name: fn.name,
        filename: fn.filename,
      });
    });

    // Build location ID -> index map
    const locationIdToIndex: Map<string, number> = new Map();
    const locationTable: Array<JSONObject> = [];

    (pprofData.location || []).forEach(
      (loc: PprofLocation, index: number) => {
        locationIdToIndex.set(loc.id.toString(), index);

        const lines: Array<JSONObject> = (loc.line || []).map(
          (line: { functionId: number | string; line: number }) => {
            return {
              functionIndex:
                functionIdToIndex.get(line.functionId.toString()) ?? 0,
              line: line.line || 0,
            };
          },
        );

        locationTable.push({ line: lines });
      },
    );

    /*
     * Compute timestamps. Query values are sanitised again here so that a
     * non-finite input can never be rendered as "NaN".
     */
    const nanosPerSecond: number = 1_000_000_000;
    const safeFromSeconds: number = Number.isFinite(fromSeconds)
      ? fromSeconds
      : 0;
    const safeUntilSeconds: number = Number.isFinite(untilSeconds)
      ? untilSeconds
      : 0;

    const startNanos: number =
      Number(pprofData.timeNanos) || safeFromSeconds * nanosPerSecond;
    const durationNanos: number = Number(pprofData.durationNanos) || 0;

    /*
     * Base the end time on the resolved start (not on a possibly missing
     * timeNanos) and never let it fall before the start.
     */
    const endNanos: number = Math.max(
      startNanos,
      durationNanos > 0
        ? startNanos + durationNanos
        : safeUntilSeconds * nanosPerSecond,
    );

    const startTimeNanos: string = startNanos.toString();
    const endTimeNanos: string = endNanos.toString();

    // Build stack table and samples from pprof samples
    const stackTable: Array<JSONObject> = [];
    const stackKeyMap: Map<string, number> = new Map();
    const otlpSamples: Array<JSONObject> = [];

    for (const sample of pprofData.sample || []) {
      // Convert location IDs to location indices
      const locationIndices: Array<number> = (sample.locationId || []).map(
        (locId: number | string) => {
          return locationIdToIndex.get(locId.toString()) ?? 0;
        },
      );

      // Deduplicate stacks
      const stackKey: string = locationIndices.join(",");
      let stackIndex: number | undefined = stackKeyMap.get(stackKey);
      if (stackIndex === undefined) {
        stackIndex = stackTable.length;
        stackTable.push({ locationIndices });
        stackKeyMap.set(stackKey, stackIndex);
      }

      // Convert values to strings
      const values: Array<string> = (sample.value || []).map(
        (v: number | string) => {
          return v.toString();
        },
      );

      otlpSamples.push({
        stackIndex,
        value: values,
        timestampsUnixNano: [startTimeNanos],
      });
    }

    // Build sample types
    const sampleType: Array<JSONObject> = (pprofData.sampleType || []).map(
      (st: PprofValueType) => {
        return {
          type: st.type,
          unit: st.unit,
        };
      },
    );

    // Build period type
    const periodType: JSONObject = pprofData.periodType
      ? { type: pprofData.periodType.type, unit: pprofData.periodType.unit }
      : { type: 0, unit: 0 };

    /*
     * Generate profile ID. Hex decoding stops at the first non-hex
     * character, so any dashes in the generated ID (UUID formatting -
     * assumption, confirm against ObjectID) are removed first; otherwise
     * only the leading characters before the first dash would be encoded.
     */
    const profileIdHex: string = ObjectID.generate()
      .toString()
      .replace(/-/g, "");
    const profileIdBase64: string = Buffer.from(profileIdHex, "hex").toString(
      "base64",
    );

    return {
      resourceProfiles: [
        {
          resource: {
            attributes: [
              {
                key: "service.name",
                value: { stringValue: appName },
              },
              {
                key: "telemetry.sdk.name",
                value: { stringValue: "pyroscope" },
              },
            ],
          },
          scopeProfiles: [
            {
              scope: {
                name: "pyroscope",
                version: "1.0.0",
              },
              profiles: [
                {
                  profileId: profileIdBase64,
                  startTimeUnixNano: startTimeNanos,
                  endTimeUnixNano: endTimeNanos,
                  attributes: [],
                  profile: {
                    stringTable,
                    sampleType,
                    sample: otlpSamples,
                    locationTable,
                    functionTable,
                    stackTable,
                    linkTable: [],
                    attributeTable: [],
                    periodType,
                    period: (pprofData.period || 0).toString(),
                  },
                },
              ],
            },
          ],
        },
      ],
    } as unknown as JSONObject;
  }
}