feat: Introduce configurable concurrency settings for ingest workers in environment variables

This commit is contained in:
Simon Larsen
2025-08-08 21:52:49 +01:00
parent c0259fc041
commit cd11a450cd
18 changed files with 91 additions and 7 deletions

9
FluentIngest/Config.ts Normal file
View File

@@ -0,0 +1,9 @@
let concurrency: string | number =
process.env["FLUENT_INGEST_CONCURRENCY"] || 100;
if (typeof concurrency === "string") {
const parsed: number = parseInt(concurrency, 10);
concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100;
}
export const FLUENT_INGEST_CONCURRENCY: number = concurrency as number;

View File

@@ -11,6 +11,7 @@ import LogSeverity from "Common/Types/Log/LogSeverity";
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
import JSONFunctions from "Common/Types/JSONFunctions";
import Log from "Common/Models/AnalyticsModels/Log";
import { FLUENT_INGEST_CONCURRENCY } from "../../Config";
interface FluentIngestProcessData {
projectId: ObjectID;
@@ -41,7 +42,7 @@ QueueWorker.getWorker(
throw error;
}
},
{ concurrency: 50 }, // Process up to 50 fluent ingest jobs concurrently
{ concurrency: FLUENT_INGEST_CONCURRENCY },
);
async function processFluentIngestFromQueue(

View File

@@ -93,6 +93,8 @@ spec:
value: {{ $.Values.fluentIngest.ports.http | quote }}
- name: DISABLE_TELEMETRY
value: {{ $.Values.fluentIngest.disableTelemetryCollection | quote }}
- name: FLUENT_INGEST_CONCURRENCY
value: {{ $.Values.fluentIngest.concurrency | squote }}
ports:
- containerPort: {{ $.Values.fluentIngest.ports.http }}
protocol: TCP

View File

@@ -93,6 +93,8 @@ spec:
value: {{ $.Values.incomingRequestIngest.ports.http | quote }}
- name: DISABLE_TELEMETRY
value: {{ $.Values.incomingRequestIngest.disableTelemetryCollection | quote }}
- name: INCOMING_REQUEST_INGEST_CONCURRENCY
value: {{ $.Values.incomingRequestIngest.concurrency | squote }}
ports:
- containerPort: {{ $.Values.incomingRequestIngest.ports.http }}
protocol: TCP

View File

@@ -93,6 +93,8 @@ spec:
value: {{ $.Values.probeIngest.ports.http | quote }}
- name: DISABLE_TELEMETRY
value: {{ $.Values.probeIngest.disableTelemetryCollection | quote }}
- name: PROBE_INGEST_CONCURRENCY
value: {{ $.Values.probeIngest.concurrency | squote }}
ports:
- containerPort: {{ $.Values.probeIngest.ports.http }}
protocol: TCP

View File

@@ -93,6 +93,8 @@ spec:
value: {{ $.Values.serverMonitorIngest.ports.http | quote }}
- name: DISABLE_TELEMETRY
value: {{ $.Values.serverMonitorIngest.disableTelemetryCollection | quote }}
- name: SERVER_MONITOR_INGEST_CONCURRENCY
value: {{ $.Values.serverMonitorIngest.concurrency | squote }}
ports:
- containerPort: {{ $.Values.serverMonitorIngest.ports.http }}
protocol: TCP

View File

@@ -88,6 +88,8 @@ spec:
value: {{ $.Values.worker.ports.http | quote }}
- name: DISABLE_TELEMETRY
value: {{ $.Values.worker.disableTelemetryCollection | quote }}
- name: WORKER_CONCURRENCY
value: {{ $.Values.worker.concurrency | squote }}
ports:
- containerPort: {{ $.Values.worker.ports.http }}
protocol: TCP

View File

@@ -477,6 +477,7 @@ worker:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
concurrency: 100
ports:
http: 1445
resources:
@@ -518,6 +519,7 @@ probeIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
concurrency: 100
ports:
http: 3400
resources:
@@ -558,6 +560,7 @@ fluentIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
concurrency: 100
ports:
http: 3401
resources:
@@ -577,6 +580,7 @@ incomingRequestIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
concurrency: 100
ports:
http: 3402
resources:
@@ -604,6 +608,7 @@ serverMonitorIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
concurrency: 100
ports:
http: 3404
resources:

View File

@@ -0,0 +1,9 @@
let concurrency: string | number =
process.env["INCOMING_REQUEST_INGEST_CONCURRENCY"] || 100;
if (typeof concurrency === "string") {
const parsed: number = parseInt(concurrency, 10);
concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100;
}
export const INCOMING_REQUEST_INGEST_CONCURRENCY: number = concurrency as number;

View File

@@ -13,6 +13,7 @@ import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import { INCOMING_REQUEST_INGEST_CONCURRENCY } from "../../Config";
// Set up the worker for processing incoming request ingest queue
QueueWorker.getWorker(
@@ -35,7 +36,7 @@ QueueWorker.getWorker(
throw error;
}
},
{ concurrency: 50 }, // Process up to 50 incoming request ingest jobs concurrently
{ concurrency: INCOMING_REQUEST_INGEST_CONCURRENCY }, // Configurable via env, defaults to 100
);
async function processIncomingRequestFromQueue(

9
ProbeIngest/Config.ts Normal file
View File

@@ -0,0 +1,9 @@
let concurrency: string | number =
process.env["PROBE_INGEST_CONCURRENCY"] || 100;
if (typeof concurrency === "string") {
const parsed: number = parseInt(concurrency, 10);
concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100;
}
export const PROBE_INGEST_CONCURRENCY: number = concurrency as number;

View File

@@ -10,6 +10,7 @@ import OneUptimeDate from "Common/Types/Date";
import MonitorTestService from "Common/Server/Services/MonitorTestService";
import ProbeMonitorResponse from "Common/Types/Probe/ProbeMonitorResponse";
import { JSONObject } from "Common/Types/JSON";
import { PROBE_INGEST_CONCURRENCY } from "../../Config";
// Set up the worker for processing probe ingest queue
QueueWorker.getWorker(
@@ -29,7 +30,7 @@ QueueWorker.getWorker(
throw error;
}
},
{ concurrency: 50 }, // Process up to 50 probe ingest jobs concurrently
{ concurrency: PROBE_INGEST_CONCURRENCY }, // Configurable via env, defaults to 100
);
async function processProbeFromQueue(

View File

@@ -0,0 +1,9 @@
let concurrency: string | number =
process.env["SERVER_MONITOR_INGEST_CONCURRENCY"] || 100;
if (typeof concurrency === "string") {
const parsed: number = parseInt(concurrency, 10);
concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100;
}
export const SERVER_MONITOR_INGEST_CONCURRENCY: number = concurrency as number;

View File

@@ -13,6 +13,7 @@ import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import OneUptimeDate from "Common/Types/Date";
import ProjectService from "Common/Server/Services/ProjectService";
import { SERVER_MONITOR_INGEST_CONCURRENCY } from "../../Config";
// Set up the worker for processing server monitor ingest queue
QueueWorker.getWorker(
@@ -35,7 +36,7 @@ QueueWorker.getWorker(
throw error;
}
},
{ concurrency: 50 }, // Process up to 50 server monitor ingest jobs concurrently
{ concurrency: SERVER_MONITOR_INGEST_CONCURRENCY }, // Configurable via env, defaults to 100
);
async function processServerMonitorFromQueue(

8
Worker/Config.ts Normal file
View File

@@ -0,0 +1,8 @@
let concurrency: string | number = process.env["WORKER_CONCURRENCY"] || 100;
if (typeof concurrency === "string") {
const parsed: number = parseInt(concurrency, 10);
concurrency = !isNaN(parsed) && parsed > 0 ? parsed : 100;
}
export const WORKER_CONCURRENCY: number = concurrency as number;

View File

@@ -112,6 +112,7 @@ import Queue, { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import FeatureSet from "Common/Server/Types/FeatureSet";
import logger from "Common/Server/Utils/Logger";
import { WORKER_CONCURRENCY } from "./Config";
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
@@ -133,7 +134,7 @@ const WorkersFeatureSet: FeatureSet = {
await AnalyticsTableManagement.createTables();
// Job process.
QueueWorker.getWorker(
QueueWorker.getWorker(
QueueName.Worker,
async (job: QueueJob) => {
const name: string = job.name;
@@ -148,8 +149,8 @@ const WorkersFeatureSet: FeatureSet = {
if (funcToRun) {
await QueueWorker.runJobWithTimeout(timeoutInMs, funcToRun);
}
},
{ concurrency: 100 },
},
{ concurrency: WORKER_CONCURRENCY },
);
} catch (err) {
logger.error("App Init Failed:");

View File

@@ -238,6 +238,21 @@ WORKFLOW_TIMEOUT_IN_MS=5000
# Max number of telemetry jobs processed concurrently by OpenTelemetry Ingest worker
OPEN_TELEMETRY_INGEST_CONCURRENCY=100
# Max number of jobs processed concurrently by Fluent Ingest worker
FLUENT_INGEST_CONCURRENCY=100
# Max number of jobs processed concurrently by Incoming Request Ingest worker
INCOMING_REQUEST_INGEST_CONCURRENCY=100
# Max number of jobs processed concurrently by Server Monitor Ingest worker
SERVER_MONITOR_INGEST_CONCURRENCY=100
# Max number of jobs processed concurrently by Probe Ingest worker
PROBE_INGEST_CONCURRENCY=100
# Max number of jobs processed concurrently by Worker service
WORKER_CONCURRENCY=100
# Lets encrypt notification email. This email will be used when certs are about to expire

View File

@@ -305,6 +305,7 @@ services:
<<: *common-server-variables
PORT: ${WORKER_PORT}
DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_WORKER}
WORKER_CONCURRENCY: ${WORKER_CONCURRENCY}
logging:
driver: "local"
options:
@@ -443,6 +444,7 @@ services:
<<: *common-server-variables
PORT: ${FLUENT_INGEST_PORT}
DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_FLUENT_INGEST}
FLUENT_INGEST_CONCURRENCY: ${FLUENT_INGEST_CONCURRENCY}
logging:
driver: "local"
options:
@@ -456,6 +458,7 @@ services:
<<: *common-server-variables
PORT: ${PROBE_INGEST_PORT}
DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_PROBE_INGEST}
PROBE_INGEST_CONCURRENCY: ${PROBE_INGEST_CONCURRENCY}
logging:
driver: "local"
options:
@@ -469,6 +472,7 @@ services:
<<: *common-server-variables
PORT: ${SERVER_MONITOR_INGEST_PORT}
DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_SERVER_MONITOR_INGEST}
SERVER_MONITOR_INGEST_CONCURRENCY: ${SERVER_MONITOR_INGEST_CONCURRENCY}
logging:
driver: "local"
options:
@@ -497,6 +501,7 @@ services:
<<: *common-server-variables
PORT: ${INCOMING_REQUEST_INGEST_PORT}
DISABLE_TELEMETRY: ${DISABLE_TELEMETRY_FOR_INCOMING_REQUEST_INGEST}
INCOMING_REQUEST_INGEST_CONCURRENCY: ${INCOMING_REQUEST_INGEST_CONCURRENCY}
logging:
driver: "local"
options: