ran duplicate check for IMAP import, optimize message listing

This commit is contained in:
wayneshn
2026-03-20 12:07:37 +01:00
parent a55a1ede07
commit 8dee7105fa
8 changed files with 52 additions and 24 deletions

View File

@@ -158,13 +158,12 @@ export async function createServer(modules: ArchiverModule[] = []): Promise<Expr
// Load all provided extension modules
for (const module of modules) {
await module.initialize(app, authService);
console.log(`🏢 Enterprise module loaded: ${module.name}`);
logger.info(`🏢 Enterprise module loaded: ${module.name}`);
}
app.get('/', (req, res) => {
res.send('Backend is running!!');
});
console.log('✅ Core OSS modules loaded.');
logger.info('✅ Core OSS modules loaded.');
return app;
}

View File

@@ -4,6 +4,7 @@ import { SearchService } from '../../services/SearchService';
import { StorageService } from '../../services/StorageService';
import { DatabaseService } from '../../services/DatabaseService';
import { PendingEmail } from '@open-archiver/types';
import { logger } from '@open-archiver/backend/config/logger';
const searchService = new SearchService();
const storageService = new StorageService();
@@ -12,6 +13,6 @@ const indexingService = new IndexingService(databaseService, searchService, stor
export default async function (job: Job<{ emails: PendingEmail[] }>) {
const { emails } = job.data;
console.log(`Indexing email batch with ${emails.length} emails`);
logger.info(`Indexing email batch with ${emails.length} emails`);
await indexingService.indexEmailBatch(emails);
}

View File

@@ -1,6 +1,7 @@
import { ingestionQueue } from '../queues';
import { config } from '../../config';
import { logger } from '@open-archiver/backend/config/logger';
const scheduleContinuousSync = async () => {
// This job will run every 15 minutes
@@ -17,5 +18,5 @@ const scheduleContinuousSync = async () => {
};
scheduleContinuousSync().then(() => {
console.log('Continuous sync scheduler started.');
logger.info('Continuous sync scheduler started.');
});

View File

@@ -161,7 +161,7 @@ export class SyncSessionService {
*/
public static async heartbeat(sessionId: string): Promise<void> {
try {
console.log('heatbeat, ', sessionId);
logger.info('heatbeat, ', sessionId);
await db
.update(syncSessions)
.set({ lastActivityAt: new Date() })

View File

@@ -197,7 +197,7 @@ export class ImapConnector implements IEmailConnector {
// Only fetch if the mailbox has messages, to avoid errors on empty mailboxes with some IMAP servers.
if (mailbox.exists > 0) {
const BATCH_SIZE = 250; // A configurable batch size
const BATCH_SIZE = 250;
let startUid = (lastUid || 0) + 1;
const maxUidToFetch = currentMaxUid;
@@ -205,10 +205,11 @@ export class ImapConnector implements IEmailConnector {
const endUid = Math.min(startUid + BATCH_SIZE - 1, maxUidToFetch);
const searchCriteria = { uid: `${startUid}:${endUid}` };
// --- Pass 1: fetch only envelope + uid (no source) for the entire batch.
const uidsToFetch: number[] = [];
for await (const msg of this.client.fetch(searchCriteria, {
envelope: true,
source: true,
bodyStructure: true,
uid: true,
})) {
if (lastUid && msg.uid <= lastUid) {
@@ -219,7 +220,9 @@ export class ImapConnector implements IEmailConnector {
this.newMaxUids[mailboxPath] = msg.uid;
}
// Optimization: Verify existence using Message-ID from envelope before fetching full body
// Duplicate check against the Message-ID from the envelope.
// If a duplicate is found we skip fetching the full source entirely,
// avoiding loading attachment binary data into memory for known emails.
if (checkDuplicate && msg.envelope?.messageId) {
const isDuplicate = await checkDuplicate(
msg.envelope.messageId
@@ -237,18 +240,42 @@ export class ImapConnector implements IEmailConnector {
}
}
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
if (msg.envelope) {
uidsToFetch.push(msg.uid);
}
}
if (msg.envelope && msg.source) {
try {
yield await this.parseMessage(msg, mailboxPath);
} catch (err: any) {
logger.error(
{ err, mailboxPath, uid: msg.uid },
'Failed to parse message'
);
throw err;
// --- Pass 2: fetch full source one message at a time for non-duplicate UIDs.
for (const uid of uidsToFetch) {
logger.debug(
{ mailboxPath, uid },
'Fetching full source for message'
);
try {
const fullMsg = await this.withRetry(
async () =>
await this.client.fetchOne(
String(uid),
{
envelope: true,
source: true,
bodyStructure: true,
uid: true,
},
{ uid: true }
)
);
if (fullMsg && fullMsg.envelope && fullMsg.source) {
yield await this.parseMessage(fullMsg, mailboxPath);
}
} catch (err: any) {
logger.error(
{ err, mailboxPath, uid },
'Failed to fetch or parse message'
);
throw err;
}
}

View File

@@ -1,6 +1,7 @@
import { Worker } from 'bullmq';
import { connection } from '../config/redis';
import indexEmailBatchProcessor from '../jobs/processors/index-email-batch.processor';
import { logger } from '../config/logger';
const processor = async (job: any) => {
switch (job.name) {
@@ -21,7 +22,7 @@ const worker = new Worker('indexing', processor, {
},
});
console.log('Indexing worker started');
logger.info('Indexing worker started');
process.on('SIGINT', () => worker.close());
process.on('SIGTERM', () => worker.close());

View File

@@ -5,6 +5,7 @@ import continuousSyncProcessor from '../jobs/processors/continuous-sync.processo
import scheduleContinuousSyncProcessor from '../jobs/processors/schedule-continuous-sync.processor';
import { processMailboxProcessor } from '../jobs/processors/process-mailbox.processor';
import syncCycleFinishedProcessor from '../jobs/processors/sync-cycle-finished.processor';
import { logger } from '../config/logger';
const processor = async (job: any) => {
switch (job.name) {
@@ -37,7 +38,7 @@ const worker = new Worker('ingestion', processor, {
},
});
console.log('Ingestion worker started');
logger.info('Ingestion worker started');
process.on('SIGINT', () => worker.close());
process.on('SIGTERM', () => worker.close());

View File

@@ -11,8 +11,6 @@ export const load: LayoutLoad = async ({ url, data }) => {
if (data && data.systemSettings?.language) {
initLocale = data.systemSettings.language;
}
console.log(initLocale);
await loadTranslations(initLocale, pathname);
return {