refactor(fluent-ingest): replace Log model with JSON rows and add batching

- Remove dependency on Log model; build log JSON rows inline (IDs, ISO timestamps, unix nano)
- Buffer logs and flush in batches via a new flushLogBuffer helper and LogService.insertJsonRows (see the sketch after this list)
- Add FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE constant and simplify processing loop
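
Note: the change boils down to a buffer-and-flush pattern. Below is a minimal standalone sketch of that pattern; the Row type and insertRows sink are hypothetical stand-ins, while the actual worker uses LogService.insertJsonRows and the shared dbLogs buffer shown in the diff.

type Row = Record<string, unknown>;

const BATCH_SIZE: number = 500;

// Hypothetical sink standing in for LogService.insertJsonRows.
async function insertRows(rows: Array<Row>): Promise<void> {
  // write rows to storage
}

// Drain the buffer in place: emit full batches whenever enough rows are
// queued, and emit the final partial batch only when force is set.
async function flush(buffer: Array<Row>, force: boolean = false): Promise<void> {
  while (buffer.length >= BATCH_SIZE || (force && buffer.length > 0)) {
    const batch: Array<Row> = buffer.splice(0, Math.min(buffer.length, BATCH_SIZE));
    await insertRows(batch);
  }
}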
Author: Nawaz Dhandala
Date:   2025-10-28 12:20:43 +00:00
parent  f5de74611d
commit  0e272f0f31

@@ -8,7 +8,6 @@ import { JSONObject } from "Common/Types/JSON";
 import LogService from "Common/Server/Services/LogService";
 import LogSeverity from "Common/Types/Log/LogSeverity";
 import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
-import Log from "Common/Models/AnalyticsModels/Log";
 import { FLUENT_INGEST_CONCURRENCY } from "../../Config";
 
 interface FluentIngestProcessData {
@@ -17,6 +16,8 @@ interface FluentIngestProcessData {
   requestHeaders: JSONObject;
 }
 
+const FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE: number = 500;
+
 // Set up the worker for processing fluent ingest queue
 QueueWorker.getWorker(
   QueueName.FluentIngest,
@@ -46,7 +47,7 @@ QueueWorker.getWorker(
 async function processFluentIngestFromQueue(
   data: FluentIngestProcessData,
 ): Promise<void> {
-  const dbLogs: Array<Log> = [];
+  const dbLogs: Array<JSONObject> = [];
 
   let logItems: Array<JSONObject | string> | JSONObject = data.requestBody as
     | Array<JSONObject | string>
@@ -82,43 +83,61 @@ async function processFluentIngestFromQueue(
     logItems = [logItems];
   }
 
-  for (let logItem of logItems) {
-    const dbLog: Log = new Log();
-    dbLog.projectId = data.projectId;
-    dbLog.serviceId = telemetryService.serviceId;
-    dbLog.severityNumber = 0;
-
-    const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate();
-
-    dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate);
-    dbLog.time = currentTimeAndDate;
-
-    dbLog.severityText = LogSeverity.Unspecified;
-
-    if (typeof logItem === "string") {
-      // check if its parseable to json
-      try {
-        logItem = JSON.parse(logItem);
-      } catch {
-        // do nothing
-      }
-    }
-
-    if (typeof logItem !== "string") {
-      logItem = JSON.stringify(logItem);
-    }
-
-    dbLog.body = logItem as string;
-    dbLog.attributeKeys = [];
-    dbLogs.push(dbLog);
-  }
-
-  await LogService.createMany({
-    items: dbLogs,
-    props: {
-      isRoot: true,
-    },
-  });
+  for (const logItem of logItems) {
+    const logBody: string =
+      typeof logItem === "string" ? logItem : JSON.stringify(logItem);
+
+    const ingestionDate: Date = OneUptimeDate.getCurrentDate();
+    const ingestionIso: string = OneUptimeDate.toString(ingestionDate);
+    const timeUnixNano: number = OneUptimeDate.toUnixNano(ingestionDate);
+
+    const logRow: JSONObject = {
+      _id: ObjectID.generate().toString(),
+      createdAt: ingestionIso,
+      updatedAt: ingestionIso,
+      projectId: data.projectId.toString(),
+      serviceId: telemetryService.serviceId.toString(),
+      time: ingestionIso,
+      timeUnixNano: Math.trunc(timeUnixNano).toString(),
+      severityNumber: 0,
+      severityText: LogSeverity.Unspecified,
+      attributes: {},
+      attributeKeys: [],
+      traceId: "",
+      spanId: "",
+      body: logBody,
+    };
+
+    dbLogs.push(logRow);
+
+    if (dbLogs.length >= FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE) {
+      await flushLogBuffer(dbLogs);
+    }
+  }
+
+  await flushLogBuffer(dbLogs, true);
 }
 
 logger.debug("Fluent ingest worker initialized");
+
+async function flushLogBuffer(
+  logs: Array<JSONObject>,
+  force: boolean = false,
+): Promise<void> {
+  while (
+    logs.length >= FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE ||
+    (force && logs.length > 0)
+  ) {
+    const batchSize: number = Math.min(
+      logs.length,
+      FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE,
+    );
+    const batch: Array<JSONObject> = logs.splice(0, batchSize);
+
+    if (batch.length === 0) {
+      continue;
+    }
+
+    await LogService.insertJsonRows(batch);
+  }
+}
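
Design note: flushLogBuffer drains via Array.prototype.splice, which mutates the caller's dbLogs array in place, so the same shared buffer feeds both the mid-loop threshold flushes and the final forced flush without re-buffering rows. A hypothetical driver (not part of the commit) illustrating the two flush modes:

// Hypothetical driver: 1200 rows with a batch size of 500 trigger two
// full-batch flushes inside the loop (after rows 500 and 1000) and one
// forced flush of the remaining 200 rows at the end.
async function example(): Promise<void> {
  const buffer: Array<JSONObject> = [];

  for (let i = 0; i < 1200; i++) {
    buffer.push({ body: `log line ${i}` });

    if (buffer.length >= FLUENT_INGEST_LOG_FLUSH_BATCH_SIZE) {
      await flushLogBuffer(buffer);
    }
  }

  await flushLogBuffer(buffer, true);
}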