Probe fixes

This commit is contained in:
Nawaz Dhandala
2026-03-01 12:41:42 +00:00
parent 449ee826d4
commit 098a4e624f
8 changed files with 341 additions and 100 deletions

View File

@@ -1,11 +1,112 @@
import DatabaseService from "./DatabaseService";
import DatabaseService, { EntityManager } from "./DatabaseService";
import MonitorTest from "../../Models/DatabaseModels/MonitorTest";
import ObjectID from "../../Types/ObjectID";
import OneUptimeDate from "../../Types/Date";
import { MonitorStepProbeResponse } from "../../Models/DatabaseModels/MonitorProbe";
/**
 * Database service for MonitorTest rows. These rows are short-lived: the
 * constructor schedules a hard delete of anything older than 2 days.
 *
 * Besides the inherited DatabaseService behaviour it offers two raw-SQL
 * helpers used by the probe flow:
 *  - claimMonitorTestsForProbing: hands out queued tests to a probe.
 *  - mergeStepProbeResponse: merges one step result into a test row.
 */
export class Service extends DatabaseService<MonitorTest> {
  /*
   * A test that was claimed (isInQueue = false) but still has no
   * monitorStepProbeResponse after this many minutes can be claimed again.
   */
  private static readonly STALE_TEST_CLAIM_TIMEOUT_IN_MINUTES: number = 10;

  public constructor() {
    super(MonitorTest);
    this.hardDeleteItemsOlderThanInDays("createdAt", 2); // this is temporary data. Clear it after 2 days.
  }

  /**
   * Atomically claims monitor tests for a specific probe instance using
   * FOR UPDATE SKIP LOCKED so concurrent probe replicas do not execute the
   * same monitor test.
   *
   * A row is claimable when it belongs to the probe, has no
   * monitorStepProbeResponse, is not soft-deleted, and is either still in
   * the queue or was claimed at least STALE_TEST_CLAIM_TIMEOUT_IN_MINUTES
   * ago. Claimed rows get isInQueue = false and updatedAt = now().
   *
   * @param data.probeId - Probe whose tests should be claimed.
   * @param data.limit - Maximum number of tests to claim. Fractions are
   * rounded down; a non-positive or non-finite limit claims nothing.
   * @returns IDs of the claimed tests, oldest (by createdAt) first.
   */
  public async claimMonitorTestsForProbing(data: {
    probeId: ObjectID;
    limit: number;
  }): Promise<Array<ObjectID>> {
    /*
     * LIMIT only accepts a non-negative integer. Normalise here so a bad
     * caller value yields "nothing claimed" instead of a database error,
     * and skip the transaction entirely when there is nothing to fetch.
     */
    const limit: number = Math.floor(data.limit);

    if (!Number.isFinite(limit) || limit <= 0) {
      return [];
    }

    const staleClaimThreshold: Date = OneUptimeDate.addRemoveMinutes(
      OneUptimeDate.getCurrentDate(),
      -Service.STALE_TEST_CLAIM_TIMEOUT_IN_MINUTES,
    );

    return await this.executeTransaction(
      async (transactionalEntityManager: EntityManager) => {
        // Step 1: pick and row-lock candidate tests, skipping rows locked by other transactions.
        const selectQuery: string = `
SELECT mt."_id"
FROM "MonitorTest" mt
WHERE mt."probeId" = $1
AND (
mt."isInQueue" = true
OR (
mt."isInQueue" = false
AND mt."updatedAt" <= $3
)
)
AND mt."monitorStepProbeResponse" IS NULL
AND mt."deletedAt" IS NULL
ORDER BY mt."createdAt" ASC
LIMIT $2
FOR UPDATE OF mt SKIP LOCKED
`;

        const selectedRows: Array<{ _id: string }> =
          await transactionalEntityManager.query(selectQuery, [
            data.probeId.toString(),
            limit,
            staleClaimThreshold,
          ]);

        if (selectedRows.length === 0) {
          return [];
        }

        const ids: Array<string> = selectedRows.map((row: { _id: string }) => {
          return row._id;
        });

        /*
         * Step 2: mark the locked rows as claimed. Refreshing updatedAt
         * restarts the stale-claim timer for these rows.
         */
        const updateQuery: string = `
UPDATE "MonitorTest"
SET "isInQueue" = false,
"updatedAt" = now()
WHERE "_id" = ANY($1::uuid[])
`;

        await transactionalEntityManager.query(updateQuery, [ids]);

        return ids.map((id: string) => {
          return new ObjectID(id);
        });
      },
    );
  }

  /**
   * Merge a single step response into monitorStepProbeResponse atomically.
   * This avoids step-response overwrite races when multiple step jobs are
   * processed concurrently.
   *
   * The merge is done in SQL with the jsonb || operator, so top-level keys
   * of the incoming object replace same-named keys already stored; a NULL
   * column is treated as an empty object. testedAt is set to the current
   * application time and updatedAt to the database's now().
   *
   * @param data.testId - ID of the MonitorTest row to update.
   * @param data.monitorStepProbeResponse - Step response(s) to merge in.
   */
  public async mergeStepProbeResponse(data: {
    testId: ObjectID;
    monitorStepProbeResponse: MonitorStepProbeResponse;
  }): Promise<void> {
    const testedAt: Date = OneUptimeDate.getCurrentDate();

    await this.executeTransaction(
      async (transactionalEntityManager: EntityManager) => {
        const updateQuery: string = `
UPDATE "MonitorTest"
SET "monitorStepProbeResponse" = COALESCE("monitorStepProbeResponse", '{}'::jsonb) || $2::jsonb,
"testedAt" = $3,
"updatedAt" = now()
WHERE "_id" = $1
AND "deletedAt" IS NULL
`;

        await transactionalEntityManager.query(updateQuery, [
          data.testId.toString(),
          JSON.stringify(data.monitorStepProbeResponse),
          testedAt,
        ]);
      },
    );
  }
}
// Default export is a single shared Service instance created at module load.
export default new Service();

View File

@@ -1,9 +1,88 @@
import fs from "fs";
import os from "os";
import NumberUtil from "./Number";
/*
 * Treat an absurdly large cgroup limit as "unlimited".
 *
 * This must be written as an exponent, not a bit shift: JavaScript shifts
 * work on 32-bit integers and take the shift count modulo 32, so `1 << 60`
 * evaluates to `1 << 28` (256 MiB) and would make every normal container
 * limit look unlimited. 2 ** 60 is a power of two and is therefore exactly
 * representable as a double.
 */
const UNLIMITED_CGROUP_THRESHOLD_BYTES: number = 2 ** 60;
export default class MemoryUtil {
  /**
   * Converts a byte count to gigabytes (1 GB = 1024 * 1024 * 1024 bytes
   * here) and passes the result through NumberUtil.convertToTwoDecimalPlaces.
   */
  public static convertToGb(memoryInBytes: number): number {
    return NumberUtil.convertToTwoDecimalPlaces(
      memoryInBytes / 1024 / 1024 / 1024,
    );
  }

  /**
   * Free memory as reported by the operating system (os.freemem()).
   */
  public static getHostFreeMemoryInBytes(): number {
    return os.freemem();
  }

  /**
   * Memory this process can reasonably use: the host's free memory, capped
   * by the cgroup headroom when a cgroup limit could be read.
   */
  public static getContainerAwareAvailableMemoryInBytes(): number {
    const hostFree: number = this.getHostFreeMemoryInBytes();
    const cgroupFree: number | null = this.getCgroupAvailableMemoryInBytes();

    // Be conservative: never report more than the container has left.
    return cgroupFree === null ? hostFree : Math.min(hostFree, cgroupFree);
  }

  /**
   * Returns (limit - usage) for the first cgroup hierarchy that exposes a
   * usable limit, clamped at 0. cgroup v2 files are tried before cgroup v1.
   * Returns null when neither hierarchy yields a finite limit and a usage.
   */
  public static getCgroupAvailableMemoryInBytes(): number | null {
    const hierarchies: Array<{ limitFile: string; usageFile: string }> = [
      {
        // cgroup v2
        limitFile: "/sys/fs/cgroup/memory.max",
        usageFile: "/sys/fs/cgroup/memory.current",
      },
      {
        // cgroup v1
        limitFile: "/sys/fs/cgroup/memory/memory.limit_in_bytes",
        usageFile: "/sys/fs/cgroup/memory/memory.usage_in_bytes",
      },
    ];

    for (const hierarchy of hierarchies) {
      const limit: number | null = this.readNumericFile(hierarchy.limitFile);
      const usage: number | null = this.readNumericFile(hierarchy.usageFile);

      const hasRealLimit: boolean =
        limit !== null &&
        usage !== null &&
        limit > 0 &&
        limit < UNLIMITED_CGROUP_THRESHOLD_BYTES;

      if (hasRealLimit) {
        return Math.max((limit as number) - (usage as number), 0);
      }
    }

    return null;
  }

  /**
   * Reads a file expected to hold one positive number. Returns null when
   * the file cannot be read, is empty, contains "max", or does not parse to
   * a finite number greater than zero.
   */
  private static readNumericFile(path: string): number | null {
    let contents: string;

    try {
      contents = fs.readFileSync(path, "utf8").trim();
    } catch {
      return null;
    }

    if (contents === "" || contents === "max") {
      return null;
    }

    const parsed: number = Number(contents);

    return Number.isFinite(parsed) && parsed > 0 ? parsed : null;
  }
}

View File

@@ -1,4 +1,31 @@
export default class NumberUtil {
/**
 * Parses an integer setting (typically an environment variable) and falls
 * back to a default when the value is unusable.
 *
 * The whole string must be a number; it is not enough for it to start with
 * one. Number.parseInt would accept "1e3" as 1, "10abc" as 10 and "3.9" as
 * 3, which can turn a typo into a tiny but "valid" setting. Using Number()
 * on the trimmed string means "1e3" is read as 1000, while "10abc" and
 * "3.9" are rejected.
 *
 * @param data.value - Raw string to parse; undefined, null, empty or
 * whitespace-only yields the default.
 * @param data.defaultValue - Returned whenever the value is missing, is not
 * an integer, or falls outside [min, max].
 * @param data.min - Optional inclusive lower bound.
 * @param data.max - Optional inclusive upper bound.
 * @returns The parsed integer, or data.defaultValue.
 */
public static parseNumberWithDefault(data: {
  value: string | undefined;
  defaultValue: number;
  min?: number | undefined;
  max?: number | undefined;
}): number {
  if (data.value === undefined || data.value === null) {
    return data.defaultValue;
  }

  // String() keeps this tolerant of a non-string slipping in at runtime.
  const trimmed: string = String(data.value).trim();

  if (trimmed === "") {
    return data.defaultValue;
  }

  const parsed: number = Number(trimmed);

  // Rejects NaN, +/-Infinity and fractional values in one check.
  if (!Number.isInteger(parsed)) {
    return data.defaultValue;
  }

  if (data.min !== undefined && parsed < data.min) {
    return data.defaultValue;
  }

  if (data.max !== undefined && parsed > data.max) {
    return data.defaultValue;
  }

  return parsed;
}
/**
 * Returns a pseudo-random number in the range min..max using Math.random().
 * For integer bounds the result is an integer and both ends are inclusive.
 */
public static getRandomNumber(min: number, max: number): number {
  const rangeSize: number = max - min + 1;
  const offset: number = Math.floor(Math.random() * rangeSize);
  return offset + min;
}

View File

@@ -2,10 +2,11 @@ import URL from "Common/Types/API/URL";
import ObjectID from "Common/Types/ObjectID";
import logger from "Common/Server/Utils/Logger";
import Port from "Common/Types/Port";
import NumberUtil from "Common/Utils/Number";
if (!process.env["PROBE_INGEST_URL"] && !process.env["ONEUPTIME_URL"]) {
logger.error("PROBE_INGEST_URL or ONEUPTIME_URL is not set");
process.exit();
process.exit(1);
}
export let PROBE_INGEST_URL: URL = URL.fromString(
@@ -35,54 +36,56 @@ export const PROBE_ID: ObjectID | null = process.env["PROBE_ID"]
if (!process.env["PROBE_KEY"]) {
logger.error("PROBE_KEY is not set");
process.exit();
process.exit(1);
}
export const PROBE_KEY: string = process.env["PROBE_KEY"];
let probeMonitoringWorkers: string | number =
process.env["PROBE_MONITORING_WORKERS"] || 1;
export const PROBE_MONITORING_WORKERS: number = NumberUtil.parseNumberWithDefault(
{
value: process.env["PROBE_MONITORING_WORKERS"],
defaultValue: 1,
min: 1,
},
);
if (typeof probeMonitoringWorkers === "string") {
probeMonitoringWorkers = parseInt(probeMonitoringWorkers);
}
export const PROBE_MONITORING_WORKERS: number = probeMonitoringWorkers;
let monitorFetchLimit: string | number =
process.env["PROBE_MONITOR_FETCH_LIMIT"] || 10;
if (typeof monitorFetchLimit === "string") {
monitorFetchLimit = parseInt(monitorFetchLimit);
}
export const PROBE_MONITOR_FETCH_LIMIT: number = monitorFetchLimit;
export const PROBE_MONITOR_FETCH_LIMIT: number = NumberUtil.parseNumberWithDefault(
{
value: process.env["PROBE_MONITOR_FETCH_LIMIT"],
defaultValue: 10,
min: 1,
},
);
export const HOSTNAME: string = process.env["HOSTNAME"] || "localhost";
export const PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS: number = process.env[
"PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS"
]
? parseInt(
process.env["PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS"].toString(),
)
: 60000;
export const PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS: number =
NumberUtil.parseNumberWithDefault({
value: process.env["PROBE_SYNTHETIC_MONITOR_SCRIPT_TIMEOUT_IN_MS"],
defaultValue: 60000,
min: 1,
});
export const PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS: number = process
.env["PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS"]
? parseInt(
process.env["PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS"].toString(),
)
: 60000;
export const PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS: number =
NumberUtil.parseNumberWithDefault({
value: process.env["PROBE_CUSTOM_CODE_MONITOR_SCRIPT_TIMEOUT_IN_MS"],
defaultValue: 60000,
min: 1,
});
export const PROBE_MONITOR_RETRY_LIMIT: number = process.env[
"PROBE_MONITOR_RETRY_LIMIT"
]
? parseInt(process.env["PROBE_MONITOR_RETRY_LIMIT"].toString())
: 3;
export const PROBE_MONITOR_RETRY_LIMIT: number =
NumberUtil.parseNumberWithDefault({
value: process.env["PROBE_MONITOR_RETRY_LIMIT"],
defaultValue: 3,
min: 0,
});
export const PORT: Port = new Port(
process.env["PORT"] ? parseInt(process.env["PORT"]) : 3874,
NumberUtil.parseNumberWithDefault({
value: process.env["PORT"],
defaultValue: 3874,
min: 1,
}),
);
/*

View File

@@ -9,6 +9,7 @@ import {
import OnlineCheck from "../Utils/OnlineCheck";
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";
import HTTPMethod from "Common/Types/API/HTTPMethod";
import HTTPErrorResponse from "Common/Types/API/HTTPErrorResponse";
import HTTPResponse from "Common/Types/API/HTTPResponse";
import URL from "Common/Types/API/URL";
import { JSONObject } from "Common/Types/JSON";
@@ -96,6 +97,7 @@ export default class Register {
// register probe with up to `maxRetry` attempts and a 15 second interval between each retry.
let currentRetry: number = 0;
let isRegistered: boolean = false;
const maxRetry: number = 10;
@@ -106,6 +108,7 @@ export default class Register {
logger.debug(`Registering probe. Attempt: ${currentRetry + 1}`);
await Register._registerProbe();
logger.debug(`Probe registered successfully.`);
isRegistered = true;
break;
} catch (error) {
logger.error(
@@ -116,6 +119,12 @@ export default class Register {
await Sleep.sleep(retryIntervalInSeconds * 1000);
}
}
if (!isRegistered) {
throw new Error(
`Unable to register probe after ${maxRetry} attempts. Check PROBE_KEY / REGISTER_PROBE_KEY / connectivity to ${PROBE_INGEST_URL.toString()}.`,
);
}
}
private static async _registerProbe(): Promise<void> {
@@ -127,46 +136,68 @@ export default class Register {
logger.debug("Registering Probe...");
logger.debug("Sending request to: " + probeRegistrationUrl.toString());
const result: HTTPResponse<JSONObject> = await API.post({
url: probeRegistrationUrl,
data: {
probeKey: PROBE_KEY,
probeName: PROBE_NAME,
probeDescription: PROBE_DESCRIPTION,
registerProbeKey: RegisterProbeKey.toString(),
},
options: {
...ProxyConfig.getRequestProxyAgents(probeRegistrationUrl),
},
});
const result: HTTPResponse<JSONObject> | HTTPErrorResponse =
await API.post({
url: probeRegistrationUrl,
data: {
probeKey: PROBE_KEY,
probeName: PROBE_NAME,
probeDescription: PROBE_DESCRIPTION,
registerProbeKey: RegisterProbeKey.toString(),
},
options: {
...ProxyConfig.getRequestProxyAgents(probeRegistrationUrl),
},
});
if (result.isSuccess()) {
logger.debug("Probe Registered");
logger.debug(result.data);
if (result instanceof HTTPErrorResponse || !result.isSuccess()) {
const errorMessage: string =
result instanceof HTTPErrorResponse
? result.message || JSON.stringify(result.data || {})
: "Unknown registration error";
const probeId: string = result.data["_id"] as string;
LocalCache.setString("PROBE", "PROBE_ID", probeId as string);
throw new Error(`Probe registration failed: ${errorMessage}`);
}
logger.debug("Probe Registered");
logger.debug(result.data);
const probeId: string = result.data["_id"] as string;
if (!probeId) {
throw new Error("Probe registration succeeded but probe ID is missing");
}
LocalCache.setString("PROBE", "PROBE_ID", probeId as string);
} else {
// validate probe.
if (!PROBE_ID) {
logger.error("PROBE_ID or REGISTER_PROBE_KEY should be set");
return process.exit();
return process.exit(1);
}
const aliveUrl: URL = URL.fromString(
PROBE_INGEST_URL.toString(),
).addRoute("/alive");
await API.post({
url: aliveUrl,
data: {
probeKey: PROBE_KEY.toString(),
probeId: PROBE_ID.toString(),
},
options: { ...ProxyConfig.getRequestProxyAgents(aliveUrl) },
});
const aliveResult: HTTPResponse<JSONObject> | HTTPErrorResponse =
await API.post({
url: aliveUrl,
data: {
probeKey: PROBE_KEY.toString(),
probeId: PROBE_ID.toString(),
},
options: { ...ProxyConfig.getRequestProxyAgents(aliveUrl) },
});
if (aliveResult instanceof HTTPErrorResponse || !aliveResult.isSuccess()) {
const errorMessage: string =
aliveResult instanceof HTTPErrorResponse
? aliveResult.message || JSON.stringify(aliveResult.data || {})
: "Unknown alive validation error";
throw new Error(`Probe validation failed: ${errorMessage}`);
}
LocalCache.setString("PROBE", "PROBE_ID", PROBE_ID.toString() as string);
}

View File

@@ -1,4 +1,5 @@
import os from "os";
import MemoryUtil from "Common/Utils/Memory";
import logger from "Common/Server/Utils/Logger";
interface Waiter {
@@ -20,10 +21,12 @@ class SyntheticMonitorSemaphore {
private activeMonitorIds: Set<string> = new Set();
private calculateMaxSlots(): number {
const free: number = os.freemem();
const usable: number = free - MEMORY_BUFFER_BYTES;
const availableMemory: number =
MemoryUtil.getContainerAwareAvailableMemoryInBytes();
const usable: number = availableMemory - MEMORY_BUFFER_BYTES;
const slots: number = Math.floor(usable / MEMORY_PER_MONITOR_BYTES);
return Math.max(2, slots); // always allow at least 2 concurrent monitors, even if memory is low, to avoid complete service disruption (with the risk of OOM)
// Always allow at least one monitor so probing continues under pressure.
return Math.max(1, slots);
}
/**

View File

@@ -564,11 +564,34 @@ router.post(
logger.debug("Fetching test monitor list");
/*
* Atomically claim monitor tests with FOR UPDATE SKIP LOCKED to prevent
* duplicate test execution across concurrent probe replicas.
*/
const claimedMonitorTestIds: Array<ObjectID> =
await MonitorTestService.claimMonitorTestsForProbing({
probeId: probeId,
limit: limit,
});
logger.debug(
`Claimed ${claimedMonitorTestIds.length} monitor tests for probing`,
);
if (claimedMonitorTestIds.length === 0) {
logger.debug("No monitor tests to probe");
return Response.sendEntityArrayResponse(
req,
res,
[],
new PositiveNumber(0),
MonitorTest,
);
}
const monitorTests: Array<MonitorTest> = await MonitorTestService.findBy({
query: {
monitorStepProbeResponse: QueryHelper.isNull(),
probeId: probeId,
isInQueue: true, // only get the tests which are in queue
_id: QueryHelper.any(claimedMonitorTestIds),
},
sort: {
createdAt: SortOrder.Ascending,
@@ -590,26 +613,6 @@ router.post(
logger.debug("Fetched monitor tests");
logger.debug(monitorTests);
// update the lastMonitoredAt field of the monitors
const updatePromises: Array<Promise<void>> = [];
for (const monitorTest of monitorTests) {
updatePromises.push(
MonitorTestService.updateOneById({
id: monitorTest.id!,
data: {
isInQueue: false, // in progress now
},
props: {
isRoot: true,
},
}),
);
}
await Promise.all(updatePromises);
logger.debug("Populating secrets");
logger.debug(monitorTests);

View File

@@ -94,15 +94,9 @@ async function processProbeFromQueue(
} as ProbeMonitorResponse,
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await (MonitorTestService as any).updateOneById({
id: testId,
data: {
monitorStepProbeResponse: stepResponse,
},
props: {
isRoot: true,
},
await MonitorTestService.mergeStepProbeResponse({
testId: testId,
monitorStepProbeResponse: stepResponse,
});
} else {
throw new BadDataException(`Invalid job type: ${jobData.jobType}`);