feat: add parseBody middleware for handling gzip and protobuf requests in OpenTelemetry routes

This commit is contained in:
Nawaz Dhandala
2026-04-02 23:46:57 +01:00
parent 46a698b4be
commit 8101f4a459
3 changed files with 83 additions and 13 deletions

View File

@@ -25,6 +25,7 @@ const router: ExpressRouter = Express.getRouter();
router.post(
"/otlp/v1/traces",
OpenTelemetryRequestMiddleware.parseBody,
OpenTelemetryRequestMiddleware.getProductType,
TelemetryIngest.isAuthorizedServiceMiddleware,
async (
@@ -38,6 +39,7 @@ router.post(
router.post(
"/otlp/v1/metrics",
OpenTelemetryRequestMiddleware.parseBody,
OpenTelemetryRequestMiddleware.getProductType,
TelemetryIngest.isAuthorizedServiceMiddleware,
async (
@@ -51,6 +53,7 @@ router.post(
router.post(
"/otlp/v1/logs",
OpenTelemetryRequestMiddleware.parseBody,
OpenTelemetryRequestMiddleware.getProductType,
TelemetryIngest.isAuthorizedServiceMiddleware,
async (
@@ -64,6 +67,7 @@ router.post(
router.post(
"/otlp/v1/profiles",
OpenTelemetryRequestMiddleware.parseBody,
OpenTelemetryRequestMiddleware.getProductType,
TelemetryIngest.isAuthorizedServiceMiddleware,
async (

View File

@@ -5,11 +5,14 @@ import {
ExpressRequest,
ExpressResponse,
NextFunction,
headerValueToString,
} from "Common/Server/Utils/Express";
import CaptureSpan from "Common/Server/Utils/Telemetry/CaptureSpan";
import protobuf from "protobufjs";
import logger from "Common/Server/Utils/Logger";
import path from "path";
import zlib from "zlib";
import { promisify } from "util";
// Load proto file for OTel
@@ -43,8 +46,56 @@ const LogsData: protobuf.Type = LogsProto.lookupType("LogsData");
const TracesData: protobuf.Type = TracesProto.lookupType("TracesData");
const MetricsData: protobuf.Type = MetricsProto.lookupType("MetricsData");
const ProfilesData: protobuf.Type = ProfilesProto.lookupType("ProfilesData");
const gunzipAsync: (buffer: Uint8Array) => Promise<Buffer> = promisify(
zlib.gunzip,
);
export default class OpenTelemetryRequestMiddleware {
@CaptureSpan()
public static async parseBody(
req: ExpressRequest,
_res: ExpressResponse,
next: NextFunction,
): Promise<void> {
try {
if (req.body !== undefined && req.body !== null) {
return next();
}
const requestBuffer: Buffer = await new Promise<Buffer>(
(resolve: (value: Buffer) => void, reject: (err: Error) => void) => {
const chunks: Array<Buffer> = [];
req.on("data", (chunk: Buffer | string) => {
chunks.push(
Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, "utf-8"),
);
});
req.on("end", () => {
resolve(Buffer.concat(chunks));
});
req.on("error", (err: Error) => {
reject(err);
});
},
);
const contentEncoding: string | undefined = headerValueToString(
req.headers["content-encoding"],
);
req.body = contentEncoding?.includes("gzip")
? await gunzipAsync(requestBuffer)
: requestBuffer;
next();
} catch (err) {
return next(err);
}
}
@CaptureSpan()
public static async getProductType(
req: ExpressRequest,
@@ -54,7 +105,9 @@ export default class OpenTelemetryRequestMiddleware {
try {
let productType: ProductType;
const contentType: string | undefined = req.headers["content-type"];
const contentType: string | undefined = headerValueToString(
req.headers["content-type"],
);
const isProtobuf: boolean =
req.body instanceof Uint8Array &&
(!contentType ||

View File

@@ -19,6 +19,7 @@ import Express, {
NextFunction,
OneUptimeRequest,
RequestHandler,
headerValueToString,
} from "./Express";
import logger from "./Logger";
import "./Process";
@@ -117,9 +118,9 @@ app.use((req: ExpressRequest, _res: ExpressResponse, next: NextFunction) => {
});
/*
* Parse protobuf (binary) bodies for OTLP ingestion before JSON/gzip middleware.
* The .NET OpenTelemetry SDK (and others) send telemetry data as application/x-protobuf.
* Without this, express.json() skips protobuf requests and req.body remains undefined.
* Parse protobuf (binary) bodies for non-OTLP routes.
* OTLP HTTP ingestion bypasses the global body parsers and handles raw/gzip
* payloads in the telemetry router to avoid conflicts with the merged app stack.
*/
const protobufBodyParserMiddleware: RequestHandler = ExpressRaw({
type: ["application/x-protobuf", "application/protobuf"],
@@ -127,15 +128,18 @@ const protobufBodyParserMiddleware: RequestHandler = ExpressRaw({
});
app.use((req: OneUptimeRequest, res: ExpressResponse, next: NextFunction) => {
const contentType: string | undefined = req.headers["content-type"];
if (req.path.includes("/otlp/v1/")) {
return next();
}
if (
contentType &&
(contentType.includes("application/x-protobuf") ||
contentType.includes("application/protobuf"))
) {
protobufBodyParserMiddleware(req, res, next);
} else if (req.headers["content-encoding"] === "gzip") {
const contentType: string | undefined = headerValueToString(
req.headers["content-type"],
);
const contentEncoding: string | undefined = headerValueToString(
req.headers["content-encoding"],
);
if (contentEncoding?.includes("gzip")) {
const buffers: any = [];
req.on("data", (chunk: any) => {
@@ -159,13 +163,22 @@ app.use((req: OneUptimeRequest, res: ExpressResponse, next: NextFunction) => {
next();
});
});
} else if (
contentType &&
(contentType.includes("application/x-protobuf") ||
contentType.includes("application/protobuf"))
) {
protobufBodyParserMiddleware(req, res, next);
} else {
jsonBodyParserMiddleware(req, res, next);
}
});
app.use((req: ExpressRequest, res: ExpressResponse, next: NextFunction) => {
if (req.headers["content-encoding"] === "gzip") {
if (
req.path.includes("/otlp/v1/") ||
headerValueToString(req.headers["content-encoding"])?.includes("gzip")
) {
next();
} else {
urlEncodedMiddleware(req, res, next);