Compare commits

...

5 Commits

7 changed files with 328 additions and 11 deletions

View File

@@ -57,10 +57,10 @@ import TelemetryService from "./TelemetryService";
],
})
@EnableWorkflow({
create: true,
delete: true,
update: true,
read: true,
create: false,
delete: false,
update: false,
read: false,
})
@CrudApiEndpoint(new Route("/metric-type"))
@SlugifyColumn("name", "slug")

View File

@@ -16,6 +16,9 @@ import CaptureSpan from "../Utils/Telemetry/CaptureSpan";
export enum QueueName {
Workflow = "Workflow",
Worker = "Worker",
OtelIngestTraces = "OtelIngestTraces",
OtelIngestMetrics = "OtelIngestMetrics",
OtelIngestLogs = "OtelIngestLogs",
}
export type QueueJob = Job;

View File

@@ -6,7 +6,11 @@ import Express, {
NextFunction,
} from "Common/Server/Utils/Express";
import OpenTelemetryRequestMiddleware from "../Middleware/OtelRequestMiddleware";
import OtelIngestService from "../Services/OtelIngest";
import OtelQueueWorker, { OtelIngestJobData } from "../Services/OtelQueueWorker";
import Response from "Common/Server/Utils/Response";
import BadRequestException from "Common/Types/Exception/BadRequestException";
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import logger from "Common/Server/Utils/Logger";
const router: ExpressRouter = Express.getRouter();
@@ -25,7 +29,31 @@ router.post(
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
return OtelIngestService.ingestTraces(req, res, next);
try {
if (!(req as TelemetryRequest).projectId) {
throw new BadRequestException(
"Invalid request - projectId not found in request.",
);
}
const reqBody = req.body.toJSON ? req.body.toJSON() : req.body;
// Add job to queue
const jobData: OtelIngestJobData = {
body: reqBody,
projectId: (req as TelemetryRequest).projectId.toString(),
headers: req.headers,
};
await OtelQueueWorker.addTracesJob(jobData);
logger.debug(`Traces job added to queue for project ${jobData.projectId}`);
// Return response immediately
return Response.sendEmptySuccessResponse(req, res);
} catch (err) {
return next(err);
}
},
);
@@ -38,7 +66,32 @@ router.post(
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
return OtelIngestService.ingestMetrics(req, res, next);
try {
if (!(req as TelemetryRequest).projectId) {
throw new BadRequestException(
"Invalid request - projectId not found in request.",
);
}
const reqBody = req.body.toJSON ? req.body.toJSON() : req.body;
// Add job to queue
const jobData: OtelIngestJobData = {
body: reqBody,
projectId: (req as TelemetryRequest).projectId.toString(),
headers: req.headers,
};
await OtelQueueWorker.addMetricsJob(jobData);
logger.debug(`Metrics job added to queue for project ${jobData.projectId}`);
// Return response immediately
return Response.sendEmptySuccessResponse(req, res);
} catch (err) {
return next(err);
}
},
);
@@ -51,7 +104,31 @@ router.post(
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
return OtelIngestService.ingestLogs(req, res, next);
try {
if (!(req as TelemetryRequest).projectId) {
throw new BadRequestException(
"Invalid request - projectId not found in request.",
);
}
const reqBody = req.body.toJSON ? req.body.toJSON() : req.body;
// Add job to queue
const jobData: OtelIngestJobData = {
body: reqBody,
projectId: (req as TelemetryRequest).projectId.toString(),
headers: req.headers,
};
await OtelQueueWorker.addLogsJob(jobData);
logger.debug(`Logs job added to queue for project ${jobData.projectId}`);
// Return response immediately
return Response.sendEmptySuccessResponse(req, res);
} catch (err) {
return next(err);
}
},
);

View File

@@ -9,6 +9,9 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import OtelQueueWorker from "./Services/OtelQueueWorker";
import Queue from "Common/Server/Infrastructure/Queue";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import "ejs";
const app: ExpressApplication = Express.getExpressApp();
@@ -54,6 +57,29 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
await Realtime.init();
// Initialize OpenTelemetry Queue Workers with optional environment configuration
const queueConfig = {
concurrency: {
traces: parseInt(process.env['OTEL_QUEUE_TRACES_CONCURRENCY'] || '10'),
metrics: parseInt(process.env['OTEL_QUEUE_METRICS_CONCURRENCY'] || '10'),
logs: parseInt(process.env['OTEL_QUEUE_LOGS_CONCURRENCY'] || '10'),
},
enabled: {
traces: process.env['OTEL_QUEUE_TRACES_ENABLED'] !== 'false',
metrics: process.env['OTEL_QUEUE_METRICS_ENABLED'] !== 'false',
logs: process.env['OTEL_QUEUE_LOGS_ENABLED'] !== 'false',
},
};
await OtelQueueWorker.init(queueConfig);
// Add queue monitoring routes (Bull Board)
app.use(
Queue.getInspectorRoute(),
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
Queue.getQueueInspectorRouter(),
);
// add default routes
await App.addDefaultRoutes();
} catch (err) {

View File

@@ -107,7 +107,7 @@ export default class OtelIngestService {
}
@CaptureSpan()
private static async processLogsAsync(req: ExpressRequest): Promise<void> {
public static async processLogsAsync(req: ExpressRequest): Promise<void> {
const resourceLogs: JSONArray = req.body["resourceLogs"] as JSONArray;
const dbLogs: Array<Log> = [];
@@ -313,7 +313,7 @@ export default class OtelIngestService {
}
@CaptureSpan()
private static async processMetricsAsync(req: ExpressRequest): Promise<void> {
public static async processMetricsAsync(req: ExpressRequest): Promise<void> {
const resourceMetrics: JSONArray = req.body["resourceMetrics"] as JSONArray;
const dbMetrics: Array<Metric> = [];
@@ -552,7 +552,7 @@ export default class OtelIngestService {
}
@CaptureSpan()
private static async processTracesAsync(req: ExpressRequest): Promise<void> {
public static async processTracesAsync(req: ExpressRequest): Promise<void> {
const resourceSpans: JSONArray = req.body["resourceSpans"] as JSONArray;
const dbSpans: Array<Span> = [];

View File

@@ -0,0 +1,193 @@
import Queue, { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import logger from "Common/Server/Utils/Logger";
import { JSONObject } from "Common/Types/JSON";
import OtelIngestService from "./OtelIngest";
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import ObjectID from "Common/Types/ObjectID";
import { OtelQueueConfig, DEFAULT_OTEL_QUEUE_CONFIG } from "../Types/QueueConfig";
export interface OtelIngestJobData extends JSONObject {
body: JSONObject;
projectId: string;
headers?: JSONObject;
}
export default class OtelQueueWorker {
private static isInitialized: boolean = false;
private static config: OtelQueueConfig = DEFAULT_OTEL_QUEUE_CONFIG;
private static workers: any[] = [];
public static async init(config?: Partial<OtelQueueConfig>): Promise<void> {
if (this.isInitialized) {
return;
}
// Merge provided config with defaults
if (config) {
this.config = {
concurrency: { ...DEFAULT_OTEL_QUEUE_CONFIG.concurrency, ...config.concurrency },
};
}
logger.info("Initializing OpenTelemetry Queue Workers...");
logger.info(`Queue config: ${JSON.stringify(this.config)}`);
// Initialize Traces Queue Worker
const tracesWorker = QueueWorker.getWorker(
QueueName.OtelIngestTraces,
async (job: QueueJob) => {
logger.debug(`Processing traces job: ${job.id}`);
const data = job.data as OtelIngestJobData;
await this.processTracesJob(data);
},
{ concurrency: this.config.concurrency.traces }
);
this.workers.push(tracesWorker);
logger.info(`Traces queue worker initialized with concurrency: ${this.config.concurrency.traces}`);
// Initialize Metrics Queue Worker
const metricsWorker = QueueWorker.getWorker(
QueueName.OtelIngestMetrics,
async (job: QueueJob) => {
logger.debug(`Processing metrics job: ${job.id}`);
const data = job.data as OtelIngestJobData;
await this.processMetricsJob(data);
},
{ concurrency: this.config.concurrency.metrics }
);
this.workers.push(metricsWorker);
logger.info(`Metrics queue worker initialized with concurrency: ${this.config.concurrency.metrics}`);
const logsWorker = QueueWorker.getWorker(
QueueName.OtelIngestLogs,
async (job: QueueJob) => {
logger.debug(`Processing logs job: ${job.id}`);
const data = job.data as OtelIngestJobData;
await this.processLogsJob(data);
},
{ concurrency: this.config.concurrency.logs }
);
this.workers.push(logsWorker);
logger.info(`Logs queue worker initialized with concurrency: ${this.config.concurrency.logs}`);
// Setup graceful shutdown
this.setupGracefulShutdown();
this.isInitialized = true;
logger.info("OpenTelemetry Queue Workers initialized successfully");
}
public static async shutdown(): Promise<void> {
logger.info("Shutting down OpenTelemetry Queue Workers...");
for (const worker of this.workers) {
try {
await worker.close();
} catch (error) {
logger.error("Error closing worker:");
logger.error(error);
}
}
this.workers = [];
this.isInitialized = false;
logger.info("OpenTelemetry Queue Workers shut down successfully");
}
private static setupGracefulShutdown(): void {
const shutdownHandler = async () => {
await this.shutdown();
process.exit(0);
};
process.on('SIGTERM', shutdownHandler);
process.on('SIGINT', shutdownHandler);
}
public static async addTracesJob(data: OtelIngestJobData): Promise<void> {
const jobId = `traces-${Date.now()}-${Math.random()}`;
await Queue.addJob(
QueueName.OtelIngestTraces,
jobId,
"process-traces",
data
);
logger.debug(`Added traces job to queue: ${jobId}`);
}
public static async addMetricsJob(data: OtelIngestJobData): Promise<void> {
const jobId = `metrics-${Date.now()}-${Math.random()}`;
await Queue.addJob(
QueueName.OtelIngestMetrics,
jobId,
"process-metrics",
data
);
logger.debug(`Added metrics job to queue: ${jobId}`);
}
public static async addLogsJob(data: OtelIngestJobData): Promise<void> {
const jobId = `logs-${Date.now()}-${Math.random()}`;
await Queue.addJob(
QueueName.OtelIngestLogs,
jobId,
"process-logs",
data
);
logger.debug(`Added logs job to queue: ${jobId}`);
}
private static async processTracesJob(data: OtelIngestJobData): Promise<void> {
try {
// Create a mock request object with the data
const telemetryRequest = this.createTelemetryRequestParamsRequest(data);
await OtelIngestService.processTracesAsync(telemetryRequest);
logger.debug("Traces job processed successfully");
} catch (error) {
logger.error("Error processing traces job:");
logger.error(error);
throw error;
}
}
private static async processMetricsJob(data: OtelIngestJobData): Promise<void> {
try {
// Create a mock request object with the data
const telemetryRequest = this.createTelemetryRequestParamsRequest(data);
await OtelIngestService.processMetricsAsync(telemetryRequest);
logger.debug("Metrics job processed successfully");
} catch (error) {
logger.error("Error processing metrics job:");
logger.error(error);
throw error;
}
}
private static async processLogsJob(data: OtelIngestJobData): Promise<void> {
try {
// Create a mock request object with the data
const telemetryRequest = this.createTelemetryRequestParamsRequest(data);
await OtelIngestService.processLogsAsync(telemetryRequest);
logger.debug("Logs job processed successfully");
} catch (error) {
logger.error("Error processing logs job:");
logger.error(error);
throw error;
}
}
private static createTelemetryRequestParamsRequest(data: OtelIngestJobData): TelemetryRequest {
return {
body: data.body,
projectId: new ObjectID(data.projectId),
headers: data.headers || {},
} as TelemetryRequest;
}
}

View File

@@ -0,0 +1,18 @@
export interface OtelQueueConfig {
/**
* Number of concurrent workers for each telemetry type
*/
concurrency: {
traces: number;
metrics: number;
logs: number;
};
}
export const DEFAULT_OTEL_QUEUE_CONFIG: OtelQueueConfig = {
concurrency: {
traces: 10,
metrics: 10,
logs: 10,
},
};