Compare commits

...

6 Commits

15 changed files with 39 additions and 51 deletions

View File

@@ -52,7 +52,9 @@ router.post(
Response.sendEmptySuccessResponse(req, res);
// Add to queue for asynchronous processing
await FluentIngestQueueService.addFluentIngestJob(req as TelemetryRequest);
await FluentIngestQueueService.addFluentIngestJob(
req as TelemetryRequest,
);
return;
} catch (err) {

View File

@@ -34,9 +34,7 @@ QueueWorker.getWorker(
requestHeaders: jobData.requestHeaders,
});
logger.debug(
`Successfully processed fluent ingestion job: ${job.name}`,
);
logger.debug(`Successfully processed fluent ingestion job: ${job.name}`);
} catch (error) {
logger.error(`Error processing fluent ingestion job:`);
logger.error(error);
@@ -55,8 +53,9 @@ async function processFluentIngestFromQueue(
| Array<JSONObject | string>
| JSONObject;
let oneuptimeServiceName: string | string[] | undefined =
data.requestHeaders["x-oneuptime-service-name"] as string | string[] | undefined;
let oneuptimeServiceName: string | string[] | undefined = data.requestHeaders[
"x-oneuptime-service-name"
] as string | string[] | undefined;
if (!oneuptimeServiceName) {
oneuptimeServiceName = "Unknown Service";
@@ -124,7 +123,9 @@ async function processFluentIngestFromQueue(
OTelIngestService.recordDataIngestedUsgaeBilling({
services: {
[oneuptimeServiceName as string]: {
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(data.requestBody as JSONObject),
dataIngestedInGB: JSONFunctions.getSizeOfJSONinGB(
data.requestBody as JSONObject,
),
dataRententionInDays: telemetryService.dataRententionInDays,
serviceId: telemetryService.serviceId,
serviceName: oneuptimeServiceName as string,

View File

@@ -12,9 +12,7 @@ export interface FluentIngestJobData {
}
export default class FluentIngestQueueService {
public static async addFluentIngestJob(
req: TelemetryRequest,
): Promise<void> {
public static async addFluentIngestJob(req: TelemetryRequest): Promise<void> {
try {
const jobData: FluentIngestJobData = {
projectId: req.projectId.toString(),

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime fluent-ingest autoscaler
{{- if not $.Values.fluentIngest.disableAutoscaler }}
{{- if and (not $.Values.fluentIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.fluentIngest.keda.enabled)) }}
{{- $fluentIngestAutoScalerArgs := dict "ServiceName" "fluent-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $fluentIngestAutoScalerArgs }}
{{- end }}

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime incoming-request-ingest autoscaler
{{- if not $.Values.incomingRequestIngest.disableAutoscaler }}
{{- if and (not $.Values.incomingRequestIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.incomingRequestIngest.keda.enabled)) }}
{{- $incomingRequestIngestAutoScalerArgs := dict "ServiceName" "incoming-request-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $incomingRequestIngestAutoScalerArgs }}
{{- end }}

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime probe-ingest autoscaler
{{- if not $.Values.probeIngest.disableAutoscaler }}
{{- if and (not $.Values.probeIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.probeIngest.keda.enabled)) }}
{{- $probeIngestAutoScalerArgs := dict "ServiceName" "probe-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $probeIngestAutoScalerArgs }}
{{- end }}

View File

@@ -112,7 +112,7 @@ spec:
---
# OneUptime server-monitor-ingest autoscaler
{{- if not $.Values.serverMonitorIngest.disableAutoscaler }}
{{- if and (not $.Values.serverMonitorIngest.disableAutoscaler) (not (and $.Values.keda.enabled $.Values.serverMonitorIngest.keda.enabled)) }}
{{- $serverMonitorIngestAutoScalerArgs := dict "ServiceName" "server-monitor-ingest" "Release" $.Release "Values" $.Values -}}
{{- include "oneuptime.autoscaler" $serverMonitorIngestAutoScalerArgs }}
{{- end }}

View File

@@ -105,7 +105,8 @@ router.get(
next: NextFunction,
): Promise<void> => {
try {
const size: number = await IncomingRequestIngestQueueService.getQueueSize();
const size: number =
await IncomingRequestIngestQueueService.getQueueSize();
return Response.sendJsonObjectResponse(req, res, { size });
} catch (err) {
return next(err);

View File

@@ -22,7 +22,8 @@ router.get(
next: NextFunction,
): Promise<void> => {
try {
const queueSize: number = await IncomingRequestIngestQueueService.getQueueSize();
const queueSize: number =
await IncomingRequestIngestQueueService.getQueueSize();
res.setHeader("Content-Type", "application/json");
res.status(200).json({

View File

@@ -98,12 +98,7 @@ async function processIncomingRequestFromQueue(
};
// process probe response here.
MonitorResourceUtil.monitorResource(incomingRequest).catch((err: Error) => {
// do nothing.
// we don't want to throw error here.
// we just want to log the error.
logger.error(err);
});
await MonitorResourceUtil.monitorResource(incomingRequest);
}
logger.debug("Incoming request ingest worker initialized");

View File

@@ -13,14 +13,12 @@ export interface IncomingRequestIngestJobData {
}
export default class IncomingRequestIngestQueueService {
public static async addIncomingRequestIngestJob(
data: {
secretKey: string;
requestHeaders: Dictionary<string>;
requestBody: string | JSONObject;
requestMethod: string;
},
): Promise<void> {
public static async addIncomingRequestIngestJob(data: {
secretKey: string;
requestHeaders: Dictionary<string>;
requestBody: string | JSONObject;
requestMethod: string;
}): Promise<void> {
try {
const jobData: IncomingRequestIngestJobData = {
secretKey: data.secretKey,

View File

@@ -22,9 +22,7 @@ QueueWorker.getWorker(
await processProbeFromQueue(jobData);
logger.debug(
`Successfully processed probe ingestion job: ${job.name}`,
);
logger.debug(`Successfully processed probe ingestion job: ${job.name}`);
} catch (error) {
logger.error(`Error processing probe ingestion job:`);
logger.error(error);
@@ -50,10 +48,7 @@ async function processProbeFromQueue(
if (jobData.jobType === "probe-response") {
// Handle regular probe response
MonitorResourceUtil.monitorResource(probeResponse).catch((err: Error) => {
logger.error("Error in monitor resource");
logger.error(err);
});
await MonitorResourceUtil.monitorResource(probeResponse);
} else if (jobData.jobType === "monitor-test" && jobData.testId) {
// Handle monitor test response
const testId: ObjectID = new ObjectID(jobData.testId);

View File

@@ -11,13 +11,11 @@ export interface ProbeIngestJobData {
}
export default class ProbeIngestQueueService {
public static async addProbeIngestJob(
data: {
probeMonitorResponse: JSONObject;
jobType: "probe-response" | "monitor-test";
testId?: string;
},
): Promise<void> {
public static async addProbeIngestJob(data: {
probeMonitorResponse: JSONObject;
jobType: "probe-response" | "monitor-test";
testId?: string;
}): Promise<void> {
try {
const jobData: ProbeIngestJobData = {
probeMonitorResponse: data.probeMonitorResponse,

View File

@@ -22,7 +22,8 @@ router.get(
next: NextFunction,
): Promise<void> => {
try {
const queueSize: number = await ServerMonitorIngestQueueService.getQueueSize();
const queueSize: number =
await ServerMonitorIngestQueueService.getQueueSize();
res.setHeader("Content-Type", "application/json");
res.status(200).json({

View File

@@ -10,12 +10,10 @@ export interface ServerMonitorIngestJobData {
}
export default class ServerMonitorIngestQueueService {
public static async addServerMonitorIngestJob(
data: {
secretKey: string;
serverMonitorResponse: JSONObject;
},
): Promise<void> {
public static async addServerMonitorIngestJob(data: {
secretKey: string;
serverMonitorResponse: JSONObject;
}): Promise<void> {
try {
const jobData: ServerMonitorIngestJobData = {
secretKey: data.secretKey,