Compare commits

...

7 Commits

20 changed files with 207 additions and 51 deletions

View File

@@ -52,7 +52,9 @@ router.post(
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await FluentIngestQueueService.addFluentIngestJob(req as TelemetryRequest);
await FluentIngestQueueService.addFluentIngestJob(
req as TelemetryRequest,
);
return;
} catch (err) {

View File

@@ -34,9 +34,7 @@ QueueWorker.getWorker(
requestHeaders: jobData.requestHeaders,
});
logger.debug(
`Successfully processed fluent ingestion job: ${job.name}`,
);
logger.debug(`Successfully processed fluent ingestion job: ${job.name}`);
} catch (error) {
logger.error(`Error processing fluent ingestion job:`);
logger.error(error);
@@ -55,8 +53,9 @@ async function processFluentIngestFromQueue(
| Array<JSONObject | string>
| JSONObject;
let oneuptimeServiceName: string | string[] | undefined =
data.requestHeaders["x-oneuptime-service-name"] as string | string[] | undefined;
let oneuptimeServiceName: string | string[] | undefined = data.requestHeaders[
"x-oneuptime-service-name"
] as string | string[] | undefined;
if (!oneuptimeServiceName) {
oneuptimeServiceName = "Unknown Service";
@@ -124,7 +123,9 @@ async function processFluentIngestFromQueue(
OTelIngestService.recordDataIngestedUsgaeBilling({
services: {
[oneuptimeServiceName as string]: {
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(data.requestBody as JSONObject),
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(
data.requestBody as JSONObject,
),
dataRententionInDays: telemetryService.dataRententionInDays,
serviceId: telemetryService.serviceId,
serviceName: oneuptimeServiceName as string,

View File

@@ -12,9 +12,7 @@ export interface FluentIngestJobData {
}
export default class FluentIngestQueueService {
public static async addFluentIngestJob(
req: TelemetryRequest,
): Promise<void> {
public static async addFluentIngestJob(req: TelemetryRequest): Promise<void> {
try {
const jobData: FluentIngestJobData = {
projectId: req.projectId.toString(),

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime fluent-ingest autoscaler
{{- if not $.Values.fluentIngest.disableAutoscaler }}
{{- if and (not $.Values.fluentIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.fluentIngest.keda.enabled)) }}
{{- $fluentIngestAutoScalerArgs := dict "ServiceName" "fluent-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $fluentIngestAutoScalerArgs }}
{{- end }}

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime incoming-request-ingest autoscaler
{{- if not $.Values.incomingRequestIngest.disableAutoscaler }}
{{- if and (not $.Values.incomingRequestIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.incomingRequestIngest.keda.enabled)) }}
{{- $incomingRequestIngestAutoScalerArgs := dict "ServiceName" "incoming-request-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $incomingRequestIngestAutoScalerArgs }}
{{- end }}

View File

@@ -35,4 +35,14 @@ KEDA ScaledObjects for various services
{{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }}
{{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }}
{{- end }}
{{/*
Probe KEDA ScaledObjects - one for each probe configuration.
A ScaledObject is rendered only when KEDA is enabled globally, enabled for the
probe, and the probe's autoscaler is not disabled. The probe's `keda` section is
optional: it is defaulted to an empty dict so that probe entries defined without
it are skipped instead of failing with a nil-pointer error during rendering.
*/}}
{{- range $key, $val := $.Values.probes }}
{{- $keda := default (dict) $val.keda }}
{{- if and $.Values.keda.enabled $keda.enabled (not $val.disableAutoscaler) }}
{{- $serviceName := printf "probe-%s" $key }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_queue_size" "threshold" $keda.queueSizeThreshold "port" $.Values.port.probe)) }}
{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
{{- end }}
{{- end }}

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime probe-ingest autoscaler
{{- if not $.Values.probeIngest.disableAutoscaler }}
{{- if and (not $.Values.probeIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.probeIngest.keda.enabled)) }}
{{- $probeIngestAutoScalerArgs := dict "ServiceName" "probe-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $probeIngestAutoScalerArgs }}
{{- end }}

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime server-monitor-ingest autoscaler
{{- if not $.Values.serverMonitorIngest.disableAutoscaler }}
{{- if and (not $.Values.serverMonitorIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.serverMonitorIngest.keda.enabled)) }}
{{- $serverMonitorIngestAutoScalerArgs := dict "ServiceName" "server-monitor-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $serverMonitorIngestAutoScalerArgs }}
{{- end }}

View File

@@ -198,6 +198,17 @@ probes:
customCodeMonitorScriptTimeoutInMs: 60000
disableTelemetryCollection: false
disableAutoscaler: false
# KEDA autoscaling configuration based on monitor queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold per probe
queueSizeThreshold: 10
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
# resources:
# additionalContainers:
# two:
@@ -213,10 +224,22 @@ probes:
# disableAutoscaler: false
# resources:
# additionalContainers:
# KEDA autoscaling configuration based on monitor queue metrics
# keda:
# enabled: false
# minReplicas: 1
# maxReplicas: 100
# # Scale up when queue size exceeds this threshold per probe
# queueSizeThreshold: 10
# # Polling interval for metrics (in seconds)
# pollingInterval: 30
# # Cooldown period after scaling (in seconds)
# cooldownPeriod: 300
port:
app: 3002
probe: 3874
probeIngest: 3400
serverMonitorIngest: 3404
openTelemetryIngest: 3403

View File

@@ -105,7 +105,8 @@ router.get(
next: NextFunction,
): Promise<void> => {
try {
const size: number = await IncomingRequestIngestQueueService.getQueueSize();
const size: number =
await IncomingRequestIngestQueueService.getQueueSize();
return Response.sendJsonObjectResponse(req, res, { size });
} catch (err) {
return next(err);

View File

@@ -22,7 +22,8 @@ router.get(
next: NextFunction,
): Promise<void> => {
try {
const queueSize: number = await IncomingRequestIngestQueueService.getQueueSize();
const queueSize: number =
await IncomingRequestIngestQueueService.getQueueSize();
res.setHeader("Content-Type", "application/json");
res.status(200).json({

View File

@@ -98,12 +98,7 @@ async function processIncomingRequestFromQueue(
};
// process probe response here.
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
// do nothing.
// we don't want to throw error here.
// we just want to log the error.
logger.error(err);
});
await MonitorResourceUtil.monitorResource(incomingRequest);
}
logger.debug("Incoming request ingest worker initialized");

View File

@@ -13,14 +13,12 @@ export interface IncomingRequestIngestJobData {
}
export default class IncomingRequestIngestQueueService {
public static async addIncomingRequestIngestJob(
data: {
secretKey: string;
requestHeaders: Dictionary<string>;
requestBody: string | JSONObject;
requestMethod: string;
},
): Promise<void> {
public static async addIncomingRequestIngestJob(data: {
secretKey: string;
requestHeaders: Dictionary<string>;
requestBody: string | JSONObject;
requestMethod: string;
}): Promise<void> {
try {
const jobData: IncomingRequestIngestJobData = {
secretKey: data.secretKey,

70
Probe/API/Metrics.ts Normal file
View File

@@ -0,0 +1,70 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import Response from "Common/Server/Utils/Response";
import { PROBE_INGEST_URL } from "../Config";
import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse";
import HTTPMethod from "Common/Types/API/HTTPMethod";
import HTTPResponse from "Common/Types/API/HTTPResponse";
import URL from "Common/Types/API/URL";
import { JSONObject } from "Common/Types/JSON";
import API from "Common/Utils/API";
import logger from "Common/Server/Utils/Logger";
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";
const router: ExpressRouter = Express.getRouter();
// Metrics endpoint for KEDA autoscaling.
//
// GET /queue-size asks the ProbeIngest API (POST /metrics/queue-size) for the
// queue size of this probe and relays it as `{ queueSize: <number> }`.
// Any failure is logged and handed to the Express error handler via `next`.
router.get(
  "/queue-size",
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Get the pending monitor count for this specific probe from ProbeIngest API
      const queueSizeUrl: URL = URL.fromString(
        PROBE_INGEST_URL.toString(),
      ).addRoute("/metrics/queue-size");

      logger.debug("Fetching queue size from ProbeIngest API");

      // Use probe authentication (probe key and probe ID)
      const requestBody: JSONObject = ProbeAPIRequest.getDefaultRequestBody();

      const result: HTTPResponse<JSONObject> | HTTPErrorResponse =
        await API.fetch<JSONObject>(
          HTTPMethod.POST,
          queueSizeUrl,
          requestBody,
          {},
        );

      if (result instanceof HTTPErrorResponse) {
        logger.error("Error fetching queue size from ProbeIngest API");
        logger.error(result);
        throw result;
      }

      // Extract queueSize from the response. The upstream payload is untyped
      // JSON, so normalise it: anything missing, non-numeric or negative is
      // reported as 0 so the autoscaler always receives a valid number.
      const parsedQueueSize: number = Number(result.data["queueSize"]);
      const queueSize: number = Number.isFinite(parsedQueueSize)
        ? Math.max(0, parsedQueueSize)
        : 0;

      logger.debug(`Queue size fetched: ${queueSize}`);

      return Response.sendJsonObjectResponse(req, res, {
        queueSize: queueSize,
      });
    } catch (err) {
      logger.error("Error in metrics queue-size endpoint");
      logger.error(err);
      return next(err);
    }
  },
);
export default router;

View File

@@ -3,10 +3,12 @@ import AliveJob from "./Jobs/Alive";
import FetchMonitorList from "./Jobs/Monitor/FetchList";
import FetchMonitorTestList from "./Jobs/Monitor/FetchMonitorTest";
import Register from "./Services/Register";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import logger from "Common/Server/Utils/Logger";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
import "ejs";
const APP_NAME: string = "probe";
@@ -29,6 +31,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
},
});
// Add metrics API routes
const app: ExpressApplication = Express.getExpressApp();
app.use("/metrics", MetricsAPI);
// add default routes
await App.addDefaultRoutes();

View File

@@ -24,6 +24,11 @@ import Probe from "Common/Models/DatabaseModels/Probe";
import User from "Common/Models/DatabaseModels/User";
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import PositiveNumber from "Common/Types/PositiveNumber";
import MonitorProbeService from "Common/Server/Services/MonitorProbeService";
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
import OneUptimeDate from "Common/Types/Date";
import MonitorService from "Common/Server/Services/MonitorService";
const router: ExpressRouter = Express.getRouter();
@@ -363,6 +368,60 @@ router.get(
},
);
// Queue size endpoint for KEDA autoscaling (returns pending monitors count for specific probe).
//
// POST /metrics/queue-size
// Runs behind ProbeAuthorization.isAuthorizedServiceMiddleware, reads `probeId`
// from the request body and responds with `{ queueSize: <count> }`, where the
// count comes from MonitorProbeService.countBy using the query below.
// Responds with a BadDataException when the body carries no probe ID; any
// other failure is forwarded to the Express error handler via `next`.
router.post(
  "/metrics/queue-size",
  ProbeAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // This endpoint returns the number of monitors pending for the specific probe
      // to be used by KEDA for autoscaling probe replicas

      // Get the probe ID from the authenticated request
      const data: JSONObject = req.body;
      const rawProbeId: unknown = data["probeId"];

      // Validate the raw value BEFORE wrapping it: an ObjectID instance is
      // always truthy, so checking the constructed object can never detect a
      // missing ID.
      if (!rawProbeId) {
        return Response.sendErrorResponse(
          req,
          res,
          new BadDataException("Probe ID not found"),
        );
      }

      const probeId: ObjectID = new ObjectID(rawProbeId as string);

      // Get pending monitor count for this specific probe: enabled
      // monitor-probe rows whose nextPingAt is at or before the current date
      // (or null), restricted by the enabled-monitor and active-project queries.
      const pendingCount: PositiveNumber = await MonitorProbeService.countBy({
        query: {
          probeId: probeId,
          isEnabled: true,
          nextPingAt: QueryHelper.lessThanEqualToOrNull(
            OneUptimeDate.getCurrentDate(),
          ),
          monitor: {
            ...MonitorService.getEnabledMonitorQuery(),
          },
          project: {
            ...ProjectService.getActiveProjectStatusQuery(),
          },
        },
        props: {
          isRoot: true,
        },
      });

      return Response.sendJsonObjectResponse(req, res, {
        queueSize: pendingCount.toNumber(),
      });
    } catch (err) {
      return next(err);
    }
  },
);
// Queue failed jobs endpoint
router.get(
"/probe/queue/failed",

View File

@@ -22,9 +22,7 @@ QueueWorker.getWorker(
await processProbeFromQueue(jobData);
logger.debug(
`Successfully processed probe ingestion job: ${job.name}`,
);
logger.debug(`Successfully processed probe ingestion job: ${job.name}`);
} catch (error) {
logger.error(`Error processing probe ingestion job:`);
logger.error(error);
@@ -50,10 +48,7 @@ async function processProbeFromQueue(
if (jobData.jobType === "probe-response") {
// Handle regular probe response
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
logger.error("Error in monitor resource");
logger.error(err);
});
await MonitorResourceUtil.monitorResource(probeResponse);
} else if (jobData.jobType === "monitor-test" && jobData.testId) {
// Handle monitor test response
const testId: ObjectID = new ObjectID(jobData.testId);

View File

@@ -11,13 +11,11 @@ export interface ProbeIngestJobData {
}
export default class ProbeIngestQueueService {
public static async addProbeIngestJob(
data: {
probeMonitorResponse: JSONObject;
jobType: "probe-response" | "monitor-test";
testId?: string;
},
): Promise<void> {
public static async addProbeIngestJob(data: {
probeMonitorResponse: JSONObject;
jobType: "probe-response" | "monitor-test";
testId?: string;
}): Promise<void> {
try {
const jobData: ProbeIngestJobData = {
probeMonitorResponse: data.probeMonitorResponse,

View File

@@ -22,7 +22,8 @@ router.get(
next: NextFunction,
): Promise<void> => {
try {
const queueSize: number = await ServerMonitorIngestQueueService.getQueueSize();
const queueSize: number =
await ServerMonitorIngestQueueService.getQueueSize();
res.setHeader("Content-Type", "application/json");
res.status(200).json({

View File

@@ -10,12 +10,10 @@ export interface ServerMonitorIngestJobData {
}
export default class ServerMonitorIngestQueueService {
public static async addServerMonitorIngestJob(
data: {
secretKey: string;
serverMonitorResponse: JSONObject;
},
): Promise<void> {
public static async addServerMonitorIngestJob(data: {
secretKey: string;
serverMonitorResponse: JSONObject;
}): Promise<void> {
try {
const jobData: ServerMonitorIngestJobData = {
secretKey: data.secretKey,