Compare commits

...

37 Commits

Author SHA1 Message Date
Florian Metz
3258179040 chore: release v0.0.24 2024-09-15 03:09:13 +02:00
Florian Metz
086d476af2 chore: update hash 2024-09-15 03:09:04 +02:00
Florian Metz
146bf9e270 chore: release v0.0.23 2024-09-15 02:48:56 +02:00
Florian Metz
a02f25ba29 chore: test 2024-09-15 02:48:41 +02:00
Florian Metz
416b65f0d4 chore: release v0.0.12 2024-09-15 02:41:31 +02:00
Florian Metz
f8e9fc832d chore: test 2024-09-15 02:41:16 +02:00
Florian Metz
86b0f07216 chore: test 2024-09-15 02:31:38 +02:00
Florian Metz
9eb5c03877 chore: release v0.0.11 2024-09-15 02:25:50 +02:00
Florian Metz
e63e1270aa chore: release v0.0.22 2024-09-15 02:25:38 +02:00
Florian Metz
f730e71bbf chore: test 2024-09-15 02:25:10 +02:00
Bas950
8b68bf85c8 chore: release v0.0.10 2024-09-13 17:27:44 +02:00
Bas950
e4c794a9ad chore: 202 on disabled flag 2024-09-13 17:27:38 +02:00
Bas950
6e8258d76f chore: release v0.0.21 2024-09-13 15:08:16 +02:00
Bas950
56b796c621 chore: use ky 2024-09-13 15:08:08 +02:00
Bas950
0de59c48b4 chore: release v0.0.20 2024-09-13 14:37:31 +02:00
Bas950
60056e069d chore: update log 2024-09-13 14:37:24 +02:00
Bas950
b6bad90919 chore: release v0.0.9 2024-09-13 14:33:34 +02:00
Bas950
ee21bb9dec chore: release v0.0.20 2024-09-13 14:31:39 +02:00
Bas950
6efac4fef1 feat: use scienceId 2024-09-13 14:31:27 +02:00
Bas950
93424793bd chore: release v0.0.19 2024-09-13 13:46:33 +02:00
Bas950
affcb6a0cf chore: add reason 2024-09-13 13:46:27 +02:00
Bas950
bb56949dfb chore: release v0.0.18 2024-09-13 13:02:31 +02:00
Bas950
c06fe04b65 chore: fix time 2024-09-13 13:02:26 +02:00
Florian Metz
ef976341ba chore: release v0.0.17 2024-09-13 12:33:19 +02:00
Florian Metz
38893891af chore: why does it not abort 2024-09-13 12:33:10 +02:00
Florian Metz
63eeeefda7 chore: release v0.0.16 2024-09-13 12:05:42 +02:00
Florian Metz
056db21cb0 chore: add p-limit dependency for session cleanup 2024-09-13 12:05:37 +02:00
Bas950
d8dc08c6c3 chore: release v0.0.15 2024-09-13 11:55:36 +02:00
Bas950
634391b6e3 chore: always return the key 2024-09-13 11:55:32 +02:00
Florian Metz
c46cf6975a chore: release v0.0.14 2024-09-13 11:52:23 +02:00
Florian Metz
68c6b4fcdc chore: add p-limit dependency for session cleanup 2024-09-13 11:52:00 +02:00
Florian Metz
55fa07d5b5 chore: release v0.0.13 2024-09-13 11:38:49 +02:00
Florian Metz
903c238b33 chore: add timeout to headless session deletion 2024-09-13 11:38:40 +02:00
Bas950
acd9afb2b1 chore: release v0.0.12 2024-09-13 11:32:55 +02:00
Bas950
4bd42390eb chore: move some code 2024-09-13 11:32:44 +02:00
Florian Metz
c014504464 chore: release v0.0.11 2024-09-13 11:00:16 +02:00
Florian Metz
24fe349b60 chore: optimize session cleanup with batch deletion 2024-09-13 10:59:13 +02:00
12 changed files with 172 additions and 49 deletions

View File

@@ -1,7 +1,7 @@
{
"name": "@premid/api-master",
"type": "module",
"version": "0.0.10",
"version": "0.0.24",
"private": true,
"description": "PreMiD's api master",
"license": "MPL-2.0",
@@ -14,7 +14,6 @@
"dev": "node --watch --env-file .env --enable-source-maps ."
},
"dependencies": {
"@discordjs/rest": "^2.3.0",
"@envelop/sentry": "^9.0.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/exporter-prometheus": "^0.52.1",
@@ -22,7 +21,9 @@
"@sentry/node": "^8.17.0",
"cron": "^3.1.7",
"debug": "^4.3.6",
"ioredis": "^5.3.2"
"ioredis": "^5.3.2",
"ky": "^1.7.2",
"p-limit": "^6.1.0"
},
"devDependencies": {
"@types/debug": "^4.1.12"

View File

@@ -1,4 +1,5 @@
import { REST } from "@discordjs/rest";
import pLimit from "p-limit";
import ky, { HTTPError, TimeoutError } from "ky";
import { mainLog, redis } from "../index.js";
let inProgress = false;
@@ -13,15 +14,20 @@ export async function clearOldSessions() {
let cursor = "0";
let totalSessions = 0;
let cleared = 0;
const batchSize = 100;
let keysToDelete: string[] = [];
mainLog("Starting session cleanup");
const limit = pLimit(100); // Create a limit of 100 concurrent operations
do {
//* Use hscan to iterate through sessions
const [nextCursor, result] = await redis.hscan("pmd-api.sessions", cursor, "COUNT", "100");
const [nextCursor, result] = await redis.hscan("pmd-api.sessions", cursor, "COUNT", batchSize);
cursor = nextCursor;
totalSessions += result.length / 2;
const deletePromises = [];
for (let i = 0; i < result.length; i += 2) {
const key = result[i];
const value = result[i + 1];
@@ -36,29 +42,30 @@ export async function clearOldSessions() {
lastUpdated: number;
};
//* If the session is younger than 30 seconds, skip it
if (now - session.lastUpdated < 30000)
continue;
//* Delete the session
try {
const discord = new REST({ version: "10", authPrefix: "Bearer" });
discord.setToken(session.token);
await discord.post("/users/@me/headless-sessions/delete", {
body: {
token: session.session,
},
});
deletePromises.push(limit(() => deleteSession(session, key)));
}
await redis.hdel("pmd-api.sessions", key);
const results = await Promise.allSettled(deletePromises);
results.forEach((result) => {
if (result.status === "fulfilled" && result.value) {
keysToDelete.push(result.value);
cleared++;
}
catch (error) {
mainLog(`Failed to delete session: %O`, (typeof error === "object" && error && "message" in error ? error.message : error));
}
});
if (keysToDelete.length >= batchSize) {
await redis.hdel("pmd-api.sessions", ...keysToDelete);
keysToDelete = [];
}
} while (cursor !== "0");
if (keysToDelete.length > 0) {
await redis.hdel("pmd-api.sessions", ...keysToDelete);
}
if (totalSessions === 0) {
mainLog("No sessions to clear");
}
@@ -68,3 +75,31 @@ export async function clearOldSessions() {
inProgress = false;
}
async function deleteSession(session: { token: string; session: string }, key: string): Promise<string> {
try {
await ky.post("https://discord.com/api/v10/users/@me/headless-sessions/delete", {
json: {
token: session.session,
},
headers: {
Authorization: `Bearer ${session.token}`,
},
retry: 3,
timeout: 5000,
});
}
catch (error) {
if (error instanceof TimeoutError) {
mainLog(`Session deletion aborted due to timeout for key ${key}`);
}
else if (error instanceof HTTPError) {
mainLog(`Failed to delete session for key ${key}: [${error.name}] ${error.message} ${JSON.stringify(await error.response.json())}`);
}
else {
mainLog(`Failed to delete session for key ${key}: Unknown error`);
}
}
return key;
}

View File

@@ -0,0 +1,39 @@
import { redis } from "../index.js";
import { activePresenceGauge } from "../tracing.js";
//* Track previously recorded services
const previousServices = new Set<string>();
//* Function to update the gauge with per-service counts
export async function updateActivePresenceGauge() {
const pattern = "pmd-api.heartbeatUpdates.*";
let cursor: string = "0";
const serviceCounts = new Map<string, number>();
do {
const [newCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000); //* Use SCAN with COUNT for memory efficiency
cursor = newCursor;
for (const key of keys) {
const hash = await redis.hgetall(key);
const service = hash.service;
const version = hash.version; //* Get version from hash
serviceCounts.set(`${service}:${version}`, (serviceCounts.get(`${service}:${version}`) || 0) + 1);
}
} while (cursor !== "0");
// Set current counts and remove from previousServices
serviceCounts.forEach((count, serviceVersion) => {
const [presence_name, version] = serviceVersion.split(":");
activePresenceGauge.record(count, { presence_name, version });
previousServices.delete(serviceVersion);
});
// Set gauge to 0 for services that are no longer active
previousServices.forEach((serviceVersion) => {
const [presence_name, version] = serviceVersion.split(":");
activePresenceGauge.record(0, { presence_name, version });
});
// Update the set of previous services
serviceCounts.forEach((_, serviceVersion) => previousServices.add(serviceVersion));
}

View File

@@ -5,12 +5,13 @@ import { clearOldSessions } from "./functions/clearOldSessions.js";
import createRedis from "./functions/createRedis.js";
import { setCounter } from "./functions/setCounter.js";
import "./tracing.js";
import { updateActivePresenceGauge } from "./functions/updateActivePresenceGauge.js"; //* Added import
export const redis = createRedis();
export const mainLog = debug("api-master");
debug("Starting cron job to clear old sessions");
debug("Starting cron jobs");
void new CronJob(
// Every 5 seconds
@@ -31,3 +32,13 @@ void new CronJob(
undefined,
true,
);
void new CronJob(
// Every 5 seconds
"*/5 * * * * *",
() => {
updateActivePresenceGauge();
},
undefined,
true,
);

View File

@@ -15,4 +15,10 @@ export const counter = meter.createUpDownCounter("active_activites", {
valueType: ValueType.INT,
});
// * Replace Observable Gauge with regular Gauge
export const activePresenceGauge = meter.createGauge("active_presence_names", {
description: "Number of active presence names per service",
valueType: ValueType.INT,
});
prometheusExporter.startServer();

View File

@@ -1,7 +1,7 @@
{
"name": "@premid/api-worker",
"type": "module",
"version": "0.0.8",
"version": "0.0.12",
"private": true,
"description": "PreMiD's api",
"license": "MPL-2.0",

View File

@@ -0,0 +1,10 @@
import process from "node:process";
import { defu } from "defu";
const disabledFlags = process.env.DISABLED_FEATURE_FLAGS?.split(",") ?? [];
const flags = Object.fromEntries(disabledFlags.map(flag => [flag, false]));
export const featureFlags = defu(flags, {
WebSocketManager: true,
SessionKeepAlive: true,
});

View File

@@ -1,13 +1,11 @@
import { readFile } from "node:fs/promises";
import { resolve } from "node:path";
import process from "node:process";
import { useSentry } from "@envelop/sentry";
import { maxAliasesPlugin } from "@escape.tech/graphql-armor-max-aliases";
import { maxDepthPlugin } from "@escape.tech/graphql-armor-max-depth";
import { maxDirectivesPlugin } from "@escape.tech/graphql-armor-max-directives";
import { maxTokensPlugin } from "@escape.tech/graphql-armor-max-tokens";
import fastifyWebsocket from "@fastify/websocket";
import { defu } from "defu";
import fastify from "fastify";
import { createSchema, createYoga } from "graphql-yoga";
@@ -15,6 +13,7 @@ import type { FastifyReply, FastifyRequest } from "fastify";
import { Socket } from "../classes/Socket.js";
import { resolvers } from "../graphql/resolvers/v5/index.js";
import { sessionKeepAlive } from "../routes/sessionKeepAlive.js";
import { featureFlags } from "../constants.js";
import createRedis from "./createRedis.js";
export interface FastifyContext {
@@ -48,7 +47,7 @@ export default async function createServer() {
maxDepthPlugin(),
maxDirectivesPlugin(),
maxTokensPlugin(),
useSentry(),
/* useSentry(), */
],
schema: createSchema<FastifyContext>({
resolvers,
@@ -87,15 +86,7 @@ export default async function createServer() {
});
app.get("/v5/feature-flags", async (request, reply) => {
const disabledFlags = process.env.DISABLED_FEATURE_FLAGS?.split(",") ?? [];
const flags = Object.fromEntries(disabledFlags.map(flag => [flag, false]));
const test = defu(flags, {
WebSocketManager: true,
SessionKeepAlive: true,
});
void reply.send(test);
void reply.send(featureFlags);
});
app.post("/v5/session-keep-alive", sessionKeepAlive);

View File

@@ -1,9 +1,10 @@
import { type } from "arktype";
import type { MutationResolvers } from "../../../../generated/graphql-v5.js";
import { redis } from "../../../../functions/createServer.js";
const heartbeatSchema = type({
identifier: "string.uuid & string.lower",
presences: {
presence: {
service: "string.trim",
version: "string.semver",
language: "string.trim",
@@ -25,13 +26,19 @@ const mutation: MutationResolvers["heartbeat"] = async (_parent, input) => {
if (out instanceof type.errors)
throw new Error(out.summary);
// ! Disabled for now
/* await redis.setex(
`pmd-api.heartbeatUpdates.${data.identifier}`,
// 5 minutes
300,
JSON.stringify(data)
); */
// * Use Redis Hash with 'service' in the key to store heartbeat data
const redisKey = `pmd-api.heartbeatUpdates.${out.identifier}`;
await redis.hset(redisKey, {
service: out.presence.service,
version: out.presence.version,
language: out.presence.language,
since: out.presence.since.toString(),
extension_version: out.extension.version,
extension_language: out.extension.language,
extension_connected_app: out.extension.connected?.app?.toString(),
extension_connected_discord: out.extension.connected?.discord?.toString(),
});
await redis.expire(redisKey, 300);
return {
__typename: "HeartbeatResult",

View File

@@ -3,8 +3,6 @@ import process from "node:process";
import * as Sentry from "@sentry/node";
import { connect } from "mongoose";
import "./tracing.js";
// eslint-disable-next-line perfectionist/sort-imports
import createServer from "./functions/createServer.js";
// TODO SETUP SENTRY

View File

@@ -4,17 +4,25 @@ import { type } from "arktype";
import { Routes } from "discord-api-types/v10";
import type { FastifyReply, FastifyRequest } from "fastify";
import { redis } from "../functions/createServer.js";
import { featureFlags } from "../constants.js";
const schema = type({
token: "string.trim",
session: "string.trim",
version: "string.semver & string.trim",
scienceId: "string.trim",
});
export async function sessionKeepAlive(request: FastifyRequest, reply: FastifyReply) {
//* Get the 2 headers
if (!featureFlags.SessionKeepAlive)
return reply.status(202).send();
//* Get the headers
const out = schema({
token: request.headers["x-token"],
session: request.headers["x-session"],
version: request.headers["x-version"] ?? "2.6.8",
scienceId: request.headers["x-science-id"] ?? request.headers["x-token"],
});
if (out instanceof type.errors)
@@ -25,7 +33,7 @@ export async function sessionKeepAlive(request: FastifyRequest, reply: FastifyRe
await redis.hset(
"pmd-api.sessions",
out.token,
out.scienceId,
JSON.stringify({
session: out.session,
token: out.token,

23
pnpm-lock.yaml generated
View File

@@ -53,9 +53,6 @@ importers:
apps/api-master:
dependencies:
'@discordjs/rest':
specifier: ^2.3.0
version: 2.4.0
'@envelop/sentry':
specifier: ^9.0.0
version: 9.0.0(@envelop/core@5.0.2)(@sentry/node@8.30.0)(graphql@16.9.0)
@@ -80,6 +77,12 @@ importers:
ioredis:
specifier: ^5.3.2
version: 5.4.1
ky:
specifier: ^1.7.2
version: 1.7.2
p-limit:
specifier: ^6.1.0
version: 6.1.0
devDependencies:
'@types/debug':
specifier: ^4.1.12
@@ -6143,6 +6146,10 @@ packages:
kolorist@1.8.0:
resolution: {integrity: sha512-Y+60/zizpJ3HRH8DCss+q95yr6145JXZo46OTpFvDZWLfRCE4qChOyk1b26nMaNpfHHgxagk9dXT5OP0Tfe+dQ==}
ky@1.7.2:
resolution: {integrity: sha512-OzIvbHKKDpi60TnF9t7UUVAF1B4mcqc02z5PIvrm08Wyb+yOcz63GRvEuVxNT18a9E1SrNouhB4W2NNLeD7Ykg==}
engines: {node: '>=18'}
launch-editor@2.9.1:
resolution: {integrity: sha512-Gcnl4Bd+hRO9P9icCP/RVVT2o8SFlPXofuCxvA2SaZuH45whSvf5p8x5oih5ftLiVhEI4sp5xDY+R+b3zJBh5w==}
@@ -6855,6 +6862,10 @@ packages:
resolution: {integrity: sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
p-limit@6.1.0:
resolution: {integrity: sha512-H0jc0q1vOzlEk0TqAKXKZxdl7kX3OFUzCnNVUnq5Pc3DGo0kpeaMuPqxQn235HibwBEb0/pm9dgKTjXy66fBkg==}
engines: {node: '>=18'}
p-locate@4.1.0:
resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
engines: {node: '>=8'}
@@ -16731,6 +16742,8 @@ snapshots:
kolorist@1.8.0: {}
ky@1.7.2: {}
launch-editor@2.9.1:
dependencies:
picocolors: 1.1.0
@@ -17895,6 +17908,10 @@ snapshots:
dependencies:
yocto-queue: 1.1.1
p-limit@6.1.0:
dependencies:
yocto-queue: 1.1.1
p-locate@4.1.0:
dependencies:
p-limit: 2.3.0