diff --git a/Common/Server/Services/MonitorTestService.ts b/Common/Server/Services/MonitorTestService.ts index 7c7856a593..7bd7fa92bd 100644 --- a/Common/Server/Services/MonitorTestService.ts +++ b/Common/Server/Services/MonitorTestService.ts @@ -1,11 +1,112 @@ -import DatabaseService from "./DatabaseService"; +import DatabaseService, { EntityManager } from "./DatabaseService"; import MonitorTest from "../../Models/DatabaseModels/MonitorTest"; +import ObjectID from "../../Types/ObjectID"; +import OneUptimeDate from "../../Types/Date"; +import { MonitorStepProbeResponse } from "../../Models/DatabaseModels/MonitorProbe"; export class Service extends DatabaseService { + private static readonly STALE_TEST_CLAIM_TIMEOUT_IN_MINUTES: number = 10; + public constructor() { super(MonitorTest); this.hardDeleteItemsOlderThanInDays("createdAt", 2); // this is temporary data. Clear it after 2 days. } + + /** + * Atomically claims monitor tests for a specific probe instance using + * FOR UPDATE SKIP LOCKED so concurrent probe replicas do not execute the + * same monitor test. + */ + public async claimMonitorTestsForProbing(data: { + probeId: ObjectID; + limit: number; + }): Promise> { + const staleClaimThreshold: Date = OneUptimeDate.addRemoveMinutes( + OneUptimeDate.getCurrentDate(), + -Service.STALE_TEST_CLAIM_TIMEOUT_IN_MINUTES, + ); + + return await this.executeTransaction( + async (transactionalEntityManager: EntityManager) => { + const selectQuery: string = ` + SELECT mt."_id" + FROM "MonitorTest" mt + WHERE mt."probeId" = $1 + AND ( + mt."isInQueue" = true + OR ( + mt."isInQueue" = false + AND mt."updatedAt" <= $3 + ) + ) + AND mt."monitorStepProbeResponse" IS NULL + AND mt."deletedAt" IS NULL + ORDER BY mt."createdAt" ASC + LIMIT $2 + FOR UPDATE OF mt SKIP LOCKED + `; + + const selectedRows: Array<{ _id: string }> = + await transactionalEntityManager.query(selectQuery, [ + data.probeId.toString(), + data.limit, + staleClaimThreshold, + ]); + + if (selectedRows.length === 0) { + return []; + } + + const ids: Array = selectedRows.map((row: { _id: string }) => { + return row._id; + }); + + const updateQuery: string = ` + UPDATE "MonitorTest" + SET "isInQueue" = false, + "updatedAt" = now() + WHERE "_id" = ANY($1::uuid[]) + `; + + await transactionalEntityManager.query(updateQuery, [ids]); + + return ids.map((id: string) => { + return new ObjectID(id); + }); + }, + ); + } + + /** + * Merge a single step response into monitorStepProbeResponse atomically. + * This avoids step-response overwrite races when multiple step jobs are + * processed concurrently. + */ + public async mergeStepProbeResponse(data: { + testId: ObjectID; + monitorStepProbeResponse: MonitorStepProbeResponse; + }): Promise { + const testedAt: Date = OneUptimeDate.getCurrentDate(); + + await this.executeTransaction( + async (transactionalEntityManager: EntityManager) => { + const updateQuery: string = ` + UPDATE "MonitorTest" + SET "monitorStepProbeResponse" = COALESCE("monitorStepProbeResponse", '{}'::jsonb) || $2::jsonb, + "testedAt" = $3, + "updatedAt" = now() + WHERE "_id" = $1 + AND "deletedAt" IS NULL + `; + + await transactionalEntityManager.query(updateQuery, [ + data.testId.toString(), + JSON.stringify(data.monitorStepProbeResponse), + testedAt, + ]); + }, + ); + } } export default new Service(); diff --git a/Common/Utils/Memory.ts b/Common/Utils/Memory.ts index 7f1c84e1f2..1f36969d28 100644 --- a/Common/Utils/Memory.ts +++ b/Common/Utils/Memory.ts @@ -1,9 +1,88 @@ +import fs from "fs"; +import os from "os"; import NumberUtil from "./Number"; +const UNLIMITED_CGROUP_THRESHOLD_BYTES: number = 1 << 60; // treat absurdly large cgroup limit as "unlimited" + export default class MemoryUtil { public static convertToGb(memoryInBytes: number): number { const gb: number = memoryInBytes / 1024 / 1024 / 1024; //return two decimal places return NumberUtil.convertToTwoDecimalPlaces(gb); } + + public static getHostFreeMemoryInBytes(): number { + return os.freemem(); + } + + public static getContainerAwareAvailableMemoryInBytes(): number { + const hostFreeMemory: number = this.getHostFreeMemoryInBytes(); + const cgroupAvailableMemory: number | null = + this.getCgroupAvailableMemoryInBytes(); + + if (cgroupAvailableMemory === null) { + return hostFreeMemory; + } + + // Be conservative: never exceed container-available memory. + return Math.min(hostFreeMemory, cgroupAvailableMemory); + } + + public static getCgroupAvailableMemoryInBytes(): number | null { + // cgroup v2 + const v2Limit: number | null = this.readNumericFile( + "/sys/fs/cgroup/memory.max", + ); + const v2Usage: number | null = this.readNumericFile( + "/sys/fs/cgroup/memory.current", + ); + + if ( + v2Limit && + v2Usage !== null && + v2Limit > 0 && + v2Limit < UNLIMITED_CGROUP_THRESHOLD_BYTES + ) { + return Math.max(v2Limit - v2Usage, 0); + } + + // cgroup v1 + const v1Limit: number | null = this.readNumericFile( + "/sys/fs/cgroup/memory/memory.limit_in_bytes", + ); + const v1Usage: number | null = this.readNumericFile( + "/sys/fs/cgroup/memory/memory.usage_in_bytes", + ); + + if ( + v1Limit && + v1Usage !== null && + v1Limit > 0 && + v1Limit < UNLIMITED_CGROUP_THRESHOLD_BYTES + ) { + return Math.max(v1Limit - v1Usage, 0); + } + + return null; + } + + private static readNumericFile(path: string): number | null { + try { + const rawValue: string = fs.readFileSync(path, "utf8").trim(); + + if (!rawValue || rawValue === "max") { + return null; + } + + const value: number = Number(rawValue); + + if (!Number.isFinite(value) || value <= 0) { + return null; + } + + return value; + } catch { + return null; + } + } } diff --git a/Common/Utils/Number.ts b/Common/Utils/Number.ts index d56b4b2fc9..d895daa852 100644 --- a/Common/Utils/Number.ts +++ b/Common/Utils/Number.ts @@ -1,4 +1,31 @@ export default class NumberUtil { + public static parseNumberWithDefault(data: { + value: string | undefined; + defaultValue: number; + min?: number | undefined; + max?: number | undefined; + }): number { + if (data.value === undefined || data.value === null || data.value === "") { + return data.defaultValue; + } + + const parsed: number = Number.parseInt(data.value, 10); + + if (!Number.isFinite(parsed)) { + return data.defaultValue; + } + + if (data.min !== undefined && parsed < data.min) { + return data.defaultValue; + } + + if (data.max !== undefined && parsed > data.max) { + return data.defaultValue; + } + + return parsed; + } + public static getRandomNumber(min: number, max: number): number { return Math.floor(Math.random() * (max - min + 1)) + min; } diff --git a/Probe/Config.ts b/Probe/Config.ts index cdf320f403..eb68fb20ed 100644 --- a/Probe/Config.ts +++ b/Probe/Config.ts @@ -2,10 +2,11 @@ import URL from "Common/Types/API/URL"; import ObjectID from "Common/Types/ObjectID"; import logger from "Common/Server/Utils/Logger"; import Port from "Common/Types/Port"; +import NumberUtil from "Common/Utils/Number"; if (!process.env["PROBE_INGEST_URL"] && !process.env["ONEUPTIME_URL"]) { logger.error("PROBE_INGEST_URL or ONEUPTIME_URL is not set"); - process.exit(); + process.exit(1); } export let PROBE_INGEST_URL: URL = URL.fromString( @@ -35,54 +36,56 @@ export const PROBE_ID: ObjectID | null = process.env["PROBE_ID"] if (!process.env["PROBE_KEY"]) { logger.error("PROBE_KEY is not set"); - process.exit(); + process.exit(1); } export const PROBE_KEY: string = process.env["PROBE_KEY"]; -let probeMonitoringWorkers: string | number = - process.env["PROBE_MONITORING_WORKERS"] || 1; +export const PROBE_MONITORING_WORKERS: number = NumberUtil.parseNumberWithDefault( + { + value: process.env["PROBE_MONITORING_WORKERS"], + defaultValue: 1, + min: 1, + }, +); -if (typeof probeMonitoringWorkers === "string") { - probeMonitoringWorkers = parseInt(probeMonitoringWorkers); -} - -export const PROBE_MONITORING_WORKERS: number = probeMonitoringWorkers; - -let monitorFetchLimit: string | number = - process.env["PROBE_MONITOR_FETCH_LIMIT"] || 10; - -if (typeof monitorFetchLimit === "string") { - monitorFetchLimit = parseInt(monitorFetchLimit); -} - -export const PROBE_MONITOR_FETCH_LIMIT: number = monitorFetchLimit; +export const PROBE_MONITOR_FETCH_LIMIT: number = NumberUtil.parseNumberWithDefault( + { + value: process.env["PROBE_MONITOR_FETCH_LIMIT"], + defaultValue: 10, + min: 1, + }, +); export const HOSTNAME: string = process.env["HOSTNAME"] || "localhost"; -export const PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS: number = process.env[ - "PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS" -] - ? parseInt( - process.env["PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS"].toString(), - ) - : 60000; +export const PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS: number = + NumberUtil.parseNumberWithDefault({ + value: process.env["PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS"], + defaultValue: 60000, + min: 1, + }); -export const PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS: number = process - .env["PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS"] - ? parseInt( - process.env["PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS"].toString(), - ) - : 60000; +export const PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS: number = + NumberUtil.parseNumberWithDefault({ + value: process.env["PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS"], + defaultValue: 60000, + min: 1, + }); -export const PROBE_MONITOR_RETRY_LIMIT: number = process.env[ - "PROBE_MONITOR_RETRY_LIMIT" -] - ? parseInt(process.env["PROBE_MONITOR_RETRY_LIMIT"].toString()) - : 3; +export const PROBE_MONITOR_RETRY_LIMIT: number = + NumberUtil.parseNumberWithDefault({ + value: process.env["PROBE_MONITOR_RETRY_LIMIT"], + defaultValue: 3, + min: 0, + }); export const PORT: Port = new Port( - process.env["PORT"] ? parseInt(process.env["PORT"]) : 3874, + NumberUtil.parseNumberWithDefault({ + value: process.env["PORT"], + defaultValue: 3874, + min: 1, + }), ); /* diff --git a/Probe/Services/Register.ts b/Probe/Services/Register.ts index 56de930094..b461c0b392 100644 --- a/Probe/Services/Register.ts +++ b/Probe/Services/Register.ts @@ -9,6 +9,7 @@ import { import OnlineCheck from "../Utils/OnlineCheck"; import ProbeAPIRequest from "../Utils/ProbeAPIRequest"; import HTTPMethod from "Common/Types/API/HTTPMethod"; +import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse"; import HTTPResponse from "Common/Types/API/HTTPResponse"; import URL from "Common/Types/API/URL"; import { JSONObject } from "Common/Types/JSON"; @@ -96,6 +97,7 @@ export default class Register { // register probe with 5 retry and 15 seocnd interval between each retry. let currentRetry: number = 0; + let isRegistered: boolean = false; const maxRetry: number = 10; @@ -106,6 +108,7 @@ export default class Register { logger.debug(`Registering probe. Attempt: ${currentRetry + 1}`); await Register._registerProbe(); logger.debug(`Probe registered successfully.`); + isRegistered = true; break; } catch (error) { logger.error( @@ -116,6 +119,12 @@ export default class Register { await Sleep.sleep(retryIntervalInSeconds * 1000); } } + + if (!isRegistered) { + throw new Error( + `Unable to register probe after ${maxRetry} attempts. Check PROBE_KEY / REGISTER_PROBE_KEY / connectivity to ${PROBE_INGEST_URL.toString()}.`, + ); + } } private static async _registerProbe(): Promise { @@ -127,46 +136,68 @@ export default class Register { logger.debug("Registering Probe..."); logger.debug("Sending request to: " + probeRegistrationUrl.toString()); - const result: HTTPResponse = await API.post({ - url: probeRegistrationUrl, - data: { - probeKey: PROBE_KEY, - probeName: PROBE_NAME, - probeDescription: PROBE_DESCRIPTION, - registerProbeKey: RegisterProbeKey.toString(), - }, - options: { - ...ProxyConfig.getRequestProxyAgents(probeRegistrationUrl), - }, - }); + const result: HTTPResponse | HTTPErrorResponse = + await API.post({ + url: probeRegistrationUrl, + data: { + probeKey: PROBE_KEY, + probeName: PROBE_NAME, + probeDescription: PROBE_DESCRIPTION, + registerProbeKey: RegisterProbeKey.toString(), + }, + options: { + ...ProxyConfig.getRequestProxyAgents(probeRegistrationUrl), + }, + }); - if (result.isSuccess()) { - logger.debug("Probe Registered"); - logger.debug(result.data); + if (result instanceof HTTPErrorResponse || !result.isSuccess()) { + const errorMessage: string = + result instanceof HTTPErrorResponse + ? result.message || JSON.stringify(result.data || {}) + : "Unknown registration error"; - const probeId: string = result.data["_id"] as string; - - LocalCache.setString("PROBE", "PROBE_ID", probeId as string); + throw new Error(`Probe registration failed: ${errorMessage}`); } + + logger.debug("Probe Registered"); + logger.debug(result.data); + + const probeId: string = result.data["_id"] as string; + + if (!probeId) { + throw new Error("Probe registration succeeded but probe ID is missing"); + } + + LocalCache.setString("PROBE", "PROBE_ID", probeId as string); } else { // validate probe. if (!PROBE_ID) { logger.error("PROBE_ID or REGISTER_PROBE_KEY should be set"); - return process.exit(); + return process.exit(1); } const aliveUrl: URL = URL.fromString( PROBE_INGEST_URL.toString(), ).addRoute("/alive"); - await API.post({ - url: aliveUrl, - data: { - probeKey: PROBE_KEY.toString(), - probeId: PROBE_ID.toString(), - }, - options: { ...ProxyConfig.getRequestProxyAgents(aliveUrl) }, - }); + const aliveResult: HTTPResponse | HTTPErrorResponse = + await API.post({ + url: aliveUrl, + data: { + probeKey: PROBE_KEY.toString(), + probeId: PROBE_ID.toString(), + }, + options: { ...ProxyConfig.getRequestProxyAgents(aliveUrl) }, + }); + + if (aliveResult instanceof HTTPErrorResponse || !aliveResult.isSuccess()) { + const errorMessage: string = + aliveResult instanceof HTTPErrorResponse + ? aliveResult.message || JSON.stringify(aliveResult.data || {}) + : "Unknown alive validation error"; + + throw new Error(`Probe validation failed: ${errorMessage}`); + } LocalCache.setString("PROBE", "PROBE_ID", PROBE_ID.toString() as string); } diff --git a/Probe/Utils/SyntheticMonitorSemaphore.ts b/Probe/Utils/SyntheticMonitorSemaphore.ts index be2e85e1d9..6c037ec600 100644 --- a/Probe/Utils/SyntheticMonitorSemaphore.ts +++ b/Probe/Utils/SyntheticMonitorSemaphore.ts @@ -1,4 +1,5 @@ import os from "os"; +import MemoryUtil from "Common/Utils/Memory"; import logger from "Common/Server/Utils/Logger"; interface Waiter { @@ -20,10 +21,12 @@ class SyntheticMonitorSemaphore { private activeMonitorIds: Set = new Set(); private calculateMaxSlots(): number { - const free: number = os.freemem(); - const usable: number = free - MEMORY_BUFFER_BYTES; + const availableMemory: number = + MemoryUtil.getContainerAwareAvailableMemoryInBytes(); + const usable: number = availableMemory - MEMORY_BUFFER_BYTES; const slots: number = Math.floor(usable / MEMORY_PER_MONITOR_BYTES); - return Math.max(2, slots); // always allow at least 2 concurrent monitors, even if memory is low, to avoid complete service disruption (with the risk of OOM) + // Always allow at least one monitor so probing continues under pressure. + return Math.max(1, slots); } /** diff --git a/ProbeIngest/API/Monitor.ts b/ProbeIngest/API/Monitor.ts index 40cdd4a058..1bf1937a7b 100644 --- a/ProbeIngest/API/Monitor.ts +++ b/ProbeIngest/API/Monitor.ts @@ -564,11 +564,34 @@ router.post( logger.debug("Fetching test monitor list"); + /* + * Atomically claim monitor tests with FOR UPDATE SKIP LOCKED to prevent + * duplicate test execution across concurrent probe replicas. + */ + const claimedMonitorTestIds: Array = + await MonitorTestService.claimMonitorTestsForProbing({ + probeId: probeId, + limit: limit, + }); + + logger.debug( + `Claimed ${claimedMonitorTestIds.length} monitor tests for probing`, + ); + + if (claimedMonitorTestIds.length === 0) { + logger.debug("No monitor tests to probe"); + return Response.sendEntityArrayResponse( + req, + res, + [], + new PositiveNumber(0), + MonitorTest, + ); + } + const monitorTests: Array = await MonitorTestService.findBy({ query: { - monitorStepProbeResponse: QueryHelper.isNull(), - probeId: probeId, - isInQueue: true, // only get the tests which are in queue + _id: QueryHelper.any(claimedMonitorTestIds), }, sort: { createdAt: SortOrder.Ascending, @@ -590,26 +613,6 @@ router.post( logger.debug("Fetched monitor tests"); logger.debug(monitorTests); - // update the lastMonitoredAt field of the monitors - - const updatePromises: Array> = []; - - for (const monitorTest of monitorTests) { - updatePromises.push( - MonitorTestService.updateOneById({ - id: monitorTest.id!, - data: { - isInQueue: false, // in progress now - }, - props: { - isRoot: true, - }, - }), - ); - } - - await Promise.all(updatePromises); - logger.debug("Populating secrets"); logger.debug(monitorTests); diff --git a/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts b/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts index 9e8b996bb5..1ac6631356 100644 --- a/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts +++ b/ProbeIngest/Jobs/ProbeIngest/ProcessProbeIngest.ts @@ -94,15 +94,9 @@ async function processProbeFromQueue( } as ProbeMonitorResponse, }; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - await (MonitorTestService as any).updateOneById({ - id: testId, - data: { - monitorStepProbeResponse: stepResponse, - }, - props: { - isRoot: true, - }, + await MonitorTestService.mergeStepProbeResponse({ + testId: testId, + monitorStepProbeResponse: stepResponse, }); } else { throw new BadDataException(`Invalid job type: ${jobData.jobType}`);