diff --git a/README.md b/README.md index 33405a4..63428a2 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ Password: openarchiver_demo - **Thread discovery**: The ability to discover if an email belongs to a thread/conversation and present the context. - **Compliance & Retention**: Define granular retention policies to automatically manage the lifecycle of your data. Place legal holds on communications to prevent deletion during litigation (TBD). - **File Hash and Encryption**: Email and attachment file hash values are stored in the meta database upon ingestion, meaning any attempt to alter the file content will be identified, ensuring legal and regulatory compliance. +- - Each archived email comes with an "Integrity Report" feature that indicates if the files are original. - **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when. ## 🛠️ Tech Stack diff --git a/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts b/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts index 3a1f88e..5b4a862 100644 --- a/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts +++ b/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts @@ -49,10 +49,9 @@ export default async (job: Job) => { // if data doesn't have error property, it is a successful job with SyncState const successfulJobs = allChildJobs.filter((v) => !v || !(v as any).error) as SyncState[]; - const finalSyncState = - successfulJobs.length > 0 - ? deepmerge(...successfulJobs.filter((s) => s && Object.keys(s).length > 0)) - : {}; + const finalSyncState = deepmerge( + ...successfulJobs.filter((s) => s && Object.keys(s).length > 0) + ) as SyncState; const source = await IngestionService.findById(ingestionSourceId); let status: IngestionStatus = 'active'; diff --git a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts index c60bad2..5c26cae 100644 --- a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts @@ -15,7 +15,6 @@ import { getThreadId } from './helpers/utils'; export class ImapConnector implements IEmailConnector { private client: ImapFlow; private newMaxUids: { [mailboxPath: string]: number } = {}; - private isConnected = false; private statusMessage: string | undefined; constructor(private credentials: GenericImapCredentials) { @@ -41,7 +40,6 @@ export class ImapConnector implements IEmailConnector { // Handles client-level errors, like unexpected disconnects, to prevent crashes. client.on('error', (err) => { logger.error({ err }, 'IMAP client error'); - this.isConnected = false; }); return client; @@ -51,20 +49,17 @@ export class ImapConnector implements IEmailConnector { * Establishes a connection to the IMAP server if not already connected. */ private async connect(): Promise { - if (this.isConnected && this.client.usable) { + // If the client is already connected and usable, do nothing. + if (this.client.usable) { return; } - // If the client is not usable (e.g., after a logout), create a new one. - if (!this.client.usable) { - this.client = this.createClient(); - } + // If the client is not usable (e.g., after a logout or an error), create a new one. + this.client = this.createClient(); try { await this.client.connect(); - this.isConnected = true; } catch (err: any) { - this.isConnected = false; logger.error({ err }, 'IMAP connection failed'); if (err.responseText) { throw new Error(`IMAP Connection Error: ${err.responseText}`); @@ -77,9 +72,8 @@ export class ImapConnector implements IEmailConnector { * Disconnects from the IMAP server if the connection is active. */ private async disconnect(): Promise { - if (this.isConnected && this.client.usable) { + if (this.client.usable) { await this.client.logout(); - this.isConnected = false; } } @@ -130,8 +124,7 @@ export class ImapConnector implements IEmailConnector { return await action(); } catch (err: any) { logger.error({ err, attempt }, `IMAP operation failed on attempt ${attempt}`); - this.isConnected = false; // Force reconnect on next attempt - this.client = this.createClient(); // Create a new client instance for the next retry + // The client is no longer usable, a new one will be created on the next attempt. if (attempt === maxRetries) { logger.error({ err }, 'IMAP operation failed after all retries.'); throw err; @@ -156,6 +149,10 @@ export class ImapConnector implements IEmailConnector { const mailboxes = await this.withRetry(async () => await this.client.list()); const processableMailboxes = mailboxes.filter((mailbox) => { + // Exclude mailboxes that cannot be selected. + if (mailbox.flags.has('\\Noselect')) { + return false; + } if (config.app.allInclusiveArchive) { return true; } diff --git a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts index f160ed7..fa03c42 100644 --- a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts @@ -98,52 +98,29 @@ export class MboxConnector implements IEmailConnector { const mboxSplitter = new MboxSplitter(); const emailStream = fileStream.pipe(mboxSplitter); - try { - for await (const emailBuffer of emailStream) { - try { - const emailObject = await this.parseMessage(emailBuffer as Buffer, ''); - yield emailObject; - } catch (error) { - logger.error( - { error, file: this.credentials.uploadedFilePath }, - 'Failed to process a single message from mbox file. Skipping.' - ); - } - } - } finally { - // Ensure all streams are properly closed before deleting the file. - if (fileStream instanceof Readable) { - fileStream.destroy(); - } - if (emailStream instanceof Readable) { - emailStream.destroy(); - } - // Wait for the streams to fully close to prevent race conditions with file deletion. - await new Promise((resolve) => { - if (fileStream instanceof Readable) { - fileStream.on('close', resolve); - } else { - resolve(true); - } - }); - - await new Promise((resolve) => { - if (emailStream instanceof Readable) { - emailStream.on('close', resolve); - } else { - resolve(true); - } - }); - + for await (const emailBuffer of emailStream) { try { - await this.storage.delete(this.credentials.uploadedFilePath); + const emailObject = await this.parseMessage(emailBuffer as Buffer, ''); + yield emailObject; } catch (error) { logger.error( { error, file: this.credentials.uploadedFilePath }, - 'Failed to delete mbox file after processing.' + 'Failed to process a single message from mbox file. Skipping.' ); } } + + // After the stream is fully consumed, delete the file. + // The `for await...of` loop ensures streams are properly closed on completion, + // so we can safely delete the file here without causing a hang. + try { + await this.storage.delete(this.credentials.uploadedFilePath); + } catch (error) { + logger.error( + { error, file: this.credentials.uploadedFilePath }, + 'Failed to delete mbox file after processing.' + ); + } } private async parseMessage(emlBuffer: Buffer, path: string): Promise {