From c0259fc0412fc38a464f93a0724af69a9264cd07 Mon Sep 17 00:00:00 2001 From: Simon Larsen Date: Fri, 8 Aug 2025 21:21:53 +0100 Subject: [PATCH] feat: Add concurrency setting for OpenTelemetry Ingest worker and update related configurations --- .../oneuptime/templates/open-telemetry-ingest.yaml | 2 ++ HelmChart/Public/oneuptime/values.yaml | 2 ++ OpenTelemetryIngest/Config.ts | 9 +++++++++ .../Jobs/TelemetryIngest/ProcessTelemetry.ts | 3 ++- config.example.env | 5 +++++ docker-compose.base.yml | 2 ++ 6 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 OpenTelemetryIngest/Config.ts diff --git a/HelmChart/Public/oneuptime/templates/open-telemetry-ingest.yaml b/HelmChart/Public/oneuptime/templates/open-telemetry-ingest.yaml index e51a74d718..9926393727 100644 --- a/HelmChart/Public/oneuptime/templates/open-telemetry-ingest.yaml +++ b/HelmChart/Public/oneuptime/templates/open-telemetry-ingest.yaml @@ -93,6 +93,8 @@ spec: value: {{ $.Values.openTelemetryIngest.ports.http | quote }} - name: DISABLE_TELEMETRY value: {{ $.Values.openTelemetryIngest.disableTelemetryCollection | quote }} + - name: OPEN_TELEMETRY_INGEST_CONCURRENCY + value: {{ $.Values.openTelemetryIngest.concurrency | squote }} ports: - containerPort: {{ $.Values.openTelemetryIngest.ports.http }} protocol: TCP diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml index f34d7d40b6..b22b13236f 100644 --- a/HelmChart/Public/oneuptime/values.yaml +++ b/HelmChart/Public/oneuptime/values.yaml @@ -537,6 +537,8 @@ openTelemetryIngest: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false + # Max concurrent telemetry jobs processed by each pod + concurrency: 100 ports: http: 3403 resources: diff --git a/OpenTelemetryIngest/Config.ts b/OpenTelemetryIngest/Config.ts new file mode 100644 index 0000000000..2011d9ffe3 --- /dev/null +++ b/OpenTelemetryIngest/Config.ts @@ -0,0 +1,9 @@ +let concurrency: string | number = + process.env["OPEN_TELEMETRY_INGEST_CONCURRENCY"] || 100; + +if (typeof concurrency === "string") { + const parsed: number = parseInt(concurrency, 10); + concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100; +} + +export const OPEN_TELEMETRY_INGEST_CONCURRENCY: number = concurrency as number; diff --git a/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessTelemetry.ts b/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessTelemetry.ts index 2b6d76ebd8..26ff8a0c4c 100644 --- a/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessTelemetry.ts +++ b/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessTelemetry.ts @@ -8,6 +8,7 @@ import logger from "Common/Server/Utils/Logger"; import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue"; import QueueWorker from "Common/Server/Infrastructure/QueueWorker"; import ObjectID from "Common/Types/ObjectID"; +import { OPEN_TELEMETRY_INGEST_CONCURRENCY } from "../../Config"; // Set up the unified worker for processing telemetry queue QueueWorker.getWorker( @@ -58,7 +59,7 @@ QueueWorker.getWorker( throw error; } }, - { concurrency: 50 }, // Process up to 50 telemetry jobs concurrently + { concurrency: OPEN_TELEMETRY_INGEST_CONCURRENCY }, ); logger.debug("Unified telemetry worker initialized"); diff --git a/config.example.env b/config.example.env index 22e7a7528b..25c823ce62 100644 --- a/config.example.env +++ b/config.example.env @@ -234,6 +234,11 @@ WORKFLOW_SCRIPT_TIMEOUT_IN_MS=5000 WORKFLOW_TIMEOUT_IN_MS=5000 +# Concurrency settings +# Max number of telemetry jobs processed concurrently by OpenTelemetry Ingest worker +OPEN_TELEMETRY_INGEST_CONCURRENCY=100 + + # Lets encrypt notification email. This email will be used when certs are about to expire LETS_ENCRYPT_NOTIFICATION_EMAIL= diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 15cf032eef..20173588af 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -482,6 +482,8 @@ services: <<: *common-server-variables PORT: ${OPEN_TELEMETRY_INGEST_PORT} DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_OPEN_TELEMETRY_INGEST} + # Max concurrent telemetry jobs the worker will process + OPEN_TELEMETRY_INGEST_CONCURRENCY: ${OPEN_TELEMETRY_INGEST_CONCURRENCY} logging: driver: "local" options: