Files
oneuptime/Telemetry/Services/Queue/TelemetryQueueService.ts
OneUptime AI Agent a7797b9b53 fix: Job traces-a8b2ddcf-0d6c-4c5f-8fb4-aad709372869-17
This commit fixes an exception detected by OneUptime.

Exception Type: Error
Exception ID: e6f39d81-94f7-4c7d-a5f1-822e9c28b9ae

Automatically generated by OneUptime AI Agent.
2026-01-09 19:31:06 +00:00

123 lines
3.3 KiB
TypeScript

import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
import Text from "Common/Types/Text";
export enum TelemetryType {
Logs = "logs",
Traces = "traces",
Metrics = "metrics",
Syslog = "syslog",
FluentLogs = "fluentlogs",
}
export interface TelemetryIngestJobData {
type: TelemetryType;
projectId: string;
requestBody: JSONObject;
requestHeaders: Record<string, string>;
ingestionTimestamp: Date;
}
// Legacy interfaces for backward compatibility
export interface LogsIngestJobData extends TelemetryIngestJobData {
type: TelemetryType.Logs;
}
export interface TracesIngestJobData extends TelemetryIngestJobData {
type: TelemetryType.Traces;
}
export interface MetricsIngestJobData extends TelemetryIngestJobData {
type: TelemetryType.Metrics;
}
export interface SyslogIngestJobData extends TelemetryIngestJobData {
type: TelemetryType.Syslog;
}
export default class TelemetryQueueService {
public static async addTelemetryIngestJob(
req: TelemetryRequest,
type: TelemetryType,
): Promise<void> {
try {
const jobData: TelemetryIngestJobData = {
type,
projectId: req.projectId.toString(),
requestBody: req.body,
requestHeaders: req.headers as Record<string, string>,
ingestionTimestamp: OneUptimeDate.getCurrentDate(),
};
const jobId: string = `${type}-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}-${Text.generateRandomNumber(6)}`;
await Queue.addJob(
QueueName.Telemetry,
jobId,
"ProcessTelemetry",
jobData as unknown as JSONObject,
);
logger.debug(`Added ${type} ingestion job: ${jobId}`);
} catch (error) {
logger.error(`Error adding ${type} ingestion job:`);
logger.error(error);
throw error;
}
}
public static async addLogIngestJob(req: TelemetryRequest): Promise<void> {
return this.addTelemetryIngestJob(req, TelemetryType.Logs);
}
public static async addTraceIngestJob(req: TelemetryRequest): Promise<void> {
return this.addTelemetryIngestJob(req, TelemetryType.Traces);
}
public static async addMetricIngestJob(req: TelemetryRequest): Promise<void> {
return this.addTelemetryIngestJob(req, TelemetryType.Metrics);
}
public static async addFluentLogIngestJob(
req: TelemetryRequest,
): Promise<void> {
return this.addTelemetryIngestJob(req, TelemetryType.FluentLogs);
}
public static async getQueueSize(): Promise<number> {
return Queue.getQueueSize(QueueName.Telemetry);
}
public static async getQueueStats(): Promise<{
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
total: number;
}> {
return Queue.getQueueStats(QueueName.Telemetry);
}
public static getFailedJobs(options?: {
start?: number;
end?: number;
}): Promise<
Array<{
id: string;
name: string;
data: JSONObject;
failedReason: string;
stackTrace?: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}>
> {
return Queue.getFailedJobs(QueueName.Telemetry, options);
}
}