Compare commits

...

35 Commits

Author SHA1 Message Date
Simon Larsen
c4c6793b29 feat: Implement KEDA autoscaling configuration for probes and add metrics endpoints 2025-08-01 15:38:04 +01:00
Simon Larsen
c894b112e6 fix: Await monitorResource call to ensure proper error handling in incoming request processing 2025-08-01 14:34:17 +01:00
Simon Larsen
304baf1bb4 fix: Await monitorResource call to ensure proper error handling in probe response processing 2025-08-01 14:33:17 +01:00
Simon Larsen
9adea6b1ba feat: Remove Helm annotations for post-install and post-upgrade hooks from templates 2025-08-01 14:01:04 +01:00
Simon Larsen
5498521e02 feat: Add Helm annotations for post-install and post-upgrade hooks 2025-08-01 13:47:52 +01:00
Simon Larsen
9e97c6ddbc feat: Update autoscaler conditions for fluent-ingest, incoming-request-ingest, probe-ingest, and server-monitor-ingest templates 2025-08-01 13:23:39 +01:00
Nawaz Dhandala
63272e09f8 refactor: Simplify function parameter formatting and improve readability in various files 2025-08-01 10:45:55 +01:00
Simon Larsen
327c28afdc feat: Implement fluent ingest worker for processing queue jobs 2025-08-01 10:34:17 +01:00
Simon Larsen
896020b93b feat: Add KEDA autoscaling configuration for various ingests
- Introduced KEDA autoscaling configuration in values.yaml for probeIngest, fluentIngest, incomingRequestIngest, and serverMonitorIngest.
- Added endpoints for queue statistics, size, and failed jobs in IncomingRequestIngest and ProbeIngest APIs.
- Implemented asynchronous processing of incoming requests and probes using job queues.
- Created Metrics API for KEDA metrics integration in IncomingRequestIngest, ProbeIngest, and ServerMonitorIngest.
- Refactored IncomingRequest and Probe APIs to utilize queue services for processing.
- Added job processing logic for incoming requests and probes in respective job files.
- Implemented queue service classes for managing job addition and retrieval of queue statistics.
2025-08-01 10:29:02 +01:00
Simon Larsen
15a68472b0 feat: comment out ClusterKeyAuthorization import for KEDA debugging 2025-07-31 21:23:41 +01:00
Simon Larsen
0210480d97 feat: remove Prometheus metrics endpoint for KEDA debugging 2025-07-31 21:22:04 +01:00
Simon Larsen
72fdc06687 feat: temporarily disable authentication middleware for KEDA debugging in metrics endpoint 2025-07-31 20:52:10 +01:00
Simon Larsen
3710b81b9a feat: add replica count support for deployments in Helm templates 2025-07-31 20:03:25 +01:00
Simon Larsen
9fcb3dc2e0 feat: update cluster key handling for KEDA compatibility in authorization middleware and Helm chart 2025-07-31 19:50:25 +01:00
Simon Larsen
43e2ccf51a feat: improve secret handling in Helm chart for upgrade scenarios 2025-07-31 19:29:07 +01:00
Nawaz Dhandala
48c3d8603a fix: format code for better readability and consistency in MonitorResource and Metrics 2025-07-31 12:57:39 +01:00
Simon Larsen
9cfc912161 feat: enhance response messages for incoming request checks with time difference 2025-07-31 12:56:06 +01:00
Simon Larsen
29e3ee57ab feat: add metrics-api endpoint for queue size retrieval in KEDA autoscaling 2025-07-31 12:44:48 +01:00
Simon Larsen
be7e849822 feat: add KEDA ScaledObjects for OpenTelemetry Ingest with configurable metrics 2025-07-31 12:34:43 +01:00
Simon Larsen
59d76b601a feat: add KEDA autoscaling support for OpenTelemetry Ingest with configurable metrics 2025-07-31 12:18:57 +01:00
Simon Larsen
b77ef336b8 feat: add replica count configuration for multiple deployments in Helm templates 2025-07-31 11:47:03 +01:00
Nawaz Dhandala
7df21fe8e5 refactor: add type annotations for pagination parameters in OTelIngest 2025-07-30 22:42:24 +01:00
Nawaz Dhandala
f39e1943c7 refactor: improve code formatting and readability in Queue and TelemetryQueueService 2025-07-30 22:40:45 +01:00
Simon Larsen
966a903646 feat: implement pagination for retrieving failed jobs from the queue 2025-07-30 22:39:58 +01:00
Simon Larsen
1d9d37c6d1 refactor: optimize queue size and stats calculations by using count methods 2025-07-30 22:37:46 +01:00
Simon Larsen
7edcc4dbce feat: add endpoint to retrieve failed jobs from the queue 2025-07-30 22:37:08 +01:00
Simon Larsen
0939294d22 Refactor code structure for improved readability and maintainability 2025-07-30 19:45:46 +01:00
Simon Larsen
dbcbfe5f79 refactor: simplify telemetry processing worker initialization and remove unused export 2025-07-30 19:25:52 +01:00
Simon Larsen
a638972817 feat: update @oneuptime/common dependency in test-release workflow 2025-07-30 16:44:05 +01:00
Simon Larsen
37c6310465 feat: update @oneuptime/common to version 7.0.4800 and adjust workflow dependencies 2025-07-30 16:42:39 +01:00
Nawaz Dhandala
a7d38389fd style: improve formatting and consistency in Prometheus metrics generation 2025-07-30 16:30:26 +01:00
Nawaz Dhandala
2f55336db7 Merge branch 'master' of https://github.com/OneUptime/oneuptime 2025-07-30 16:29:25 +01:00
Nawaz Dhandala
f99a15b95b refactor: enhance type annotations for better clarity in queue processing methods 2025-07-30 16:28:53 +01:00
Simon Larsen
de5bff2ffe feat: add Prometheus metrics endpoint for telemetry queue monitoring 2025-07-30 16:26:38 +01:00
Nawaz Dhandala
cef2764499 style: format code for better readability and consistency across multiple files 2025-07-30 16:22:26 +01:00
65 changed files with 1926 additions and 344 deletions

View File

@@ -70,7 +70,7 @@ jobs:
publish-mcp-server:
runs-on: ubuntu-latest
needs: [generate-build-number]
needs: [generate-build-number, publish-npm-packages]
env:
CI_PIPELINE_ID: ${{ github.run_number }}
NPM_AUTH_TOKEN: ${{ secrets.NPM_AUTH_TOKEN }}
@@ -138,6 +138,7 @@ jobs:
- name: Install dependencies
run: |
cd MCP
npm update @oneuptime/common
npm install
- name: Build MCP server

View File

@@ -144,6 +144,7 @@ jobs:
- name: Install dependencies and build
run: |
cd MCP
npm update @oneuptime/common
npm install
npm run build

View File

@@ -17,6 +17,10 @@ export enum QueueName {
Workflow = "Workflow",
Worker = "Worker",
Telemetry = "Telemetry",
FluentIngest = "FluentIngest",
IncomingRequestIngest = "IncomingRequestIngest",
ServerMonitorIngest = "ServerMonitorIngest",
ProbeIngest = "ProbeIngest",
}
export type QueueJob = Job;
@@ -137,12 +141,12 @@ export default class Queue {
@CaptureSpan()
public static async getQueueSize(queueName: QueueName): Promise<number> {
const queue = this.getQueue(queueName);
const waiting = await queue.getWaiting();
const active = await queue.getActive();
const delayed = await queue.getDelayed();
return waiting.length + active.length + delayed.length;
const queue: BullQueue = this.getQueue(queueName);
const waitingCount: number = await queue.getWaitingCount();
const activeCount: number = await queue.getActiveCount();
const delayedCount: number = await queue.getDelayedCount();
return waitingCount + activeCount + delayedCount;
}
@CaptureSpan()
@@ -154,20 +158,61 @@ export default class Queue {
delayed: number;
total: number;
}> {
const queue = this.getQueue(queueName);
const waiting = await queue.getWaiting();
const active = await queue.getActive();
const completed = await queue.getCompleted();
const failed = await queue.getFailed();
const delayed = await queue.getDelayed();
const queue: BullQueue = this.getQueue(queueName);
const waitingCount: number = await queue.getWaitingCount();
const activeCount: number = await queue.getActiveCount();
const completedCount: number = await queue.getCompletedCount();
const failedCount: number = await queue.getFailedCount();
const delayedCount: number = await queue.getDelayedCount();
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
delayed: delayed.length,
total: waiting.length + active.length + completed.length + failed.length + delayed.length,
waiting: waitingCount,
active: activeCount,
completed: completedCount,
failed: failedCount,
delayed: delayedCount,
total:
waitingCount +
activeCount +
completedCount +
failedCount +
delayedCount,
};
}
@CaptureSpan()
public static async getFailedJobs(
queueName: QueueName,
options?: {
start?: number;
end?: number;
},
): Promise<
Array<{
id: string;
name: string;
data: JSONObject;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}>
> {
const queue: BullQueue = this.getQueue(queueName);
const start: number = options?.start || 0;
const end: number = options?.end || 100;
const failed: Job[] = await queue.getFailed(start, end);
return failed.map((job: Job) => {
return {
id: job.id || "unknown",
name: job.name || "unknown",
data: job.data as JSONObject,
failedReason: job.failedReason || "No reason provided",
processedOn: job.processedOn ? new Date(job.processedOn) : null,
finishedOn: job.finishedOn ? new Date(job.finishedOn) : null,
attemptsMade: job.attemptsMade || 0,
};
});
}
}

View File

@@ -38,6 +38,9 @@ export default class ClusterKeyAuthorization {
} else if (req.headers && req.headers["clusterkey"]) {
// Header keys are automatically transformed to lowercase
clusterKey = req.headers["clusterkey"] as string;
} else if (req.headers && req.headers["x-clusterkey"]) {
// KEDA TriggerAuthentication sends headers with X- prefix
clusterKey = req.headers["x-clusterkey"] as string;
} else if (req.body && req.body.clusterKey) {
clusterKey = req.body.clusterKey;
} else {

View File

@@ -137,7 +137,7 @@ export default class IncomingRequestCriteria {
input.dataToProcess.monitorId.toString() +
" is true",
);
return `Incoming request / heartbeat received in ${value} minutes.`;
return `Incoming request / heartbeat received in ${value} minutes. It was received ${differenceInMinutes} minutes ago.`;
}
return null;
}
@@ -153,7 +153,7 @@ export default class IncomingRequestCriteria {
input.dataToProcess.monitorId.toString() +
" is true",
);
return `Incoming request / heartbeat not received in ${value} minutes.`;
return `Incoming request / heartbeat not received in ${value} minutes. It was received ${differenceInMinutes} minutes ago.`;
}
return null;
}

View File

@@ -228,6 +228,8 @@ export default class MonitorResourceUtil {
await MonitorService.updateOneById({
id: monitor.id!,
data: {
incomingRequestMonitorHeartbeatCheckedAt:
OneUptimeDate.getCurrentDate(),
incomingMonitorRequest: {
...dataToProcess,
} as any,

View File

@@ -1,23 +1,17 @@
import TelemetryIngest, {
TelemetryRequest,
} from "Common/Server/Middleware/TelemetryIngest";
import OneUptimeDate from "Common/Types/Date";
import { JSONObject } from "Common/Types/JSON";
import ProductType from "Common/Types/MeteredPlan/ProductType";
import LogService from "Common/Server/Services/LogService";
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import logger from "Common/Server/Utils/Logger";
import Response from "Common/Server/Utils/Response";
import Log from "Common/Models/AnalyticsModels/Log";
import LogSeverity from "Common/Types/Log/LogSeverity";
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
import ObjectID from "Common/Types/ObjectID";
import JSONFunctions from "Common/Types/JSONFunctions";
import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import BadRequestException from "Common/Types/Exception/BadRequestException";
export class FluentRequestMiddleware {
public static async getProductType(
@@ -46,96 +40,107 @@ router.post(
next: NextFunction,
): Promise<void> => {
try {
logger.debug("Fluent ProbeIngest API called");
const dbLogs: Array<Log> = [];
let logItems: Array<JSONObject | string> | JSONObject = req.body as
| Array<JSONObject | string>
| JSONObject;
let oneuptimeServiceName: string | string[] | undefined =
req.headers["x-oneuptime-service-name"];
if (!oneuptimeServiceName) {
oneuptimeServiceName = "Unknown Service";
if (!(req as TelemetryRequest).projectId) {
throw new BadRequestException(
"Invalid request - projectId not found in request.",
);
}
const telemetryService: {
serviceId: ObjectID;
dataRententionInDays: number;
} = await OTelIngestService.telemetryServiceFromName({
serviceName: oneuptimeServiceName as string,
projectId: (req as TelemetryRequest).projectId,
req.body = req.body.toJSON ? req.body.toJSON() : req.body;
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await FluentIngestQueueService.addFluentIngestJob(
req as TelemetryRequest,
);
return;
} catch (err) {
return next(err);
}
},
);
// Queue stats endpoint
router.get(
"/fluent/queue/stats",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const stats: {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
total: number;
} = await FluentIngestQueueService.getQueueStats();
return Response.sendJsonObjectResponse(req, res, stats);
} catch (err) {
return next(err);
}
},
);
// Queue size endpoint
router.get(
"/fluent/queue/size",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const size: number = await FluentIngestQueueService.getQueueSize();
return Response.sendJsonObjectResponse(req, res, { size });
} catch (err) {
return next(err);
}
},
);
// Queue failed jobs endpoint
router.get(
"/fluent/queue/failed",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Parse pagination parameters from query string
const start: number = parseInt(req.query["start"] as string) || 0;
const end: number = parseInt(req.query["end"] as string) || 100;
const failedJobs: Array<{
id: string;
name: string;
data: any;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}> = await FluentIngestQueueService.getFailedJobs({
start,
end,
});
if (
logItems &&
typeof logItems === "object" &&
(logItems as JSONObject)["json"]
) {
logItems = (logItems as JSONObject)["json"] as
| Array<JSONObject | string>
| JSONObject;
}
if (!Array.isArray(logItems)) {
logItems = [logItems];
}
for (let logItem of logItems) {
const dbLog: Log = new Log();
dbLog.projectId = (req as TelemetryRequest).projectId;
dbLog.serviceId = telemetryService.serviceId;
dbLog.severityNumber = 0;
const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
dbLog.time = currentTimeAndDate;
dbLog.severityText = LogSeverity.Unspecified;
if (typeof logItem === "string") {
// check if its parseable to json
try {
logItem = JSON.parse(logItem);
} catch {
// do nothing
}
}
if (typeof logItem !== "string") {
logItem = JSON.stringify(logItem);
}
dbLog.body = logItem as string;
dbLogs.push(dbLog);
}
await LogService.createMany({
items: dbLogs,
props: {
isRoot: true,
return Response.sendJsonObjectResponse(req, res, {
failedJobs,
pagination: {
start,
end,
count: failedJobs.length,
},
});
OTelIngestService.recordDataIngestedUsgaeBilling({
services: {
[oneuptimeServiceName as string]: {
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(req.body),
dataRententionInDays: telemetryService.dataRententionInDays,
serviceId: telemetryService.serviceId,
serviceName: oneuptimeServiceName as string,
},
},
projectId: (req as TelemetryRequest).projectId,
productType: ProductType.Logs,
}).catch((err: Error) => {
logger.error(err);
});
return Response.sendEmptySuccessResponse(req, res);
} catch (err) {
return next(err);
}

View File

@@ -0,0 +1,37 @@
import Express, {
ExpressRequest,
ExpressResponse,
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
/**
* JSON metrics endpoint for KEDA autoscaling
* Returns queue size as JSON for KEDA metrics-api scaler
*/
router.get(
"/metrics/queue-size",
// ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
async (
_req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const queueSize: number = await FluentIngestQueueService.getQueueSize();
res.setHeader("Content-Type", "application/json");
res.status(200).json({
queueSize: queueSize,
});
} catch (err) {
return next(err);
}
},
);
export default router;

View File

@@ -1,4 +1,5 @@
import FluentIngestAPI from "./API/FluentIngest";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/FluentIngest/ProcessFluentIngest";
const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "fluent-ingest";
app.use([`/${APP_NAME}`, "/"], FluentIngestAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,141 @@
import { FluentIngestJobData } from "../../Services/Queue/FluentIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import ObjectID from "Common/Types/ObjectID";
import OneUptimeDate from "Common/Types/Date";
import { JSONObject } from "Common/Types/JSON";
import ProductType from "Common/Types/MeteredPlan/ProductType";
import LogService from "Common/Server/Services/LogService";
import LogSeverity from "Common/Types/Log/LogSeverity";
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
import JSONFunctions from "Common/Types/JSONFunctions";
import Log from "Common/Models/AnalyticsModels/Log";
interface FluentIngestProcessData {
projectId: ObjectID;
requestBody: JSONObject;
requestHeaders: JSONObject;
}
// Set up the worker for processing fluent ingest queue
QueueWorker.getWorker(
QueueName.FluentIngest,
async (job: QueueJob): Promise<void> => {
logger.debug(`Processing fluent ingestion job: ${job.name}`);
try {
const jobData: FluentIngestJobData = job.data as FluentIngestJobData;
// Pass job data directly to processing function
await processFluentIngestFromQueue({
projectId: new ObjectID(jobData.projectId),
requestBody: jobData.requestBody,
requestHeaders: jobData.requestHeaders,
});
logger.debug(`Successfully processed fluent ingestion job: ${job.name}`);
} catch (error) {
logger.error(`Error processing fluent ingestion job:`);
logger.error(error);
throw error;
}
},
{ concurrency: 20 }, // Process up to 20 fluent ingest jobs concurrently
);
async function processFluentIngestFromQueue(
data: FluentIngestProcessData,
): Promise<void> {
const dbLogs: Array<Log> = [];
let logItems: Array<JSONObject | string> | JSONObject = data.requestBody as
| Array<JSONObject | string>
| JSONObject;
let oneuptimeServiceName: string | string[] | undefined = data.requestHeaders[
"x-oneuptime-service-name"
] as string | string[] | undefined;
if (!oneuptimeServiceName) {
oneuptimeServiceName = "Unknown Service";
}
const telemetryService: {
serviceId: ObjectID;
dataRententionInDays: number;
} = await OTelIngestService.telemetryServiceFromName({
serviceName: oneuptimeServiceName as string,
projectId: data.projectId,
});
if (
logItems &&
typeof logItems === "object" &&
(logItems as JSONObject)["json"]
) {
logItems = (logItems as JSONObject)["json"] as
| Array<JSONObject | string>
| JSONObject;
}
if (!Array.isArray(logItems)) {
logItems = [logItems];
}
for (let logItem of logItems) {
const dbLog: Log = new Log();
dbLog.projectId = data.projectId;
dbLog.serviceId = telemetryService.serviceId;
dbLog.severityNumber = 0;
const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
dbLog.time = currentTimeAndDate;
dbLog.severityText = LogSeverity.Unspecified;
if (typeof logItem === "string") {
// check if its parseable to json
try {
logItem = JSON.parse(logItem);
} catch {
// do nothing
}
}
if (typeof logItem !== "string") {
logItem = JSON.stringify(logItem);
}
dbLog.body = logItem as string;
dbLogs.push(dbLog);
}
await LogService.createMany({
items: dbLogs,
props: {
isRoot: true,
},
});
OTelIngestService.recordDataIngestedUsgaeBilling({
services: {
[oneuptimeServiceName as string]: {
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(
data.requestBody as JSONObject,
),
dataRententionInDays: telemetryService.dataRententionInDays,
serviceId: telemetryService.serviceId,
serviceName: oneuptimeServiceName as string,
},
},
projectId: data.projectId,
productType: ProductType.Logs,
}).catch((err: Error) => {
logger.error(err);
});
}
logger.debug("Fluent ingest worker initialized");

View File

@@ -0,0 +1,72 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
export interface FluentIngestJobData {
projectId: string;
requestBody: JSONObject;
requestHeaders: Record<string, string>;
ingestionTimestamp: Date;
}
export default class FluentIngestQueueService {
public static async addFluentIngestJob(req: TelemetryRequest): Promise<void> {
try {
const jobData: FluentIngestJobData = {
projectId: req.projectId.toString(),
requestBody: req.body,
requestHeaders: req.headers as Record<string, string>,
ingestionTimestamp: OneUptimeDate.getCurrentDate(),
};
const jobId: string = `fluent-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
await Queue.addJob(
QueueName.FluentIngest,
jobId,
"ProcessFluentIngest",
jobData as unknown as JSONObject,
);
logger.debug(`Added fluent ingestion job: ${jobId}`);
} catch (error) {
logger.error(`Error adding fluent ingestion job:`);
logger.error(error);
throw error;
}
}
public static async getQueueSize(): Promise<number> {
return Queue.getQueueSize(QueueName.FluentIngest);
}
public static async getQueueStats(): Promise<{
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
total: number;
}> {
return Queue.getQueueStats(QueueName.FluentIngest);
}
public static getFailedJobs(options?: {
start?: number;
end?: number;
}): Promise<
Array<{
id: string;
name: string;
data: JSONObject;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}>
> {
return Queue.getFailedJobs(QueueName.FluentIngest, options);
}
}

View File

@@ -8,5 +8,8 @@ dependencies:
- name: clickhouse
repository: https://charts.bitnami.com/bitnami
version: 9.0.0
digest: sha256:fd5e473f86dd79be752e0eff131b1a90cd9a8cd2c6032c0d3d95ea43db833629
generated: "2025-04-24T22:34:15.181176+01:00"
- name: keda
repository: https://kedacore.github.io/charts
version: 2.17.2
digest: sha256:6e4d02525abd4434e8481eedb6244d88ab9d8b800a6f1c543cba96cf5b1d2922
generated: "2025-07-30T19:45:21.077234+01:00"

View File

@@ -46,4 +46,8 @@ dependencies:
version: "9.0.0"
repository: "https://charts.bitnami.com/bitnami"
condition: clickhouse.enabled
- name: keda
version: "2.17.2"
repository: "https://kedacore.github.io/charts"
condition: keda.enabled

Binary file not shown.

View File

@@ -559,9 +559,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name $.ServiceName }}
{{- if $.ReplicaCount }}
replicas: {{ $.ReplicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.DisableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:
@@ -695,3 +699,63 @@ spec:
requests:
storage: {{ $.Storage }}
{{- end }}
{{/*
KEDA ScaledObject template for metric-based autoscaling
Usage: include "oneuptime.kedaScaledObject" (dict "ServiceName" "service-name" "Release" .Release "Values" .Values "MetricsConfig" {...})
*/}}
{{- define "oneuptime.kedaScaledObject" }}
{{- if and .Values.keda.enabled .MetricsConfig.enabled (not .DisableAutoscaler) }}
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{ printf "%s-%s-scaledobject" .Release.Name .ServiceName }}
namespace: {{ .Release.Namespace }}
labels:
app: {{ printf "%s-%s" .Release.Name .ServiceName }}
app.kubernetes.io/part-of: oneuptime
app.kubernetes.io/managed-by: Helm
appname: oneuptime
spec:
scaleTargetRef:
name: {{ printf "%s-%s" .Release.Name .ServiceName }}
minReplicaCount: {{ .MetricsConfig.minReplicas }}
maxReplicaCount: {{ .MetricsConfig.maxReplicas }}
pollingInterval: {{ .MetricsConfig.pollingInterval }}
cooldownPeriod: {{ .MetricsConfig.cooldownPeriod }}
triggers:
{{- range .MetricsConfig.triggers }}
- type: metrics-api
metadata:
targetValue: {{ .threshold | quote }}
url: http://{{ printf "%s-%s" $.Release.Name $.ServiceName }}:{{ .port }}/metrics/queue-size
valueLocation: 'queueSize'
method: 'GET'
# authenticationRef:
# name: {{ printf "%s-%s-trigger-auth" $.Release.Name $.ServiceName }}
{{- end }}
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: {{ printf "%s-%s-trigger-auth" .Release.Name .ServiceName }}
namespace: {{ .Release.Namespace }}
labels:
app: {{ printf "%s-%s" .Release.Name .ServiceName }}
app.kubernetes.io/part-of: oneuptime
app.kubernetes.io/managed-by: Helm
appname: oneuptime
spec:
secretTargetRef:
{{- if .Values.externalSecrets.oneuptimeSecret.existingSecret.name }}
- parameter: clusterkey
name: {{ .Values.externalSecrets.oneuptimeSecret.existingSecret.name }}
key: {{ .Values.externalSecrets.oneuptimeSecret.existingSecret.passwordKey }}
{{- else }}
- parameter: clusterkey
name: {{ printf "%s-%s" .Release.Name "secrets" }}
key: oneuptime-secret
{{- end }}
{{- end }}
{{- end }}

View File

@@ -1,7 +1,7 @@
# OneUptime accounts Deployment
{{- $accountsEnv := dict "PORT" $.Values.port.accounts "DISABLE_TELEMETRY" $.Values.accounts.disableTelemetryCollection -}}
{{- $accountsPorts := dict "port" $.Values.port.accounts -}}
{{- $accountsDeploymentArgs :=dict "IsUI" true "ServiceName" "accounts" "Ports" $accountsPorts "Release" $.Release "Values" $.Values "Env" $accountsEnv "Resources" $.Values.accounts.resources "DisableAutoscaler" $.Values.accounts.disableAutoscaler -}}
{{- $accountsDeploymentArgs :=dict "IsUI" true "ServiceName" "accounts" "Ports" $accountsPorts "Release" $.Release "Values" $.Values "Env" $accountsEnv "Resources" $.Values.accounts.resources "DisableAutoscaler" $.Values.accounts.disableAutoscaler "ReplicaCount" $.Values.accounts.replicaCount -}}
{{- include "oneuptime.deployment" $accountsDeploymentArgs }}
---

View File

@@ -1,7 +1,7 @@
# OneUptime adminDashboard Deployment
{{- $adminDashboardEnv := dict "PORT" $.Values.port.adminDashboard "DISABLE_TELEMETRY" $.Values.adminDashboard.disableTelemetryCollection -}}
{{- $adminDashboardPorts := dict "port" $.Values.port.adminDashboard -}}
{{- $adminDashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "admin-dashboard" "Ports" $adminDashboardPorts "Release" $.Release "Values" $.Values "Env" $adminDashboardEnv "Resources" $.Values.adminDashboard.resources "DisableAutoscaler" $.Values.adminDashboard.disableAutoscaler -}}
{{- $adminDashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "admin-dashboard" "Ports" $adminDashboardPorts "Release" $.Release "Values" $.Values "Env" $adminDashboardEnv "Resources" $.Values.adminDashboard.resources "DisableAutoscaler" $.Values.adminDashboard.disableAutoscaler "ReplicaCount" $.Values.adminDashboard.replicaCount -}}
{{- include "oneuptime.deployment" $adminDashboardDeploymentArgs }}
---

View File

@@ -14,9 +14,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "api-reference" }}
{{- if $.Values.apiReference.replicaCount }}
replicas: {{ $.Values.apiReference.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.apiReference.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -14,9 +14,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "app" }}
{{- if $.Values.app.replicaCount }}
replicas: {{ $.Values.app.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.app.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -1,7 +1,7 @@
# OneUptime dashboard Deployment
{{- $dashboardPorts := dict "port" $.Values.port.dashboard -}}
{{- $dashboardEnv := dict "PORT" $.Values.port.dashboard "DISABLE_TELEMETRY" $.Values.dashboard.disableTelemetryCollection -}}
{{- $dashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "dashboard" "Ports" $dashboardPorts "Release" $.Release "Values" $.Values "Env" $dashboardEnv "Resources" $.Values.dashboard.resources "DisableAutoscaler" $.Values.dashboard.disableAutoscaler -}}
{{- $dashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "dashboard" "Ports" $dashboardPorts "Release" $.Release "Values" $.Values "Env" $dashboardEnv "Resources" $.Values.dashboard.resources "DisableAutoscaler" $.Values.dashboard.disableAutoscaler "ReplicaCount" $.Values.dashboard.replicaCount -}}
{{- include "oneuptime.deployment" $dashboardDeploymentArgs }}
---

View File

@@ -14,9 +14,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "docs" }}
{{- if $.Values.docs.replicaCount }}
replicas: {{ $.Values.docs.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.docs.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -15,8 +15,8 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "fluent-ingest" }}
{{- if $.Values.deployment.fluentIngest.replicaCount }}
replicas: {{ $.Values.deployment.fluentIngest.replicaCount }}
{{- if $.Values.fluentIngest.replicaCount }}
replicas: {{ $.Values.fluentIngest.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.fluentIngest.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
@@ -112,7 +112,7 @@ spec:
---
# OneUptime fluent-ingest autoscaler
{{- if not $.Values.fluentIngest.disableAutoscaler }}
{{- if and (not $.Values.fluentIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.fluentIngest.keda.enabled)) }}
{{- $fluentIngestAutoScalerArgs := dict "ServiceName" "fluent-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $fluentIngestAutoScalerArgs }}
{{- end }}

View File

@@ -14,9 +14,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "home" }}
{{- if $.Values.home.replicaCount }}
replicas: {{ $.Values.home.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.home.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -15,8 +15,8 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "incoming-request-ingest" }}
{{- if $.Values.deployment.incomingRequestIngest.replicaCount }}
replicas: {{ $.Values.deployment.incomingRequestIngest.replicaCount }}
{{- if $.Values.incomingRequestIngest.replicaCount }}
replicas: {{ $.Values.incomingRequestIngest.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.incomingRequestIngest.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
@@ -112,7 +112,7 @@ spec:
---
# OneUptime incoming-request-ingest autoscaler
{{- if not $.Values.incomingRequestIngest.disableAutoscaler }}
{{- if and (not $.Values.incomingRequestIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.incomingRequestIngest.keda.enabled)) }}
{{- $incomingRequestIngestAutoScalerArgs := dict "ServiceName" "incoming-request-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $incomingRequestIngestAutoScalerArgs }}
{{- end }}

View File

@@ -15,9 +15,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "isolated-vm" }}
{{- if $.Values.isolatedVM.replicaCount }}
replicas: {{ $.Values.isolatedVM.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.isolatedVM.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -0,0 +1,48 @@
{{/*
KEDA ScaledObjects for various services.

Each block builds a MetricsConfig dict (replica bounds, polling/cooldown
periods, and a single queue-size trigger that KEDA polls over the service's
port) and renders the shared "oneuptime.kedaScaledObject" helper. A
ScaledObject is emitted only when KEDA is enabled globally AND for that
service, and the service's regular autoscaler has not been disabled.
*/}}
{{/* OpenTelemetry Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.openTelemetryIngest.keda.enabled (not .Values.openTelemetryIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.openTelemetryIngest.keda.enabled "minReplicas" .Values.openTelemetryIngest.keda.minReplicas "maxReplicas" .Values.openTelemetryIngest.keda.maxReplicas "pollingInterval" .Values.openTelemetryIngest.keda.pollingInterval "cooldownPeriod" .Values.openTelemetryIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_telemetry_queue_size" "threshold" .Values.openTelemetryIngest.keda.queueSizeThreshold "port" .Values.port.openTelemetryIngest)) }}
{{- $openTelemetryIngestKedaArgs := dict "ServiceName" "open-telemetry-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.openTelemetryIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $openTelemetryIngestKedaArgs }}
{{- end }}
{{/* Fluent Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.fluentIngest.keda.enabled (not .Values.fluentIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.fluentIngest.keda.enabled "minReplicas" .Values.fluentIngest.keda.minReplicas "maxReplicas" .Values.fluentIngest.keda.maxReplicas "pollingInterval" .Values.fluentIngest.keda.pollingInterval "cooldownPeriod" .Values.fluentIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_fluent_ingest_queue_size" "threshold" .Values.fluentIngest.keda.queueSizeThreshold "port" .Values.port.fluentIngest)) }}
{{- $fluentIngestKedaArgs := dict "ServiceName" "fluent-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.fluentIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $fluentIngestKedaArgs }}
{{- end }}
{{/* Incoming Request Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.incomingRequestIngest.keda.enabled (not .Values.incomingRequestIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.incomingRequestIngest.keda.enabled "minReplicas" .Values.incomingRequestIngest.keda.minReplicas "maxReplicas" .Values.incomingRequestIngest.keda.maxReplicas "pollingInterval" .Values.incomingRequestIngest.keda.pollingInterval "cooldownPeriod" .Values.incomingRequestIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_incoming_request_ingest_queue_size" "threshold" .Values.incomingRequestIngest.keda.queueSizeThreshold "port" .Values.port.incomingRequestIngest)) }}
{{- $incomingRequestIngestKedaArgs := dict "ServiceName" "incoming-request-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.incomingRequestIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $incomingRequestIngestKedaArgs }}
{{- end }}
{{/* Server Monitor Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.serverMonitorIngest.keda.enabled (not .Values.serverMonitorIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.serverMonitorIngest.keda.enabled "minReplicas" .Values.serverMonitorIngest.keda.minReplicas "maxReplicas" .Values.serverMonitorIngest.keda.maxReplicas "pollingInterval" .Values.serverMonitorIngest.keda.pollingInterval "cooldownPeriod" .Values.serverMonitorIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_server_monitor_ingest_queue_size" "threshold" .Values.serverMonitorIngest.keda.queueSizeThreshold "port" .Values.port.serverMonitorIngest)) }}
{{- $serverMonitorIngestKedaArgs := dict "ServiceName" "server-monitor-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.serverMonitorIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $serverMonitorIngestKedaArgs }}
{{- end }}
{{/* Probe Ingest KEDA ScaledObject */}}
{{- if and .Values.keda.enabled .Values.probeIngest.keda.enabled (not .Values.probeIngest.disableAutoscaler) }}
{{- $metricsConfig := dict "enabled" .Values.probeIngest.keda.enabled "minReplicas" .Values.probeIngest.keda.minReplicas "maxReplicas" .Values.probeIngest.keda.maxReplicas "pollingInterval" .Values.probeIngest.keda.pollingInterval "cooldownPeriod" .Values.probeIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_ingest_queue_size" "threshold" .Values.probeIngest.keda.queueSizeThreshold "port" .Values.port.probeIngest)) }}
{{- $probeIngestKedaArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.probeIngest.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeIngestKedaArgs }}
{{- end }}
{{/* Probe KEDA ScaledObjects - one for each probe configuration */}}
{{- range $key, $val := $.Values.probes }}
{{- /* User-defined probe entries may omit the `keda` map entirely (the
values.yaml example shows it commented out). Defaulting to an empty dict
prevents a nil-pointer template error when reading .enabled below. */}}
{{- $probeKeda := $val.keda | default dict }}
{{- if and $.Values.keda.enabled $probeKeda.enabled (not $val.disableAutoscaler) }}
{{- $serviceName := printf "probe-%s" $key }}
{{- $metricsConfig := dict "enabled" $probeKeda.enabled "minReplicas" $probeKeda.minReplicas "maxReplicas" $probeKeda.maxReplicas "pollingInterval" $probeKeda.pollingInterval "cooldownPeriod" $probeKeda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_probe_queue_size" "threshold" $probeKeda.queueSizeThreshold "port" $.Values.port.probe)) }}
{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $val.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
{{- end }}
{{- end }}

View File

@@ -15,9 +15,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "nginx" }}
{{- if $.Values.nginx.replicaCount }}
replicas: {{ $.Values.nginx.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.nginx.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -15,8 +15,8 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "open-telemetry-ingest" }}
{{- if $.Values.deployment.openTelemetryIngest.replicaCount }}
replicas: {{ $.Values.deployment.openTelemetryIngest.replicaCount }}
{{- if $.Values.openTelemetryIngest.replicaCount }}
replicas: {{ $.Values.openTelemetryIngest.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.openTelemetryIngest.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
@@ -112,8 +112,8 @@ spec:
---
# OneUptime open-telemetry-ingest autoscaler
{{- if not $.Values.openTelemetryIngest.disableAutoscaler }}
{{- if and (not $.Values.openTelemetryIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.openTelemetryIngest.keda.enabled)) }}
{{- $openTelemetryIngestAutoScalerArgs := dict "ServiceName" "open-telemetry-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $openTelemetryIngestAutoScalerArgs }}
{{- end }}
---
---

View File

@@ -15,8 +15,8 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "otel-collector" }}
{{- if $.Values.deployment.otelCollector.replicaCount }}
replicas: {{ $.Values.deployment.otelCollector.replicaCount }}
{{- if $.Values.openTelemetryCollector.replicaCount }}
replicas: {{ $.Values.openTelemetryCollector.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.openTelemetryCollector.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}

View File

@@ -15,8 +15,8 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "probe-ingest" }}
{{- if $.Values.deployment.probeIngest.replicaCount }}
replicas: {{ $.Values.deployment.probeIngest.replicaCount }}
{{- if $.Values.probeIngest.replicaCount }}
replicas: {{ $.Values.probeIngest.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.probeIngest.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
@@ -112,7 +112,7 @@ spec:
---
# OneUptime probe-ingest autoscaler
{{- if not $.Values.probeIngest.disableAutoscaler }}
{{- if and (not $.Values.probeIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.probeIngest.keda.enabled)) }}
{{- $probeIngestAutoScalerArgs := dict "ServiceName" "probe-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $probeIngestAutoScalerArgs }}
{{- end }}

View File

@@ -12,8 +12,16 @@ stringData:
## This is a workaround to keep the secrets unchanged
{{- if .Release.IsUpgrade }}
{{- if .Values.oneuptimeSecret }}
oneuptime-secret: {{ .Values.oneuptimeSecret | quote }}
{{- else }}
oneuptime-secret: {{ index (lookup "v1" "Secret" $.Release.Namespace (printf "%s-secrets" $.Release.Name)).data "oneuptime-secret" | b64dec }}
{{- end }}
{{- if .Values.encryptionSecret }}
encryption-secret: {{ .Values.encryptionSecret | quote }}
{{- else }}
encryption-secret: {{ index (lookup "v1" "Secret" $.Release.Namespace (printf "%s-secrets" $.Release.Name)).data "encryption-secret" | b64dec }}
{{- end }}
{{- range $key, $val := $.Values.probes }}
{{- if (index (lookup "v1" "Secret" $.Release.Namespace (printf "%s-secrets" $.Release.Name)).data (printf "probe-%s" $key)) }}
@@ -25,8 +33,16 @@ stringData:
{{ else }} # install operation
{{- if .Values.oneuptimeSecret }}
oneuptime-secret: {{ .Values.oneuptimeSecret | quote }}
{{- else }}
oneuptime-secret: {{ randAlphaNum 32 | quote }}
{{- end }}
{{- if .Values.encryptionSecret }}
encryption-secret: {{ .Values.encryptionSecret | quote }}
{{- else }}
encryption-secret: {{ randAlphaNum 32 | quote }}
{{- end }}
{{- range $key, $val := $.Values.probes }}
{{printf "probe-%s" $key}}: {{ randAlphaNum 32 | quote }}

View File

@@ -15,8 +15,8 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "server-monitor-ingest" }}
{{- if $.Values.deployment.serverMonitorIngest.replicaCount }}
replicas: {{ $.Values.deployment.serverMonitorIngest.replicaCount }}
{{- if $.Values.serverMonitorIngest.replicaCount }}
replicas: {{ $.Values.serverMonitorIngest.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.serverMonitorIngest.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
@@ -112,7 +112,7 @@ spec:
---
# OneUptime server-monitor-ingest autoscaler
{{- if not $.Values.serverMonitorIngest.disableAutoscaler }}
{{- if and (not $.Values.serverMonitorIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.serverMonitorIngest.keda.enabled)) }}
{{- $serverMonitorIngestAutoScalerArgs := dict "ServiceName" "server-monitor-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $serverMonitorIngestAutoScalerArgs }}
{{- end }}

View File

@@ -1,7 +1,7 @@
# OneUptime statusPage Deployment
{{- $statusPagePorts := dict "port" $.Values.port.statusPage -}}
{{- $statusPageEnv := dict "PORT" $.Values.port.statusPage "DISABLE_TELEMETRY" $.Values.statusPage.disableTelemetryCollection -}}
{{- $statusPageDeploymentArgs :=dict "IsUI" true "ServiceName" "status-page" "Ports" $statusPagePorts "Release" $.Release "Values" $.Values "Env" $statusPageEnv "Resources" $.Values.statusPage.resources "DisableAutoscaler" $.Values.statusPage.disableAutoscaler -}}
{{- $statusPageDeploymentArgs :=dict "IsUI" true "ServiceName" "status-page" "Ports" $statusPagePorts "Release" $.Release "Values" $.Values "Env" $statusPageEnv "Resources" $.Values.statusPage.resources "DisableAutoscaler" $.Values.statusPage.disableAutoscaler "ReplicaCount" $.Values.statusPage.replicaCount -}}
{{- include "oneuptime.deployment" $statusPageDeploymentArgs }}
---

View File

@@ -3,7 +3,7 @@
# OneUptime testServer Deployment
{{- $testServerPorts := dict "port" $.Values.port.testServer -}}
{{- $testServerEnv := dict "PORT" $.Values.port.testServer "DISABLE_TELEMETRY" $.Values.testServer.disableTelemetryCollection -}}
{{- $testServerDeploymentArgs :=dict "IsUI" true "ServiceName" "test-server" "Ports" $testServerPorts "Release" $.Release "Values" $.Values "Env" $testServerEnv "Resources" $.Values.testServer.resources "DisableAutoscaler" $.Values.testServer.disableAutoscaler -}}
{{- $testServerDeploymentArgs :=dict "IsUI" true "ServiceName" "test-server" "Ports" $testServerPorts "Release" $.Release "Values" $.Values "Env" $testServerEnv "Resources" $.Values.testServer.resources "DisableAutoscaler" $.Values.testServer.disableAutoscaler "ReplicaCount" $.Values.testServer.replicaCount -}}
{{- include "oneuptime.deployment" $testServerDeploymentArgs }}
---

View File

@@ -14,9 +14,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "worker" }}
{{- if $.Values.worker.replicaCount }}
replicas: {{ $.Values.worker.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.worker.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -14,9 +14,13 @@ spec:
selector:
matchLabels:
app: {{ printf "%s-%s" $.Release.Name "workflow" }}
{{- if $.Values.workflow.replicaCount }}
replicas: {{ $.Values.workflow.replicaCount }}
{{- else }}
{{- if or (not $.Values.autoscaling.enabled) ($.Values.workflow.disableAutoscaler) }}
replicas: {{ $.Values.deployment.replicaCount }}
{{- end }}
{{- end }}
template:
metadata:
labels:

View File

@@ -28,19 +28,7 @@ fluentdHost:
deployment:
# Default replica count for all deployments
replicaCount: 1
probeIngest:
replicaCount:
serverMonitorIngest:
replicaCount:
openTelemetryIngest:
replicaCount:
fluentIngest:
replicaCount:
incomingRequestIngest:
replicaCount:
otelCollector:
replicaCount:
replicaCount: 1
metalLb:
enabled: false
@@ -50,6 +38,7 @@ metalLb:
# - 51.158.55.153/32 # List of IP addresses of all the servers in the cluster.
nginx:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
listenAddress: ""
@@ -191,6 +180,7 @@ alerts:
# 2. Set the statusPage.cnameRecord to "oneuptime.yourcompany.com"
# 3. Create CNAME record in your DNS provider with the name "status.yourcompany.com" and value "oneuptime.yourcompany.com"
statusPage:
replicaCount: 1
cnameRecord:
disableTelemetryCollection: false
disableAutoscaler: false
@@ -208,6 +198,17 @@ probes:
customCodeMonitorScriptTimeoutInMs: 60000
disableTelemetryCollection: false
disableAutoscaler: false
# KEDA autoscaling configuration based on monitor queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold per probe
queueSizeThreshold: 10
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
# resources:
# additionalContainers:
# two:
@@ -223,10 +224,22 @@ probes:
# disableAutoscaler: false
# resources:
# additionalContainers:
# KEDA autoscaling configuration based on monitor queue metrics
# keda:
# enabled: false
# minReplicas: 1
# maxReplicas: 100
# # Scale up when queue size exceeds this threshold per probe
# queueSizeThreshold: 10
# # Polling interval for metrics (in seconds)
# pollingInterval: 30
# # Cooldown period after scaling (in seconds)
# cooldownPeriod: 300
port:
app: 3002
probe: 3874
probeIngest: 3400
serverMonitorIngest: 3404
openTelemetryIngest: 3403
@@ -251,6 +264,7 @@ port:
docs: 1447
testServer:
replicaCount: 1
enabled: false
disableTelemetryCollection: false
disableAutoscaler: false
@@ -431,6 +445,7 @@ readinessProbe: # Readiness probe configuration
# OpenTelemetry Collector Configuration
openTelemetryCollector:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
sendingQueue:
@@ -439,83 +454,157 @@ openTelemetryCollector:
numConsumers: 3
accounts:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
home:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
dashboard:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
adminDashboard:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
worker:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
workflow:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
workflowTimeoutInMs: 5000
resources:
apiReference:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
docs:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
app:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
probeIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
openTelemetryIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
fluentIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
incomingRequestIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
isolatedVM:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
serverMonitorIngest:
replicaCount: 1
disableTelemetryCollection: false
disableAutoscaler: false
resources:
# KEDA autoscaling configuration based on queue metrics
keda:
enabled: false
minReplicas: 1
maxReplicas: 100
# Scale up when queue size exceeds this threshold
queueSizeThreshold: 100
# Polling interval for metrics (in seconds)
pollingInterval: 30
# Cooldown period after scaling (in seconds)
cooldownPeriod: 300
slackApp:
clientId:
clientSecret:
signingSecret:
signingSecret:
keda:
enabled: true

View File

@@ -1,12 +1,6 @@
import HTTPMethod from "Common/Types/API/HTTPMethod";
import OneUptimeDate from "Common/Types/Date";
import Dictionary from "Common/Types/Dictionary";
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import Express, {
ExpressRequest,
ExpressResponse,
@@ -14,10 +8,9 @@ import Express, {
NextFunction,
RequestHandler,
} from "Common/Server/Utils/Express";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Response from "Common/Server/Utils/Response";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import logger from "Common/Server/Utils/Logger";
import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
@@ -38,63 +31,18 @@ const processIncomingRequest: RequestHandler = async (
throw new BadDataException("Invalid Secret Key");
}
const isGetRequest: boolean = req.method === "GET";
const isPostRequest: boolean = req.method === "POST";
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
let httpMethod: HTTPMethod = HTTPMethod.GET;
if (isGetRequest) {
httpMethod = HTTPMethod.GET;
}
if (isPostRequest) {
httpMethod = HTTPMethod.POST;
}
const monitor: Monitor | null = await MonitorService.findOneBy({
query: {
incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString),
monitorType: MonitorType.IncomingRequest,
},
select: {
_id: true,
projectId: true,
},
props: {
isRoot: true,
},
});
if (!monitor || !monitor._id) {
throw new BadDataException("Monitor not found");
}
if (!monitor.projectId) {
throw new BadDataException("Project not found");
}
const now: Date = OneUptimeDate.getCurrentDate();
const incomingRequest: IncomingMonitorRequest = {
projectId: monitor.projectId,
monitorId: new ObjectID(monitor._id.toString()),
// Add to queue for asynchronous processing
await IncomingRequestIngestQueueService.addIncomingRequestIngestJob({
secretKey: monitorSecretKeyAsString,
requestHeaders: requestHeaders,
requestBody: requestBody,
incomingRequestReceivedAt: now,
onlyCheckForIncomingRequestReceivedAt: false,
requestMethod: httpMethod,
checkedAt: now,
};
// process probe response here.
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
// do nothing.
// we don't want to throw error here.
// we just want to log the error.
logger.error(err);
requestMethod: req.method,
});
return Response.sendEmptySuccessResponse(req, res);
return;
} catch (err) {
return next(err);
}
@@ -122,4 +70,89 @@ router.get(
},
);
// Queue stats endpoint
// Returns waiting/active/completed/failed/delayed/total job counts for the
// incoming-request ingest queue. Service-to-service only (cluster key auth).
router.get(
  "/incoming-request/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const stats: {
        waiting: number;
        active: number;
        completed: number;
        failed: number;
        delayed: number;
        total: number;
      } = await IncomingRequestIngestQueueService.getQueueStats();
      return Response.sendJsonObjectResponse(req, res, stats);
    } catch (err) {
      return next(err);
    }
  },
);

// Queue size endpoint
// Returns the current queue depth; used for monitoring and autoscaling checks.
router.get(
  "/incoming-request/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const size: number =
        await IncomingRequestIngestQueueService.getQueueSize();
      return Response.sendJsonObjectResponse(req, res, { size });
    } catch (err) {
      return next(err);
    }
  },
);

// Queue failed jobs endpoint
// Returns a page of failed jobs, addressed by inclusive start/end indices
// into the queue's failed set.
router.get(
  "/incoming-request/queue/failed",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Parse pagination parameters from the query string. Use an explicit
      // radix and clamp to sane values so input like "0x10" or "-5" cannot
      // produce a surprising or inverted range.
      const start: number = Math.max(
        0,
        parseInt(req.query["start"] as string, 10) || 0,
      );
      const end: number = Math.max(
        start,
        parseInt(req.query["end"] as string, 10) || 100,
      );

      const failedJobs: Array<{
        id: string;
        name: string;
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- job payload shape is queue-specific; mirrors the queue service's return type
        data: any;
        failedReason: string;
        processedOn: Date | null;
        finishedOn: Date | null;
        attemptsMade: number;
      }> = await IncomingRequestIngestQueueService.getFailedJobs({
        start,
        end,
      });

      return Response.sendJsonObjectResponse(req, res, {
        failedJobs,
        pagination: {
          start,
          end,
          count: failedJobs.length,
        },
      });
    } catch (err) {
      return next(err);
    }
  },
);

export default router;

View File

@@ -0,0 +1,38 @@
import Express, {
  ExpressRequest,
  ExpressResponse,
  ExpressRouter,
  NextFunction,
} from "Common/Server/Utils/Express";
import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";

const router: ExpressRouter = Express.getRouter();

/**
 * JSON metrics endpoint for KEDA autoscaling
 * Returns queue size as JSON for KEDA metrics-api scaler
 *
 * Response shape: { "queueSize": <number> } — the KEDA ScaledObject's
 * metrics-api trigger polls this value to decide replica count.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  // NOTE(review): with the middleware above commented out this endpoint is
  // UNAUTHENTICATED. It only discloses a queue-depth number, but either
  // re-enable cluster-key auth once KEDA is configured to send it, or
  // confirm the endpoint is not reachable from outside the cluster.
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Current depth of the incoming-request ingest queue.
      const queueSize: number =
        await IncomingRequestIngestQueueService.getQueueSize();
      res.setHeader("Content-Type", "application/json");
      res.status(200).json({
        queueSize: queueSize,
      });
    } catch (err) {
      // Delegate to the app-level error handler.
      return next(err);
    }
  },
);

export default router;

View File

@@ -1,4 +1,5 @@
import IncomingRequestAPI from "./API/IncomingRequest";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,6 +10,7 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest";
import "ejs";
const app: ExpressApplication = Express.getExpressApp();
@@ -16,6 +18,7 @@ const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "incoming-request-ingest";
app.use([`/${APP_NAME}`, "/"], IncomingRequestAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,104 @@
import { IncomingRequestIngestJobData } from "../../Services/Queue/IncomingRequestIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import HTTPMethod from "Common/Types/API/HTTPMethod";
import OneUptimeDate from "Common/Types/Date";
import Dictionary from "Common/Types/Dictionary";
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Monitor from "Common/Models/DatabaseModels/Monitor";
// Set up the worker for processing the incoming-request ingest queue.
// Each job is unwrapped into its typed payload and handed to
// processIncomingRequestFromQueue; failures are logged and rethrown so the
// queue infrastructure records the job as failed (and can retry it).
const handleIncomingRequestIngestJob = async (
  job: QueueJob,
): Promise<void> => {
  logger.debug(`Processing incoming request ingestion job: ${job.name}`);
  try {
    await processIncomingRequestFromQueue(
      job.data as IncomingRequestIngestJobData,
    );
    logger.debug(
      `Successfully processed incoming request ingestion job: ${job.name}`,
    );
  } catch (error) {
    logger.error(`Error processing incoming request ingestion job:`);
    logger.error(error);
    throw error; // rethrow so the job is marked as failed
  }
};

QueueWorker.getWorker(
  QueueName.IncomingRequestIngest,
  handleIncomingRequestIngestJob,
  { concurrency: 20 }, // Process up to 20 incoming request ingest jobs concurrently
);
async function processIncomingRequestFromQueue(
jobData: IncomingRequestIngestJobData,
): Promise<void> {
const requestHeaders: Dictionary<string> = jobData.requestHeaders;
const requestBody: string | JSONObject = jobData.requestBody;
const monitorSecretKeyAsString: string = jobData.secretKey;
if (!monitorSecretKeyAsString) {
throw new BadDataException("Invalid Secret Key");
}
const isGetRequest: boolean = jobData.requestMethod === "GET";
const isPostRequest: boolean = jobData.requestMethod === "POST";
let httpMethod: HTTPMethod = HTTPMethod.GET;
if (isGetRequest) {
httpMethod = HTTPMethod.GET;
}
if (isPostRequest) {
httpMethod = HTTPMethod.POST;
}
const monitor: Monitor | null = await MonitorService.findOneBy({
query: {
incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString),
monitorType: MonitorType.IncomingRequest,
},
select: {
_id: true,
projectId: true,
},
props: {
isRoot: true,
},
});
if (!monitor || !monitor._id) {
throw new BadDataException("Monitor not found");
}
if (!monitor.projectId) {
throw new BadDataException("Project not found");
}
const now: Date = OneUptimeDate.getCurrentDate();
const incomingRequest: IncomingMonitorRequest = {
projectId: monitor.projectId,
monitorId: new ObjectID(monitor._id.toString()),
requestHeaders: requestHeaders,
requestBody: requestBody,
incomingRequestReceivedAt: now,
onlyCheckForIncomingRequestReceivedAt: false,
requestMethod: httpMethod,
checkedAt: now,
};
// process probe response here.
await MonitorResourceUtil.monitorResource(incomingRequest);
}
logger.debug("Incoming request ingest worker initialized");

View File

@@ -0,0 +1,79 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
import Dictionary from "Common/Types/Dictionary";
/**
 * Payload stored on the queue for each incoming-request monitor heartbeat.
 */
export interface IncomingRequestIngestJobData {
  secretKey: string;
  requestHeaders: Dictionary<string>;
  requestBody: string | JSONObject;
  requestMethod: string;
  // When the original HTTP request was accepted for ingestion.
  ingestionTimestamp: Date;
}

/**
 * Thin wrapper around the shared Queue infrastructure for the
 * incoming-request ingest queue: enqueue heartbeat jobs and expose
 * size / stats / failed-job views for monitoring and autoscaling.
 */
export default class IncomingRequestIngestQueueService {
  /**
   * Enqueues one incoming-request heartbeat for asynchronous processing.
   * Logs and rethrows on failure so the caller can surface the error.
   */
  public static async addIncomingRequestIngestJob(data: {
    secretKey: string;
    requestHeaders: Dictionary<string>;
    requestBody: string | JSONObject;
    requestMethod: string;
  }): Promise<void> {
    try {
      const jobData: IncomingRequestIngestJobData = {
        secretKey: data.secretKey,
        requestHeaders: data.requestHeaders,
        requestBody: data.requestBody,
        requestMethod: data.requestMethod,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      // Build a unique job id WITHOUT the raw secret key. The previous id
      // embedded data.secretKey, which leaked the monitor secret into queue
      // storage, debug logs, and the /queue/failed endpoint's `id` field.
      // Nano timestamp plus a random suffix keeps ids unique across pods.
      const jobId: string = `incoming-request-${OneUptimeDate.getCurrentDateAsUnixNano()}-${Math.floor(
        Math.random() * 1000000,
      )}`;

      await Queue.addJob(
        QueueName.IncomingRequestIngest,
        jobId,
        "ProcessIncomingRequestIngest",
        jobData as unknown as JSONObject,
      );

      logger.debug(`Added incoming request ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding incoming request ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /** Current number of jobs waiting in the queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.IncomingRequestIngest);
  }

  /** Per-state job counts (waiting/active/completed/failed/delayed/total). */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.IncomingRequestIngest);
  }

  /**
   * Returns a page of failed jobs.
   *
   * @param options - optional start/end indices into the failed set
   *   (semantics are those of the underlying Queue.getFailedJobs).
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.IncomingRequestIngest, options);
  }
}

View File

@@ -42,6 +42,7 @@ WORKDIR /usr/src/app
# Install app dependencies
COPY ./MCP/package*.json /usr/src/app/
RUN npm update @oneuptime/common
RUN npm install
COPY ./MCP /usr/src/app

5
MCP/package-lock.json generated
View File

@@ -1262,7 +1262,9 @@
}
},
"node_modules/@oneuptime/common": {
"version": "7.0.4773",
"version": "7.0.4800",
"resolved": "https://registry.npmjs.org/@oneuptime/common/-/common-7.0.4800.tgz",
"integrity": "sha512-v+2F3aW85IOFVwUzyYPCX4g26SE5N2NBGuf649QpGuJpwAhw6A3uSaX8PMjrHBtn76EKU3BEgg1GrXZL1NPsrw==",
"license": "Apache-2.0",
"dependencies": {
"@asteasolutions/zod-to-openapi": "^7.3.2",
@@ -1315,7 +1317,6 @@
"json5": "^2.2.3",
"jsonwebtoken": "^9.0.0",
"jwt-decode": "^4.0.0",
"lodash": "^4.17.21",
"marked": "^12.0.2",
"moment": "^2.30.1",
"moment-timezone": "^0.5.45",

View File

@@ -0,0 +1,37 @@
import Express, {
  ExpressRequest,
  ExpressResponse,
  ExpressRouter,
  NextFunction,
} from "Common/Server/Utils/Express";
import TelemetryQueueService from "../Services/Queue/TelemetryQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";

const router: ExpressRouter = Express.getRouter();

/**
 * JSON metrics endpoint for KEDA autoscaling.
 *
 * Responds with the current Telemetry queue depth as JSON so a KEDA
 * metrics-api scaler can poll it and scale the ingest deployment.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Read the queue depth, then reply with plain JSON for the scaler.
      const size: number = await TelemetryQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: size });
    } catch (err) {
      next(err);
    }
  },
);

export default router;

View File

@@ -26,10 +26,10 @@ router.post(
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction
next: NextFunction,
): Promise<void> => {
return OtelIngestService.ingestTraces(req, res, next);
}
},
);
router.post(
@@ -39,10 +39,10 @@ router.post(
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction
next: NextFunction,
): Promise<void> => {
return OtelIngestService.ingestMetrics(req, res, next);
}
},
);
router.post(
@@ -52,28 +52,35 @@ router.post(
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction
next: NextFunction,
): Promise<void> => {
return OtelIngestService.ingestLogs(req, res, next);
}
},
);
// Queue stats endpoint
router.get(
"/otlp/queue/stats",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction
next: NextFunction,
): Promise<void> => {
try {
const stats = await TelemetryQueueService.getQueueStats();
const stats: {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
total: number;
} = await TelemetryQueueService.getQueueStats();
return Response.sendJsonObjectResponse(req, res, stats);
} catch (err) {
return next(err);
}
}
},
);
// Queue size endpoint
@@ -83,15 +90,56 @@ router.get(
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction
next: NextFunction,
): Promise<void> => {
try {
const size = await TelemetryQueueService.getQueueSize();
const size: number = await TelemetryQueueService.getQueueSize();
return Response.sendJsonObjectResponse(req, res, { size });
} catch (err) {
return next(err);
}
}
},
);
// Queue failed jobs endpoint
router.get(
"/otlp/queue/failed",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Parse pagination parameters from query string
const start: number = parseInt(req.query["start"] as string) || 0;
const end: number = parseInt(req.query["end"] as string) || 100;
const failedJobs: Array<{
id: string;
name: string;
data: any;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}> = await TelemetryQueueService.getFailedJobs({
start,
end,
});
return Response.sendJsonObjectResponse(req, res, {
failedJobs,
pagination: {
start,
end,
count: failedJobs.length,
},
});
} catch (err) {
return next(err);
}
},
);
export default router;

View File

@@ -1,4 +1,5 @@
import OTelIngestAPI from "./API/OTelIngest";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,7 +10,7 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import ProcessTelemetryWorker from "./Jobs/TelemetryIngest/ProcessTelemetry";
import "./Jobs/TelemetryIngest/ProcessTelemetry";
import "ejs";
const app: ExpressApplication = Express.getExpressApp();
@@ -17,6 +18,7 @@ const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "open-telemetry-ingest";
app.use([`/${APP_NAME}`, "/"], OTelIngestAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {
@@ -55,10 +57,6 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
await Realtime.init();
// Initialize telemetry processing worker
logger.debug("Initializing telemetry processing worker...");
logger.debug(`Telemetry worker initialized: ${ProcessTelemetryWorker ? 'success' : 'failed'}`);
// add default routes
await App.addDefaultRoutes();
} catch (err) {

View File

@@ -1,4 +1,7 @@
import { TelemetryIngestJobData, TelemetryType } from "../../Services/Queue/TelemetryQueueService";
import {
TelemetryIngestJobData,
TelemetryType,
} from "../../Services/Queue/TelemetryQueueService";
import OtelIngestService from "../../Services/OtelIngest";
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import logger from "Common/Server/Utils/Logger";
@@ -7,16 +10,17 @@ import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import ObjectID from "Common/Types/ObjectID";
// Set up the unified worker for processing telemetry queue
const worker = QueueWorker.getWorker(
QueueWorker.getWorker(
QueueName.Telemetry,
async (job: QueueJob): Promise<void> => {
logger.debug(`Processing telemetry ingestion job: ${job.name}`);
try {
const jobData = job.data as TelemetryIngestJobData;
const jobData: TelemetryIngestJobData =
job.data as TelemetryIngestJobData;
// Create a mock request object with the queued data
const mockRequest = {
const mockRequest: TelemetryRequest = {
projectId: new ObjectID(jobData.projectId.toString()),
body: jobData.requestBody,
headers: jobData.requestHeaders,
@@ -26,19 +30,25 @@ const worker = QueueWorker.getWorker(
switch (jobData.type) {
case TelemetryType.Logs:
await OtelIngestService.processLogsFromQueue(mockRequest);
logger.debug(`Successfully processed logs for project: ${jobData.projectId}`);
logger.debug(
`Successfully processed logs for project: ${jobData.projectId}`,
);
break;
case TelemetryType.Traces:
await OtelIngestService.processTracesFromQueue(mockRequest);
logger.debug(`Successfully processed traces for project: ${jobData.projectId}`);
logger.debug(
`Successfully processed traces for project: ${jobData.projectId}`,
);
break;
case TelemetryType.Metrics:
await OtelIngestService.processMetricsFromQueue(mockRequest);
logger.debug(`Successfully processed metrics for project: ${jobData.projectId}`);
logger.debug(
`Successfully processed metrics for project: ${jobData.projectId}`,
);
break;
default:
throw new Error(`Unknown telemetry type: ${jobData.type}`);
}
@@ -52,5 +62,3 @@ const worker = QueueWorker.getWorker(
);
logger.debug("Unified telemetry worker initialized");
export default worker;

View File

@@ -115,10 +115,8 @@ export default class OtelIngestService {
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await LogsQueueService.addLogIngestJob(req as TelemetryRequest);
// Add to queue for asynchronous processing
await LogsQueueService.addLogIngestJob(req as TelemetryRequest);
return;
} catch (err) {
@@ -134,14 +132,18 @@ export default class OtelIngestService {
}
@CaptureSpan()
public static async processTracesFromQueue(req: ExpressRequest): Promise<void> {
public static async processTracesFromQueue(
req: ExpressRequest,
): Promise<void> {
// This method is specifically for queue processing
// It bypasses the response handling since the response was already sent
await this.processTracesAsync(req);
}
@CaptureSpan()
public static async processMetricsFromQueue(req: ExpressRequest): Promise<void> {
public static async processMetricsFromQueue(
req: ExpressRequest,
): Promise<void> {
// This method is specifically for queue processing
// It bypasses the response handling since the response was already sent
await this.processMetricsAsync(req);
@@ -358,10 +360,9 @@ export default class OtelIngestService {
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await MetricsQueueService.addMetricIngestJob(req as TelemetryRequest);
// Add to queue for asynchronous processing
await MetricsQueueService.addMetricIngestJob(req as TelemetryRequest);
return;
} catch (err) {
return next(err);
@@ -613,10 +614,8 @@ export default class OtelIngestService {
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await TracesQueueService.addTraceIngestJob(req as TelemetryRequest);
// Add to queue for asynchronous processing
await TracesQueueService.addTraceIngestJob(req as TelemetryRequest);
return;
} catch (err) {

View File

@@ -1,15 +1,13 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import TelemetryQueueService, {
LogsIngestJobData
import TelemetryQueueService, {
LogsIngestJobData,
} from "./TelemetryQueueService";
// Export the interface for backward compatibility
export { LogsIngestJobData };
export default class LogsQueueService {
public static async addLogIngestJob(
req: TelemetryRequest,
): Promise<void> {
public static async addLogIngestJob(req: TelemetryRequest): Promise<void> {
return TelemetryQueueService.addLogIngestJob(req);
}
}

View File

@@ -1,15 +1,13 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import TelemetryQueueService, {
MetricsIngestJobData
import TelemetryQueueService, {
MetricsIngestJobData,
} from "./TelemetryQueueService";
// Export the interface for backward compatibility
export { MetricsIngestJobData };
export default class MetricsQueueService {
public static async addMetricIngestJob(
req: TelemetryRequest,
): Promise<void> {
public static async addMetricIngestJob(req: TelemetryRequest): Promise<void> {
return TelemetryQueueService.addMetricIngestJob(req);
}
}

View File

@@ -45,7 +45,7 @@ export default class TelemetryQueueService {
ingestionTimestamp: OneUptimeDate.getCurrentDate(),
};
const jobId = `${type}-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
const jobId: string = `${type}-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
await Queue.addJob(
QueueName.Telemetry,
@@ -88,4 +88,21 @@ export default class TelemetryQueueService {
}> {
return Queue.getQueueStats(QueueName.Telemetry);
}
public static getFailedJobs(options?: {
start?: number;
end?: number;
}): Promise<
Array<{
id: string;
name: string;
data: JSONObject;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}>
> {
return Queue.getFailedJobs(QueueName.Telemetry, options);
}
}

View File

@@ -1,15 +1,13 @@
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
import TelemetryQueueService, {
TracesIngestJobData
import TelemetryQueueService, {
TracesIngestJobData,
} from "./TelemetryQueueService";
// Export the interface for backward compatibility
export { TracesIngestJobData };
export default class TracesQueueService {
public static async addTraceIngestJob(
req: TelemetryRequest,
): Promise<void> {
public static async addTraceIngestJob(req: TelemetryRequest): Promise<void> {
return TelemetryQueueService.addTraceIngestJob(req);
}
}

70
Probe/API/Metrics.ts Normal file
View File

@@ -0,0 +1,70 @@
import Express, {
  ExpressRequest,
  ExpressResponse,
  ExpressRouter,
  NextFunction,
} from "Common/Server/Utils/Express";
import Response from "Common/Server/Utils/Response";
import { PROBE_INGEST_URL } from "../Config";
import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse";
import HTTPMethod from "Common/Types/API/HTTPMethod";
import HTTPResponse from "Common/Types/API/HTTPResponse";
import URL from "Common/Types/API/URL";
import { JSONObject } from "Common/Types/JSON";
import API from "Common/Utils/API";
import logger from "Common/Server/Utils/Logger";
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";

const router: ExpressRouter = Express.getRouter();

/**
 * Metrics endpoint for KEDA autoscaling.
 *
 * Proxies the probe-ingest service's POST /metrics/queue-size endpoint,
 * authenticating as this probe, and returns the pending-monitor count as
 * `{ queueSize }` so a KEDA metrics-api scaler can scale probe replicas.
 */
router.get(
  "/queue-size",
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Get the pending monitor count for this specific probe from ProbeIngest API
      const queueSizeUrl: URL = URL.fromString(
        PROBE_INGEST_URL.toString(),
      ).addRoute("/metrics/queue-size");

      logger.debug("Fetching queue size from ProbeIngest API");

      // Use probe authentication (probe key and probe ID)
      const requestBody: JSONObject = ProbeAPIRequest.getDefaultRequestBody();

      const result: HTTPResponse<JSONObject> | HTTPErrorResponse =
        await API.fetch<JSONObject>(
          HTTPMethod.POST,
          queueSizeUrl,
          requestBody,
          {},
        );

      if (result instanceof HTTPErrorResponse) {
        logger.error("Error fetching queue size from ProbeIngest API");
        logger.error(result);
        throw result;
      }

      // Extract queueSize from the response; default to 0 when the
      // upstream reply has no (or a falsy) queueSize field.
      const queueSize: number = (result.data["queueSize"] as number) || 0;

      logger.debug(`Queue size fetched: ${queueSize}`);

      return Response.sendJsonObjectResponse(req, res, {
        queueSize: queueSize,
      });
    } catch (err) {
      logger.error("Error in metrics queue-size endpoint");
      logger.error(err);
      return next(err);
    }
  },
);

export default router;

View File

@@ -3,10 +3,12 @@ import AliveJob from "./Jobs/Alive";
import FetchMonitorList from "./Jobs/Monitor/FetchList";
import FetchMonitorTestList from "./Jobs/Monitor/FetchMonitorTest";
import Register from "./Services/Register";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import logger from "Common/Server/Utils/Logger";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
import "ejs";
const APP_NAME: string = "probe";
@@ -29,6 +31,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
},
});
// Add metrics API routes
const app: ExpressApplication = Express.getExpressApp();
app.use("/metrics", MetricsAPI);
// add default routes
await App.addDefaultRoutes();

View File

@@ -0,0 +1,37 @@
import Express, {
  ExpressRequest,
  ExpressResponse,
  ExpressRouter,
  NextFunction,
} from "Common/Server/Utils/Express";
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";

const router: ExpressRouter = Express.getRouter();

/**
 * JSON metrics endpoint for KEDA autoscaling.
 *
 * Responds with the current ProbeIngest queue depth as JSON so a KEDA
 * metrics-api scaler can poll it and scale the ingest deployment.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Read the queue depth, then reply with plain JSON for the scaler.
      const size: number = await ProbeIngestQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: size });
    } catch (err) {
      next(err);
    }
  },
);

export default router;

View File

@@ -18,13 +18,17 @@ import Express, {
NextFunction,
} from "Common/Server/Utils/Express";
import logger from "Common/Server/Utils/Logger";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Response from "Common/Server/Utils/Response";
import GlobalConfig from "Common/Models/DatabaseModels/GlobalConfig";
import Probe from "Common/Models/DatabaseModels/Probe";
import User from "Common/Models/DatabaseModels/User";
import MonitorTestService from "Common/Server/Services/MonitorTestService";
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
import PositiveNumber from "Common/Types/PositiveNumber";
import MonitorProbeService from "Common/Server/Services/MonitorProbeService";
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
import OneUptimeDate from "Common/Types/Date";
import MonitorService from "Common/Server/Services/MonitorService";
const router: ExpressRouter = Express.getRouter();
@@ -255,17 +259,18 @@ router.post(
);
}
// this is when the resource was ingested.
probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
logger.error("Error in monitor resource");
logger.error(err);
});
return Response.sendJsonObjectResponse(req, res, {
// Return response immediately
Response.sendJsonObjectResponse(req, res, {
result: "processing",
});
// Add to queue for asynchronous processing
await ProbeIngestQueueService.addProbeIngestJob({
probeMonitorResponse: req.body,
jobType: "probe-response",
});
return;
} catch (err) {
return next(err);
}
@@ -303,28 +308,155 @@ router.post(
);
}
probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();
// Return response immediately
Response.sendEmptySuccessResponse(req, res);
// save the probe response to the monitor test.
// Add to queue for asynchronous processing
await ProbeIngestQueueService.addProbeIngestJob({
probeMonitorResponse: req.body,
jobType: "monitor-test",
testId: testId.toString(),
});
await MonitorTestService.updateOneById({
id: testId,
data: {
monitorStepProbeResponse: {
[probeResponse.monitorStepId.toString()]: {
...JSON.parse(JSON.stringify(probeResponse)),
monitoredAt: OneUptimeDate.getCurrentDate(),
},
} as any,
return;
} catch (err) {
return next(err);
}
},
);
// Queue stats endpoint
router.get(
"/probe/queue/stats",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const stats: {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
total: number;
} = await ProbeIngestQueueService.getQueueStats();
return Response.sendJsonObjectResponse(req, res, stats);
} catch (err) {
return next(err);
}
},
);
// Queue size endpoint
router.get(
"/probe/queue/size",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const size: number = await ProbeIngestQueueService.getQueueSize();
return Response.sendJsonObjectResponse(req, res, { size });
} catch (err) {
return next(err);
}
},
);
// Queue size endpoint for Keda autoscaling (returns pending monitors count for specific probe)
router.post(
"/metrics/queue-size",
ProbeAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// This endpoint returns the number of monitors pending for the specific probe
// to be used by Keda for autoscaling probe replicas
// Get the probe ID from the authenticated request
const data: JSONObject = req.body;
const probeId: ObjectID = new ObjectID(data["probeId"] as string);
if (!probeId) {
return Response.sendErrorResponse(
req,
res,
new BadDataException("Probe ID not found"),
);
}
// Get pending monitor count for this specific probe
const pendingCount: PositiveNumber = await MonitorProbeService.countBy({
query: {
probeId: probeId,
isEnabled: true,
nextPingAt: QueryHelper.lessThanEqualToOrNull(
OneUptimeDate.getCurrentDate(),
),
monitor: {
...MonitorService.getEnabledMonitorQuery(),
},
project: {
...ProjectService.getActiveProjectStatusQuery(),
},
},
props: {
isRoot: true,
},
});
// send success response.
return Response.sendJsonObjectResponse(req, res, {
queueSize: pendingCount.toNumber()
});
} catch (err) {
return next(err);
}
},
);
return Response.sendEmptySuccessResponse(req, res);
// Queue failed jobs endpoint
router.get(
"/probe/queue/failed",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
// Parse pagination parameters from query string
const start: number = parseInt(req.query["start"] as string) || 0;
const end: number = parseInt(req.query["end"] as string) || 100;
const failedJobs: Array<{
id: string;
name: string;
data: any;
failedReason: string;
processedOn: Date | null;
finishedOn: Date | null;
attemptsMade: number;
}> = await ProbeIngestQueueService.getFailedJobs({
start,
end,
});
return Response.sendJsonObjectResponse(req, res, {
failedJobs,
pagination: {
start,
end,
count: failedJobs.length,
},
});
} catch (err) {
return next(err);
}

View File

@@ -1,6 +1,7 @@
import MonitorAPI from "./API/Monitor";
import ProbeIngest from "./API/Probe";
import RegisterAPI from "./API/Register";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -11,6 +12,7 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/ProbeIngest/ProcessProbeIngest";
import "ejs";
const app: ExpressApplication = Express.getExpressApp();
@@ -21,6 +23,7 @@ const APP_NAME: string = "probe-ingest";
app.use([`/${APP_NAME}`, "/ingestor", "/"], RegisterAPI);
app.use([`/${APP_NAME}`, "/ingestor", "/"], MonitorAPI);
app.use([`/${APP_NAME}`, "/ingestor", "/"], ProbeIngest);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,82 @@
import { ProbeIngestJobData } from "../../Services/Queue/ProbeIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import BadDataException from "Common/Types/Exception/BadDataException";
import JSONFunctions from "Common/Types/JSONFunctions";
import ObjectID from "Common/Types/ObjectID";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import OneUptimeDate from "Common/Types/Date";
import MonitorTestService from "Common/Server/Services/MonitorTestService";
import ProbeMonitorResponse from "Common/Types/Probe/ProbeMonitorResponse";
import { JSONObject } from "Common/Types/JSON";

// Set up the worker for processing the probe ingest queue. Errors are
// re-thrown so the queue infrastructure marks the job as failed.
QueueWorker.getWorker(
  QueueName.ProbeIngest,
  async (job: QueueJob): Promise<void> => {
    logger.debug(`Processing probe ingestion job: ${job.name}`);
    try {
      const jobData: ProbeIngestJobData = job.data as ProbeIngestJobData;
      await processProbeFromQueue(jobData);
      logger.debug(`Successfully processed probe ingestion job: ${job.name}`);
    } catch (error) {
      logger.error(`Error processing probe ingestion job:`);
      logger.error(error);
      throw error;
    }
  },
  { concurrency: 20 }, // Process up to 20 probe ingest jobs concurrently
);

/**
 * Process one dequeued probe ingestion job.
 *
 * Deserializes the queued probe response, stamps its ingestion time, and
 * either runs regular monitor-resource processing ("probe-response") or
 * saves the response onto the referenced monitor test ("monitor-test").
 *
 * @throws BadDataException when the payload is missing or the job type is unknown.
 */
async function processProbeFromQueue(
  jobData: ProbeIngestJobData,
): Promise<void> {
  // The queued payload wraps the serialized response under "probeMonitorResponse".
  const probeResponse: ProbeMonitorResponse = JSONFunctions.deserialize(
    jobData.probeMonitorResponse["probeMonitorResponse"] as JSONObject,
  ) as unknown as ProbeMonitorResponse;

  if (!probeResponse) {
    throw new BadDataException("ProbeMonitorResponse not found");
  }

  // This is when the resource was actually ingested (dequeued). Set once —
  // the original duplicated this assignment inside the monitor-test branch.
  probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();

  if (jobData.jobType === "probe-response") {
    // Handle regular probe response.
    await MonitorResourceUtil.monitorResource(probeResponse);
  } else if (jobData.jobType === "monitor-test" && jobData.testId) {
    // Handle monitor test response. The branch condition already guarantees
    // testId is a non-empty string (a constructed ObjectID is always truthy,
    // so re-checking the instance would be dead code).
    const testId: ObjectID = new ObjectID(jobData.testId);

    // Save the probe response onto the monitor test, keyed by monitor step.
    await MonitorTestService.updateOneById({
      id: testId,
      data: {
        monitorStepProbeResponse: {
          [probeResponse.monitorStepId.toString()]: {
            ...JSON.parse(JSON.stringify(probeResponse)),
            monitoredAt: OneUptimeDate.getCurrentDate(),
          },
        } as any,
      },
      props: {
        isRoot: true,
      },
    });
  } else {
    throw new BadDataException(`Invalid job type: ${jobData.jobType}`);
  }
}

logger.debug("Probe ingest worker initialized");

View File

@@ -0,0 +1,75 @@
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";

/**
 * Payload stored on the ProbeIngest queue for one probe result.
 */
export interface ProbeIngestJobData {
  // Serialized probe monitor response as received from the probe.
  probeMonitorResponse: JSONObject;
  // Whether this is a regular probe response or a monitor-test run.
  jobType: "probe-response" | "monitor-test";
  // Monitor test id; only present for "monitor-test" jobs.
  testId?: string | undefined;
  // When the job was enqueued.
  ingestionTimestamp: Date;
}

export default class ProbeIngestQueueService {
  /**
   * Enqueue a probe response for asynchronous processing.
   *
   * Stamps the current time as ingestionTimestamp, derives a job id from
   * the job type, test id (or "general") and a nanosecond timestamp, and
   * pushes the payload onto the ProbeIngest queue under the
   * "ProcessProbeIngest" job name. Failures are logged and re-thrown.
   */
  public static async addProbeIngestJob(data: {
    probeMonitorResponse: JSONObject;
    jobType: "probe-response" | "monitor-test";
    testId?: string;
  }): Promise<void> {
    try {
      const payload: ProbeIngestJobData = {
        ...data,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      // Nanosecond timestamp keeps concurrent jobs for the same probe distinct.
      const jobId: string = `probe-${data.jobType}-${data.testId || "general"}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.ProbeIngest,
        jobId,
        "ProcessProbeIngest",
        payload as unknown as JSONObject,
      );

      logger.debug(`Added probe ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding probe ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /** Current depth of the ProbeIngest queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.ProbeIngest);
  }

  /** Per-state job counts for the ProbeIngest queue. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.ProbeIngest);
  }

  /**
   * Fetch failed jobs from the ProbeIngest queue.
   * `start`/`end` select a slice of the failed-job list for pagination.
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.ProbeIngest, options);
  }
}

View File

@@ -0,0 +1,38 @@
import Express, {
  ExpressRequest,
  ExpressResponse,
  ExpressRouter,
  NextFunction,
} from "Common/Server/Utils/Express";
import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService";
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";

const router: ExpressRouter = Express.getRouter();

/**
 * JSON metrics endpoint for KEDA autoscaling.
 *
 * Responds with the current ServerMonitorIngest queue depth as JSON so a
 * KEDA metrics-api scaler can poll it and scale the ingest deployment.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Read the queue depth, then reply with plain JSON for the scaler.
      const size: number =
        await ServerMonitorIngestQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: size });
    } catch (err) {
      next(err);
    }
  },
);

export default router;

View File

@@ -1,8 +1,6 @@
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import JSONFunctions from "Common/Types/JSONFunctions";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import Express, {
@@ -11,11 +9,11 @@ import Express, {
ExpressRouter,
NextFunction,
} from "Common/Server/Utils/Express";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Response from "Common/Server/Utils/Response";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import OneUptimeDate from "Common/Types/Date";
import ProjectService from "Common/Server/Services/ProjectService";
import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService";
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
const router: ExpressRouter = Express.getRouter();
@@ -77,52 +75,100 @@ router.post(
throw new BadDataException("Invalid Secret Key");
}
const monitor: Monitor | null = await MonitorService.findOneBy({
query: {
serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString),
monitorType: MonitorType.Server,
...MonitorService.getEnabledMonitorQuery(),
project: {
...ProjectService.getActiveProjectStatusQuery(),
},
},
select: {
_id: true,
},
props: {
isRoot: true,
},
});
if (!monitor) {
throw new BadDataException("Monitor not found");
}
// return the response early.
Response.sendEmptySuccessResponse(req, res);
// now process this request.
// Add to queue for asynchronous processing
await ServerMonitorIngestQueueService.addServerMonitorIngestJob({
secretKey: monitorSecretKeyAsString,
serverMonitorResponse: req.body as JSONObject,
});
const serverMonitorResponse: ServerMonitorResponse =
JSONFunctions.deserialize(
req.body["serverMonitorResponse"] as JSONObject,
) as any;
return;
} catch (err) {
return next(err);
}
},
);
if (!serverMonitorResponse) {
throw new BadDataException("Invalid Server Monitor Response");
}
// Queue stats endpoint
router.get(
"/server-monitor/queue/stats",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const stats: {
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
total: number;
} = await ServerMonitorIngestQueueService.getQueueStats();
return Response.sendJsonObjectResponse(req, res, stats);
} catch (err) {
return next(err);
}
},
);
if (!monitor.id) {
throw new BadDataException("Monitor id not found");
}
// Queue size endpoint
router.get(
"/server-monitor/queue/size",
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
async (
req: ExpressRequest,
res: ExpressResponse,
next: NextFunction,
): Promise<void> => {
try {
const size: number = await ServerMonitorIngestQueueService.getQueueSize();
return Response.sendJsonObjectResponse(req, res, { size });
} catch (err) {
return next(err);
}
},
);
serverMonitorResponse.monitorId = monitor.id;
// Queue failed jobs endpoint: read-only inspection of failed
// server-monitor ingest jobs with simple start/end pagination.
// Requires the cluster key.
router.get(
  "/server-monitor/queue/failed",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Parse pagination parameters from the query string (explicit base-10);
      // default to the first 100 failed jobs when absent or malformed.
      const start: number = parseInt(req.query["start"] as string, 10) || 0;
      const end: number = parseInt(req.query["end"] as string, 10) || 100;

      // NOTE: this handler must not mutate or re-process monitor responses —
      // that happens in the ProcessServerMonitorIngest worker. Stray
      // processing statements that referenced an out-of-scope
      // `serverMonitorResponse` have been removed.
      const failedJobs: Array<{
        id: string;
        name: string;
        data: any;
        failedReason: string;
        processedOn: Date | null;
        finishedOn: Date | null;
        attemptsMade: number;
      }> = await ServerMonitorIngestQueueService.getFailedJobs({
        start,
        end,
      });

      return Response.sendJsonObjectResponse(req, res, {
        failedJobs,
        pagination: {
          start,
          end,
          count: failedJobs.length,
        },
      });
    } catch (err) {
      return next(err);
    }

View File

@@ -1,4 +1,5 @@
import ServerMonitorAPI from "./API/ServerMonitor";
import MetricsAPI from "./API/Metrics";
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger";
import Realtime from "Common/Server/Utils/Realtime";
import App from "Common/Server/Utils/StartServer";
import Telemetry from "Common/Server/Utils/Telemetry";
import "./Jobs/ServerMonitorIngest/ProcessServerMonitorIngest";
const app: ExpressApplication = Express.getExpressApp();
const APP_NAME: string = "server-monitor-ingest";
app.use([`/${APP_NAME}`, "/"], ServerMonitorAPI);
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
const init: PromiseVoidFunction = async (): Promise<void> => {
try {

View File

@@ -0,0 +1,92 @@
import { ServerMonitorIngestJobData } from "../../Services/Queue/ServerMonitorIngestQueueService";
import logger from "Common/Server/Utils/Logger";
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
import BadDataException from "Common/Types/Exception/BadDataException";
import { JSONObject } from "Common/Types/JSON";
import JSONFunctions from "Common/Types/JSONFunctions";
import MonitorType from "Common/Types/Monitor/MonitorType";
import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse";
import ObjectID from "Common/Types/ObjectID";
import MonitorService from "Common/Server/Services/MonitorService";
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
import Monitor from "Common/Models/DatabaseModels/Monitor";
import OneUptimeDate from "Common/Types/Date";
import ProjectService from "Common/Server/Services/ProjectService";
// Register the worker that drains the server-monitor ingest queue.
// Each job is delegated to processServerMonitorFromQueue; failures are
// logged and re-thrown so the queue records the job as failed.
QueueWorker.getWorker(
  QueueName.ServerMonitorIngest,
  async (job: QueueJob): Promise<void> => {
    logger.debug(`Processing server monitor ingestion job: ${job.name}`);

    try {
      await processServerMonitorFromQueue(
        job.data as ServerMonitorIngestJobData,
      );
    } catch (error) {
      logger.error(`Error processing server monitor ingestion job:`);
      logger.error(error);
      // Re-throw so the queue marks this job as failed (and may retry it).
      throw error;
    }

    logger.debug(
      `Successfully processed server monitor ingestion job: ${job.name}`,
    );
  },
  { concurrency: 20 }, // Process up to 20 server monitor ingest jobs concurrently
);
/**
 * Processes one server-monitor ingest job: resolves the monitor from the
 * agent-supplied secret key, deserializes the reported response, stamps it
 * with the monitor id and timestamps, and hands it to MonitorResourceUtil
 * for evaluation.
 *
 * @param jobData - payload enqueued by ServerMonitorIngestQueueService.
 * @throws BadDataException when the secret key is missing, no matching
 *         monitor exists, the response is falsy, or the monitor has no id.
 */
async function processServerMonitorFromQueue(
  jobData: ServerMonitorIngestJobData,
): Promise<void> {
  const monitorSecretKeyAsString: string = jobData.secretKey;

  if (!monitorSecretKeyAsString) {
    throw new BadDataException("Invalid Secret Key");
  }

  // Only enabled Server-type monitors belonging to active projects may
  // ingest data; only the _id is needed downstream.
  const monitor: Monitor | null = await MonitorService.findOneBy({
    query: {
      serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString),
      monitorType: MonitorType.Server,
      ...MonitorService.getEnabledMonitorQuery(),
      project: {
        ...ProjectService.getActiveProjectStatusQuery(),
      },
    },
    select: {
      _id: true,
    },
    props: {
      isRoot: true,
    },
  });

  if (!monitor) {
    throw new BadDataException("Monitor not found");
  }

  // NOTE(review): double-indexing — this assumes jobData.serverMonitorResponse
  // itself contains a "serverMonitorResponse" key, i.e. the enqueuing endpoint
  // stored the whole request body. Confirm against the caller of
  // addServerMonitorIngestJob.
  const serverMonitorResponse: ServerMonitorResponse =
    JSONFunctions.deserialize(
      jobData.serverMonitorResponse["serverMonitorResponse"] as JSONObject,
    ) as any;

  if (!serverMonitorResponse) {
    throw new BadDataException("Invalid Server Monitor Response");
  }

  if (!monitor.id) {
    throw new BadDataException("Monitor id not found");
  }

  serverMonitorResponse.monitorId = monitor.id;
  // NOTE(review): these are set to processing time, not arrival time —
  // jobData.ingestionTimestamp (the enqueue time) is unused, so queue delay
  // skews requestReceivedAt. Consider using ingestionTimestamp here.
  serverMonitorResponse.requestReceivedAt = OneUptimeDate.getCurrentDate();
  serverMonitorResponse.timeNow = OneUptimeDate.getCurrentDate();

  // Evaluate the server monitor response (criteria, status, metrics).
  await MonitorResourceUtil.monitorResource(serverMonitorResponse);
}
logger.debug("Server monitor ingest worker initialized");

View File

@@ -0,0 +1,72 @@
import { createHash } from "crypto";
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
import { JSONObject } from "Common/Types/JSON";
import OneUptimeDate from "Common/Types/Date";
import logger from "Common/Server/Utils/Logger";
// Payload stored on each job in the server-monitor ingest queue.
export interface ServerMonitorIngestJobData {
  // Secret key sent by the monitoring agent; resolved to a Monitor when
  // the job is processed.
  secretKey: string;
  // Serialized server-monitor response payload as supplied by the ingest
  // API endpoint (shape depends on the enqueuing caller — see worker).
  serverMonitorResponse: JSONObject;
  // Time the request was accepted/enqueued, not when the job is processed.
  ingestionTimestamp: Date;
}
/**
 * Thin wrapper over the shared Queue infrastructure for the
 * server-monitor ingest queue: enqueues jobs and exposes queue metrics
 * (size, per-state stats, failed jobs) for the API/KEDA endpoints.
 */
export default class ServerMonitorIngestQueueService {
  /**
   * Enqueues a server monitor response for asynchronous processing by the
   * ProcessServerMonitorIngest worker.
   *
   * @param data.secretKey - the monitor's secret key (identifies the monitor).
   * @param data.serverMonitorResponse - raw response payload to process later.
   * @throws rethrows any error from the underlying queue after logging it.
   */
  public static async addServerMonitorIngestJob(data: {
    secretKey: string;
    serverMonitorResponse: JSONObject;
  }): Promise<void> {
    try {
      const jobData: ServerMonitorIngestJobData = {
        secretKey: data.secretKey,
        serverMonitorResponse: data.serverMonitorResponse,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      // Security: do not embed the raw secret key in the job id — job ids
      // appear in queue dashboards and in the debug log below. Use a short
      // one-way digest instead; the nano timestamp keeps the id unique.
      const keyDigest: string = createHash("sha256")
        .update(data.secretKey)
        .digest("hex")
        .substring(0, 16);

      const jobId: string = `server-monitor-${keyDigest}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.ServerMonitorIngest,
        jobId,
        "ProcessServerMonitorIngest",
        jobData as unknown as JSONObject,
      );

      logger.debug(`Added server monitor ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding server monitor ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /** Total number of jobs currently in the server-monitor ingest queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.ServerMonitorIngest);
  }

  /** Per-state job counts; consumed by the KEDA/metrics endpoints. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.ServerMonitorIngest);
  }

  /**
   * Failed jobs for inspection/debugging. The optional start/end pagination
   * window is passed through to the queue layer unchanged.
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.ServerMonitorIngest, options);
  }
}