diff --git a/Common/Server/Infrastructure/Queue.ts b/Common/Server/Infrastructure/Queue.ts index 4bd7829e87..714fad7958 100644 --- a/Common/Server/Infrastructure/Queue.ts +++ b/Common/Server/Infrastructure/Queue.ts @@ -17,6 +17,10 @@ export enum QueueName { Workflow = "Workflow", Worker = "Worker", Telemetry = "Telemetry", + FluentIngest = "FluentIngest", + IncomingRequestIngest = "IncomingRequestIngest", + ServerMonitorIngest = "ServerMonitorIngest", + ProbeIngest = "ProbeIngest", } export type QueueJob = Job; diff --git a/FluentIngest/API/FluentIngest.ts b/FluentIngest/API/FluentIngest.ts index 08d490c832..c09a0f6b9f 100644 --- a/FluentIngest/API/FluentIngest.ts +++ b/FluentIngest/API/FluentIngest.ts @@ -1,23 +1,17 @@ import TelemetryIngest, { TelemetryRequest, } from "Common/Server/Middleware/TelemetryIngest"; -import OneUptimeDate from "Common/Types/Date"; -import { JSONObject } from "Common/Types/JSON"; import ProductType from "Common/Types/MeteredPlan/ProductType"; -import LogService from "Common/Server/Services/LogService"; import Express, { ExpressRequest, ExpressResponse, ExpressRouter, NextFunction, } from "Common/Server/Utils/Express"; -import logger from "Common/Server/Utils/Logger"; import Response from "Common/Server/Utils/Response"; -import Log from "Common/Models/AnalyticsModels/Log"; -import LogSeverity from "Common/Types/Log/LogSeverity"; -import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService"; -import ObjectID from "Common/Types/ObjectID"; -import JSONFunctions from "Common/Types/JSONFunctions"; +import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService"; +import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; +import BadRequestException from "Common/Types/Exception/BadRequestException"; export class FluentRequestMiddleware { public static async getProductType( @@ -46,96 +40,105 @@ router.post( next: NextFunction, ): Promise => { try { - logger.debug("Fluent ProbeIngest API called"); - - const dbLogs: Array = []; - - let logItems: Array | JSONObject = req.body as - | Array - | JSONObject; - - let oneuptimeServiceName: string | string[] | undefined = - req.headers["x-oneuptime-service-name"]; - - if (!oneuptimeServiceName) { - oneuptimeServiceName = "Unknown Service"; + if (!(req as TelemetryRequest).projectId) { + throw new BadRequestException( + "Invalid request - projectId not found in request.", + ); } - const telemetryService: { - serviceId: ObjectID; - dataRententionInDays: number; - } = await OTelIngestService.telemetryServiceFromName({ - serviceName: oneuptimeServiceName as string, - projectId: (req as TelemetryRequest).projectId, + req.body = req.body.toJSON ? req.body.toJSON() : req.body; + + // Return response immediately + Response.sendEmptySuccessResponse(req, res); + + // Add to queue for asynchronous processing + await FluentIngestQueueService.addFluentIngestJob(req as TelemetryRequest); + + return; + } catch (err) { + return next(err); + } + }, +); + +// Queue stats endpoint +router.get( + "/fluent/queue/stats", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const stats: { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + } = await FluentIngestQueueService.getQueueStats(); + return Response.sendJsonObjectResponse(req, res, stats); + } catch (err) { + return next(err); + } + }, +); + +// Queue size endpoint +router.get( + "/fluent/queue/size", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const size: number = await FluentIngestQueueService.getQueueSize(); + return Response.sendJsonObjectResponse(req, res, { size }); + } catch (err) { + return next(err); + } + }, +); + +// Queue failed jobs endpoint +router.get( + "/fluent/queue/failed", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + // Parse pagination parameters from query string + const start: number = parseInt(req.query["start"] as string) || 0; + const end: number = parseInt(req.query["end"] as string) || 100; + + const failedJobs: Array<{ + id: string; + name: string; + data: any; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> = await FluentIngestQueueService.getFailedJobs({ + start, + end, }); - if ( - logItems && - typeof logItems === "object" && - (logItems as JSONObject)["json"] - ) { - logItems = (logItems as JSONObject)["json"] as - | Array - | JSONObject; - } - - if (!Array.isArray(logItems)) { - logItems = [logItems]; - } - - for (let logItem of logItems) { - const dbLog: Log = new Log(); - - dbLog.projectId = (req as TelemetryRequest).projectId; - dbLog.serviceId = telemetryService.serviceId; - dbLog.severityNumber = 0; - const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate(); - dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate); - dbLog.time = currentTimeAndDate; - - dbLog.severityText = LogSeverity.Unspecified; - - if (typeof logItem === "string") { - // check if its parseable to json - try { - logItem = JSON.parse(logItem); - } catch { - // do nothing - } - } - - if (typeof logItem !== "string") { - logItem = JSON.stringify(logItem); - } - - dbLog.body = logItem as string; - - dbLogs.push(dbLog); - } - - await LogService.createMany({ - items: dbLogs, - props: { - isRoot: true, + return Response.sendJsonObjectResponse(req, res, { + failedJobs, + pagination: { + start, + end, + count: failedJobs.length, }, }); - - OTelIngestService.recordDataIngestedUsgaeBilling({ - services: { - [oneuptimeServiceName as string]: { - dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(req.body), - dataRententionInDays: telemetryService.dataRententionInDays, - serviceId: telemetryService.serviceId, - serviceName: oneuptimeServiceName as string, - }, - }, - projectId: (req as TelemetryRequest).projectId, - productType: ProductType.Logs, - }).catch((err: Error) => { - logger.error(err); - }); - - return Response.sendEmptySuccessResponse(req, res); } catch (err) { return next(err); } diff --git a/FluentIngest/API/Metrics.ts b/FluentIngest/API/Metrics.ts new file mode 100644 index 0000000000..f6e16a32e6 --- /dev/null +++ b/FluentIngest/API/Metrics.ts @@ -0,0 +1,37 @@ +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, +} from "Common/Server/Utils/Express"; +import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService"; +// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; + +const router: ExpressRouter = Express.getRouter(); + +/** + * JSON metrics endpoint for KEDA autoscaling + * Returns queue size as JSON for KEDA metrics-api scaler + */ +router.get( + "/metrics/queue-size", + // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging + async ( + _req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const queueSize: number = await FluentIngestQueueService.getQueueSize(); + + res.setHeader("Content-Type", "application/json"); + res.status(200).json({ + queueSize: queueSize, + }); + } catch (err) { + return next(err); + } + }, +); + +export default router; diff --git a/FluentIngest/Index.ts b/FluentIngest/Index.ts index 156c181e03..9d25283563 100644 --- a/FluentIngest/Index.ts +++ b/FluentIngest/Index.ts @@ -1,4 +1,5 @@ import FluentIngestAPI from "./API/FluentIngest"; +import MetricsAPI from "./API/Metrics"; import { PromiseVoidFunction } from "Common/Types/FunctionTypes"; import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase"; import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase"; @@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger"; import Realtime from "Common/Server/Utils/Realtime"; import App from "Common/Server/Utils/StartServer"; import Telemetry from "Common/Server/Utils/Telemetry"; +import "./Jobs/FluentIngest/ProcessFluentIngest"; const app: ExpressApplication = Express.getExpressApp(); const APP_NAME: string = "fluent-ingest"; app.use([`/${APP_NAME}`, "/"], FluentIngestAPI); +app.use([`/${APP_NAME}`, "/"], MetricsAPI); const init: PromiseVoidFunction = async (): Promise => { try { diff --git a/FluentIngest/Services/Queue/FluentIngestQueueService.ts b/FluentIngest/Services/Queue/FluentIngestQueueService.ts new file mode 100644 index 0000000000..a889ab0948 --- /dev/null +++ b/FluentIngest/Services/Queue/FluentIngestQueueService.ts @@ -0,0 +1,74 @@ +import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest"; +import Queue, { QueueName } from "Common/Server/Infrastructure/Queue"; +import { JSONObject } from "Common/Types/JSON"; +import OneUptimeDate from "Common/Types/Date"; +import logger from "Common/Server/Utils/Logger"; + +export interface FluentIngestJobData { + projectId: string; + requestBody: JSONObject; + requestHeaders: Record; + ingestionTimestamp: Date; +} + +export default class FluentIngestQueueService { + public static async addFluentIngestJob( + req: TelemetryRequest, + ): Promise { + try { + const jobData: FluentIngestJobData = { + projectId: req.projectId.toString(), + requestBody: req.body, + requestHeaders: req.headers as Record, + ingestionTimestamp: OneUptimeDate.getCurrentDate(), + }; + + const jobId: string = `fluent-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`; + + await Queue.addJob( + QueueName.FluentIngest, + jobId, + "ProcessFluentIngest", + jobData as unknown as JSONObject, + ); + + logger.debug(`Added fluent ingestion job: ${jobId}`); + } catch (error) { + logger.error(`Error adding fluent ingestion job:`); + logger.error(error); + throw error; + } + } + + public static async getQueueSize(): Promise { + return Queue.getQueueSize(QueueName.FluentIngest); + } + + public static async getQueueStats(): Promise<{ + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + }> { + return Queue.getQueueStats(QueueName.FluentIngest); + } + + public static getFailedJobs(options?: { + start?: number; + end?: number; + }): Promise< + Array<{ + id: string; + name: string; + data: JSONObject; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> + > { + return Queue.getFailedJobs(QueueName.FluentIngest, options); + } +} diff --git a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml index 0f1615d227..13807a9085 100644 --- a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml +++ b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml @@ -7,4 +7,32 @@ KEDA ScaledObjects for various services {{- $metricsConfig := dict "enabled" .Values.openTelemetryIngest.keda.enabled "minReplicas" .Values.openTelemetryIngest.keda.minReplicas "maxReplicas" .Values.openTelemetryIngest.keda.maxReplicas "pollingInterval" .Values.openTelemetryIngest.keda.pollingInterval "cooldownPeriod" .Values.openTelemetryIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_telemetry_queue_size" "threshold" .Values.openTelemetryIngest.keda.queueSizeThreshold "port" .Values.port.openTelemetryIngest)) }} {{- $openTelemetryIngestKedaArgs := dict "ServiceName" "open-telemetry-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.openTelemetryIngest.disableAutoscaler }} {{- include "oneuptime.kedaScaledObject" $openTelemetryIngestKedaArgs }} +{{- end }} + +{{/* Fluent Ingest KEDA ScaledObject */}} +{{- if and .Values.keda.enabled .Values.fluentIngest.keda.enabled (not .Values.fluentIngest.disableAutoscaler) }} +{{- $metricsConfig := dict "enabled" .Values.fluentIngest.keda.enabled "minReplicas" .Values.fluentIngest.keda.minReplicas "maxReplicas" .Values.fluentIngest.keda.maxReplicas "pollingInterval" .Values.fluentIngest.keda.pollingInterval "cooldownPeriod" .Values.fluentIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_fluent_ingest_queue_size" "threshold" .Values.fluentIngest.keda.queueSizeThreshold "port" .Values.port.fluentIngest)) }} +{{- $fluentIngestKedaArgs := dict "ServiceName" "fluent-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.fluentIngest.disableAutoscaler }} +{{- include "oneuptime.kedaScaledObject" $fluentIngestKedaArgs }} +{{- end }} + +{{/* Incoming Request Ingest KEDA ScaledObject */}} +{{- if and .Values.keda.enabled .Values.incomingRequestIngest.keda.enabled (not .Values.incomingRequestIngest.disableAutoscaler) }} +{{- $metricsConfig := dict "enabled" .Values.incomingRequestIngest.keda.enabled "minReplicas" .Values.incomingRequestIngest.keda.minReplicas "maxReplicas" .Values.incomingRequestIngest.keda.maxReplicas "pollingInterval" .Values.incomingRequestIngest.keda.pollingInterval "cooldownPeriod" .Values.incomingRequestIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_incoming_request_ingest_queue_size" "threshold" .Values.incomingRequestIngest.keda.queueSizeThreshold "port" .Values.port.incomingRequestIngest)) }} +{{- $incomingRequestIngestKedaArgs := dict "ServiceName" "incoming-request-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.incomingRequestIngest.disableAutoscaler }} +{{- include "oneuptime.kedaScaledObject" $incomingRequestIngestKedaArgs }} +{{- end }} + +{{/* Server Monitor Ingest KEDA ScaledObject */}} +{{- if and .Values.keda.enabled .Values.serverMonitorIngest.keda.enabled (not .Values.serverMonitorIngest.disableAutoscaler) }} +{{- $metricsConfig := dict "enabled" .Values.serverMonitorIngest.keda.enabled "minReplicas" .Values.serverMonitorIngest.keda.minReplicas "maxReplicas" .Values.serverMonitorIngest.keda.maxReplicas "pollingInterval" .Values.serverMonitorIngest.keda.pollingInterval "cooldownPeriod" .Values.serverMonitorIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_server_monitor_ingest_queue_size" "threshold" .Values.serverMonitorIngest.keda.queueSizeThreshold "port" .Values.port.serverMonitorIngest)) }} +{{- $serverMonitorIngestKedaArgs := dict "ServiceName" "server-monitor-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.serverMonitorIngest.disableAutoscaler }} +{{- include "oneuptime.kedaScaledObject" $serverMonitorIngestKedaArgs }} +{{- end }} + +{{/* Probe Ingest KEDA ScaledObject */}} +{{- if and .Values.keda.enabled .Values.probeIngest.keda.enabled (not .Values.probeIngest.disableAutoscaler) }} +{{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }} +{{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }} +{{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }} {{- end }} \ No newline at end of file diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml index 1529169a52..8f3adf020e 100644 --- a/HelmChart/Public/oneuptime/values.yaml +++ b/HelmChart/Public/oneuptime/values.yaml @@ -490,6 +490,17 @@ probeIngest: disableTelemetryCollection: false disableAutoscaler: false resources: + # KEDA autoscaling configuration based on queue metrics + keda: + enabled: false + minReplicas: 1 + maxReplicas: 100 + # Scale up when queue size exceeds this threshold + queueSizeThreshold: 100 + # Polling interval for metrics (in seconds) + pollingInterval: 30 + # Cooldown period after scaling (in seconds) + cooldownPeriod: 300 openTelemetryIngest: replicaCount: 1 @@ -513,12 +524,34 @@ fluentIngest: disableTelemetryCollection: false disableAutoscaler: false resources: + # KEDA autoscaling configuration based on queue metrics + keda: + enabled: false + minReplicas: 1 + maxReplicas: 100 + # Scale up when queue size exceeds this threshold + queueSizeThreshold: 100 + # Polling interval for metrics (in seconds) + pollingInterval: 30 + # Cooldown period after scaling (in seconds) + cooldownPeriod: 300 incomingRequestIngest: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false resources: + # KEDA autoscaling configuration based on queue metrics + keda: + enabled: false + minReplicas: 1 + maxReplicas: 100 + # Scale up when queue size exceeds this threshold + queueSizeThreshold: 100 + # Polling interval for metrics (in seconds) + pollingInterval: 30 + # Cooldown period after scaling (in seconds) + cooldownPeriod: 300 isolatedVM: replicaCount: 1 @@ -531,6 +564,17 @@ serverMonitorIngest: disableTelemetryCollection: false disableAutoscaler: false resources: + # KEDA autoscaling configuration based on queue metrics + keda: + enabled: false + minReplicas: 1 + maxReplicas: 100 + # Scale up when queue size exceeds this threshold + queueSizeThreshold: 100 + # Polling interval for metrics (in seconds) + pollingInterval: 30 + # Cooldown period after scaling (in seconds) + cooldownPeriod: 300 slackApp: diff --git a/IncomingRequestIngest/API/IncomingRequest.ts b/IncomingRequestIngest/API/IncomingRequest.ts index 5d8f5cf29e..5e67a73d1a 100644 --- a/IncomingRequestIngest/API/IncomingRequest.ts +++ b/IncomingRequestIngest/API/IncomingRequest.ts @@ -1,12 +1,6 @@ -import HTTPMethod from "Common/Types/API/HTTPMethod"; -import OneUptimeDate from "Common/Types/Date"; import Dictionary from "Common/Types/Dictionary"; import BadDataException from "Common/Types/Exception/BadDataException"; import { JSONObject } from "Common/Types/JSON"; -import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest"; -import MonitorType from "Common/Types/Monitor/MonitorType"; -import ObjectID from "Common/Types/ObjectID"; -import MonitorService from "Common/Server/Services/MonitorService"; import Express, { ExpressRequest, ExpressResponse, @@ -14,10 +8,9 @@ import Express, { NextFunction, RequestHandler, } from "Common/Server/Utils/Express"; -import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; import Response from "Common/Server/Utils/Response"; -import Monitor from "Common/Models/DatabaseModels/Monitor"; -import logger from "Common/Server/Utils/Logger"; +import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService"; +import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; const router: ExpressRouter = Express.getRouter(); @@ -38,63 +31,18 @@ const processIncomingRequest: RequestHandler = async ( throw new BadDataException("Invalid Secret Key"); } - const isGetRequest: boolean = req.method === "GET"; - const isPostRequest: boolean = req.method === "POST"; + // Return response immediately + Response.sendEmptySuccessResponse(req, res); - let httpMethod: HTTPMethod = HTTPMethod.GET; - - if (isGetRequest) { - httpMethod = HTTPMethod.GET; - } - - if (isPostRequest) { - httpMethod = HTTPMethod.POST; - } - - const monitor: Monitor | null = await MonitorService.findOneBy({ - query: { - incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString), - monitorType: MonitorType.IncomingRequest, - }, - select: { - _id: true, - projectId: true, - }, - props: { - isRoot: true, - }, - }); - - if (!monitor || !monitor._id) { - throw new BadDataException("Monitor not found"); - } - - if (!monitor.projectId) { - throw new BadDataException("Project not found"); - } - - const now: Date = OneUptimeDate.getCurrentDate(); - - const incomingRequest: IncomingMonitorRequest = { - projectId: monitor.projectId, - monitorId: new ObjectID(monitor._id.toString()), + // Add to queue for asynchronous processing + await IncomingRequestIngestQueueService.addIncomingRequestIngestJob({ + secretKey: monitorSecretKeyAsString, requestHeaders: requestHeaders, requestBody: requestBody, - incomingRequestReceivedAt: now, - onlyCheckForIncomingRequestReceivedAt: false, - requestMethod: httpMethod, - checkedAt: now, - }; - - // process probe response here. - MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => { - // do nothing. - // we don't want to throw error here. - // we just want to log the error. - logger.error(err); + requestMethod: req.method, }); - return Response.sendEmptySuccessResponse(req, res); + return; } catch (err) { return next(err); } @@ -122,4 +70,88 @@ router.get( }, ); +// Queue stats endpoint +router.get( + "/incoming-request/queue/stats", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const stats: { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + } = await IncomingRequestIngestQueueService.getQueueStats(); + return Response.sendJsonObjectResponse(req, res, stats); + } catch (err) { + return next(err); + } + }, +); + +// Queue size endpoint +router.get( + "/incoming-request/queue/size", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const size: number = await IncomingRequestIngestQueueService.getQueueSize(); + return Response.sendJsonObjectResponse(req, res, { size }); + } catch (err) { + return next(err); + } + }, +); + +// Queue failed jobs endpoint +router.get( + "/incoming-request/queue/failed", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + // Parse pagination parameters from query string + const start: number = parseInt(req.query["start"] as string) || 0; + const end: number = parseInt(req.query["end"] as string) || 100; + + const failedJobs: Array<{ + id: string; + name: string; + data: any; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> = await IncomingRequestIngestQueueService.getFailedJobs({ + start, + end, + }); + + return Response.sendJsonObjectResponse(req, res, { + failedJobs, + pagination: { + start, + end, + count: failedJobs.length, + }, + }); + } catch (err) { + return next(err); + } + }, +); + export default router; diff --git a/IncomingRequestIngest/API/Metrics.ts b/IncomingRequestIngest/API/Metrics.ts new file mode 100644 index 0000000000..3748a67440 --- /dev/null +++ b/IncomingRequestIngest/API/Metrics.ts @@ -0,0 +1,37 @@ +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, +} from "Common/Server/Utils/Express"; +import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService"; +// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; + +const router: ExpressRouter = Express.getRouter(); + +/** + * JSON metrics endpoint for KEDA autoscaling + * Returns queue size as JSON for KEDA metrics-api scaler + */ +router.get( + "/metrics/queue-size", + // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging + async ( + _req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const queueSize: number = await IncomingRequestIngestQueueService.getQueueSize(); + + res.setHeader("Content-Type", "application/json"); + res.status(200).json({ + queueSize: queueSize, + }); + } catch (err) { + return next(err); + } + }, +); + +export default router; diff --git a/IncomingRequestIngest/Index.ts b/IncomingRequestIngest/Index.ts index dc56787e24..170b52e844 100644 --- a/IncomingRequestIngest/Index.ts +++ b/IncomingRequestIngest/Index.ts @@ -1,4 +1,5 @@ import IncomingRequestAPI from "./API/IncomingRequest"; +import MetricsAPI from "./API/Metrics"; import { PromiseVoidFunction } from "Common/Types/FunctionTypes"; import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase"; import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase"; @@ -9,6 +10,7 @@ import logger from "Common/Server/Utils/Logger"; import Realtime from "Common/Server/Utils/Realtime"; import App from "Common/Server/Utils/StartServer"; import Telemetry from "Common/Server/Utils/Telemetry"; +import "./Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest"; import "ejs"; const app: ExpressApplication = Express.getExpressApp(); @@ -16,6 +18,7 @@ const app: ExpressApplication = Express.getExpressApp(); const APP_NAME: string = "incoming-request-ingest"; app.use([`/${APP_NAME}`, "/"], IncomingRequestAPI); +app.use([`/${APP_NAME}`, "/"], MetricsAPI); const init: PromiseVoidFunction = async (): Promise => { try { diff --git a/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts b/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts new file mode 100644 index 0000000000..725090ce9e --- /dev/null +++ b/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts @@ -0,0 +1,109 @@ +import { IncomingRequestIngestJobData } from "../../Services/Queue/IncomingRequestIngestQueueService"; +import logger from "Common/Server/Utils/Logger"; +import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue"; +import QueueWorker from "Common/Server/Infrastructure/QueueWorker"; +import HTTPMethod from "Common/Types/API/HTTPMethod"; +import OneUptimeDate from "Common/Types/Date"; +import Dictionary from "Common/Types/Dictionary"; +import BadDataException from "Common/Types/Exception/BadDataException"; +import { JSONObject } from "Common/Types/JSON"; +import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest"; +import MonitorType from "Common/Types/Monitor/MonitorType"; +import ObjectID from "Common/Types/ObjectID"; +import MonitorService from "Common/Server/Services/MonitorService"; +import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; +import Monitor from "Common/Models/DatabaseModels/Monitor"; + +// Set up the worker for processing incoming request ingest queue +QueueWorker.getWorker( + QueueName.IncomingRequestIngest, + async (job: QueueJob): Promise => { + logger.debug(`Processing incoming request ingestion job: ${job.name}`); + + try { + const jobData: IncomingRequestIngestJobData = + job.data as IncomingRequestIngestJobData; + + await processIncomingRequestFromQueue(jobData); + + logger.debug( + `Successfully processed incoming request ingestion job: ${job.name}`, + ); + } catch (error) { + logger.error(`Error processing incoming request ingestion job:`); + logger.error(error); + throw error; + } + }, + { concurrency: 20 }, // Process up to 20 incoming request ingest jobs concurrently +); + +async function processIncomingRequestFromQueue( + jobData: IncomingRequestIngestJobData, +): Promise { + const requestHeaders: Dictionary = jobData.requestHeaders; + const requestBody: string | JSONObject = jobData.requestBody; + const monitorSecretKeyAsString: string = jobData.secretKey; + + if (!monitorSecretKeyAsString) { + throw new BadDataException("Invalid Secret Key"); + } + + const isGetRequest: boolean = jobData.requestMethod === "GET"; + const isPostRequest: boolean = jobData.requestMethod === "POST"; + + let httpMethod: HTTPMethod = HTTPMethod.GET; + + if (isGetRequest) { + httpMethod = HTTPMethod.GET; + } + + if (isPostRequest) { + httpMethod = HTTPMethod.POST; + } + + const monitor: Monitor | null = await MonitorService.findOneBy({ + query: { + incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString), + monitorType: MonitorType.IncomingRequest, + }, + select: { + _id: true, + projectId: true, + }, + props: { + isRoot: true, + }, + }); + + if (!monitor || !monitor._id) { + throw new BadDataException("Monitor not found"); + } + + if (!monitor.projectId) { + throw new BadDataException("Project not found"); + } + + const now: Date = OneUptimeDate.getCurrentDate(); + + const incomingRequest: IncomingMonitorRequest = { + projectId: monitor.projectId, + monitorId: new ObjectID(monitor._id.toString()), + requestHeaders: requestHeaders, + requestBody: requestBody, + incomingRequestReceivedAt: now, + onlyCheckForIncomingRequestReceivedAt: false, + requestMethod: httpMethod, + checkedAt: now, + }; + + // process probe response here. + MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => { + // do nothing. + // we don't want to throw error here. + // we just want to log the error. + logger.error(err); + }); +} + +logger.debug("Incoming request ingest worker initialized"); diff --git a/IncomingRequestIngest/Services/Queue/IncomingRequestIngestQueueService.ts b/IncomingRequestIngest/Services/Queue/IncomingRequestIngestQueueService.ts new file mode 100644 index 0000000000..590e4a0f34 --- /dev/null +++ b/IncomingRequestIngest/Services/Queue/IncomingRequestIngestQueueService.ts @@ -0,0 +1,81 @@ +import Queue, { QueueName } from "Common/Server/Infrastructure/Queue"; +import { JSONObject } from "Common/Types/JSON"; +import OneUptimeDate from "Common/Types/Date"; +import logger from "Common/Server/Utils/Logger"; +import Dictionary from "Common/Types/Dictionary"; + +export interface IncomingRequestIngestJobData { + secretKey: string; + requestHeaders: Dictionary; + requestBody: string | JSONObject; + requestMethod: string; + ingestionTimestamp: Date; +} + +export default class IncomingRequestIngestQueueService { + public static async addIncomingRequestIngestJob( + data: { + secretKey: string; + requestHeaders: Dictionary; + requestBody: string | JSONObject; + requestMethod: string; + }, + ): Promise { + try { + const jobData: IncomingRequestIngestJobData = { + secretKey: data.secretKey, + requestHeaders: data.requestHeaders, + requestBody: data.requestBody, + requestMethod: data.requestMethod, + ingestionTimestamp: OneUptimeDate.getCurrentDate(), + }; + + const jobId: string = `incoming-request-${data.secretKey}-${OneUptimeDate.getCurrentDateAsUnixNano()}`; + + await Queue.addJob( + QueueName.IncomingRequestIngest, + jobId, + "ProcessIncomingRequestIngest", + jobData as unknown as JSONObject, + ); + + logger.debug(`Added incoming request ingestion job: ${jobId}`); + } catch (error) { + logger.error(`Error adding incoming request ingestion job:`); + logger.error(error); + throw error; + } + } + + public static async getQueueSize(): Promise { + return Queue.getQueueSize(QueueName.IncomingRequestIngest); + } + + public static async getQueueStats(): Promise<{ + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + }> { + return Queue.getQueueStats(QueueName.IncomingRequestIngest); + } + + public static getFailedJobs(options?: { + start?: number; + end?: number; + }): Promise< + Array<{ + id: string; + name: string; + data: JSONObject; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> + > { + return Queue.getFailedJobs(QueueName.IncomingRequestIngest, options); + } +} diff --git a/ProbeIngest/API/Metrics.ts b/ProbeIngest/API/Metrics.ts new file mode 100644 index 0000000000..e957a1a010 --- /dev/null +++ b/ProbeIngest/API/Metrics.ts @@ -0,0 +1,37 @@ +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, +} from "Common/Server/Utils/Express"; +import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService"; +// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; + +const router: ExpressRouter = Express.getRouter(); + +/** + * JSON metrics endpoint for KEDA autoscaling + * Returns queue size as JSON for KEDA metrics-api scaler + */ +router.get( + "/metrics/queue-size", + // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging + async ( + _req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const queueSize: number = await ProbeIngestQueueService.getQueueSize(); + + res.setHeader("Content-Type", "application/json"); + res.status(200).json({ + queueSize: queueSize, + }); + } catch (err) { + return next(err); + } + }, +); + +export default router; diff --git a/ProbeIngest/API/Probe.ts b/ProbeIngest/API/Probe.ts index ee1176b258..e7f7378713 100644 --- a/ProbeIngest/API/Probe.ts +++ b/ProbeIngest/API/Probe.ts @@ -18,13 +18,12 @@ import Express, { NextFunction, } from "Common/Server/Utils/Express"; import logger from "Common/Server/Utils/Logger"; -import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; import Response from "Common/Server/Utils/Response"; import GlobalConfig from "Common/Models/DatabaseModels/GlobalConfig"; import Probe from "Common/Models/DatabaseModels/Probe"; import User from "Common/Models/DatabaseModels/User"; -import MonitorTestService from "Common/Server/Services/MonitorTestService"; -import OneUptimeDate from "Common/Types/Date"; +import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService"; +import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; const router: ExpressRouter = Express.getRouter(); @@ -255,17 +254,18 @@ router.post( ); } - // this is when the resource was ingested. - probeResponse.ingestedAt = OneUptimeDate.getCurrentDate(); - - MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => { - logger.error("Error in monitor resource"); - logger.error(err); - }); - - return Response.sendJsonObjectResponse(req, res, { + // Return response immediately + Response.sendJsonObjectResponse(req, res, { result: "processing", }); + + // Add to queue for asynchronous processing + await ProbeIngestQueueService.addProbeIngestJob({ + probeMonitorResponse: req.body, + jobType: "probe-response", + }); + + return; } catch (err) { return next(err); } @@ -303,28 +303,101 @@ router.post( ); } - probeResponse.ingestedAt = OneUptimeDate.getCurrentDate(); + // Return response immediately + Response.sendEmptySuccessResponse(req, res); - // save the probe response to the monitor test. - - await MonitorTestService.updateOneById({ - id: testId, - data: { - monitorStepProbeResponse: { - [probeResponse.monitorStepId.toString()]: { - ...JSON.parse(JSON.stringify(probeResponse)), - monitoredAt: OneUptimeDate.getCurrentDate(), - }, - } as any, - }, - props: { - isRoot: true, - }, + // Add to queue for asynchronous processing + await ProbeIngestQueueService.addProbeIngestJob({ + probeMonitorResponse: req.body, + jobType: "monitor-test", + testId: testId.toString(), }); - // send success response. + return; + } catch (err) { + return next(err); + } + }, +); - return Response.sendEmptySuccessResponse(req, res); +// Queue stats endpoint +router.get( + "/probe/queue/stats", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const stats: { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + } = await ProbeIngestQueueService.getQueueStats(); + return Response.sendJsonObjectResponse(req, res, stats); + } catch (err) { + return next(err); + } + }, +); + +// Queue size endpoint +router.get( + "/probe/queue/size", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const size: number = await ProbeIngestQueueService.getQueueSize(); + return Response.sendJsonObjectResponse(req, res, { size }); + } catch (err) { + return next(err); + } + }, +); + +// Queue failed jobs endpoint +router.get( + "/probe/queue/failed", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + // Parse pagination parameters from query string + const start: number = parseInt(req.query["start"] as string) || 0; + const end: number = parseInt(req.query["end"] as string) || 100; + + const failedJobs: Array<{ + id: string; + name: string; + data: any; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> = await ProbeIngestQueueService.getFailedJobs({ + start, + end, + }); + + return Response.sendJsonObjectResponse(req, res, { + failedJobs, + pagination: { + start, + end, + count: failedJobs.length, + }, + }); } catch (err) { return next(err); } diff --git a/ProbeIngest/Index.ts b/ProbeIngest/Index.ts index e0aa657ba3..1ed8e6fcd6 100644 --- a/ProbeIngest/Index.ts +++ b/ProbeIngest/Index.ts @@ -1,6 +1,7 @@ import MonitorAPI from "./API/Monitor"; import ProbeIngest from "./API/Probe"; import RegisterAPI from "./API/Register"; +import MetricsAPI from "./API/Metrics"; import { PromiseVoidFunction } from "Common/Types/FunctionTypes"; import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase"; import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase"; @@ -11,6 +12,7 @@ import logger from "Common/Server/Utils/Logger"; import Realtime from "Common/Server/Utils/Realtime"; import App from "Common/Server/Utils/StartServer"; import Telemetry from "Common/Server/Utils/Telemetry"; +import "./Jobs/ProbeIngest/ProcessProbeIngest"; import "ejs"; const app: ExpressApplication = Express.getExpressApp(); @@ -21,6 +23,7 @@ const APP_NAME: string = "probe-ingest"; app.use([`/${APP_NAME}`, "/ingestor", "/"], RegisterAPI); app.use([`/${APP_NAME}`, "/ingestor", "/"], MonitorAPI); app.use([`/${APP_NAME}`, "/ingestor", "/"], ProbeIngest); +app.use([`/${APP_NAME}`, "/"], MetricsAPI); const init: PromiseVoidFunction = async (): Promise => { try { diff --git a/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts b/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts new file mode 100644 index 0000000000..0c439ac7c9 --- /dev/null +++ b/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts @@ -0,0 +1,87 @@ +import { ProbeIngestJobData } from "../../Services/Queue/ProbeIngestQueueService"; +import logger from "Common/Server/Utils/Logger"; +import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue"; +import QueueWorker from "Common/Server/Infrastructure/QueueWorker"; +import BadDataException from "Common/Types/Exception/BadDataException"; +import JSONFunctions from "Common/Types/JSONFunctions"; +import ObjectID from "Common/Types/ObjectID"; +import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; +import OneUptimeDate from "Common/Types/Date"; +import MonitorTestService from "Common/Server/Services/MonitorTestService"; +import ProbeMonitorResponse from "Common/Types/Probe/ProbeMonitorResponse"; +import { JSONObject } from "Common/Types/JSON"; + +// Set up the worker for processing probe ingest queue +QueueWorker.getWorker( + QueueName.ProbeIngest, + async (job: QueueJob): Promise => { + logger.debug(`Processing probe ingestion job: ${job.name}`); + + try { + const jobData: ProbeIngestJobData = job.data as ProbeIngestJobData; + + await processProbeFromQueue(jobData); + + logger.debug( + `Successfully processed probe ingestion job: ${job.name}`, + ); + } catch (error) { + logger.error(`Error processing probe ingestion job:`); + logger.error(error); + throw error; + } + }, + { concurrency: 20 }, // Process up to 20 probe ingest jobs concurrently +); + +async function processProbeFromQueue( + jobData: ProbeIngestJobData, +): Promise { + const probeResponse: ProbeMonitorResponse = JSONFunctions.deserialize( + jobData.probeMonitorResponse["probeMonitorResponse"] as JSONObject, + ) as any; + + if (!probeResponse) { + throw new BadDataException("ProbeMonitorResponse not found"); + } + + // this is when the resource was ingested. + probeResponse.ingestedAt = OneUptimeDate.getCurrentDate(); + + if (jobData.jobType === "probe-response") { + // Handle regular probe response + MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => { + logger.error("Error in monitor resource"); + logger.error(err); + }); + } else if (jobData.jobType === "monitor-test" && jobData.testId) { + // Handle monitor test response + const testId: ObjectID = new ObjectID(jobData.testId); + + if (!testId) { + throw new BadDataException("TestId not found"); + } + + probeResponse.ingestedAt = OneUptimeDate.getCurrentDate(); + + // save the probe response to the monitor test. + await MonitorTestService.updateOneById({ + id: testId, + data: { + monitorStepProbeResponse: { + [probeResponse.monitorStepId.toString()]: { + ...JSON.parse(JSON.stringify(probeResponse)), + monitoredAt: OneUptimeDate.getCurrentDate(), + }, + } as any, + }, + props: { + isRoot: true, + }, + }); + } else { + throw new BadDataException(`Invalid job type: ${jobData.jobType}`); + } +} + +logger.debug("Probe ingest worker initialized"); diff --git a/ProbeIngest/Services/Queue/ProbeIngestQueueService.ts b/ProbeIngest/Services/Queue/ProbeIngestQueueService.ts new file mode 100644 index 0000000000..387e4b84b0 --- /dev/null +++ b/ProbeIngest/Services/Queue/ProbeIngestQueueService.ts @@ -0,0 +1,77 @@ +import Queue, { QueueName } from "Common/Server/Infrastructure/Queue"; +import { JSONObject } from "Common/Types/JSON"; +import OneUptimeDate from "Common/Types/Date"; +import logger from "Common/Server/Utils/Logger"; + +export interface ProbeIngestJobData { + probeMonitorResponse: JSONObject; + jobType: "probe-response" | "monitor-test"; + testId?: string | undefined; + ingestionTimestamp: Date; +} + +export default class ProbeIngestQueueService { + public static async addProbeIngestJob( + data: { + probeMonitorResponse: JSONObject; + jobType: "probe-response" | "monitor-test"; + testId?: string; + }, + ): Promise { + try { + const jobData: ProbeIngestJobData = { + probeMonitorResponse: data.probeMonitorResponse, + jobType: data.jobType, + testId: data.testId, + ingestionTimestamp: OneUptimeDate.getCurrentDate(), + }; + + const jobId: string = `probe-${data.jobType}-${data.testId || "general"}-${OneUptimeDate.getCurrentDateAsUnixNano()}`; + + await Queue.addJob( + QueueName.ProbeIngest, + jobId, + "ProcessProbeIngest", + jobData as unknown as JSONObject, + ); + + logger.debug(`Added probe ingestion job: ${jobId}`); + } catch (error) { + logger.error(`Error adding probe ingestion job:`); + logger.error(error); + throw error; + } + } + + public static async getQueueSize(): Promise { + return Queue.getQueueSize(QueueName.ProbeIngest); + } + + public static async getQueueStats(): Promise<{ + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + }> { + return Queue.getQueueStats(QueueName.ProbeIngest); + } + + public static getFailedJobs(options?: { + start?: number; + end?: number; + }): Promise< + Array<{ + id: string; + name: string; + data: JSONObject; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> + > { + return Queue.getFailedJobs(QueueName.ProbeIngest, options); + } +} diff --git a/ServerMonitorIngest/API/Metrics.ts b/ServerMonitorIngest/API/Metrics.ts new file mode 100644 index 0000000000..317f7625da --- /dev/null +++ b/ServerMonitorIngest/API/Metrics.ts @@ -0,0 +1,37 @@ +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, +} from "Common/Server/Utils/Express"; +import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService"; +// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; + +const router: ExpressRouter = Express.getRouter(); + +/** + * JSON metrics endpoint for KEDA autoscaling + * Returns queue size as JSON for KEDA metrics-api scaler + */ +router.get( + "/metrics/queue-size", + // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging + async ( + _req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const queueSize: number = await ServerMonitorIngestQueueService.getQueueSize(); + + res.setHeader("Content-Type", "application/json"); + res.status(200).json({ + queueSize: queueSize, + }); + } catch (err) { + return next(err); + } + }, +); + +export default router; diff --git a/ServerMonitorIngest/API/ServerMonitor.ts b/ServerMonitorIngest/API/ServerMonitor.ts index b517c17284..55f9a8df88 100644 --- a/ServerMonitorIngest/API/ServerMonitor.ts +++ b/ServerMonitorIngest/API/ServerMonitor.ts @@ -1,8 +1,6 @@ import BadDataException from "Common/Types/Exception/BadDataException"; import { JSONObject } from "Common/Types/JSON"; -import JSONFunctions from "Common/Types/JSONFunctions"; import MonitorType from "Common/Types/Monitor/MonitorType"; -import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse"; import ObjectID from "Common/Types/ObjectID"; import MonitorService from "Common/Server/Services/MonitorService"; import Express, { @@ -11,11 +9,11 @@ import Express, { ExpressRouter, NextFunction, } from "Common/Server/Utils/Express"; -import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; import Response from "Common/Server/Utils/Response"; import Monitor from "Common/Models/DatabaseModels/Monitor"; -import OneUptimeDate from "Common/Types/Date"; import ProjectService from "Common/Server/Services/ProjectService"; +import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService"; +import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; const router: ExpressRouter = Express.getRouter(); @@ -77,52 +75,100 @@ router.post( throw new BadDataException("Invalid Secret Key"); } - const monitor: Monitor | null = await MonitorService.findOneBy({ - query: { - serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString), - monitorType: MonitorType.Server, - ...MonitorService.getEnabledMonitorQuery(), - project: { - ...ProjectService.getActiveProjectStatusQuery(), - }, - }, - select: { - _id: true, - }, - props: { - isRoot: true, - }, - }); - - if (!monitor) { - throw new BadDataException("Monitor not found"); - } - // return the response early. Response.sendEmptySuccessResponse(req, res); - // now process this request. + // Add to queue for asynchronous processing + await ServerMonitorIngestQueueService.addServerMonitorIngestJob({ + secretKey: monitorSecretKeyAsString, + serverMonitorResponse: req.body as JSONObject, + }); - const serverMonitorResponse: ServerMonitorResponse = - JSONFunctions.deserialize( - req.body["serverMonitorResponse"] as JSONObject, - ) as any; + return; + } catch (err) { + return next(err); + } + }, +); - if (!serverMonitorResponse) { - throw new BadDataException("Invalid Server Monitor Response"); - } +// Queue stats endpoint +router.get( + "/server-monitor/queue/stats", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const stats: { + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + } = await ServerMonitorIngestQueueService.getQueueStats(); + return Response.sendJsonObjectResponse(req, res, stats); + } catch (err) { + return next(err); + } + }, +); - if (!monitor.id) { - throw new BadDataException("Monitor id not found"); - } +// Queue size endpoint +router.get( + "/server-monitor/queue/size", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + const size: number = await ServerMonitorIngestQueueService.getQueueSize(); + return Response.sendJsonObjectResponse(req, res, { size }); + } catch (err) { + return next(err); + } + }, +); - serverMonitorResponse.monitorId = monitor.id; +// Queue failed jobs endpoint +router.get( + "/server-monitor/queue/failed", + ClusterKeyAuthorization.isAuthorizedServiceMiddleware, + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + // Parse pagination parameters from query string + const start: number = parseInt(req.query["start"] as string) || 0; + const end: number = parseInt(req.query["end"] as string) || 100; - serverMonitorResponse.requestReceivedAt = OneUptimeDate.getCurrentDate(); - serverMonitorResponse.timeNow = OneUptimeDate.getCurrentDate(); + const failedJobs: Array<{ + id: string; + name: string; + data: any; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> = await ServerMonitorIngestQueueService.getFailedJobs({ + start, + end, + }); - // process probe response here. - await MonitorResourceUtil.monitorResource(serverMonitorResponse); + return Response.sendJsonObjectResponse(req, res, { + failedJobs, + pagination: { + start, + end, + count: failedJobs.length, + }, + }); } catch (err) { return next(err); } diff --git a/ServerMonitorIngest/Index.ts b/ServerMonitorIngest/Index.ts index 6be5844bae..71135db1f5 100644 --- a/ServerMonitorIngest/Index.ts +++ b/ServerMonitorIngest/Index.ts @@ -1,4 +1,5 @@ import ServerMonitorAPI from "./API/ServerMonitor"; +import MetricsAPI from "./API/Metrics"; import { PromiseVoidFunction } from "Common/Types/FunctionTypes"; import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase"; import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase"; @@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger"; import Realtime from "Common/Server/Utils/Realtime"; import App from "Common/Server/Utils/StartServer"; import Telemetry from "Common/Server/Utils/Telemetry"; +import "./Jobs/ServerMonitorIngest/ProcessServerMonitorIngest"; const app: ExpressApplication = Express.getExpressApp(); const APP_NAME: string = "server-monitor-ingest"; app.use([`/${APP_NAME}`, "/"], ServerMonitorAPI); +app.use([`/${APP_NAME}`, "/"], MetricsAPI); const init: PromiseVoidFunction = async (): Promise => { try { diff --git a/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts b/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts new file mode 100644 index 0000000000..5bb76333d2 --- /dev/null +++ b/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts @@ -0,0 +1,92 @@ +import { ServerMonitorIngestJobData } from "../../Services/Queue/ServerMonitorIngestQueueService"; +import logger from "Common/Server/Utils/Logger"; +import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue"; +import QueueWorker from "Common/Server/Infrastructure/QueueWorker"; +import BadDataException from "Common/Types/Exception/BadDataException"; +import { JSONObject } from "Common/Types/JSON"; +import JSONFunctions from "Common/Types/JSONFunctions"; +import MonitorType from "Common/Types/Monitor/MonitorType"; +import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse"; +import ObjectID from "Common/Types/ObjectID"; +import MonitorService from "Common/Server/Services/MonitorService"; +import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; +import Monitor from "Common/Models/DatabaseModels/Monitor"; +import OneUptimeDate from "Common/Types/Date"; +import ProjectService from "Common/Server/Services/ProjectService"; + +// Set up the worker for processing server monitor ingest queue +QueueWorker.getWorker( + QueueName.ServerMonitorIngest, + async (job: QueueJob): Promise => { + logger.debug(`Processing server monitor ingestion job: ${job.name}`); + + try { + const jobData: ServerMonitorIngestJobData = + job.data as ServerMonitorIngestJobData; + + await processServerMonitorFromQueue(jobData); + + logger.debug( + `Successfully processed server monitor ingestion job: ${job.name}`, + ); + } catch (error) { + logger.error(`Error processing server monitor ingestion job:`); + logger.error(error); + throw error; + } + }, + { concurrency: 20 }, // Process up to 20 server monitor ingest jobs concurrently +); + +async function processServerMonitorFromQueue( + jobData: ServerMonitorIngestJobData, +): Promise { + const monitorSecretKeyAsString: string = jobData.secretKey; + + if (!monitorSecretKeyAsString) { + throw new BadDataException("Invalid Secret Key"); + } + + const monitor: Monitor | null = await MonitorService.findOneBy({ + query: { + serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString), + monitorType: MonitorType.Server, + ...MonitorService.getEnabledMonitorQuery(), + project: { + ...ProjectService.getActiveProjectStatusQuery(), + }, + }, + select: { + _id: true, + }, + props: { + isRoot: true, + }, + }); + + if (!monitor) { + throw new BadDataException("Monitor not found"); + } + + const serverMonitorResponse: ServerMonitorResponse = + JSONFunctions.deserialize( + jobData.serverMonitorResponse["serverMonitorResponse"] as JSONObject, + ) as any; + + if (!serverMonitorResponse) { + throw new BadDataException("Invalid Server Monitor Response"); + } + + if (!monitor.id) { + throw new BadDataException("Monitor id not found"); + } + + serverMonitorResponse.monitorId = monitor.id; + serverMonitorResponse.requestReceivedAt = OneUptimeDate.getCurrentDate(); + serverMonitorResponse.timeNow = OneUptimeDate.getCurrentDate(); + + // process probe response here. + await MonitorResourceUtil.monitorResource(serverMonitorResponse); +} + +logger.debug("Server monitor ingest worker initialized"); diff --git a/ServerMonitorIngest/Services/Queue/ServerMonitorIngestQueueService.ts b/ServerMonitorIngest/Services/Queue/ServerMonitorIngestQueueService.ts new file mode 100644 index 0000000000..dfae524eed --- /dev/null +++ b/ServerMonitorIngest/Services/Queue/ServerMonitorIngestQueueService.ts @@ -0,0 +1,74 @@ +import Queue, { QueueName } from "Common/Server/Infrastructure/Queue"; +import { JSONObject } from "Common/Types/JSON"; +import OneUptimeDate from "Common/Types/Date"; +import logger from "Common/Server/Utils/Logger"; + +export interface ServerMonitorIngestJobData { + secretKey: string; + serverMonitorResponse: JSONObject; + ingestionTimestamp: Date; +} + +export default class ServerMonitorIngestQueueService { + public static async addServerMonitorIngestJob( + data: { + secretKey: string; + serverMonitorResponse: JSONObject; + }, + ): Promise { + try { + const jobData: ServerMonitorIngestJobData = { + secretKey: data.secretKey, + serverMonitorResponse: data.serverMonitorResponse, + ingestionTimestamp: OneUptimeDate.getCurrentDate(), + }; + + const jobId: string = `server-monitor-${data.secretKey}-${OneUptimeDate.getCurrentDateAsUnixNano()}`; + + await Queue.addJob( + QueueName.ServerMonitorIngest, + jobId, + "ProcessServerMonitorIngest", + jobData as unknown as JSONObject, + ); + + logger.debug(`Added server monitor ingestion job: ${jobId}`); + } catch (error) { + logger.error(`Error adding server monitor ingestion job:`); + logger.error(error); + throw error; + } + } + + public static async getQueueSize(): Promise { + return Queue.getQueueSize(QueueName.ServerMonitorIngest); + } + + public static async getQueueStats(): Promise<{ + waiting: number; + active: number; + completed: number; + failed: number; + delayed: number; + total: number; + }> { + return Queue.getQueueStats(QueueName.ServerMonitorIngest); + } + + public static getFailedJobs(options?: { + start?: number; + end?: number; + }): Promise< + Array<{ + id: string; + name: string; + data: JSONObject; + failedReason: string; + processedOn: Date | null; + finishedOn: Date | null; + attemptsMade: number; + }> + > { + return Queue.getFailedJobs(QueueName.ServerMonitorIngest, options); + } +}