mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 08:42:13 +02:00
Compare commits
6 Commits
queue-work
...
7.0.4832
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c894b112e6 | ||
|
|
304baf1bb4 | ||
|
|
9adea6b1ba | ||
|
|
5498521e02 | ||
|
|
9e97c6ddbc | ||
|
|
63272e09f8 |
@@ -52,7 +52,9 @@ router.post(
|
||||
Response.sendEmptySuccessResponse(req, res);
|
||||
|
||||
// Add to queue for asynchronous processing
|
||||
await FluentIngestQueueService.addFluentIngestJob(req as TelemetryRequest);
|
||||
await FluentIngestQueueService.addFluentIngestJob(
|
||||
req as TelemetryRequest,
|
||||
);
|
||||
|
||||
return;
|
||||
} catch (err) {
|
||||
|
||||
@@ -34,9 +34,7 @@ QueueWorker.getWorker(
|
||||
requestHeaders: jobData.requestHeaders,
|
||||
});
|
||||
|
||||
logger.debug(
|
||||
`Successfully processed fluent ingestion job: ${job.name}`,
|
||||
);
|
||||
logger.debug(`Successfully processed fluent ingestion job: ${job.name}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error processing fluent ingestion job:`);
|
||||
logger.error(error);
|
||||
@@ -55,8 +53,9 @@ async function processFluentIngestFromQueue(
|
||||
| Array<JSONObject | string>
|
||||
| JSONObject;
|
||||
|
||||
let oneuptimeServiceName: string | string[] | undefined =
|
||||
data.requestHeaders["x-oneuptime-service-name"] as string | string[] | undefined;
|
||||
let oneuptimeServiceName: string | string[] | undefined = data.requestHeaders[
|
||||
"x-oneuptime-service-name"
|
||||
] as string | string[] | undefined;
|
||||
|
||||
if (!oneuptimeServiceName) {
|
||||
oneuptimeServiceName = "Unknown Service";
|
||||
@@ -124,7 +123,9 @@ async function processFluentIngestFromQueue(
|
||||
OTelIngestService.recordDataIngestedUsgaeBilling({
|
||||
services: {
|
||||
[oneuptimeServiceName as string]: {
|
||||
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(data.requestBody as JSONObject),
|
||||
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(
|
||||
data.requestBody as JSONObject,
|
||||
),
|
||||
dataRententionInDays: telemetryService.dataRententionInDays,
|
||||
serviceId: telemetryService.serviceId,
|
||||
serviceName: oneuptimeServiceName as string,
|
||||
|
||||
@@ -12,9 +12,7 @@ export interface FluentIngestJobData {
|
||||
}
|
||||
|
||||
export default class FluentIngestQueueService {
|
||||
public static async addFluentIngestJob(
|
||||
req: TelemetryRequest,
|
||||
): Promise<void> {
|
||||
public static async addFluentIngestJob(req: TelemetryRequest): Promise<void> {
|
||||
try {
|
||||
const jobData: FluentIngestJobData = {
|
||||
projectId: req.projectId.toString(),
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime fluent-ingest autoscaler
|
||||
{{- if not $.Values.fluentIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.fluentIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.fluentIngest.keda.enabled)) }}
|
||||
{{- $fluentIngestAutoScalerArgs := dict "ServiceName" "fluent-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $fluentIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime incoming-request-ingest autoscaler
|
||||
{{- if not $.Values.incomingRequestIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.incomingRequestIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.incomingRequestIngest.keda.enabled)) }}
|
||||
{{- $incomingRequestIngestAutoScalerArgs := dict "ServiceName" "incoming-request-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $incomingRequestIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime probe-ingest autoscaler
|
||||
{{- if not $.Values.probeIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.probeIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.probeIngest.keda.enabled)) }}
|
||||
{{- $probeIngestAutoScalerArgs := dict "ServiceName" "probe-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $probeIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -112,7 +112,7 @@ spec:
|
||||
---
|
||||
|
||||
# OneUptime server-monitor-ingest autoscaler
|
||||
{{- if not $.Values.serverMonitorIngest.disableAutoscaler }}
|
||||
{{- if and (not $.Values.serverMonitorIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.serverMonitorIngest.keda.enabled)) }}
|
||||
{{- $serverMonitorIngestAutoScalerArgs := dict "ServiceName" "server-monitor-ingest" "Release" $.Release "Values" $.Values -}}
|
||||
{{- include "oneuptime.autoscaler" $serverMonitorIngestAutoScalerArgs }}
|
||||
{{- end }}
|
||||
|
||||
@@ -105,7 +105,8 @@ router.get(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const size: number = await IncomingRequestIngestQueueService.getQueueSize();
|
||||
const size: number =
|
||||
await IncomingRequestIngestQueueService.getQueueSize();
|
||||
return Response.sendJsonObjectResponse(req, res, { size });
|
||||
} catch (err) {
|
||||
return next(err);
|
||||
|
||||
@@ -22,7 +22,8 @@ router.get(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const queueSize: number = await IncomingRequestIngestQueueService.getQueueSize();
|
||||
const queueSize: number =
|
||||
await IncomingRequestIngestQueueService.getQueueSize();
|
||||
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.status(200).json({
|
||||
|
||||
@@ -98,12 +98,7 @@ async function processIncomingRequestFromQueue(
|
||||
};
|
||||
|
||||
// process probe response here.
|
||||
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
|
||||
// do nothing.
|
||||
// we don't want to throw error here.
|
||||
// we just want to log the error.
|
||||
logger.error(err);
|
||||
});
|
||||
await MonitorResourceUtil.monitorResource(incomingRequest);
|
||||
}
|
||||
|
||||
logger.debug("Incoming request ingest worker initialized");
|
||||
|
||||
@@ -13,14 +13,12 @@ export interface IncomingRequestIngestJobData {
|
||||
}
|
||||
|
||||
export default class IncomingRequestIngestQueueService {
|
||||
public static async addIncomingRequestIngestJob(
|
||||
data: {
|
||||
secretKey: string;
|
||||
requestHeaders: Dictionary<string>;
|
||||
requestBody: string | JSONObject;
|
||||
requestMethod: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
public static async addIncomingRequestIngestJob(data: {
|
||||
secretKey: string;
|
||||
requestHeaders: Dictionary<string>;
|
||||
requestBody: string | JSONObject;
|
||||
requestMethod: string;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const jobData: IncomingRequestIngestJobData = {
|
||||
secretKey: data.secretKey,
|
||||
|
||||
@@ -22,9 +22,7 @@ QueueWorker.getWorker(
|
||||
|
||||
await processProbeFromQueue(jobData);
|
||||
|
||||
logger.debug(
|
||||
`Successfully processed probe ingestion job: ${job.name}`,
|
||||
);
|
||||
logger.debug(`Successfully processed probe ingestion job: ${job.name}`);
|
||||
} catch (error) {
|
||||
logger.error(`Error processing probe ingestion job:`);
|
||||
logger.error(error);
|
||||
@@ -50,10 +48,7 @@ async function processProbeFromQueue(
|
||||
|
||||
if (jobData.jobType === "probe-response") {
|
||||
// Handle regular probe response
|
||||
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
|
||||
logger.error("Error in monitor resource");
|
||||
logger.error(err);
|
||||
});
|
||||
await MonitorResourceUtil.monitorResource(probeResponse);
|
||||
} else if (jobData.jobType === "monitor-test" && jobData.testId) {
|
||||
// Handle monitor test response
|
||||
const testId: ObjectID = new ObjectID(jobData.testId);
|
||||
|
||||
@@ -11,13 +11,11 @@ export interface ProbeIngestJobData {
|
||||
}
|
||||
|
||||
export default class ProbeIngestQueueService {
|
||||
public static async addProbeIngestJob(
|
||||
data: {
|
||||
probeMonitorResponse: JSONObject;
|
||||
jobType: "probe-response" | "monitor-test";
|
||||
testId?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
public static async addProbeIngestJob(data: {
|
||||
probeMonitorResponse: JSONObject;
|
||||
jobType: "probe-response" | "monitor-test";
|
||||
testId?: string;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const jobData: ProbeIngestJobData = {
|
||||
probeMonitorResponse: data.probeMonitorResponse,
|
||||
|
||||
@@ -22,7 +22,8 @@ router.get(
|
||||
next: NextFunction,
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const queueSize: number = await ServerMonitorIngestQueueService.getQueueSize();
|
||||
const queueSize: number =
|
||||
await ServerMonitorIngestQueueService.getQueueSize();
|
||||
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.status(200).json({
|
||||
|
||||
@@ -10,12 +10,10 @@ export interface ServerMonitorIngestJobData {
|
||||
}
|
||||
|
||||
export default class ServerMonitorIngestQueueService {
|
||||
public static async addServerMonitorIngestJob(
|
||||
data: {
|
||||
secretKey: string;
|
||||
serverMonitorResponse: JSONObject;
|
||||
},
|
||||
): Promise<void> {
|
||||
public static async addServerMonitorIngestJob(data: {
|
||||
secretKey: string;
|
||||
serverMonitorResponse: JSONObject;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
const jobData: ServerMonitorIngestJobData = {
|
||||
secretKey: data.secretKey,
|
||||
|
||||
Reference in New Issue
Block a user