Compare commits

...

17 Commits

Author SHA1 Message Date
Bas950
9cbb88beda chore: release v0.0.34 2024-09-17 10:54:54 +02:00
Bas950
09bcfe703f chore: scan count config 2024-09-17 10:54:48 +02:00
Bas950
d24eda8957 chore: release v0.0.33 2024-09-17 10:45:42 +02:00
Bas950
bfffcb94ee chore: some testing 2024-09-17 10:45:38 +02:00
Bas950
4db6a78816 chore: release v0.0.32 2024-09-17 10:35:04 +02:00
Bas950
666838874f chore: forgot to save 2024-09-17 10:34:58 +02:00
Bas950
697f3660c2 chore: some improvements 2024-09-17 10:34:46 +02:00
Florian Metz
a668add973 chore: release v0.0.31 2024-09-17 10:00:06 +02:00
Florian Metz
42b70b1259 chore: optimize active presence gauge update with concurrency limit 2024-09-17 09:59:41 +02:00
Bas950
253b680d3e chore: release v0.0.30 2024-09-17 09:36:49 +02:00
Bas950
e9a40dc553 chore: small updates 2024-09-17 09:36:43 +02:00
Bas950
b25880d4cd chore: release v0.0.29 2024-09-17 09:11:12 +02:00
Bas950
fb06227aeb chore: release v0.0.29 2024-09-17 09:07:58 +02:00
Bas950
ff3d00497b chore: release v0.0.29 2024-09-17 09:07:21 +02:00
Bas950
a06780f85a chore: reduce memory 2024-09-17 09:06:10 +02:00
Bas950
5b1969c7ab chore: release v0.0.28 2024-09-16 23:22:15 +02:00
Bas950
bedd34594c chore: disable ip stuff for now 2024-09-16 23:22:11 +02:00
8 changed files with 113 additions and 95 deletions

View File

@@ -5,8 +5,8 @@ declare module "ip-location-api" {
country: string;
} | null>;
export function updateDb(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string }): Promise<void>;
export function reload(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string }): Promise<void>;
export function updateDb(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string; smallMemory?: boolean }): Promise<void>;
export function reload(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string; smallMemory?: boolean }): Promise<void>;
}
declare namespace NodeJS {

View File

@@ -1,7 +1,7 @@
{
"name": "@premid/api-master",
"type": "module",
"version": "0.0.27",
"version": "0.0.34",
"private": true,
"description": "PreMiD's api master",
"license": "MPL-2.0",

View File

@@ -1,7 +1,9 @@
import { lt, sql } from "drizzle-orm";
import { db, onlineUsersIpData } from "../db.js";
import { mainLog } from "../index.js";
export async function cleanupOldUserData(retentionDays: number) {
mainLog("Cleaning up old user ip data");
const interval = `'${retentionDays} days'`;
await db.delete(onlineUsersIpData)
.where(lt(onlineUsersIpData.timestamp, sql`now() - interval ${sql.raw(interval)}`));

View File

@@ -8,14 +8,9 @@ const registeredMetrics = new Map<string, ClearableGaugeMetric>();
//* Custom gauge metric class
export class ClearableGaugeMetric {
private data: Map<string, { value: number; attributes: Attributes }>;
private name: string;
private description: string;
private data = new Map<string, { value: number; attributes: Attributes }>();
constructor(name: string, description: string) {
this.data = new Map();
this.name = name;
this.description = description;
constructor(private readonly name: string, private readonly description: string) {
registeredMetrics.set(name, this);
}
@@ -23,8 +18,13 @@ export class ClearableGaugeMetric {
this.data.set(key, { value, attributes });
}
clear() {
this.data.clear();
clear({ except }: { except?: string[] }) {
for (const key of this.data.keys()) {
if (except && except.includes(key))
continue;
this.data.delete(key);
}
}
toMetricData(): GaugeMetricData {

View File

@@ -11,13 +11,14 @@ export async function insertIpData(
}>,
) {
const timestamp = new Date();
const list = Array.from(data.entries());
const list = [...data.keys()];
//* Split into batches of batchSize
for (let i = 0; i < list.length; i += batchSize) {
const batch = list.slice(i, i + batchSize);
const mapped = await Promise.all(batch.map(async ([ip, { presences, sessions }]) => {
const mapped = await Promise.all(batch.map(async (ip) => {
const parsed = await lookupIp(ip);
if (parsed) {
const { presences, sessions } = data.get(ip)!;
return {
ip,
country: parsed.country,

View File

@@ -7,6 +7,7 @@ const fields = ["latitude", "longitude", "country"];
const dataDir = join(process.cwd(), "data");
const tmpDataDir = join(process.cwd(), "tmp");
const smallMemory = true;
let initialized = false;
@@ -23,7 +24,7 @@ export async function lookupIp(ip: string): Promise<{ latitude: number; longitud
}
}
let reloading: Promise<void> | undefined;
let reloading: Promise<void> | undefined = Promise.resolve();
let log: debug.Debugger | undefined;
export async function reloadIpLocationApi() {
@@ -34,8 +35,8 @@ export async function reloadIpLocationApi() {
reloading = new Promise((resolve, reject) => {
log?.("Reloading IP location API");
updateDb({ fields, dataDir, tmpDataDir }).then(async () => {
await reload({ fields, dataDir, tmpDataDir });
updateDb({ fields, dataDir, tmpDataDir, smallMemory }).then(async () => {
await reload({ fields, dataDir, tmpDataDir, smallMemory });
log?.("IP location API reloaded");
initialized = true;
reloading = undefined;

View File

@@ -1,52 +1,77 @@
import { redis } from "../index.js";
import process from "node:process";
import pLimit from "p-limit";
import { mainLog, redis } from "../index.js";
import { activePresenceGauge } from "../tracing.js";
import { insertIpData } from "./insertIpData.js";
//* Function to update the gauge with per-service counts
export const updateActivePresenceGaugeLimit = pLimit(1);
let log: debug.Debugger | undefined;
const scanCount = Number.parseInt(process.env.SCAN_COUNT || "1000", 10);
export async function updateActivePresenceGauge() {
const pattern = "pmd-api.heartbeatUpdates.*";
let cursor: string = "0";
const serviceCounts = new Map<string, number>();
const ips = new Map<string, {
presences: string[];
sessions: number;
}>();
await updateActivePresenceGaugeLimit(async () => {
log ??= mainLog.extend("Heartbeat-Updates");
log?.("Starting active presence gauge update");
do {
const [newCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000); //* Use SCAN with COUNT for memory efficiency
cursor = newCursor;
for (const key of keys) {
const hash = await redis.hgetall(key);
const service = hash.service;
const version = hash.version; //* Get version from hash
const ip = hash.ip_address;
if (service && version) {
serviceCounts.set(`${service}:${version}`, (serviceCounts.get(`${service}:${version}`) || 0) + 1);
const pattern = "pmd-api.heartbeatUpdates.*";
let cursor: string = "0";
const serviceCounts = new Map<string, number>();
const ips = new Map<string, {
presences: Set<string>;
sessions: number;
}>();
do {
const [newCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", scanCount);
cursor = newCursor;
//* Use pipelining for batch Redis operations
const pipeline = redis.pipeline();
keys.forEach(key => pipeline.hmget(key, "service", "version", "ip_address"));
const hashes = await pipeline.exec();
if (!hashes) {
log?.("No hashes found");
return;
}
else {
serviceCounts.set("none", (serviceCounts.get("none") || 0) + 1);
}
if (ip) {
const presenceName = service && version ? `${service}:${version}` : undefined;
const ipData = ips.get(ip) || { presences: [], sessions: 0 };
ipData.presences = [...new Set<string>([...ipData.presences, presenceName].filter(Boolean) as string[])];
ipData.sessions++;
ips.set(ip, ipData);
}
}
} while (cursor !== "0");
// Clear previous data
activePresenceGauge.clear();
hashes.forEach(([err, hash]) => {
if (err || !Array.isArray(hash))
return;
// Set new data
for (const [serviceVersion, count] of serviceCounts.entries()) {
const [presence_name, version] = serviceVersion.split(":");
activePresenceGauge.set(serviceVersion, count, {
presence_name,
version,
});
}
const [service, version, ip] = hash;
const serviceVersion = service && version ? `${service}:${version}` : "none";
serviceCounts.set(serviceVersion, (serviceCounts.get(serviceVersion) || 0) + 1);
insertIpData(ips);
if (ip) {
const ipData = ips.get(ip) || { presences: new Set(), sessions: 0 };
if (serviceVersion !== "none")
ipData.presences.add(serviceVersion);
ipData.sessions++;
ips.set(ip, ipData);
}
});
} while (cursor !== "0");
log?.("Updating active presence gauge");
//* Batch update the gauge
// activePresenceGauge.clear({ except: [...serviceCounts.keys()] });
// for (const [serviceVersion, count] of serviceCounts) {
// const [presence_name, version] = serviceVersion.split(":");
// activePresenceGauge.set(serviceVersion, count, { presence_name, version });
// }
//* Convert IP data for insertion
const ipDataForInsertion = new Map(
Array.from(ips, ([ip, data]) => [ip, {
presences: Array.from(data.presences),
sessions: data.sessions,
}]),
);
await insertIpData(ipDataForInsertion);
log?.("Active presence gauge update completed");
});
}

View File

@@ -1,12 +1,12 @@
import process from "node:process";
import { CronJob } from "cron";
import debug from "debug";
import { clearOldSessions } from "./functions/clearOldSessions.js";
import createRedis from "./functions/createRedis.js";
import { setSessionCounter } from "./functions/setSessionCounter.js";
import "./tracing.js";
import { updateActivePresenceGauge } from "./functions/updateActivePresenceGauge.js";
import { reloadIpLocationApi } from "./functions/lookupIp.js";
import { updateActivePresenceGauge, updateActivePresenceGaugeLimit } from "./functions/updateActivePresenceGauge.js";
// import { reloadIpLocationApi } from "./functions/lookupIp.js";
import { cleanupOldUserData } from "./functions/cleanupOldUserData.js";
export const redis = createRedis();
@@ -19,44 +19,33 @@ void new CronJob(
// Every 5 seconds
"*/5 * * * * *",
() => {
clearOldSessions();
if (process.env.DISABLE_CLEAR_OLD_SESSIONS !== "true") {
clearOldSessions();
}
if (process.env.DISABLE_SET_SESSION_COUNTER !== "true") {
setSessionCounter();
}
if (process.env.DISABLE_ACTIVE_PRESENCE_GAUGE !== "true") {
updateActivePresenceGaugeLimit.clearQueue();
updateActivePresenceGauge();
}
},
undefined,
true,
);
void new CronJob(
// Every second
"* * * * * *",
() => {
setSessionCounter();
},
undefined,
true,
);
void new CronJob(
// Every 5 seconds
"*/5 * * * * *",
() => {
updateActivePresenceGauge();
},
undefined,
true,
);
void new CronJob(
// Every day at 9am
"0 9 * * *",
() => {
reloadIpLocationApi();
},
undefined,
true,
undefined,
undefined,
true,
);
// void new CronJob(
// // Every day at 9am
// "0 9 * * *",
// () => {
// reloadIpLocationApi();
// },
// undefined,
// true,
// undefined,
// undefined,
// true,
// );
void new CronJob(
// Every day at 1am