mirror of
https://github.com/PreMiD/PreMiD.git
synced 2026-04-06 04:41:58 +02:00
Compare commits
17 Commits
api-master
...
api-master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d24eda8957 | ||
|
|
bfffcb94ee | ||
|
|
4db6a78816 | ||
|
|
666838874f | ||
|
|
697f3660c2 | ||
|
|
a668add973 | ||
|
|
42b70b1259 | ||
|
|
253b680d3e | ||
|
|
e9a40dc553 | ||
|
|
b25880d4cd | ||
|
|
fb06227aeb | ||
|
|
ff3d00497b | ||
|
|
a06780f85a | ||
|
|
5b1969c7ab | ||
|
|
bedd34594c | ||
|
|
47feaa5c70 | ||
|
|
9fb32f53ae |
4
apps/api-master/environment.d.ts
vendored
4
apps/api-master/environment.d.ts
vendored
@@ -5,8 +5,8 @@ declare module "ip-location-api" {
|
||||
country: string;
|
||||
} | null>;
|
||||
|
||||
export function updateDb(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string }): Promise<void>;
|
||||
export function reload(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string }): Promise<void>;
|
||||
export function updateDb(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string; smallMemory?: boolean }): Promise<void>;
|
||||
export function reload(options: { fields?: string[]; dataDir?: string; tmpDataDir?: string; smallMemory?: boolean }): Promise<void>;
|
||||
}
|
||||
|
||||
declare namespace NodeJS {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "@premid/api-master",
|
||||
"type": "module",
|
||||
"version": "0.0.26",
|
||||
"version": "0.0.33",
|
||||
"private": true,
|
||||
"description": "PreMiD's api master",
|
||||
"license": "MPL-2.0",
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import { lt, sql } from "drizzle-orm";
|
||||
import { db, onlineUsersIpData } from "../db.js";
|
||||
import { mainLog } from "../index.js";
|
||||
|
||||
export async function cleanupOldUserData(retentionDays: number) {
|
||||
mainLog("Cleaning up old user ip data");
|
||||
const interval = `'${retentionDays} days'`;
|
||||
await db.delete(onlineUsersIpData)
|
||||
.where(lt(onlineUsersIpData.timestamp, sql`now() - interval ${sql.raw(interval)}`));
|
||||
|
||||
@@ -8,14 +8,9 @@ const registeredMetrics = new Map<string, ClearableGaugeMetric>();
|
||||
|
||||
//* Custom gauge metric class
|
||||
export class ClearableGaugeMetric {
|
||||
private data: Map<string, { value: number; attributes: Attributes }>;
|
||||
private name: string;
|
||||
private description: string;
|
||||
private data = new Map<string, { value: number; attributes: Attributes }>();
|
||||
|
||||
constructor(name: string, description: string) {
|
||||
this.data = new Map();
|
||||
this.name = name;
|
||||
this.description = description;
|
||||
constructor(private readonly name: string, private readonly description: string) {
|
||||
registeredMetrics.set(name, this);
|
||||
}
|
||||
|
||||
@@ -23,8 +18,13 @@ export class ClearableGaugeMetric {
|
||||
this.data.set(key, { value, attributes });
|
||||
}
|
||||
|
||||
clear() {
|
||||
this.data.clear();
|
||||
clear({ except }: { except?: string[] }) {
|
||||
for (const key of this.data.keys()) {
|
||||
if (except && except.includes(key))
|
||||
continue;
|
||||
|
||||
this.data.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
toMetricData(): GaugeMetricData {
|
||||
|
||||
@@ -2,7 +2,7 @@ import type { InferInsertModel } from "drizzle-orm";
|
||||
import { db, onlineUsersIpData } from "../db.js";
|
||||
import { lookupIp } from "./lookupIp.js";
|
||||
|
||||
const batchSize = 10000;
|
||||
const batchSize = 1000;
|
||||
|
||||
export async function insertIpData(
|
||||
data: Map<string, {
|
||||
@@ -11,13 +11,14 @@ export async function insertIpData(
|
||||
}>,
|
||||
) {
|
||||
const timestamp = new Date();
|
||||
const list = Array.from(data.entries());
|
||||
const list = [...data.keys()];
|
||||
//* Split into batches of batchSize
|
||||
for (let i = 0; i < list.length; i += batchSize) {
|
||||
const batch = list.slice(i, i + batchSize);
|
||||
const mapped = await Promise.all(batch.map(async ([ip, { presences, sessions }]) => {
|
||||
const mapped = await Promise.all(batch.map(async (ip) => {
|
||||
const parsed = await lookupIp(ip);
|
||||
if (parsed) {
|
||||
const { presences, sessions } = data.get(ip)!;
|
||||
return {
|
||||
ip,
|
||||
country: parsed.country,
|
||||
|
||||
@@ -7,6 +7,7 @@ const fields = ["latitude", "longitude", "country"];
|
||||
|
||||
const dataDir = join(process.cwd(), "data");
|
||||
const tmpDataDir = join(process.cwd(), "tmp");
|
||||
const smallMemory = true;
|
||||
|
||||
let initialized = false;
|
||||
|
||||
@@ -23,7 +24,7 @@ export async function lookupIp(ip: string): Promise<{ latitude: number; longitud
|
||||
}
|
||||
}
|
||||
|
||||
let reloading: Promise<void> | undefined;
|
||||
let reloading: Promise<void> | undefined = Promise.resolve();
|
||||
let log: debug.Debugger | undefined;
|
||||
|
||||
export async function reloadIpLocationApi() {
|
||||
@@ -34,8 +35,8 @@ export async function reloadIpLocationApi() {
|
||||
|
||||
reloading = new Promise((resolve, reject) => {
|
||||
log?.("Reloading IP location API");
|
||||
updateDb({ fields, dataDir, tmpDataDir }).then(async () => {
|
||||
await reload({ fields, dataDir, tmpDataDir });
|
||||
updateDb({ fields, dataDir, tmpDataDir, smallMemory }).then(async () => {
|
||||
await reload({ fields, dataDir, tmpDataDir, smallMemory });
|
||||
log?.("IP location API reloaded");
|
||||
initialized = true;
|
||||
reloading = undefined;
|
||||
|
||||
@@ -1,52 +1,74 @@
|
||||
import { redis } from "../index.js";
|
||||
import pLimit from "p-limit";
|
||||
import { mainLog, redis } from "../index.js";
|
||||
import { activePresenceGauge } from "../tracing.js";
|
||||
import { insertIpData } from "./insertIpData.js";
|
||||
|
||||
//* Function to update the gauge with per-service counts
|
||||
export const updateActivePresenceGaugeLimit = pLimit(1);
|
||||
let log: debug.Debugger | undefined;
|
||||
|
||||
export async function updateActivePresenceGauge() {
|
||||
const pattern = "pmd-api.heartbeatUpdates.*";
|
||||
let cursor: string = "0";
|
||||
const serviceCounts = new Map<string, number>();
|
||||
const ips = new Map<string, {
|
||||
presences: string[];
|
||||
sessions: number;
|
||||
}>();
|
||||
await updateActivePresenceGaugeLimit(async () => {
|
||||
log ??= mainLog.extend("Heartbeat-Updates");
|
||||
log?.("Starting active presence gauge update");
|
||||
|
||||
do {
|
||||
const [newCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000); //* Use SCAN with COUNT for memory efficiency
|
||||
cursor = newCursor;
|
||||
for (const key of keys) {
|
||||
const hash = await redis.hgetall(key);
|
||||
const service = hash.service;
|
||||
const version = hash.version; //* Get version from hash
|
||||
const ip = hash.ip_address;
|
||||
if (service && version) {
|
||||
serviceCounts.set(`${service}:${version}`, (serviceCounts.get(`${service}:${version}`) || 0) + 1);
|
||||
const pattern = "pmd-api.heartbeatUpdates.*";
|
||||
let cursor: string = "0";
|
||||
const serviceCounts = new Map<string, number>();
|
||||
const ips = new Map<string, {
|
||||
presences: Set<string>;
|
||||
sessions: number;
|
||||
}>();
|
||||
|
||||
do {
|
||||
const [newCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000);
|
||||
cursor = newCursor;
|
||||
|
||||
//* Use pipelining for batch Redis operations
|
||||
const pipeline = redis.pipeline();
|
||||
keys.forEach(key => pipeline.hmget(key, "service", "version", "ip_address"));
|
||||
const hashes = await pipeline.exec();
|
||||
|
||||
if (!hashes) {
|
||||
log?.("No hashes found");
|
||||
return;
|
||||
}
|
||||
else {
|
||||
serviceCounts.set("none", (serviceCounts.get("none") || 0) + 1);
|
||||
}
|
||||
if (ip) {
|
||||
const presenceName = service && version ? `${service}:${version}` : undefined;
|
||||
const ipData = ips.get(ip) || { presences: [], sessions: 0 };
|
||||
ipData.presences = [...new Set<string>([...ipData.presences, presenceName].filter(Boolean) as string[])];
|
||||
ipData.sessions++;
|
||||
ips.set(ip, ipData);
|
||||
}
|
||||
}
|
||||
} while (cursor !== "0");
|
||||
|
||||
// Clear previous data
|
||||
activePresenceGauge.clear();
|
||||
hashes.forEach(([err, hash]) => {
|
||||
if (err || !Array.isArray(hash))
|
||||
return;
|
||||
|
||||
// Set new data
|
||||
for (const [serviceVersion, count] of serviceCounts.entries()) {
|
||||
const [presence_name, version] = serviceVersion.split(":");
|
||||
activePresenceGauge.set(serviceVersion, count, {
|
||||
presence_name,
|
||||
version,
|
||||
});
|
||||
}
|
||||
const [service, version, ip] = hash;
|
||||
const serviceVersion = service && version ? `${service}:${version}` : "none";
|
||||
serviceCounts.set(serviceVersion, (serviceCounts.get(serviceVersion) || 0) + 1);
|
||||
|
||||
insertIpData(ips);
|
||||
if (ip) {
|
||||
const ipData = ips.get(ip) || { presences: new Set(), sessions: 0 };
|
||||
if (serviceVersion !== "none")
|
||||
ipData.presences.add(serviceVersion);
|
||||
ipData.sessions++;
|
||||
ips.set(ip, ipData);
|
||||
}
|
||||
});
|
||||
} while (cursor !== "0");
|
||||
|
||||
log?.("Updating active presence gauge");
|
||||
|
||||
//* Batch update the gauge
|
||||
// activePresenceGauge.clear({ except: [...serviceCounts.keys()] });
|
||||
// for (const [serviceVersion, count] of serviceCounts) {
|
||||
// const [presence_name, version] = serviceVersion.split(":");
|
||||
// activePresenceGauge.set(serviceVersion, count, { presence_name, version });
|
||||
// }
|
||||
|
||||
//* Convert IP data for insertion
|
||||
const ipDataForInsertion = new Map(
|
||||
Array.from(ips, ([ip, data]) => [ip, {
|
||||
presences: Array.from(data.presences),
|
||||
sessions: data.sessions,
|
||||
}]),
|
||||
);
|
||||
|
||||
await insertIpData(ipDataForInsertion);
|
||||
log?.("Active presence gauge update completed");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import process from "node:process";
|
||||
import { CronJob } from "cron";
|
||||
|
||||
import debug from "debug";
|
||||
import { clearOldSessions } from "./functions/clearOldSessions.js";
|
||||
import createRedis from "./functions/createRedis.js";
|
||||
import { setSessionCounter } from "./functions/setSessionCounter.js";
|
||||
import "./tracing.js";
|
||||
import { updateActivePresenceGauge } from "./functions/updateActivePresenceGauge.js";
|
||||
import { reloadIpLocationApi } from "./functions/lookupIp.js";
|
||||
import { updateActivePresenceGauge, updateActivePresenceGaugeLimit } from "./functions/updateActivePresenceGauge.js";
|
||||
// import { reloadIpLocationApi } from "./functions/lookupIp.js";
|
||||
import { cleanupOldUserData } from "./functions/cleanupOldUserData.js";
|
||||
|
||||
export const redis = createRedis();
|
||||
@@ -19,44 +19,33 @@ void new CronJob(
|
||||
// Every 5 seconds
|
||||
"*/5 * * * * *",
|
||||
() => {
|
||||
clearOldSessions();
|
||||
if (process.env.DISABLE_CLEAR_OLD_SESSIONS !== "true") {
|
||||
clearOldSessions();
|
||||
}
|
||||
if (process.env.DISABLE_SET_SESSION_COUNTER !== "true") {
|
||||
setSessionCounter();
|
||||
}
|
||||
if (process.env.DISABLE_ACTIVE_PRESENCE_GAUGE !== "true") {
|
||||
updateActivePresenceGaugeLimit.clearQueue();
|
||||
updateActivePresenceGauge();
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
true,
|
||||
);
|
||||
|
||||
void new CronJob(
|
||||
// Every second
|
||||
"* * * * * *",
|
||||
() => {
|
||||
setSessionCounter();
|
||||
},
|
||||
undefined,
|
||||
true,
|
||||
);
|
||||
|
||||
void new CronJob(
|
||||
// Every 5 seconds
|
||||
"*/5 * * * * *",
|
||||
() => {
|
||||
updateActivePresenceGauge();
|
||||
},
|
||||
undefined,
|
||||
true,
|
||||
);
|
||||
|
||||
void new CronJob(
|
||||
// Every day at 9am
|
||||
"0 9 * * *",
|
||||
() => {
|
||||
reloadIpLocationApi();
|
||||
},
|
||||
undefined,
|
||||
true,
|
||||
undefined,
|
||||
undefined,
|
||||
true,
|
||||
);
|
||||
// void new CronJob(
|
||||
// // Every day at 9am
|
||||
// "0 9 * * *",
|
||||
// () => {
|
||||
// reloadIpLocationApi();
|
||||
// },
|
||||
// undefined,
|
||||
// true,
|
||||
// undefined,
|
||||
// undefined,
|
||||
// true,
|
||||
// );
|
||||
|
||||
void new CronJob(
|
||||
// Every day at 1am
|
||||
|
||||
Reference in New Issue
Block a user