feat: implement atomic claiming of monitor probes using FOR UPDATE SKIP LOCKED

This commit is contained in:
Nawaz Dhandala
2026-01-19 13:59:57 +00:00
parent b714ad168c
commit ecfcbae86b
2 changed files with 116 additions and 37 deletions

View File

@@ -10,6 +10,8 @@ import { LIMIT_PER_PROJECT } from "../../Types/Database/LimitMax";
import MonitorService from "./MonitorService";
import CronTab from "../Utils/CronTab";
import logger from "../Utils/Logger";
import PostgresAppInstance from "../Infrastructure/PostgresDatabase";
import { DataSource } from "typeorm";
export class Service extends DatabaseService<MonitorProbe> {
public constructor() {
@@ -69,6 +71,95 @@ export class Service extends DatabaseService<MonitorProbe> {
}
}
/**
 * Atomically claims monitor probes for a specific probe instance.
 * Uses PostgreSQL's FOR UPDATE SKIP LOCKED to prevent multiple probe instances
 * from picking up the same monitors simultaneously.
 *
 * @param data - Object containing the probeId to claim monitors for and the
 *               maximum number of monitor probes to claim (limit).
 * @returns Array of claimed MonitorProbe IDs (empty when nothing is due).
 * @throws BadDataException when the database connection is not available.
 */
public async claimMonitorProbesForProbing(data: {
  probeId: ObjectID;
  limit: number;
}): Promise<Array<ObjectID>> {
  const dataSource: DataSource | null = PostgresAppInstance.getDataSource();

  if (!dataSource) {
    throw new BadDataException("Database connection not available");
  }

  // A non-positive limit can never claim anything; bail out early instead of
  // opening a transaction (a negative LIMIT would also be a Postgres error).
  if (data.limit <= 0) {
    return [];
  }

  const currentDate: Date = OneUptimeDate.getCurrentDate();

  // Use a transaction with FOR UPDATE SKIP LOCKED to atomically claim monitors.
  // This prevents multiple probe instances from picking up the same monitors.
  const claimedIds: Array<ObjectID> = await dataSource.transaction(
    async (transactionalEntityManager) => {
      // First, select and lock the monitor probes that need to be processed.
      // FOR UPDATE OF mp SKIP LOCKED ensures that:
      // 1. Selected MonitorProbe rows are locked for this transaction.
      // 2. Rows already locked by other transactions are skipped.
      const selectQuery: string = `
        SELECT mp."_id"
        FROM "MonitorProbe" mp
        INNER JOIN "Monitor" m ON mp."monitorId" = m."_id"
        INNER JOIN "Project" p ON mp."projectId" = p."_id"
        WHERE mp."probeId" = $1
        AND mp."isEnabled" = true
        AND mp."deletedAt" IS NULL
        AND (mp."nextPingAt" IS NULL OR mp."nextPingAt" <= $2)
        AND m."disableActiveMonitoring" = false
        AND m."deletedAt" IS NULL
        AND p."deletedAt" IS NULL
        ORDER BY mp."nextPingAt" ASC NULLS FIRST
        LIMIT $3
        FOR UPDATE OF mp SKIP LOCKED
      `;

      const selectedRows: Array<{ _id: string }> =
        await transactionalEntityManager.query(selectQuery, [
          data.probeId.toString(),
          currentDate,
          data.limit,
        ]);

      // Nothing due (or everything due is locked by a concurrent claimer).
      if (selectedRows.length === 0) {
        return [];
      }

      const ids: Array<string> = selectedRows.map((row: { _id: string }) => {
        return row._id;
      });

      // Update the claimed monitors to set nextPingAt to 1 minute from now.
      // This is a temporary value; the actual nextPingAt will be calculated
      // based on the monitor's interval after the probe fetches the full
      // details.
      const tempNextPingAt: Date = OneUptimeDate.addRemoveMinutes(
        currentDate,
        1,
      );

      const updateQuery: string = `
        UPDATE "MonitorProbe"
        SET "lastPingAt" = $1, "nextPingAt" = $2
        WHERE "_id" = ANY($3::uuid[])
      `;

      await transactionalEntityManager.query(updateQuery, [
        currentDate,
        tempNextPingAt,
        ids,
      ]);

      return ids.map((id: string) => {
        return new ObjectID(id);
      });
    },
  );

  return claimedIds;
}
protected override async onBeforeCreate(
createBy: CreateBy<MonitorProbe>,
): Promise<OnCreate<MonitorProbe>> {

View File

@@ -19,7 +19,6 @@ import Express, {
ExpressResponse,
ExpressRouter,
NextFunction,
OneUptimeRequest,
} from "Common/Server/Utils/Express";
import logger from "Common/Server/Utils/Logger";
import Response from "Common/Server/Utils/Response";
@@ -30,7 +29,6 @@ import ProjectService from "Common/Server/Services/ProjectService";
import MonitorType from "Common/Types/Monitor/MonitorType";
import MonitorTest from "Common/Models/DatabaseModels/MonitorTest";
import MonitorTestService from "Common/Server/Services/MonitorTestService";
import NumberUtil from "Common/Utils/Number";
const router: ExpressRouter = Express.getRouter();
@@ -384,46 +382,35 @@ router.post(
logger.debug("Fetching monitor list for probes");
/*
* we do this to distribute the load among the probes.
* so every request will get a different set of monitors to monitor
* const moduloBy: number = 10;
* const reminder: number = NumberUtil.getRandomNumber(0, 100) % moduloBy;
*/
// Atomically claim monitors for this probe instance using FOR UPDATE SKIP LOCKED
// This prevents multiple instances of the same probe from picking up the same monitors
const claimedMonitorProbeIds: Array<ObjectID> =
await MonitorProbeService.claimMonitorProbesForProbing({
probeId: probeId,
limit: limit,
});
const count: PositiveNumber = await MonitorProbeService.countBy({
query: {
...getMonitorFetchQuery((req as OneUptimeRequest).probe!.id!),
// version: QueryHelper.modulo(moduloBy, reminder), // distribute the load among the probes
},
skip: 0,
props: {
isRoot: true,
},
});
logger.debug(
`Claimed ${claimedMonitorProbeIds.length} monitor probes for probing`,
);
/*
* we do this to distribute the load among the probes.
* so every request will get a different set of monitors to monitor
*/
const countNumber: number = count.toNumber();
let skip: number = 0;
if (countNumber > limit) {
skip = NumberUtil.getRandomNumber(0, countNumber - limit);
if (claimedMonitorProbeIds.length === 0) {
logger.debug("No monitors to probe");
return Response.sendEntityArrayResponse(
req,
res,
[],
new PositiveNumber(0),
Monitor,
);
}
// Fetch the full monitor details for the claimed monitors
const monitorProbes: Array<MonitorProbe> =
await MonitorProbeService.findBy({
query: {
...getMonitorFetchQuery((req as OneUptimeRequest).probe!.id!),
// version: QueryHelper.modulo(moduloBy, reminder), // distribute the load among the probes
_id: QueryHelper.any(claimedMonitorProbeIds),
},
sort: {
nextPingAt: SortOrder.Ascending,
},
skip: skip,
limit: limit,
select: {
nextPingAt: true,
probeId: true,
@@ -435,6 +422,8 @@ router.post(
projectId: true,
},
},
limit: limit,
skip: 0,
props: {
isRoot: true,
},
@@ -443,8 +432,7 @@ router.post(
logger.debug("Fetched monitor list");
logger.debug(monitorProbes);
// update the lastMonitoredAt field of the monitors
// Update the nextPingAt based on the actual monitoring interval
const updatePromises: Array<Promise<void>> = [];
for (const monitorProbe of monitorProbes) {
@@ -469,7 +457,6 @@ router.post(
MonitorProbeService.updateOneById({
id: monitorProbe.id!,
data: {
lastPingAt: OneUptimeDate.getCurrentDate(),
nextPingAt: nextPing,
},
props: {
@@ -479,6 +466,7 @@ router.post(
);
}
// Update nextPingAt in parallel - this refines the temporary value set during claiming
await Promise.all(updatePromises);
const monitors: Array<Monitor> = monitorProbes