From f7bb93e439603ac0fa59f743ffda0aee645402d1 Mon Sep 17 00:00:00 2001 From: Nawaz Dhandala Date: Mon, 2 Mar 2026 10:25:57 +0000 Subject: [PATCH] refactor: Remove SyntheticMonitorSemaphore and SyntheticMonitorWorkerPool for improved code simplicity --- Probe/API/Metrics.ts | 16 +- Probe/Utils/SyntheticMonitorSemaphore.ts | 139 ------ Probe/Utils/SyntheticMonitorWorkerPool.ts | 518 ---------------------- 3 files changed, 1 insertion(+), 672 deletions(-) delete mode 100644 Probe/Utils/SyntheticMonitorSemaphore.ts delete mode 100644 Probe/Utils/SyntheticMonitorWorkerPool.ts diff --git a/Probe/API/Metrics.ts b/Probe/API/Metrics.ts index 60b4c0eb57..9c8c45887f 100644 --- a/Probe/API/Metrics.ts +++ b/Probe/API/Metrics.ts @@ -15,7 +15,6 @@ import API from "Common/Utils/API"; import logger from "Common/Server/Utils/Logger"; import ProbeAPIRequest from "../Utils/ProbeAPIRequest"; import ProxyConfig from "../Utils/ProxyConfig"; -import SyntheticMonitorSemaphore from "../Utils/SyntheticMonitorSemaphore"; const router: ExpressRouter = Express.getRouter(); @@ -84,21 +83,8 @@ router.get( } } - /* - * Include synthetic monitor semaphore pressure: monitors queued - * waiting for a browser slot indicate the probe is at capacity and - * Kubernetes should scale up more probe replicas. - */ - const semaphoreStatus: { - running: number; - queued: number; - maxSlots: number; - } = SyntheticMonitorSemaphore.getStatus(); - - queueSize += semaphoreStatus.queued; - logger.debug( - `Pending monitor count for KEDA: ${queueSize} (API pending: ${queueSize - semaphoreStatus.queued}, semaphore queued: ${semaphoreStatus.queued}, semaphore running: ${semaphoreStatus.running}/${semaphoreStatus.maxSlots})`, + `Pending monitor count for KEDA: ${queueSize}`, ); return Response.sendJsonObjectResponse(req, res, { diff --git a/Probe/Utils/SyntheticMonitorSemaphore.ts b/Probe/Utils/SyntheticMonitorSemaphore.ts deleted file mode 100644 index db9b44d278..0000000000 --- a/Probe/Utils/SyntheticMonitorSemaphore.ts +++ /dev/null @@ -1,139 +0,0 @@ -import os from "os"; -import MemoryUtil from "Common/Server/Utils/Memory"; -import logger from "Common/Server/Utils/Logger"; - -interface Waiter { - resolve: (value: boolean) => void; - reject: (reason: Error) => void; - timer: ReturnType; - monitorId?: string | undefined; -} - -const MEMORY_BUFFER_BYTES: number = 256 * 1024 * 1024; // 256 MB reserved for probe + OS -const MEMORY_PER_MONITOR_BYTES: number = 400 * 1024 * 1024; // ~400 MB per fork (Node + multi-process Playwright) -const ACQUIRE_TIMEOUT_MS: number = 5 * 60 * 1000; // 5 minutes -const MAX_QUEUE_DEPTH: number = 20; // max waiters in queue before rejecting - -class SyntheticMonitorSemaphore { - private running: number = 0; - private queue: Waiter[] = []; - // Track monitorIds that are currently running or queued to prevent duplicates - private activeMonitorIds: Set = new Set(); - - private calculateMaxSlots(): number { - const availableMemory: number = - MemoryUtil.getContainerAwareAvailableMemoryInBytes(); - const usable: number = availableMemory - MEMORY_BUFFER_BYTES; - const slots: number = Math.floor(usable / MEMORY_PER_MONITOR_BYTES); - // Always allow at least one monitor so probing continues under pressure. - return Math.max(1, slots); - } - - /** - * @returns true if the slot was acquired, false if this monitorId is already running/queued (duplicate skipped). - */ - public async acquire(monitorId?: string | undefined): Promise { - // Deduplicate: if this monitor is already running or queued, skip it - if (monitorId && this.activeMonitorIds.has(monitorId)) { - logger.debug( - `SyntheticMonitorSemaphore: monitor ${monitorId} is already running or queued, skipping duplicate`, - ); - return false; - } - - if (monitorId) { - this.activeMonitorIds.add(monitorId); - } - - const maxSlots: number = this.calculateMaxSlots(); - - if (this.running < maxSlots) { - this.running++; - logger.debug( - `SyntheticMonitorSemaphore: acquired slot (${this.running}/${maxSlots} active, ${this.queue.length} queued, freemem=${Math.round(os.freemem() / 1024 / 1024)}MB)`, - ); - return true; - } - - // Reject fast if the queue is already full — avoids a guaranteed 5-minute timeout - if (this.queue.length >= MAX_QUEUE_DEPTH) { - if (monitorId) { - this.activeMonitorIds.delete(monitorId); - } - throw new Error( - `SyntheticMonitorSemaphore: queue is full (${MAX_QUEUE_DEPTH} waiters). ` + - `Try again later or reduce synthetic monitor concurrency.`, - ); - } - - logger.debug( - `SyntheticMonitorSemaphore: all ${maxSlots} slots in use (${this.queue.length} already queued), waiting...`, - ); - - return new Promise( - (resolve: (value: boolean) => void, reject: (reason: Error) => void) => { - const timer: ReturnType = setTimeout(() => { - const index: number = this.queue.indexOf(waiter); - if (index !== -1) { - this.queue.splice(index, 1); - } - if (monitorId) { - this.activeMonitorIds.delete(monitorId); - } - reject( - new Error( - "SyntheticMonitorSemaphore: timed out waiting for a slot", - ), - ); - }, ACQUIRE_TIMEOUT_MS); - - const waiter: Waiter = { resolve, reject, timer, monitorId }; - this.queue.push(waiter); - }, - ); - } - - public release(monitorId?: string | undefined): void { - this.running = Math.max(0, this.running - 1); - - if (monitorId) { - this.activeMonitorIds.delete(monitorId); - } - - // Re-check available memory and wake as many waiters as slots allow - const maxSlots: number = this.calculateMaxSlots(); - let woken: number = 0; - - while (this.queue.length > 0 && this.running < maxSlots) { - const next: Waiter = this.queue.shift()!; - clearTimeout(next.timer); - this.running++; - woken++; - next.resolve(true); - } - - if (woken > 0) { - logger.debug( - `SyntheticMonitorSemaphore: woke ${woken} queued waiter(s) (${this.running}/${maxSlots} active, ${this.queue.length} still queued, freemem=${Math.round(os.freemem() / 1024 / 1024)}MB)`, - ); - } else { - logger.debug( - `SyntheticMonitorSemaphore: released slot (${this.running}/${maxSlots} active, ${this.queue.length} queued, freemem=${Math.round(os.freemem() / 1024 / 1024)}MB)`, - ); - } - } - - public getStatus(): { - running: number; - queued: number; - maxSlots: number; - } { - return { - running: this.running, - queued: this.queue.length, - maxSlots: this.calculateMaxSlots(), - }; - } -} - -export default new SyntheticMonitorSemaphore(); diff --git a/Probe/Utils/SyntheticMonitorWorkerPool.ts b/Probe/Utils/SyntheticMonitorWorkerPool.ts deleted file mode 100644 index 6d5a05e3f7..0000000000 --- a/Probe/Utils/SyntheticMonitorWorkerPool.ts +++ /dev/null @@ -1,518 +0,0 @@ -import logger from "Common/Server/Utils/Logger"; -import BrowserType from "Common/Types/Monitor/SyntheticMonitors/BrowserType"; -import ScreenSizeType from "Common/Types/Monitor/SyntheticMonitors/ScreenSizeType"; -import { ChildProcess, fork } from "child_process"; -import path from "path"; - -interface WorkerConfig { - script: string; - browserType: BrowserType; - screenSizeType: ScreenSizeType; - timeout: number; - proxy?: - | { - server: string; - username?: string | undefined; - password?: string | undefined; - } - | undefined; -} - -interface WorkerResult { - logMessages: string[]; - scriptError?: string | undefined; - result?: unknown | undefined; - screenshots: Record; - executionTimeInMS: number; -} - -// IPC messages: parent → worker -interface ExecuteMessage { - type: "execute"; - id: string; - config: WorkerConfig; -} - -interface ShutdownMessage { - type: "shutdown"; -} - -type ParentToWorkerMessage = ExecuteMessage | ShutdownMessage; - -// IPC messages: worker → parent -interface ReadyMessage { - type: "ready"; - browserType?: BrowserType | undefined; -} - -interface ResultMessage { - type: "result"; - id: string; - data: WorkerResult; -} - -interface ErrorMessage { - type: "error"; - id: string; - error: string; -} - -interface LogMessage { - type: "log"; - message: string; -} - -type WorkerToParentMessage = - | ReadyMessage - | ResultMessage - | ErrorMessage - | LogMessage; - -const MAX_EXECUTIONS_PER_WORKER: number = 50; -const WORKER_IDLE_TIMEOUT_MS: number = 5 * 60 * 1000; // 5 minutes -const EXECUTION_TIMEOUT_BUFFER_MS: number = 30 * 1000; // 30s buffer beyond script timeout - -interface PoolWorker { - process: ChildProcess; - busy: boolean; - browserType?: BrowserType | undefined; - executionCount: number; - idleTimer?: ReturnType | undefined; - pendingResolve?: ((value: WorkerResult) => void) | undefined; - pendingReject?: ((reason: Error) => void) | undefined; - pendingTimeoutTimer?: ReturnType | undefined; - pendingId?: string | undefined; - stderrOutput: string; -} - -function getSanitizedEnv(): Record { - const safeKeys: string[] = [ - "PATH", - "HOME", - "NODE_ENV", - "PLAYWRIGHT_BROWSERS_PATH", - "HTTP_PROXY_URL", - "http_proxy", - "HTTPS_PROXY_URL", - "https_proxy", - "NO_PROXY", - "no_proxy", - ]; - - const env: Record = {}; - - for (const key of safeKeys) { - if (process.env[key]) { - env[key] = process.env[key]!; - } - } - - return env; -} - -let executionIdCounter: number = 0; - -class SyntheticMonitorWorkerPool { - private workers: PoolWorker[] = []; - - public constructor() { - process.on("SIGTERM", () => { - this.shutdown().catch((err: unknown) => { - logger.error( - `SyntheticMonitorWorkerPool: error during SIGTERM shutdown: ${err}`, - ); - }); - }); - } - - public async execute( - config: WorkerConfig, - timeout: number, - ): Promise { - const worker: PoolWorker = this.findOrCreateWorker(config.browserType); - - // Clear idle timer since the worker is now busy - if (worker.idleTimer) { - clearTimeout(worker.idleTimer); - worker.idleTimer = undefined; - } - - worker.busy = true; - worker.executionCount++; - - executionIdCounter++; - const executionId: string = `exec-${executionIdCounter}`; - - return new Promise( - ( - resolve: (value: WorkerResult) => void, - reject: (reason: Error) => void, - ) => { - worker.pendingResolve = resolve; - worker.pendingReject = reject; - worker.pendingId = executionId; - - // Execution timeout: script timeout + buffer - worker.pendingTimeoutTimer = setTimeout(() => { - this.handleWorkerTimeout(worker); - }, timeout + EXECUTION_TIMEOUT_BUFFER_MS); - - const message: ParentToWorkerMessage = { - type: "execute", - id: executionId, - config: config, - }; - - try { - worker.process.send(message); - } catch (sendErr: unknown) { - // IPC channel is broken — clean up and reject immediately - this.clearWorkerPending(worker); - worker.busy = false; - this.removeWorker(worker); - reject( - new Error( - `Failed to send config to worker: ${(sendErr as Error)?.message || String(sendErr)}`, - ), - ); - } - }, - ); - } - - public async shutdown(): Promise { - logger.debug( - `SyntheticMonitorWorkerPool: shutting down ${this.workers.length} workers`, - ); - - const shutdownPromises: Promise[] = this.workers.map( - (worker: PoolWorker) => { - return this.shutdownWorker(worker); - }, - ); - - await Promise.allSettled(shutdownPromises); - this.workers = []; - } - - private findOrCreateWorker(browserType: BrowserType): PoolWorker { - // 1. Find idle worker with matching browser type (fast path — reuses warm browser) - const matchingIdle: PoolWorker | undefined = this.workers.find( - (w: PoolWorker) => { - return !w.busy && w.browserType === browserType; - }, - ); - - if (matchingIdle) { - logger.debug( - `SyntheticMonitorWorkerPool: reusing idle worker with matching ${browserType} browser`, - ); - return matchingIdle; - } - - // 2. Find any idle worker (will close/relaunch browser for different type) - const anyIdle: PoolWorker | undefined = this.workers.find( - (w: PoolWorker) => { - return !w.busy; - }, - ); - - if (anyIdle) { - logger.debug( - `SyntheticMonitorWorkerPool: reusing idle worker (browser type change: ${anyIdle.browserType || "none"} → ${browserType})`, - ); - return anyIdle; - } - - // 3. No idle worker — fork a new one - logger.debug( - `SyntheticMonitorWorkerPool: forking new worker for ${browserType} (pool size: ${this.workers.length + 1})`, - ); - return this.forkWorker(); - } - - private forkWorker(): PoolWorker { - const workerPath: string = path.resolve( - __dirname, - "Monitors", - "MonitorTypes", - "SyntheticMonitorWorker", - ); - - const child: ChildProcess = fork(workerPath, [], { - env: getSanitizedEnv(), - execArgv: [...process.execArgv, "--max-old-space-size=256"], - stdio: ["pipe", "pipe", "pipe", "ipc"], - }); - - const worker: PoolWorker = { - process: child, - busy: false, - browserType: undefined, - executionCount: 0, - stderrOutput: "", - }; - - // Capture stderr for debugging - if (child.stderr) { - child.stderr.on("data", (data: Buffer) => { - worker.stderrOutput += data.toString(); - // Keep only last 2000 chars to prevent memory growth - if (worker.stderrOutput.length > 2000) { - worker.stderrOutput = worker.stderrOutput.slice(-2000); - } - }); - } - - // Handle messages from worker - child.on("message", (msg: WorkerToParentMessage) => { - this.handleWorkerMessage(worker, msg); - }); - - // Handle worker crash/exit - child.on("exit", (exitCode: number | null, signal: string | null) => { - this.handleWorkerExit(worker, exitCode, signal); - }); - - child.on("error", (err: Error) => { - this.handleWorkerError(worker, err); - }); - - this.workers.push(worker); - return worker; - } - - private handleWorkerMessage( - worker: PoolWorker, - msg: WorkerToParentMessage, - ): void { - // Forward diagnostic logs from worker to probe logger - if (msg.type === "log") { - logger.debug(msg.message); - return; - } - - if (msg.type === "ready") { - worker.browserType = msg.browserType; - return; - } - - if (msg.type === "result" && msg.id === worker.pendingId) { - this.resolveWorkerExecution(worker, msg.data); - return; - } - - if (msg.type === "error" && msg.id === worker.pendingId) { - this.rejectWorkerExecution( - worker, - new Error(msg.error || "Unknown worker error"), - ); - return; - } - } - - private resolveWorkerExecution( - worker: PoolWorker, - result: WorkerResult, - ): void { - const resolve: ((value: WorkerResult) => void) | undefined = - worker.pendingResolve; - - this.clearWorkerPending(worker); - worker.busy = false; - - // Retire worker if it has exceeded execution limit - if (worker.executionCount >= MAX_EXECUTIONS_PER_WORKER) { - logger.debug( - `SyntheticMonitorWorkerPool: retiring worker after ${worker.executionCount} executions`, - ); - this.retireWorker(worker); - } else { - this.startIdleTimer(worker); - } - - if (resolve) { - resolve(result); - } - } - - private rejectWorkerExecution(worker: PoolWorker, error: Error): void { - const reject: ((reason: Error) => void) | undefined = worker.pendingReject; - - this.clearWorkerPending(worker); - worker.busy = false; - - // Retire worker if it has exceeded execution limit (same check as resolveWorkerExecution) - if (worker.executionCount >= MAX_EXECUTIONS_PER_WORKER) { - logger.debug( - `SyntheticMonitorWorkerPool: retiring worker after ${worker.executionCount} executions (error path)`, - ); - this.retireWorker(worker); - } else { - this.startIdleTimer(worker); - } - - if (reject) { - reject(error); - } - } - - private clearWorkerPending(worker: PoolWorker): void { - if (worker.pendingTimeoutTimer) { - clearTimeout(worker.pendingTimeoutTimer); - worker.pendingTimeoutTimer = undefined; - } - worker.pendingResolve = undefined; - worker.pendingReject = undefined; - worker.pendingId = undefined; - } - - private handleWorkerTimeout(worker: PoolWorker): void { - logger.error( - `SyntheticMonitorWorkerPool: worker execution timed out, killing worker`, - ); - - const reject: ((reason: Error) => void) | undefined = worker.pendingReject; - - this.clearWorkerPending(worker); - this.removeWorker(worker); - - // Force kill the worker process - try { - worker.process.kill("SIGKILL"); - } catch { - // ignore — process may have already exited - } - - if (reject) { - reject(new Error("Synthetic monitor worker execution timed out")); - } - } - - private handleWorkerExit( - worker: PoolWorker, - exitCode: number | null, - signal: string | null, - ): void { - const stderrInfo: string = worker.stderrOutput.trim() - ? `: ${worker.stderrOutput.trim().substring(0, 500)}` - : ""; - - logger.debug( - `SyntheticMonitorWorkerPool: worker exited (code=${exitCode}, signal=${signal})${stderrInfo}`, - ); - - // If there's a pending execution, reject it - if (worker.pendingReject) { - const reject: (reason: Error) => void = worker.pendingReject; - this.clearWorkerPending(worker); - - if (exitCode === null) { - const signalInfo: string = signal ? ` (signal: ${signal})` : ""; - reject( - new Error( - `Synthetic monitor worker was terminated by the system${signalInfo}. ` + - `This is usually caused by high memory usage or resource limits in the container${stderrInfo}`, - ), - ); - } else { - reject( - new Error( - `Synthetic monitor worker exited unexpectedly with code ${exitCode}${stderrInfo}`, - ), - ); - } - } - - this.removeWorker(worker); - } - - private handleWorkerError(worker: PoolWorker, err: Error): void { - logger.error(`SyntheticMonitorWorkerPool: worker error: ${err.message}`); - - if (worker.pendingReject) { - const reject: (reason: Error) => void = worker.pendingReject; - this.clearWorkerPending(worker); - reject(err); - } - - this.removeWorker(worker); - } - - private startIdleTimer(worker: PoolWorker): void { - if (worker.idleTimer) { - clearTimeout(worker.idleTimer); - } - - worker.idleTimer = setTimeout(() => { - if (!worker.busy) { - logger.debug( - `SyntheticMonitorWorkerPool: retiring idle worker (browserType=${worker.browserType || "none"})`, - ); - this.retireWorker(worker); - } - }, WORKER_IDLE_TIMEOUT_MS); - - // Don't let idle timers prevent process exit - if (worker.idleTimer.unref) { - worker.idleTimer.unref(); - } - } - - private retireWorker(worker: PoolWorker): void { - this.removeWorker(worker); - this.shutdownWorker(worker).catch((err: unknown) => { - logger.error(`SyntheticMonitorWorkerPool: error retiring worker: ${err}`); - }); - } - - private removeWorker(worker: PoolWorker): void { - if (worker.idleTimer) { - clearTimeout(worker.idleTimer); - worker.idleTimer = undefined; - } - - const index: number = this.workers.indexOf(worker); - if (index !== -1) { - this.workers.splice(index, 1); - } - } - - private async shutdownWorker(worker: PoolWorker): Promise { - return new Promise((resolve: () => void) => { - const forceKillTimer: ReturnType = setTimeout(() => { - try { - worker.process.kill("SIGKILL"); - } catch { - // ignore - } - resolve(); - }, 5000); - - if (forceKillTimer.unref) { - forceKillTimer.unref(); - } - - worker.process.once("exit", () => { - clearTimeout(forceKillTimer); - resolve(); - }); - - try { - const shutdownMsg: ParentToWorkerMessage = { type: "shutdown" }; - worker.process.send(shutdownMsg); - } catch { - // IPC channel already closed — force kill - try { - worker.process.kill("SIGKILL"); - } catch { - // ignore - } - clearTimeout(forceKillTimer); - resolve(); - } - }); - } -} - -export default new SyntheticMonitorWorkerPool();