mirror of
https://github.com/OneUptime/oneuptime.git
synced 2026-04-06 00:32:12 +02:00
chore(fluent-ingest): migrate fluent log ingest into open-telemetry-ingest and remove legacy fluent-ingest service
- Move Fluent/Fluent Bit logs ingestion into open-telemetry-ingest:
- Add OpenTelemetryIngest/API/Fluent.ts (routes for /fluentd and queue endpoints)
- Add Queue service, job worker and processor:
- OpenTelemetryIngest/Services/Queue/FluentLogsQueueService.ts
- OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessFluentLogs.ts
- Register Fluent API and job processing in OpenTelemetryIngest/Index.ts
- Introduce QueueName.FluentLogs and related queue usage
- Remove legacy FluentIngest service and configuration:
- Delete fluent-ingest docker-compose/dev/base entries and docker-compose.yml service
- Remove fluent-ingest related helm values, KEDA scaledobject, ingress host and schema entries
- Remove FLUENTD_HOST env/values and replace FLUENT_INGEST_HOSTNAME -> FLUENT_LOGS_HOSTNAME (pointing to open-telemetry-ingest)
- Update config.example.env keys (FLUENT_LOGS_CONCURRENCY, DISABLE_TELEMETRY_FOR_FLUENT_LOGS)
- Remove FluentIngestRoute and FLUENT_INGEST_URL/hostname usages from UI config/templates
- Remove VSCode launch debug config for Fluent Ingest
- Remove Fluent ingest E2E status check entry in Tests/Scripts/status-check.sh
- Update docs/architecture diagram and Helm templates to reflect "FluentLogs" / Fluent Bit flow
- Misc:
- Remove FLUENTD_HOST environment injection from docker-compose.base.yml
- Cleanup related values.schema.json and values.yaml entries
This consolidates log ingestion under the OpenTelemetry ingest service and removes the separate FluentIngest service and its configuration.
This commit is contained in:
14
.vscode/launch.json
vendored
14
.vscode/launch.json
vendored
@@ -217,20 +217,6 @@
|
||||
"restart": true,
|
||||
"autoAttachChildProcesses": true
|
||||
},
|
||||
{
|
||||
"address": "127.0.0.1",
|
||||
"localRoot": "${workspaceFolder}/FluentIngest",
|
||||
"name": "Fluent Ingest: Debug with Docker",
|
||||
"port": 9937,
|
||||
"remoteRoot": "/usr/src/app",
|
||||
"request": "attach",
|
||||
"skipFiles": [
|
||||
"<node_internals>/**"
|
||||
],
|
||||
"type": "node",
|
||||
"restart": true,
|
||||
"autoAttachChildProcesses": true
|
||||
},
|
||||
{
|
||||
"address": "127.0.0.1",
|
||||
"localRoot": "${workspaceFolder}/IsolatedVM",
|
||||
|
||||
@@ -14,7 +14,7 @@ export enum QueueName {
|
||||
Workflow = "Workflow",
|
||||
Worker = "Worker",
|
||||
Telemetry = "Telemetry",
|
||||
FluentIngest = "FluentIngest",
|
||||
FluentLogs = "FluentLogs",
|
||||
IncomingRequestIngest = "IncomingRequestIngest",
|
||||
ServerMonitorIngest = "ServerMonitorIngest",
|
||||
ProbeIngest = "ProbeIngest",
|
||||
|
||||
@@ -36,8 +36,6 @@ export const IncomingRequestIngestRoute: Route = new Route(
|
||||
"/incoming-request-ingest",
|
||||
);
|
||||
|
||||
export const FluentIngestRoute: Route = new Route("/fluent-ingest");
|
||||
|
||||
export const RealtimeRoute: Route = new Route("/realtime/socket");
|
||||
|
||||
export const DocsRoute: Route = new Route("/docs");
|
||||
|
||||
@@ -15,7 +15,6 @@ import {
|
||||
StatusPageApiRoute,
|
||||
StatusPageRoute,
|
||||
WorkflowRoute,
|
||||
FluentIngestRoute,
|
||||
IncomingRequestIngestRoute,
|
||||
OpenTelemetryIngestRoute,
|
||||
} from "../ServiceRoute";
|
||||
@@ -82,8 +81,6 @@ export const OPEN_TELEMETRY_INGEST_HOSTNAME: Hostname =
|
||||
export const INCOMING_REQUEST_INGEST_HOSTNAME: Hostname =
|
||||
Hostname.fromString(HOST);
|
||||
|
||||
export const FLUENT_INGEST_HOSTNAME: Hostname = Hostname.fromString(HOST);
|
||||
|
||||
export const HELM_HOSTNAME: Hostname = Hostname.fromString(HOST);
|
||||
|
||||
export const API_DOCS_HOSTNAME: Hostname = Hostname.fromString(HOST);
|
||||
@@ -124,12 +121,6 @@ export const OPEN_TELEMETRY_INGEST_URL: URL = new URL(
|
||||
new Route(OpenTelemetryIngestRoute.toString()),
|
||||
);
|
||||
|
||||
export const FLUENT_INGEST_URL: URL = new URL(
|
||||
HTTP_PROTOCOL,
|
||||
FLUENT_INGEST_HOSTNAME,
|
||||
new Route(FluentIngestRoute.toString()),
|
||||
);
|
||||
|
||||
export const IDENTITY_URL: URL = new URL(
|
||||
HTTP_PROTOCOL,
|
||||
IDENTITY_HOSTNAME,
|
||||
|
||||
@@ -32,7 +32,7 @@ flowchart TB
|
||||
direction TB
|
||||
PROBEINGEST["Probe Ingest"]
|
||||
OTELINGEST["OpenTelemetry Ingest"]
|
||||
FLUENTINGEST["Logs Ingest (Fluentd / Fluent Bit)"]
|
||||
FLUENTLOGS["Logs Ingest (Fluent Bit)"]
|
||||
SERVERMONINGEST["Server Monitor Ingest"]
|
||||
INCOMINGREQINGEST["Incoming Request Ingest"]
|
||||
end
|
||||
@@ -83,13 +83,13 @@ flowchart TB
|
||||
INT <-->|HTTPS/TCP/Ping/DNS/Custom| P2
|
||||
|
||||
OTELCOLL["OTel Collector/Agents"] --> OTELINGEST
|
||||
FLUENT["Fluentd / Fluent Bit"] --> FLUENTINGEST
|
||||
FLUENT["Fluentd / Fluent Bit"] --> FLUENTLOGS
|
||||
SERVERAGENTS["Server Monitor Agents"] --> SERVERMONINGEST
|
||||
|
||||
%% Ingest flow to core processing
|
||||
PROBEINGEST --> REDIS
|
||||
OTELINGEST --> CH
|
||||
FLUENTINGEST --> CH
|
||||
FLUENTLOGS --> CH
|
||||
SERVERMONINGEST --> CH
|
||||
INCOMINGREQINGEST --> CH
|
||||
|
||||
@@ -106,7 +106,7 @@ flowchart TB
|
||||
|
||||
class NGINX edge;
|
||||
class HOME,STATUS,API,WORKER web;
|
||||
class PROBEINGEST,OTELINGEST,FLUENTINGEST,SERVERMONINGEST,INCOMINGREQINGEST ingest;
|
||||
class PROBEINGEST,OTELINGEST,FLUENTLOGS,SERVERMONINGEST,INCOMINGREQINGEST ingest;
|
||||
class P1,P2 probe;
|
||||
class PG,CH,REDIS store;
|
||||
class EXT,INT,OTELCOLL,FLUENT,SERVERAGENTS outside;
|
||||
|
||||
@@ -63,8 +63,6 @@ Usage:
|
||||
value: {{ $.Values.openTelemetryCollectorHost }}
|
||||
- name: LOG_LEVEL
|
||||
value: {{ $.Values.logLevel }}
|
||||
- name: FLUENTD_HOST
|
||||
value: {{ $.Values.fluentdHost }}
|
||||
- name: HTTP_PROTOCOL
|
||||
value: {{ $.Values.httpProtocol }}
|
||||
- name: NODE_ENV
|
||||
|
||||
@@ -49,16 +49,4 @@ spec:
|
||||
port:
|
||||
name: "oneuptime-http"
|
||||
{{- end }}
|
||||
{{- if $.Values.fluentdHost }}
|
||||
- host: {{ $.Values.fluentdHost | quote }}
|
||||
http:
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
backend:
|
||||
service:
|
||||
name: {{ printf "%s-%s" $.Release.Name "nginx" }}
|
||||
port:
|
||||
name: "oneuptime-http"
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
@@ -9,13 +9,6 @@ KEDA ScaledObjects for various services
|
||||
{{- include "oneuptime.kedaScaledObject" $openTelemetryIngestKedaArgs }}
|
||||
{{- end }}
|
||||
|
||||
{{/* Fluent Ingest KEDA ScaledObject */}}
|
||||
{{- if and .Values.keda.enabled .Values.fluentIngest.keda.enabled (not .Values.fluentIngest.disableAutoscaler) }}
|
||||
{{- $metricsConfig := dict "enabled" .Values.fluentIngest.keda.enabled "minReplicas" .Values.fluentIngest.keda.minReplicas "maxReplicas" .Values.fluentIngest.keda.maxReplicas "pollingInterval" .Values.fluentIngest.keda.pollingInterval "cooldownPeriod" .Values.fluentIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_fluent_ingest_queue_size" "threshold" .Values.fluentIngest.keda.queueSizeThreshold "port" .Values.fluentIngest.ports.http)) }}
|
||||
{{- $fluentIngestKedaArgs := dict "ServiceName" "fluent-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.fluentIngest.disableAutoscaler }}
|
||||
{{- include "oneuptime.kedaScaledObject" $fluentIngestKedaArgs }}
|
||||
{{- end }}
|
||||
|
||||
{{/* Incoming Request Ingest KEDA ScaledObject */}}
|
||||
{{- if and .Values.keda.enabled .Values.incomingRequestIngest.keda.enabled (not .Values.incomingRequestIngest.disableAutoscaler) }}
|
||||
{{- $metricsConfig := dict "enabled" .Values.incomingRequestIngest.keda.enabled "minReplicas" .Values.incomingRequestIngest.keda.minReplicas "maxReplicas" .Values.incomingRequestIngest.keda.maxReplicas "pollingInterval" .Values.incomingRequestIngest.keda.pollingInterval "cooldownPeriod" .Values.incomingRequestIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_incoming_request_ingest_queue_size" "threshold" .Values.incomingRequestIngest.keda.queueSizeThreshold "port" .Values.incomingRequestIngest.ports.http)) }}
|
||||
|
||||
@@ -83,9 +83,6 @@
|
||||
"openTelemetryCollectorHost": {
|
||||
"type": ["string", "null"]
|
||||
},
|
||||
"fluentdHost": {
|
||||
"type": ["string", "null"]
|
||||
},
|
||||
"deployment": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -1801,69 +1798,6 @@
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"fluentIngest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"replicaCount": {
|
||||
"type": "integer"
|
||||
},
|
||||
"disableTelemetryCollection": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"disableAutoscaler": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"concurrency": {
|
||||
"type": "integer"
|
||||
},
|
||||
"ports": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"http": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"resources": {
|
||||
"type": ["object", "null"]
|
||||
},
|
||||
"nodeSelector": {
|
||||
"type": "object"
|
||||
},
|
||||
"podSecurityContext": {
|
||||
"type": "object"
|
||||
},
|
||||
"containerSecurityContext": {
|
||||
"type": "object"
|
||||
},
|
||||
"keda": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"enabled": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"minReplicas": {
|
||||
"type": "integer"
|
||||
},
|
||||
"maxReplicas": {
|
||||
"type": "integer"
|
||||
},
|
||||
"queueSizeThreshold": {
|
||||
"type": "integer"
|
||||
},
|
||||
"pollingInterval": {
|
||||
"type": "integer"
|
||||
},
|
||||
"cooldownPeriod": {
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"incomingRequestIngest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -45,7 +45,6 @@ externalSecrets:
|
||||
|
||||
# (Optional): You usually do not need to set this if you're self hosting.
|
||||
openTelemetryCollectorHost:
|
||||
fluentdHost:
|
||||
|
||||
deployment:
|
||||
# Default replica count for all deployments
|
||||
@@ -722,29 +721,6 @@ openTelemetryIngest:
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
fluentIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
disableAutoscaler: false
|
||||
concurrency: 100
|
||||
ports:
|
||||
http: 3401
|
||||
resources:
|
||||
nodeSelector: {}
|
||||
podSecurityContext: {}
|
||||
containerSecurityContext: {}
|
||||
# KEDA autoscaling configuration based on queue metrics
|
||||
keda:
|
||||
enabled: false
|
||||
minReplicas: 1
|
||||
maxReplicas: 100
|
||||
# Scale up when queue size exceeds this threshold
|
||||
queueSizeThreshold: 100
|
||||
# Polling interval for metrics (in seconds)
|
||||
pollingInterval: 30
|
||||
# Cooldown period after scaling (in seconds)
|
||||
cooldownPeriod: 300
|
||||
|
||||
incomingRequestIngest:
|
||||
replicaCount: 1
|
||||
disableTelemetryCollection: false
|
||||
|
||||
145
OpenTelemetryIngest/API/Fluent.ts
Normal file
145
OpenTelemetryIngest/API/Fluent.ts
Normal file
@@ -0,0 +1,145 @@
|
||||
import TelemetryIngest, {
|
||||
TelemetryRequest,
|
||||
} from "Common/Server/Middleware/TelemetryIngest";
|
||||
import ProductType from "Common/Types/MeteredPlan/ProductType";
|
||||
import Express, {
|
||||
ExpressRequest,
|
||||
ExpressResponse,
|
||||
ExpressRouter,
|
||||
NextFunction,
|
||||
} from "Common/Server/Utils/Express";
|
||||
import Response from "Common/Server/Utils/Response";
|
||||
import FluentLogsQueueService from "../Services/Queue/FluentLogsQueueService";
|
||||
import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
|
||||
import BadRequestException from "Common/Types/Exception/BadRequestException";
|
||||
|
||||
/**
 * Express middleware helpers for the Fluentd / Fluent Bit log ingestion routes.
 */
export class FluentLogsRequestMiddleware {
  /**
   * Tags the incoming request with `ProductType.Logs` and hands control to
   * the next middleware in the chain.
   *
   * @param req - Incoming request; treated as a `TelemetryRequest` so that
   *   `productType` can be assigned.
   * @param _res - Unused.
   * @param next - Express continuation; receives the error if anything in
   *   the body throws.
   */
  public static async getProductType(
    req: ExpressRequest,
    _res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> {
    try {
      const telemetryRequest: TelemetryRequest = req as TelemetryRequest;
      telemetryRequest.productType = ProductType.Logs;
      next();
    } catch (err) {
      next(err);
    }
  }
}
|
||||
|
||||
const router: ExpressRouter = Express.getRouter();

/*
 * POST /fluentd/v1/logs
 *
 * Accepts a Fluentd / Fluent Bit log payload. The request is first tagged as
 * a Logs product request and passed through
 * TelemetryIngest.isAuthorizedServiceMiddleware. The handler then:
 *   1. rejects the request if no projectId was attached to it,
 *   2. normalises the body via its toJSON() method when one exists,
 *   3. sends an empty success response,
 *   4. enqueues the payload for asynchronous processing.
 */
router.post(
  "/fluentd/v1/logs",
  FluentLogsRequestMiddleware.getProductType,
  TelemetryIngest.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      if (!(req as TelemetryRequest).projectId) {
        throw new BadRequestException(
          "Invalid request - projectId not found in request.",
        );
      }

      req.body = req.body?.toJSON ? req.body.toJSON() : req.body;

      // Acknowledge first; queueing happens after the client has its answer.
      Response.sendEmptySuccessResponse(req, res);

      await FluentLogsQueueService.addFluentLogsJob(
        req as TelemetryRequest,
      );

      return;
    } catch (err) {
      /*
       * If the success response has already gone out, forwarding the error
       * would make the error handler attempt a second response on the same
       * request. FluentLogsQueueService.addFluentLogsJob logs its own
       * failures before rethrowing, so nothing is lost by stopping here.
       */
      if (res.headersSent) {
        return;
      }

      return next(err);
    }
  },
);
|
||||
|
||||
/*
 * GET /fluent/queue/stats
 *
 * Guarded by ClusterKeyAuthorization.isAuthorizedServiceMiddleware. Responds
 * with the job counters reported by FluentLogsQueueService.getQueueStats().
 */
router.get(
  "/fluent/queue/stats",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const queueStats: {
        waiting: number;
        active: number;
        completed: number;
        failed: number;
        delayed: number;
        total: number;
      } = await FluentLogsQueueService.getQueueStats();

      Response.sendJsonObjectResponse(req, res, queueStats);
    } catch (err) {
      next(err);
    }
  },
);
|
||||
|
||||
/*
 * GET /fluent/queue/size
 *
 * Guarded by ClusterKeyAuthorization.isAuthorizedServiceMiddleware. Responds
 * with `{ size }`, where size is the value returned by
 * FluentLogsQueueService.getQueueSize().
 */
router.get(
  "/fluent/queue/size",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const queueSize: number = await FluentLogsQueueService.getQueueSize();

      Response.sendJsonObjectResponse(req, res, { size: queueSize });
    } catch (err) {
      next(err);
    }
  },
);
|
||||
|
||||
/*
 * GET /fluent/queue/failed?start=<n>&end=<n>
 *
 * Guarded by ClusterKeyAuthorization.isAuthorizedServiceMiddleware. Responds
 * with the failed jobs returned by FluentLogsQueueService.getFailedJobs()
 * for the requested range, plus the range that was actually used.
 */
router.get(
  "/fluent/queue/failed",
  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
  async (
    req: ExpressRequest,
    res: ExpressResponse,
    next: NextFunction,
  ): Promise<void> => {
    try {
      const requestedStart: number = parseInt(req.query["start"] as string);
      const requestedEnd: number = parseInt(req.query["end"] as string);

      // Missing, non-numeric and zero values are all falsy and use the default.
      const start: number = requestedStart || 0;
      const end: number = requestedEnd || 100;

      const failedJobs: Array<{
        id: string;
        name: string;
        data: any;
        failedReason: string;
        stackTrace?: string;
        processedOn: Date | null;
        finishedOn: Date | null;
        attemptsMade: number;
      }> = await FluentLogsQueueService.getFailedJobs({
        start: start,
        end: end,
      });

      Response.sendJsonObjectResponse(req, res, {
        failedJobs: failedJobs,
        pagination: {
          start: start,
          end: end,
          count: failedJobs.length,
        },
      });
    } catch (err) {
      next(err);
    }
  },
);
|
||||
|
||||
export default router;
|
||||
@@ -1,6 +1,7 @@
|
||||
import OTelIngestAPI from "./API/OTelIngest";
|
||||
import MetricsAPI from "./API/Metrics";
|
||||
import SyslogAPI from "./API/Syslog";
|
||||
import FluentAPI from "./API/Fluent";
|
||||
import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
|
||||
import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
|
||||
import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
|
||||
@@ -12,16 +13,20 @@ import Realtime from "Common/Server/Utils/Realtime";
|
||||
import App from "Common/Server/Utils/StartServer";
|
||||
import Telemetry from "Common/Server/Utils/Telemetry";
|
||||
import "./Jobs/TelemetryIngest/ProcessTelemetry";
|
||||
import "./Jobs/TelemetryIngest/ProcessFluentLogs";
|
||||
import { OPEN_TELEMETRY_INGEST_CONCURRENCY } from "./Config";
|
||||
import type { StatusAPIOptions } from "Common/Server/API/StatusAPI";
|
||||
import "ejs";
|
||||
|
||||
const app: ExpressApplication = Express.getExpressApp();
|
||||
|
||||
const APP_NAME: string = "open-telemetry-ingest";
|
||||
const ROUTE_PREFIXES: Array<string> = [`/${APP_NAME}`, "/"];
|
||||
|
||||
app.use([`/${APP_NAME}`, "/"], OTelIngestAPI);
|
||||
app.use([`/${APP_NAME}`, "/"], MetricsAPI);
|
||||
app.use([`/${APP_NAME}`, "/"], SyslogAPI);
|
||||
app.use(ROUTE_PREFIXES, OTelIngestAPI);
|
||||
app.use(ROUTE_PREFIXES, MetricsAPI);
|
||||
app.use(ROUTE_PREFIXES, SyslogAPI);
|
||||
app.use(ROUTE_PREFIXES, FluentAPI);
|
||||
|
||||
const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
try {
|
||||
@@ -44,12 +49,14 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
);
|
||||
|
||||
// init the app
|
||||
const statusOptions: StatusAPIOptions = {
|
||||
liveCheck: statusCheck,
|
||||
readyCheck: statusCheck,
|
||||
};
|
||||
|
||||
await App.init({
|
||||
appName: APP_NAME,
|
||||
statusOptions: {
|
||||
liveCheck: statusCheck,
|
||||
readyCheck: statusCheck,
|
||||
},
|
||||
statusOptions: statusOptions,
|
||||
});
|
||||
|
||||
// connect to the database.
|
||||
@@ -63,9 +70,9 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
|
||||
);
|
||||
|
||||
await Realtime.init();
|
||||
|
||||
// add default routes
|
||||
await App.addDefaultRoutes();
|
||||
|
||||
} catch (err) {
|
||||
logger.error("App Init Failed:");
|
||||
logger.error(err);
|
||||
|
||||
139
OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessFluentLogs.ts
Normal file
139
OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessFluentLogs.ts
Normal file
@@ -0,0 +1,139 @@
|
||||
import { FluentLogsJobData } from "../../Services/Queue/FluentLogsQueueService";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
|
||||
import ObjectID from "Common/Types/ObjectID";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import LogService from "Common/Server/Services/LogService";
|
||||
import LogSeverity from "Common/Types/Log/LogSeverity";
|
||||
import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
|
||||
import {
|
||||
OPEN_TELEMETRY_INGEST_CONCURRENCY,
|
||||
OPEN_TELEMETRY_INGEST_LOG_FLUSH_BATCH_SIZE,
|
||||
} from "../../Config";
|
||||
|
||||
/**
 * Input for processFluentLogsFromQueue, built by the queue worker from the
 * job's stored data.
 */
interface FluentLogsProcessData {
  /** Headers of the original ingest request. */
  requestHeaders: JSONObject;
  /** Body of the original ingest request. */
  requestBody: JSONObject;
  /** Project the logs are written under. */
  projectId: ObjectID;
}
|
||||
|
||||
// Rows written per insert; a falsy configured value falls back to 500.
const LOG_FLUSH_BATCH_SIZE: number = OPEN_TELEMETRY_INGEST_LOG_FLUSH_BATCH_SIZE
  ? OPEN_TELEMETRY_INGEST_LOG_FLUSH_BATCH_SIZE
  : 500;
|
||||
|
||||
/*
 * Registers the worker for the FluentLogs queue. Each job's stored data is
 * turned into a FluentLogsProcessData value and passed to
 * processFluentLogsFromQueue. Failures are logged and rethrown so the queue
 * sees the job as failed.
 */
QueueWorker.getWorker(
  QueueName.FluentLogs,
  async (job: QueueJob): Promise<void> => {
    logger.debug(`Processing fluent logs ingestion job: ${job.name}`);

    try {
      const payload: FluentLogsJobData = job.data as FluentLogsJobData;

      const processData: FluentLogsProcessData = {
        projectId: new ObjectID(payload.projectId),
        requestBody: payload.requestBody,
        requestHeaders: payload.requestHeaders,
      };

      await processFluentLogsFromQueue(processData);

      logger.debug(
        `Successfully processed fluent logs ingestion job: ${job.name}`,
      );
    } catch (error) {
      logger.error(`Error processing fluent logs ingestion job:`);
      logger.error(error);
      throw error;
    }
  },
  { concurrency: OPEN_TELEMETRY_INGEST_CONCURRENCY },
);
|
||||
|
||||
/**
 * Converts one queued Fluentd / Fluent Bit payload into log rows and writes
 * them through flushLogBuffer.
 *
 * The payload may be a single item or an array of items, optionally wrapped
 * in an object under the "json" key. String items are stored as-is; any
 * other item is stored as its JSON serialisation. Each row is stamped with
 * the current time at the moment it is built, with unspecified severity and
 * no attributes, trace id or span id.
 *
 * The service is resolved from the "x-oneuptime-service-name" request
 * header and falls back to "Unknown Service" when the header is absent or
 * blank.
 *
 * @param data - Project id, request body and request headers of the
 *   original ingest request.
 * @returns Resolves once every row has been handed to flushLogBuffer.
 */
async function processFluentLogsFromQueue(
  data: FluentLogsProcessData,
): Promise<void> {
  let logItems: Array<JSONObject | string> | JSONObject = data.requestBody as
    | Array<JSONObject | string>
    | JSONObject;

  // Unwrap payloads delivered as { json: ... }.
  if (
    logItems &&
    typeof logItems === "object" &&
    (logItems as JSONObject)["json"]
  ) {
    logItems = (logItems as JSONObject)["json"] as
      | Array<JSONObject | string>
      | JSONObject;
  }

  const candidateItems: Array<JSONObject | string> = Array.isArray(logItems)
    ? logItems
    : [logItems];

  /*
   * Drop null / undefined entries: they carry no log content and would
   * otherwise be stored as a row whose body is "null" or undefined.
   */
  const itemsToIngest: Array<JSONObject | string> = candidateItems.filter(
    (item: JSONObject | string): boolean => {
      return item !== null && item !== undefined;
    },
  );

  if (itemsToIngest.length === 0) {
    // Nothing to store, so skip the service lookup entirely.
    return;
  }

  const serviceNameHeader: string | string[] | undefined = data
    .requestHeaders["x-oneuptime-service-name"] as
    | string
    | string[]
    | undefined;

  // The header type admits an array; use its first value in that case.
  const firstServiceName: string | undefined = Array.isArray(serviceNameHeader)
    ? serviceNameHeader[0]
    : serviceNameHeader;

  const oneuptimeServiceName: string =
    typeof firstServiceName === "string" && firstServiceName.trim()
      ? firstServiceName
      : "Unknown Service";

  const telemetryService: {
    serviceId: ObjectID;
    dataRententionInDays: number;
  } = await OTelIngestService.telemetryServiceFromName({
    serviceName: oneuptimeServiceName,
    projectId: data.projectId,
  });

  const dbLogs: Array<JSONObject> = [];

  for (const logItem of itemsToIngest) {
    const logBody: string =
      typeof logItem === "string" ? logItem : JSON.stringify(logItem);

    const ingestionDate: Date = OneUptimeDate.getCurrentDate();
    const ingestionIso: string = OneUptimeDate.toString(ingestionDate);
    const timeUnixNano: number = OneUptimeDate.getCurrentDateAsUnixNano();

    const logRow: JSONObject = {
      _id: ObjectID.generate().toString(),
      createdAt: ingestionIso,
      updatedAt: ingestionIso,
      projectId: data.projectId.toString(),
      serviceId: telemetryService.serviceId.toString(),
      time: ingestionIso,
      timeUnixNano: Math.trunc(timeUnixNano).toString(),
      severityNumber: 0,
      severityText: LogSeverity.Unspecified,
      attributes: {},
      attributeKeys: [],
      traceId: "",
      spanId: "",
      body: logBody,
    };

    dbLogs.push(logRow);

    // Write full batches as they fill up so the buffer stays bounded.
    if (dbLogs.length >= LOG_FLUSH_BATCH_SIZE) {
      await flushLogBuffer(dbLogs);
    }
  }

  // Write whatever is left over.
  await flushLogBuffer(dbLogs, true);
}
|
||||
|
||||
// Emitted at module load, after the QueueWorker.getWorker(...) call earlier in this module has returned.
logger.debug("Fluent logs ingest worker initialized");
|
||||
|
||||
/**
 * Writes buffered log rows through LogService.insertJsonRows in batches of
 * at most LOG_FLUSH_BATCH_SIZE rows.
 *
 * Rows are removed from `logs` in place as they are written. Without
 * `force`, only full batches are written and any remainder stays in the
 * buffer; with `force`, the buffer is drained completely.
 *
 * @param logs - Buffer of rows to write; mutated by this function.
 * @param force - When true, also writes a final partial batch.
 */
async function flushLogBuffer(
  logs: Array<JSONObject>,
  force: boolean = false,
): Promise<void> {
  /*
   * Guard against a non-positive batch size: it would make the loop
   * condition permanently true while removing no rows.
   */
  const batchLimit: number = Math.max(1, LOG_FLUSH_BATCH_SIZE);

  while (logs.length >= batchLimit || (force && logs.length > 0)) {
    // splice() stops at the end of the array, so the last batch may be short.
    const batch: Array<JSONObject> = logs.splice(0, batchLimit);

    await LogService.insertJsonRows(batch);
  }
}
|
||||
73
OpenTelemetryIngest/Services/Queue/FluentLogsQueueService.ts
Normal file
73
OpenTelemetryIngest/Services/Queue/FluentLogsQueueService.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
|
||||
import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
|
||||
import { JSONObject } from "Common/Types/JSON";
|
||||
import OneUptimeDate from "Common/Types/Date";
|
||||
import logger from "Common/Server/Utils/Logger";
|
||||
|
||||
/**
 * Payload stored on each FluentLogs queue job.
 */
export interface FluentLogsJobData {
  /** Project id in string form. */
  projectId: string;
  /** Body of the ingest request. */
  requestBody: JSONObject;
  /** Headers of the ingest request. */
  requestHeaders: Record<string, string>;
  /*
   * Time the request was queued.
   * NOTE(review): presumably serialised when the job is stored, so consumers
   * may receive a string rather than a Date - confirm.
   */
  ingestionTimestamp: Date;
}
|
||||
|
||||
/**
 * Thin wrapper around the shared Queue for the FluentLogs queue: enqueues
 * ingest requests and exposes the queue's size, counters and failed jobs.
 */
export default class FluentLogsQueueService {
  /**
   * Enqueues one ingest request as a "ProcessFluentLogs" job.
   *
   * @param req - Authorised ingest request; its projectId, body and headers
   *   are copied into the job data.
   * @throws Rethrows any error raised while building or adding the job,
   *   after logging it.
   */
  public static async addFluentLogsJob(req: TelemetryRequest): Promise<void> {
    try {
      const projectId: string = req.projectId.toString();

      const jobData: FluentLogsJobData = {
        projectId: projectId,
        requestBody: req.body,
        requestHeaders: req.headers as Record<string, string>,
        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
      };

      /*
       * A project id plus a timestamp is not guaranteed to be unique when
       * several requests for the same project arrive together, so a random
       * suffix is appended.
       * NOTE(review): assumes Queue.addJob uses this value as the queue-level
       * job id, where a duplicate id could cause a job to be dropped - confirm.
       */
      const uniqueSuffix: string = Math.random().toString(36).slice(2, 10);
      const jobId: string = `fluent-logs-${projectId}-${OneUptimeDate.getCurrentDateAsUnixNano()}-${uniqueSuffix}`;

      await Queue.addJob(
        QueueName.FluentLogs,
        jobId,
        "ProcessFluentLogs",
        jobData as unknown as JSONObject,
      );

      logger.debug(`Added fluent logs ingestion job: ${jobId}`);
    } catch (error) {
      logger.error(`Error adding fluent logs ingestion job:`);
      logger.error(error);
      throw error;
    }
  }

  /**
   * @returns The size reported by Queue.getQueueSize for the FluentLogs queue.
   */
  public static async getQueueSize(): Promise<number> {
    return Queue.getQueueSize(QueueName.FluentLogs);
  }

  /**
   * @returns The counters reported by Queue.getQueueStats for the FluentLogs
   *   queue.
   */
  public static async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
    total: number;
  }> {
    return Queue.getQueueStats(QueueName.FluentLogs);
  }

  /**
   * @param options - Optional `start` / `end` range, passed through
   *   unchanged to Queue.getFailedJobs.
   * @returns The failed jobs reported by Queue.getFailedJobs for the
   *   FluentLogs queue.
   */
  public static async getFailedJobs(options?: {
    start?: number;
    end?: number;
  }): Promise<
    Array<{
      id: string;
      name: string;
      data: JSONObject;
      failedReason: string;
      stackTrace?: string;
      processedOn: Date | null;
      finishedOn: Date | null;
      attemptsMade: number;
    }>
  > {
    return Queue.getFailedJobs(QueueName.FluentLogs, options);
  }
}
|
||||
@@ -29,8 +29,6 @@ bash $scriptDir/endpoint-status.sh "Dashboard (Status Check)" $HOST_TO_CHECK/das
|
||||
|
||||
bash $scriptDir/endpoint-status.sh "Status Page" $HOST_TO_CHECK/status-page
|
||||
|
||||
bash $scriptDir/endpoint-status.sh "Status Page" $HOST_TO_CHECK/fluent-ingest/status/ready
|
||||
|
||||
bash $scriptDir/endpoint-status.sh "Status Page" $HOST_TO_CHECK/incoming-request-ingest/status/ready
|
||||
|
||||
bash $scriptDir/endpoint-status.sh "Status Page (Ready Check)" $HOST_TO_CHECK/status-page/status/ready
|
||||
|
||||
@@ -59,8 +59,6 @@ COMPOSE_PROJECT_NAME=oneuptime
|
||||
# OTEL HOST - if you like the collector to be hosted on a different server then change this to the IP of the server.
|
||||
OTEL_COLLECTOR_HOST=
|
||||
|
||||
# FLUENTD_HOST - if you like the fluentd to be hosted on a different server then change this to the IP of the server.
|
||||
FLUENTD_HOST=
|
||||
|
||||
# Clickhouse Settings
|
||||
CLICKHOUSE_USER=default
|
||||
@@ -94,7 +92,7 @@ REDIS_TLS_SENTINEL_MODE=false
|
||||
|
||||
# Hostnames. Usually does not need to change.
|
||||
PROBE_INGEST_HOSTNAME=probe-ingest:3400
|
||||
FLUENT_INGEST_HOSTNAME=fluent-ingest:3401
|
||||
FLUENT_LOGS_HOSTNAME=open-telemetry-ingest:3403
|
||||
INCOMING_REQUEST_INGEST_HOSTNAME=incoming-request-ingest:3402
|
||||
OPEN_TELEMETRY_INGEST_HOSTNAME=otel-telemetry-ingest:3403
|
||||
|
||||
@@ -248,8 +246,8 @@ WORKFLOW_TIMEOUT_IN_MS=5000
|
||||
# Max number of telemetry jobs processed concurrently by OpenTelemetry Ingest worker
|
||||
OPEN_TELEMETRY_INGEST_CONCURRENCY=100
|
||||
|
||||
# Max number of jobs processed concurrently by Fluent Ingest worker
|
||||
FLUENT_INGEST_CONCURRENCY=100
|
||||
# Max number of jobs processed concurrently by Fluent Logs worker
|
||||
FLUENT_LOGS_CONCURRENCY=100
|
||||
|
||||
# Max number of jobs processed concurrently by Incoming Request Ingest worker
|
||||
INCOMING_REQUEST_INGEST_CONCURRENCY=100
|
||||
@@ -316,7 +314,7 @@ DISABLE_TELEMETRY_FOR_ACCOUNTS=true
|
||||
DISABLE_TELEMETRY_FOR_APP=true
|
||||
DISABLE_TELEMETRY_FOR_PROBE_INGEST=true
|
||||
DISABLE_TELEMETRY_FOR_OPEN_TELEMETRY_INGEST=true
|
||||
DISABLE_TELEMETRY_FOR_FLUENT_INGEST=true
|
||||
DISABLE_TELEMETRY_FOR_FLUENT_LOGS=true
|
||||
DISABLE_TELEMETRY_FOR_INCOMING_REQUEST_INGEST=true
|
||||
DISABLE_TELEMETRY_FOR_TEST_SERVER=true
|
||||
DISABLE_TELEMETRY_FOR_STATUS_PAGE=true
|
||||
|
||||
@@ -7,7 +7,6 @@ x-common-variables: &common-variables
|
||||
|
||||
OTEL_COLLECTOR_HOST: ${OTEL_COLLECTOR_HOST}
|
||||
|
||||
FLUENTD_HOST: ${FLUENTD_HOST}
|
||||
|
||||
STATUS_PAGE_CNAME_RECORD: ${STATUS_PAGE_CNAME_RECORD}
|
||||
|
||||
@@ -412,15 +411,6 @@ services:
|
||||
ports:
|
||||
- 13133:13133 # Otel Collector Health Check Endpoint at /health/status
|
||||
|
||||
fluentd:
|
||||
networks:
|
||||
- oneuptime
|
||||
restart: always
|
||||
logging:
|
||||
driver: "local"
|
||||
options:
|
||||
max-size: "1000m"
|
||||
|
||||
fluent-bit:
|
||||
networks:
|
||||
- oneuptime
|
||||
|
||||
@@ -394,23 +394,6 @@ services:
|
||||
context: .
|
||||
dockerfile: ./IncomingRequestIngest/Dockerfile
|
||||
|
||||
fluent-ingest:
|
||||
volumes:
|
||||
- ./FluentIngest:/usr/src/app:cached
|
||||
# Use node modules of the container and not host system.
|
||||
# https://stackoverflow.com/questions/29181032/add-a-volume-to-docker-but-exclude-a-sub-folder
|
||||
- /usr/src/app/node_modules/
|
||||
- ./Common:/usr/src/Common:cached
|
||||
- /usr/src/Common/node_modules/
|
||||
ports:
|
||||
- '9937:9229' # Debugging port.
|
||||
extends:
|
||||
file: ./docker-compose.base.yml
|
||||
service: fluent-ingest
|
||||
build:
|
||||
network: host
|
||||
context: .
|
||||
dockerfile: ./FluentIngest/Dockerfile
|
||||
|
||||
|
||||
# Fluentd. Required only for development. In production it's the responsibility of the customer to run fluentd and pipe logs to OneUptime.
|
||||
|
||||
@@ -126,12 +126,6 @@ services:
|
||||
file: ./docker-compose.base.yml
|
||||
service: incoming-request-ingest
|
||||
|
||||
fluent-ingest:
|
||||
image: oneuptime/fluent-ingest:${APP_TAG}
|
||||
extends:
|
||||
file: ./docker-compose.base.yml
|
||||
service: fluent-ingest
|
||||
|
||||
isolated-vm:
|
||||
image: oneuptime/isolated-vm:${APP_TAG}
|
||||
extends:
|
||||
|
||||
Reference in New Issue
Block a user