diff --git a/App/FeatureSet/Telemetry/API/OTelIngest.ts b/App/FeatureSet/Telemetry/API/OTelIngest.ts index 351a1ca80d..c9b01c5d17 100644 --- a/App/FeatureSet/Telemetry/API/OTelIngest.ts +++ b/App/FeatureSet/Telemetry/API/OTelIngest.ts @@ -25,6 +25,7 @@ const router: ExpressRouter = Express.getRouter(); router.post( "/otlp/v1/traces", + OpenTelemetryRequestMiddleware.parseBody, OpenTelemetryRequestMiddleware.getProductType, TelemetryIngest.isAuthorizedServiceMiddleware, async ( @@ -38,6 +39,7 @@ router.post( router.post( "/otlp/v1/metrics", + OpenTelemetryRequestMiddleware.parseBody, OpenTelemetryRequestMiddleware.getProductType, TelemetryIngest.isAuthorizedServiceMiddleware, async ( @@ -51,6 +53,7 @@ router.post( router.post( "/otlp/v1/logs", + OpenTelemetryRequestMiddleware.parseBody, OpenTelemetryRequestMiddleware.getProductType, TelemetryIngest.isAuthorizedServiceMiddleware, async ( @@ -64,6 +67,7 @@ router.post( router.post( "/otlp/v1/profiles", + OpenTelemetryRequestMiddleware.parseBody, OpenTelemetryRequestMiddleware.getProductType, TelemetryIngest.isAuthorizedServiceMiddleware, async ( diff --git a/App/FeatureSet/Telemetry/Middleware/OtelRequestMiddleware.ts b/App/FeatureSet/Telemetry/Middleware/OtelRequestMiddleware.ts index ede2a01ebb..949d651a33 100644 --- a/App/FeatureSet/Telemetry/Middleware/OtelRequestMiddleware.ts +++ b/App/FeatureSet/Telemetry/Middleware/OtelRequestMiddleware.ts @@ -5,11 +5,14 @@ import { ExpressRequest, ExpressResponse, NextFunction, + headerValueToString, } from "Common/Server/Utils/Express"; import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan"; import protobuf from "protobufjs"; import logger from "Common/Server/Utils/Logger"; import path from "path"; +import zlib from "zlib"; +import { promisify } from "util"; // Load proto file for OTel @@ -43,8 +46,56 @@ const LogsData: protobuf.Type = LogsProto.lookupType("LogsData"); const TracesData: protobuf.Type = TracesProto.lookupType("TracesData"); const MetricsData: protobuf.Type = MetricsProto.lookupType("MetricsData"); const ProfilesData: protobuf.Type = ProfilesProto.lookupType("ProfilesData"); +const gunzipAsync: (buffer: Uint8Array) => Promise = promisify( + zlib.gunzip, +); export default class OpenTelemetryRequestMiddleware { + @CaptureSpan() + public static async parseBody( + req: ExpressRequest, + _res: ExpressResponse, + next: NextFunction, + ): Promise { + try { + if (req.body !== undefined && req.body !== null) { + return next(); + } + + const requestBuffer: Buffer = await new Promise( + (resolve: (value: Buffer) => void, reject: (err: Error) => void) => { + const chunks: Array = []; + + req.on("data", (chunk: Buffer | string) => { + chunks.push( + Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, "utf-8"), + ); + }); + + req.on("end", () => { + resolve(Buffer.concat(chunks)); + }); + + req.on("error", (err: Error) => { + reject(err); + }); + }, + ); + + const contentEncoding: string | undefined = headerValueToString( + req.headers["content-encoding"], + ); + + req.body = contentEncoding?.includes("gzip") + ? await gunzipAsync(requestBuffer) + : requestBuffer; + + next(); + } catch (err) { + return next(err); + } + } + @CaptureSpan() public static async getProductType( req: ExpressRequest, @@ -54,7 +105,9 @@ export default class OpenTelemetryRequestMiddleware { try { let productType: ProductType; - const contentType: string | undefined = req.headers["content-type"]; + const contentType: string | undefined = headerValueToString( + req.headers["content-type"], + ); const isProtobuf: boolean = req.body instanceof Uint8Array && (!contentType || diff --git a/Common/Server/Utils/StartServer.ts b/Common/Server/Utils/StartServer.ts index 1af018c17b..4799a06532 100644 --- a/Common/Server/Utils/StartServer.ts +++ b/Common/Server/Utils/StartServer.ts @@ -19,6 +19,7 @@ import Express, { NextFunction, OneUptimeRequest, RequestHandler, + headerValueToString, } from "./Express"; import logger from "./Logger"; import "./Process"; @@ -117,9 +118,9 @@ app.use((req: ExpressRequest, _res: ExpressResponse, next: NextFunction) => { }); /* - * Parse protobuf (binary) bodies for OTLP ingestion before JSON/gzip middleware. - * The .NET OpenTelemetry SDK (and others) send telemetry data as application/x-protobuf. - * Without this, express.json() skips protobuf requests and req.body remains undefined. + * Parse protobuf (binary) bodies for non-OTLP routes. + * OTLP HTTP ingestion bypasses the global body parsers and handles raw/gzip + * payloads in the telemetry router to avoid conflicts with the merged app stack. */ const protobufBodyParserMiddleware: RequestHandler = ExpressRaw({ type: ["application/x-protobuf", "application/protobuf"], @@ -127,15 +128,18 @@ const protobufBodyParserMiddleware: RequestHandler = ExpressRaw({ }); app.use((req: OneUptimeRequest, res: ExpressResponse, next: NextFunction) => { - const contentType: string | undefined = req.headers["content-type"]; + if (req.path.includes("/otlp/v1/")) { + return next(); + } - if ( - contentType && - (contentType.includes("application/x-protobuf") || - contentType.includes("application/protobuf")) - ) { - protobufBodyParserMiddleware(req, res, next); - } else if (req.headers["content-encoding"] === "gzip") { + const contentType: string | undefined = headerValueToString( + req.headers["content-type"], + ); + const contentEncoding: string | undefined = headerValueToString( + req.headers["content-encoding"], + ); + + if (contentEncoding?.includes("gzip")) { const buffers: any = []; req.on("data", (chunk: any) => { @@ -159,13 +163,22 @@ app.use((req: OneUptimeRequest, res: ExpressResponse, next: NextFunction) => { next(); }); }); + } else if ( + contentType && + (contentType.includes("application/x-protobuf") || + contentType.includes("application/protobuf")) + ) { + protobufBodyParserMiddleware(req, res, next); } else { jsonBodyParserMiddleware(req, res, next); } }); app.use((req: ExpressRequest, res: ExpressResponse, next: NextFunction) => { - if (req.headers["content-encoding"] === "gzip") { + if ( + req.path.includes("/otlp/v1/") || + headerValueToString(req.headers["content-encoding"])?.includes("gzip") + ) { next(); } else { urlEncodedMiddleware(req, res, next);