From c2006dfa94db350a4b0e6f8c9c403f99cec6e3c0 Mon Sep 17 00:00:00 2001 From: "Wei S." <5291640+wayneshn@users.noreply.github.com> Date: Wed, 29 Oct 2025 12:59:19 +0100 Subject: [PATCH] V0.4 fix 2 (#210) * formatting code * Remove uninstalled packages * fix(imap): Improve IMAP connection stability and error handling This commit refactors the IMAP connector to enhance connection management, error handling, and overall stability during email ingestion. The `isConnected` flag has been removed in favor of relying directly on the `client.usable` property from the `imapflow` library. This simplifies the connection logic and avoids state synchronization issues. The `connect` method now re-creates the client instance if it's not usable, ensuring a fresh connection after errors or disconnects. The retry mechanism (`withRetry`) has been updated to no longer manually reset the connection state, as the `connect` method now handles this automatically on the next attempt. Additionally, a minor bug in the `sync-cycle-finished` processor has been fixed. The logic for merging sync states from successful jobs has been simplified and correctly typed, preventing potential runtime errors when no successful jobs are present. --------- Co-authored-by: Wayne <5291640+ringoinca@users.noreply.github.com> --- README.md | 1 + .../sync-cycle-finished.processor.ts | 7 +-- .../ingestion-connectors/ImapConnector.ts | 23 ++++---- .../ingestion-connectors/MboxConnector.ts | 55 ++++++------------- 4 files changed, 30 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 33405a4..63428a2 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ Password: openarchiver_demo - **Thread discovery**: The ability to discover if an email belongs to a thread/conversation and present the context. - **Compliance & Retention**: Define granular retention policies to automatically manage the lifecycle of your data. Place legal holds on communications to prevent deletion during litigation (TBD). - **File Hash and Encryption**: Email and attachment file hash values are stored in the meta database upon ingestion, meaning any attempt to alter the file content will be identified, ensuring legal and regulatory compliance. +- - Each archived email comes with an "Integrity Report" feature that indicates if the files are original. - **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when. ## 🛠️ Tech Stack diff --git a/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts b/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts index 3a1f88e..5b4a862 100644 --- a/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts +++ b/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts @@ -49,10 +49,9 @@ export default async (job: Job) => { // if data doesn't have error property, it is a successful job with SyncState const successfulJobs = allChildJobs.filter((v) => !v || !(v as any).error) as SyncState[]; - const finalSyncState = - successfulJobs.length > 0 - ? deepmerge(...successfulJobs.filter((s) => s && Object.keys(s).length > 0)) - : {}; + const finalSyncState = deepmerge( + ...successfulJobs.filter((s) => s && Object.keys(s).length > 0) + ) as SyncState; const source = await IngestionService.findById(ingestionSourceId); let status: IngestionStatus = 'active'; diff --git a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts index c60bad2..5c26cae 100644 --- a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts @@ -15,7 +15,6 @@ import { getThreadId } from './helpers/utils'; export class ImapConnector implements IEmailConnector { private client: ImapFlow; private newMaxUids: { [mailboxPath: string]: number } = {}; - private isConnected = false; private statusMessage: string | undefined; constructor(private credentials: GenericImapCredentials) { @@ -41,7 +40,6 @@ export class ImapConnector implements IEmailConnector { // Handles client-level errors, like unexpected disconnects, to prevent crashes. client.on('error', (err) => { logger.error({ err }, 'IMAP client error'); - this.isConnected = false; }); return client; @@ -51,20 +49,17 @@ export class ImapConnector implements IEmailConnector { * Establishes a connection to the IMAP server if not already connected. */ private async connect(): Promise { - if (this.isConnected && this.client.usable) { + // If the client is already connected and usable, do nothing. + if (this.client.usable) { return; } - // If the client is not usable (e.g., after a logout), create a new one. - if (!this.client.usable) { - this.client = this.createClient(); - } + // If the client is not usable (e.g., after a logout or an error), create a new one. + this.client = this.createClient(); try { await this.client.connect(); - this.isConnected = true; } catch (err: any) { - this.isConnected = false; logger.error({ err }, 'IMAP connection failed'); if (err.responseText) { throw new Error(`IMAP Connection Error: ${err.responseText}`); @@ -77,9 +72,8 @@ export class ImapConnector implements IEmailConnector { * Disconnects from the IMAP server if the connection is active. */ private async disconnect(): Promise { - if (this.isConnected && this.client.usable) { + if (this.client.usable) { await this.client.logout(); - this.isConnected = false; } } @@ -130,8 +124,7 @@ export class ImapConnector implements IEmailConnector { return await action(); } catch (err: any) { logger.error({ err, attempt }, `IMAP operation failed on attempt ${attempt}`); - this.isConnected = false; // Force reconnect on next attempt - this.client = this.createClient(); // Create a new client instance for the next retry + // The client is no longer usable, a new one will be created on the next attempt. if (attempt === maxRetries) { logger.error({ err }, 'IMAP operation failed after all retries.'); throw err; @@ -156,6 +149,10 @@ export class ImapConnector implements IEmailConnector { const mailboxes = await this.withRetry(async () => await this.client.list()); const processableMailboxes = mailboxes.filter((mailbox) => { + // Exclude mailboxes that cannot be selected. + if (mailbox.flags.has('\\Noselect')) { + return false; + } if (config.app.allInclusiveArchive) { return true; } diff --git a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts index f160ed7..fa03c42 100644 --- a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts @@ -98,52 +98,29 @@ export class MboxConnector implements IEmailConnector { const mboxSplitter = new MboxSplitter(); const emailStream = fileStream.pipe(mboxSplitter); - try { - for await (const emailBuffer of emailStream) { - try { - const emailObject = await this.parseMessage(emailBuffer as Buffer, ''); - yield emailObject; - } catch (error) { - logger.error( - { error, file: this.credentials.uploadedFilePath }, - 'Failed to process a single message from mbox file. Skipping.' - ); - } - } - } finally { - // Ensure all streams are properly closed before deleting the file. - if (fileStream instanceof Readable) { - fileStream.destroy(); - } - if (emailStream instanceof Readable) { - emailStream.destroy(); - } - // Wait for the streams to fully close to prevent race conditions with file deletion. - await new Promise((resolve) => { - if (fileStream instanceof Readable) { - fileStream.on('close', resolve); - } else { - resolve(true); - } - }); - - await new Promise((resolve) => { - if (emailStream instanceof Readable) { - emailStream.on('close', resolve); - } else { - resolve(true); - } - }); - + for await (const emailBuffer of emailStream) { try { - await this.storage.delete(this.credentials.uploadedFilePath); + const emailObject = await this.parseMessage(emailBuffer as Buffer, ''); + yield emailObject; } catch (error) { logger.error( { error, file: this.credentials.uploadedFilePath }, - 'Failed to delete mbox file after processing.' + 'Failed to process a single message from mbox file. Skipping.' ); } } + + // After the stream is fully consumed, delete the file. + // The `for await...of` loop ensures streams are properly closed on completion, + // so we can safely delete the file here without causing a hang. + try { + await this.storage.delete(this.credentials.uploadedFilePath); + } catch (error) { + logger.error( + { error, file: this.credentials.uploadedFilePath }, + 'Failed to delete mbox file after processing.' + ); + } } private async parseMessage(emlBuffer: Buffer, path: string): Promise {