mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 08:42:13 +02:00
Compare commits
35 Commits
telemetry-
...
probe-queu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c4c6793b29 | ||
|
|
c894b112e6 | ||
|
|
304baf1bb4 | ||
|
|
9adea6b1ba | ||
|
|
5498521e02 | ||
|
|
9e97c6ddbc | ||
|
|
63272e09f8 | ||
|
|
327c28afdc | ||
|
|
896020b93b | ||
|
|
15a68472b0 | ||
|
|
0210480d97 | ||
|
|
72fdc06687 | ||
|
|
3710b81b9a | ||
|
|
9fcb3dc2e0 | ||
|
|
43e2ccf51a | ||
|
|
48c3d8603a | ||
|
|
9cfc912161 | ||
|
|
29e3ee57ab | ||
|
|
be7e849822 | ||
|
|
59d76b601a | ||
|
|
b77ef336b8 | ||
|
|
7df21fe8e5 | ||
|
|
f39e1943c7 | ||
|
|
966a903646 | ||
|
|
1d9d37c6d1 | ||
|
|
7edcc4dbce | ||
|
|
0939294d22 | ||
|
|
dbcbfe5f79 | ||
|
|
a638972817 | ||
|
|
37c6310465 | ||
|
|
a7d38389fd | ||
|
|
2f55336db7 | ||
|
|
f99a15b95b | ||
|
|
de5bff2ffe | ||
|
|
cef2764499 |
3
.github/workflows/release.yml
vendored
3
.github/workflows/release.yml
vendored
@@ -70,7 +70,7 @@ jobs:
|
||||
|
||||
publish-mcp-server:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [generate-build-number]
|
||||
needs: [generate-build-number, publish-npm-packages]
|
||||
env:
|
||||
CI_PIPELINE_ID: ${{ github.run_number }}
|
||||
NPM_AUTH_TOKEN: ${{ secrets.NPM_AUTH_TOKEN }}
|
||||
@@ -138,6 +138,7 @@ jobs:
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
cd MCP
|
||||
npm update @oneuptime/common
|
||||
npm install
|
||||
|
||||
- name: Build MCP server
|
||||
|
||||
1
.github/workflows/test-release.yaml
vendored
1
.github/workflows/test-release.yaml
vendored
@@ -144,6 +144,7 @@ jobs:
|
||||
- name: Install dependencies and build
|
||||
run: |
|
||||
cd MCP
|
||||
npm update @oneuptime/common
|
||||
npm install
|
||||
npm run build
|
||||
|
||||
|
||||
@@ -17,6 +17,10 @@ export enum QueueName {
|
||||
Workflow = "Workflow",
|
||||
Worker = "Worker",
|
||||
Telemetry = "Telemetry",
|
||||
FluentIngest = "FluentIngest",
|
||||
IncomingRequestIngest = "IncomingRequestIngest",
|
||||
ServerMonitorIngest = "ServerMonitorIngest",
|
||||
ProbeIngest = "ProbeIngest",
|
||||
}
|
||||
|
||||
export type QueueJob = Job;
|
||||
@@ -137,12 +141,12 @@ export default class Queue {
|
||||
|
||||
@CaptureSpan()
|
||||
public static async getQueueSize(queueName: QueueName): Promise<number> {
|
||||
const queue = this.getQueue(queueName);
|
||||
const waiting = await queue.getWaiting();
|
||||
const active = await queue.getActive();
|
||||
const delayed = await queue.getDelayed();
|
||||
|
||||
return waiting.length + active.length + delayed.length;
|
||||
const queue: BullQueue = this.getQueue(queueName);
|
||||
const waitingCount: number = await queue.getWaitingCount();
|
||||
const activeCount: number = await queue.getActiveCount();
|
||||
const delayedCount: number = await queue.getDelayedCount();
|
||||
|
||||
return waitingCount + activeCount + delayedCount;
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
@@ -154,20 +158,61 @@ export default class Queue {
|
||||
delayed: number;
|
||||
total: number;
|
||||
}> {
|
||||
const queue = this.getQueue(queueName);
|
||||
const waiting = await queue.getWaiting();
|
||||
const active = await queue.getActive();
|
||||
const completed = await queue.getCompleted();
|
||||
const failed = await queue.getFailed();
|
||||
const delayed = await queue.getDelayed();
|
||||
|
||||
const queue: BullQueue = this.getQueue(queueName);
|
||||
const waitingCount: number = await queue.getWaitingCount();
|
||||
const activeCount: number = await queue.getActiveCount();
|
||||
const completedCount: number = await queue.getCompletedCount();
|
||||
const failedCount: number = await queue.getFailedCount();
|
||||
const delayedCount: number = await queue.getDelayedCount();
|
||||
|
||||
return {
|
||||
waiting: waiting.length,
|
||||
active: active.length,
|
||||
completed: completed.length,
|
||||
failed: failed.length,
|
||||
delayed: delayed.length,
|
||||
total: waiting.length + active.length + completed.length + failed.length + delayed.length,
|
||||
waiting: waitingCount,
|
||||
active: activeCount,
|
||||
completed: completedCount,
|
||||
failed: failedCount,
|
||||
delayed: delayedCount,
|
||||
total:
|
||||
waitingCount +
|
||||
activeCount +
|
||||
completedCount +
|
||||
failedCount +
|
||||
delayedCount,
|
||||
};
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
public static async getFailedJobs(
|
||||
queueName: QueueName,
|
||||
options?: {
|
||||
start?: number;
|
||||
end?: number;
|
||||
},
|
||||
): Promise<
|
||||
Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: JSONObject;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}>
|
||||
> {
|
||||
const queue: BullQueue = this.getQueue(queueName);
|
||||
const start: number = options?.start || 0;
|
||||
const end: number = options?.end || 100;
|
||||
const failed: Job[] = await queue.getFailed(start, end);
|
||||
|
||||
return failed.map((job: Job) => {
|
||||
return {
|
||||
id: job.id || "unknown",
|
||||
name: job.name || "unknown",
|
||||
data: job.data as JSONObject,
|
||||
failedReason: job.failedReason || "No reason provided",
|
||||
processedOn: job.processedOn ? new Date(job.processedOn) : null,
|
||||
finishedOn: job.finishedOn ? new Date(job.finishedOn) : null,
|
||||
attemptsMade: job.attemptsMade || 0,
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,9 @@ export default class ClusterKeyAuthorization {
|
||||
} else if (req.headers && req.headers["clusterkey"]) {
|
||||
// Header keys are automatically transformed to lowercase
|
||||
clusterKey = req.headers["clusterkey"] as string;
|
||||
} else if (req.headers && req.headers["x-clusterkey"]) {
|
||||
// KEDA TriggerAuthentication sends headers with X- prefix
|
||||
clusterKey = req.headers["x-clusterkey"] as string;
|
||||
} else if (req.body && req.body.clusterKey) {
|
||||
clusterKey = req.body.clusterKey;
|
||||
} else {
|
||||
|
||||
@@ -137,7 +137,7 @@ export default class IncomingRequestCriteria {
|
||||
input.dataToProcess.monitorId.toString() +
|
||||
" is true",
|
||||
);
|
||||
return `Incoming request / heartbeat received in ${value} minutes.`;
|
||||
return `Incoming request / heartbeat received in ${value} minutes. It was received ${differenceInMinutes} minutes ago.`;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -153,7 +153,7 @@ export default class IncomingRequestCriteria {
|
||||
input.dataToProcess.monitorId.toString() +
|
||||
" is true",
|
||||
);
|
||||
return `Incoming request / heartbeat not received in ${value} minutes.`;
|
||||
return `Incoming request / heartbeat not received in ${value} minutes. It was received ${differenceInMinutes} minutes ago.`;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -228,6 +228,8 @@ export default class MonitorResourceUtil {
|
||||
await MonitorService.updateOneById({
|
||||
id: monitor.id!,
|
||||
data: {
|
||||
incomingRequestMonitorHeartbeatCheckedAt:
|
||||
OneUptimeDate.getCurrentDate(),
|
||||
incomingMonitorRequest: {
|
||||
...dataToProcess,
|
||||
} as any,
|
||||
|
||||
@@ -1,23 +1,17 @@
|
||||
import TelemetryIngest, {
|
||||
TelemetryRequest,
|
||||
} from "Common/Server/Middleware/TelemetryIngest";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import ProductType from "Common/Types/MeteredPlan/ProductType";
|
||||
import LogService from "Common/Server/Services/LogService";
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import Log from "Common/Models/AnalyticsModels/Log";
|
||||
import LogSeverity from "Common/Types/Log/LogSeverity";
|
||||
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import JSONFunctions from "Common/Types/JSONFunctions";
|
||||
import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
import BadRequestException from "Common/Types/Exception/BadRequestException";
|
||||
|
||||
export class FluentRequestMiddleware {
|
||||
public static async getProductType(
|
||||
@@ -46,96 +40,107 @@ router.post(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
logger.debug("Fluent ProbeIngest API called");
|
||||
|
||||
const dbLogs: Array<Log> = [];
|
||||
|
||||
let logItems: Array<JSONObject | string> | JSONObject = req.body as
|
||||
| Array<JSONObject | string>
|
||||
| JSONObject;
|
||||
|
||||
let oneuptimeServiceName: string | string[] | undefined =
|
||||
req.headers["x-oneuptime-service-name"];
|
||||
|
||||
if (!oneuptimeServiceName) {
|
||||
oneuptimeServiceName = "Unknown Service";
|
||||
if (!(req as TelemetryRequest).projectId) {
|
||||
throw new BadRequestException(
|
||||
"Invalid request - projectId not found in request.",
|
||||
);
|
||||
}
|
||||
|
||||
const telemetryService: {
|
||||
serviceId: ObjectID;
|
||||
dataRententionInDays: number;
|
||||
} = await OTelIngestService.telemetryServiceFromName({
|
||||
serviceName: oneuptimeServiceName as string,
|
||||
projectId: (req as TelemetryRequest).projectId,
|
||||
req.body = req.body.toJSON ? req.body.toJSON() : req.body;
|
||||
|
||||
// Return response immediately
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await FluentIngestQueueService.addFluentIngestJob(
|
||||
req as TelemetryRequest,
|
||||
);
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Queue stats endpoint
|
||||
router.get(
|
||||
"/fluent/queue/stats",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const stats: {
|
||||
waiting: number;
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
total: number;
|
||||
} = await FluentIngestQueueService.getQueueStats();
|
||||
return Response.sendJsonObjectResponse(req, res, stats);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Queue size endpoint
|
||||
router.get(
|
||||
"/fluent/queue/size",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const size: number = await FluentIngestQueueService.getQueueSize();
|
||||
return Response.sendJsonObjectResponse(req, res, { size });
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Queue failed jobs endpoint
|
||||
router.get(
|
||||
"/fluent/queue/failed",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
// Parse pagination parameters from query string
|
||||
const start: number = parseInt(req.query["start"] as string) || 0;
|
||||
const end: number = parseInt(req.query["end"] as string) || 100;
|
||||
|
||||
const failedJobs: Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: any;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}> = await FluentIngestQueueService.getFailedJobs({
|
||||
start,
|
||||
end,
|
||||
});
|
||||
|
||||
if (
|
||||
logItems &&
|
||||
typeof logItems === "object" &&
|
||||
(logItems as JSONObject)["json"]
|
||||
) {
|
||||
logItems = (logItems as JSONObject)["json"] as
|
||||
| Array<JSONObject | string>
|
||||
| JSONObject;
|
||||
}
|
||||
|
||||
if (!Array.isArray(logItems)) {
|
||||
logItems = [logItems];
|
||||
}
|
||||
|
||||
for (let logItem of logItems) {
|
||||
const dbLog: Log = new Log();
|
||||
|
||||
dbLog.projectId = (req as TelemetryRequest).projectId;
|
||||
dbLog.serviceId = telemetryService.serviceId;
|
||||
dbLog.severityNumber = 0;
|
||||
const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
|
||||
dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
|
||||
dbLog.time = currentTimeAndDate;
|
||||
|
||||
dbLog.severityText = LogSeverity.Unspecified;
|
||||
|
||||
if (typeof logItem === "string") {
|
||||
// check if its parseable to json
|
||||
try {
|
||||
logItem = JSON.parse(logItem);
|
||||
} catch {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof logItem !== "string") {
|
||||
logItem = JSON.stringify(logItem);
|
||||
}
|
||||
|
||||
dbLog.body = logItem as string;
|
||||
|
||||
dbLogs.push(dbLog);
|
||||
}
|
||||
|
||||
await LogService.createMany({
|
||||
items: dbLogs,
|
||||
props: {
|
||||
isRoot: true,
|
||||
return Response.sendJsonObjectResponse(req, res, {
|
||||
failedJobs,
|
||||
pagination: {
|
||||
start,
|
||||
end,
|
||||
count: failedJobs.length,
|
||||
},
|
||||
});
|
||||
|
||||
OTelIngestService.recordDataIngestedUsgaeBilling({
|
||||
services: {
|
||||
[oneuptimeServiceName as string]: {
|
||||
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(req.body),
|
||||
dataRententionInDays: telemetryService.dataRententionInDays,
|
||||
serviceId: telemetryService.serviceId,
|
||||
serviceName: oneuptimeServiceName as string,
|
||||
},
|
||||
},
|
||||
projectId: (req as TelemetryRequest).projectId,
|
||||
productType: ProductType.Logs,
|
||||
}).catch((err: Error) => {
|
||||
logger.error(err);
|
||||
});
|
||||
|
||||
return Response.sendEmptySuccessResponse(req, res);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
37
FluentIngest/API/Metrics.ts
Normal file
37
FluentIngest/API/Metrics.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import FluentIngestQueueService from "../Services/Queue/FluentIngestQueueService";
|
||||
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/**
|
||||
* JSON metrics endpoint for KEDA autoscaling
|
||||
* Returns queue size as JSON for KEDA metrics-api scaler
|
||||
*/
|
||||
router.get(
|
||||
"/metrics/queue-size",
|
||||
// ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
|
||||
async (
|
||||
_req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const queueSize: number = await FluentIngestQueueService.getQueueSize();
|
||||
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.status(200).json({
|
||||
queueSize: queueSize,
|
||||
});
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
export default router;
|
||||
@@ -1,4 +1,5 @@
|
||||
import FluentIngestAPI from "./API/FluentIngest";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
|
||||
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
|
||||
@@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger";
|
||||
import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import "./Jobs/FluentIngest/ProcessFluentIngest";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
|
||||
const APP_NAME: string = "fluent-ingest";
|
||||
|
||||
app.use([`/${APP_NAME}`, "/"], FluentIngestAPI);
|
||||
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
|
||||
|
||||
const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
try {
|
||||
|
||||
141
FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts
Normal file
141
FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts
Normal file
@@ -0,0 +1,141 @@
|
||||
import { FluentIngestJobData } from "../../Services/Queue/FluentIngestQueueService";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import ProductType from "Common/Types/MeteredPlan/ProductType";
|
||||
import LogService from "Common/Server/Services/LogService";
|
||||
import LogSeverity from "Common/Types/Log/LogSeverity";
|
||||
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
|
||||
import JSONFunctions from "Common/Types/JSONFunctions";
|
||||
import Log from "Common/Models/AnalyticsModels/Log";
|
||||
|
||||
interface FluentIngestProcessData {
|
||||
projectId: ObjectID;
|
||||
requestBody: JSONObject;
|
||||
requestHeaders: JSONObject;
|
||||
}
|
||||
|
||||
// Set up the worker for processing fluent ingest queue
|
||||
QueueWorker.getWorker(
|
||||
QueueName.FluentIngest,
|
||||
async (job: QueueJob): Promise<void> => {
|
||||
logger.debug(`Processing fluent ingestion job: ${job.name}`);
|
||||
|
||||
try {
|
||||
const jobData: FluentIngestJobData = job.data as FluentIngestJobData;
|
||||
|
||||
// Pass job data directly to processing function
|
||||
await processFluentIngestFromQueue({
|
||||
projectId: new ObjectID(jobData.projectId),
|
||||
requestBody: jobData.requestBody,
|
||||
requestHeaders: jobData.requestHeaders,
|
||||
});
|
||||
|
||||
logger.debug(`Successfully processed fluent ingestion job: ${job.name}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error processing fluent ingestion job:`);
|
||||
logger.error(error);
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
{ concurrency: 20 }, // Process up to 20 fluent ingest jobs concurrently
|
||||
);
|
||||
|
||||
async function processFluentIngestFromQueue(
|
||||
data: FluentIngestProcessData,
|
||||
): Promise<void> {
|
||||
const dbLogs: Array<Log> = [];
|
||||
|
||||
let logItems: Array<JSONObject | string> | JSONObject = data.requestBody as
|
||||
| Array<JSONObject | string>
|
||||
| JSONObject;
|
||||
|
||||
let oneuptimeServiceName: string | string[] | undefined = data.requestHeaders[
|
||||
"x-oneuptime-service-name"
|
||||
] as string | string[] | undefined;
|
||||
|
||||
if (!oneuptimeServiceName) {
|
||||
oneuptimeServiceName = "Unknown Service";
|
||||
}
|
||||
|
||||
const telemetryService: {
|
||||
serviceId: ObjectID;
|
||||
dataRententionInDays: number;
|
||||
} = await OTelIngestService.telemetryServiceFromName({
|
||||
serviceName: oneuptimeServiceName as string,
|
||||
projectId: data.projectId,
|
||||
});
|
||||
|
||||
if (
|
||||
logItems &&
|
||||
typeof logItems === "object" &&
|
||||
(logItems as JSONObject)["json"]
|
||||
) {
|
||||
logItems = (logItems as JSONObject)["json"] as
|
||||
| Array<JSONObject | string>
|
||||
| JSONObject;
|
||||
}
|
||||
|
||||
if (!Array.isArray(logItems)) {
|
||||
logItems = [logItems];
|
||||
}
|
||||
|
||||
for (let logItem of logItems) {
|
||||
const dbLog: Log = new Log();
|
||||
|
||||
dbLog.projectId = data.projectId;
|
||||
dbLog.serviceId = telemetryService.serviceId;
|
||||
dbLog.severityNumber = 0;
|
||||
const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
|
||||
dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
|
||||
dbLog.time = currentTimeAndDate;
|
||||
|
||||
dbLog.severityText = LogSeverity.Unspecified;
|
||||
|
||||
if (typeof logItem === "string") {
|
||||
// check if its parseable to json
|
||||
try {
|
||||
logItem = JSON.parse(logItem);
|
||||
} catch {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof logItem !== "string") {
|
||||
logItem = JSON.stringify(logItem);
|
||||
}
|
||||
|
||||
dbLog.body = logItem as string;
|
||||
|
||||
dbLogs.push(dbLog);
|
||||
}
|
||||
|
||||
await LogService.createMany({
|
||||
items: dbLogs,
|
||||
props: {
|
||||
isRoot: true,
|
||||
},
|
||||
});
|
||||
|
||||
OTelIngestService.recordDataIngestedUsgaeBilling({
|
||||
services: {
|
||||
[oneuptimeServiceName as string]: {
|
||||
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(
|
||||
data.requestBody as JSONObject,
|
||||
),
|
||||
dataRententionInDays: telemetryService.dataRententionInDays,
|
||||
serviceId: telemetryService.serviceId,
|
||||
serviceName: oneuptimeServiceName as string,
|
||||
},
|
||||
},
|
||||
projectId: data.projectId,
|
||||
productType: ProductType.Logs,
|
||||
}).catch((err: Error) => {
|
||||
logger.error(err);
|
||||
});
|
||||
}
|
||||
|
||||
logger.debug("Fluent ingest worker initialized");
|
||||
72
FluentIngest/Services/Queue/FluentIngestQueueService.ts
Normal file
72
FluentIngest/Services/Queue/FluentIngestQueueService.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
|
||||
export interface FluentIngestJobData {
|
||||
projectId: string;
|
||||
requestBody: JSONObject;
|
||||
requestHeaders: Record<string, string>;
|
||||
ingestionTimestamp: Date;
|
||||
}
|
||||
|
||||
export default class FluentIngestQueueService {
|
||||
public static async addFluentIngestJob(req: TelemetryRequest): Promise<void> {
|
||||
try {
|
||||
const jobData: FluentIngestJobData = {
|
||||
projectId: req.projectId.toString(),
|
||||
requestBody: req.body,
|
||||
requestHeaders: req.headers as Record<string, string>,
|
||||
ingestionTimestamp: OneUptimeDate.getCurrentDate(),
|
||||
};
|
||||
|
||||
const jobId: string = `fluent-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
|
||||
|
||||
await Queue.addJob(
|
||||
QueueName.FluentIngest,
|
||||
jobId,
|
||||
"ProcessFluentIngest",
|
||||
jobData as unknown as JSONObject,
|
||||
);
|
||||
|
||||
logger.debug(`Added fluent ingestion job: ${jobId}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error adding fluent ingestion job:`);
|
||||
logger.error(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public static async getQueueSize(): Promise<number> {
|
||||
return Queue.getQueueSize(QueueName.FluentIngest);
|
||||
}
|
||||
|
||||
public static async getQueueStats(): Promise<{
|
||||
waiting: number;
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
total: number;
|
||||
}> {
|
||||
return Queue.getQueueStats(QueueName.FluentIngest);
|
||||
}
|
||||
|
||||
public static getFailedJobs(options?: {
|
||||
start?: number;
|
||||
end?: number;
|
||||
}): Promise<
|
||||
Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: JSONObject;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}>
|
||||
> {
|
||||
return Queue.getFailedJobs(QueueName.FluentIngest, options);
|
||||
}
|
||||
}
|
||||
@@ -8,5 +8,8 @@ dependencies:
|
||||
- name: clickhouse
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 9.0.0
|
||||
digest: sha256:fd5e473f86dd79be752e0eff131b1a90cd9a8cd2c6032c0d3d95ea43db833629
|
||||
generated: "2025-04-24T22:34:15.181176+01:00"
|
||||
- name: keda
|
||||
repository: https://kedacore.github.io/charts
|
||||
version: 2.17.2
|
||||
digest: sha256:6e4d02525abd4434e8481eedb6244d88ab9d8b800a6f1c543cba96cf5b1d2922
|
||||
generated: "2025-07-30T19:45:21.077234+01:00"
|
||||
|
||||
@@ -46,4 +46,8 @@ dependencies:
|
||||
version: "9.0.0"
|
||||
repository: "https://charts.bitnami.com/bitnami"
|
||||
condition: clickhouse.enabled
|
||||
- name: keda
|
||||
version: "2.17.2"
|
||||
repository: "https://kedacore.github.io/charts"
|
||||
condition: keda.enabled
|
||||
|
||||
|
||||
BIN
HelmChart/Public/oneuptime/charts/keda-2.17.2.tgz
Normal file
BIN
HelmChart/Public/oneuptime/charts/keda-2.17.2.tgz
Normal file
Binary file not shown.
@@ -559,9 +559,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name $.ServiceName }}
|
||||
{{- if $.ReplicaCount }}
|
||||
replicas: {{ $.ReplicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.DisableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
@@ -695,3 +699,63 @@ spec:
|
||||
requests:
|
||||
storage: {{ $.Storage }}
|
||||
{{- end }}
|
||||
|
||||
|
||||
{{/*
|
||||
KEDA ScaledObject template for metric-based autoscaling
|
||||
Usage: include "oneuptime.kedaScaledObject" (dict "ServiceName" "service-name" "Release" .Release "Values" .Values "MetricsConfig" {...})
|
||||
*/}}
|
||||
{{- define "oneuptime.kedaScaledObject" }}
|
||||
{{- if and .Values.keda.enabled .MetricsConfig.enabled (not .DisableAutoscaler) }}
|
||||
apiVersion: keda.sh/v1alpha1
|
||||
kind: ScaledObject
|
||||
metadata:
|
||||
name: {{ printf "%s-%s-scaledobject" .Release.Name .ServiceName }}
|
||||
namespace: {{ .Release.Namespace }}
|
||||
labels:
|
||||
app: {{ printf "%s-%s" .Release.Name .ServiceName }}
|
||||
app.kubernetes.io/part-of: oneuptime
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
appname: oneuptime
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
name: {{ printf "%s-%s" .Release.Name .ServiceName }}
|
||||
minReplicaCount: {{ .MetricsConfig.minReplicas }}
|
||||
maxReplicaCount: {{ .MetricsConfig.maxReplicas }}
|
||||
pollingInterval: {{ .MetricsConfig.pollingInterval }}
|
||||
cooldownPeriod: {{ .MetricsConfig.cooldownPeriod }}
|
||||
triggers:
|
||||
{{- range .MetricsConfig.triggers }}
|
||||
- type: metrics-api
|
||||
metadata:
|
||||
targetValue: {{ .threshold | quote }}
|
||||
url: http://{{ printf "%s-%s" $.Release.Name $.ServiceName }}:{{ .port }}/metrics/queue-size
|
||||
valueLocation: 'queueSize'
|
||||
method: 'GET'
|
||||
# authenticationRef:
|
||||
# name: {{ printf "%s-%s-trigger-auth" $.Release.Name $.ServiceName }}
|
||||
{{- end }}
|
||||
---
|
||||
apiVersion: keda.sh/v1alpha1
|
||||
kind: TriggerAuthentication
|
||||
metadata:
|
||||
name: {{ printf "%s-%s-trigger-auth" .Release.Name .ServiceName }}
|
||||
namespace: {{ .Release.Namespace }}
|
||||
labels:
|
||||
app: {{ printf "%s-%s" .Release.Name .ServiceName }}
|
||||
app.kubernetes.io/part-of: oneuptime
|
||||
app.kubernetes.io/managed-by: Helm
|
||||
appname: oneuptime
|
||||
spec:
|
||||
secretTargetRef:
|
||||
{{- if .Values.externalSecrets.oneuptimeSecret.existingSecret.name }}
|
||||
- parameter: clusterkey
|
||||
name: {{ .Values.externalSecrets.oneuptimeSecret.existingSecret.name }}
|
||||
key: {{ .Values.externalSecrets.oneuptimeSecret.existingSecret.passwordKey }}
|
||||
{{- else }}
|
||||
- parameter: clusterkey
|
||||
name: {{ printf "%s-%s" .Release.Name "secrets" }}
|
||||
key: oneuptime-secret
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# OneUptime accounts Deployment
|
||||
{{- $accountsEnv := dict "PORT" $.Values.port.accounts "DISABLE_TELEMETRY" $.Values.accounts.disableTelemetryCollection -}}
|
||||
{{- $accountsPorts := dict "port" $.Values.port.accounts -}}
|
||||
{{- $accountsDeploymentArgs :=dict "IsUI" true "ServiceName" "accounts" "Ports" $accountsPorts "Release" $.Release "Values" $.Values "Env" $accountsEnv "Resources" $.Values.accounts.resources "DisableAutoscaler" $.Values.accounts.disableAutoscaler -}}
|
||||
{{- $accountsDeploymentArgs :=dict "IsUI" true "ServiceName" "accounts" "Ports" $accountsPorts "Release" $.Release "Values" $.Values "Env" $accountsEnv "Resources" $.Values.accounts.resources "DisableAutoscaler" $.Values.accounts.disableAutoscaler "ReplicaCount" $.Values.accounts.replicaCount -}}
|
||||
{{- include "oneuptime.deployment" $accountsDeploymentArgs }}
|
||||
---
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# OneUptime adminDashboard Deployment
|
||||
{{- $adminDashboardEnv := dict "PORT" $.Values.port.adminDashboard "DISABLE_TELEMETRY" $.Values.adminDashboard.disableTelemetryCollection -}}
|
||||
{{- $adminDashboardPorts := dict "port" $.Values.port.adminDashboard -}}
|
||||
{{- $adminDashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "admin-dashboard" "Ports" $adminDashboardPorts "Release" $.Release "Values" $.Values "Env" $adminDashboardEnv "Resources" $.Values.adminDashboard.resources "DisableAutoscaler" $.Values.adminDashboard.disableAutoscaler -}}
|
||||
{{- $adminDashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "admin-dashboard" "Ports" $adminDashboardPorts "Release" $.Release "Values" $.Values "Env" $adminDashboardEnv "Resources" $.Values.adminDashboard.resources "DisableAutoscaler" $.Values.adminDashboard.disableAutoscaler "ReplicaCount" $.Values.adminDashboard.replicaCount -}}
|
||||
{{- include "oneuptime.deployment" $adminDashboardDeploymentArgs }}
|
||||
---
|
||||
|
||||
|
||||
@@ -14,9 +14,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "api-reference" }}
|
||||
{{- if $.Values.apiReference.replicaCount }}
|
||||
replicas: {{ $.Values.apiReference.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.apiReference.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -14,9 +14,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "app" }}
|
||||
{{- if $.Values.app.replicaCount }}
|
||||
replicas: {{ $.Values.app.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.app.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# OneUptime dashboard Deployment
|
||||
{{- $dashboardPorts := dict "port" $.Values.port.dashboard -}}
|
||||
{{- $dashboardEnv := dict "PORT" $.Values.port.dashboard "DISABLE_TELEMETRY" $.Values.dashboard.disableTelemetryCollection -}}
|
||||
{{- $dashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "dashboard" "Ports" $dashboardPorts "Release" $.Release "Values" $.Values "Env" $dashboardEnv "Resources" $.Values.dashboard.resources "DisableAutoscaler" $.Values.dashboard.disableAutoscaler -}}
|
||||
{{- $dashboardDeploymentArgs :=dict "IsUI" true "ServiceName" "dashboard" "Ports" $dashboardPorts "Release" $.Release "Values" $.Values "Env" $dashboardEnv "Resources" $.Values.dashboard.resources "DisableAutoscaler" $.Values.dashboard.disableAutoscaler "ReplicaCount" $.Values.dashboard.replicaCount -}}
|
||||
{{- include "oneuptime.deployment" $dashboardDeploymentArgs }}
|
||||
---
|
||||
|
||||
|
||||
@@ -14,9 +14,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "docs" }}
|
||||
{{- if $.Values.docs.replicaCount }}
|
||||
replicas: {{ $.Values.docs.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.docs.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -15,8 +15,8 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "fluent-ingest" }}
|
||||
{{- if $.Values.deployment.fluentIngest.replicaCount }}
|
||||
replicas: {{ $.Values.deployment.fluentIngest.replicaCount }}
|
||||
{{- if $.Values.fluentIngest.replicaCount }}
|
||||
replicas: {{ $.Values.fluentIngest.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.fluentIngest.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime fluent-ingest autoscaler
|
||||
{{- if not $.Values.fluentIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.fluentIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.fluentIngest.keda.enabled)) }}
|
||||
{{- $fluentIngestAutoScalerArgs := dict "ServiceName" "fluent-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $fluentIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -14,9 +14,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "home" }}
|
||||
{{- if $.Values.home.replicaCount }}
|
||||
replicas: {{ $.Values.home.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.home.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -15,8 +15,8 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "incoming-request-ingest" }}
|
||||
{{- if $.Values.deployment.incomingRequestIngest.replicaCount }}
|
||||
replicas: {{ $.Values.deployment.incomingRequestIngest.replicaCount }}
|
||||
{{- if $.Values.incomingRequestIngest.replicaCount }}
|
||||
replicas: {{ $.Values.incomingRequestIngest.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.incomingRequestIngest.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime incoming-request-ingest autoscaler
|
||||
{{- if not $.Values.incomingRequestIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.incomingRequestIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.incomingRequestIngest.keda.enabled)) }}
|
||||
{{- $incomingRequestIngestAutoScalerArgs := dict "ServiceName" "incoming-request-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $incomingRequestIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -15,9 +15,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "isolated-vm" }}
|
||||
{{- if $.Values.isolatedVM.replicaCount }}
|
||||
replicas: {{ $.Values.isolatedVM.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.isolatedVM.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
48
HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
Normal file
48
HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml
Normal file
@@ -0,0 +1,48 @@
|
||||
{{/*
|
||||
KEDA ScaledObjects for various services
|
||||
*/}}
|
||||
|
||||
{{/* OpenTelemetry Ingest KEDA ScaledObject: rendered only when KEDA is enabled globally and for this service, and the service autoscaler is not disabled */}}
{{- if and .Values.keda.enabled .Values.openTelemetryIngest.keda.enabled (not .Values.openTelemetryIngest.disableAutoscaler) }}
{{- $service := .Values.openTelemetryIngest }}
{{- $keda := $service.keda }}
{{- $queueTrigger := dict "query" "oneuptime_telemetry_queue_size" "threshold" $keda.queueSizeThreshold "port" .Values.port.openTelemetryIngest }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list $queueTrigger) }}
{{- $scaledObjectArgs := dict "ServiceName" "open-telemetry-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $service.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $scaledObjectArgs }}
{{- end }}
|
||||
|
||||
{{/* Fluent Ingest KEDA ScaledObject: rendered only when KEDA is enabled globally and for this service, and the service autoscaler is not disabled */}}
{{- if and .Values.keda.enabled .Values.fluentIngest.keda.enabled (not .Values.fluentIngest.disableAutoscaler) }}
{{- $service := .Values.fluentIngest }}
{{- $keda := $service.keda }}
{{- $queueTrigger := dict "query" "oneuptime_fluent_ingest_queue_size" "threshold" $keda.queueSizeThreshold "port" .Values.port.fluentIngest }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list $queueTrigger) }}
{{- $scaledObjectArgs := dict "ServiceName" "fluent-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $service.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $scaledObjectArgs }}
{{- end }}
|
||||
|
||||
{{/* Incoming Request Ingest KEDA ScaledObject: rendered only when KEDA is enabled globally and for this service, and the service autoscaler is not disabled */}}
{{- if and .Values.keda.enabled .Values.incomingRequestIngest.keda.enabled (not .Values.incomingRequestIngest.disableAutoscaler) }}
{{- $service := .Values.incomingRequestIngest }}
{{- $keda := $service.keda }}
{{- $queueTrigger := dict "query" "oneuptime_incoming_request_ingest_queue_size" "threshold" $keda.queueSizeThreshold "port" .Values.port.incomingRequestIngest }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list $queueTrigger) }}
{{- $scaledObjectArgs := dict "ServiceName" "incoming-request-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $service.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $scaledObjectArgs }}
{{- end }}
|
||||
|
||||
{{/* Server Monitor Ingest KEDA ScaledObject: rendered only when KEDA is enabled globally and for this service, and the service autoscaler is not disabled */}}
{{- if and .Values.keda.enabled .Values.serverMonitorIngest.keda.enabled (not .Values.serverMonitorIngest.disableAutoscaler) }}
{{- $service := .Values.serverMonitorIngest }}
{{- $keda := $service.keda }}
{{- $queueTrigger := dict "query" "oneuptime_server_monitor_ingest_queue_size" "threshold" $keda.queueSizeThreshold "port" .Values.port.serverMonitorIngest }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list $queueTrigger) }}
{{- $scaledObjectArgs := dict "ServiceName" "server-monitor-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $service.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $scaledObjectArgs }}
{{- end }}
|
||||
|
||||
{{/* Probe Ingest KEDA ScaledObject: rendered only when KEDA is enabled globally and for this service, and the service autoscaler is not disabled */}}
{{- if and .Values.keda.enabled .Values.probeIngest.keda.enabled (not .Values.probeIngest.disableAutoscaler) }}
{{- $service := .Values.probeIngest }}
{{- $keda := $service.keda }}
{{- $queueTrigger := dict "query" "oneuptime_probe_ingest_queue_size" "threshold" $keda.queueSizeThreshold "port" .Values.port.probeIngest }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list $queueTrigger) }}
{{- $scaledObjectArgs := dict "ServiceName" "probe-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $service.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $scaledObjectArgs }}
{{- end }}
|
||||
|
||||
{{/* Probe KEDA ScaledObjects - one for each probe configuration. A probe entry that is empty or has no `keda` map is treated as KEDA-disabled instead of failing the render with a nil-pointer error. */}}
{{- range $key, $val := $.Values.probes }}
{{- $probe := $val | default dict }}
{{- $keda := $probe.keda | default dict }}
{{- if and $.Values.keda.enabled $keda.enabled (not $probe.disableAutoscaler) }}
{{- $serviceName := printf "probe-%s" $key }}
{{- $queueTrigger := dict "query" "oneuptime_probe_queue_size" "threshold" $keda.queueSizeThreshold "port" $.Values.port.probe }}
{{- $metricsConfig := dict "enabled" $keda.enabled "minReplicas" $keda.minReplicas "maxReplicas" $keda.maxReplicas "pollingInterval" $keda.pollingInterval "cooldownPeriod" $keda.cooldownPeriod "triggers" (list $queueTrigger) }}
{{- $probeKedaArgs := dict "ServiceName" $serviceName "Release" $.Release "Values" $.Values "MetricsConfig" $metricsConfig "DisableAutoscaler" $probe.disableAutoscaler }}
{{- include "oneuptime.kedaScaledObject" $probeKedaArgs }}
{{- end }}
{{- end }}
|
||||
@@ -15,9 +15,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "nginx" }}
|
||||
{{- if $.Values.nginx.replicaCount }}
|
||||
replicas: {{ $.Values.nginx.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.nginx.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -15,8 +15,8 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "open-telemetry-ingest" }}
|
||||
{{- if $.Values.deployment.openTelemetryIngest.replicaCount }}
|
||||
replicas: {{ $.Values.deployment.openTelemetryIngest.replicaCount }}
|
||||
{{- if $.Values.openTelemetryIngest.replicaCount }}
|
||||
replicas: {{ $.Values.openTelemetryIngest.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.openTelemetryIngest.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
@@ -112,8 +112,8 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime open-telemetry-ingest autoscaler
|
||||
{{- if not $.Values.openTelemetryIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.openTelemetryIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.openTelemetryIngest.keda.enabled)) }}
|
||||
{{- $openTelemetryIngestAutoScalerArgs := dict "ServiceName" "open-telemetry-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $openTelemetryIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
---
|
||||
---
|
||||
@@ -15,8 +15,8 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "otel-collector" }}
|
||||
{{- if $.Values.deployment.otelCollector.replicaCount }}
|
||||
replicas: {{ $.Values.deployment.otelCollector.replicaCount }}
|
||||
{{- if $.Values.openTelemetryCollector.replicaCount }}
|
||||
replicas: {{ $.Values.openTelemetryCollector.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.openTelemetryCollector.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
|
||||
@@ -15,8 +15,8 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "probe-ingest" }}
|
||||
{{- if $.Values.deployment.probeIngest.replicaCount }}
|
||||
replicas: {{ $.Values.deployment.probeIngest.replicaCount }}
|
||||
{{- if $.Values.probeIngest.replicaCount }}
|
||||
replicas: {{ $.Values.probeIngest.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.probeIngest.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime probe-ingest autoscaler
|
||||
{{- if not $.Values.probeIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.probeIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.probeIngest.keda.enabled)) }}
|
||||
{{- $probeIngestAutoScalerArgs := dict "ServiceName" "probe-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $probeIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -12,8 +12,16 @@ stringData:
|
||||
## This is a workaround to keep the secrets unchanged
|
||||
{{- if .Release.IsUpgrade }}
|
||||
|
||||
{{- if .Values.oneuptimeSecret }}
|
||||
oneuptime-secret: {{ .Values.oneuptimeSecret | quote }}
|
||||
{{- else }}
|
||||
oneuptime-secret: {{ index (lookup "v1" "Secret" $.Release.Namespace (printf "%s-secrets" $.Release.Name)).data "oneuptime-secret" | b64dec }}
|
||||
{{- end }}
|
||||
{{- if .Values.encryptionSecret }}
|
||||
encryption-secret: {{ .Values.encryptionSecret | quote }}
|
||||
{{- else }}
|
||||
encryption-secret: {{ index (lookup "v1" "Secret" $.Release.Namespace (printf "%s-secrets" $.Release.Name)).data "encryption-secret" | b64dec }}
|
||||
{{- end }}
|
||||
|
||||
{{- range $key, $val := $.Values.probes }}
|
||||
{{- if (index (lookup "v1" "Secret" $.Release.Namespace (printf "%s-secrets" $.Release.Name)).data (printf "probe-%s" $key)) }}
|
||||
@@ -25,8 +33,16 @@ stringData:
|
||||
|
||||
{{ else }} # install operation
|
||||
|
||||
{{- if .Values.oneuptimeSecret }}
|
||||
oneuptime-secret: {{ .Values.oneuptimeSecret | quote }}
|
||||
{{- else }}
|
||||
oneuptime-secret: {{ randAlphaNum 32 | quote }}
|
||||
{{- end }}
|
||||
{{- if .Values.encryptionSecret }}
|
||||
encryption-secret: {{ .Values.encryptionSecret | quote }}
|
||||
{{- else }}
|
||||
encryption-secret: {{ randAlphaNum 32 | quote }}
|
||||
{{- end }}
|
||||
|
||||
{{- range $key, $val := $.Values.probes }}
|
||||
{{printf "probe-%s" $key}}: {{ randAlphaNum 32 | quote }}
|
||||
|
||||
@@ -15,8 +15,8 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "server-monitor-ingest" }}
|
||||
{{- if $.Values.deployment.serverMonitorIngest.replicaCount }}
|
||||
replicas: {{ $.Values.deployment.serverMonitorIngest.replicaCount }}
|
||||
{{- if $.Values.serverMonitorIngest.replicaCount }}
|
||||
replicas: {{ $.Values.serverMonitorIngest.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.serverMonitorIngest.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime server-monitor-ingest autoscaler
|
||||
{{- if not $.Values.serverMonitorIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.serverMonitorIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.serverMonitorIngest.keda.enabled)) }}
|
||||
{{- $serverMonitorIngestAutoScalerArgs := dict "ServiceName" "server-monitor-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $serverMonitorIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# OneUptime statusPage Deployment
|
||||
{{- $statusPagePorts := dict "port" $.Values.port.statusPage -}}
|
||||
{{- $statusPageEnv := dict "PORT" $.Values.port.statusPage "DISABLE_TELEMETRY" $.Values.statusPage.disableTelemetryCollection -}}
|
||||
{{- $statusPageDeploymentArgs :=dict "IsUI" true "ServiceName" "status-page" "Ports" $statusPagePorts "Release" $.Release "Values" $.Values "Env" $statusPageEnv "Resources" $.Values.statusPage.resources "DisableAutoscaler" $.Values.statusPage.disableAutoscaler -}}
|
||||
{{- $statusPageDeploymentArgs :=dict "IsUI" true "ServiceName" "status-page" "Ports" $statusPagePorts "Release" $.Release "Values" $.Values "Env" $statusPageEnv "Resources" $.Values.statusPage.resources "DisableAutoscaler" $.Values.statusPage.disableAutoscaler "ReplicaCount" $.Values.statusPage.replicaCount -}}
|
||||
{{- include "oneuptime.deployment" $statusPageDeploymentArgs }}
|
||||
---
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
# OneUptime testServer Deployment
|
||||
{{- $testServerPorts := dict "port" $.Values.port.testServer -}}
|
||||
{{- $testServerEnv := dict "PORT" $.Values.port.testServer "DISABLE_TELEMETRY" $.Values.testServer.disableTelemetryCollection -}}
|
||||
{{- $testServerDeploymentArgs :=dict "IsUI" true "ServiceName" "test-server" "Ports" $testServerPorts "Release" $.Release "Values" $.Values "Env" $testServerEnv "Resources" $.Values.testServer.resources "DisableAutoscaler" $.Values.testServer.disableAutoscaler -}}
|
||||
{{- $testServerDeploymentArgs :=dict "IsUI" true "ServiceName" "test-server" "Ports" $testServerPorts "Release" $.Release "Values" $.Values "Env" $testServerEnv "Resources" $.Values.testServer.resources "DisableAutoscaler" $.Values.testServer.disableAutoscaler "ReplicaCount" $.Values.testServer.replicaCount -}}
|
||||
{{- include "oneuptime.deployment" $testServerDeploymentArgs }}
|
||||
---
|
||||
|
||||
|
||||
@@ -14,9 +14,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "worker" }}
|
||||
{{- if $.Values.worker.replicaCount }}
|
||||
replicas: {{ $.Values.worker.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.worker.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -14,9 +14,13 @@ spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ printf "%s-%s" $.Release.Name "workflow" }}
|
||||
{{- if $.Values.workflow.replicaCount }}
|
||||
replicas: {{ $.Values.workflow.replicaCount }}
|
||||
{{- else }}
|
||||
{{- if or (not $.Values.autoscaling.enabled) ($.Values.workflow.disableAutoscaler) }}
|
||||
replicas: {{ $.Values.deployment.replicaCount }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
|
||||
@@ -28,19 +28,7 @@ fluentdHost:
|
||||
|
||||
deployment:
|
||||
# Default replica count for all deployments
|
||||
replicaCount: 1
|
||||
probeIngest:
|
||||
replicaCount:
|
||||
serverMonitorIngest:
|
||||
replicaCount:
|
||||
openTelemetryIngest:
|
||||
replicaCount:
|
||||
fluentIngest:
|
||||
replicaCount:
|
||||
incomingRequestIngest:
|
||||
replicaCount:
|
||||
otelCollector:
|
||||
replicaCount:
|
||||
replicaCount: 1
|
||||
|
||||
metalLb:
|
||||
enabled: false
|
||||
@@ -50,6 +38,7 @@ metalLb:
|
||||
# - 51.158.55.153/32 # List of IP addresses of all the servers in the cluster.
|
||||
|
||||
nginx:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
listenAddress: ""
|
||||
@@ -191,6 +180,7 @@ alerts:
|
||||
# 2. Set the statusPage.cnameRecord to "oneuptime.yourcompany.com"
|
||||
# 3. Create CNAME record in your DNS provider with the name "status.yourcompany.com" and value "oneuptime.yourcompany.com"
|
||||
statusPage:
|
||||
replicaCount: 1
|
||||
cnameRecord:
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
@@ -208,6 +198,17 @@ probes:
|
||||
customCodeMonitorScriptTimeoutInMs: 60000
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
# KEDA autoscaling configuration based on monitor queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold per probe
|
||||
queueSizeThreshold: 10
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
# resources:
|
||||
# additionalContainers:
|
||||
# two:
|
||||
@@ -223,10 +224,22 @@ probes:
|
||||
# disableAutoscaler: false
|
||||
# resources:
|
||||
# additionalContainers:
|
||||
# KEDA autoscaling configuration based on monitor queue metrics
|
||||
# keda:
|
||||
# enabled: false
|
||||
# minReplicas: 1
|
||||
# maxReplicas: 100
|
||||
# # Scale up when queue size exceeds this threshold per probe
|
||||
# queueSizeThreshold: 10
|
||||
# # Polling interval for metrics (in seconds)
|
||||
# pollingInterval: 30
|
||||
# # Cooldown period after scaling (in seconds)
|
||||
# cooldownPeriod: 300
|
||||
|
||||
|
||||
port:
|
||||
app: 3002
|
||||
probe: 3874
|
||||
probeIngest: 3400
|
||||
serverMonitorIngest: 3404
|
||||
openTelemetryIngest: 3403
|
||||
@@ -251,6 +264,7 @@ port:
|
||||
docs: 1447
|
||||
|
||||
testServer:
|
||||
replicaCount: 1
|
||||
enabled: false
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
@@ -431,6 +445,7 @@ readinessProbe: # Readiness probe configuration
|
||||
|
||||
# OpenTelemetry Collector Configuration
|
||||
openTelemetryCollector:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
sendingQueue:
|
||||
@@ -439,83 +454,157 @@ openTelemetryCollector:
|
||||
numConsumers: 3
|
||||
|
||||
accounts:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
home:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
dashboard:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
adminDashboard:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
worker:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
workflow:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
workflowTimeoutInMs: 5000
|
||||
resources:
|
||||
|
||||
apiReference:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
docs:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
app:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
probeIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
# KEDA autoscaling configuration based on queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold
|
||||
queueSizeThreshold: 100
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
openTelemetryIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
# KEDA autoscaling configuration based on queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold
|
||||
queueSizeThreshold: 100
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
fluentIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
# KEDA autoscaling configuration based on queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold
|
||||
queueSizeThreshold: 100
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
incomingRequestIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
# KEDA autoscaling configuration based on queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold
|
||||
queueSizeThreshold: 100
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
isolatedVM:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
|
||||
serverMonitorIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
resources:
|
||||
# KEDA autoscaling configuration based on queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold
|
||||
queueSizeThreshold: 100
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
|
||||
slackApp:
|
||||
clientId:
|
||||
clientSecret:
|
||||
signingSecret:
|
||||
signingSecret:
|
||||
|
||||
|
||||
keda:
|
||||
enabled: true
|
||||
@@ -1,12 +1,6 @@
|
||||
import HTTPMethod from "Common/Types/API/HTTPMethod";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import Dictionary from "Common/Types/Dictionary";
|
||||
import BadDataException from "Common/Types/Exception/BadDataException";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest";
|
||||
import MonitorType from "Common/Types/Monitor/MonitorType";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import MonitorService from "Common/Server/Services/MonitorService";
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
@@ -14,10 +8,9 @@ import Express, {
|
||||
NextFunction,
|
||||
RequestHandler,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import Monitor from "Common/Models/DatabaseModels/Monitor";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
@@ -38,63 +31,18 @@ const processIncomingRequest: RequestHandler = async (
|
||||
throw new BadDataException("Invalid Secret Key");
|
||||
}
|
||||
|
||||
const isGetRequest: boolean = req.method === "GET";
|
||||
const isPostRequest: boolean = req.method === "POST";
|
||||
// Return response immediately
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
let httpMethod: HTTPMethod = HTTPMethod.GET;
|
||||
|
||||
if (isGetRequest) {
|
||||
httpMethod = HTTPMethod.GET;
|
||||
}
|
||||
|
||||
if (isPostRequest) {
|
||||
httpMethod = HTTPMethod.POST;
|
||||
}
|
||||
|
||||
const monitor: Monitor | null = await MonitorService.findOneBy({
|
||||
query: {
|
||||
incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString),
|
||||
monitorType: MonitorType.IncomingRequest,
|
||||
},
|
||||
select: {
|
||||
_id: true,
|
||||
projectId: true,
|
||||
},
|
||||
props: {
|
||||
isRoot: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!monitor || !monitor._id) {
|
||||
throw new BadDataException("Monitor not found");
|
||||
}
|
||||
|
||||
if (!monitor.projectId) {
|
||||
throw new BadDataException("Project not found");
|
||||
}
|
||||
|
||||
const now: Date = OneUptimeDate.getCurrentDate();
|
||||
|
||||
const incomingRequest: IncomingMonitorRequest = {
|
||||
projectId: monitor.projectId,
|
||||
monitorId: new ObjectID(monitor._id.toString()),
|
||||
// Add to queue for asynchronous processing
|
||||
await IncomingRequestIngestQueueService.addIncomingRequestIngestJob({
|
||||
secretKey: monitorSecretKeyAsString,
|
||||
requestHeaders: requestHeaders,
|
||||
requestBody: requestBody,
|
||||
incomingRequestReceivedAt: now,
|
||||
onlyCheckForIncomingRequestReceivedAt: false,
|
||||
requestMethod: httpMethod,
|
||||
checkedAt: now,
|
||||
};
|
||||
|
||||
// process probe response here.
|
||||
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
|
||||
// do nothing.
|
||||
// we don't want to throw error here.
|
||||
// we just want to log the error.
|
||||
logger.error(err);
|
||||
requestMethod: req.method,
|
||||
});
|
||||
|
||||
return Response.sendEmptySuccessResponse(req, res);
|
||||
return;
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
@@ -122,4 +70,89 @@ router.get(
|
||||
},
|
||||
);
|
||||
|
||||
/*
 * Queue stats endpoint.
 * Runs behind ClusterKeyAuthorization.isAuthorizedServiceMiddleware and replies with
 * the object returned by IncomingRequestIngestQueueService.getQueueStats() as JSON.
 * Any failure is forwarded to the Express error handler via next().
 */
router.get(
  "/incoming-request/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    // Counters exposed by the queue service, one numeric field per bucket.
    type QueueStats = {
      waiting: number;
      active: number;
      completed: number;
      failed: number;
      delayed: number;
      total: number;
    };

    try {
      const queueStats: QueueStats =
        await IncomingRequestIngestQueueService.getQueueStats();

      return Response.sendJsonObjectResponse(req, res, queueStats);
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
/*
 * Queue size endpoint.
 * Runs behind ClusterKeyAuthorization.isAuthorizedServiceMiddleware and replies with
 * `{ size }`, where size is the number returned by
 * IncomingRequestIngestQueueService.getQueueSize(). Errors go to next().
 */
router.get(
  "/incoming-request/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const currentQueueSize: number =
        await IncomingRequestIngestQueueService.getQueueSize();

      return Response.sendJsonObjectResponse(req, res, {
        size: currentQueueSize,
      });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
/*
 * Queue failed jobs endpoint.
 * Runs behind ClusterKeyAuthorization.isAuthorizedServiceMiddleware and replies with
 * the failed jobs returned by IncomingRequestIngestQueueService.getFailedJobs() for
 * the requested `start`/`end` range, plus the pagination values that were used.
 *
 * Query parameters:
 *   start - integer index, defaults to 0 when missing or not a number.
 *   end   - integer index, defaults to 100 when missing or not a number.
 *           An explicit `end=0` is honoured (it is no longer replaced by 100).
 */
router.get(
  "/incoming-request/queue/failed",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    /*
     * Reads a base-10 integer from a query-string value.
     * Repeated parameters arrive as an array, so the first entry is used.
     * Anything that is not a parseable string yields the fallback.
     */
    const readIndex: (rawValue: unknown, fallback: number) => number = (
      rawValue: unknown,
      fallback: number,
    ): number => {
      const firstValue: unknown = Array.isArray(rawValue)
        ? rawValue[0]
        : rawValue;

      if (typeof firstValue !== "string") {
        return fallback;
      }

      const parsed: number = parseInt(firstValue, 10);

      // Only fall back on NaN so that a legitimate 0 is kept.
      return Number.isNaN(parsed) ? fallback : parsed;
    };

    try {
      // Parse pagination parameters from query string
      const start: number = readIndex(req.query["start"], 0);
      const end: number = readIndex(req.query["end"], 100);

      const failedJobs: Array<{
        id: string;
        name: string;
        data: any;
        failedReason: string;
        processedOn: Date | null;
        finishedOn: Date | null;
        attemptsMade: number;
      }> = await IncomingRequestIngestQueueService.getFailedJobs({
        start,
        end,
      });

      return Response.sendJsonObjectResponse(req, res, {
        failedJobs,
        pagination: {
          start,
          end,
          count: failedJobs.length,
        },
      });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
|
||||
38
IncomingRequestIngest/API/Metrics.ts
Normal file
38
IncomingRequestIngest/API/Metrics.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import IncomingRequestIngestQueueService from "../Services/Queue/IncomingRequestIngestQueueService";
|
||||
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/**
 * Queue-size metrics endpoint (JSON) for the KEDA metrics-api scaler.
 *
 * Responds with `{ "queueSize": <number> }`, where the number is whatever
 * IncomingRequestIngestQueueService.getQueueSize() resolves to. Any failure
 * is handed to `next`.
 *
 * NOTE(review): the cluster-key middleware below is commented out, so this
 * route is currently registered without an authorization middleware.
 * Confirm this is re-enabled once KEDA debugging is finished.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const pendingJobs: number =
        await IncomingRequestIngestQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: pendingJobs });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -1,4 +1,5 @@
|
||||
import IncomingRequestAPI from "./API/IncomingRequest";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
|
||||
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
|
||||
@@ -9,6 +10,7 @@ import logger from "Common/Server/Utils/Logger";
|
||||
import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import "./Jobs/IncomingRequestIngest/ProcessIncomingRequestIngest";
|
||||
import "ejs";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
@@ -16,6 +18,7 @@ const app: ExpressApplication = Express.getExpressApp();
|
||||
const APP_NAME: string = "incoming-request-ingest";
|
||||
|
||||
app.use([`/${APP_NAME}`, "/"], IncomingRequestAPI);
|
||||
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
|
||||
|
||||
const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
try {
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
import { IncomingRequestIngestJobData } from "../../Services/Queue/IncomingRequestIngestQueueService";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import HTTPMethod from "Common/Types/API/HTTPMethod";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import Dictionary from "Common/Types/Dictionary";
|
||||
import BadDataException from "Common/Types/Exception/BadDataException";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import IncomingMonitorRequest from "Common/Types/Monitor/IncomingMonitor/IncomingMonitorRequest";
|
||||
import MonitorType from "Common/Types/Monitor/MonitorType";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import MonitorService from "Common/Server/Services/MonitorService";
|
||||
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
|
||||
import Monitor from "Common/Models/DatabaseModels/Monitor";
|
||||
|
||||
/*
 * Register the worker that consumes the IncomingRequestIngest queue.
 * Each job's payload is handed to processIncomingRequestFromQueue; a failure
 * is logged and then rethrown so it propagates out of the job handler.
 */
QueueWorker.getWorker(
  QueueName.IncomingRequestIngest,
  async (job: QueueJob): Promise<void> => {
    logger.debug(`Processing incoming request ingestion job: ${job.name}`);

    try {
      await processIncomingRequestFromQueue(
        job.data as IncomingRequestIngestJobData,
      );

      logger.debug(
        `Successfully processed incoming request ingestion job: ${job.name}`,
      );
    } catch (err) {
      logger.error(`Error processing incoming request ingestion job:`);
      logger.error(err);
      throw err;
    }
  },
  { concurrency: 20 }, // Up to 20 incoming request ingest jobs are processed concurrently
);
|
||||
|
||||
/**
 * Processes one queued incoming-request job.
 *
 * Looks up the IncomingRequest monitor whose `incomingRequestSecretKey`
 * matches `jobData.secretKey`, builds an IncomingMonitorRequest from the job
 * payload and passes it to MonitorResourceUtil.monitorResource.
 *
 * @param jobData - Payload that was put on the queue at ingestion time.
 * @throws BadDataException when the secret key is empty, when no matching
 *         monitor is found, or when the monitor has no project id.
 */
async function processIncomingRequestFromQueue(
  jobData: IncomingRequestIngestJobData,
): Promise<void> {
  const requestHeaders: Dictionary<string> = jobData.requestHeaders;
  const requestBody: string | JSONObject = jobData.requestBody;
  const monitorSecretKeyAsString: string = jobData.secretKey;

  if (!monitorSecretKeyAsString) {
    throw new BadDataException("Invalid Secret Key");
  }

  // Only POST is mapped explicitly; every other method is recorded as GET.
  const httpMethod: HTTPMethod =
    jobData.requestMethod === "POST" ? HTTPMethod.POST : HTTPMethod.GET;

  const monitor: Monitor | null = await MonitorService.findOneBy({
    query: {
      incomingRequestSecretKey: new ObjectID(monitorSecretKeyAsString),
      monitorType: MonitorType.IncomingRequest,
    },
    select: {
      _id: true,
      projectId: true,
    },
    props: {
      isRoot: true,
    },
  });

  if (!monitor || !monitor._id) {
    throw new BadDataException("Monitor not found");
  }

  if (!monitor.projectId) {
    throw new BadDataException("Project not found");
  }

  const now: Date = OneUptimeDate.getCurrentDate();

  /*
   * The request was received when the job was enqueued, not when this worker
   * picks it up, so prefer the timestamp recorded on the job. It may arrive
   * as a Date or as a serialized string; if it is missing or cannot be
   * parsed, fall back to the processing time (the previous behaviour).
   */
  const ingestedAt: Date | null = jobData.ingestionTimestamp
    ? new Date(jobData.ingestionTimestamp)
    : null;
  const receivedAt: Date =
    ingestedAt && !Number.isNaN(ingestedAt.getTime()) ? ingestedAt : now;

  const incomingRequest: IncomingMonitorRequest = {
    projectId: monitor.projectId,
    monitorId: new ObjectID(monitor._id.toString()),
    requestHeaders: requestHeaders,
    requestBody: requestBody,
    incomingRequestReceivedAt: receivedAt,
    onlyCheckForIncomingRequestReceivedAt: false,
    requestMethod: httpMethod,
    checkedAt: now,
  };

  // process probe response here.
  await MonitorResourceUtil.monitorResource(incomingRequest);
}

logger.debug("Incoming request ingest worker initialized");
|
||||
@@ -0,0 +1,79 @@
|
||||
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import Dictionary from "Common/Types/Dictionary";
|
||||
|
||||
/**
 * Payload stored on the IncomingRequestIngest queue for one received request.
 */
export interface IncomingRequestIngestJobData {
  // Monitor secret key supplied with the request.
  secretKey: string;
  requestHeaders: Dictionary<string>;
  requestBody: string | JSONObject;
  requestMethod: string;
  // Set by addIncomingRequestIngestJob at the moment the job is enqueued.
  ingestionTimestamp: Date;
}

/**
 * Thin wrapper around Queue for the IncomingRequestIngest queue: enqueue
 * jobs and read queue size, statistics and failed jobs.
 */
export default class IncomingRequestIngestQueueService {
  /**
   * Enqueues one incoming request for asynchronous processing.
   *
   * The job is named "ProcessIncomingRequestIngest" and its id is built from
   * the secret key and the current time in unix nanoseconds.
   *
   * @param data - Secret key, headers, body and HTTP method of the request.
   * @throws Rethrows any error from Queue.addJob after logging it.
   */
  public static async addIncomingRequestIngestJob(data: {
    secretKey: string;
    requestHeaders: Dictionary<string>;
    requestBody: string | JSONObject;
    requestMethod: string;
  }): Promise<void> {
    try {
      const jobData: IncomingRequestIngestJobData = {
        secretKey: data.secretKey,
        requestHeaders: data.requestHeaders,
        requestBody: data.requestBody,
        requestMethod: data.requestMethod,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      const jobId: string = `incoming-request-${data.secretKey}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.IncomingRequestIngest,
        jobId,
        "ProcessIncomingRequestIngest",
        jobData as unknown as JSONObject,
      );

      // The job id embeds the monitor secret key; mask it so the secret is
      // never written to the logs.
      const loggableJobId: string = jobId.replace(
        data.secretKey,
        "[redacted]",
      );

      logger.debug(`Added incoming request ingestion job: ${loggableJobId}`);
    } catch (error) {
      logger.error(`Error adding incoming request ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /**
   * @returns The value of Queue.getQueueSize for the IncomingRequestIngest queue.
   */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.IncomingRequestIngest);
  }

  /**
   * @returns Per-state job counts reported by Queue.getQueueStats for the
   *          IncomingRequestIngest queue.
   */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.IncomingRequestIngest);
  }

  /**
   * Lists failed jobs of the IncomingRequestIngest queue.
   *
   * @param options - Optional `start` / `end` window, passed through to
   *                  Queue.getFailedJobs unchanged.
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.IncomingRequestIngest, options);
  }
}
|
||||
@@ -42,6 +42,7 @@ WORKDIR /usr/src/app
|
||||
|
||||
# Install app dependencies
|
||||
COPY ./MCP/package*.json /usr/src/app/
|
||||
RUN npm update @oneuptime/common
|
||||
RUN npm install
|
||||
COPY ./MCP /usr/src/app
|
||||
|
||||
|
||||
5
MCP/package-lock.json
generated
5
MCP/package-lock.json
generated
@@ -1262,7 +1262,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@oneuptime/common": {
|
||||
"version": "7.0.4773",
|
||||
"version": "7.0.4800",
|
||||
"resolved": "https://registry.npmjs.org/@oneuptime/common/-/common-7.0.4800.tgz",
|
||||
"integrity": "sha512-v+2F3aW85IOFVwUzyYPCX4g26SE5N2NBGuf649QpGuJpwAhw6A3uSaX8PMjrHBtn76EKU3BEgg1GrXZL1NPsrw==",
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@asteasolutions/zod-to-openapi": "^7.3.2",
|
||||
@@ -1315,7 +1317,6 @@
|
||||
"json5": "^2.2.3",
|
||||
"jsonwebtoken": "^9.0.0",
|
||||
"jwt-decode": "^4.0.0",
|
||||
"lodash": "^4.17.21",
|
||||
"marked": "^12.0.2",
|
||||
"moment": "^2.30.1",
|
||||
"moment-timezone": "^0.5.45",
|
||||
|
||||
37
OpenTelemetryIngest/API/Metrics.ts
Normal file
37
OpenTelemetryIngest/API/Metrics.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import TelemetryQueueService from "../Services/Queue/TelemetryQueueService";
|
||||
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/**
 * Queue-size metrics endpoint (JSON) for the KEDA metrics-api scaler.
 *
 * Responds with `{ "queueSize": <number> }`, where the number is whatever
 * TelemetryQueueService.getQueueSize() resolves to. Any failure is handed
 * to `next`.
 *
 * NOTE(review): the cluster-key middleware below is commented out, so this
 * route is currently registered without an authorization middleware.
 * Confirm this is re-enabled once KEDA debugging is finished.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const pendingJobs: number = await TelemetryQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: pendingJobs });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -26,10 +26,10 @@ router.post(
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return OtelIngestService.ingestTraces(req, res, next);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -39,10 +39,10 @@ router.post(
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return OtelIngestService.ingestMetrics(req, res, next);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
router.post(
|
||||
@@ -52,28 +52,35 @@ router.post(
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
return OtelIngestService.ingestLogs(req, res, next);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Queue stats endpoint
|
||||
router.get(
|
||||
"/otlp/queue/stats",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const stats = await TelemetryQueueService.getQueueStats();
|
||||
const stats: {
|
||||
waiting: number;
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
total: number;
|
||||
} = await TelemetryQueueService.getQueueStats();
|
||||
return Response.sendJsonObjectResponse(req, res, stats);
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Queue size endpoint
|
||||
@@ -83,15 +90,56 @@ router.get(
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const size = await TelemetryQueueService.getQueueSize();
|
||||
const size: number = await TelemetryQueueService.getQueueSize();
|
||||
return Response.sendJsonObjectResponse(req, res, { size });
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Queue failed jobs endpoint
|
||||
router.get(
|
||||
"/otlp/queue/failed",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
// Parse pagination parameters from query string
|
||||
const start: number = parseInt(req.query["start"] as string) || 0;
|
||||
const end: number = parseInt(req.query["end"] as string) || 100;
|
||||
|
||||
const failedJobs: Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: any;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}> = await TelemetryQueueService.getFailedJobs({
|
||||
start,
|
||||
end,
|
||||
});
|
||||
|
||||
return Response.sendJsonObjectResponse(req, res, {
|
||||
failedJobs,
|
||||
pagination: {
|
||||
start,
|
||||
end,
|
||||
count: failedJobs.length,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
export default router;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import OTelIngestAPI from "./API/OTelIngest";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
|
||||
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
|
||||
@@ -9,7 +10,7 @@ import logger from "Common/Server/Utils/Logger";
|
||||
import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import ProcessTelemetryWorker from "./Jobs/TelemetryIngest/ProcessTelemetry";
|
||||
import "./Jobs/TelemetryIngest/ProcessTelemetry";
|
||||
import "ejs";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
@@ -17,6 +18,7 @@ const app: ExpressApplication = Express.getExpressApp();
|
||||
const APP_NAME: string = "open-telemetry-ingest";
|
||||
|
||||
app.use([`/${APP_NAME}`, "/"], OTelIngestAPI);
|
||||
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
|
||||
|
||||
const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
try {
|
||||
@@ -55,10 +57,6 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
|
||||
await Realtime.init();
|
||||
|
||||
// Initialize telemetry processing worker
|
||||
logger.debug("Initializing telemetry processing worker...");
|
||||
logger.debug(`Telemetry worker initialized: ${ProcessTelemetryWorker ? 'success' : 'failed'}`);
|
||||
|
||||
// add default routes
|
||||
await App.addDefaultRoutes();
|
||||
} catch (err) {
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
import { TelemetryIngestJobData, TelemetryType } from "../../Services/Queue/TelemetryQueueService";
|
||||
import {
|
||||
TelemetryIngestJobData,
|
||||
TelemetryType,
|
||||
} from "../../Services/Queue/TelemetryQueueService";
|
||||
import OtelIngestService from "../../Services/OtelIngest";
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
@@ -7,16 +10,17 @@ import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
|
||||
// Set up the unified worker for processing telemetry queue
|
||||
const worker = QueueWorker.getWorker(
|
||||
QueueWorker.getWorker(
|
||||
QueueName.Telemetry,
|
||||
async (job: QueueJob): Promise<void> => {
|
||||
logger.debug(`Processing telemetry ingestion job: ${job.name}`);
|
||||
|
||||
|
||||
try {
|
||||
const jobData = job.data as TelemetryIngestJobData;
|
||||
|
||||
const jobData: TelemetryIngestJobData =
|
||||
job.data as TelemetryIngestJobData;
|
||||
|
||||
// Create a mock request object with the queued data
|
||||
const mockRequest = {
|
||||
const mockRequest: TelemetryRequest = {
|
||||
projectId: new ObjectID(jobData.projectId.toString()),
|
||||
body: jobData.requestBody,
|
||||
headers: jobData.requestHeaders,
|
||||
@@ -26,19 +30,25 @@ const worker = QueueWorker.getWorker(
|
||||
switch (jobData.type) {
|
||||
case TelemetryType.Logs:
|
||||
await OtelIngestService.processLogsFromQueue(mockRequest);
|
||||
logger.debug(`Successfully processed logs for project: ${jobData.projectId}`);
|
||||
logger.debug(
|
||||
`Successfully processed logs for project: ${jobData.projectId}`,
|
||||
);
|
||||
break;
|
||||
|
||||
|
||||
case TelemetryType.Traces:
|
||||
await OtelIngestService.processTracesFromQueue(mockRequest);
|
||||
logger.debug(`Successfully processed traces for project: ${jobData.projectId}`);
|
||||
logger.debug(
|
||||
`Successfully processed traces for project: ${jobData.projectId}`,
|
||||
);
|
||||
break;
|
||||
|
||||
|
||||
case TelemetryType.Metrics:
|
||||
await OtelIngestService.processMetricsFromQueue(mockRequest);
|
||||
logger.debug(`Successfully processed metrics for project: ${jobData.projectId}`);
|
||||
logger.debug(
|
||||
`Successfully processed metrics for project: ${jobData.projectId}`,
|
||||
);
|
||||
break;
|
||||
|
||||
|
||||
default:
|
||||
throw new Error(`Unknown telemetry type: ${jobData.type}`);
|
||||
}
|
||||
@@ -52,5 +62,3 @@ const worker = QueueWorker.getWorker(
|
||||
);
|
||||
|
||||
logger.debug("Unified telemetry worker initialized");
|
||||
|
||||
export default worker;
|
||||
|
||||
@@ -115,10 +115,8 @@ export default class OtelIngestService {
|
||||
// Return response immediately
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await LogsQueueService.addLogIngestJob(req as TelemetryRequest);
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await LogsQueueService.addLogIngestJob(req as TelemetryRequest);
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
@@ -134,14 +132,18 @@ export default class OtelIngestService {
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
public static async processTracesFromQueue(req: ExpressRequest): Promise<void> {
|
||||
public static async processTracesFromQueue(
|
||||
req: ExpressRequest,
|
||||
): Promise<void> {
|
||||
// This method is specifically for queue processing
|
||||
// It bypasses the response handling since the response was already sent
|
||||
await this.processTracesAsync(req);
|
||||
}
|
||||
|
||||
@CaptureSpan()
|
||||
public static async processMetricsFromQueue(req: ExpressRequest): Promise<void> {
|
||||
public static async processMetricsFromQueue(
|
||||
req: ExpressRequest,
|
||||
): Promise<void> {
|
||||
// This method is specifically for queue processing
|
||||
// It bypasses the response handling since the response was already sent
|
||||
await this.processMetricsAsync(req);
|
||||
@@ -358,10 +360,9 @@ export default class OtelIngestService {
|
||||
// Return response immediately
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await MetricsQueueService.addMetricIngestJob(req as TelemetryRequest);
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await MetricsQueueService.addMetricIngestJob(req as TelemetryRequest);
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
@@ -613,10 +614,8 @@ export default class OtelIngestService {
|
||||
// Return response immediately
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await TracesQueueService.addTraceIngestJob(req as TelemetryRequest);
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await TracesQueueService.addTraceIngestJob(req as TelemetryRequest);
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import TelemetryQueueService, {
|
||||
LogsIngestJobData
|
||||
import TelemetryQueueService, {
|
||||
LogsIngestJobData,
|
||||
} from "./TelemetryQueueService";
|
||||
|
||||
// Export the interface for backward compatibility
|
||||
export { LogsIngestJobData };
|
||||
|
||||
export default class LogsQueueService {
|
||||
public static async addLogIngestJob(
|
||||
req: TelemetryRequest,
|
||||
): Promise<void> {
|
||||
public static async addLogIngestJob(req: TelemetryRequest): Promise<void> {
|
||||
return TelemetryQueueService.addLogIngestJob(req);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import TelemetryQueueService, {
|
||||
MetricsIngestJobData
|
||||
import TelemetryQueueService, {
|
||||
MetricsIngestJobData,
|
||||
} from "./TelemetryQueueService";
|
||||
|
||||
// Export the interface for backward compatibility
|
||||
export { MetricsIngestJobData };
|
||||
|
||||
export default class MetricsQueueService {
|
||||
public static async addMetricIngestJob(
|
||||
req: TelemetryRequest,
|
||||
): Promise<void> {
|
||||
public static async addMetricIngestJob(req: TelemetryRequest): Promise<void> {
|
||||
return TelemetryQueueService.addMetricIngestJob(req);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ export default class TelemetryQueueService {
|
||||
ingestionTimestamp: OneUptimeDate.getCurrentDate(),
|
||||
};
|
||||
|
||||
const jobId = `${type}-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
|
||||
const jobId: string = `${type}-${req.projectId?.toString()}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
|
||||
|
||||
await Queue.addJob(
|
||||
QueueName.Telemetry,
|
||||
@@ -88,4 +88,21 @@ export default class TelemetryQueueService {
|
||||
}> {
|
||||
return Queue.getQueueStats(QueueName.Telemetry);
|
||||
}
|
||||
|
||||
public static getFailedJobs(options?: {
|
||||
start?: number;
|
||||
end?: number;
|
||||
}): Promise<
|
||||
Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: JSONObject;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}>
|
||||
> {
|
||||
return Queue.getFailedJobs(QueueName.Telemetry, options);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import TelemetryQueueService, {
|
||||
TracesIngestJobData
|
||||
import TelemetryQueueService, {
|
||||
TracesIngestJobData,
|
||||
} from "./TelemetryQueueService";
|
||||
|
||||
// Export the interface for backward compatibility
|
||||
export { TracesIngestJobData };
|
||||
|
||||
export default class TracesQueueService {
|
||||
public static async addTraceIngestJob(
|
||||
req: TelemetryRequest,
|
||||
): Promise<void> {
|
||||
public static async addTraceIngestJob(req: TelemetryRequest): Promise<void> {
|
||||
return TelemetryQueueService.addTraceIngestJob(req);
|
||||
}
|
||||
}
|
||||
|
||||
70
Probe/API/Metrics.ts
Normal file
70
Probe/API/Metrics.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import { PROBE_INGEST_URL } from "../Config";
|
||||
import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse";
|
||||
import HTTPMethod from "Common/Types/API/HTTPMethod";
|
||||
import HTTPResponse from "Common/Types/API/HTTPResponse";
|
||||
import URL from "Common/Types/API/URL";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import API from "Common/Utils/API";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/*
 * Metrics endpoint for Keda autoscaling.
 * Sends a POST carrying the default probe request body to
 * `<PROBE_INGEST_URL>/metrics/queue-size` and relays the `queueSize` field
 * of the reply (0 when it is absent or falsy) as `{ queueSize }`.
 * An error response from that call, like any other failure, is logged
 * and forwarded to `next`.
 */
router.get(
  "/queue-size",
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Target: the ProbeIngest API route that reports pending work
      const ingestBase: URL = URL.fromString(PROBE_INGEST_URL.toString());
      const queueSizeUrl: URL = ingestBase.addRoute("/metrics/queue-size");

      logger.debug("Fetching queue size from ProbeIngest API");

      // The default request body carries the probe's own credentials
      const probeAuthBody: JSONObject = ProbeAPIRequest.getDefaultRequestBody();

      const ingestResponse: HTTPResponse<JSONObject> | HTTPErrorResponse =
        await API.fetch<JSONObject>(
          HTTPMethod.POST,
          queueSizeUrl,
          probeAuthBody,
          {},
        );

      if (ingestResponse instanceof HTTPErrorResponse) {
        logger.error("Error fetching queue size from ProbeIngest API");
        logger.error(ingestResponse);
        throw ingestResponse;
      }

      // Pull queueSize out of the reply, defaulting to 0
      const queueSize = ingestResponse.data["queueSize"] || 0;

      logger.debug(`Queue size fetched: ${queueSize}`);

      return Response.sendJsonObjectResponse(req, res, { queueSize });
    } catch (err) {
      logger.error("Error in metrics queue-size endpoint");
      logger.error(err);
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -3,10 +3,12 @@ import AliveJob from "./Jobs/Alive";
|
||||
import FetchMonitorList from "./Jobs/Monitor/FetchList";
|
||||
import FetchMonitorTestList from "./Jobs/Monitor/FetchMonitorTest";
|
||||
import Register from "./Services/Register";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import Express, { ExpressApplication } from "Common/Server/Utils/Express";
|
||||
import "ejs";
|
||||
|
||||
const APP_NAME: string = "probe";
|
||||
@@ -29,6 +31,10 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
},
|
||||
});
|
||||
|
||||
// Add metrics API routes
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
app.use("/metrics", MetricsAPI);
|
||||
|
||||
// add default routes
|
||||
await App.addDefaultRoutes();
|
||||
|
||||
|
||||
37
ProbeIngest/API/Metrics.ts
Normal file
37
ProbeIngest/API/Metrics.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
|
||||
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/**
 * Queue-size metrics endpoint (JSON) for the KEDA metrics-api scaler.
 *
 * Responds with `{ "queueSize": <number> }`, where the number is whatever
 * ProbeIngestQueueService.getQueueSize() resolves to. Any failure is handed
 * to `next`.
 *
 * NOTE(review): the cluster-key middleware below is commented out, so this
 * route is currently registered without an authorization middleware.
 * Confirm this is re-enabled once KEDA debugging is finished.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const pendingJobs: number = await ProbeIngestQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: pendingJobs });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -18,13 +18,17 @@ import Express, {
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import GlobalConfig from "Common/Models/DatabaseModels/GlobalConfig";
|
||||
import Probe from "Common/Models/DatabaseModels/Probe";
|
||||
import User from "Common/Models/DatabaseModels/User";
|
||||
import MonitorTestService from "Common/Server/Services/MonitorTestService";
|
||||
import ProbeIngestQueueService from "../Services/Queue/ProbeIngestQueueService";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
import PositiveNumber from "Common/Types/PositiveNumber";
|
||||
import MonitorProbeService from "Common/Server/Services/MonitorProbeService";
|
||||
import QueryHelper from "Common/Server/Types/Database/QueryHelper";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import MonitorService from "Common/Server/Services/MonitorService";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
@@ -255,17 +259,18 @@ router.post(
|
||||
);
|
||||
}
|
||||
|
||||
// this is when the resource was ingested.
|
||||
probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();
|
||||
|
||||
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
|
||||
logger.error("Error in monitor resource");
|
||||
logger.error(err);
|
||||
});
|
||||
|
||||
return Response.sendJsonObjectResponse(req, res, {
|
||||
// Return response immediately
|
||||
Response.sendJsonObjectResponse(req, res, {
|
||||
result: "processing",
|
||||
});
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await ProbeIngestQueueService.addProbeIngestJob({
|
||||
probeMonitorResponse: req.body,
|
||||
jobType: "probe-response",
|
||||
});
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
@@ -303,28 +308,155 @@ router.post(
|
||||
);
|
||||
}
|
||||
|
||||
probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();
|
||||
// Return response immediately
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
// save the probe response to the monitor test.
|
||||
// Add to queue for asynchronous processing
|
||||
await ProbeIngestQueueService.addProbeIngestJob({
|
||||
probeMonitorResponse: req.body,
|
||||
jobType: "monitor-test",
|
||||
testId: testId.toString(),
|
||||
});
|
||||
|
||||
await MonitorTestService.updateOneById({
|
||||
id: testId,
|
||||
data: {
|
||||
monitorStepProbeResponse: {
|
||||
[probeResponse.monitorStepId.toString()]: {
|
||||
...JSON.parse(JSON.stringify(probeResponse)),
|
||||
monitoredAt: OneUptimeDate.getCurrentDate(),
|
||||
},
|
||||
} as any,
|
||||
return;
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
/*
 * Queue stats endpoint.
 * Guarded by the cluster-key middleware; responds with the counters returned
 * by ProbeIngestQueueService.getQueueStats() (waiting, active, completed,
 * failed, delayed, total).
 */
router.get(
  "/probe/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // A rejected stats lookup lands in the catch below, same as any other failure.
      return Response.sendJsonObjectResponse(
        req,
        res,
        await ProbeIngestQueueService.getQueueStats(),
      );
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
/*
 * Queue size endpoint.
 * Guarded by the cluster-key middleware; responds with `{ size }`, where size
 * is the value returned by ProbeIngestQueueService.getQueueSize().
 */
router.get(
  "/probe/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      return Response.sendJsonObjectResponse(req, res, {
        size: await ProbeIngestQueueService.getQueueSize(),
      });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
/*
 * Queue size endpoint for KEDA autoscaling.
 *
 * Returns, as `{ queueSize }`, the number of monitors pending for the probe
 * identified by `probeId` in the request body, so KEDA can scale probe
 * replicas. The route is guarded by the probe authorization middleware.
 *
 * Responds with a BadDataException ("Probe ID not found") when the body does
 * not carry a probeId. Any other failure is forwarded to `next`.
 */
router.post(
  "/metrics/queue-size",
  ProbeAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      // Get the probe ID from the authenticated request
      const data: JSONObject = req.body;
      const rawProbeId: unknown = data["probeId"];

      /*
       * Validate the raw value BEFORE constructing the ObjectID: a constructed
       * object is always truthy, so checking it afterwards can never fail.
       */
      if (!rawProbeId) {
        return Response.sendErrorResponse(
          req,
          res,
          new BadDataException("Probe ID not found"),
        );
      }

      const probeId: ObjectID = new ObjectID(rawProbeId as string);

      /*
       * Count monitor-probe rows for this probe that are enabled and due
       * (nextPingAt filtered through QueryHelper.lessThanEqualToOrNull against
       * the current date), restricted to enabled monitors in active projects.
       */
      const pendingCount: PositiveNumber = await MonitorProbeService.countBy({
        query: {
          probeId: probeId,
          isEnabled: true,
          nextPingAt: QueryHelper.lessThanEqualToOrNull(
            OneUptimeDate.getCurrentDate(),
          ),
          monitor: {
            ...MonitorService.getEnabledMonitorQuery(),
          },
          project: {
            ...ProjectService.getActiveProjectStatusQuery(),
          },
        },
        props: {
          isRoot: true,
        },
      });

      // send success response.
      return Response.sendJsonObjectResponse(req, res, {
        queueSize: pendingCount.toNumber(),
      });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
return Response.sendEmptySuccessResponse(req, res);
|
||||
// Queue failed jobs endpoint
|
||||
router.get(
|
||||
"/probe/queue/failed",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
// Parse pagination parameters from query string
|
||||
const start: number = parseInt(req.query["start"] as string) || 0;
|
||||
const end: number = parseInt(req.query["end"] as string) || 100;
|
||||
|
||||
const failedJobs: Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: any;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}> = await ProbeIngestQueueService.getFailedJobs({
|
||||
start,
|
||||
end,
|
||||
});
|
||||
|
||||
return Response.sendJsonObjectResponse(req, res, {
|
||||
failedJobs,
|
||||
pagination: {
|
||||
start,
|
||||
end,
|
||||
count: failedJobs.length,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import MonitorAPI from "./API/Monitor";
|
||||
import ProbeIngest from "./API/Probe";
|
||||
import RegisterAPI from "./API/Register";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
|
||||
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
|
||||
@@ -11,6 +12,7 @@ import logger from "Common/Server/Utils/Logger";
|
||||
import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import "./Jobs/ProbeIngest/ProcessProbeIngest";
|
||||
import "ejs";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
@@ -21,6 +23,7 @@ const APP_NAME: string = "probe-ingest";
|
||||
app.use([`/${APP_NAME}`, "/ingestor", "/"], RegisterAPI);
|
||||
app.use([`/${APP_NAME}`, "/ingestor", "/"], MonitorAPI);
|
||||
app.use([`/${APP_NAME}`, "/ingestor", "/"], ProbeIngest);
|
||||
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
|
||||
|
||||
const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
try {
|
||||
|
||||
82
ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts
Normal file
82
ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts
Normal file
@@ -0,0 +1,82 @@
|
||||
import { ProbeIngestJobData } from "../../Services/Queue/ProbeIngestQueueService";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import BadDataException from "Common/Types/Exception/BadDataException";
|
||||
import JSONFunctions from "Common/Types/JSONFunctions";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import MonitorTestService from "Common/Server/Services/MonitorTestService";
|
||||
import ProbeMonitorResponse from "Common/Types/Probe/ProbeMonitorResponse";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
|
||||
/*
 * Set up the worker for the probe ingest queue.
 * Each job's data is treated as ProbeIngestJobData and handed to
 * processProbeFromQueue. Failures are logged and rethrown so they propagate
 * to the queue worker instead of being swallowed here.
 */
QueueWorker.getWorker(
  QueueName.ProbeIngest,
  async (queuedJob: QueueJob): Promise<void> => {
    logger.debug(`Processing probe ingestion job: ${queuedJob.name}`);

    try {
      await processProbeFromQueue(queuedJob.data as ProbeIngestJobData);

      logger.debug(
        `Successfully processed probe ingestion job: ${queuedJob.name}`,
      );
    } catch (processingError) {
      logger.error("Error processing probe ingestion job:");
      logger.error(processingError);
      throw processingError;
    }
  },
  { concurrency: 20 }, // Process up to 20 probe ingest jobs concurrently
);
|
||||
|
||||
/**
 * Processes one job taken from the probe ingest queue.
 *
 * The probe payload is read from `jobData.probeMonitorResponse["probeMonitorResponse"]`
 * and deserialized, then dispatched on `jobData.jobType`:
 *  - "probe-response": forwarded to MonitorResourceUtil.monitorResource.
 *  - "monitor-test": stored on the MonitorTest identified by `jobData.testId`,
 *    keyed by the response's monitorStepId.
 *
 * @param jobData - Job payload as enqueued by ProbeIngestQueueService.
 * @throws BadDataException when the payload is missing, when a monitor-test
 * job has no testId or no monitorStepId, or when the job type is unknown.
 */
async function processProbeFromQueue(
  jobData: ProbeIngestJobData,
): Promise<void> {
  const probeResponse: ProbeMonitorResponse = JSONFunctions.deserialize(
    jobData.probeMonitorResponse["probeMonitorResponse"] as JSONObject,
  ) as any;

  if (!probeResponse) {
    throw new BadDataException("ProbeMonitorResponse not found");
  }

  /*
   * this is when the resource was ingested.
   * NOTE(review): this stamps the time the job is processed, not the time the
   * request was received; jobData.ingestionTimestamp carries the enqueue time.
   * Confirm which one downstream consumers expect.
   */
  probeResponse.ingestedAt = OneUptimeDate.getCurrentDate();

  if (jobData.jobType === "probe-response") {
    // Handle regular probe response
    await MonitorResourceUtil.monitorResource(probeResponse);
    return;
  }

  if (jobData.jobType === "monitor-test") {
    /*
     * Check the raw id before building the ObjectID; a constructed ObjectID is
     * always truthy, so validating it afterwards would never fail. This also
     * reports a missing id accurately, not as an invalid job type.
     */
    if (!jobData.testId) {
      throw new BadDataException("TestId not found");
    }

    // The step id is used as the storage key below, so it must be present.
    if (!probeResponse.monitorStepId) {
      throw new BadDataException("MonitorStepId not found");
    }

    const testId: ObjectID = new ObjectID(jobData.testId);

    // save the probe response to the monitor test.
    await MonitorTestService.updateOneById({
      id: testId,
      data: {
        monitorStepProbeResponse: {
          [probeResponse.monitorStepId.toString()]: {
            // JSON round-trip turns the response into a plain serializable object.
            ...JSON.parse(JSON.stringify(probeResponse)),
            monitoredAt: OneUptimeDate.getCurrentDate(),
          },
        } as any,
      },
      props: {
        isRoot: true,
      },
    });
    return;
  }

  throw new BadDataException(`Invalid job type: ${String(jobData.jobType)}`);
}
|
||||
|
||||
logger.debug("Probe ingest worker initialized");
|
||||
75
ProbeIngest/Services/Queue/ProbeIngestQueueService.ts
Normal file
75
ProbeIngest/Services/Queue/ProbeIngestQueueService.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
|
||||
/**
 * Payload stored on each probe ingest queue job.
 */
export interface ProbeIngestJobData {
  /** Request body received from the probe, queued as-is. */
  probeMonitorResponse: JSONObject;
  /** Which processing path the worker should take for this job. */
  jobType: "probe-response" | "monitor-test";
  /** Monitor test id; supplied by the monitor-test route, otherwise unset. */
  testId?: string | undefined;
  /**
   * Time the job was created.
   * NOTE(review): job data is cast to JSONObject when enqueued, so consumers
   * may see a serialized value and not a Date instance. Confirm before relying on it.
   */
  ingestionTimestamp: Date;
}
|
||||
|
||||
/**
 * Static helpers around the shared Queue infrastructure, bound to the
 * ProbeIngest queue.
 */
export default class ProbeIngestQueueService {
  /**
   * Enqueues a probe payload for asynchronous processing.
   * The job is stamped with the current date and given an id built from the
   * job type, the test id (or "general") and the current unix-nano time.
   * Errors are logged and rethrown to the caller.
   */
  public static async addProbeIngestJob(data: {
    probeMonitorResponse: JSONObject;
    jobType: "probe-response" | "monitor-test";
    testId?: string;
  }): Promise<void> {
    try {
      const { probeMonitorResponse, jobType, testId } = data;

      const payload: ProbeIngestJobData = {
        probeMonitorResponse,
        jobType,
        testId,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      const idScope: string = testId || "general";
      const uniqueJobId: string = `probe-${jobType}-${idScope}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.ProbeIngest,
        uniqueJobId,
        "ProcessProbeIngest",
        payload as unknown as JSONObject,
      );

      logger.debug(`Added probe ingestion job: ${uniqueJobId}`);
    } catch (enqueueError) {
      logger.error("Error adding probe ingestion job:");
      logger.error(enqueueError);
      throw enqueueError;
    }
  }

  /** Returns Queue.getQueueSize for the ProbeIngest queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.ProbeIngest);
  }

  /** Returns Queue.getQueueStats for the ProbeIngest queue. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.ProbeIngest);
  }

  /**
   * Returns Queue.getFailedJobs for the ProbeIngest queue.
   * The optional `start` / `end` values are passed through unchanged.
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.ProbeIngest, options);
  }
}
|
||||
38
ServerMonitorIngest/API/Metrics.ts
Normal file
38
ServerMonitorIngest/API/Metrics.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService";
|
||||
// import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
/**
 * JSON metrics endpoint for KEDA autoscaling.
 * Responds with `{ queueSize }`, the value returned by
 * ServerMonitorIngestQueueService.getQueueSize(), for the KEDA metrics-api scaler.
 *
 * NOTE(review): the cluster-key authorization middleware is commented out
 * below, so this route currently has no authorization middleware of its own.
 * Confirm whether it should be restored.
 */
router.get(
  "/metrics/queue-size",
  // ClusterKeyAuthorization.isAuthorizedServiceMiddleware, // Temporarily disabled for KEDA debugging
  async (
    _req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const pendingJobs: number =
        await ServerMonitorIngestQueueService.getQueueSize();

      res.setHeader("Content-Type", "application/json");
      res.status(200).json({ queueSize: pendingJobs });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -1,8 +1,6 @@
|
||||
import BadDataException from "Common/Types/Exception/BadDataException";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import JSONFunctions from "Common/Types/JSONFunctions";
|
||||
import MonitorType from "Common/Types/Monitor/MonitorType";
|
||||
import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import MonitorService from "Common/Server/Services/MonitorService";
|
||||
import Express, {
|
||||
@@ -11,11 +9,11 @@ import Express, {
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import Monitor from "Common/Models/DatabaseModels/Monitor";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import ProjectService from "Common/Server/Services/ProjectService";
|
||||
import ServerMonitorIngestQueueService from "../Services/Queue/ServerMonitorIngestQueueService";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();
|
||||
|
||||
@@ -77,52 +75,100 @@ router.post(
|
||||
throw new BadDataException("Invalid Secret Key");
|
||||
}
|
||||
|
||||
const monitor: Monitor | null = await MonitorService.findOneBy({
|
||||
query: {
|
||||
serverMonitorSecretKey: new ObjectID(monitorSecretKeyAsString),
|
||||
monitorType: MonitorType.Server,
|
||||
...MonitorService.getEnabledMonitorQuery(),
|
||||
project: {
|
||||
...ProjectService.getActiveProjectStatusQuery(),
|
||||
},
|
||||
},
|
||||
select: {
|
||||
_id: true,
|
||||
},
|
||||
props: {
|
||||
isRoot: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!monitor) {
|
||||
throw new BadDataException("Monitor not found");
|
||||
}
|
||||
|
||||
// return the response early.
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
// now process this request.
|
||||
// Add to queue for asynchronous processing
|
||||
await ServerMonitorIngestQueueService.addServerMonitorIngestJob({
|
||||
secretKey: monitorSecretKeyAsString,
|
||||
serverMonitorResponse: req.body as JSONObject,
|
||||
});
|
||||
|
||||
const serverMonitorResponse: ServerMonitorResponse =
|
||||
JSONFunctions.deserialize(
|
||||
req.body["serverMonitorResponse"] as JSONObject,
|
||||
) as any;
|
||||
return;
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
if (!serverMonitorResponse) {
|
||||
throw new BadDataException("Invalid Server Monitor Response");
|
||||
}
|
||||
/*
 * Queue stats endpoint.
 * Guarded by the cluster-key middleware; responds with the counters returned
 * by ServerMonitorIngestQueueService.getQueueStats() (waiting, active,
 * completed, failed, delayed, total).
 */
router.get(
  "/server-monitor/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      return Response.sendJsonObjectResponse(
        req,
        res,
        await ServerMonitorIngestQueueService.getQueueStats(),
      );
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
if (!monitor.id) {
|
||||
throw new BadDataException("Monitor id not found");
|
||||
}
|
||||
/*
 * Queue size endpoint.
 * Guarded by the cluster-key middleware; responds with `{ size }`, where size
 * is the value returned by ServerMonitorIngestQueueService.getQueueSize().
 */
router.get(
  "/server-monitor/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      return Response.sendJsonObjectResponse(req, res, {
        size: await ServerMonitorIngestQueueService.getQueueSize(),
      });
    } catch (err) {
      return next(err);
    }
  },
);
|
||||
|
||||
serverMonitorResponse.monitorId = monitor.id;
|
||||
// Queue failed jobs endpoint
|
||||
router.get(
|
||||
"/server-monitor/queue/failed",
|
||||
ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
|
||||
async (
|
||||
req: ExpressRequest,
|
||||
res: ExpressResponse,
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
// Parse pagination parameters from query string
|
||||
const start: number = parseInt(req.query["start"] as string) || 0;
|
||||
const end: number = parseInt(req.query["end"] as string) || 100;
|
||||
|
||||
serverMonitorResponse.requestReceivedAt = OneUptimeDate.getCurrentDate();
|
||||
serverMonitorResponse.timeNow = OneUptimeDate.getCurrentDate();
|
||||
const failedJobs: Array<{
|
||||
id: string;
|
||||
name: string;
|
||||
data: any;
|
||||
failedReason: string;
|
||||
processedOn: Date | null;
|
||||
finishedOn: Date | null;
|
||||
attemptsMade: number;
|
||||
}> = await ServerMonitorIngestQueueService.getFailedJobs({
|
||||
start,
|
||||
end,
|
||||
});
|
||||
|
||||
// process probe response here.
|
||||
await MonitorResourceUtil.monitorResource(serverMonitorResponse);
|
||||
return Response.sendJsonObjectResponse(req, res, {
|
||||
failedJobs,
|
||||
pagination: {
|
||||
start,
|
||||
end,
|
||||
count: failedJobs.length,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import ServerMonitorAPI from "./API/ServerMonitor";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
|
||||
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
|
||||
@@ -9,12 +10,14 @@ import logger from "Common/Server/Utils/Logger";
|
||||
import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import "./Jobs/ServerMonitorIngest/ProcessServerMonitorIngest";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
|
||||
const APP_NAME: string = "server-monitor-ingest";
|
||||
|
||||
app.use([`/${APP_NAME}`, "/"], ServerMonitorAPI);
|
||||
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
|
||||
|
||||
const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
try {
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
import { ServerMonitorIngestJobData } from "../../Services/Queue/ServerMonitorIngestQueueService";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import BadDataException from "Common/Types/Exception/BadDataException";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import JSONFunctions from "Common/Types/JSONFunctions";
|
||||
import MonitorType from "Common/Types/Monitor/MonitorType";
|
||||
import ServerMonitorResponse from "Common/Types/Monitor/ServerMonitor/ServerMonitorResponse";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import MonitorService from "Common/Server/Services/MonitorService";
|
||||
import MonitorResourceUtil from "Common/Server/Utils/Monitor/MonitorResource";
|
||||
import Monitor from "Common/Models/DatabaseModels/Monitor";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import ProjectService from "Common/Server/Services/ProjectService";
|
||||
|
||||
/*
 * Set up the worker for the server monitor ingest queue.
 * Each job's data is treated as ServerMonitorIngestJobData and handed to
 * processServerMonitorFromQueue. Failures are logged and rethrown so they
 * propagate to the queue worker instead of being swallowed here.
 */
QueueWorker.getWorker(
  QueueName.ServerMonitorIngest,
  async (queuedJob: QueueJob): Promise<void> => {
    logger.debug(`Processing server monitor ingestion job: ${queuedJob.name}`);

    try {
      await processServerMonitorFromQueue(
        queuedJob.data as ServerMonitorIngestJobData,
      );

      logger.debug(
        `Successfully processed server monitor ingestion job: ${queuedJob.name}`,
      );
    } catch (processingError) {
      logger.error("Error processing server monitor ingestion job:");
      logger.error(processingError);
      throw processingError;
    }
  },
  { concurrency: 20 }, // Process up to 20 server monitor ingest jobs concurrently
);
|
||||
|
||||
/**
 * Processes one job taken from the server monitor ingest queue.
 *
 * Looks up the enabled Server-type monitor (in an active project) whose
 * secret key matches the job, deserializes the payload found under
 * `jobData.serverMonitorResponse["serverMonitorResponse"]`, stamps it with
 * the monitor id and the current date, and passes it to
 * MonitorResourceUtil.monitorResource.
 *
 * @param jobData - Job payload as enqueued by ServerMonitorIngestQueueService.
 * @throws BadDataException when the secret key is empty, no monitor matches,
 * the payload is missing, or the matched monitor has no id.
 */
async function processServerMonitorFromQueue(
  jobData: ServerMonitorIngestJobData,
): Promise<void> {
  const secretKey: string = jobData.secretKey;

  if (!secretKey) {
    throw new BadDataException("Invalid Secret Key");
  }

  const matchedMonitor: Monitor | null = await MonitorService.findOneBy({
    query: {
      serverMonitorSecretKey: new ObjectID(secretKey),
      monitorType: MonitorType.Server,
      ...MonitorService.getEnabledMonitorQuery(),
      project: {
        ...ProjectService.getActiveProjectStatusQuery(),
      },
    },
    select: {
      _id: true,
    },
    props: {
      isRoot: true,
    },
  });

  if (!matchedMonitor) {
    throw new BadDataException("Monitor not found");
  }

  const rawResponse: JSONObject = jobData.serverMonitorResponse[
    "serverMonitorResponse"
  ] as JSONObject;

  const parsedResponse: ServerMonitorResponse = JSONFunctions.deserialize(
    rawResponse,
  ) as any;

  if (!parsedResponse) {
    throw new BadDataException("Invalid Server Monitor Response");
  }

  if (!matchedMonitor.id) {
    throw new BadDataException("Monitor id not found");
  }

  parsedResponse.monitorId = matchedMonitor.id;
  /*
   * NOTE(review): both timestamps are taken when the job is processed, not
   * when the request arrived; jobData.ingestionTimestamp holds the enqueue time.
   */
  parsedResponse.requestReceivedAt = OneUptimeDate.getCurrentDate();
  parsedResponse.timeNow = OneUptimeDate.getCurrentDate();

  // process probe response here.
  await MonitorResourceUtil.monitorResource(parsedResponse);
}
|
||||
|
||||
logger.debug("Server monitor ingest worker initialized");
|
||||
@@ -0,0 +1,72 @@
|
||||
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
|
||||
/**
 * Payload stored on each server monitor ingest queue job.
 */
export interface ServerMonitorIngestJobData {
  /** Server monitor secret key, used by the worker to look up the monitor. */
  secretKey: string;
  /** Request body received from the server monitor agent, queued as-is. */
  serverMonitorResponse: JSONObject;
  /**
   * Time the job was created.
   * NOTE(review): job data is cast to JSONObject when enqueued, so consumers
   * may see a serialized value and not a Date instance. Confirm before relying on it.
   */
  ingestionTimestamp: Date;
}
|
||||
|
||||
/**
 * Static helpers around the shared Queue infrastructure, bound to the
 * ServerMonitorIngest queue.
 */
export default class ServerMonitorIngestQueueService {
  /**
   * Enqueues a server monitor payload for asynchronous processing.
   * The job is stamped with the current date and given an id built from the
   * secret key and the current unix-nano time. Errors are logged and rethrown
   * to the caller.
   */
  public static async addServerMonitorIngestJob(data: {
    secretKey: string;
    serverMonitorResponse: JSONObject;
  }): Promise<void> {
    try {
      const { secretKey, serverMonitorResponse } = data;

      const payload: ServerMonitorIngestJobData = {
        secretKey,
        serverMonitorResponse,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      const uniqueJobId: string = `server-monitor-${secretKey}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;

      await Queue.addJob(
        QueueName.ServerMonitorIngest,
        uniqueJobId,
        "ProcessServerMonitorIngest",
        payload as unknown as JSONObject,
      );

      logger.debug(`Added server monitor ingestion job: ${uniqueJobId}`);
    } catch (enqueueError) {
      logger.error("Error adding server monitor ingestion job:");
      logger.error(enqueueError);
      throw enqueueError;
    }
  }

  /** Returns Queue.getQueueSize for the ServerMonitorIngest queue. */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.ServerMonitorIngest);
  }

  /** Returns Queue.getQueueStats for the ServerMonitorIngest queue. */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.ServerMonitorIngest);
  }

  /**
   * Returns Queue.getFailedJobs for the ServerMonitorIngest queue.
   * The optional `start` / `end` values are passed through unchanged.
   */
  public static getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.ServerMonitorIngest, options);
  }
}
|
||||
Reference in New Issue
Block a user