refactor: Remove SyntheticMonitorSemaphore and SyntheticMonitorWorkerPool for improved code simplicity

This commit is contained in:
Nawaz Dhandala
2026-03-02 10:25:57 +00:00
parent 4598f0d751
commit f7bb93e439
3 changed files with 1 additions and 672 deletions

View File

@@ -15,7 +15,6 @@ import API from "Common/Utils/API";
import logger from "Common/Server/Utils/Logger";
import ProbeAPIRequest from "../Utils/ProbeAPIRequest";
import ProxyConfig from "../Utils/ProxyConfig";
import SyntheticMonitorSemaphore from "../Utils/SyntheticMonitorSemaphore";
const router: ExpressRouter = Express.getRouter();
@@ -84,21 +83,8 @@ router.get(
}
}
/*
* Include synthetic monitor semaphore pressure: monitors queued
* waiting for a browser slot indicate the probe is at capacity and
* Kubernetes should scale up more probe replicas.
*/
const semaphoreStatus: {
running: number;
queued: number;
maxSlots: number;
} = SyntheticMonitorSemaphore.getStatus();
queueSize += semaphoreStatus.queued;
logger.debug(
`Pending monitor count for KEDA: ${queueSize} (API pending: ${queueSize - semaphoreStatus.queued}, semaphore queued: ${semaphoreStatus.queued}, semaphore running: ${semaphoreStatus.running}/${semaphoreStatus.maxSlots})`,
`Pending monitor count for KEDA: ${queueSize}`,
);
return Response.sendJsonObjectResponse(req, res, {

View File

@@ -1,139 +0,0 @@
import os from "os";
import MemoryUtil from "Common/Server/Utils/Memory";
import logger from "Common/Server/Utils/Logger";
interface Waiter {
resolve: (value: boolean) => void;
reject: (reason: Error) => void;
timer: ReturnType<typeof setTimeout>;
monitorId?: string | undefined;
}
const MEMORY_BUFFER_BYTES: number = 256 * 1024 * 1024; // 256 MB reserved for probe + OS
const MEMORY_PER_MONITOR_BYTES: number = 400 * 1024 * 1024; // ~400 MB per fork (Node + multi-process Playwright)
const ACQUIRE_TIMEOUT_MS: number = 5 * 60 * 1000; // 5 minutes
const MAX_QUEUE_DEPTH: number = 20; // max waiters in queue before rejecting
class SyntheticMonitorSemaphore {
private running: number = 0;
private queue: Waiter[] = [];
// Track monitorIds that are currently running or queued to prevent duplicates
private activeMonitorIds: Set<string> = new Set();
private calculateMaxSlots(): number {
const availableMemory: number =
MemoryUtil.getContainerAwareAvailableMemoryInBytes();
const usable: number = availableMemory - MEMORY_BUFFER_BYTES;
const slots: number = Math.floor(usable / MEMORY_PER_MONITOR_BYTES);
// Always allow at least one monitor so probing continues under pressure.
return Math.max(1, slots);
}
/**
* @returns true if the slot was acquired, false if this monitorId is already running/queued (duplicate skipped).
*/
public async acquire(monitorId?: string | undefined): Promise<boolean> {
// Deduplicate: if this monitor is already running or queued, skip it
if (monitorId && this.activeMonitorIds.has(monitorId)) {
logger.debug(
`SyntheticMonitorSemaphore: monitor ${monitorId} is already running or queued, skipping duplicate`,
);
return false;
}
if (monitorId) {
this.activeMonitorIds.add(monitorId);
}
const maxSlots: number = this.calculateMaxSlots();
if (this.running < maxSlots) {
this.running++;
logger.debug(
`SyntheticMonitorSemaphore: acquired slot (${this.running}/${maxSlots} active, ${this.queue.length} queued, freemem=${Math.round(os.freemem() / 1024 / 1024)}MB)`,
);
return true;
}
// Reject fast if the queue is already full — avoids a guaranteed 5-minute timeout
if (this.queue.length >= MAX_QUEUE_DEPTH) {
if (monitorId) {
this.activeMonitorIds.delete(monitorId);
}
throw new Error(
`SyntheticMonitorSemaphore: queue is full (${MAX_QUEUE_DEPTH} waiters). ` +
`Try again later or reduce synthetic monitor concurrency.`,
);
}
logger.debug(
`SyntheticMonitorSemaphore: all ${maxSlots} slots in use (${this.queue.length} already queued), waiting...`,
);
return new Promise<boolean>(
(resolve: (value: boolean) => void, reject: (reason: Error) => void) => {
const timer: ReturnType<typeof setTimeout> = setTimeout(() => {
const index: number = this.queue.indexOf(waiter);
if (index !== -1) {
this.queue.splice(index, 1);
}
if (monitorId) {
this.activeMonitorIds.delete(monitorId);
}
reject(
new Error(
"SyntheticMonitorSemaphore: timed out waiting for a slot",
),
);
}, ACQUIRE_TIMEOUT_MS);
const waiter: Waiter = { resolve, reject, timer, monitorId };
this.queue.push(waiter);
},
);
}
public release(monitorId?: string | undefined): void {
this.running = Math.max(0, this.running - 1);
if (monitorId) {
this.activeMonitorIds.delete(monitorId);
}
// Re-check available memory and wake as many waiters as slots allow
const maxSlots: number = this.calculateMaxSlots();
let woken: number = 0;
while (this.queue.length > 0 && this.running < maxSlots) {
const next: Waiter = this.queue.shift()!;
clearTimeout(next.timer);
this.running++;
woken++;
next.resolve(true);
}
if (woken > 0) {
logger.debug(
`SyntheticMonitorSemaphore: woke ${woken} queued waiter(s) (${this.running}/${maxSlots} active, ${this.queue.length} still queued, freemem=${Math.round(os.freemem() / 1024 / 1024)}MB)`,
);
} else {
logger.debug(
`SyntheticMonitorSemaphore: released slot (${this.running}/${maxSlots} active, ${this.queue.length} queued, freemem=${Math.round(os.freemem() / 1024 / 1024)}MB)`,
);
}
}
public getStatus(): {
running: number;
queued: number;
maxSlots: number;
} {
return {
running: this.running,
queued: this.queue.length,
maxSlots: this.calculateMaxSlots(),
};
}
}
export default new SyntheticMonitorSemaphore();

View File

@@ -1,518 +0,0 @@
import logger from "Common/Server/Utils/Logger";
import BrowserType from "Common/Types/Monitor/SyntheticMonitors/BrowserType";
import ScreenSizeType from "Common/Types/Monitor/SyntheticMonitors/ScreenSizeType";
import { ChildProcess, fork } from "child_process";
import path from "path";
interface WorkerConfig {
script: string;
browserType: BrowserType;
screenSizeType: ScreenSizeType;
timeout: number;
proxy?:
| {
server: string;
username?: string | undefined;
password?: string | undefined;
}
| undefined;
}
interface WorkerResult {
logMessages: string[];
scriptError?: string | undefined;
result?: unknown | undefined;
screenshots: Record<string, string>;
executionTimeInMS: number;
}
// IPC messages: parent → worker
interface ExecuteMessage {
type: "execute";
id: string;
config: WorkerConfig;
}
interface ShutdownMessage {
type: "shutdown";
}
type ParentToWorkerMessage = ExecuteMessage | ShutdownMessage;
// IPC messages: worker → parent
interface ReadyMessage {
type: "ready";
browserType?: BrowserType | undefined;
}
interface ResultMessage {
type: "result";
id: string;
data: WorkerResult;
}
interface ErrorMessage {
type: "error";
id: string;
error: string;
}
interface LogMessage {
type: "log";
message: string;
}
type WorkerToParentMessage =
| ReadyMessage
| ResultMessage
| ErrorMessage
| LogMessage;
const MAX_EXECUTIONS_PER_WORKER: number = 50;
const WORKER_IDLE_TIMEOUT_MS: number = 5 * 60 * 1000; // 5 minutes
const EXECUTION_TIMEOUT_BUFFER_MS: number = 30 * 1000; // 30s buffer beyond script timeout
interface PoolWorker {
process: ChildProcess;
busy: boolean;
browserType?: BrowserType | undefined;
executionCount: number;
idleTimer?: ReturnType<typeof setTimeout> | undefined;
pendingResolve?: ((value: WorkerResult) => void) | undefined;
pendingReject?: ((reason: Error) => void) | undefined;
pendingTimeoutTimer?: ReturnType<typeof setTimeout> | undefined;
pendingId?: string | undefined;
stderrOutput: string;
}
function getSanitizedEnv(): Record<string, string> {
const safeKeys: string[] = [
"PATH",
"HOME",
"NODE_ENV",
"PLAYWRIGHT_BROWSERS_PATH",
"HTTP_PROXY_URL",
"http_proxy",
"HTTPS_PROXY_URL",
"https_proxy",
"NO_PROXY",
"no_proxy",
];
const env: Record<string, string> = {};
for (const key of safeKeys) {
if (process.env[key]) {
env[key] = process.env[key]!;
}
}
return env;
}
let executionIdCounter: number = 0;
class SyntheticMonitorWorkerPool {
private workers: PoolWorker[] = [];
public constructor() {
process.on("SIGTERM", () => {
this.shutdown().catch((err: unknown) => {
logger.error(
`SyntheticMonitorWorkerPool: error during SIGTERM shutdown: ${err}`,
);
});
});
}
public async execute(
config: WorkerConfig,
timeout: number,
): Promise<WorkerResult> {
const worker: PoolWorker = this.findOrCreateWorker(config.browserType);
// Clear idle timer since the worker is now busy
if (worker.idleTimer) {
clearTimeout(worker.idleTimer);
worker.idleTimer = undefined;
}
worker.busy = true;
worker.executionCount++;
executionIdCounter++;
const executionId: string = `exec-${executionIdCounter}`;
return new Promise<WorkerResult>(
(
resolve: (value: WorkerResult) => void,
reject: (reason: Error) => void,
) => {
worker.pendingResolve = resolve;
worker.pendingReject = reject;
worker.pendingId = executionId;
// Execution timeout: script timeout + buffer
worker.pendingTimeoutTimer = setTimeout(() => {
this.handleWorkerTimeout(worker);
}, timeout + EXECUTION_TIMEOUT_BUFFER_MS);
const message: ParentToWorkerMessage = {
type: "execute",
id: executionId,
config: config,
};
try {
worker.process.send(message);
} catch (sendErr: unknown) {
// IPC channel is broken — clean up and reject immediately
this.clearWorkerPending(worker);
worker.busy = false;
this.removeWorker(worker);
reject(
new Error(
`Failed to send config to worker: ${(sendErr as Error)?.message || String(sendErr)}`,
),
);
}
},
);
}
public async shutdown(): Promise<void> {
logger.debug(
`SyntheticMonitorWorkerPool: shutting down ${this.workers.length} workers`,
);
const shutdownPromises: Promise<void>[] = this.workers.map(
(worker: PoolWorker) => {
return this.shutdownWorker(worker);
},
);
await Promise.allSettled(shutdownPromises);
this.workers = [];
}
private findOrCreateWorker(browserType: BrowserType): PoolWorker {
// 1. Find idle worker with matching browser type (fast path — reuses warm browser)
const matchingIdle: PoolWorker | undefined = this.workers.find(
(w: PoolWorker) => {
return !w.busy && w.browserType === browserType;
},
);
if (matchingIdle) {
logger.debug(
`SyntheticMonitorWorkerPool: reusing idle worker with matching ${browserType} browser`,
);
return matchingIdle;
}
// 2. Find any idle worker (will close/relaunch browser for different type)
const anyIdle: PoolWorker | undefined = this.workers.find(
(w: PoolWorker) => {
return !w.busy;
},
);
if (anyIdle) {
logger.debug(
`SyntheticMonitorWorkerPool: reusing idle worker (browser type change: ${anyIdle.browserType || "none"}${browserType})`,
);
return anyIdle;
}
// 3. No idle worker — fork a new one
logger.debug(
`SyntheticMonitorWorkerPool: forking new worker for ${browserType} (pool size: ${this.workers.length + 1})`,
);
return this.forkWorker();
}
private forkWorker(): PoolWorker {
const workerPath: string = path.resolve(
__dirname,
"Monitors",
"MonitorTypes",
"SyntheticMonitorWorker",
);
const child: ChildProcess = fork(workerPath, [], {
env: getSanitizedEnv(),
execArgv: [...process.execArgv, "--max-old-space-size=256"],
stdio: ["pipe", "pipe", "pipe", "ipc"],
});
const worker: PoolWorker = {
process: child,
busy: false,
browserType: undefined,
executionCount: 0,
stderrOutput: "",
};
// Capture stderr for debugging
if (child.stderr) {
child.stderr.on("data", (data: Buffer) => {
worker.stderrOutput += data.toString();
// Keep only last 2000 chars to prevent memory growth
if (worker.stderrOutput.length > 2000) {
worker.stderrOutput = worker.stderrOutput.slice(-2000);
}
});
}
// Handle messages from worker
child.on("message", (msg: WorkerToParentMessage) => {
this.handleWorkerMessage(worker, msg);
});
// Handle worker crash/exit
child.on("exit", (exitCode: number | null, signal: string | null) => {
this.handleWorkerExit(worker, exitCode, signal);
});
child.on("error", (err: Error) => {
this.handleWorkerError(worker, err);
});
this.workers.push(worker);
return worker;
}
private handleWorkerMessage(
worker: PoolWorker,
msg: WorkerToParentMessage,
): void {
// Forward diagnostic logs from worker to probe logger
if (msg.type === "log") {
logger.debug(msg.message);
return;
}
if (msg.type === "ready") {
worker.browserType = msg.browserType;
return;
}
if (msg.type === "result" && msg.id === worker.pendingId) {
this.resolveWorkerExecution(worker, msg.data);
return;
}
if (msg.type === "error" && msg.id === worker.pendingId) {
this.rejectWorkerExecution(
worker,
new Error(msg.error || "Unknown worker error"),
);
return;
}
}
private resolveWorkerExecution(
worker: PoolWorker,
result: WorkerResult,
): void {
const resolve: ((value: WorkerResult) => void) | undefined =
worker.pendingResolve;
this.clearWorkerPending(worker);
worker.busy = false;
// Retire worker if it has exceeded execution limit
if (worker.executionCount >= MAX_EXECUTIONS_PER_WORKER) {
logger.debug(
`SyntheticMonitorWorkerPool: retiring worker after ${worker.executionCount} executions`,
);
this.retireWorker(worker);
} else {
this.startIdleTimer(worker);
}
if (resolve) {
resolve(result);
}
}
private rejectWorkerExecution(worker: PoolWorker, error: Error): void {
const reject: ((reason: Error) => void) | undefined = worker.pendingReject;
this.clearWorkerPending(worker);
worker.busy = false;
// Retire worker if it has exceeded execution limit (same check as resolveWorkerExecution)
if (worker.executionCount >= MAX_EXECUTIONS_PER_WORKER) {
logger.debug(
`SyntheticMonitorWorkerPool: retiring worker after ${worker.executionCount} executions (error path)`,
);
this.retireWorker(worker);
} else {
this.startIdleTimer(worker);
}
if (reject) {
reject(error);
}
}
private clearWorkerPending(worker: PoolWorker): void {
if (worker.pendingTimeoutTimer) {
clearTimeout(worker.pendingTimeoutTimer);
worker.pendingTimeoutTimer = undefined;
}
worker.pendingResolve = undefined;
worker.pendingReject = undefined;
worker.pendingId = undefined;
}
private handleWorkerTimeout(worker: PoolWorker): void {
logger.error(
`SyntheticMonitorWorkerPool: worker execution timed out, killing worker`,
);
const reject: ((reason: Error) => void) | undefined = worker.pendingReject;
this.clearWorkerPending(worker);
this.removeWorker(worker);
// Force kill the worker process
try {
worker.process.kill("SIGKILL");
} catch {
// ignore — process may have already exited
}
if (reject) {
reject(new Error("Synthetic monitor worker execution timed out"));
}
}
private handleWorkerExit(
worker: PoolWorker,
exitCode: number | null,
signal: string | null,
): void {
const stderrInfo: string = worker.stderrOutput.trim()
? `: ${worker.stderrOutput.trim().substring(0, 500)}`
: "";
logger.debug(
`SyntheticMonitorWorkerPool: worker exited (code=${exitCode}, signal=${signal})${stderrInfo}`,
);
// If there's a pending execution, reject it
if (worker.pendingReject) {
const reject: (reason: Error) => void = worker.pendingReject;
this.clearWorkerPending(worker);
if (exitCode === null) {
const signalInfo: string = signal ? ` (signal: ${signal})` : "";
reject(
new Error(
`Synthetic monitor worker was terminated by the system${signalInfo}. ` +
`This is usually caused by high memory usage or resource limits in the container${stderrInfo}`,
),
);
} else {
reject(
new Error(
`Synthetic monitor worker exited unexpectedly with code ${exitCode}${stderrInfo}`,
),
);
}
}
this.removeWorker(worker);
}
private handleWorkerError(worker: PoolWorker, err: Error): void {
logger.error(`SyntheticMonitorWorkerPool: worker error: ${err.message}`);
if (worker.pendingReject) {
const reject: (reason: Error) => void = worker.pendingReject;
this.clearWorkerPending(worker);
reject(err);
}
this.removeWorker(worker);
}
private startIdleTimer(worker: PoolWorker): void {
if (worker.idleTimer) {
clearTimeout(worker.idleTimer);
}
worker.idleTimer = setTimeout(() => {
if (!worker.busy) {
logger.debug(
`SyntheticMonitorWorkerPool: retiring idle worker (browserType=${worker.browserType || "none"})`,
);
this.retireWorker(worker);
}
}, WORKER_IDLE_TIMEOUT_MS);
// Don't let idle timers prevent process exit
if (worker.idleTimer.unref) {
worker.idleTimer.unref();
}
}
private retireWorker(worker: PoolWorker): void {
this.removeWorker(worker);
this.shutdownWorker(worker).catch((err: unknown) => {
logger.error(`SyntheticMonitorWorkerPool: error retiring worker: ${err}`);
});
}
private removeWorker(worker: PoolWorker): void {
if (worker.idleTimer) {
clearTimeout(worker.idleTimer);
worker.idleTimer = undefined;
}
const index: number = this.workers.indexOf(worker);
if (index !== -1) {
this.workers.splice(index, 1);
}
}
private async shutdownWorker(worker: PoolWorker): Promise<void> {
return new Promise<void>((resolve: () => void) => {
const forceKillTimer: ReturnType<typeof setTimeout> = setTimeout(() => {
try {
worker.process.kill("SIGKILL");
} catch {
// ignore
}
resolve();
}, 5000);
if (forceKillTimer.unref) {
forceKillTimer.unref();
}
worker.process.once("exit", () => {
clearTimeout(forceKillTimer);
resolve();
});
try {
const shutdownMsg: ParentToWorkerMessage = { type: "shutdown" };
worker.process.send(shutdownMsg);
} catch {
// IPC channel already closed — force kill
try {
worker.process.kill("SIGKILL");
} catch {
// ignore
}
clearTimeout(forceKillTimer);
resolve();
}
});
}
}
export default new SyntheticMonitorWorkerPool();