V0.4 fix 2 (#210)

* formatting code

* Remove uninstalled packages

* fix(imap): Improve IMAP connection stability and error handling

This commit refactors the IMAP connector to enhance connection management, error handling, and overall stability during email ingestion.

The `isConnected` flag has been removed in favor of relying directly on the `client.usable` property from the `imapflow` library. This simplifies the connection logic and avoids state synchronization issues.

The `connect` method now re-creates the client instance if it's not usable, ensuring a fresh connection after errors or disconnects. The retry mechanism (`withRetry`) has been updated to no longer manually reset the connection state, as the `connect` method now handles this automatically on the next attempt.

Additionally, a minor bug in the `sync-cycle-finished` processor has been fixed. The logic for merging sync states from successful jobs has been simplified and correctly typed, preventing potential runtime errors when no successful jobs are present.

---------

Co-authored-by: Wayne <5291640+ringoinca@users.noreply.github.com>
This commit is contained in:
Wei S.
2025-10-29 12:59:19 +01:00
committed by GitHub
parent 399059a773
commit c2006dfa94
4 changed files with 30 additions and 56 deletions

View File

@@ -54,6 +54,7 @@ Password: openarchiver_demo
- **Thread discovery**: The ability to discover if an email belongs to a thread/conversation and present the context.
- **Compliance & Retention**: Define granular retention policies to automatically manage the lifecycle of your data. Place legal holds on communications to prevent deletion during litigation (TBD).
- **File Hash and Encryption**: Email and attachment file hash values are stored in the meta database upon ingestion, meaning any attempt to alter the file content will be identified, ensuring legal and regulatory compliance.
- - Each archived email comes with an "Integrity Report" feature that indicates if the files are original.
- **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when.
## 🛠️ Tech Stack

View File

@@ -49,10 +49,9 @@ export default async (job: Job<ISyncCycleFinishedJob, any, string>) => {
// if data doesn't have error property, it is a successful job with SyncState
const successfulJobs = allChildJobs.filter((v) => !v || !(v as any).error) as SyncState[];
const finalSyncState =
successfulJobs.length > 0
? deepmerge(...successfulJobs.filter((s) => s && Object.keys(s).length > 0))
: {};
const finalSyncState = deepmerge(
...successfulJobs.filter((s) => s && Object.keys(s).length > 0)
) as SyncState;
const source = await IngestionService.findById(ingestionSourceId);
let status: IngestionStatus = 'active';

View File

@@ -15,7 +15,6 @@ import { getThreadId } from './helpers/utils';
export class ImapConnector implements IEmailConnector {
private client: ImapFlow;
private newMaxUids: { [mailboxPath: string]: number } = {};
private isConnected = false;
private statusMessage: string | undefined;
constructor(private credentials: GenericImapCredentials) {
@@ -41,7 +40,6 @@ export class ImapConnector implements IEmailConnector {
// Handles client-level errors, like unexpected disconnects, to prevent crashes.
client.on('error', (err) => {
logger.error({ err }, 'IMAP client error');
this.isConnected = false;
});
return client;
@@ -51,20 +49,17 @@ export class ImapConnector implements IEmailConnector {
* Establishes a connection to the IMAP server if not already connected.
*/
private async connect(): Promise<void> {
if (this.isConnected && this.client.usable) {
// If the client is already connected and usable, do nothing.
if (this.client.usable) {
return;
}
// If the client is not usable (e.g., after a logout), create a new one.
if (!this.client.usable) {
this.client = this.createClient();
}
// If the client is not usable (e.g., after a logout or an error), create a new one.
this.client = this.createClient();
try {
await this.client.connect();
this.isConnected = true;
} catch (err: any) {
this.isConnected = false;
logger.error({ err }, 'IMAP connection failed');
if (err.responseText) {
throw new Error(`IMAP Connection Error: ${err.responseText}`);
@@ -77,9 +72,8 @@ export class ImapConnector implements IEmailConnector {
* Disconnects from the IMAP server if the connection is active.
*/
private async disconnect(): Promise<void> {
if (this.isConnected && this.client.usable) {
if (this.client.usable) {
await this.client.logout();
this.isConnected = false;
}
}
@@ -130,8 +124,7 @@ export class ImapConnector implements IEmailConnector {
return await action();
} catch (err: any) {
logger.error({ err, attempt }, `IMAP operation failed on attempt ${attempt}`);
this.isConnected = false; // Force reconnect on next attempt
this.client = this.createClient(); // Create a new client instance for the next retry
// The client is no longer usable, a new one will be created on the next attempt.
if (attempt === maxRetries) {
logger.error({ err }, 'IMAP operation failed after all retries.');
throw err;
@@ -156,6 +149,10 @@ export class ImapConnector implements IEmailConnector {
const mailboxes = await this.withRetry(async () => await this.client.list());
const processableMailboxes = mailboxes.filter((mailbox) => {
// Exclude mailboxes that cannot be selected.
if (mailbox.flags.has('\\Noselect')) {
return false;
}
if (config.app.allInclusiveArchive) {
return true;
}

View File

@@ -98,52 +98,29 @@ export class MboxConnector implements IEmailConnector {
const mboxSplitter = new MboxSplitter();
const emailStream = fileStream.pipe(mboxSplitter);
try {
for await (const emailBuffer of emailStream) {
try {
const emailObject = await this.parseMessage(emailBuffer as Buffer, '');
yield emailObject;
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to process a single message from mbox file. Skipping.'
);
}
}
} finally {
// Ensure all streams are properly closed before deleting the file.
if (fileStream instanceof Readable) {
fileStream.destroy();
}
if (emailStream instanceof Readable) {
emailStream.destroy();
}
// Wait for the streams to fully close to prevent race conditions with file deletion.
await new Promise((resolve) => {
if (fileStream instanceof Readable) {
fileStream.on('close', resolve);
} else {
resolve(true);
}
});
await new Promise((resolve) => {
if (emailStream instanceof Readable) {
emailStream.on('close', resolve);
} else {
resolve(true);
}
});
for await (const emailBuffer of emailStream) {
try {
await this.storage.delete(this.credentials.uploadedFilePath);
const emailObject = await this.parseMessage(emailBuffer as Buffer, '');
yield emailObject;
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete mbox file after processing.'
'Failed to process a single message from mbox file. Skipping.'
);
}
}
// After the stream is fully consumed, delete the file.
// The `for await...of` loop ensures streams are properly closed on completion,
// so we can safely delete the file here without causing a hang.
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete mbox file after processing.'
);
}
}
private async parseMessage(emlBuffer: Buffer, path: string): Promise<EmailObject> {