fix: add activity store and user activity caching

This commit is contained in:
pa
2026-03-20 04:40:25 +09:00
parent fbfaf7b93c
commit 15fc0bdf1b
14 changed files with 1084 additions and 101 deletions
+212
View File
@@ -0,0 +1,212 @@
import { dbVars } from '../database';
import sqliteService from '../sqlite.js';
const activityCache = {
/**
* @param {string} userId
* @returns {Promise<{
* userId: string,
* updatedAt: string,
* isSelf: boolean,
* sourceLastCreatedAt: string,
* pendingSessionStartAt: number | null
* } | null>}
*/
async getActivityCacheMeta(userId) {
let row = null;
await sqliteService.execute(
(dbRow) => {
row = {
userId: dbRow[0],
updatedAt: dbRow[1],
isSelf: Boolean(dbRow[2]),
sourceLastCreatedAt: dbRow[3] || '',
pendingSessionStartAt:
typeof dbRow[4] === 'number' ? dbRow[4] : null
};
},
`SELECT user_id, updated_at, is_self, source_last_created_at, pending_session_start_at
FROM ${dbVars.userPrefix}_activity_cache_meta
WHERE user_id = @userId`,
{ '@userId': userId }
);
return row;
},
/**
* @param {string} userId
* @returns {Promise<Array<{start: number, end: number}>>}
*/
async getActivityCacheSessions(userId) {
const sessions = [];
await sqliteService.execute(
(dbRow) => {
sessions.push({
start: dbRow[0],
end: dbRow[1]
});
},
`SELECT start_at, end_at
FROM ${dbVars.userPrefix}_activity_cache_sessions
WHERE user_id = @userId
ORDER BY start_at`,
{ '@userId': userId }
);
return sessions;
},
/**
* @param {string} userId
* @returns {Promise<{
* userId: string,
* updatedAt: string,
* isSelf: boolean,
* sourceLastCreatedAt: string,
* pendingSessionStartAt: number | null,
* sessions: Array<{start: number, end: number}>
* } | null>}
*/
async getActivityCache(userId) {
const meta = await this.getActivityCacheMeta(userId);
if (!meta) {
return null;
}
const sessions = await this.getActivityCacheSessions(userId);
return {
...meta,
sessions
};
},
/**
* @param {string} userId
* @returns {Promise<{start: number, end: number} | null>}
*/
async getLastActivityCacheSession(userId) {
let row = null;
await sqliteService.execute(
(dbRow) => {
row = {
start: dbRow[0],
end: dbRow[1]
};
},
`SELECT start_at, end_at
FROM ${dbVars.userPrefix}_activity_cache_sessions
WHERE user_id = @userId
ORDER BY start_at DESC
LIMIT 1`,
{ '@userId': userId }
);
return row;
},
/**
* @param {{
* userId: string,
* updatedAt: string,
* isSelf: boolean,
* sourceLastCreatedAt: string,
* pendingSessionStartAt: number | null,
* sessions: Array<{start: number, end: number}>
* }} entry
* @returns {Promise<void>}
*/
async replaceActivityCache(entry) {
await sqliteService.executeNonQuery('BEGIN');
try {
await sqliteService.executeNonQuery(
`DELETE FROM ${dbVars.userPrefix}_activity_cache_sessions WHERE user_id = @userId`,
{ '@userId': entry.userId }
);
await upsertSessions(entry.userId, entry.sessions);
await upsertMeta(entry);
await sqliteService.executeNonQuery('COMMIT');
} catch (error) {
await sqliteService.executeNonQuery('ROLLBACK');
throw error;
}
},
/**
* @param {{
* userId: string,
* updatedAt: string,
* isSelf: boolean,
* sourceLastCreatedAt: string,
* pendingSessionStartAt: number | null,
* sessions: Array<{start: number, end: number}>,
* replaceLastSession?: {start: number, end: number} | null
* }} entry
* @returns {Promise<void>}
*/
async appendActivityCache(entry) {
await sqliteService.executeNonQuery('BEGIN');
try {
if (entry.replaceLastSession) {
await sqliteService.executeNonQuery(
`DELETE FROM ${dbVars.userPrefix}_activity_cache_sessions
WHERE user_id = @userId AND start_at = @start AND end_at = @end`,
{
'@userId': entry.userId,
'@start': entry.replaceLastSession.start,
'@end': entry.replaceLastSession.end
}
);
}
await upsertSessions(entry.userId, entry.sessions);
await upsertMeta(entry);
await sqliteService.executeNonQuery('COMMIT');
} catch (error) {
await sqliteService.executeNonQuery('ROLLBACK');
throw error;
}
},
/**
* @param {{
* userId: string,
* updatedAt: string,
* isSelf: boolean,
* sourceLastCreatedAt: string,
* pendingSessionStartAt: number | null
* }} entry
* @returns {Promise<void>}
*/
async touchActivityCacheMeta(entry) {
await upsertMeta(entry);
}
};
async function upsertMeta(entry) {
await sqliteService.executeNonQuery(
`INSERT OR REPLACE INTO ${dbVars.userPrefix}_activity_cache_meta
(user_id, updated_at, is_self, source_last_created_at, pending_session_start_at)
VALUES (@user_id, @updated_at, @is_self, @source_last_created_at, @pending_session_start_at)`,
{
'@user_id': entry.userId,
'@updated_at': entry.updatedAt,
'@is_self': entry.isSelf ? 1 : 0,
'@source_last_created_at': entry.sourceLastCreatedAt || '',
'@pending_session_start_at': entry.pendingSessionStartAt
}
);
}
async function upsertSessions(userId, sessions = []) {
for (const session of sessions) {
await sqliteService.executeNonQuery(
`INSERT OR REPLACE INTO ${dbVars.userPrefix}_activity_cache_sessions
(user_id, start_at, end_at)
VALUES (@user_id, @start_at, @end_at)`,
{
'@user_id': userId,
'@start_at': session.start,
'@end_at': session.end
}
);
}
}
export { activityCache };
+24
View File
@@ -623,6 +623,30 @@ const feed = {
return data;
},
/**
* @param {string} userId
* @param {string} afterCreatedAt
* @returns {Promise<Array<{created_at: string, type: string}>>}
*/
async getOnlineOfflineSessionsAfter(userId, afterCreatedAt) {
const data = [];
await sqliteService.execute(
(dbRow) => {
data.push({ created_at: dbRow[0], type: dbRow[1] });
},
`SELECT created_at, type FROM ${dbVars.userPrefix}_feed_online_offline
WHERE user_id = @userId
AND (type = 'Online' OR type = 'Offline')
AND created_at > @afterCreatedAt
ORDER BY created_at`,
{
'@userId': userId,
'@afterCreatedAt': afterCreatedAt
}
);
return data;
},
/**
* @param {number} days - Number of days to look back
* @param {number} limit - Max number of worlds to return
+17
View File
@@ -1386,6 +1386,23 @@ const gameLog = {
return data;
},
/**
* Get current user's online sessions after a given timestamp (incremental).
* @param {string} afterCreatedAt - Only return rows created after this timestamp
* @returns {Promise<Array<{created_at: string, time: number}>>}
*/
async getCurrentUserOnlineSessionsAfter(afterCreatedAt) {
const data = [];
await sqliteService.execute(
(dbRow) => {
data.push({ created_at: dbRow[0], time: dbRow[1] || 0 });
},
`SELECT created_at, time FROM gamelog_location WHERE created_at > @after ORDER BY created_at`,
{ '@after': afterCreatedAt }
);
return data;
},
/**
* Get current user's top visited worlds from gamelog_location.
* Groups by world_id and aggregates visit count and total time.
+11
View File
@@ -1,3 +1,4 @@
import { activityCache } from './activityCache.js';
import { avatarFavorites } from './avatarFavorites.js';
import { avatarTags } from './avatarTags.js';
import { feed } from './feed.js';
@@ -25,6 +26,7 @@ const dbVars = {
const database = {
...feed,
...activityCache,
...gameLog,
...notifications,
...moderation,
@@ -70,6 +72,15 @@ const database = {
await sqliteService.executeNonQuery(
`CREATE TABLE IF NOT EXISTS ${dbVars.userPrefix}_feed_online_offline (id INTEGER PRIMARY KEY, created_at TEXT, user_id TEXT, display_name TEXT, type TEXT, location TEXT, world_name TEXT, time INTEGER, group_name TEXT)`
);
await sqliteService.executeNonQuery(
`CREATE TABLE IF NOT EXISTS ${dbVars.userPrefix}_activity_cache_meta (user_id TEXT PRIMARY KEY, updated_at TEXT, is_self INTEGER DEFAULT 0, source_last_created_at TEXT, pending_session_start_at INTEGER)`
);
await sqliteService.executeNonQuery(
`CREATE TABLE IF NOT EXISTS ${dbVars.userPrefix}_activity_cache_sessions (user_id TEXT NOT NULL, start_at INTEGER NOT NULL, end_at INTEGER NOT NULL, PRIMARY KEY (user_id, start_at, end_at))`
);
await sqliteService.executeNonQuery(
`CREATE INDEX IF NOT EXISTS ${dbVars.userPrefix}_activity_cache_sessions_user_start_idx ON ${dbVars.userPrefix}_activity_cache_sessions (user_id, start_at)`
);
await sqliteService.executeNonQuery(
`CREATE TABLE IF NOT EXISTS ${dbVars.userPrefix}_friend_log_current (user_id TEXT PRIMARY KEY, display_name TEXT, trust_level TEXT, friend_number INTEGER)`
);
+20
View File
@@ -10,6 +10,7 @@ const tableAlter = {
await this.updateTableForGroupNames();
await this.addFriendLogFriendNumber();
await this.updateTableForAvatarHistory();
await this.ensureActivityCacheTables();
// }
// await sqliteService.executeNonQuery('PRAGMA user_version = 1');
},
@@ -80,6 +81,25 @@ const tableAlter = {
}
}
}
},
async ensureActivityCacheTables() {
const tables = [];
await sqliteService.execute((dbRow) => {
tables.push(dbRow[0]);
}, `SELECT name FROM sqlite_schema WHERE type='table' AND name LIKE '%_feed_online_offline'`);
for (const tableName of tables) {
const userPrefix = tableName.replace(/_feed_online_offline$/, '');
await sqliteService.executeNonQuery(
`CREATE TABLE IF NOT EXISTS ${userPrefix}_activity_cache_meta (user_id TEXT PRIMARY KEY, updated_at TEXT, is_self INTEGER DEFAULT 0, source_last_created_at TEXT, pending_session_start_at INTEGER)`
);
await sqliteService.executeNonQuery(
`CREATE TABLE IF NOT EXISTS ${userPrefix}_activity_cache_sessions (user_id TEXT NOT NULL, start_at INTEGER NOT NULL, end_at INTEGER NOT NULL, PRIMARY KEY (user_id, start_at, end_at))`
);
await sqliteService.executeNonQuery(
`CREATE INDEX IF NOT EXISTS ${userPrefix}_activity_cache_sessions_user_start_idx ON ${userPrefix}_activity_cache_sessions (user_id, start_at)`
);
}
}
};