diff --git a/App/FeatureSet/Docs/Content/telemetry/fluentd.md b/App/FeatureSet/Docs/Content/telemetry/fluentd.md index a2e1a39e17..efd344ca54 100644 --- a/App/FeatureSet/Docs/Content/telemetry/fluentd.md +++ b/App/FeatureSet/Docs/Content/telemetry/fluentd.md @@ -57,7 +57,7 @@ You can use the following configuration to send the telemetry data to the OneUpt endpoint https://oneuptime.com/fluentd/logs open_timeout 2 - headers {"x-oneuptime-service-token":""} + headers {"x-oneuptime-token":"YOUR_SERVICE_TOKEN", "x-oneuptime-service-name":"YOUR_SERVICE_NAME"} content_type application/json json_array true @@ -93,7 +93,7 @@ An example of full configuration file is shown below: endpoint https://oneuptime.com/fluentd/logs open_timeout 2 - headers {"x-oneuptime-service-token":""} + headers {"x-oneuptime-token":"YOUR_SERVICE_TOKEN", "x-oneuptime-service-name":"YOUR_SERVICE_NAME"} content_type application/json json_array true diff --git a/App/FeatureSet/Docs/Content/telemetry/open-telemetry.md b/App/FeatureSet/Docs/Content/telemetry/open-telemetry.md index 0aae01a01d..efa2453000 100644 --- a/App/FeatureSet/Docs/Content/telemetry/open-telemetry.md +++ b/App/FeatureSet/Docs/Content/telemetry/open-telemetry.md @@ -42,13 +42,15 @@ Once you have configured the telemetry service in your application, you can inte | Environment Variable | Value | | --- | --- | -| OTEL_EXPORTER_OTLP_HEADERS | x-oneuptime-service-token= | +| OTEL_EXPORTER_OTLP_HEADERS | x-oneuptime-token= | | OTEL_EXPORTER_OTLP_ENDPOINT | https://otlp.oneuptime.com | +| OTEL_SERVICE_NAME | | + **Example** ```bash -export OTEL_EXPORTER_OTLP_HEADERS=x-oneuptime-service-token=9c8806e0-a4aa-11ee-be95-010d5967b068 +export OTEL_EXPORTER_OTLP_HEADERS=x-oneuptime-token=9c8806e0-a4aa-11ee-be95-010d5967b068 export OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp.oneuptime.com ``` diff --git a/Common/Types/JSONFunctions.ts b/Common/Types/JSONFunctions.ts index d4456acac8..3ca9ef3e80 100644 --- a/Common/Types/JSONFunctions.ts +++ b/Common/Types/JSONFunctions.ts @@ -1,6 +1,7 @@ import BaseModel from "../Models/BaseModel"; import DatabaseProperty from "./Database/DatabaseProperty"; import OneUptimeDate from "./Date"; +import DiskSize from "./DiskSize"; import { JSONArray, JSONObject, JSONValue, ObjectType } from "./JSON"; import SerializableObject from "./SerializableObject"; import SerializableObjectDictionary from "./SerializableObjectDictionary"; @@ -8,6 +9,12 @@ import Typeof from "./Typeof"; import JSON5 from "json5"; export default class JSONFunctions { + public static getSizeOfJSONinGB(obj: JSONObject): number { + const sizeInBytes: number = Buffer.byteLength(JSON.stringify(obj)); + const sizeToGb: number = DiskSize.byteSizeToGB(sizeInBytes); + return sizeToGb; + } + public static nestJson(obj: JSONObject): JSONObject { // obj could be in this format: diff --git a/Examples/otel-dotnet/README.md b/Examples/otel-dotnet/README.md index b41730fadf..d8755937f9 100644 --- a/Examples/otel-dotnet/README.md +++ b/Examples/otel-dotnet/README.md @@ -5,12 +5,12 @@ Please use ```bash -export OTEL_EXPORTER_OTLP_HEADERS="x-oneuptime-service-token=51d01130-cf14-11ee-a74d-1364f8ef0ac6" && export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost" && dotnet run --urls=http://localhost:7856/ +export OTEL_EXPORTER_OTLP_HEADERS="x-oneuptime-token=51d01130-cf14-11ee-a74d-1364f8ef0ac6" && export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost" && dotnet run --urls=http://localhost:7856/ ``` ### Run on test server ```bash -export OTEL_EXPORTER_OTLP_HEADERS="x-oneuptime-service-token=3be58190-c7ec-11ee-8e5e-3952f961cde5" && export OTEL_EXPORTER_OTLP_ENDPOINT="https://test-otlp.oneuptime.com" && dotnet run --urls=http://localhost:7856/ +export OTEL_EXPORTER_OTLP_HEADERS="x-oneuptime-token=3be58190-c7ec-11ee-8e5e-3952f961cde5" && export OTEL_EXPORTER_OTLP_ENDPOINT="https://test-otlp.oneuptime.com" && dotnet run --urls=http://localhost:7856/ ``` diff --git a/Fluentd/fluent.conf b/Fluentd/fluent.conf index 748445ad69..753cf6a77a 100644 --- a/Fluentd/fluent.conf +++ b/Fluentd/fluent.conf @@ -18,7 +18,7 @@ endpoint https://test.oneuptime.com/fluentd/logs # This is for test environment open_timeout 2 - headers {"x-oneuptime-service-token":"e83375b0-c1fc-11ee-a9f7-070615743da6"} + headers {"x-oneuptime-token":"e83375b0-c1fc-11ee-a9f7-070615743da6"} content_type application/json json_array true diff --git a/Ingestor/API/FluentIngest.ts b/Ingestor/API/FluentIngest.ts index bf25b37227..d50a029764 100644 --- a/Ingestor/API/FluentIngest.ts +++ b/Ingestor/API/FluentIngest.ts @@ -14,6 +14,8 @@ import Express, { import logger from "CommonServer/Utils/Logger"; import Response from "CommonServer/Utils/Response"; import Log, { LogSeverity } from "Model/AnalyticsModels/Log"; +import OTelIngestService from "../Service/OTelIngest"; +import ObjectID from "Common/Types/ObjectID"; export class FluentRequestMiddleware { public static async getProductType( @@ -50,11 +52,26 @@ router.post( JSONObject | string >; + let oneuptimeServiceName: string | string[] | undefined = + req.headers["x-oneuptime-service-name"]; + + if (!oneuptimeServiceName) { + oneuptimeServiceName = "Unknown Service"; + } + + const telemetryService: { + serviceId: ObjectID; + dataRententionInDays: number; + } = await OTelIngestService.telemetryServiceFromName({ + serviceName: oneuptimeServiceName as string, + projectId: (req as TelemetryRequest).projectId, + }); + for (let logItem of logItems) { const dbLog: Log = new Log(); dbLog.projectId = (req as TelemetryRequest).projectId; - dbLog.serviceId = (req as TelemetryRequest).serviceId; + dbLog.serviceId = telemetryService.serviceId; dbLog.severityNumber = 0; const currentTimeAndDate: Date = OneUptimeDate.getCurrentDate(); dbLog.timeUnixNano = OneUptimeDate.toUnixNano(currentTimeAndDate); diff --git a/Ingestor/API/OTelIngest.ts b/Ingestor/API/OTelIngest.ts index d389ec08f4..37f1bafe56 100644 --- a/Ingestor/API/OTelIngest.ts +++ b/Ingestor/API/OTelIngest.ts @@ -4,6 +4,7 @@ import TelemetryIngest, { } from "../Middleware/TelemetryIngest"; import OTelIngestService, { OtelAggregationTemporality, + TelemetryServiceDataIngested, } from "../Service/OTelIngest"; import OneUptimeDate from "Common/Types/Date"; import BadRequestException from "Common/Types/Exception/BadRequestException"; @@ -27,6 +28,8 @@ import Log, { LogSeverity } from "Model/AnalyticsModels/Log"; import Metric, { MetricPointType } from "Model/AnalyticsModels/Metric"; import Span, { SpanKind, SpanStatus } from "Model/AnalyticsModels/Span"; import protobuf from "protobufjs"; +import Dictionary from "Common/Types/Dictionary"; +import ObjectID from "Common/Types/ObjectID"; // Load proto file for OTel @@ -104,12 +107,9 @@ router.post( next: NextFunction, ): Promise => { try { - if ( - !(req as TelemetryRequest).projectId || - !(req as TelemetryRequest).serviceId - ) { + if (!(req as TelemetryRequest).projectId) { throw new BadRequestException( - "Invalid request - projectId or serviceId not found in request.", + "Invalid request - projectId not found in request.", ); } @@ -122,7 +122,36 @@ router.post( let attributes: string[] = []; + const serviceDictionary: Dictionary = {}; + for (const resourceSpan of resourceSpans) { + // get service name from resourceSpan attributes + + const serviceName: string = getServiceNameFromAttributes( + (resourceSpan["resource"] as JSONObject)["attributes"] as JSONArray, + ); + + if (!serviceDictionary[serviceName]) { + const service: { + serviceId: ObjectID; + dataRententionInDays: number; + } = await OTelIngestService.telemetryServiceFromName({ + serviceName: serviceName, + projectId: (req as TelemetryRequest).projectId, + }); + + serviceDictionary[serviceName] = { + serviceName: serviceName, + serviceId: service.serviceId, + dataRententionInDays: service.dataRententionInDays, + dataIngestedInGB: 0, + }; + } + + // size of req.body in bytes. + const sizeInGb: number = JSONFunctions.getSizeOfJSONinGB(resourceSpan); + serviceDictionary[serviceName]!.dataIngestedInGB += sizeInGb; + const scopeSpans: JSONArray = resourceSpan["scopeSpans"] as JSONArray; for (const scopeSpan of scopeSpans) { @@ -147,8 +176,8 @@ router.post( items: (resourceSpan["resource"] as JSONObject)[ "attributes" ] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: serviceDictionary[serviceName]!.serviceId!, }); } @@ -165,13 +194,13 @@ router.post( ...attributesObject, ...OTelIngestService.getAttributes({ items: span["attributes"] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: serviceDictionary[serviceName]!.serviceId!, }), }; dbSpan.projectId = (req as TelemetryRequest).projectId; - dbSpan.serviceId = (req as TelemetryRequest).serviceId; + dbSpan.serviceId = serviceDictionary[serviceName]!.serviceId!; dbSpan.spanId = Text.convertBase64ToHex(span["spanId"] as string); dbSpan.traceId = Text.convertBase64ToHex(span["traceId"] as string); @@ -252,8 +281,9 @@ router.post( name: event["name"] as string, attributes: OTelIngestService.getAttributes({ items: event["attributes"] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, }), }); } @@ -270,8 +300,9 @@ router.post( spanId: Text.convertBase64ToHex(link["spanId"] as string), attributes: OTelIngestService.getAttributes({ items: link["attributes"] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, }), }); } @@ -304,6 +335,14 @@ router.post( logger.error(err); }); + OTelIngestService.recordDataIngestedUsgaeBilling({ + services: serviceDictionary, + projectId: (req as TelemetryRequest).projectId, + productType: ProductType.Traces, + }).catch((err: Error) => { + logger.error(err); + }); + return Response.sendEmptySuccessResponse(req, res); } catch (err) { return next(err); @@ -321,12 +360,9 @@ router.post( next: NextFunction, ): Promise => { try { - if ( - !(req as TelemetryRequest).projectId || - !(req as TelemetryRequest).serviceId - ) { + if (!(req as TelemetryRequest).projectId) { throw new BadRequestException( - "Invalid request - projectId or serviceId not found in request.", + "Invalid request - projectId not found in request.", ); } @@ -340,7 +376,38 @@ router.post( let attributes: string[] = []; + const serviceDictionary: Dictionary = {}; + for (const resourceMetric of resourceMetrics) { + // get service name from resourceMetric attributes + + const serviceName: string = getServiceNameFromAttributes( + (resourceMetric["resource"] as JSONObject)["attributes"] as JSONArray, + ); + + if (!serviceDictionary[serviceName]) { + const service: { + serviceId: ObjectID; + dataRententionInDays: number; + } = await OTelIngestService.telemetryServiceFromName({ + serviceName: serviceName, + projectId: (req as TelemetryRequest).projectId, + }); + + serviceDictionary[serviceName] = { + serviceName: serviceName, + serviceId: service.serviceId, + dataRententionInDays: service.dataRententionInDays, + dataIngestedInGB: 0, + }; + } + + // size of req.body in bytes. + const sizeInGb: number = + JSONFunctions.getSizeOfJSONinGB(resourceMetric); + + serviceDictionary[serviceName]!.dataIngestedInGB += sizeInGb; + const scopeMetrics: JSONArray = resourceMetric[ "scopeMetrics" ] as JSONArray; @@ -357,7 +424,7 @@ router.post( const dbMetric: Metric = new Metric(); dbMetric.projectId = (req as TelemetryRequest).projectId; - dbMetric.serviceId = (req as TelemetryRequest).serviceId; + dbMetric.serviceId = serviceDictionary[serviceName]!.serviceId!; dbMetric.name = metricName; dbMetric.description = metricDescription; @@ -376,8 +443,9 @@ router.post( attributesObject = { ...OTelIngestService.getAttributes({ items: metric["attributes"] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, }), }; } @@ -400,8 +468,9 @@ router.post( items: (resourceMetric["resource"] as JSONObject)[ "attributes" ] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, }), }; } @@ -438,8 +507,9 @@ router.post( isMonotonic: (metric["sum"] as JSONObject)[ "isMonotonic" ] as boolean | undefined, - telemetryServiceId: (req as TelemetryRequest).serviceId, - telemetryServiceName: (req as TelemetryRequest).serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, + telemetryServiceName: serviceName, }); sumMetric.metricPointType = MetricPointType.Sum; @@ -469,8 +539,9 @@ router.post( isMonotonic: (metric["gauge"] as JSONObject)[ "isMonotonic" ] as boolean | undefined, - telemetryServiceId: (req as TelemetryRequest).serviceId, - telemetryServiceName: (req as TelemetryRequest).serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, + telemetryServiceName: serviceName, }); guageMetric.metricPointType = MetricPointType.Gauge; @@ -500,8 +571,9 @@ router.post( isMonotonic: (metric["histogram"] as JSONObject)[ "isMonotonic" ] as boolean | undefined, - telemetryServiceId: (req as TelemetryRequest).serviceId, - telemetryServiceName: (req as TelemetryRequest).serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, + telemetryServiceName: serviceName, }); histogramMetric.metricPointType = MetricPointType.Histogram; @@ -540,6 +612,14 @@ router.post( logger.error(err); }); + OTelIngestService.recordDataIngestedUsgaeBilling({ + services: serviceDictionary, + projectId: (req as TelemetryRequest).projectId, + productType: ProductType.Metrics, + }).catch((err: Error) => { + logger.error(err); + }); + return Response.sendEmptySuccessResponse(req, res); } catch (err) { return next(err); @@ -557,12 +637,9 @@ router.post( next: NextFunction, ): Promise => { try { - if ( - !(req as TelemetryRequest).projectId || - !(req as TelemetryRequest).serviceId - ) { + if (!(req as TelemetryRequest).projectId) { throw new BadRequestException( - "Invalid request - projectId or serviceId not found in request.", + "Invalid request - projectId not found in request.", ); } @@ -574,7 +651,36 @@ router.post( let attributes: string[] = []; + const serviceDictionary: Dictionary = {}; + for (const resourceLog of resourceLogs) { + // get service name from resourceLog attributes + + const serviceName: string = getServiceNameFromAttributes( + (resourceLog["resource"] as JSONObject)["attributes"] as JSONArray, + ); + + if (!serviceDictionary[serviceName]) { + const service: { + serviceId: ObjectID; + dataRententionInDays: number; + } = await OTelIngestService.telemetryServiceFromName({ + serviceName: serviceName, + projectId: (req as TelemetryRequest).projectId, + }); + + serviceDictionary[serviceName] = { + serviceName: serviceName, + serviceId: service.serviceId, + dataRententionInDays: service.dataRententionInDays, + dataIngestedInGB: 0, + }; + } + + // size of req.body in bytes. + const sizeInGb: number = JSONFunctions.getSizeOfJSONinGB(resourceLog); + serviceDictionary[serviceName]!.dataIngestedInGB += sizeInGb; + const scopeLogs: JSONArray = resourceLog["scopeLogs"] as JSONArray; for (const scopeLog of scopeLogs) { @@ -618,8 +724,9 @@ router.post( items: (resourceLog["resource"] as JSONObject)[ "attributes" ] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: + serviceDictionary[serviceName]!.serviceId!, }), }; } @@ -638,13 +745,13 @@ router.post( ...attributesObject, ...OTelIngestService.getAttributes({ items: log["attributes"] as JSONArray, - telemetryServiceName: (req as TelemetryRequest).serviceName, - telemetryServiceId: (req as TelemetryRequest).serviceId, + telemetryServiceName: serviceName, + telemetryServiceId: serviceDictionary[serviceName]!.serviceId!, }), }; dbLog.projectId = (req as TelemetryRequest).projectId; - dbLog.serviceId = (req as TelemetryRequest).serviceId; + dbLog.serviceId = serviceDictionary[serviceName]!.serviceId!; dbLog.timeUnixNano = log["timeUnixNano"] as number; dbLog.time = OneUptimeDate.fromUnixNano( @@ -730,6 +837,14 @@ router.post( logger.error(err); }); + OTelIngestService.recordDataIngestedUsgaeBilling({ + services: serviceDictionary, + projectId: (req as TelemetryRequest).projectId, + productType: ProductType.Logs, + }).catch((err: Error) => { + logger.error(err); + }); + return Response.sendEmptySuccessResponse(req, res); } catch (err) { return next(err); @@ -737,4 +852,18 @@ router.post( }, ); +type GetServiceNameFromAttributesFunction = (attributes: JSONArray) => string; + +const getServiceNameFromAttributes: GetServiceNameFromAttributesFunction = ( + attributes: JSONArray, +): string => { + for (const attribute of attributes) { + if (attribute["key"] === "service.name") { + return attribute["value"] as string; + } + } + + return "Unknown Service"; +}; + export default router; diff --git a/Ingestor/Middleware/TelemetryIngest.ts b/Ingestor/Middleware/TelemetryIngest.ts index 60efc7a45c..2f22c78192 100644 --- a/Ingestor/Middleware/TelemetryIngest.ts +++ b/Ingestor/Middleware/TelemetryIngest.ts @@ -1,24 +1,17 @@ import { ProbeExpressRequest } from "../Types/Request"; -import DiskSize from "Common/Types/DiskSize"; import BadRequestException from "Common/Types/Exception/BadRequestException"; import ProductType from "Common/Types/MeteredPlan/ProductType"; import ObjectID from "Common/Types/ObjectID"; -import TelemetryServiceService from "CommonServer/Services/TelemetryServiceService"; -import TelemetryUsageBillingService from "CommonServer/Services/TelemetryUsageBillingService"; import { ExpressRequest, ExpressResponse, NextFunction, } from "CommonServer/Utils/Express"; -import logger from "CommonServer/Utils/Logger"; -import TelemetryService from "Model/Models/TelemetryService"; -import { DEFAULT_RETENTION_IN_DAYS } from "Model/Models/TelemetryUsageBilling"; +import TelemetryIngestionKeyService from "CommonServer/Services/TelemetryIngestionKeyService"; +import TelemetryIngestionKey from "Model/Models/TelemetryIngestionKey"; export interface TelemetryRequest extends ExpressRequest { - serviceId: ObjectID; // Service ID - serviceName: string; // Service Name projectId: ObjectID; // Project ID - dataRententionInDays: number; // how long the data should be retained. productType: ProductType; // what is the product type of the request - logs, metrics or traces. } @@ -31,64 +24,44 @@ export default class TelemetryIngest { try { // check header. - const serviceTokenInHeader: string | undefined = req.headers[ - "x-oneuptime-service-token" + const oneuptimeToken: string | undefined = req.headers[ + "x-oneuptime-token" ] as string | undefined; - if (!serviceTokenInHeader) { - throw new BadRequestException( - "Missing header: x-oneuptime-service-token", - ); + if (!oneuptimeToken) { + throw new BadRequestException("Missing header: x-oneuptime-token"); } - // size of req.body in bytes. - const sizeInBytes: number = Buffer.byteLength(JSON.stringify(req.body)); + let projectId: ObjectID | undefined = undefined; - const sizeToGb: number = DiskSize.byteSizeToGB(sizeInBytes); - - // load from the database and set the cache. - const service: TelemetryService | null = - await TelemetryServiceService.findOneBy({ + const token: TelemetryIngestionKey | null = + await TelemetryIngestionKeyService.findOneBy({ query: { - telemetryServiceToken: new ObjectID(serviceTokenInHeader as string), + secretKey: new ObjectID(oneuptimeToken?.toString() || ""), }, select: { - _id: true, projectId: true, - retainTelemetryDataForDays: true, - name: true, }, props: { isRoot: true, }, }); - if (!service) { + if (!token) { throw new BadRequestException( - "Invalid service token: " + serviceTokenInHeader, + "Invalid service token: " + oneuptimeToken, ); } - (req as TelemetryRequest).serviceId = service.id as ObjectID; - (req as TelemetryRequest).projectId = service.projectId as ObjectID; - (req as TelemetryRequest).serviceName = service.name as string; - (req as TelemetryRequest).dataRententionInDays = - service.retainTelemetryDataForDays || DEFAULT_RETENTION_IN_DAYS; + projectId = token.projectId as ObjectID; - (req as TelemetryRequest).serviceId = service.id as ObjectID; - (req as TelemetryRequest).projectId = service.projectId as ObjectID; + if (!projectId) { + throw new BadRequestException( + "Project ID not found for service token: " + oneuptimeToken, + ); + } - // report to Usage Service. - TelemetryUsageBillingService.updateUsageBilling({ - projectId: (req as TelemetryRequest).projectId, - productType: (req as TelemetryRequest).productType, - dataIngestedInGB: sizeToGb, - telemetryServiceId: (req as TelemetryRequest).serviceId, - retentionInDays: (req as TelemetryRequest).dataRententionInDays, - }).catch((err: Error) => { - logger.error("Failed to update usage billing for OTel"); - logger.error(err); - }); + (req as TelemetryRequest).projectId = projectId as ObjectID; next(); } catch (err) { diff --git a/Ingestor/Service/OTelIngest.ts b/Ingestor/Service/OTelIngest.ts index 6e5206fada..8ef75175bf 100644 --- a/Ingestor/Service/OTelIngest.ts +++ b/Ingestor/Service/OTelIngest.ts @@ -7,13 +7,110 @@ import GlobalCache from "CommonServer/Infrastructure/GlobalCache"; import Metric, { AggregationTemporality } from "Model/AnalyticsModels/Metric"; import TelemetryType from "Common/Types/Telemetry/TelemetryType"; import TelemetryAttributeService from "CommonServer/Services/TelemetryAttributeService"; +import Dictionary from "Common/Types/Dictionary"; +import ProductType from "Common/Types/MeteredPlan/ProductType"; +import { IsBillingEnabled } from "CommonServer/EnvironmentConfig"; +import TelemetryUsageBillingService from "CommonServer/Services/TelemetryUsageBillingService"; +import logger from "CommonServer/Utils/Logger"; +import TelemetryService from "Model/Models/TelemetryService"; +import TelemetryServiceService from "CommonServer/Services/TelemetryServiceService"; +import { DEFAULT_RETENTION_IN_DAYS } from "Model/Models/TelemetryUsageBilling"; export enum OtelAggregationTemporality { Cumulative = "AGGREGATION_TEMPORALITY_CUMULATIVE", Delta = "AGGREGATION_TEMPORALITY_DELTA", } +export interface TelemetryServiceDataIngested { + serviceName: string; + serviceId: ObjectID; + dataIngestedInGB: number; + dataRententionInDays: number; +} + export default class OTelIngestService { + public static async telemetryServiceFromName(data: { + serviceName: string; + projectId: ObjectID; + }): Promise<{ + serviceId: ObjectID; + dataRententionInDays: number; + }> { + const service: TelemetryService | null = + await TelemetryServiceService.findOneBy({ + query: { + projectId: data.projectId, + name: data.serviceName, + }, + select: { + _id: true, + retainTelemetryDataForDays: true, + }, + props: { + isRoot: true, + }, + }); + + if (!service) { + // create service + + const newService: TelemetryService = new TelemetryService(); + newService.projectId = data.projectId; + newService.name = data.serviceName; + newService.description = data.serviceName; + newService.retainTelemetryDataForDays = DEFAULT_RETENTION_IN_DAYS; + + const createdService: TelemetryService = + await TelemetryServiceService.create({ + data: newService, + props: { + isRoot: true, + }, + }); + + return { + serviceId: createdService.id!, + dataRententionInDays: DEFAULT_RETENTION_IN_DAYS, + }; + } + + return { + serviceId: service.id!, + dataRententionInDays: + service.retainTelemetryDataForDays || DEFAULT_RETENTION_IN_DAYS, + }; + } + + public static async recordDataIngestedUsgaeBilling(data: { + services: Dictionary; + projectId: ObjectID; + productType: ProductType; + }): Promise { + if (!IsBillingEnabled) { + return; + } + + for (const serviceName in data.services) { + const serviceData: TelemetryServiceDataIngested | undefined = + data.services[serviceName]; + + if (!serviceData) { + continue; + } + + TelemetryUsageBillingService.updateUsageBilling({ + projectId: data.projectId, + productType: data.productType, + dataIngestedInGB: serviceData.dataIngestedInGB || 0, + telemetryServiceId: serviceData.serviceId, + retentionInDays: serviceData.dataRententionInDays, + }).catch((err: Error) => { + logger.error("Failed to update usage billing for OTel"); + logger.error(err); + }); + } + } + public static async indexAttributes(data: { attributes: string[]; projectId: ObjectID; diff --git a/config.example.env b/config.example.env index 0eefad79a5..41b1a080e4 100644 --- a/config.example.env +++ b/config.example.env @@ -199,7 +199,7 @@ SERVER_OPENTELEMETRY_EXPORTER_OTLP_ENDPOINT= # You can set the env var to http://localhost/otlp if you want instrumentation to be sent to local otel collector. CLIENT_OPENTELEMETRY_EXPORTER_OTLP_ENDPOINT= -# You can set the env var to "x-oneuptime-service-token=" +# You can set the env var to "x-oneuptime-token=" APP_OPENTELEMETRY_EXPORTER_OTLP_HEADERS= PROBE_OPENTELEMETRY_EXPORTER_OTLP_HEADERS= DASHBOARD_OPENTELEMETRY_EXPORTER_OTLP_HEADERS=