fix(IMAP): Share connections between each fetch email action

This commit is contained in:
Wayne
2025-09-01 12:21:11 +03:00
parent 21fe610cad
commit bbe63bd0e2
2 changed files with 88 additions and 87 deletions

View File

@@ -99,7 +99,7 @@ export class IndexingService {
archivedEmailId,
email.userEmail || ''
);
console.log(document);
// console.log(document);
await this.searchService.addDocuments('emails', [document], 'id');
}
@@ -129,7 +129,7 @@ export class IndexingService {
// skip attachment or fail the job
}
}
console.log('email.userEmail', userEmail);
// console.log('email.userEmail', userEmail);
return {
id: archivedEmailId,
userEmail: userEmail,
@@ -165,7 +165,7 @@ export class IndexingService {
'';
const recipients = email.recipients as DbRecipients;
console.log('email.userEmail', email.userEmail);
// console.log('email.userEmail', email.userEmail);
return {
id: email.id,
userEmail: userEmail,

View File

@@ -149,107 +149,108 @@ export class ImapConnector implements IEmailConnector {
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject | null> {
// list all mailboxes first
const mailboxes = await this.withRetry(async () => await this.client.list());
await this.disconnect();
try {
// list all mailboxes first
const mailboxes = await this.withRetry(async () => await this.client.list());
const processableMailboxes = mailboxes.filter((mailbox) => {
// filter out trash and all mail emails
if (mailbox.specialUse) {
const specialUse = mailbox.specialUse.toLowerCase();
if (specialUse === '\\junk' || specialUse === '\\trash' || specialUse === '\\all') {
const processableMailboxes = mailboxes.filter((mailbox) => {
// filter out trash and all mail emails
if (mailbox.specialUse) {
const specialUse = mailbox.specialUse.toLowerCase();
if (specialUse === '\\junk' || specialUse === '\\trash' || specialUse === '\\all') {
return false;
}
}
// Fallback to checking flags
if (
mailbox.flags.has('\\Noselect') ||
mailbox.flags.has('\\Trash') ||
mailbox.flags.has('\\Junk') ||
mailbox.flags.has('\\All')
) {
return false;
}
}
// Fallback to checking flags
if (
mailbox.flags.has('\\Noselect') ||
mailbox.flags.has('\\Trash') ||
mailbox.flags.has('\\Junk') ||
mailbox.flags.has('\\All')
) {
return false;
}
return true;
});
return true;
});
for (const mailboxInfo of processableMailboxes) {
const mailboxPath = mailboxInfo.path;
logger.info({ mailboxPath }, 'Processing mailbox');
for (const mailboxInfo of processableMailboxes) {
const mailboxPath = mailboxInfo.path;
logger.info({ mailboxPath }, 'Processing mailbox');
try {
const mailbox = await this.withRetry(
async () => await this.client.mailboxOpen(mailboxPath)
);
const lastUid = syncState?.imap?.[mailboxPath]?.maxUid;
let currentMaxUid = lastUid || 0;
try {
const mailbox = await this.withRetry(
async () => await this.client.mailboxOpen(mailboxPath)
);
const lastUid = syncState?.imap?.[mailboxPath]?.maxUid;
let currentMaxUid = lastUid || 0;
if (mailbox.exists > 0) {
const lastMessage = await this.client.fetchOne(String(mailbox.exists), {
uid: true,
});
if (lastMessage && lastMessage.uid > currentMaxUid) {
currentMaxUid = lastMessage.uid;
}
}
// Initialize with last synced UID, not the maximum UID in mailbox
this.newMaxUids[mailboxPath] = lastUid || 0;
// Only fetch if the mailbox has messages, to avoid errors on empty mailboxes with some IMAP servers.
if (mailbox.exists > 0) {
const BATCH_SIZE = 250; // A configurable batch size
let startUid = (lastUid || 0) + 1;
const maxUidToFetch = currentMaxUid;
while (startUid <= maxUidToFetch) {
const endUid = Math.min(startUid + BATCH_SIZE - 1, maxUidToFetch);
const searchCriteria = { uid: `${startUid}:${endUid}` };
for await (const msg of this.client.fetch(searchCriteria, {
envelope: true,
source: true,
bodyStructure: true,
if (mailbox.exists > 0) {
const lastMessage = await this.client.fetchOne(String(mailbox.exists), {
uid: true,
})) {
if (lastUid && msg.uid <= lastUid) {
continue;
}
});
if (lastMessage && lastMessage.uid > currentMaxUid) {
currentMaxUid = lastMessage.uid;
}
}
if (msg.uid > this.newMaxUids[mailboxPath]) {
this.newMaxUids[mailboxPath] = msg.uid;
}
// Initialize with last synced UID, not the maximum UID in mailbox
this.newMaxUids[mailboxPath] = lastUid || 0;
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
// Only fetch if the mailbox has messages, to avoid errors on empty mailboxes with some IMAP servers.
if (mailbox.exists > 0) {
const BATCH_SIZE = 250; // A configurable batch size
let startUid = (lastUid || 0) + 1;
const maxUidToFetch = currentMaxUid;
if (msg.envelope && msg.source) {
try {
yield await this.parseMessage(msg, mailboxPath);
} catch (err: any) {
logger.error(
{ err, mailboxPath, uid: msg.uid },
'Failed to parse message'
);
throw err;
while (startUid <= maxUidToFetch) {
const endUid = Math.min(startUid + BATCH_SIZE - 1, maxUidToFetch);
const searchCriteria = { uid: `${startUid}:${endUid}` };
for await (const msg of this.client.fetch(searchCriteria, {
envelope: true,
source: true,
bodyStructure: true,
uid: true,
})) {
if (lastUid && msg.uid <= lastUid) {
continue;
}
if (msg.uid > this.newMaxUids[mailboxPath]) {
this.newMaxUids[mailboxPath] = msg.uid;
}
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
if (msg.envelope && msg.source) {
try {
yield await this.parseMessage(msg, mailboxPath);
} catch (err: any) {
logger.error(
{ err, mailboxPath, uid: msg.uid },
'Failed to parse message'
);
throw err;
}
}
}
}
// Move to the next batch
startUid = endUid + 1;
// Move to the next batch
startUid = endUid + 1;
}
}
} catch (err: any) {
logger.error({ err, mailboxPath }, 'Failed to process mailbox');
// Check if the error indicates a persistent failure after retries
if (err.message.includes('IMAP operation failed after all retries')) {
this.statusMessage =
'Sync paused due to reaching the mail server rate limit. The process will automatically resume later.';
}
}
} catch (err: any) {
logger.error({ err, mailboxPath }, 'Failed to process mailbox');
// Check if the error indicates a persistent failure after retries
if (err.message.includes('IMAP operation failed after all retries')) {
this.statusMessage =
'Sync paused due to reaching the mail server rate limit. The process will automatically resume later.';
}
} finally {
await this.disconnect();
}
} finally {
await this.disconnect();
}
}