feat: implement combined queue size metrics for KEDA autoscaling

This commit is contained in:
Nawaz Dhandala
2026-04-02 14:29:39 +01:00
parent eb4010dfa5
commit 54a79a8100
6 changed files with 63 additions and 11 deletions

35
App/API/Metrics.ts Normal file
View File

@@ -0,0 +1,35 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import AppQueueService from "../Services/Queue/AppQueueService";
const router: ExpressRouter = Express.getRouter();

/**
 * JSON metrics endpoint consumed by the KEDA metrics-api scaler.
 *
 * Responds with the combined queue size (worker + workflow + telemetry)
 * as `{ "queueSize": <number> }` so KEDA can autoscale the app service.
 */
router.get(
  "/metrics/queue-size",
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Combined depth across all app queues — see AppQueueService.
      const combinedQueueSize: number = await AppQueueService.getQueueSize();

      // Explicit content type for the KEDA metrics-api scaler.
      res.setHeader("Content-Type", "application/json");
      res.status(200).json({
        queueSize: combinedQueueSize,
      });
    } catch (error) {
      // Delegate to the Express error-handling middleware chain.
      next(error);
      return;
    }
  },
);

export default router;

View File

@@ -9,6 +9,8 @@ import NotificationRoutes from "./FeatureSet/Notification/Index";
import WorkersRoutes from "./FeatureSet/Workers/Index";
import TelemetryRoutes from "./FeatureSet/Telemetry/Index";
import WorkflowRoutes from "./FeatureSet/Workflow/Index";
import AppMetricsAPI from "./API/Metrics";
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -103,6 +105,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
// Initialize real-time functionalities
await Realtime.init();
// Expose app-level combined metrics endpoint for KEDA
const expressApp: ExpressApplication = Express.getExpressApp();
expressApp.use("/", AppMetricsAPI);
// Initialize feature sets
await IdentityRoutes.init();
await NotificationRoutes.init();

View File

@@ -0,0 +1,16 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
export default class AppQueueService {
public static async getQueueSize(): Promise<number> {
const [workerSize, workflowSize, telemetrySize]: [
number,
number,
number,
] = await Promise.all([
Queue.getQueueSize(QueueName.Worker),
Queue.getQueueSize(QueueName.Workflow),
Queue.getQueueSize(QueueName.Telemetry),
]);
return workerSize + workflowSize + telemetrySize;
}
}

View File

@@ -17,9 +17,9 @@ KEDA ScaledObjects for various services
{{- end }}
{{- end }}
{{/* App KEDA ScaledObject - scales based on worker and telemetry queue sizes */}}
{{/* App KEDA ScaledObject - scales based on combined queue size (worker + workflow + telemetry) */}}
{{- if and .Values.keda.enabled .Values.app.enabled .Values.app.keda.enabled (not .Values.app.disableAutoscaler) (not .Values.deployment.disableDeployments) }}
{{- $metricsConfig := dict "enabled" .Values.app.keda.enabled "minReplicas" .Values.app.keda.minReplicas "maxReplicas" .Values.app.keda.maxReplicas "pollingInterval" .Values.app.keda.pollingInterval "cooldownPeriod" .Values.app.keda.cooldownPeriod "targetCPUUtilizationPercentage" .Values.app.keda.targetCPUUtilizationPercentage "targetMemoryUtilizationPercentage" .Values.app.keda.targetMemoryUtilizationPercentage "triggers" (list (dict "query" "oneuptime_app_worker_queue_size" "threshold" .Values.app.keda.workerQueueSizeThreshold "port" .Values.app.ports.http "urlPath" "/worker/metrics/queue-size") (dict "query" "oneuptime_app_telemetry_queue_size" "threshold" .Values.app.keda.telemetryQueueSizeThreshold "port" .Values.app.ports.http "urlPath" "/telemetry/metrics/queue-size")) }}
{{- $metricsConfig := dict "enabled" .Values.app.keda.enabled "minReplicas" .Values.app.keda.minReplicas "maxReplicas" .Values.app.keda.maxReplicas "pollingInterval" .Values.app.keda.pollingInterval "cooldownPeriod" .Values.app.keda.cooldownPeriod "targetCPUUtilizationPercentage" .Values.app.keda.targetCPUUtilizationPercentage "targetMemoryUtilizationPercentage" .Values.app.keda.targetMemoryUtilizationPercentage "triggers" (list (dict "query" "oneuptime_app_queue_size" "threshold" .Values.app.keda.queueSizeThreshold "port" .Values.app.ports.http)) }}
{{- $appKedaArgs := dict "ServiceName" "app" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.app.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $appKedaArgs }}
{{- end }}

View File

@@ -1807,10 +1807,7 @@
"maxReplicas": {
"type": "integer"
},
"workerQueueSizeThreshold": {
"type": "integer"
},
"telemetryQueueSizeThreshold": {
"queueSizeThreshold": {
"type": "integer"
},
"targetCPUUtilizationPercentage": {

View File

@@ -662,15 +662,13 @@ app:
nodeSelector: {}
podSecurityContext: {}
containerSecurityContext: {}
# KEDA autoscaling configuration based on worker and telemetry queue metrics
# KEDA autoscaling configuration based on combined queue metrics (worker + workflow + telemetry)
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when worker queue size exceeds this threshold
workerQueueSizeThreshold: 10
# Scale up when telemetry queue size exceeds this threshold
telemetryQueueSizeThreshold: 10
# Scale up when combined queue size (worker + workflow + telemetry) exceeds this threshold
queueSizeThreshold: 10
# Scale up when average CPU utilization exceeds this percentage (0 to disable)
targetCPUUtilizationPercentage: 80
# Scale up when average memory utilization exceeds this percentage (0 to disable)