From 54a79a81009989ab48f89338f6563f92c9350eee Mon Sep 17 00:00:00 2001 From: Nawaz Dhandala Date: Thu, 2 Apr 2026 14:29:39 +0100 Subject: [PATCH] feat: implement combined queue size metrics for KEDA autoscaling --- App/API/Metrics.ts | 35 +++++++++++++++++++ App/Index.ts | 6 ++++ App/Services/Queue/AppQueueService.ts | 16 +++++++++ .../templates/keda-scaledobjects.yaml | 4 +-- HelmChart/Public/oneuptime/values.schema.json | 5 +-- HelmChart/Public/oneuptime/values.yaml | 8 ++--- 6 files changed, 63 insertions(+), 11 deletions(-) create mode 100644 App/API/Metrics.ts create mode 100644 App/Services/Queue/AppQueueService.ts diff --git a/App/API/Metrics.ts b/App/API/Metrics.ts new file mode 100644 index 0000000000..6e778e8068 --- /dev/null +++ b/App/API/Metrics.ts @@ -0,0 +1,35 @@ +import Express, { + ExpressRequest, + ExpressResponse, + ExpressRouter, + NextFunction, +} from "Common/Server/Utils/Express"; +import AppQueueService from "../Services/Queue/AppQueueService"; + +const router: ExpressRouter = Express.getRouter(); + +/** + * JSON metrics endpoint for KEDA autoscaling + * Returns combined queue size (worker + workflow + telemetry) as JSON for KEDA metrics-api scaler + */ +router.get( + "/metrics/queue-size", + async ( + _req: ExpressRequest, + res: ExpressResponse, + next: NextFunction, + ): Promise<void> => { + try { + const queueSize: number = await AppQueueService.getQueueSize(); + + res.setHeader("Content-Type", "application/json"); + res.status(200).json({ + queueSize: queueSize, + }); + } catch (err) { + return next(err); + } + }, +); + +export default router; diff --git a/App/Index.ts b/App/Index.ts index f63d7e5680..6eaf70e462 100755 --- a/App/Index.ts +++ b/App/Index.ts @@ -9,6 +9,8 @@ import NotificationRoutes from "./FeatureSet/Notification/Index"; import WorkersRoutes from "./FeatureSet/Workers/Index"; import TelemetryRoutes from "./FeatureSet/Telemetry/Index"; import WorkflowRoutes from "./FeatureSet/Workflow/Index"; +import AppMetricsAPI from 
"./API/Metrics"; +import Express, { ExpressApplication } from "Common/Server/Utils/Express"; import { PromiseVoidFunction } from "Common/Types/FunctionTypes"; import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase"; import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase"; @@ -103,6 +105,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => { // Initialize real-time functionalities await Realtime.init(); + // Expose app-level combined metrics endpoint for KEDA + const expressApp: ExpressApplication = Express.getExpressApp(); + expressApp.use("/", AppMetricsAPI); + // Initialize feature sets await IdentityRoutes.init(); await NotificationRoutes.init(); diff --git a/App/Services/Queue/AppQueueService.ts b/App/Services/Queue/AppQueueService.ts new file mode 100644 index 0000000000..733cfb386d --- /dev/null +++ b/App/Services/Queue/AppQueueService.ts @@ -0,0 +1,16 @@ +import Queue, { QueueName } from "Common/Server/Infrastructure/Queue"; + +export default class AppQueueService { + public static async getQueueSize(): Promise<number> { + const [workerSize, workflowSize, telemetrySize]: [ + number, + number, + number, + ] = await Promise.all([ + Queue.getQueueSize(QueueName.Worker), + Queue.getQueueSize(QueueName.Workflow), + Queue.getQueueSize(QueueName.Telemetry), + ]); + return workerSize + workflowSize + telemetrySize; + } +} diff --git a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml index 25233c72ab..e4afbdcd83 100644 --- a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml +++ b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml @@ -17,9 +17,9 @@ KEDA ScaledObjects for various services {{- end }} {{- end }} -{{/* App KEDA ScaledObject - scales based on worker and telemetry queue sizes */}} +{{/* App KEDA ScaledObject - scales based on combined queue size (worker + workflow + telemetry) */}} {{- if and 
.Values.keda.enabled .Values.app.enabled .Values.app.keda.enabled (not .Values.app.disableAutoscaler) (not .Values.deployment.disableDeployments) }} -{{- $metricsConfig := dict "enabled" .Values.app.keda.enabled "minReplicas" .Values.app.keda.minReplicas "maxReplicas" .Values.app.keda.maxReplicas "pollingInterval" .Values.app.keda.pollingInterval "cooldownPeriod" .Values.app.keda.cooldownPeriod "targetCPUUtilizationPercentage" .Values.app.keda.targetCPUUtilizationPercentage "targetMemoryUtilizationPercentage" .Values.app.keda.targetMemoryUtilizationPercentage "triggers" (list (dict "query" "oneuptime_app_worker_queue_size" "threshold" .Values.app.keda.workerQueueSizeThreshold "port" .Values.app.ports.http "urlPath" "/worker/metrics/queue-size") (dict "query" "oneuptime_app_telemetry_queue_size" "threshold" .Values.app.keda.telemetryQueueSizeThreshold "port" .Values.app.ports.http "urlPath" "/telemetry/metrics/queue-size")) }} +{{- $metricsConfig := dict "enabled" .Values.app.keda.enabled "minReplicas" .Values.app.keda.minReplicas "maxReplicas" .Values.app.keda.maxReplicas "pollingInterval" .Values.app.keda.pollingInterval "cooldownPeriod" .Values.app.keda.cooldownPeriod "targetCPUUtilizationPercentage" .Values.app.keda.targetCPUUtilizationPercentage "targetMemoryUtilizationPercentage" .Values.app.keda.targetMemoryUtilizationPercentage "triggers" (list (dict "query" "oneuptime_app_queue_size" "threshold" .Values.app.keda.queueSizeThreshold "port" .Values.app.ports.http)) }} {{- $appKedaArgs := dict "ServiceName" "app" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.app.disableAutoscaler }} {{- include "oneuptime.kedaScaledObject" $appKedaArgs }} {{- end }} diff --git a/HelmChart/Public/oneuptime/values.schema.json b/HelmChart/Public/oneuptime/values.schema.json index f1c9ef752f..6d0ed19ac9 100644 --- a/HelmChart/Public/oneuptime/values.schema.json +++ b/HelmChart/Public/oneuptime/values.schema.json @@ -1807,10 +1807,7 
@@ "maxReplicas": { "type": "integer" }, - "workerQueueSizeThreshold": { - "type": "integer" - }, - "telemetryQueueSizeThreshold": { + "queueSizeThreshold": { "type": "integer" }, "targetCPUUtilizationPercentage": { diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml index df7027a8e9..a1a604b7d2 100644 --- a/HelmChart/Public/oneuptime/values.yaml +++ b/HelmChart/Public/oneuptime/values.yaml @@ -662,15 +662,13 @@ app: nodeSelector: {} podSecurityContext: {} containerSecurityContext: {} - # KEDA autoscaling configuration based on worker and telemetry queue metrics + # KEDA autoscaling configuration based on combined queue metrics (worker + workflow + telemetry) keda: enabled: false minReplicas: 1 maxReplicas: 100 - # Scale up when worker queue size exceeds this threshold - workerQueueSizeThreshold: 10 - # Scale up when telemetry queue size exceeds this threshold - telemetryQueueSizeThreshold: 10 + # Scale up when combined queue size (worker + workflow + telemetry) exceeds this threshold + queueSizeThreshold: 10 # Scale up when average CPU utilization exceeds this percentage (0 to disable) targetCPUUtilizationPercentage: 80 # Scale up when average memory utilization exceeds this percentage (0 to disable)