mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 08:42:13 +02:00
Compare commits
5 Commits
10.0.16
...
otel-queue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4379e6694d | ||
|
|
14c949d5bb | ||
|
|
48ab829de5 | ||
|
|
b5961ec2f1 | ||
|
|
2ddcd85e6a |
@@ -57,10 +57,10 @@ import TelemetryService from "./TelemetryService";
|
||||
],
|
||||
})
|
||||
@EnableWorkflow({
|
||||
create: true,
|
||||
delete: true,
|
||||
update: true,
|
||||
read: true,
|
||||
create: false,
|
||||
delete: false,
|
||||
update: false,
|
||||
read: false,
|
||||
})
|
||||
@CrudApiEndpoint(new Route("/metric-type"))
|
||||
@SlugifyColumn("name", "slug")
|
||||
|
||||
@@ -16,6 +16,9 @@ import CaptureSpan from "../Utils/Telemetry/CaptureSpan";
|
||||
export enum QueueName {
|
||||
Workflow = "Workflow",
|
||||
Worker = "Worker",
|
||||
OtelIngestTraces = "OtelIngestTraces",
|
||||
OtelIngestMetrics = "OtelIngestMetrics",
|
||||
OtelIngestLogs = "OtelIngestLogs",
|
||||
}
|
||||
|
||||
export type QueueJob = Job;
|
||||
|
||||
@@ -6,7 +6,11 @@ import Express, {
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import OpenTelemetryRequestMiddleware from "../Middleware/OtelRequestMiddleware";
|
||||
import OtelIngestService from "../Services/OtelIngest";
|
||||
import OtelQueueWorker, { OtelIngestJobData } from "../Services/OtelQueueWorker";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import BadRequestException from "Common/Types/Exception/BadRequestException";
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
@@ -25,7 +29,31 @@ router.post(
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return OtelIngestService.ingestTraces(req, res, next);
|
||||
try {
|
||||
if (!(req as TelemetryRequest).projectId) {
|
||||
throw new BadRequestException(
|
||||
"Invalid request - projectId not found in request.",
|
||||
);
|
||||
}
|
||||
|
||||
const reqBody = req.body.toJSON ? req.body.toJSON() : req.body;
|
||||
|
||||
// Add job to queue
|
||||
const jobData: OtelIngestJobData = {
|
||||
body: reqBody,
|
||||
projectId: (req as TelemetryRequest).projectId.toString(),
|
||||
headers: req.headers,
|
||||
};
|
||||
|
||||
await OtelQueueWorker.addTracesJob(jobData);
|
||||
|
||||
logger.debug(`Traces job added to queue for project ${jobData.projectId}`);
|
||||
|
||||
// Return response immediately
|
||||
return Response.sendEmptySuccessResponse(req, res);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
@@ -38,7 +66,32 @@ router.post(
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return OtelIngestService.ingestMetrics(req, res, next);
|
||||
try {
|
||||
if (!(req as TelemetryRequest).projectId) {
|
||||
throw new BadRequestException(
|
||||
"Invalid request - projectId not found in request.",
|
||||
);
|
||||
}
|
||||
|
||||
const reqBody = req.body.toJSON ? req.body.toJSON() : req.body;
|
||||
|
||||
// Add job to queue
|
||||
const jobData: OtelIngestJobData = {
|
||||
body: reqBody,
|
||||
projectId: (req as TelemetryRequest).projectId.toString(),
|
||||
headers: req.headers,
|
||||
};
|
||||
|
||||
await OtelQueueWorker.addMetricsJob(jobData);
|
||||
|
||||
|
||||
logger.debug(`Metrics job added to queue for project ${jobData.projectId}`);
|
||||
|
||||
// Return response immediately
|
||||
return Response.sendEmptySuccessResponse(req, res);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
@@ -51,7 +104,31 @@ router.post(
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return OtelIngestService.ingestLogs(req, res, next);
|
||||
try {
|
||||
if (!(req as TelemetryRequest).projectId) {
|
||||
throw new BadRequestException(
|
||||
"Invalid request - projectId not found in request.",
|
||||
);
|
||||
}
|
||||
|
||||
const reqBody = req.body.toJSON ? req.body.toJSON() : req.body;
|
||||
|
||||
// Add job to queue
|
||||
const jobData: OtelIngestJobData = {
|
||||
body: reqBody,
|
||||
projectId: (req as TelemetryRequest).projectId.toString(),
|
||||
headers: req.headers,
|
||||
};
|
||||
|
||||
await OtelQueueWorker.addLogsJob(jobData);
|
||||
|
||||
logger.debug(`Logs job added to queue for project ${jobData.projectId}`);
|
||||
|
||||
// Return response immediately
|
||||
return Response.sendEmptySuccessResponse(req, res);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -9,6 +9,9 @@ import logger from "Common/Server/Utils/Logger";
|
||||
import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import OtelQueueWorker from "./Services/OtelQueueWorker";
|
||||
import Queue from "Common/Server/Infrastructure/Queue";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
import "ejs";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
@@ -54,6 +57,29 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
|
||||
await Realtime.init();
|
||||
|
||||
// Initialize OpenTelemetry Queue Workers with optional environment configuration
|
||||
const queueConfig = {
|
||||
concurrency: {
|
||||
traces: parseInt(process.env['OTEL_QUEUE_TRACES_CONCURRENCY'] || '10'),
|
||||
metrics: parseInt(process.env['OTEL_QUEUE_METRICS_CONCURRENCY'] || '10'),
|
||||
logs: parseInt(process.env['OTEL_QUEUE_LOGS_CONCURRENCY'] || '10'),
|
||||
},
|
||||
enabled: {
|
||||
traces: process.env['OTEL_QUEUE_TRACES_ENABLED'] !== 'false',
|
||||
metrics: process.env['OTEL_QUEUE_METRICS_ENABLED'] !== 'false',
|
||||
logs: process.env['OTEL_QUEUE_LOGS_ENABLED'] !== 'false',
|
||||
},
|
||||
};
|
||||
|
||||
await OtelQueueWorker.init(queueConfig);
|
||||
|
||||
// Add queue monitoring routes (Bull Board)
|
||||
app.use(
|
||||
Queue.getInspectorRoute(),
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
Queue.getQueueInspectorRouter(),
|
||||
);
|
||||
|
||||
// add default routes
|
||||
await App.addDefaultRoutes();
|
||||
} catch (err) {
|
||||
|
||||
@@ -107,7 +107,7 @@ export default class OtelIngestService {
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
private static async processLogsAsync(req: ExpressRequest): Promise<void> {
|
||||
public static async processLogsAsync(req: ExpressRequest): Promise<void> {
|
||||
const resourceLogs: JSONArray = req.body["resourceLogs"] as JSONArray;
|
||||
|
||||
const dbLogs: Array<Log> = [];
|
||||
@@ -313,7 +313,7 @@ export default class OtelIngestService {
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
private static async processMetricsAsync(req: ExpressRequest): Promise<void> {
|
||||
public static async processMetricsAsync(req: ExpressRequest): Promise<void> {
|
||||
const resourceMetrics: JSONArray = req.body["resourceMetrics"] as JSONArray;
|
||||
|
||||
const dbMetrics: Array<Metric> = [];
|
||||
@@ -552,7 +552,7 @@ export default class OtelIngestService {
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
private static async processTracesAsync(req: ExpressRequest): Promise<void> {
|
||||
public static async processTracesAsync(req: ExpressRequest): Promise<void> {
|
||||
const resourceSpans: JSONArray = req.body["resourceSpans"] as JSONArray;
|
||||
|
||||
const dbSpans: Array<Span> = [];
|
||||
|
||||
193
OpenTelemetryIngest/Services/OtelQueueWorker.ts
Normal file
193
OpenTelemetryIngest/Services/OtelQueueWorker.ts
Normal file
@@ -0,0 +1,193 @@
|
||||
import Queue, { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import OtelIngestService from "./OtelIngest";
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import { OtelQueueConfig, DEFAULT_OTEL_QUEUE_CONFIG } from "../Types/QueueConfig";
|
||||
|
||||
export interface OtelIngestJobData extends JSONObject {
|
||||
body: JSONObject;
|
||||
projectId: string;
|
||||
headers?: JSONObject;
|
||||
}
|
||||
|
||||
export default class OtelQueueWorker {
|
||||
private static isInitialized: boolean = false;
|
||||
private static config: OtelQueueConfig = DEFAULT_OTEL_QUEUE_CONFIG;
|
||||
private static workers: any[] = [];
|
||||
|
||||
public static async init(config?: Partial<OtelQueueConfig>): Promise<void> {
|
||||
if (this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Merge provided config with defaults
|
||||
if (config) {
|
||||
this.config = {
|
||||
concurrency: { ...DEFAULT_OTEL_QUEUE_CONFIG.concurrency, ...config.concurrency },
|
||||
};
|
||||
}
|
||||
|
||||
logger.info("Initializing OpenTelemetry Queue Workers...");
|
||||
logger.info(`Queue config: ${JSON.stringify(this.config)}`);
|
||||
|
||||
// Initialize Traces Queue Worker
|
||||
|
||||
const tracesWorker = QueueWorker.getWorker(
|
||||
QueueName.OtelIngestTraces,
|
||||
async (job: QueueJob) => {
|
||||
logger.debug(`Processing traces job: ${job.id}`);
|
||||
const data = job.data as OtelIngestJobData;
|
||||
await this.processTracesJob(data);
|
||||
},
|
||||
{ concurrency: this.config.concurrency.traces }
|
||||
);
|
||||
this.workers.push(tracesWorker);
|
||||
logger.info(`Traces queue worker initialized with concurrency: ${this.config.concurrency.traces}`);
|
||||
|
||||
|
||||
// Initialize Metrics Queue Worker
|
||||
|
||||
const metricsWorker = QueueWorker.getWorker(
|
||||
QueueName.OtelIngestMetrics,
|
||||
async (job: QueueJob) => {
|
||||
logger.debug(`Processing metrics job: ${job.id}`);
|
||||
const data = job.data as OtelIngestJobData;
|
||||
await this.processMetricsJob(data);
|
||||
},
|
||||
{ concurrency: this.config.concurrency.metrics }
|
||||
);
|
||||
this.workers.push(metricsWorker);
|
||||
logger.info(`Metrics queue worker initialized with concurrency: ${this.config.concurrency.metrics}`);
|
||||
|
||||
|
||||
const logsWorker = QueueWorker.getWorker(
|
||||
QueueName.OtelIngestLogs,
|
||||
async (job: QueueJob) => {
|
||||
logger.debug(`Processing logs job: ${job.id}`);
|
||||
const data = job.data as OtelIngestJobData;
|
||||
await this.processLogsJob(data);
|
||||
},
|
||||
{ concurrency: this.config.concurrency.logs }
|
||||
);
|
||||
this.workers.push(logsWorker);
|
||||
logger.info(`Logs queue worker initialized with concurrency: ${this.config.concurrency.logs}`);
|
||||
|
||||
|
||||
// Setup graceful shutdown
|
||||
this.setupGracefulShutdown();
|
||||
|
||||
this.isInitialized = true;
|
||||
logger.info("OpenTelemetry Queue Workers initialized successfully");
|
||||
}
|
||||
|
||||
|
||||
public static async shutdown(): Promise<void> {
|
||||
logger.info("Shutting down OpenTelemetry Queue Workers...");
|
||||
|
||||
for (const worker of this.workers) {
|
||||
try {
|
||||
await worker.close();
|
||||
} catch (error) {
|
||||
logger.error("Error closing worker:");
|
||||
logger.error(error);
|
||||
}
|
||||
}
|
||||
|
||||
this.workers = [];
|
||||
this.isInitialized = false;
|
||||
logger.info("OpenTelemetry Queue Workers shut down successfully");
|
||||
}
|
||||
|
||||
private static setupGracefulShutdown(): void {
|
||||
const shutdownHandler = async () => {
|
||||
await this.shutdown();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on('SIGTERM', shutdownHandler);
|
||||
process.on('SIGINT', shutdownHandler);
|
||||
}
|
||||
|
||||
public static async addTracesJob(data: OtelIngestJobData): Promise<void> {
|
||||
const jobId = `traces-${Date.now()}-${Math.random()}`;
|
||||
await Queue.addJob(
|
||||
QueueName.OtelIngestTraces,
|
||||
jobId,
|
||||
"process-traces",
|
||||
data
|
||||
);
|
||||
logger.debug(`Added traces job to queue: ${jobId}`);
|
||||
}
|
||||
|
||||
public static async addMetricsJob(data: OtelIngestJobData): Promise<void> {
|
||||
const jobId = `metrics-${Date.now()}-${Math.random()}`;
|
||||
await Queue.addJob(
|
||||
QueueName.OtelIngestMetrics,
|
||||
jobId,
|
||||
"process-metrics",
|
||||
data
|
||||
);
|
||||
logger.debug(`Added metrics job to queue: ${jobId}`);
|
||||
}
|
||||
|
||||
public static async addLogsJob(data: OtelIngestJobData): Promise<void> {
|
||||
const jobId = `logs-${Date.now()}-${Math.random()}`;
|
||||
await Queue.addJob(
|
||||
QueueName.OtelIngestLogs,
|
||||
jobId,
|
||||
"process-logs",
|
||||
data
|
||||
);
|
||||
logger.debug(`Added logs job to queue: ${jobId}`);
|
||||
}
|
||||
|
||||
private static async processTracesJob(data: OtelIngestJobData): Promise<void> {
|
||||
try {
|
||||
// Create a mock request object with the data
|
||||
const telemetryRequest = this.createTelemetryRequestParamsRequest(data);
|
||||
await OtelIngestService.processTracesAsync(telemetryRequest);
|
||||
logger.debug("Traces job processed successfully");
|
||||
} catch (error) {
|
||||
logger.error("Error processing traces job:");
|
||||
logger.error(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private static async processMetricsJob(data: OtelIngestJobData): Promise<void> {
|
||||
try {
|
||||
// Create a mock request object with the data
|
||||
const telemetryRequest = this.createTelemetryRequestParamsRequest(data);
|
||||
await OtelIngestService.processMetricsAsync(telemetryRequest);
|
||||
logger.debug("Metrics job processed successfully");
|
||||
} catch (error) {
|
||||
logger.error("Error processing metrics job:");
|
||||
logger.error(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private static async processLogsJob(data: OtelIngestJobData): Promise<void> {
|
||||
try {
|
||||
// Create a mock request object with the data
|
||||
const telemetryRequest = this.createTelemetryRequestParamsRequest(data);
|
||||
await OtelIngestService.processLogsAsync(telemetryRequest);
|
||||
logger.debug("Logs job processed successfully");
|
||||
} catch (error) {
|
||||
logger.error("Error processing logs job:");
|
||||
logger.error(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private static createTelemetryRequestParamsRequest(data: OtelIngestJobData): TelemetryRequest {
|
||||
return {
|
||||
body: data.body,
|
||||
projectId: new ObjectID(data.projectId),
|
||||
headers: data.headers || {},
|
||||
} as TelemetryRequest;
|
||||
}
|
||||
}
|
||||
18
OpenTelemetryIngest/Types/QueueConfig.ts
Normal file
18
OpenTelemetryIngest/Types/QueueConfig.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
export interface OtelQueueConfig {
|
||||
/**
|
||||
* Number of concurrent workers for each telemetry type
|
||||
*/
|
||||
concurrency: {
|
||||
traces: number;
|
||||
metrics: number;
|
||||
logs: number;
|
||||
};
|
||||
}
|
||||
|
||||
export const DEFAULT_OTEL_QUEUE_CONFIG: OtelQueueConfig = {
|
||||
concurrency: {
|
||||
traces: 10,
|
||||
metrics: 10,
|
||||
logs: 10,
|
||||
},
|
||||
};
|
||||
Reference in New Issue
Block a user