From cd11a450cdb1797b7cea3993654e9943f0c81126 Mon Sep 17 00:00:00 2001 From: Simon Larsen Date: Fri, 8 Aug 2025 21:52:49 +0100 Subject: [PATCH] feat: Introduce configurable concurrency settings for ingest workers in environment variables --- FluentIngest/Config.ts | 9 +++++++++ .../Jobs/FluentIngest/ProcessFluentIngest.ts | 3 ++- .../Public/oneuptime/templates/fluent-ingest.yaml | 2 ++ .../templates/incoming-request-ingest.yaml | 2 ++ .../Public/oneuptime/templates/probe-ingest.yaml | 2 ++ .../templates/server-monitor-ingest.yaml | 2 ++ HelmChart/Public/oneuptime/templates/worker.yaml | 2 ++ HelmChart/Public/oneuptime/values.yaml | 5 +++++ IncomingRequestIngest/Config.ts | 9 +++++++++ .../ProcessIncomingRequestIngest.ts | 3 ++- ProbeIngest/Config.ts | 9 +++++++++ .../Jobs/ProbeIngest/ProcessProbeIngest.ts | 3 ++- ServerMonitorIngest/Config.ts | 9 +++++++++ .../ProcessServerMonitorIngest.ts | 3 ++- Worker/Config.ts | 8 ++++++++ Worker/Routes.ts | 7 ++++--- config.example.env | 15 +++++++++++++++ docker-compose.base.yml | 5 +++++ 18 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 FluentIngest/Config.ts create mode 100644 IncomingRequestIngest/Config.ts create mode 100644 ProbeIngest/Config.ts create mode 100644 ServerMonitorIngest/Config.ts create mode 100644 Worker/Config.ts diff --git a/FluentIngest/Config.ts b/FluentIngest/Config.ts new file mode 100644 index 0000000000..5b6daacf30 --- /dev/null +++ b/FluentIngest/Config.ts @@ -0,0 +1,9 @@ +let concurrency: string | number = + process.env["FLUENT_INGEST_CONCURRENCY"] || 100; + +if (typeof concurrency === "string") { + const parsed: number = parseInt(concurrency, 10); + concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100; +} + +export const FLUENT_INGEST_CONCURRENCY: number = concurrency as number; diff --git a/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts b/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts index 3f47e2bf10..7ca37c93c6 100644 --- a/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts +++ b/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts @@ -11,6 +11,7 @@ import LogSeverity from "Common/Types/Log/LogSeverity"; import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService"; import JSONFunctions from "Common/Types/JSONFunctions"; import Log from "Common/Models/AnalyticsModels/Log"; +import { FLUENT_INGEST_CONCURRENCY } from "../../Config"; interface FluentIngestProcessData { projectId: ObjectID; @@ -41,7 +42,7 @@ QueueWorker.getWorker( throw error; } }, - { concurrency: 50 }, // Process up to 50 fluent ingest jobs concurrently + { concurrency: FLUENT_INGEST_CONCURRENCY }, ); async function processFluentIngestFromQueue( diff --git a/HelmChart/Public/oneuptime/templates/fluent-ingest.yaml b/HelmChart/Public/oneuptime/templates/fluent-ingest.yaml index 4caa29e38b..665c7d80c2 100644 --- a/HelmChart/Public/oneuptime/templates/fluent-ingest.yaml +++ b/HelmChart/Public/oneuptime/templates/fluent-ingest.yaml @@ -93,6 +93,8 @@ spec: value: {{ $.Values.fluentIngest.ports.http | quote }} - name: DISABLE_TELEMETRY value: {{ $.Values.fluentIngest.disableTelemetryCollection | quote }} + - name: FLUENT_INGEST_CONCURRENCY + value: {{ $.Values.fluentIngest.concurrency | squote }} ports: - containerPort: {{ $.Values.fluentIngest.ports.http }} protocol: TCP diff --git a/HelmChart/Public/oneuptime/templates/incoming-request-ingest.yaml b/HelmChart/Public/oneuptime/templates/incoming-request-ingest.yaml index 9b2b6e3225..103f6f0b6b 100644 --- a/HelmChart/Public/oneuptime/templates/incoming-request-ingest.yaml +++ b/HelmChart/Public/oneuptime/templates/incoming-request-ingest.yaml @@ -93,6 +93,8 @@ spec: value: {{ $.Values.incomingRequestIngest.ports.http | quote }} - name: DISABLE_TELEMETRY value: {{ $.Values.incomingRequestIngest.disableTelemetryCollection | quote }} + - name: INCOMING_REQUEST_INGEST_CONCURRENCY + value: {{ $.Values.incomingRequestIngest.concurrency | squote }} ports: - containerPort: {{ $.Values.incomingRequestIngest.ports.http }} protocol: TCP diff --git a/HelmChart/Public/oneuptime/templates/probe-ingest.yaml b/HelmChart/Public/oneuptime/templates/probe-ingest.yaml index 1a6e50f853..433e794074 100644 --- a/HelmChart/Public/oneuptime/templates/probe-ingest.yaml +++ b/HelmChart/Public/oneuptime/templates/probe-ingest.yaml @@ -93,6 +93,8 @@ spec: value: {{ $.Values.probeIngest.ports.http | quote }} - name: DISABLE_TELEMETRY value: {{ $.Values.probeIngest.disableTelemetryCollection | quote }} + - name: PROBE_INGEST_CONCURRENCY + value: {{ $.Values.probeIngest.concurrency | squote }} ports: - containerPort: {{ $.Values.probeIngest.ports.http }} protocol: TCP diff --git a/HelmChart/Public/oneuptime/templates/server-monitor-ingest.yaml b/HelmChart/Public/oneuptime/templates/server-monitor-ingest.yaml index e6eb981784..9def7f7ce6 100644 --- a/HelmChart/Public/oneuptime/templates/server-monitor-ingest.yaml +++ b/HelmChart/Public/oneuptime/templates/server-monitor-ingest.yaml @@ -93,6 +93,8 @@ spec: value: {{ $.Values.serverMonitorIngest.ports.http | quote }} - name: DISABLE_TELEMETRY value: {{ $.Values.serverMonitorIngest.disableTelemetryCollection | quote }} + - name: SERVER_MONITOR_INGEST_CONCURRENCY + value: {{ $.Values.serverMonitorIngest.concurrency | squote }} ports: - containerPort: {{ $.Values.serverMonitorIngest.ports.http }} protocol: TCP diff --git a/HelmChart/Public/oneuptime/templates/worker.yaml b/HelmChart/Public/oneuptime/templates/worker.yaml index d4d578b227..f4ca4032cc 100644 --- a/HelmChart/Public/oneuptime/templates/worker.yaml +++ b/HelmChart/Public/oneuptime/templates/worker.yaml @@ -88,6 +88,8 @@ spec: value: {{ $.Values.worker.ports.http | quote }} - name: DISABLE_TELEMETRY value: {{ $.Values.worker.disableTelemetryCollection | quote }} + - name: WORKER_CONCURRENCY + value: {{ $.Values.worker.concurrency | squote }} ports: - containerPort: {{ $.Values.worker.ports.http }} protocol: TCP diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml index b22b13236f..94588aa3fe 100644 --- a/HelmChart/Public/oneuptime/values.yaml +++ b/HelmChart/Public/oneuptime/values.yaml @@ -477,6 +477,7 @@ worker: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false + concurrency: 100 ports: http: 1445 resources: @@ -518,6 +519,7 @@ probeIngest: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false + concurrency: 100 ports: http: 3400 resources: @@ -558,6 +560,7 @@ fluentIngest: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false + concurrency: 100 ports: http: 3401 resources: @@ -577,6 +580,7 @@ incomingRequestIngest: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false + concurrency: 100 ports: http: 3402 resources: @@ -604,6 +608,7 @@ serverMonitorIngest: replicaCount: 1 disableTelemetryCollection: false disableAutoscaler: false + concurrency: 100 ports: http: 3404 resources: diff --git a/IncomingRequestIngest/Config.ts b/IncomingRequestIngest/Config.ts new file mode 100644 index 0000000000..738d2bf9fa --- /dev/null +++ b/IncomingRequestIngest/Config.ts @@ -0,0 +1,9 @@ +let concurrency: string | number = + process.env["INCOMING_REQUEST_INGEST_CONCURRENCY"] || 100; + +if (typeof concurrency === "string") { + const parsed: number = parseInt(concurrency, 10); + concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100; +} + +export const INCOMING_REQUEST_INGEST_CONCURRENCY: number = concurrency as number; diff --git a/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts b/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts index 0f4b3d25a1..36d4d216bb 100644 --- a/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts +++ b/IncomingRequestIngest/Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest.ts @@ -13,6 +13,7 @@ import ObjectID from "Common/Types/ObjectID"; import MonitorService from "Common/Server/Services/MonitorService"; import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; import Monitor from "Common/Models/DatabaseModels/Monitor"; +import { INCOMING_REQUEST_INGEST_CONCURRENCY } from "../../Config"; // Set up the worker for processing incoming request ingest queue QueueWorker.getWorker( @@ -35,7 +36,7 @@ QueueWorker.getWorker( throw error; } }, - { concurrency: 50 }, // Process up to 50 incoming request ingest jobs concurrently + { concurrency: INCOMING_REQUEST_INGEST_CONCURRENCY }, // Configurable via env, defaults to 100 ); async function processIncomingRequestFromQueue( diff --git a/ProbeIngest/Config.ts b/ProbeIngest/Config.ts new file mode 100644 index 0000000000..870187e7c1 --- /dev/null +++ b/ProbeIngest/Config.ts @@ -0,0 +1,9 @@ +let concurrency: string | number = + process.env["PROBE_INGEST_CONCURRENCY"] || 100; + +if (typeof concurrency === "string") { + const parsed: number = parseInt(concurrency, 10); + concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100; +} + +export const PROBE_INGEST_CONCURRENCY: number = concurrency as number; diff --git a/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts b/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts index 61271f31c6..ac45bae83f 100644 --- a/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts +++ b/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts @@ -10,6 +10,7 @@ import OneUptimeDate from "Common/Types/Date"; import MonitorTestService from "Common/Server/Services/MonitorTestService"; import ProbeMonitorResponse from "Common/Types/Probe/ProbeMonitorResponse"; import { JSONObject } from "Common/Types/JSON"; +import { PROBE_INGEST_CONCURRENCY } from "../../Config"; // Set up the worker for processing probe ingest queue QueueWorker.getWorker( @@ -29,7 +30,7 @@ QueueWorker.getWorker( throw error; } }, - { concurrency: 50 }, // Process up to 50 probe ingest jobs concurrently + { concurrency: PROBE_INGEST_CONCURRENCY }, // Configurable via env, defaults to 100 ); async function processProbeFromQueue( diff --git a/ServerMonitorIngest/Config.ts b/ServerMonitorIngest/Config.ts new file mode 100644 index 0000000000..ea7c360380 --- /dev/null +++ b/ServerMonitorIngest/Config.ts @@ -0,0 +1,9 @@ +let concurrency: string | number = + process.env["SERVER_MONITOR_INGEST_CONCURRENCY"] || 100; + +if (typeof concurrency === "string") { + const parsed: number = parseInt(concurrency, 10); + concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100; +} + +export const SERVER_MONITOR_INGEST_CONCURRENCY: number = concurrency as number; diff --git a/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts b/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts index 8abadefa13..394758dcbe 100644 --- a/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts +++ b/ServerMonitorIngest/Jobs/ServerMonitorIngest/ProcessServerMonitorIngest.ts @@ -13,6 +13,7 @@ import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource"; import Monitor from "Common/Models/DatabaseModels/Monitor"; import OneUptimeDate from "Common/Types/Date"; import ProjectService from "Common/Server/Services/ProjectService"; +import { SERVER_MONITOR_INGEST_CONCURRENCY } from "../../Config"; // Set up the worker for processing server monitor ingest queue QueueWorker.getWorker( @@ -35,7 +36,7 @@ QueueWorker.getWorker( throw error; } }, - { concurrency: 50 }, // Process up to 50 server monitor ingest jobs concurrently + { concurrency: SERVER_MONITOR_INGEST_CONCURRENCY }, // Configurable via env, defaults to 100 ); async function processServerMonitorFromQueue( diff --git a/Worker/Config.ts b/Worker/Config.ts new file mode 100644 index 0000000000..65e40d561e --- /dev/null +++ b/Worker/Config.ts @@ -0,0 +1,8 @@ +let concurrency: string | number = process.env["WORKER_CONCURRENCY"] || 100; + +if (typeof concurrency === "string") { + const parsed: number = parseInt(concurrency, 10); + concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100; +} + +export const WORKER_CONCURRENCY: number = concurrency as number; diff --git a/Worker/Routes.ts b/Worker/Routes.ts index 1653258a66..c371c8eade 100644 --- a/Worker/Routes.ts +++ b/Worker/Routes.ts @@ -112,6 +112,7 @@ import Queue, { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue"; import QueueWorker from "Common/Server/Infrastructure/QueueWorker"; import FeatureSet from "Common/Server/Types/FeatureSet"; import logger from "Common/Server/Utils/Logger"; +import { WORKER_CONCURRENCY } from "./Config"; import Express, { ExpressApplication } from "Common/Server/Utils/Express"; @@ -133,7 +134,7 @@ const WorkersFeatureSet: FeatureSet = { await AnalyticsTableManagement.createTables(); // Job process. - QueueWorker.getWorker( + QueueWorker.getWorker( QueueName.Worker, async (job: QueueJob) => { const name: string = job.name; @@ -148,8 +149,8 @@ const WorkersFeatureSet: FeatureSet = { if (funcToRun) { await QueueWorker.runJobWithTimeout(timeoutInMs, funcToRun); } - }, - { concurrency: 100 }, + }, + { concurrency: WORKER_CONCURRENCY }, ); } catch (err) { logger.error("App Init Failed:"); diff --git a/config.example.env b/config.example.env index 25c823ce62..91ce1a6992 100644 --- a/config.example.env +++ b/config.example.env @@ -238,6 +238,21 @@ WORKFLOW_TIMEOUT_IN_MS=5000 # Max number of telemetry jobs processed concurrently by OpenTelemetry Ingest worker OPEN_TELEMETRY_INGEST_CONCURRENCY=100 +# Max number of jobs processed concurrently by Fluent Ingest worker +FLUENT_INGEST_CONCURRENCY=100 + +# Max number of jobs processed concurrently by Incoming Request Ingest worker +INCOMING_REQUEST_INGEST_CONCURRENCY=100 + +# Max number of jobs processed concurrently by Server Monitor Ingest worker +SERVER_MONITOR_INGEST_CONCURRENCY=100 + +# Max number of jobs processed concurrently by Probe Ingest worker +PROBE_INGEST_CONCURRENCY=100 + +# Max number of jobs processed concurrently by Worker service +WORKER_CONCURRENCY=100 + # Lets encrypt notification email. This email will be used when certs are about to expire diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 20173588af..3f7276a70c 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -305,6 +305,7 @@ services: <<: *common-server-variables PORT: ${WORKER_PORT} DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_WORKER} + WORKER_CONCURRENCY: ${WORKER_CONCURRENCY} logging: driver: "local" options: @@ -443,6 +444,7 @@ services: <<: *common-server-variables PORT: ${FLUENT_INGEST_PORT} DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_FLUENT_INGEST} + FLUENT_INGEST_CONCURRENCY: ${FLUENT_INGEST_CONCURRENCY} logging: driver: "local" options: @@ -456,6 +458,7 @@ services: <<: *common-server-variables PORT: ${PROBE_INGEST_PORT} DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_PROBE_INGEST} + PROBE_INGEST_CONCURRENCY: ${PROBE_INGEST_CONCURRENCY} logging: driver: "local" options: @@ -469,6 +472,7 @@ services: <<: *common-server-variables PORT: ${SERVER_MONITOR_INGEST_PORT} DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_SERVER_MONITOR_INGEST} + SERVER_MONITOR_INGEST_CONCURRENCY: ${SERVER_MONITOR_INGEST_CONCURRENCY} logging: driver: "local" options: @@ -497,6 +501,7 @@ services: <<: *common-server-variables PORT: ${INCOMING_REQUEST_INGEST_PORT} DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_INCOMING_REQUEST_INGEST} + INCOMING_REQUEST_INGEST_CONCURRENCY: ${INCOMING_REQUEST_INGEST_CONCURRENCY} logging: driver: "local" options: