feat: Add KEDA autoscaling configuration for various ingests

- Introduced KEDA autoscaling configuration in values.yaml for probeIngest, fluentIngest, incomingRequestIngest, and serverMonitorIngest.
- Added endpoints for queue statistics, size, and failed jobs in IncomingRequestIngest and ProbeIngest APIs.
- Implemented asynchronous processing of incoming requests and probes using job queues.
- Created Metrics API for KEDA metrics integration in IncomingRequestIngest, ProbeIngest, and ServerMonitorIngest.
- Refactored IncomingRequest and Probe APIs to utilize queue services for processing.
- Added job processing logic for incoming requests and probes in respective job files.
- Implemented queue service classes for managing job addition and retrieval of queue statistics.
This commit is contained in:
Simon Larsen
2025-08-01 10:29:02 +01:00
parent 15a68472b0
commit 896020b93b
22 changed files with 1209 additions and 225 deletions

View File

@@ -17,6 +17,10 @@ export enum QueueName {
Workflow = "Workflow",
Worker = "Worker",
Telemetry = "Telemetry",
FluentIngest = "FluentIngest",
IncomingRequestIngest = "IncomingRequestIngest",
ServerMonitorIngest = "ServerMonitorIngest",
ProbeIngest = "ProbeIngest",
}
export type QueueJob = Job;

View File

@@ -1,23 +1,17 @@
import TelemetryIngest, {
TelemetryRequest,
} from "Common/Server/Middleware/TelemetryIngest";
import OneUptimeDate from "Common/Types/Date";
import { JSONObject } from "Common/Types/JSON";
import ProductType from "Common/Types/MeteredPlan/ProductType";
import LogService from "Common/Server/Services/LogService";
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import logger from "Common/Server/Utils/Logger";
import Response from "Common/Server/Utils/Response";
import Log from "Common/Models/AnalyticsModels/Log";
import LogSeverity from "Common/Types/Log/LogSeverity";
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
import ObjectID from "Common/Types/ObjectID";
import JSONFunctions from "Common/Types/JSONFunctions";
import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import BadRequestException from "Common/Types/Exception/BadRequestException";
export class FluentRequestMiddleware {
public static async getProductType(
@@ -46,96 +40,105 @@ router.post(
next: NextFunction,
): Promise<void> => {
try {
logger.debug("Fluent ProbeIngest API called");
const dbLogs: Array<Log> = [];
let logItems: Array<JSONObject | string> | JSONObject = req.body as
| Array<JSONObject | string>
| JSONObject;
let oneuptimeServiceName: string | string[] | undefined =
req.headers["x-oneuptime-service-name"];
if (!oneuptimeServiceName) {
oneuptimeServiceName = "Unknown Service";
if (!(req as TelemetryRequest).projectId) {
throw new BadRequestException(
"Invalid request - projectId not found in request.",
);
}
const telemetryService: {
serviceId: ObjectID;
dataRententionInDays: number;
} = await OTelIngestService.telemetryServiceFromName({
serviceName: oneuptimeServiceName as string,
projectId: (req as TelemetryRequest).projectId,
req.body = req.body.toJSON ? req.body.toJSON() : req.body;
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await FluentIngestQueueService.addFluentIngestJob(req as TelemetryRequest);
return;
} catch (err) {
return next(err);
}
},
);
// Queue stats endpoint: aggregate counters for the fluent ingest queue.
router.get(
  "/fluent/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Derive the response type from the service so the two cannot drift.
      const queueStats: Awaited<
        ReturnType<typeof FluentIngestQueueService.getQueueStats>
      > = await FluentIngestQueueService.getQueueStats();
      return Response.sendJsonObjectResponse(req, res, queueStats);
    } catch (err) {
      return next(err);
    }
  },
);
// Queue size endpoint: number of pending fluent ingest jobs.
router.get(
  "/fluent/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const queueSize: number = await FluentIngestQueueService.getQueueSize();
      return Response.sendJsonObjectResponse(req, res, { size: queueSize });
    } catch (err) {
      return next(err);
    }
  },
);
// Queue failed jobs endpoint
router.get(
"/fluent/queue/failed",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Parse pagination parameters from query string
const start: number = parseInt(req.query["start"] as string) || 0;
const end: number = parseInt(req.query["end"] as string) || 100;
const failedJobs: Array<{
id: string;
name: string;
data: any;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}> = await FluentIngestQueueService.getFailedJobs({
start,
end,
});
if (
logItems &&
typeof logItems === "object" &&
(logItems as JSONObject)["json"]
) {
logItems = (logItems as JSONObject)["json"] as
| Array<JSONObject | string>
| JSONObject;
}
if (!Array.isArray(logItems)) {
logItems = [logItems];
}
for (let logItem of logItems) {
const dbLog: Log = new Log();
dbLog.projectId = (req as TelemetryRequest).projectId;
dbLog.serviceId = telemetryService.serviceId;
dbLog.severityNumber = 0;
const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
dbLog.time = currentTimeAndDate;
dbLog.severityText = LogSeverity.Unspecified;
if (typeof logItem === "string") {
// check if its parseable to json
try {
logItem = JSON.parse(logItem);
} catch {
// do nothing
}
}
if (typeof logItem !== "string") {
logItem = JSON.stringify(logItem);
}
dbLog.body = logItem as string;
dbLogs.push(dbLog);
}
await LogService.createMany({
items: dbLogs,
props: {
isRoot: true,
return Response.sendJsonObjectResponse(req, res, {
failedJobs,
pagination: {
start,
end,
count: failedJobs.length,
},
});
OTelIngestService.recordDataIngestedUsgaeBilling({
services: {
[oneuptimeServiceName as string]: {
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(req.body),
dataRententionInDays: telemetryService.dataRententionInDays,
serviceId: telemetryService.serviceId,
serviceName: oneuptimeServiceName as string,
},
},
projectId: (req as TelemetryRequest).projectId,
productType: ProductType.Logs,
}).catch((err: Error) => {
logger.error(err);
});
return Response.sendEmptySuccessResponse(req, res);
} catch (err) {
return next(err);
}

View File

@@ -0,0 +1,37 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
/**
 * JSON metrics endpoint for KEDA autoscaling.
 * Returns the current queue size as JSON for the KEDA metrics-api scaler.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  // NOTE(review): with the middleware commented out this endpoint is
  // unauthenticated — confirm that is acceptable before shipping.
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const size: number = await FluentIngestQueueService.getQueueSize();
      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: size });
    } catch (err) {
      return next(err);
    }
  },
);
export default router;

View File

@@ -1,4 +1,5 @@
import FluentIngestAPI from "./API/FluentIngest";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/FluentIngest/ProcessFluentIngest";
const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "fluent-ingest";
app.use([`/${APP_NAME}`, "/"], FluentIngestAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,74 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
export interface FluentIngestJobData {
projectId: string;
requestBody: JSONObject;
requestHeaders: Record<string, string>;
ingestionTimestamp: Date;
}
export default class FluentIngestQueueService {
  /**
   * Adds a fluent ingestion job to the queue for asynchronous processing.
   *
   * @param req - Telemetry request; `projectId` must already be populated
   *              by the telemetry middleware (validated upstream).
   * @throws Re-throws any queueing error after logging it, so callers can
   *         surface the failure.
   */
  public static async addFluentIngestJob(
    req: TelemetryRequest,
  ): Promise<void> {
    try {
      const jobData: FluentIngestJobData = {
        projectId: req.projectId.toString(),
        requestBody: req.body,
        requestHeaders: req.headers as Record<string, string>,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };
      // projectId is non-null here (it was dereferenced above), so the
      // original `?.` was a dead optional chain — removed. The nanosecond
      // timestamp keeps job ids unique per project.
      const jobId: string = `fluent-${req.projectId.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
      await Queue.addJob(
        QueueName.FluentIngest,
        jobId,
        "ProcessFluentIngest",
        jobData as unknown as JSONObject,
      );
      logger.debug(`Added fluent ingestion job: ${jobId}`);
    } catch (error) {
      logger.error("Error adding fluent ingestion job:");
      logger.error(error);
      throw error;
    }
  }

  /** Current number of jobs in the fluent ingest queue (used by KEDA metrics). */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.FluentIngest);
  }

  /** Aggregate queue counters for the stats endpoint. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.FluentIngest);
  }

  /**
   * Failed jobs in the queue, paginated by `start`/`end` (defaults come
   * from the underlying queue implementation).
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.FluentIngest, options);
  }
}

View File

@@ -7,4 +7,32 @@ KEDA ScaledObjects for various services
{{- $metricsConfig := dict "enabled" .Values.openTelemetryIngest.keda.enabled "minReplicas" .Values.openTelemetryIngest.keda.minReplicas "maxReplicas" .Values.openTelemetryIngest.keda.maxReplicas "pollingInterval" .Values.openTelemetryIngest.keda.pollingInterval "cooldownPeriod" .Values.openTelemetryIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_telemetry_queue_size" "threshold" .Values.openTelemetryIngest.keda.queueSizeThreshold "port" .Values.port.openTelemetryIngest)) }}
{{- $openTelemetryIngestKedaArgs := dict "ServiceName" "open-telemetry-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.openTelemetryIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $openTelemetryIngestKedaArgs }}
{{- end }}
{{/* Fluent Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.fluentIngest.keda.enabled (not .Values.fluentIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.fluentIngest.keda.enabled "minReplicas" .Values.fluentIngest.keda.minReplicas "maxReplicas" .Values.fluentIngest.keda.maxReplicas "pollingInterval" .Values.fluentIngest.keda.pollingInterval "cooldownPeriod" .Values.fluentIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_fluent_ingest_queue_size" "threshold" .Values.fluentIngest.keda.queueSizeThreshold "port" .Values.port.fluentIngest)) }}
{{- $fluentIngestKedaArgs := dict "ServiceName" "fluent-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.fluentIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $fluentIngestKedaArgs }}
{{- end }}
{{/* Incoming Request Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.incomingRequestIngest.keda.enabled (not .Values.incomingRequestIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.incomingRequestIngest.keda.enabled "minReplicas" .Values.incomingRequestIngest.keda.minReplicas "maxReplicas" .Values.incomingRequestIngest.keda.maxReplicas "pollingInterval" .Values.incomingRequestIngest.keda.pollingInterval "cooldownPeriod" .Values.incomingRequestIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_incoming_request_ingest_queue_size" "threshold" .Values.incomingRequestIngest.keda.queueSizeThreshold "port" .Values.port.incomingRequestIngest)) }}
{{- $incomingRequestIngestKedaArgs := dict "ServiceName" "incoming-request-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.incomingRequestIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $incomingRequestIngestKedaArgs }}
{{- end }}
{{/* Server Monitor Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.serverMonitorIngest.keda.enabled (not .Values.serverMonitorIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.serverMonitorIngest.keda.enabled "minReplicas" .Values.serverMonitorIngest.keda.minReplicas "maxReplicas" .Values.serverMonitorIngest.keda.maxReplicas "pollingInterval" .Values.serverMonitorIngest.keda.pollingInterval "cooldownPeriod" .Values.serverMonitorIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_server_monitor_ingest_queue_size" "threshold" .Values.serverMonitorIngest.keda.queueSizeThreshold "port" .Values.port.serverMonitorIngest)) }}
{{- $serverMonitorIngestKedaArgs := dict "ServiceName" "server-monitor-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.serverMonitorIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $serverMonitorIngestKedaArgs }}
{{- end }}
{{/* Probe Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.probeIngest.keda.enabled (not .Values.probeIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }}
{{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }}
{{- end }}

View File

@@ -490,6 +490,17 @@ probeIngest:
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
openTelemetryIngest:
replicaCount: 1
@@ -513,12 +524,34 @@ fluentIngest:
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
incomingRequestIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
isolatedVM:
replicaCount: 1
@@ -531,6 +564,17 @@ serverMonitorIngest:
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
slackApp:

View File

@@ -1,12 +1,6 @@
import HTTPMethod from "Common/Types/API/HTTPMethod";
import OneUptimeDate from "Common/Types/Date";
import Dictionary from "Common/Types/Dictionary";
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import Express, {
ExpressRequest,
ExpressResponse,
@@ -14,10 +8,9 @@ import Express, {
NextFunction,
RequestHandler,
} from "Common/Server/Utils/Express";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Response from "Common/Server/Utils/Response";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import logger from "Common/Server/Utils/Logger";
import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
@@ -38,63 +31,18 @@ const processIncomingRequest: RequestHandler = async (
throw new BadDataException("Invalid Secret Key");
}
const isGetRequest: boolean = req.method === "GET";
const isPostRequest: boolean = req.method === "POST";
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
let httpMethod: HTTPMethod = HTTPMethod.GET;
if (isGetRequest) {
httpMethod = HTTPMethod.GET;
}
if (isPostRequest) {
httpMethod = HTTPMethod.POST;
}
const monitor: Monitor | null = await MonitorService.findOneBy({
query: {
incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString),
monitorType: MonitorType.IncomingRequest,
},
select: {
_id: true,
projectId: true,
},
props: {
isRoot: true,
},
});
if (!monitor || !monitor._id) {
throw new BadDataException("Monitor not found");
}
if (!monitor.projectId) {
throw new BadDataException("Project not found");
}
const now: Date = OneUptimeDate.getCurrentDate();
const incomingRequest: IncomingMonitorRequest = {
projectId: monitor.projectId,
monitorId: new ObjectID(monitor._id.toString()),
// Add to queue for asynchronous processing
await IncomingRequestIngestQueueService.addIncomingRequestIngestJob({
secretKey: monitorSecretKeyAsString,
requestHeaders: requestHeaders,
requestBody: requestBody,
incomingRequestReceivedAt: now,
onlyCheckForIncomingRequestReceivedAt: false,
requestMethod: httpMethod,
checkedAt: now,
};
// process probe response here.
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
// do nothing.
// we don't want to throw error here.
// we just want to log the error.
logger.error(err);
requestMethod: req.method,
});
return Response.sendEmptySuccessResponse(req, res);
return;
} catch (err) {
return next(err);
}
@@ -122,4 +70,88 @@ router.get(
},
);
// Queue stats endpoint: aggregate counters for the incoming-request queue.
router.get(
  "/incoming-request/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Derive the response type from the service so the two cannot drift.
      const queueStats: Awaited<
        ReturnType<typeof IncomingRequestIngestQueueService.getQueueStats>
      > = await IncomingRequestIngestQueueService.getQueueStats();
      return Response.sendJsonObjectResponse(req, res, queueStats);
    } catch (err) {
      return next(err);
    }
  },
);
// Queue size endpoint: number of pending incoming-request jobs.
router.get(
  "/incoming-request/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const queueSize: number =
        await IncomingRequestIngestQueueService.getQueueSize();
      return Response.sendJsonObjectResponse(req, res, { size: queueSize });
    } catch (err) {
      return next(err);
    }
  },
);
// Queue failed jobs endpoint: paginated list of failed incoming-request jobs.
router.get(
  "/incoming-request/queue/failed",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Parse pagination parameters from the query string. Radix 10 is given
      // explicitly; NaN (missing/garbage input) falls back to the defaults.
      const start: number = parseInt(req.query["start"] as string, 10) || 0;
      const end: number = parseInt(req.query["end"] as string, 10) || 100;
      // Derive the element type from the service instead of re-declaring it
      // inline with `data: any` — keeps the endpoint and service in sync.
      const failedJobs: Awaited<
        ReturnType<typeof IncomingRequestIngestQueueService.getFailedJobs>
      > = await IncomingRequestIngestQueueService.getFailedJobs({
        start,
        end,
      });
      return Response.sendJsonObjectResponse(req, res, {
        failedJobs,
        pagination: {
          start,
          end,
          count: failedJobs.length,
        },
      });
    } catch (err) {
      return next(err);
    }
  },
);
export default router;

View File

@@ -0,0 +1,37 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
/**
 * JSON metrics endpoint for KEDA autoscaling.
 * Returns the current queue size as JSON for the KEDA metrics-api scaler.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  // NOTE(review): with the middleware commented out this endpoint is
  // unauthenticated — confirm that is acceptable before shipping.
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const size: number =
        await IncomingRequestIngestQueueService.getQueueSize();
      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: size });
    } catch (err) {
      return next(err);
    }
  },
);
export default router;

View File

@@ -1,4 +1,5 @@
import IncomingRequestAPI from "./API/IncomingRequest";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,6 +10,7 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest";
import "ejs";
const app: ExpressApplication = Express.getExpressApp();
@@ -16,6 +18,7 @@ const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "incoming-request-ingest";
app.use([`/${APP_NAME}`, "/"], IncomingRequestAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,109 @@
import { IncomingRequestIngestJobData } from "../../Services/Queue/IncomingRequestIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import HTTPMethod from "Common/Types/API/HTTPMethod";
import OneUptimeDate from "Common/Types/Date";
import Dictionary from "Common/Types/Dictionary";
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Monitor from "Common/Models/DatabaseModels/Monitor";
// Set up the worker for processing incoming request ingest queue.
// Registered at module import time: importing this file starts consumption.
QueueWorker.getWorker(
  QueueName.IncomingRequestIngest,
  async (job: QueueJob): Promise<void> => {
    logger.debug(`Processing incoming request ingestion job: ${job.name}`);
    try {
      // job.data was enqueued as a JSONObject; cast back to the typed shape.
      const jobData: IncomingRequestIngestJobData =
        job.data as IncomingRequestIngestJobData;
      await processIncomingRequestFromQueue(jobData);
      logger.debug(
        `Successfully processed incoming request ingestion job: ${job.name}`,
      );
    } catch (error) {
      // Re-throw so the queue marks the job as failed (and can retry it
      // according to the queue's configuration).
      logger.error(`Error processing incoming request ingestion job:`);
      logger.error(error);
      throw error;
    }
  },
  { concurrency: 20 }, // Process up to 20 incoming request ingest jobs concurrently
);
async function processIncomingRequestFromQueue(
jobData: IncomingRequestIngestJobData,
): Promise<void> {
const requestHeaders: Dictionary<string> = jobData.requestHeaders;
const requestBody: string | JSONObject = jobData.requestBody;
const monitorSecretKeyAsString: string = jobData.secretKey;
if (!monitorSecretKeyAsString) {
throw new BadDataException("Invalid Secret Key");
}
const isGetRequest: boolean = jobData.requestMethod === "GET";
const isPostRequest: boolean = jobData.requestMethod === "POST";
let httpMethod: HTTPMethod = HTTPMethod.GET;
if (isGetRequest) {
httpMethod = HTTPMethod.GET;
}
if (isPostRequest) {
httpMethod = HTTPMethod.POST;
}
const monitor: Monitor | null = await MonitorService.findOneBy({
query: {
incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString),
monitorType: MonitorType.IncomingRequest,
},
select: {
_id: true,
projectId: true,
},
props: {
isRoot: true,
},
});
if (!monitor || !monitor._id) {
throw new BadDataException("Monitor not found");
}
if (!monitor.projectId) {
throw new BadDataException("Project not found");
}
const now: Date = OneUptimeDate.getCurrentDate();
const incomingRequest: IncomingMonitorRequest = {
projectId: monitor.projectId,
monitorId: new ObjectID(monitor._id.toString()),
requestHeaders: requestHeaders,
requestBody: requestBody,
incomingRequestReceivedAt: now,
onlyCheckForIncomingRequestReceivedAt: false,
requestMethod: httpMethod,
checkedAt: now,
};
// process probe response here.
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
// do nothing.
// we don't want to throw error here.
// we just want to log the error.
logger.error(err);
});
}
logger.debug("Incoming request ingest worker initialized");

View File

@@ -0,0 +1,81 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
import Dictionary from "Common/Types/Dictionary";
export interface IncomingRequestIngestJobData {
secretKey: string;
requestHeaders: Dictionary<string>;
requestBody: string | JSONObject;
requestMethod: string;
ingestionTimestamp: Date;
}
export default class IncomingRequestIngestQueueService {
  /**
   * Queues an incoming-request payload for asynchronous processing.
   *
   * NOTE(review): the job id embeds the monitor secret key and is written to
   * debug logs and queue storage — confirm this is acceptable, or consider
   * hashing it.
   *
   * @throws Re-throws any queueing error after logging it.
   */
  public static async addIncomingRequestIngestJob(data: {
    secretKey: string;
    requestHeaders: Dictionary<string>;
    requestBody: string | JSONObject;
    requestMethod: string;
  }): Promise<void> {
    try {
      // Stamp the payload with the time it was accepted for ingestion.
      const payload: IncomingRequestIngestJobData = {
        ...data,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };
      const jobId: string = `incoming-request-${data.secretKey}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
      await Queue.addJob(
        QueueName.IncomingRequestIngest,
        jobId,
        "ProcessIncomingRequestIngest",
        payload as unknown as JSONObject,
      );
      logger.debug(`Added incoming request ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding incoming request ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /** Current number of jobs in the incoming-request queue (used by KEDA metrics). */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.IncomingRequestIngest);
  }

  /** Aggregate queue counters for the stats endpoint. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.IncomingRequestIngest);
  }

  /**
   * Failed jobs in the queue, paginated by `start`/`end` (defaults come
   * from the underlying queue implementation).
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.IncomingRequestIngest, options);
  }
}

View File

@@ -0,0 +1,37 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
/**
 * JSON metrics endpoint for KEDA autoscaling.
 * Returns the current queue size as JSON for the KEDA metrics-api scaler.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  // NOTE(review): with the middleware commented out this endpoint is
  // unauthenticated — confirm that is acceptable before shipping.
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const size: number = await ProbeIngestQueueService.getQueueSize();
      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: size });
    } catch (err) {
      return next(err);
    }
  },
);
export default router;

View File

@@ -18,13 +18,12 @@ import Express, {
NextFunction,
} from "Common/Server/Utils/Express";
import logger from "Common/Server/Utils/Logger";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Response from "Common/Server/Utils/Response";
import GlobalConfig from "Common/Models/DatabaseModels/GlobalConfig";
import Probe from "Common/Models/DatabaseModels/Probe";
import User from "Common/Models/DatabaseModels/User";
import MonitorTestService from "Common/Server/Services/MonitorTestService";
import OneUptimeDate from "Common/Types/Date";
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
@@ -255,17 +254,18 @@ router.post(
);
}
// this is when the resource was ingested.
probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
logger.error("Error in monitor resource");
logger.error(err);
});
return Response.sendJsonObjectResponse(req, res, {
// Return response immediately
Response.sendJsonObjectResponse(req, res, {
result: "processing",
});
// Add to queue for asynchronous processing
await ProbeIngestQueueService.addProbeIngestJob({
probeMonitorResponse: req.body,
jobType: "probe-response",
});
return;
} catch (err) {
return next(err);
}
@@ -303,28 +303,101 @@ router.post(
);
}
probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// save the probe response to the monitor test.
await MonitorTestService.updateOneById({
id: testId,
data: {
monitorStepProbeResponse: {
[probeResponse.monitorStepId.toString()]: {
...JSON.parse(JSON.stringify(probeResponse)),
monitoredAt: OneUptimeDate.getCurrentDate(),
},
} as any,
},
props: {
isRoot: true,
},
// Add to queue for asynchronous processing
await ProbeIngestQueueService.addProbeIngestJob({
probeMonitorResponse: req.body,
jobType: "monitor-test",
testId: testId.toString(),
});
// send success response.
return;
} catch (err) {
return next(err);
}
},
);
return Response.sendEmptySuccessResponse(req, res);
// Queue stats endpoint
router.get(
  "/probe/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Per-state job counts for the probe ingest queue (used for
      // monitoring and KEDA autoscaling decisions).
      const queueStats: {
        waiting: number;
        active: number;
        completed: number;
        failed: number;
        delayed: number;
        total: number;
      } = await ProbeIngestQueueService.getQueueStats();

      return Response.sendJsonObjectResponse(req, res, queueStats);
    } catch (err) {
      return next(err);
    }
  },
);
// Queue size endpoint
router.get(
  "/probe/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Total number of jobs currently sitting in the probe ingest queue.
      const queueSize: number = await ProbeIngestQueueService.getQueueSize();
      return Response.sendJsonObjectResponse(req, res, { size: queueSize });
    } catch (err) {
      return next(err);
    }
  },
);
// Queue failed jobs endpoint
router.get(
"/probe/queue/failed",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Parse pagination parameters from query string
const start: number = parseInt(req.query["start"] as string) || 0;
const end: number = parseInt(req.query["end"] as string) || 100;
const failedJobs: Array<{
id: string;
name: string;
data: any;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}> = await ProbeIngestQueueService.getFailedJobs({
start,
end,
});
return Response.sendJsonObjectResponse(req, res, {
failedJobs,
pagination: {
start,
end,
count: failedJobs.length,
},
});
} catch (err) {
return next(err);
}

View File

@@ -1,6 +1,7 @@
import MonitorAPI from "./API/Monitor";
import ProbeIngest from "./API/Probe";
import RegisterAPI from "./API/Register";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -11,6 +12,7 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/ProbeIngest/ProcessProbeIngest";
import "ejs";
const app: ExpressApplication = Express.getExpressApp();
@@ -21,6 +23,7 @@ const APP_NAME: string = "probe-ingest";
app.use([`/${APP_NAME}`, "/ingestor", "/"], RegisterAPI);
app.use([`/${APP_NAME}`, "/ingestor", "/"], MonitorAPI);
app.use([`/${APP_NAME}`, "/ingestor", "/"], ProbeIngest);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,87 @@
import { ProbeIngestJobData } from "../../Services/Queue/ProbeIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import BadDataException from "Common/Types/Exception/BadDataException";
import JSONFunctions from "Common/Types/JSONFunctions";
import ObjectID from "Common/Types/ObjectID";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import OneUptimeDate from "Common/Types/Date";
import MonitorTestService from "Common/Server/Services/MonitorTestService";
import ProbeMonitorResponse from "Common/Types/Probe/ProbeMonitorResponse";
import { JSONObject } from "Common/Types/JSON";
// Worker wiring: consume jobs from the probe ingest queue and hand each one
// to processProbeFromQueue. Errors are logged and rethrown so the queue
// marks the job as failed (and can retry it per queue policy).
const handleProbeIngestJob = async (job: QueueJob): Promise<void> => {
  logger.debug(`Processing probe ingestion job: ${job.name}`);
  try {
    const jobData: ProbeIngestJobData = job.data as ProbeIngestJobData;
    await processProbeFromQueue(jobData);
    logger.debug(`Successfully processed probe ingestion job: ${job.name}`);
  } catch (error) {
    logger.error(`Error processing probe ingestion job:`);
    logger.error(error);
    throw error;
  }
};

// Process up to 20 probe ingest jobs concurrently.
QueueWorker.getWorker(QueueName.ProbeIngest, handleProbeIngestJob, {
  concurrency: 20,
});
/**
 * Processes a single probe ingest job pulled off the queue.
 *
 * Deserializes the probe monitor response from the job payload, stamps the
 * ingestion time, then dispatches on job type:
 *  - "probe-response": kicks off regular monitor resource processing.
 *    This is fire-and-forget — errors are logged, not rethrown, so a
 *    failing monitor evaluation does not fail (and retry) the queue job.
 *  - "monitor-test": persists the probe response onto the monitor test
 *    identified by jobData.testId.
 *
 * @throws BadDataException if the payload cannot be deserialized, or the
 *         job type is unknown / "monitor-test" without a testId.
 */
async function processProbeFromQueue(
  jobData: ProbeIngestJobData,
): Promise<void> {
  const probeResponse: ProbeMonitorResponse = JSONFunctions.deserialize(
    jobData.probeMonitorResponse["probeMonitorResponse"] as JSONObject,
  ) as any;

  if (!probeResponse) {
    throw new BadDataException("ProbeMonitorResponse not found");
  }

  // this is when the resource was ingested.
  // (Set once here — the original code assigned it a second time in the
  // monitor-test branch, which was redundant.)
  probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();

  if (jobData.jobType === "probe-response") {
    // Handle regular probe response
    MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
      logger.error("Error in monitor resource");
      logger.error(err);
    });
  } else if (jobData.jobType === "monitor-test" && jobData.testId) {
    // Handle monitor test response. jobData.testId is known truthy here,
    // so no further null check on the constructed ObjectID is needed.
    const testId: ObjectID = new ObjectID(jobData.testId);

    // save the probe response to the monitor test.
    await MonitorTestService.updateOneById({
      id: testId,
      data: {
        monitorStepProbeResponse: {
          [probeResponse.monitorStepId.toString()]: {
            ...JSON.parse(JSON.stringify(probeResponse)),
            monitoredAt: OneUptimeDate.getCurrentDate(),
          },
        } as any,
      },
      props: {
        isRoot: true,
      },
    });
  } else {
    throw new BadDataException(`Invalid job type: ${jobData.jobType}`);
  }
}
logger.debug("Probe ingest worker initialized");

View File

@@ -0,0 +1,77 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
// Payload stored on every probe ingest queue job.
export interface ProbeIngestJobData {
  // Raw request body received from the probe API (still in serialized
  // JSON form; deserialized by the worker).
  probeMonitorResponse: JSONObject;
  // "probe-response" for regular ingestion, "monitor-test" for test runs.
  jobType: "probe-response" | "monitor-test";
  // Monitor test id; only set for "monitor-test" jobs.
  testId?: string | undefined;
  // When the payload was accepted by the API and enqueued.
  ingestionTimestamp: Date;
}
/**
 * Queue facade for probe ingestion. API routes enqueue probe payloads here
 * for asynchronous processing and read queue metrics (size, per-state
 * counts, failed jobs) that back the KEDA autoscaling endpoints.
 */
export default class ProbeIngestQueueService {
  /**
   * Enqueues one probe ingest job.
   *
   * @param data.probeMonitorResponse - raw request body from the probe.
   * @param data.jobType - "probe-response" for regular ingestion or
   *                       "monitor-test" for a monitor test run.
   * @param data.testId - monitor test id, required for "monitor-test" jobs.
   * @throws rethrows any error from the underlying queue after logging it.
   */
  public static async addProbeIngestJob(data: {
    probeMonitorResponse: JSONObject;
    jobType: "probe-response" | "monitor-test";
    testId?: string;
  }): Promise<void> {
    try {
      const payload: ProbeIngestJobData = {
        probeMonitorResponse: data.probeMonitorResponse,
        jobType: data.jobType,
        testId: data.testId,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      // Unique job id: type + test id (or "general") + nanosecond timestamp.
      const jobId: string = `probe-${data.jobType}-${
        data.testId || "general"
      }-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.ProbeIngest,
        jobId,
        "ProcessProbeIngest",
        payload as unknown as JSONObject,
      );

      logger.debug(`Added probe ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding probe ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /** Total number of jobs currently in the probe ingest queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.ProbeIngest);
  }

  /** Per-state job counts for the probe ingest queue. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.ProbeIngest);
  }

  /** A page of failed jobs; range defaults are applied by the Queue helper. */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.ProbeIngest, options);
  }
}

View File

@@ -0,0 +1,37 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
/**
 * JSON metrics endpoint for KEDA autoscaling.
 * Returns the current server monitor ingest queue size as JSON, in the
 * shape the KEDA metrics-api scaler expects ({ queueSize: number }).
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  // NOTE(review): with the middleware above commented out this endpoint is
  // unauthenticated. It only exposes a queue-size integer, but the
  // cluster-key check should be re-enabled once KEDA is configured to
  // send the key — confirm before shipping.
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Current number of jobs in the server monitor ingest queue.
      const queueSize: number = await ServerMonitorIngestQueueService.getQueueSize();
      res.setHeader("Content-Type", "application/json");
      res.status(200).json({
        queueSize: queueSize,
      });
    } catch (err) {
      // Delegate to the Express error handler.
      return next(err);
    }
  },
);
export default router;

View File

@@ -1,8 +1,6 @@
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import JSONFunctions from "Common/Types/JSONFunctions";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import Express, {
@@ -11,11 +9,11 @@ import Express, {
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Response from "Common/Server/Utils/Response";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import OneUptimeDate from "Common/Types/Date";
import ProjectService from "Common/Server/Services/ProjectService";
import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
@@ -77,52 +75,100 @@ router.post(
throw new BadDataException("Invalid Secret Key");
}
const monitor: Monitor | null = await MonitorService.findOneBy({
query: {
serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString),
monitorType: MonitorType.Server,
...MonitorService.getEnabledMonitorQuery(),
project: {
...ProjectService.getActiveProjectStatusQuery(),
},
},
select: {
_id: true,
},
props: {
isRoot: true,
},
});
if (!monitor) {
throw new BadDataException("Monitor not found");
}
// return the response early.
Response.sendEmptySuccessResponse(req, res);
// now process this request.
// Add to queue for asynchronous processing
await ServerMonitorIngestQueueService.addServerMonitorIngestJob({
secretKey: monitorSecretKeyAsString,
serverMonitorResponse: req.body as JSONObject,
});
const serverMonitorResponse: ServerMonitorResponse =
JSONFunctions.deserialize(
req.body["serverMonitorResponse"] as JSONObject,
) as any;
return;
} catch (err) {
return next(err);
}
},
);
if (!serverMonitorResponse) {
throw new BadDataException("Invalid Server Monitor Response");
}
// Queue stats endpoint
router.get(
  "/server-monitor/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Per-state job counts for the server monitor ingest queue.
      const queueStats: {
        waiting: number;
        active: number;
        completed: number;
        failed: number;
        delayed: number;
        total: number;
      } = await ServerMonitorIngestQueueService.getQueueStats();

      return Response.sendJsonObjectResponse(req, res, queueStats);
    } catch (err) {
      return next(err);
    }
  },
);
if (!monitor.id) {
throw new BadDataException("Monitor id not found");
}
// Queue size endpoint
router.get(
  "/server-monitor/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Total number of jobs currently in the server monitor ingest queue.
      const queueSize: number =
        await ServerMonitorIngestQueueService.getQueueSize();
      return Response.sendJsonObjectResponse(req, res, { size: queueSize });
    } catch (err) {
      return next(err);
    }
  },
);
serverMonitorResponse.monitorId = monitor.id;
// Queue failed jobs endpoint
router.get(
"/server-monitor/queue/failed",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Parse pagination parameters from query string
const start: number = parseInt(req.query["start"] as string) || 0;
const end: number = parseInt(req.query["end"] as string) || 100;
serverMonitorResponse.requestReceivedAt = OneUptimeDate.getCurrentDate();
serverMonitorResponse.timeNow = OneUptimeDate.getCurrentDate();
const failedJobs: Array<{
id: string;
name: string;
data: any;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}> = await ServerMonitorIngestQueueService.getFailedJobs({
start,
end,
});
// process probe response here.
await MonitorResourceUtil.monitorResource(serverMonitorResponse);
return Response.sendJsonObjectResponse(req, res, {
failedJobs,
pagination: {
start,
end,
count: failedJobs.length,
},
});
} catch (err) {
return next(err);
}

View File

@@ -1,4 +1,5 @@
import ServerMonitorAPI from "./API/ServerMonitor";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/ServerMonitorIngest/ProcessServerMonitorIngest";
const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "server-monitor-ingest";
app.use([`/${APP_NAME}`, "/"], ServerMonitorAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,92 @@
import { ServerMonitorIngestJobData } from "../../Services/Queue/ServerMonitorIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import JSONFunctions from "Common/Types/JSONFunctions";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import OneUptimeDate from "Common/Types/Date";
import ProjectService from "Common/Server/Services/ProjectService";
// Worker wiring: pull jobs off the server monitor ingest queue and hand
// each one to processServerMonitorFromQueue. Errors are logged and
// rethrown so the queue marks the job as failed (and can retry it).
const handleServerMonitorIngestJob = async (job: QueueJob): Promise<void> => {
  logger.debug(`Processing server monitor ingestion job: ${job.name}`);
  try {
    const jobData: ServerMonitorIngestJobData =
      job.data as ServerMonitorIngestJobData;
    await processServerMonitorFromQueue(jobData);
    logger.debug(
      `Successfully processed server monitor ingestion job: ${job.name}`,
    );
  } catch (error) {
    logger.error(`Error processing server monitor ingestion job:`);
    logger.error(error);
    throw error;
  }
};

// Process up to 20 server monitor ingest jobs concurrently.
QueueWorker.getWorker(
  QueueName.ServerMonitorIngest,
  handleServerMonitorIngestJob,
  { concurrency: 20 },
);
/**
 * Processes a single server monitor ingest job from the queue.
 *
 * Validates and deserializes the payload, resolves the monitor by its
 * secret key (enabled server monitors in active projects only), stamps the
 * response with the monitor id and receive/processing timestamps, then runs
 * the standard monitor resource pipeline.
 *
 * @throws BadDataException if the secret key is missing, the payload is
 *         invalid, or no matching monitor is found.
 */
async function processServerMonitorFromQueue(
  jobData: ServerMonitorIngestJobData,
): Promise<void> {
  const monitorSecretKeyAsString: string = jobData.secretKey;

  if (!monitorSecretKeyAsString) {
    throw new BadDataException("Invalid Secret Key");
  }

  // Deserialize and validate the payload BEFORE touching the database, so
  // a malformed payload does not cost a monitor lookup.
  const serverMonitorResponse: ServerMonitorResponse =
    JSONFunctions.deserialize(
      jobData.serverMonitorResponse["serverMonitorResponse"] as JSONObject,
    ) as any;

  if (!serverMonitorResponse) {
    throw new BadDataException("Invalid Server Monitor Response");
  }

  // Resolve the monitor owning this secret key. Only enabled server
  // monitors in active projects are eligible.
  const monitor: Monitor | null = await MonitorService.findOneBy({
    query: {
      serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString),
      monitorType: MonitorType.Server,
      ...MonitorService.getEnabledMonitorQuery(),
      project: {
        ...ProjectService.getActiveProjectStatusQuery(),
      },
    },
    select: {
      _id: true,
    },
    props: {
      isRoot: true,
    },
  });

  if (!monitor) {
    throw new BadDataException("Monitor not found");
  }

  if (!monitor.id) {
    throw new BadDataException("Monitor id not found");
  }

  serverMonitorResponse.monitorId = monitor.id;
  serverMonitorResponse.requestReceivedAt = OneUptimeDate.getCurrentDate();
  serverMonitorResponse.timeNow = OneUptimeDate.getCurrentDate();

  // process probe response here.
  await MonitorResourceUtil.monitorResource(serverMonitorResponse);
}
logger.debug("Server monitor ingest worker initialized");

View File

@@ -0,0 +1,74 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
// Payload stored on every server monitor ingest queue job.
export interface ServerMonitorIngestJobData {
  // Secret key identifying the monitor; resolved to a monitor id by the
  // worker. Sensitive — avoid writing it to logs or job ids.
  secretKey: string;
  // Raw request body from the server monitor agent (serialized JSON;
  // deserialized by the worker).
  serverMonitorResponse: JSONObject;
  // When the payload was accepted by the API and enqueued.
  ingestionTimestamp: Date;
}
/**
 * Queue facade for server monitor ingestion. API routes enqueue agent
 * payloads here for asynchronous processing and read queue metrics (size,
 * per-state counts, failed jobs) that back the KEDA autoscaling endpoints.
 */
export default class ServerMonitorIngestQueueService {
  /**
   * Enqueues one server monitor ingest job.
   *
   * @param data.secretKey - monitor secret key from the agent request.
   * @param data.serverMonitorResponse - raw request body from the agent.
   * @throws rethrows any error from the underlying queue after logging it.
   */
  public static async addServerMonitorIngestJob(data: {
    secretKey: string;
    serverMonitorResponse: JSONObject;
  }): Promise<void> {
    try {
      const jobData: ServerMonitorIngestJobData = {
        secretKey: data.secretKey,
        serverMonitorResponse: data.serverMonitorResponse,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      // Security: do NOT embed the monitor secret key in the job id —
      // job ids are written to debug logs and visible in queue tooling.
      // The nanosecond timestamp alone keeps ids unique.
      const jobId: string = `server-monitor-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.ServerMonitorIngest,
        jobId,
        "ProcessServerMonitorIngest",
        jobData as unknown as JSONObject,
      );

      logger.debug(`Added server monitor ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding server monitor ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /** Total number of jobs currently in the server monitor ingest queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.ServerMonitorIngest);
  }

  /** Per-state job counts for the server monitor ingest queue. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.ServerMonitorIngest);
  }

  /** A page of failed jobs; range defaults are applied by the Queue helper. */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.ServerMonitorIngest, options);
  }
}