diff --git a/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts b/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts
index f02d3ff87b..1294e715c7 100644
--- a/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts
+++ b/FluentIngest/Jobs/FluentIngest/ProcessFluentIngest.ts
@@ -8,7 +8,6 @@ import { JSONObject } from "Common/Types/JSON";
 import LogService from "Common/Server/Services/LogService";
 import LogSeverity from "Common/Types/Log/LogSeverity";
 import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
-import Log from "Common/Models/AnalyticsModels/Log";
 import { FLUENT_INGEST_CONCURRENCY } from "../../Config";
 
 interface FluentIngestProcessData {
@@ -17,6 +16,8 @@ interface FluentIngestProcessData {
   requestHeaders: JSONObject;
 }
 
+const FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE: number = 500;
+
 // Set up the worker for processing fluent ingest queue
 QueueWorker.getWorker(
   QueueName.FluentIngest,
@@ -46,7 +47,7 @@ QueueWorker.getWorker(
 async function processFluentIngestFromQueue(
   data: FluentIngestProcessData,
 ): Promise<void> {
-  const dbLogs: Array<Log> = [];
+  const dbLogs: Array<JSONObject> = [];
 
   let logItems: Array<JSONObject | string> | JSONObject = data.requestBody as
     | Array<JSONObject | string>
@@ -82,43 +83,61 @@ async function processFluentIngestFromQueue(
     logItems = [logItems];
   }
 
-  for (let logItem of logItems) {
-    const dbLog: Log = new Log();
+  for (const logItem of logItems) {
+    const logBody: string =
+      typeof logItem === "string" ? logItem : JSON.stringify(logItem);
 
-    dbLog.projectId = data.projectId;
-    dbLog.serviceId = telemetryService.serviceId;
-    dbLog.severityNumber = 0;
-    const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
-    dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
-    dbLog.time = currentTimeAndDate;
+    const ingestionDate: Date = OneUptimeDate.getCurrentDate();
+    const ingestionIso: string = OneUptimeDate.toString(ingestionDate);
+    const timeUnixNano: number = OneUptimeDate.toUnixNano(ingestionDate);
 
-    dbLog.severityText = LogSeverity.Unspecified;
+    const logRow: JSONObject = {
+      _id: ObjectID.generate().toString(),
+      createdAt: ingestionIso,
+      updatedAt: ingestionIso,
+      projectId: data.projectId.toString(),
+      serviceId: telemetryService.serviceId.toString(),
+      time: ingestionIso,
+      timeUnixNano: Math.trunc(timeUnixNano).toString(),
+      severityNumber: 0,
+      severityText: LogSeverity.Unspecified,
+      attributes: {},
+      attributeKeys: [],
+      traceId: "",
+      spanId: "",
+      body: logBody,
+    };
 
-    if (typeof logItem === "string") {
-      // check if its parseable to json
-      try {
-        logItem = JSON.parse(logItem);
-      } catch {
-        // do nothing
-      }
+    dbLogs.push(logRow);
+
+    if (dbLogs.length >= FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE) {
+      await flushLogBuffer(dbLogs);
     }
-
-    if (typeof logItem !== "string") {
-      logItem = JSON.stringify(logItem);
-    }
-
-    dbLog.body = logItem as string;
-    dbLog.attributeKeys = [];
-
-    dbLogs.push(dbLog);
   }
 
-  await LogService.createMany({
-    items: dbLogs,
-    props: {
-      isRoot: true,
-    },
-  });
+  await flushLogBuffer(dbLogs, true);
 }
 
 logger.debug("Fluent ingest worker initialized");
+
+async function flushLogBuffer(
+  logs: Array<JSONObject>,
+  force: boolean = false,
+): Promise<void> {
+  while (
+    logs.length >= FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE ||
+    (force && logs.length > 0)
+  ) {
+    const batchSize: number = Math.min(
+      logs.length,
+      FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE,
+    );
+    const batch: Array<JSONObject> = logs.splice(0, batchSize);
+
+    if (batch.length === 0) {
+      continue;
+    }
+
+    await LogService.insertJsonRows(batch);
+  }
+}