Imap batch process, handle rate limits gracefully

This commit is contained in:
Wayne
2025-08-08 14:56:21 +03:00
parent a87000f9dc
commit 29db34c5d8
3 changed files with 94 additions and 36 deletions

View File

@@ -48,12 +48,19 @@ export default async (job: Job<ISyncCycleFinishedJob, any, string>) => {
}
let message: string;
// Check for a specific rate-limit message from the successful jobs
const rateLimitMessage = successfulJobs.find(j => j.statusMessage)?.statusMessage;
if (failedJobs.length > 0) {
status = 'error';
const errorMessages = failedJobs.map(j => j.message).join('\n');
message = `Sync cycle completed with ${failedJobs.length} error(s):\n${errorMessages}`;
logger.error({ ingestionSourceId, errors: errorMessages }, 'Sync cycle finished with errors.');
} else {
} else if (rateLimitMessage) {
message = rateLimitMessage;
logger.warn({ ingestionSourceId, message }, 'Sync cycle paused due to rate limiting.');
}
else {
message = 'Continuous sync cycle finished successfully.';
if (isInitialImport) {
message = `Initial import finished for ${userCount} mailboxes.`;

View File

@@ -9,9 +9,14 @@ export class ImapConnector implements IEmailConnector {
private client: ImapFlow;
private newMaxUids: { [mailboxPath: string]: number; } = {};
private isConnected = false;
private statusMessage: string | undefined;
constructor(private credentials: GenericImapCredentials) {
this.client = new ImapFlow({
this.client = this.createClient();
}
private createClient(): ImapFlow {
const client = new ImapFlow({
host: this.credentials.host,
port: this.credentials.port,
secure: this.credentials.secure,
@@ -23,10 +28,12 @@ export class ImapConnector implements IEmailConnector {
});
// Handles client-level errors, like unexpected disconnects, to prevent crashes.
this.client.on('error', (err) => {
client.on('error', (err) => {
logger.error({ err }, 'IMAP client error');
this.isConnected = false;
});
return client;
}
/**
@@ -36,6 +43,12 @@ export class ImapConnector implements IEmailConnector {
if (this.isConnected && this.client.usable) {
return;
}
// If the client is not usable (e.g., after a logout), create a new one.
if (!this.client.usable) {
this.client = this.createClient();
}
try {
await this.client.connect();
this.isConnected = true;
@@ -100,7 +113,7 @@ export class ImapConnector implements IEmailConnector {
* @param maxRetries The maximum number of retries.
* @returns The result of the action.
*/
private async withRetry<T>(action: () => Promise<T>, maxRetries = 3): Promise<T> {
private async withRetry<T>(action: () => Promise<T>, maxRetries = 5): Promise<T> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await this.connect();
@@ -113,7 +126,10 @@ export class ImapConnector implements IEmailConnector {
throw err;
}
// Wait for a short period before retrying
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
const delay = Math.pow(2, attempt) * 1000;
const jitter = Math.random() * 1000;
logger.info(`Retrying in ${Math.round((delay + jitter) / 1000)}s`);
await new Promise(resolve => setTimeout(resolve, delay + jitter));
}
}
// This line should be unreachable
@@ -121,28 +137,32 @@ export class ImapConnector implements IEmailConnector {
}
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject | null> {
try {
const mailboxes = await this.withRetry(() => this.client.list());
// console.log('fetched mailboxes:', mailboxes);
const processableMailboxes = mailboxes.filter(mailbox => {
// filter out trash and all mail emails
if (mailbox.specialUse) {
const specialUse = mailbox.specialUse.toLowerCase();
if (specialUse === '\\junk' || specialUse === '\\trash' || specialUse === '\\all') {
return false;
}
}
// Fallback to checking flags
if (mailbox.flags.has('\\Noselect') || mailbox.flags.has('\\Trash') || mailbox.flags.has('\\Junk') || mailbox.flags.has('\\All')) {
// list all mailboxes first
const mailboxes = await this.withRetry(async () => await this.client.list());
await this.disconnect();
const processableMailboxes = mailboxes.filter(mailbox => {
// filter out trash and all mail emails
if (mailbox.specialUse) {
const specialUse = mailbox.specialUse.toLowerCase();
if (specialUse === '\\junk' || specialUse === '\\trash' || specialUse === '\\all') {
return false;
}
}
// Fallback to checking flags
if (mailbox.flags.has('\\Noselect') || mailbox.flags.has('\\Trash') || mailbox.flags.has('\\Junk') || mailbox.flags.has('\\All')) {
return false;
}
return true;
});
return true;
});
for (const mailboxInfo of processableMailboxes) {
const mailboxPath = mailboxInfo.path;
const mailbox = await this.withRetry(() => this.client.mailboxOpen(mailboxPath));
for (const mailboxInfo of processableMailboxes) {
const mailboxPath = mailboxInfo.path;
logger.info({ mailboxPath }, 'Processing mailbox');
try {
const mailbox = await this.withRetry(async () => await this.client.mailboxOpen(mailboxPath));
const lastUid = syncState?.imap?.[mailboxPath]?.maxUid;
let currentMaxUid = lastUid || 0;
@@ -154,27 +174,51 @@ export class ImapConnector implements IEmailConnector {
}
this.newMaxUids[mailboxPath] = currentMaxUid;
const searchCriteria = lastUid ? { uid: `${lastUid + 1}:*` } : { all: true };
// Only fetch if the mailbox has messages, to avoid errors on empty mailboxes with some IMAP servers.
if (mailbox.exists > 0) {
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) {
if (lastUid && msg.uid <= lastUid) {
continue;
const BATCH_SIZE = 250; // A configurable batch size
let startUid = (lastUid || 0) + 1;
while (true) {
const endUid = startUid + BATCH_SIZE - 1;
const searchCriteria = { uid: `${startUid}:${endUid}` };
let messagesInBatch = 0;
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) {
messagesInBatch++;
if (lastUid && msg.uid <= lastUid) {
continue;
}
if (msg.uid > this.newMaxUids[mailboxPath]) {
this.newMaxUids[mailboxPath] = msg.uid;
}
if (msg.envelope && msg.source) {
yield await this.parseMessage(msg);
}
}
if (msg.uid > this.newMaxUids[mailboxPath]) {
this.newMaxUids[mailboxPath] = msg.uid;
// If this batch was smaller than the batch size, we've reached the end
if (messagesInBatch < BATCH_SIZE) {
break;
}
if (msg.envelope && msg.source) {
yield await this.parseMessage(msg);
}
// Move to the next batch
startUid = endUid + 1;
}
}
} catch (err: any) {
logger.error({ err, mailboxPath }, 'Failed to process mailbox');
// Check if the error indicates a persistent failure after retries
if (err.message.includes('IMAP operation failed after all retries')) {
this.statusMessage = 'Sync paused due to reaching the mail server rate limit. The process will automatically resume later.';
}
}
finally {
await this.disconnect();
}
} finally {
await this.disconnect();
}
}
@@ -217,8 +261,14 @@ export class ImapConnector implements IEmailConnector {
for (const [path, uid] of Object.entries(this.newMaxUids)) {
imapSyncState[path] = { maxUid: uid };
}
return {
const syncState: SyncState = {
imap: imapSyncState
};
if (this.statusMessage) {
syncState.statusMessage = this.statusMessage;
}
return syncState;
}
}

View File

@@ -15,6 +15,7 @@ export type SyncState = {
};
};
lastSyncTimestamp?: string;
statusMessage?: string;
};
export type IngestionProvider = 'google_workspace' | 'microsoft_365' | 'generic_imap' | 'pst_import';