diff --git a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
index 13807a9085..ae39246229 100644
--- a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
+++ b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
@@ -35,4 +35,14 @@ KEDA ScaledObjects for various services
 {{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }}
 {{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }}
 {{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }}
+{{- end }}
+
+{{/* Probe KEDA ScaledObjects - one for each probe configuration */}}
+{{- range $key, $val := $.Values.probes }}
+{{- if and $.Values.keda.enabled $val.keda.enabled (not $val.disableAutoscaler) }}
+{{- $serviceName := printf "probe-%s" $key }}
+{{- $metricsConfig := dict "enabled" $val.keda.enabled "minReplicas" $val.keda.minReplicas "maxReplicas" $val.keda.maxReplicas "pollingInterval" $val.keda.pollingInterval "cooldownPeriod" $val.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_queue_size" "threshold" $val.keda.queueSizeThreshold "port" $.Values.port.probe)) }}
+{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
+{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
+{{- end }}
 {{- end }}
\ No newline at end of file
diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml
index 8f3adf020e..67be097d0e 100644
--- a/HelmChart/Public/oneuptime/values.yaml
+++ b/HelmChart/Public/oneuptime/values.yaml
@@ -198,6 +198,17 @@ probes:
     customCodeMonitorScriptTimeoutInMs: 60000
     disableTelemetryCollection: false
     disableAutoscaler: false
+    # KEDA autoscaling configuration based on monitor queue metrics
+    keda:
+      enabled: false
+      minReplicas: 1
+      maxReplicas: 100
+      # Scale up when queue size exceeds this threshold per probe
+      queueSizeThreshold: 10
+      # Polling interval for metrics (in seconds)
+      pollingInterval: 30
+      # Cooldown period after scaling (in seconds)
+      cooldownPeriod: 300
     # resources:
     # additionalContainers:
     #   two:
@@ -213,10 +224,22 @@
   #   disableAutoscaler: false
   #   resources:
   #   additionalContainers:
+  #   # KEDA autoscaling configuration based on monitor queue metrics
+  #   keda:
+  #     enabled: false
+  #     minReplicas: 1
+  #     maxReplicas: 100
+  #     # Scale up when queue size exceeds this threshold per probe
+  #     queueSizeThreshold: 10
+  #     # Polling interval for metrics (in seconds)
+  #     pollingInterval: 30
+  #     # Cooldown period after scaling (in seconds)
+  #     cooldownPeriod: 300
 
 port:
   app: 3002
+  probe: 3874
   probeIngest: 3400
   serverMonitorIngest: 3404
   openTelemetryIngest: 3403
diff --git a/Probe/API/Metrics.ts b/Probe/API/Metrics.ts
new file mode 100644
index 0000000000..44dae239a3
--- /dev/null
+++ b/Probe/API/Metrics.ts
@@ -0,0 +1,70 @@
+import Express, {
+  ExpressRequest,
+  ExpressResponse,
+  ExpressRouter,
+  NextFunction,
"Common/Server/Utils/Express"; +import Response from "Common/Server/Utils/Response"; +import { PROBE_INGEST_URL } from "../Config"; +import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse"; +import HTTPMethod from "Common/Types/API/HTTPMethod"; +import HTTPResponse from "Common/Types/API/HTTPResponse"; +import URL from "Common/Types/API/URL"; +import { JSONObject } from "Common/Types/JSON"; +import API from "Common/Utils/API"; +import logger from "Common/Server/Utils/Logger"; +import ProbeAPIRequest from "../Utils/ProbeAPIRequest"; + +const router: ExpressRouter = Express.getRouter(); + +// Metrics endpoint for Keda autoscaling +router.get( + "/queue-size", + async ( + req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise => { + try { + // Get the pending monitor count for this specific probe from ProbeIngest API + const queueSizeUrl: URL = URL.fromString( + PROBE_INGEST_URL.toString(), + ).addRoute("/metrics/queue-size"); + + logger.debug("Fetching queue size from ProbeIngest API"); + + // Use probe authentication (probe key and probe ID) + const requestBody: JSONObject = ProbeAPIRequest.getDefaultRequestBody(); + + const result: HTTPResponse | HTTPErrorResponse = + await API.fetch( + HTTPMethod.POST, + queueSizeUrl, + requestBody, + {} + ); + + if (result instanceof HTTPErrorResponse) { + logger.error("Error fetching queue size from ProbeIngest API"); + logger.error(result); + throw result; + } + + // Extract queueSize from the response + const queueSize = result.data["queueSize"] || 0; + + logger.debug(`Queue size fetched: ${queueSize}`); + + return Response.sendJsonObjectResponse(req, res, { + queueSize: queueSize, + }); + + } catch (err) { + logger.error("Error in metrics queue-size endpoint"); + logger.error(err); + return next(err); + } + }, +); + +export default router; diff --git a/Probe/Index.ts b/Probe/Index.ts index c774cc34ff..8a7bc99f20 100644 --- a/Probe/Index.ts +++ b/Probe/Index.ts @@ -3,10 +3,12 @@ import AliveJob from "./Jobs/Alive"; import FetchMonitorList from "./Jobs/Monitor/FetchList"; import FetchMonitorTestList from "./Jobs/Monitor/FetchMonitorTest"; import Register from "./Services/Register"; +import MetricsAPI from "./API/Metrics"; import { PromiseVoidFunction } from "Common/Types/FunctionTypes"; import logger from "Common/Server/Utils/Logger"; import App from "Common/Server/Utils/StartServer"; import Telemetry from "Common/Server/Utils/Telemetry"; +import Express, { ExpressApplication } from "Common/Server/Utils/Express"; import "ejs"; const APP_NAME: string = "probe"; @@ -29,6 +31,10 @@ const init: PromiseVoidFunction = async (): Promise => { }, }); + // Add metrics API routes + const app: ExpressApplication = Express.getExpressApp(); + app.use("/metrics", MetricsAPI); + // add default routes await App.addDefaultRoutes(); diff --git a/ProbeIngest/API/Probe.ts b/ProbeIngest/API/Probe.ts index e7f7378713..b6a88d3769 100644 --- a/ProbeIngest/API/Probe.ts +++ b/ProbeIngest/API/Probe.ts @@ -24,6 +24,11 @@ import Probe from "Common/Models/DatabaseModels/Probe"; import User from "Common/Models/DatabaseModels/User"; import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService"; import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization"; +import PositiveNumber from "Common/Types/PositiveNumber"; +import MonitorProbeService from "Common/Server/Services/MonitorProbeService"; +import QueryHelper from "Common/Server/Types/Database/QueryHelper"; +import OneUptimeDate from "Common/Types/Date"; 
+import MonitorService from "Common/Server/Services/MonitorService";
 
 const router: ExpressRouter = Express.getRouter();
 
@@ -363,6 +368,60 @@ router.get(
   },
 );
 
+// Queue size endpoint for KEDA autoscaling (returns the pending monitor count for a specific probe)
+router.post(
+  "/metrics/queue-size",
+  ProbeAuthorization.isAuthorizedServiceMiddleware,
+  async (
+    req: ExpressRequest,
+    res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> => {
+    try {
+      // This endpoint returns the number of monitors pending for the specific probe,
+      // to be used by KEDA for autoscaling probe replicas.
+
+      // Get the probe ID from the authenticated request. Validate the raw value
+      // before constructing the ObjectID, since the constructed object is always truthy.
+      const data: JSONObject = req.body;
+
+      if (!data["probeId"]) {
+        return Response.sendErrorResponse(
+          req,
+          res,
+          new BadDataException("Probe ID not found"),
+        );
+      }
+
+      const probeId: ObjectID = new ObjectID(data["probeId"] as string);
+
+      // Get the pending monitor count for this specific probe
+      const pendingCount: PositiveNumber = await MonitorProbeService.countBy({
+        query: {
+          probeId: probeId,
+          isEnabled: true,
+          nextPingAt: QueryHelper.lessThanEqualToOrNull(
+            OneUptimeDate.getCurrentDate(),
+          ),
+          monitor: {
+            ...MonitorService.getEnabledMonitorQuery(),
+          },
+          project: {
+            ...ProjectService.getActiveProjectStatusQuery(),
+          },
+        },
+        props: {
+          isRoot: true,
+        },
+      });
+
+      return Response.sendJsonObjectResponse(req, res, {
+        queueSize: pendingCount.toNumber(),
+      });
+    } catch (err) {
+      return next(err);
+    }
+  },
+);
+
 // Queue failed jobs endpoint
 router.get(
   "/probe/queue/failed",
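
For reviewers who want to exercise the new template path end to end, a minimal sketch of a Helm values override follows. The override file name and the probe key "one" are illustrative assumptions; the per-probe keda fields, the chart-level keda.enabled flag (checked by the template as $.Values.keda.enabled), and port 3874 all come from this diff.

# probe-keda-overrides.yaml (hypothetical file name)
keda:
  enabled: true               # chart-level KEDA toggle checked by the template
probes:
  one:                        # assumed probe key; use the key of your probe entry
    keda:
      enabled: true
      minReplicas: 1
      maxReplicas: 50
      queueSizeThreshold: 10  # scale out when more than 10 monitors are pending for this probe
      pollingInterval: 30     # seconds between KEDA metric polls
      cooldownPeriod: 300     # seconds after the last trigger before scaling back down

Applied with something like "helm upgrade oneuptime ./HelmChart/Public/oneuptime -f probe-keda-overrides.yaml", this should render a ScaledObject for the service name "probe-one" via the existing oneuptime.kedaScaledObject helper. The scaling signal then flows as wired in this diff: KEDA polls the probe's new GET /metrics/queue-size route (port 3874), which calls ProbeIngest's authenticated POST /metrics/queue-size route, which counts the MonitorProbe rows whose nextPingAt is due for that probe ID.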