diff --git a/packages/backend/src/api/server.ts b/packages/backend/src/api/server.ts index f3f74a5..892ba81 100644 --- a/packages/backend/src/api/server.ts +++ b/packages/backend/src/api/server.ts @@ -158,13 +158,12 @@ export async function createServer(modules: ArchiverModule[] = []): Promise { res.send('Backend is running!!'); }); - - console.log('✅ Core OSS modules loaded.'); + logger.info('✅ Core OSS modules loaded.'); return app; } diff --git a/packages/backend/src/jobs/processors/index-email-batch.processor.ts b/packages/backend/src/jobs/processors/index-email-batch.processor.ts index 1b4f173..393b2bb 100644 --- a/packages/backend/src/jobs/processors/index-email-batch.processor.ts +++ b/packages/backend/src/jobs/processors/index-email-batch.processor.ts @@ -4,6 +4,7 @@ import { SearchService } from '../../services/SearchService'; import { StorageService } from '../../services/StorageService'; import { DatabaseService } from '../../services/DatabaseService'; import { PendingEmail } from '@open-archiver/types'; +import { logger } from '@open-archiver/backend/config/logger'; const searchService = new SearchService(); const storageService = new StorageService(); @@ -12,6 +13,6 @@ const indexingService = new IndexingService(databaseService, searchService, stor export default async function (job: Job<{ emails: PendingEmail[] }>) { const { emails } = job.data; - console.log(`Indexing email batch with ${emails.length} emails`); + logger.info(`Indexing email batch with ${emails.length} emails`); await indexingService.indexEmailBatch(emails); } diff --git a/packages/backend/src/jobs/schedulers/sync-scheduler.ts b/packages/backend/src/jobs/schedulers/sync-scheduler.ts index 90a2edc..4e355b3 100644 --- a/packages/backend/src/jobs/schedulers/sync-scheduler.ts +++ b/packages/backend/src/jobs/schedulers/sync-scheduler.ts @@ -1,6 +1,7 @@ import { ingestionQueue } from '../queues'; import { config } from '../../config'; +import { logger } from 
'@open-archiver/backend/config/logger'; const scheduleContinuousSync = async () => { // This job will run every 15 minutes @@ -17,5 +18,5 @@ const scheduleContinuousSync = async () => { }; scheduleContinuousSync().then(() => { - console.log('Continuous sync scheduler started.'); + logger.info('Continuous sync scheduler started.'); }); diff --git a/packages/backend/src/services/SyncSessionService.ts b/packages/backend/src/services/SyncSessionService.ts index abb611c..675471e 100644 --- a/packages/backend/src/services/SyncSessionService.ts +++ b/packages/backend/src/services/SyncSessionService.ts @@ -161,7 +161,7 @@ export class SyncSessionService { */ public static async heartbeat(sessionId: string): Promise { try { - console.log('heatbeat, ', sessionId); + logger.info({ sessionId }, 'heartbeat'); await db .update(syncSessions) .set({ lastActivityAt: new Date() }) diff --git a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts index 760eba5..d843ed7 100644 --- a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts @@ -197,7 +197,7 @@ export class ImapConnector implements IEmailConnector { // Only fetch if the mailbox has messages, to avoid errors on empty mailboxes with some IMAP servers. if (mailbox.exists > 0) { - const BATCH_SIZE = 250; // A configurable batch size + const BATCH_SIZE = 250; let startUid = (lastUid || 0) + 1; const maxUidToFetch = currentMaxUid; @@ -205,10 +205,11 @@ export class ImapConnector implements IEmailConnector { const endUid = Math.min(startUid + BATCH_SIZE - 1, maxUidToFetch); const searchCriteria = { uid: `${startUid}:${endUid}` }; + // --- Pass 1: fetch only envelope + uid (no source) for the entire batch. 
+ const uidsToFetch: number[] = []; + for await (const msg of this.client.fetch(searchCriteria, { envelope: true, - source: true, - bodyStructure: true, uid: true, })) { if (lastUid && msg.uid <= lastUid) { @@ -219,7 +220,9 @@ export class ImapConnector implements IEmailConnector { this.newMaxUids[mailboxPath] = msg.uid; } - // Optimization: Verify existence using Message-ID from envelope before fetching full body + // Duplicate check against the Message-ID from the envelope. + // If a duplicate is found we skip fetching the full source entirely, + // avoiding loading attachment binary data into memory for known emails. if (checkDuplicate && msg.envelope?.messageId) { const isDuplicate = await checkDuplicate( msg.envelope.messageId @@ -237,18 +240,42 @@ export class ImapConnector implements IEmailConnector { } } - logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message'); + if (msg.envelope) { + uidsToFetch.push(msg.uid); + } + } - if (msg.envelope && msg.source) { - try { - yield await this.parseMessage(msg, mailboxPath); - } catch (err: any) { - logger.error( - { err, mailboxPath, uid: msg.uid }, - 'Failed to parse message' - ); - throw err; + // --- Pass 2: fetch full source one message at a time for non-duplicate UIDs. 
+ for (const uid of uidsToFetch) { + logger.debug( + { mailboxPath, uid }, + 'Fetching full source for message' + ); + + try { + const fullMsg = await this.withRetry( + async () => + await this.client.fetchOne( + String(uid), + { + envelope: true, + source: true, + bodyStructure: true, + uid: true, + }, + { uid: true } + ) + ); + + if (fullMsg && fullMsg.envelope && fullMsg.source) { + yield await this.parseMessage(fullMsg, mailboxPath); } + } catch (err: any) { + logger.error( + { err, mailboxPath, uid }, + 'Failed to fetch or parse message' + ); + throw err; } } diff --git a/packages/backend/src/workers/indexing.worker.ts b/packages/backend/src/workers/indexing.worker.ts index 00708c7..5cc6923 100644 --- a/packages/backend/src/workers/indexing.worker.ts +++ b/packages/backend/src/workers/indexing.worker.ts @@ -1,6 +1,7 @@ import { Worker } from 'bullmq'; import { connection } from '../config/redis'; import indexEmailBatchProcessor from '../jobs/processors/index-email-batch.processor'; +import { logger } from '../config/logger'; const processor = async (job: any) => { switch (job.name) { @@ -21,7 +22,7 @@ const worker = new Worker('indexing', processor, { }, }); -console.log('Indexing worker started'); +logger.info('Indexing worker started'); process.on('SIGINT', () => worker.close()); process.on('SIGTERM', () => worker.close()); diff --git a/packages/backend/src/workers/ingestion.worker.ts b/packages/backend/src/workers/ingestion.worker.ts index fe63fdc..34fe580 100644 --- a/packages/backend/src/workers/ingestion.worker.ts +++ b/packages/backend/src/workers/ingestion.worker.ts @@ -5,6 +5,7 @@ import continuousSyncProcessor from '../jobs/processors/continuous-sync.processo import scheduleContinuousSyncProcessor from '../jobs/processors/schedule-continuous-sync.processor'; import { processMailboxProcessor } from '../jobs/processors/process-mailbox.processor'; import syncCycleFinishedProcessor from '../jobs/processors/sync-cycle-finished.processor'; +import { logger 
} from '../config/logger'; const processor = async (job: any) => { switch (job.name) { @@ -37,7 +38,7 @@ const worker = new Worker('ingestion', processor, { }, }); -console.log('Ingestion worker started'); +logger.info('Ingestion worker started'); process.on('SIGINT', () => worker.close()); process.on('SIGTERM', () => worker.close()); diff --git a/packages/frontend/src/routes/+layout.ts b/packages/frontend/src/routes/+layout.ts index 2618086..0ab7067 100644 --- a/packages/frontend/src/routes/+layout.ts +++ b/packages/frontend/src/routes/+layout.ts @@ -11,8 +11,6 @@ export const load: LayoutLoad = async ({ url, data }) => { if (data && data.systemSettings?.language) { initLocale = data.systemSettings.language; } - - console.log(initLocale); await loadTranslations(initLocale, pathname); return {