mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 08:42:13 +02:00
Compare commits
7 Commits
queue-work
...
probe-queu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4c6793b29 | ||
|
|
c894b112e6 | ||
|
|
304baf1bb4 | ||
|
|
9adea6b1ba | ||
|
|
5498521e02 | ||
|
|
9e97c6ddbc | ||
|
|
63272e09f8 |
@@ -52,7 +52,9 @@ router.post(
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await FluentIngestQueueService.addFluentIngestJob(req as TelemetryRequest);
|
||||
await FluentIngestQueueService.addFluentIngestJob(
|
||||
req as TelemetryRequest,
|
||||
);
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
|
||||
@@ -34,9 +34,7 @@ QueueWorker.getWorker(
|
||||
requestHeaders: jobData.requestHeaders,
|
||||
});
|
||||
|
||||
logger.debug(
|
||||
`Successfully processed fluent ingestion job: ${job.name}`,
|
||||
);
|
||||
logger.debug(`Successfully processed fluent ingestion job: ${job.name}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error processing fluent ingestion job:`);
|
||||
logger.error(error);
|
||||
@@ -55,8 +53,9 @@ async function processFluentIngestFromQueue(
|
||||
| Array<JSONObject | string>
|
||||
| JSONObject;
|
||||
|
||||
let oneuptimeServiceName: string | string[] | undefined =
|
||||
data.requestHeaders["x-oneuptime-service-name"] as string | string[] | undefined;
|
||||
let oneuptimeServiceName: string | string[] | undefined = data.requestHeaders[
|
||||
"x-oneuptime-service-name"
|
||||
] as string | string[] | undefined;
|
||||
|
||||
if (!oneuptimeServiceName) {
|
||||
oneuptimeServiceName = "Unknown Service";
|
||||
@@ -124,7 +123,9 @@ async function processFluentIngestFromQueue(
|
||||
OTelIngestService.recordDataIngestedUsgaeBilling({
|
||||
services: {
|
||||
[oneuptimeServiceName as string]: {
|
||||
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(data.requestBody as JSONObject),
|
||||
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(
|
||||
data.requestBody as JSONObject,
|
||||
),
|
||||
dataRententionInDays: telemetryService.dataRententionInDays,
|
||||
serviceId: telemetryService.serviceId,
|
||||
serviceName: oneuptimeServiceName as string,
|
||||
|
||||
@@ -12,9 +12,7 @@ export interface FluentIngestJobData {
|
||||
}
|
||||
|
||||
export default class FluentIngestQueueService {
|
||||
public static async addFluentIngestJob(
|
||||
req: TelemetryRequest,
|
||||
): Promise<void> {
|
||||
public static async addFluentIngestJob(req: TelemetryRequest): Promise<void> {
|
||||
try {
|
||||
const jobData: FluentIngestJobData = {
|
||||
projectId: req.projectId.toString(),
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime fluent-ingest autoscaler
|
||||
{{- if not $.Values.fluentIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.fluentIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.fluentIngest.keda.enabled)) }}
|
||||
{{- $fluentIngestAutoScalerArgs := dict "ServiceName" "fluent-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $fluentIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime incoming-request-ingest autoscaler
|
||||
{{- if not $.Values.incomingRequestIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.incomingRequestIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.incomingRequestIngest.keda.enabled)) }}
|
||||
{{- $incomingRequestIngestAutoScalerArgs := dict "ServiceName" "incoming-request-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $incomingRequestIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -35,4 +35,14 @@ KEDA ScaledObjects for various services
|
||||
{{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }}
|
||||
{{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }}
|
||||
{{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
Probe KEDA ScaledObjects - one for each probe configuration.

A ScaledObject is rendered for a probe only when KEDA is enabled globally,
enabled for that probe, and the probe's autoscaler is not disabled.
The probe's `keda` map is optional: entries under `probes` that do not define
it are treated as "KEDA disabled" instead of failing the render with a
nil-pointer error.
*/}}
{{- range $key, $val := $.Values.probes }}
{{- /* Fall back to an empty dict so `.enabled` can be read safely when `keda` is absent. */}}
{{- $probeKeda := $val.keda | default dict }}
{{- if and $.Values.keda.enabled $probeKeda.enabled (not $val.disableAutoscaler) }}
{{- $serviceName := printf "probe-%s" $key }}
{{- $metricsConfig := dict "enabled" $probeKeda.enabled "minReplicas" $probeKeda.minReplicas "maxReplicas" $probeKeda.maxReplicas "pollingInterval" $probeKeda.pollingInterval "cooldownPeriod" $probeKeda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_queue_size" "threshold" $probeKeda.queueSizeThreshold "port" $.Values.port.probe)) }}
{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
{{- end }}
{{- end }}
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime probe-ingest autoscaler
|
||||
{{- if not $.Values.probeIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.probeIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.probeIngest.keda.enabled)) }}
|
||||
{{- $probeIngestAutoScalerArgs := dict "ServiceName" "probe-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $probeIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime server-monitor-ingest autoscaler
|
||||
{{- if not $.Values.serverMonitorIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.serverMonitorIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.serverMonitorIngest.keda.enabled)) }}
|
||||
{{- $serverMonitorIngestAutoScalerArgs := dict "ServiceName" "server-monitor-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $serverMonitorIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -198,6 +198,17 @@ probes:
|
||||
customCodeMonitorScriptTimeoutInMs: 60000
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
# KEDA autoscaling configuration based on monitor queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold per probe
|
||||
queueSizeThreshold: 10
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
# resources:
|
||||
# additionalContainers:
|
||||
# two:
|
||||
@@ -213,10 +224,22 @@ probes:
|
||||
# disableAutoscaler: false
|
||||
# resources:
|
||||
# additionalContainers:
|
||||
# KEDA autoscaling configuration based on monitor queue metrics
|
||||
# keda:
|
||||
# enabled: false
|
||||
# minReplicas: 1
|
||||
# maxReplicas: 100
|
||||
# # Scale up when queue size exceeds this threshold per probe
|
||||
# queueSizeThreshold: 10
|
||||
# # Polling interval for metrics (in seconds)
|
||||
# pollingInterval: 30
|
||||
# # Cooldown period after scaling (in seconds)
|
||||
# cooldownPeriod: 300
|
||||
|
||||
|
||||
port:
|
||||
app: 3002
|
||||
probe: 3874
|
||||
probeIngest: 3400
|
||||
serverMonitorIngest: 3404
|
||||
openTelemetryIngest: 3403
|
||||
|
||||
@@ -105,7 +105,8 @@ router.get(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const size: number = await IncomingRequestIngestQueueService.getQueueSize();
|
||||
const size: number =
|
||||
await IncomingRequestIngestQueueService.getQueueSize();
|
||||
return Response.sendJsonObjectResponse(req, res, { size });
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
|
||||
@@ -22,7 +22,8 @@ router.get(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const queueSize: number = await IncomingRequestIngestQueueService.getQueueSize();
|
||||
const queueSize: number =
|
||||
await IncomingRequestIngestQueueService.getQueueSize();
|
||||
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.status(200).json({
|
||||
|
||||
@@ -98,12 +98,7 @@ async function processIncomingRequestFromQueue(
|
||||
};
|
||||
|
||||
// process probe response here.
|
||||
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
|
||||
// do nothing.
|
||||
// we don't want to throw error here.
|
||||
// we just want to log the error.
|
||||
logger.error(err);
|
||||
});
|
||||
await MonitorResourceUtil.monitorResource(incomingRequest);
|
||||
}
|
||||
|
||||
logger.debug("Incoming request ingest worker initialized");
|
||||
|
||||
@@ -13,14 +13,12 @@ export interface IncomingRequestIngestJobData {
|
||||
}
|
||||
|
||||
export default class IncomingRequestIngestQueueService {
|
||||
public static async addIncomingRequestIngestJob(
|
||||
data: {
|
||||
secretKey: string;
|
||||
requestHeaders: Dictionary<string>;
|
||||
requestBody: string | JSONObject;
|
||||
requestMethod: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
public static async addIncomingRequestIngestJob(data: {
|
||||
secretKey: string;
|
||||
requestHeaders: Dictionary<string>;
|
||||
requestBody: string | JSONObject;
|
||||
requestMethod: string;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const jobData: IncomingRequestIngestJobData = {
|
||||
secretKey: data.secretKey,
|
||||
|
||||
70
Probe/API/Metrics.ts
Normal file
70
Probe/API/Metrics.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import { PROBE_INGEST_URL } from "../Config";
|
||||
import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse";
|
||||
import HTTPMethod from "Common/Types/API/HTTPMethod";
|
||||
import HTTPResponse from "Common/Types/API/HTTPResponse";
|
||||
import URL from "Common/Types/API/URL";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import API from "Common/Utils/API";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/**
 * GET /queue-size - metrics endpoint used for KEDA autoscaling.
 *
 * Forwards the request to the ProbeIngest API (`<PROBE_INGEST_URL>/metrics/queue-size`)
 * as a POST carrying the probe's default request body, then responds with
 * `{ queueSize: <number> }`.
 *
 * The upstream value is normalised to a finite, non-negative number so that a
 * missing or malformed `queueSize` is reported as 0 rather than being passed
 * through to the autoscaler as-is.
 *
 * Any failure (including an HTTPErrorResponse from the upstream call) is logged
 * and handed to `next` for the Express error handling chain.
 */
router.get(
  "/queue-size",
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Get the pending monitor count for this specific probe from ProbeIngest API
      const queueSizeUrl: URL = URL.fromString(
        PROBE_INGEST_URL.toString(),
      ).addRoute("/metrics/queue-size");

      logger.debug("Fetching queue size from ProbeIngest API");

      // Use probe authentication (probe key and probe ID)
      const requestBody: JSONObject = ProbeAPIRequest.getDefaultRequestBody();

      const result: HTTPResponse<JSONObject> | HTTPErrorResponse =
        await API.fetch<JSONObject>(
          HTTPMethod.POST,
          queueSizeUrl,
          requestBody,
          {},
        );

      if (result instanceof HTTPErrorResponse) {
        logger.error("Error fetching queue size from ProbeIngest API");
        logger.error(result);
        // Re-thrown so the catch block below forwards it to `next`.
        throw result;
      }

      // Extract queueSize from the response and make sure it is a usable number.
      const parsedQueueSize: number = Number(result.data["queueSize"] ?? 0);
      const queueSize: number =
        Number.isFinite(parsedQueueSize) && parsedQueueSize >= 0
          ? parsedQueueSize
          : 0;

      logger.debug(`Queue size fetched: ${queueSize}`);

      return Response.sendJsonObjectResponse(req, res, {
        queueSize: queueSize,
      });
    } catch (err) {
      logger.error("Error in metrics queue-size endpoint");
      logger.error(err);
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -3,10 +3,12 @@ import AliveJob from "./Jobs/Alive";
|
||||
import FetchMonitorList from "./Jobs/Monitor/FetchList";
|
||||
import FetchMonitorTestList from "./Jobs/Monitor/FetchMonitorTest";
|
||||
import Register from "./Services/Register";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
|
||||
import "ejs";
|
||||
|
||||
const APP_NAME: string = "probe";
|
||||
@@ -29,6 +31,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
},
|
||||
});
|
||||
|
||||
// Add metrics API routes
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
app.use("/metrics", MetricsAPI);
|
||||
|
||||
// add default routes
|
||||
await App.addDefaultRoutes();
|
||||
|
||||
|
||||
@@ -24,6 +24,11 @@ import Probe from "Common/Models/DatabaseModels/Probe";
|
||||
import User from "Common/Models/DatabaseModels/User";
|
||||
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
import PositiveNumber from "Common/Types/PositiveNumber";
|
||||
import MonitorProbeService from "Common/Server/Services/MonitorProbeService";
|
||||
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import MonitorService from "Common/Server/Services/MonitorService";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
@@ -363,6 +368,60 @@ router.get(
|
||||
},
|
||||
);
|
||||
|
||||
/**
 * POST /metrics/queue-size - queue size endpoint for KEDA autoscaling.
 *
 * Returns `{ queueSize: <number> }`, where the number is the count of
 * MonitorProbe rows for the requesting probe that are enabled, whose
 * `nextPingAt` is less than or equal to the current date (or null), and whose
 * monitor and project match the enabled-monitor and active-project queries.
 *
 * `ProbeAuthorization.isAuthorizedServiceMiddleware` runs before this handler
 * (presumably validating the probe credentials in the body - confirm against
 * the middleware).
 *
 * Responds with a BadDataException when the request body carries no `probeId`.
 * Any other failure is forwarded to `next`.
 */
router.post(
  "/metrics/queue-size",
  ProbeAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // This endpoint returns the number of monitors pending for the specific probe
      // to be used by KEDA for autoscaling probe replicas

      // Get the probe ID from the authenticated request
      const data: JSONObject = req.body;
      const rawProbeId: unknown = data["probeId"];

      /*
       * Validate the raw value before wrapping it: an ObjectID instance is
       * always truthy, so checking the constructed object can never detect a
       * missing ID.
       */
      if (!rawProbeId) {
        return Response.sendErrorResponse(
          req,
          res,
          new BadDataException("Probe ID not found"),
        );
      }

      const probeId: ObjectID = new ObjectID(rawProbeId as string);

      // Get pending monitor count for this specific probe
      const pendingCount: PositiveNumber = await MonitorProbeService.countBy({
        query: {
          probeId: probeId,
          isEnabled: true,
          nextPingAt: QueryHelper.lessThanEqualToOrNull(
            OneUptimeDate.getCurrentDate(),
          ),
          monitor: {
            ...MonitorService.getEnabledMonitorQuery(),
          },
          project: {
            ...ProjectService.getActiveProjectStatusQuery(),
          },
        },
        props: {
          isRoot: true,
        },
      });

      return Response.sendJsonObjectResponse(req, res, {
        queueSize: pendingCount.toNumber(),
      });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
// Queue failed jobs endpoint
|
||||
router.get(
|
||||
"/probe/queue/failed",
|
||||
|
||||
@@ -22,9 +22,7 @@ QueueWorker.getWorker(
|
||||
|
||||
await processProbeFromQueue(jobData);
|
||||
|
||||
logger.debug(
|
||||
`Successfully processed probe ingestion job: ${job.name}`,
|
||||
);
|
||||
logger.debug(`Successfully processed probe ingestion job: ${job.name}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error processing probe ingestion job:`);
|
||||
logger.error(error);
|
||||
@@ -50,10 +48,7 @@ async function processProbeFromQueue(
|
||||
|
||||
if (jobData.jobType === "probe-response") {
|
||||
// Handle regular probe response
|
||||
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
|
||||
logger.error("Error in monitor resource");
|
||||
logger.error(err);
|
||||
});
|
||||
await MonitorResourceUtil.monitorResource(probeResponse);
|
||||
} else if (jobData.jobType === "monitor-test" && jobData.testId) {
|
||||
// Handle monitor test response
|
||||
const testId: ObjectID = new ObjectID(jobData.testId);
|
||||
|
||||
@@ -11,13 +11,11 @@ export interface ProbeIngestJobData {
|
||||
}
|
||||
|
||||
export default class ProbeIngestQueueService {
|
||||
public static async addProbeIngestJob(
|
||||
data: {
|
||||
probeMonitorResponse: JSONObject;
|
||||
jobType: "probe-response" | "monitor-test";
|
||||
testId?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
public static async addProbeIngestJob(data: {
|
||||
probeMonitorResponse: JSONObject;
|
||||
jobType: "probe-response" | "monitor-test";
|
||||
testId?: string;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const jobData: ProbeIngestJobData = {
|
||||
probeMonitorResponse: data.probeMonitorResponse,
|
||||
|
||||
@@ -22,7 +22,8 @@ router.get(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const queueSize: number = await ServerMonitorIngestQueueService.getQueueSize();
|
||||
const queueSize: number =
|
||||
await ServerMonitorIngestQueueService.getQueueSize();
|
||||
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.status(200).json({
|
||||
|
||||
@@ -10,12 +10,10 @@ export interface ServerMonitorIngestJobData {
|
||||
}
|
||||
|
||||
export default class ServerMonitorIngestQueueService {
|
||||
public static async addServerMonitorIngestJob(
|
||||
data: {
|
||||
secretKey: string;
|
||||
serverMonitorResponse: JSONObject;
|
||||
},
|
||||
): Promise<void> {
|
||||
public static async addServerMonitorIngestJob(data: {
|
||||
secretKey: string;
|
||||
serverMonitorResponse: JSONObject;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const jobData: ServerMonitorIngestJobData = {
|
||||
secretKey: data.secretKey,
|
||||
|
||||
Reference in New Issue
Block a user