diff --git a/.vscode/launch.json b/.vscode/launch.json index 6fae508793..0ee9afa4af 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -217,20 +217,6 @@ "restart": true, "autoAttachChildProcesses": true }, - { - "address": "127.0.0.1", - "localRoot": "${workspaceFolder}/FluentIngest", - "name": "Fluent Ingest: Debug with Docker", - "port": 9937, - "remoteRoot": "/usr/src/app", - "request": "attach", - "skipFiles": [ - "/**" - ], - "type": "node", - "restart": true, - "autoAttachChildProcesses": true - }, { "address": "127.0.0.1", "localRoot": "${workspaceFolder}/IsolatedVM", diff --git a/Common/Server/Infrastructure/Queue.ts b/Common/Server/Infrastructure/Queue.ts index 8775ac9d00..236ffc19e1 100644 --- a/Common/Server/Infrastructure/Queue.ts +++ b/Common/Server/Infrastructure/Queue.ts @@ -14,7 +14,7 @@ export enum QueueName { Workflow = "Workflow", Worker = "Worker", Telemetry = "Telemetry", - FluentIngest = "FluentIngest", + FluentLogs = "FluentLogs", IncomingRequestIngest = "IncomingRequestIngest", ServerMonitorIngest = "ServerMonitorIngest", ProbeIngest = "ProbeIngest", diff --git a/Common/ServiceRoute.ts b/Common/ServiceRoute.ts index 7a1d803d1b..6c497b9397 100644 --- a/Common/ServiceRoute.ts +++ b/Common/ServiceRoute.ts @@ -36,8 +36,6 @@ export const IncomingRequestIngestRoute: Route = new Route( "/incoming-request-ingest", ); -export const FluentIngestRoute: Route = new Route("/fluent-ingest"); - export const RealtimeRoute: Route = new Route("/realtime/socket"); export const DocsRoute: Route = new Route("/docs"); diff --git a/Common/UI/Config.ts b/Common/UI/Config.ts index d6ab074ff5..8553163752 100644 --- a/Common/UI/Config.ts +++ b/Common/UI/Config.ts @@ -15,7 +15,6 @@ import { StatusPageApiRoute, StatusPageRoute, WorkflowRoute, - FluentIngestRoute, IncomingRequestIngestRoute, OpenTelemetryIngestRoute, } from "../ServiceRoute"; @@ -82,8 +81,6 @@ export const OPEN_TELEMETRY_INGEST_HOSTNAME: Hostname = export const 
INCOMING_REQUEST_INGEST_HOSTNAME: Hostname = Hostname.fromString(HOST); -export const FLUENT_INGEST_HOSTNAME: Hostname = Hostname.fromString(HOST); - export const HELM_HOSTNAME: Hostname = Hostname.fromString(HOST); export const API_DOCS_HOSTNAME: Hostname = Hostname.fromString(HOST); @@ -124,12 +121,6 @@ export const OPEN_TELEMETRY_INGEST_URL: URL = new URL( new Route(OpenTelemetryIngestRoute.toString()), ); -export const FLUENT_INGEST_URL: URL = new URL( - HTTP_PROTOCOL, - FLUENT_INGEST_HOSTNAME, - new Route(FluentIngestRoute.toString()), -); - export const IDENTITY_URL: URL = new URL( HTTP_PROTOCOL, IDENTITY_HOSTNAME, diff --git a/Docs/Content/self-hosted/architecture.md b/Docs/Content/self-hosted/architecture.md index 1381a16507..0cc76435b6 100644 --- a/Docs/Content/self-hosted/architecture.md +++ b/Docs/Content/self-hosted/architecture.md @@ -32,7 +32,7 @@ flowchart TB direction TB PROBEINGEST["Probe Ingest"] OTELINGEST["OpenTelemetry Ingest"] - FLUENTINGEST["Logs Ingest (Fluentd / Fluent Bit)"] + FLUENTLOGS["Logs Ingest (Fluent Bit)"] SERVERMONINGEST["Server Monitor Ingest"] INCOMINGREQINGEST["Incoming Request Ingest"] end @@ -83,13 +83,13 @@ flowchart TB INT <-->|HTTPS/TCP/Ping/DNS/Custom| P2 OTELCOLL["OTel Collector/Agents"] --> OTELINGEST - FLUENT["Fluentd / Fluent Bit"] --> FLUENTINGEST + FLUENT["Fluentd / Fluent Bit"] --> FLUENTLOGS SERVERAGENTS["Server Monitor Agents"] --> SERVERMONINGEST %% Ingest flow to core processing PROBEINGEST --> REDIS OTELINGEST --> CH - FLUENTINGEST --> CH + FLUENTLOGS --> CH SERVERMONINGEST --> CH INCOMINGREQINGEST --> CH @@ -106,7 +106,7 @@ flowchart TB class NGINX edge; class HOME,STATUS,API,WORKER web; - class PROBEINGEST,OTELINGEST,FLUENTINGEST,SERVERMONINGEST,INCOMINGREQINGEST ingest; + class PROBEINGEST,OTELINGEST,FLUENTLOGS,SERVERMONINGEST,INCOMINGREQINGEST ingest; class P1,P2 probe; class PG,CH,REDIS store; class EXT,INT,OTELCOLL,FLUENT,SERVERAGENTS outside; diff --git 
a/HelmChart/Public/oneuptime/templates/_helpers.tpl b/HelmChart/Public/oneuptime/templates/_helpers.tpl index 452e0ccfc7..bd20b804e7 100644 --- a/HelmChart/Public/oneuptime/templates/_helpers.tpl +++ b/HelmChart/Public/oneuptime/templates/_helpers.tpl @@ -63,8 +63,6 @@ Usage: value: {{ $.Values.openTelemetryCollectorHost }} - name: LOG_LEVEL value: {{ $.Values.logLevel }} -- name: FLUENTD_HOST - value: {{ $.Values.fluentdHost }} - name: HTTP_PROTOCOL value: {{ $.Values.httpProtocol }} - name: NODE_ENV diff --git a/HelmChart/Public/oneuptime/templates/ingress.yaml b/HelmChart/Public/oneuptime/templates/ingress.yaml index 6d653a62fd..5cbcaf3897 100644 --- a/HelmChart/Public/oneuptime/templates/ingress.yaml +++ b/HelmChart/Public/oneuptime/templates/ingress.yaml @@ -49,16 +49,4 @@ spec: port: name: "oneuptime-http" {{- end }} - {{- if $.Values.fluentdHost }} - - host: {{ $.Values.fluentdHost | quote }} - http: - paths: - - path: / - pathType: Prefix - backend: - service: - name: {{ printf "%s-%s" $.Release.Name "nginx" }} - port: - name: "oneuptime-http" - {{- end }} {{- end }} diff --git a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml index 22ac92d2e9..173d42c461 100644 --- a/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml +++ b/HelmChart/Public/oneuptime/templates/keda-scaledobjects.yaml @@ -9,13 +9,6 @@ KEDA ScaledObjects for various services {{- include "oneuptime.kedaScaledObject" $openTelemetryIngestKedaArgs }} {{- end }} -{{/* Fluent Ingest KEDA ScaledObject */}} -{{- if and .Values.keda.enabled .Values.fluentIngest.keda.enabled (not .Values.fluentIngest.disableAutoscaler) }} -{{- $metricsConfig := dict "enabled" .Values.fluentIngest.keda.enabled "minReplicas" .Values.fluentIngest.keda.minReplicas "maxReplicas" .Values.fluentIngest.keda.maxReplicas "pollingInterval" .Values.fluentIngest.keda.pollingInterval "cooldownPeriod" .Values.fluentIngest.keda.cooldownPeriod 
"triggers" (list (dict "query" "oneuptime_fluent_ingest_queue_size" "threshold" .Values.fluentIngest.keda.queueSizeThreshold "port" .Values.fluentIngest.ports.http)) }} -{{- $fluentIngestKedaArgs := dict "ServiceName" "fluent-ingest" "Release" .Release "Values" .Values "MetricsConfig" $metricsConfig "DisableAutoscaler" .Values.fluentIngest.disableAutoscaler }} -{{- include "oneuptime.kedaScaledObject" $fluentIngestKedaArgs }} -{{- end }} - {{/* Incoming Request Ingest KEDA ScaledObject */}} {{- if and .Values.keda.enabled .Values.incomingRequestIngest.keda.enabled (not .Values.incomingRequestIngest.disableAutoscaler) }} {{- $metricsConfig := dict "enabled" .Values.incomingRequestIngest.keda.enabled "minReplicas" .Values.incomingRequestIngest.keda.minReplicas "maxReplicas" .Values.incomingRequestIngest.keda.maxReplicas "pollingInterval" .Values.incomingRequestIngest.keda.pollingInterval "cooldownPeriod" .Values.incomingRequestIngest.keda.cooldownPeriod "triggers" (list (dict "query" "oneuptime_incoming_request_ingest_queue_size" "threshold" .Values.incomingRequestIngest.keda.queueSizeThreshold "port" .Values.incomingRequestIngest.ports.http)) }} diff --git a/HelmChart/Public/oneuptime/values.schema.json b/HelmChart/Public/oneuptime/values.schema.json index 0c9315868d..a33a025bd9 100644 --- a/HelmChart/Public/oneuptime/values.schema.json +++ b/HelmChart/Public/oneuptime/values.schema.json @@ -83,9 +83,6 @@ "openTelemetryCollectorHost": { "type": ["string", "null"] }, - "fluentdHost": { - "type": ["string", "null"] - }, "deployment": { "type": "object", "properties": { @@ -1801,69 +1798,6 @@ }, "additionalProperties": false }, - "fluentIngest": { - "type": "object", - "properties": { - "replicaCount": { - "type": "integer" - }, - "disableTelemetryCollection": { - "type": "boolean" - }, - "disableAutoscaler": { - "type": "boolean" - }, - "concurrency": { - "type": "integer" - }, - "ports": { - "type": "object", - "properties": { - "http": { - "type": "integer" - } - }, 
- "additionalProperties": false - }, - "resources": { - "type": ["object", "null"] - }, - "nodeSelector": { - "type": "object" - }, - "podSecurityContext": { - "type": "object" - }, - "containerSecurityContext": { - "type": "object" - }, - "keda": { - "type": "object", - "properties": { - "enabled": { - "type": "boolean" - }, - "minReplicas": { - "type": "integer" - }, - "maxReplicas": { - "type": "integer" - }, - "queueSizeThreshold": { - "type": "integer" - }, - "pollingInterval": { - "type": "integer" - }, - "cooldownPeriod": { - "type": "integer" - } - }, - "additionalProperties": false - } - }, - "additionalProperties": false - }, "incomingRequestIngest": { "type": "object", "properties": { diff --git a/HelmChart/Public/oneuptime/values.yaml b/HelmChart/Public/oneuptime/values.yaml index e0188ca420..62f145f469 100644 --- a/HelmChart/Public/oneuptime/values.yaml +++ b/HelmChart/Public/oneuptime/values.yaml @@ -45,7 +45,6 @@ externalSecrets: # (Optional): You usually do not need to set this if you're self hosting. 
openTelemetryCollectorHost:
-fluentdHost:
 
 deployment:
   # Default replica count for all deployments
@@ -722,29 +721,6 @@ openTelemetryIngest:
     # Cooldown period after scaling (in seconds)
     cooldownPeriod: 300
 
-fluentIngest:
-  replicaCount: 1
-  disableTelemetryCollection: false
-  disableAutoscaler: false
-  concurrency: 100
-  ports:
-    http: 3401
-  resources:
-  nodeSelector: {}
-  podSecurityContext: {}
-  containerSecurityContext: {}
-  # KEDA autoscaling configuration based on queue metrics
-  keda:
-    enabled: false
-    minReplicas: 1
-    maxReplicas: 100
-    # Scale up when queue size exceeds this threshold
-    queueSizeThreshold: 100
-    # Polling interval for metrics (in seconds)
-    pollingInterval: 30
-    # Cooldown period after scaling (in seconds)
-    cooldownPeriod: 300
-
 incomingRequestIngest:
   replicaCount: 1
   disableTelemetryCollection: false
diff --git a/OpenTelemetryIngest/API/Fluent.ts b/OpenTelemetryIngest/API/Fluent.ts
new file mode 100644
index 0000000000..1120129b8c
--- /dev/null
+++ b/OpenTelemetryIngest/API/Fluent.ts
@@ -0,0 +1,167 @@
+import TelemetryIngest, {
+  TelemetryRequest,
+} from "Common/Server/Middleware/TelemetryIngest";
+import ProductType from "Common/Types/MeteredPlan/ProductType";
+import Express, {
+  ExpressRequest,
+  ExpressResponse,
+  ExpressRouter,
+  NextFunction,
+} from "Common/Server/Utils/Express";
+import Response from "Common/Server/Utils/Response";
+import FluentLogsQueueService from "../Services/Queue/FluentLogsQueueService";
+import ClusterKeyAuthorization from "Common/Server/Middleware/ClusterKeyAuthorization";
+import BadRequestException from "Common/Types/Exception/BadRequestException";
+
+/**
+ * Express middleware helpers for the Fluent logs ingestion routes.
+ */
+export class FluentLogsRequestMiddleware {
+  /**
+   * Tags the request with the Logs product type, then calls next().
+   * A synchronous failure is forwarded to next(err).
+   */
+  public static async getProductType(
+    req: ExpressRequest,
+    _res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> {
+    try {
+      (req as TelemetryRequest).productType = ProductType.Logs;
+      return next();
+    } catch (err) {
+      return next(err);
+    }
+  }
+}
+
+const router: ExpressRouter = Express.getRouter();
+
+/*
+ * Fluent log ingestion endpoint: runs the product-type and telemetry
+ * authorization middleware, then queues the payload for background processing.
+ */
+router.post(
+  "/fluentd/v1/logs",
+  FluentLogsRequestMiddleware.getProductType,
+  TelemetryIngest.isAuthorizedServiceMiddleware,
+  async (
+    req: ExpressRequest,
+    res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> => {
+    try {
+      if (!(req as TelemetryRequest).projectId) {
+        throw new BadRequestException(
+          "Invalid request - projectId not found in request.",
+        );
+      }
+
+      req.body = req.body?.toJSON ? req.body.toJSON() : req.body;
+
+      /*
+       * Enqueue before acknowledging. If the queue write fails, the error
+       * reaches next(err) while the response is still unsent, so the sender
+       * sees the failure instead of a success for logs that were dropped.
+       */
+      await FluentLogsQueueService.addFluentLogsJob(
+        req as TelemetryRequest,
+      );
+
+      Response.sendEmptySuccessResponse(req, res);
+
+      return;
+    } catch (err) {
+      return next(err);
+    }
+  },
+);
+
+// Queue counters by job state; guarded by the ClusterKeyAuthorization middleware.
+router.get(
+  "/fluent/queue/stats",
+  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
+  async (
+    req: ExpressRequest,
+    res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> => {
+    try {
+      const stats: {
+        waiting: number;
+        active: number;
+        completed: number;
+        failed: number;
+        delayed: number;
+        total: number;
+      } = await FluentLogsQueueService.getQueueStats();
+      return Response.sendJsonObjectResponse(req, res, stats);
+    } catch (err) {
+      return next(err);
+    }
+  },
+);
+
+// Current queue size as { size }; guarded by the ClusterKeyAuthorization middleware.
+router.get(
+  "/fluent/queue/size",
+  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
+  async (
+    req: ExpressRequest,
+    res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> => {
+    try {
+      const size: number = await FluentLogsQueueService.getQueueSize();
+      return Response.sendJsonObjectResponse(req, res, { size });
+    } catch (err) {
+      return next(err);
+    }
+  },
+);
+
+/*
+ * Lists failed jobs. The "start" and "end" query parameters fall back to
+ * 0 and 100 when they are missing, not numeric, or 0.
+ */
+router.get(
+  "/fluent/queue/failed",
+  ClusterKeyAuthorization.isAuthorizedServiceMiddleware,
+  async (
+    req: ExpressRequest,
+    res: ExpressResponse,
+    next: NextFunction,
+  ): Promise<void> => {
+    try {
+      const start: number = parseInt(req.query["start"] as string, 10) || 0;
+      const end: number = parseInt(req.query["end"] as string, 10) || 100;
+
+      const failedJobs: Array<{
+        id: string;
+        name: string;
+        data: any;
+        failedReason: string;
+        stackTrace?: string;
+        processedOn: Date | null;
+        finishedOn: Date | null;
+        attemptsMade: number;
+      }> = await FluentLogsQueueService.getFailedJobs({
+        start,
+        end,
+      });
+
+      return Response.sendJsonObjectResponse(req, res, {
+        failedJobs,
+        pagination: {
+          start,
+          end,
+          count: failedJobs.length,
+        },
+      });
+    } catch (err) {
+      return next(err);
+    }
+  },
+);
+
+export default router;
diff --git a/OpenTelemetryIngest/Index.ts b/OpenTelemetryIngest/Index.ts
index c4948bcda8..d248576b01 100644
--- a/OpenTelemetryIngest/Index.ts
+++ b/OpenTelemetryIngest/Index.ts
@@ -1,6 +1,7 @@
 import OTelIngestAPI from "./API/OTelIngest";
 import MetricsAPI from "./API/Metrics";
 import SyslogAPI from "./API/Syslog";
+import FluentAPI from "./API/Fluent";
 import { PromiseVoidFunction } from "Common/Types/FunctionTypes";
 import { ClickhouseAppInstance } from "Common/Server/Infrastructure/ClickhouseDatabase";
 import PostgresAppInstance from "Common/Server/Infrastructure/PostgresDatabase";
@@ -12,16 +13,20 @@ import Realtime from "Common/Server/Utils/Realtime";
 import App from "Common/Server/Utils/StartServer";
 import Telemetry from "Common/Server/Utils/Telemetry";
 import "./Jobs/TelemetryIngest/ProcessTelemetry";
+import "./Jobs/TelemetryIngest/ProcessFluentLogs";
 import { OPEN_TELEMETRY_INGEST_CONCURRENCY } from "./Config";
+import type { StatusAPIOptions } from "Common/Server/API/StatusAPI";
 import "ejs";
 
 const app: ExpressApplication = Express.getExpressApp();
 
 const APP_NAME: string = "open-telemetry-ingest";
+const ROUTE_PREFIXES: Array<string> = [`/${APP_NAME}`, "/"];
 
-app.use([`/${APP_NAME}`, "/"], OTelIngestAPI);
-app.use([`/${APP_NAME}`, "/"], MetricsAPI);
-app.use([`/${APP_NAME}`, "/"], SyslogAPI);
+app.use(ROUTE_PREFIXES, OTelIngestAPI);
+app.use(ROUTE_PREFIXES, MetricsAPI);
+app.use(ROUTE_PREFIXES, SyslogAPI);
+app.use(ROUTE_PREFIXES, FluentAPI);
 
 const init: PromiseVoidFunction = async (): Promise<void> => {
   try {
@@ -44,12 +49,14 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
     );
 
     // init the app
+    const statusOptions:
StatusAPIOptions = {
+      liveCheck: statusCheck,
+      readyCheck: statusCheck,
+    };
+
     await App.init({
       appName: APP_NAME,
-      statusOptions: {
-        liveCheck: statusCheck,
-        readyCheck: statusCheck,
-      },
+      statusOptions: statusOptions,
     });
 
     // connect to the database.
@@ -63,9 +70,9 @@ const init: PromiseVoidFunction = async (): Promise<void> => {
     );
 
     await Realtime.init();
 
-    // add default routes
     await App.addDefaultRoutes();
+
   } catch (err) {
     logger.error("App Init Failed:");
     logger.error(err);
diff --git a/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessFluentLogs.ts b/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessFluentLogs.ts
new file mode 100644
index 0000000000..c0936b0127
--- /dev/null
+++ b/OpenTelemetryIngest/Jobs/TelemetryIngest/ProcessFluentLogs.ts
@@ -0,0 +1,165 @@
+import { FluentLogsJobData } from "../../Services/Queue/FluentLogsQueueService";
+import logger from "Common/Server/Utils/Logger";
+import { QueueJob, QueueName } from "Common/Server/Infrastructure/Queue";
+import QueueWorker from "Common/Server/Infrastructure/QueueWorker";
+import ObjectID from "Common/Types/ObjectID";
+import OneUptimeDate from "Common/Types/Date";
+import { JSONObject } from "Common/Types/JSON";
+import LogService from "Common/Server/Services/LogService";
+import LogSeverity from "Common/Types/Log/LogSeverity";
+import OTelIngestService from "Common/Server/Services/OpenTelemetryIngestService";
+import {
+  OPEN_TELEMETRY_INGEST_CONCURRENCY,
+  OPEN_TELEMETRY_INGEST_LOG_FLUSH_BATCH_SIZE,
+} from "../../Config";
+
+interface FluentLogsProcessData {
+  projectId: ObjectID;
+  requestBody: JSONObject;
+  requestHeaders: JSONObject;
+}
+
+const DEFAULT_LOG_FLUSH_BATCH_SIZE: number = 500;
+const CONFIGURED_LOG_FLUSH_BATCH_SIZE: number = Number(
+  OPEN_TELEMETRY_INGEST_LOG_FLUSH_BATCH_SIZE,
+);
+
+/*
+ * Rows per insert. Falls back to the default unless the configured value is a
+ * finite number >= 1, so the flush loop always removes at least one row.
+ */
+const LOG_FLUSH_BATCH_SIZE: number =
+  Number.isFinite(CONFIGURED_LOG_FLUSH_BATCH_SIZE) &&
+  CONFIGURED_LOG_FLUSH_BATCH_SIZE >= 1
+    ? Math.floor(CONFIGURED_LOG_FLUSH_BATCH_SIZE)
+    : DEFAULT_LOG_FLUSH_BATCH_SIZE;
+
+// Worker for the FluentLogs queue; processing failures are logged and rethrown.
+QueueWorker.getWorker(
+  QueueName.FluentLogs,
+  async (job: QueueJob): Promise<void> => {
+    logger.debug(`Processing fluent logs ingestion job: ${job.name}`);
+
+    try {
+      const jobData: FluentLogsJobData = job.data as FluentLogsJobData;
+
+      await processFluentLogsFromQueue({
+        projectId: new ObjectID(jobData.projectId),
+        requestBody: jobData.requestBody,
+        requestHeaders: jobData.requestHeaders,
+      });
+
+      logger.debug(`Successfully processed fluent logs ingestion job: ${job.name}`);
+    } catch (error) {
+      logger.error(`Error processing fluent logs ingestion job:`);
+      logger.error(error);
+      throw error;
+    }
+  },
+  { concurrency: OPEN_TELEMETRY_INGEST_CONCURRENCY },
+);
+
+/**
+ * Converts one queued Fluent payload into log rows and inserts them in batches.
+ * The payload may be a single item, an array, or an object whose "json" key
+ * wraps either. Every row is stamped with the time it is processed.
+ */
+async function processFluentLogsFromQueue(
+  data: FluentLogsProcessData,
+): Promise<void> {
+  const dbLogs: Array<JSONObject> = [];
+
+  let logItems: Array<JSONObject | string> | JSONObject = data.requestBody as
+    | Array<JSONObject | string>
+    | JSONObject;
+
+  const serviceNameHeader: string | string[] | undefined = data.requestHeaders[
+    "x-oneuptime-service-name"
+  ] as string | string[] | undefined;
+
+  // The header is typed as string | string[]; for an array use the first entry.
+  const oneuptimeServiceName: string =
+    (Array.isArray(serviceNameHeader)
+      ? serviceNameHeader[0]
+      : serviceNameHeader) || "Unknown Service";
+
+  const telemetryService: {
+    serviceId: ObjectID;
+    dataRententionInDays: number;
+  } = await OTelIngestService.telemetryServiceFromName({
+    serviceName: oneuptimeServiceName,
+    projectId: data.projectId,
+  });
+
+  if (
+    logItems &&
+    typeof logItems === "object" &&
+    (logItems as JSONObject)["json"]
+  ) {
+    logItems = (logItems as JSONObject)["json"] as
+      | Array<JSONObject | string>
+      | JSONObject;
+  }
+
+  if (!Array.isArray(logItems)) {
+    logItems = [logItems];
+  }
+
+  for (const logItem of logItems) {
+    const logBody: string =
+      typeof logItem === "string" ? logItem : JSON.stringify(logItem);
+
+    const ingestionDate: Date = OneUptimeDate.getCurrentDate();
+    const ingestionIso: string = OneUptimeDate.toString(ingestionDate);
+    const timeUnixNano: number = OneUptimeDate.getCurrentDateAsUnixNano();
+
+    const logRow: JSONObject = {
+      _id: ObjectID.generate().toString(),
+      createdAt: ingestionIso,
+      updatedAt: ingestionIso,
+      projectId: data.projectId.toString(),
+      serviceId: telemetryService.serviceId.toString(),
+      time: ingestionIso,
+      timeUnixNano: Math.trunc(timeUnixNano).toString(),
+      severityNumber: 0,
+      severityText: LogSeverity.Unspecified,
+      attributes: {},
+      attributeKeys: [],
+      traceId: "",
+      spanId: "",
+      body: logBody,
+    };
+
+    dbLogs.push(logRow);
+
+    if (dbLogs.length >= LOG_FLUSH_BATCH_SIZE) {
+      await flushLogBuffer(dbLogs);
+    }
+  }
+
+  await flushLogBuffer(dbLogs, true);
+}
+
+logger.debug("Fluent logs ingest worker initialized");
+
+/**
+ * Removes rows from the front of `logs` and inserts them in batches of at most
+ * LOG_FLUSH_BATCH_SIZE. Without `force` only full batches are written; with
+ * `force` the remainder is written too.
+ */
+async function flushLogBuffer(
+  logs: Array<JSONObject>,
+  force: boolean = false,
+): Promise<void> {
+  while (logs.length >= LOG_FLUSH_BATCH_SIZE || (force && logs.length > 0)) {
+    const batchSize: number = Math.min(logs.length, LOG_FLUSH_BATCH_SIZE);
+    const batch: Array<JSONObject> = logs.splice(0, batchSize);
+
+    if (batch.length === 0) {
+      // Nothing was removed, so looping again could never terminate.
+      break;
+    }
+
+    await LogService.insertJsonRows(batch);
+  }
+}
diff --git a/OpenTelemetryIngest/Services/Queue/FluentLogsQueueService.ts b/OpenTelemetryIngest/Services/Queue/FluentLogsQueueService.ts
new file mode 100644
index 0000000000..833367518f
--- /dev/null
+++ b/OpenTelemetryIngest/Services/Queue/FluentLogsQueueService.ts
@@ -0,0 +1,84 @@
+import { TelemetryRequest } from "Common/Server/Middleware/TelemetryIngest";
+import Queue, { QueueName } from "Common/Server/Infrastructure/Queue";
+import { JSONObject } from "Common/Types/JSON";
+import OneUptimeDate from "Common/Types/Date";
+import logger from "Common/Server/Utils/Logger";
+
+/** Payload stored on the FluentLogs queue for one ingestion request. */
+export interface FluentLogsJobData {
+  projectId: string;
+  requestBody: JSONObject;
+  requestHeaders: Record<string, string>;
+  ingestionTimestamp: Date;
+}
+
+/** Producer and inspection helpers for the FluentLogs queue. */
+export default class FluentLogsQueueService {
+  /**
+   * Queues the request body and headers under a job id built from the project
+   * id and the current time. Errors are logged and rethrown.
+   */
+  public static async addFluentLogsJob(req: TelemetryRequest): Promise<void> {
+    try {
+      const projectId: string = req.projectId.toString();
+
+      const jobData: FluentLogsJobData = {
+        projectId: projectId,
+        requestBody: req.body,
+        requestHeaders: req.headers as Record<string, string>,
+        ingestionTimestamp: OneUptimeDate.getCurrentDate(),
+      };
+
+      const jobId: string = `fluent-logs-${projectId}-${OneUptimeDate.getCurrentDateAsUnixNano()}`;
+
+      await Queue.addJob(
+        QueueName.FluentLogs,
+        jobId,
+        "ProcessFluentLogs",
+        jobData as unknown as JSONObject,
+      );
+
+      logger.debug(`Added fluent logs ingestion job: ${jobId}`);
+    } catch (error) {
+      logger.error(`Error adding fluent logs ingestion job:`);
+      logger.error(error);
+      throw error;
+    }
+  }
+
+  /** Returns what Queue.getQueueSize reports for the FluentLogs queue. */
+  public static async getQueueSize(): Promise<number> {
+    return Queue.getQueueSize(QueueName.FluentLogs);
+  }
+
+  /** Returns what Queue.getQueueStats reports for the FluentLogs queue. */
+  public static async getQueueStats(): Promise<{
+    waiting: number;
+    active: number;
+    completed: number;
+    failed: number;
+    delayed: number;
+    total: number;
+  }> {
+    return Queue.getQueueStats(QueueName.FluentLogs);
+  }
+
+  /** Returns failed FluentLogs jobs; `options` is passed to Queue.getFailedJobs. */
+  public static getFailedJobs(options?: {
+    start?: number;
+    end?: number;
+  }): Promise<
+    Array<{
+      id: string;
+      name: string;
+      data: JSONObject;
+      failedReason: string;
+      stackTrace?: string;
+      processedOn: Date | null;
+      finishedOn: Date | null;
+      attemptsMade: number;
+    }>
+  > {
+    return Queue.getFailedJobs(QueueName.FluentLogs, options);
+  }
+}
diff --git a/Tests/Scripts/status-check.sh b/Tests/Scripts/status-check.sh
index 9d6006f852..4196ac7cb2 100644
--- a/Tests/Scripts/status-check.sh
+++ b/Tests/Scripts/status-check.sh
@@ -29,8 +29,6 @@ bash $scriptDir/endpoint-status.sh "Dashboard (Status Check)" $HOST_TO_CHECK/das
 
 bash $scriptDir/endpoint-status.sh "Status Page" $HOST_TO_CHECK/status-page
 
-bash $scriptDir/endpoint-status.sh "Status Page" $HOST_TO_CHECK/fluent-ingest/status/ready
-
 bash $scriptDir/endpoint-status.sh "Status Page"
$HOST_TO_CHECK/incoming-request-ingest/status/ready bash $scriptDir/endpoint-status.sh "Status Page (Ready Check)" $HOST_TO_CHECK/status-page/status/ready diff --git a/config.example.env b/config.example.env index dbfb8ff41d..c6dd1b5707 100644 --- a/config.example.env +++ b/config.example.env @@ -59,8 +59,6 @@ COMPOSE_PROJECT_NAME=oneuptime # OTEL HOST - if you like the collector to be hosted on a different server then change this to the IP of the server. OTEL_COLLECTOR_HOST= -# FLUENTD_HOST - if you like the fluentd to be hosted on a different server then change this to the IP of the server. -FLUENTD_HOST= # Clickhouse Settings CLICKHOUSE_USER=default @@ -94,7 +92,7 @@ REDIS_TLS_SENTINEL_MODE=false # Hostnames. Usually does not need to change. PROBE_INGEST_HOSTNAME=probe-ingest:3400 -FLUENT_INGEST_HOSTNAME=fluent-ingest:3401 +FLUENT_LOGS_HOSTNAME=open-telemetry-ingest:3403 INCOMING_REQUEST_INGEST_HOSTNAME=incoming-request-ingest:3402 OPEN_TELEMETRY_INGEST_HOSTNAME=otel-telemetry-ingest:3403 @@ -248,8 +246,8 @@ WORKFLOW_TIMEOUT_IN_MS=5000 # Max number of telemetry jobs processed concurrently by OpenTelemetry Ingest worker OPEN_TELEMETRY_INGEST_CONCURRENCY=100 -# Max number of jobs processed concurrently by Fluent Ingest worker -FLUENT_INGEST_CONCURRENCY=100 +# Max number of jobs processed concurrently by Fluent Logs worker +FLUENT_LOGS_CONCURRENCY=100 # Max number of jobs processed concurrently by Incoming Request Ingest worker INCOMING_REQUEST_INGEST_CONCURRENCY=100 @@ -316,7 +314,7 @@ DISABLE_TELEMETRY_FOR_ACCOUNTS=true DISABLE_TELEMETRY_FOR_APP=true DISABLE_TELEMETRY_FOR_PROBE_INGEST=true DISABLE_TELEMETRY_FOR_OPEN_TELEMETRY_INGEST=true -DISABLE_TELEMETRY_FOR_FLUENT_INGEST=true +DISABLE_TELEMETRY_FOR_FLUENT_LOGS=true DISABLE_TELEMETRY_FOR_INCOMING_REQUEST_INGEST=true DISABLE_TELEMETRY_FOR_TEST_SERVER=true DISABLE_TELEMETRY_FOR_STATUS_PAGE=true diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 733de32f66..93e75e0c99 100644 --- 
a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -7,7 +7,6 @@ x-common-variables: &common-variables OTEL_COLLECTOR_HOST: ${OTEL_COLLECTOR_HOST} - FLUENTD_HOST: ${FLUENTD_HOST} STATUS_PAGE_CNAME_RECORD: ${STATUS_PAGE_CNAME_RECORD} @@ -412,15 +411,6 @@ services: ports: - 13133:13133 # Otel Collector Health Check Endpoint at /heath/status - fluentd: - networks: - - oneuptime - restart: always - logging: - driver: "local" - options: - max-size: "1000m" - fluent-bit: networks: - oneuptime diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index a1cf775b45..998f1212ff 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -394,23 +394,6 @@ services: context: . dockerfile: ./IncomingRequestIngest/Dockerfile - fluent-ingest: - volumes: - - ./FluentIngest:/usr/src/app:cached - # Use node modules of the container and not host system. - # https://stackoverflow.com/questions/29181032/add-a-volume-to-docker-but-exclude-a-sub-folder - - /usr/src/app/node_modules/ - - ./Common:/usr/src/Common:cached - - /usr/src/Common/node_modules/ - ports: - - '9937:9229' # Debugging port. - extends: - file: ./docker-compose.base.yml - service: fluent-ingest - build: - network: host - context: . - dockerfile: ./FluentIngest/Dockerfile # Fluentd. Required only for development. In production its the responsibility of the customer to run fluentd and pipe logs to OneUptime. diff --git a/docker-compose.yml b/docker-compose.yml index a93da3b540..ca8300ef9b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -126,12 +126,6 @@ services: file: ./docker-compose.base.yml service: incoming-request-ingest - fluent-ingest: - image: oneuptime/fluent-ingest:${APP_TAG} - extends: - file: ./docker-compose.base.yml - service: fluent-ingest - isolated-vm: image: oneuptime/isolated-vm:${APP_TAG} extends: