chore: some improvements

This commit is contained in:
Bas950
2024-09-17 10:34:46 +02:00
parent a668add973
commit 697f3660c2

View File

@@ -5,16 +5,17 @@ import { insertIpData } from "./insertIpData.js";
export const updateActivePresenceGaugeLimit = pLimit(1);
let log: debug.Debugger | undefined;
//* Function to update the gauge with per-service counts
export async function updateActivePresenceGauge() {
await updateActivePresenceGaugeLimit(async () => {
log ??= mainLog.extend("Heartbeat-Updates");
log?.("Starting active presence gauge update");
const pattern = "pmd-api.heartbeatUpdates.*";
let cursor: string = "0";
const serviceCounts = new Map<string, number>();
const ips = new Map<string, {
presences: string[];
presences: Set<string>;
sessions: number;
}>();
@@ -22,46 +23,47 @@ export async function updateActivePresenceGauge() {
const [newCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000);
cursor = newCursor;
const hashes = await Promise.all(keys.map(key => redis.hmget(key, "service", "version", "ip_address")));
for (const hash of hashes) {
const service = hash[0];
const version = hash[1];
const ip = hash[2];
if (service && version) {
serviceCounts.set(`${service}:${version}`, (serviceCounts.get(`${service}:${version}`) || 0) + 1);
}
else {
serviceCounts.set("none", (serviceCounts.get("none") || 0) + 1);
}
//* Use pipelining for batch Redis operations
const pipeline = redis.pipeline();
keys.forEach(key => pipeline.hmget(key, "service", "version", "ip_address"));
const hashes = await pipeline.exec();
hashes.forEach(([err, hash]) => {
if (err || !Array.isArray(hash))
return;
const [service, version, ip] = hash;
const serviceVersion = service && version ? `${service}:${version}` : "none";
serviceCounts.set(serviceVersion, (serviceCounts.get(serviceVersion) || 0) + 1);
if (ip) {
const presenceName = service && version ? `${service}:${version}` : undefined;
const ipData = ips.get(ip) || { presences: [], sessions: 0 };
if (presenceName) {
ipData.presences.push(presenceName);
ipData.presences = Array.from(new Set(ipData.presences.filter(Boolean)));
}
const ipData = ips.get(ip) || { presences: new Set(), sessions: 0 };
if (serviceVersion !== "none")
ipData.presences.add(serviceVersion);
ipData.sessions++;
ips.set(ip, ipData);
}
}
});
} while (cursor !== "0");
log?.("Updating active presence gauge");
// Clear previous data
//* Batch update the gauge
activePresenceGauge.clear({ except: [...serviceCounts.keys()] });
// Set new data
for (const [serviceVersion, count] of serviceCounts.entries()) {
for (const [serviceVersion, count] of serviceCounts) {
const [presence_name, version] = serviceVersion.split(":");
activePresenceGauge.set(serviceVersion, count, {
presence_name,
version,
});
activePresenceGauge.set(serviceVersion, count, { presence_name, version });
}
insertIpData(ips);
//* Convert IP data for insertion
const ipDataForInsertion = new Map(
Array.from(ips, ([ip, data]) => [ip, {
presences: Array.from(data.presences),
sessions: data.sessions,
}]),
);
await insertIpData(ipDataForInsertion);
log?.("Active presence gauge update completed");
});
}