feat: Implement KEDA autoscaling configuration for probes and add metrics endpoints

This commit is contained in:
Simon Larsen
2025-08-01 15:38:04 +01:00
parent c894b112e6
commit c4c6793b29
5 changed files with 168 additions and 0 deletions

View File

@@ -35,4 +35,14 @@ KEDA ScaledObjects for various services
{{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }}
{{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }}
{{- end }}
{{/* Probe KEDA ScaledObjects - one for each probe configuration */}}
{{- range $key, $val := $.Values.probes }}
{{- if and $.Values.keda.enabled $val.keda.enabled (not $val.disableAutoscaler) }}
{{- $serviceName := printf "probe-%s" $key }}
{{- $metricsConfig := dict "enabled" $val.keda.enabled "minReplicas" $val.keda.minReplicas "maxReplicas" $val.keda.maxReplicas "pollingInterval" $val.keda.pollingInterval "cooldownPeriod" $val.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_queue_size" "threshold" $val.keda.queueSizeThreshold "port" $.Values.port.probe)) }}
{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
{{- end }}
{{- end }}

View File

@@ -198,6 +198,17 @@ probes:
customCodeMonitorScriptTimeoutInMs: 60000
disableTelemetryCollection: false
disableAutoscaler: false
# KEDA autoscaling configuration based on monitor queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold per probe
queueSizeThreshold: 10
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
# resources:
# additionalContainers:
# two:
@@ -213,10 +224,22 @@ probes:
# disableAutoscaler: false
# resources:
# additionalContainers:
# KEDA autoscaling configuration based on monitor queue metrics
# keda:
# enabled: false
# minReplicas: 1
# maxReplicas: 100
# # Scale up when queue size exceeds this threshold per probe
# queueSizeThreshold: 10
# # Polling interval for metrics (in seconds)
# pollingInterval: 30
# # Cooldown period after scaling (in seconds)
# cooldownPeriod: 300
port:
app: 3002
probe: 3874
probeIngest: 3400
serverMonitorIngest: 3404
openTelemetryIngest: 3403

70
Probe/API/Metrics.ts Normal file
View File

@@ -0,0 +1,70 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import Response from "Common/Server/Utils/Response";
import { PROBE_INGEST_URL } from "../Config";
import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse";
import HTTPMethod from "Common/Types/API/HTTPMethod";
import HTTPResponse from "Common/Types/API/HTTPResponse";
import URL from "Common/Types/API/URL";
import { JSONObject } from "Common/Types/JSON";
import API from "Common/Utils/API";
import logger from "Common/Server/Utils/Logger";
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";
const router: ExpressRouter = Express.getRouter();
// Metrics endpoint for Keda autoscaling
router.get(
"/queue-size",
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Get the pending monitor count for this specific probe from ProbeIngest API
const queueSizeUrl: URL = URL.fromString(
PROBE_INGEST_URL.toString(),
).addRoute("/metrics/queue-size");
logger.debug("Fetching queue size from ProbeIngest API");
// Use probe authentication (probe key and probe ID)
const requestBody: JSONObject = ProbeAPIRequest.getDefaultRequestBody();
const result: HTTPResponse<JSONObject> | HTTPErrorResponse =
await API.fetch<JSONObject>(
HTTPMethod.POST,
queueSizeUrl,
requestBody,
{}
);
if (result instanceof HTTPErrorResponse) {
logger.error("Error fetching queue size from ProbeIngest API");
logger.error(result);
throw result;
}
// Extract queueSize from the response
const queueSize = result.data["queueSize"] || 0;
logger.debug(`Queue size fetched: ${queueSize}`);
return Response.sendJsonObjectResponse(req, res, {
queueSize: queueSize,
});
} catch (err) {
logger.error("Error in metrics queue-size endpoint");
logger.error(err);
return next(err);
}
},
);
export default router;

View File

@@ -3,10 +3,12 @@ import AliveJob from "./Jobs/Alive";
import FetchMonitorList from "./Jobs/Monitor/FetchList";
import FetchMonitorTestList from "./Jobs/Monitor/FetchMonitorTest";
import Register from "./Services/Register";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import logger from "Common/Server/Utils/Logger";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
import "ejs";
const APP_NAME: string = "probe";
@@ -29,6 +31,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
},
});
// Add metrics API routes
const app: ExpressApplication = Express.getExpressApp();
app.use("/metrics", MetricsAPI);
// add default routes
await App.addDefaultRoutes();

View File

@@ -24,6 +24,11 @@ import Probe from "Common/Models/DatabaseModels/Probe";
import User from "Common/Models/DatabaseModels/User";
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import PositiveNumber from "Common/Types/PositiveNumber";
import MonitorProbeService from "Common/Server/Services/MonitorProbeService";
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
import OneUptimeDate from "Common/Types/Date";
import MonitorService from "Common/Server/Services/MonitorService";
const router: ExpressRouter = Express.getRouter();
@@ -363,6 +368,60 @@ router.get(
},
);
// Queue size endpoint for Keda autoscaling (returns pending monitors count for specific probe)
router.post(
"/metrics/queue-size",
ProbeAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// This endpoint returns the number of monitors pending for the specific probe
// to be used by Keda for autoscaling probe replicas
// Get the probe ID from the authenticated request
const data: JSONObject = req.body;
const probeId: ObjectID = new ObjectID(data["probeId"] as string);
if (!probeId) {
return Response.sendErrorResponse(
req,
res,
new BadDataException("Probe ID not found"),
);
}
// Get pending monitor count for this specific probe
const pendingCount: PositiveNumber = await MonitorProbeService.countBy({
query: {
probeId: probeId,
isEnabled: true,
nextPingAt: QueryHelper.lessThanEqualToOrNull(
OneUptimeDate.getCurrentDate(),
),
monitor: {
...MonitorService.getEnabledMonitorQuery(),
},
project: {
...ProjectService.getActiveProjectStatusQuery(),
},
},
props: {
isRoot: true,
},
});
return Response.sendJsonObjectResponse(req, res, {
queueSize: pendingCount.toNumber()
});
} catch (err) {
return next(err);
}
},
);
// Queue failed jobs endpoint
router.get(
"/probe/queue/failed",