IMAP: ingest all mailboxes, skip duplication

This commit is contained in:
Wayne
2025-07-26 15:11:18 +03:00
parent 6930162079
commit 8a74838f43
10 changed files with 272 additions and 45 deletions

View File

@@ -40,6 +40,7 @@ export default async (job: Job<IInitialImportJob>) => {
}
if (jobs.length > 0) {
logger.info({ ingestionSourceId, userCount }, 'Adding sync-cycle-finished job to the queue');
await flowProducer.add({
name: 'sync-cycle-finished',
queueName: 'ingestion',

View File

@@ -15,7 +15,7 @@ interface ISyncCycleFinishedJob {
export default async (job: Job<ISyncCycleFinishedJob, any, string>) => {
const { ingestionSourceId, userCount, isInitialImport } = job.data;
logger.info({ ingestionSourceId }, 'Sync cycle finished, processing results...');
logger.info({ ingestionSourceId, userCount, isInitialImport }, 'Sync cycle finished job started');
try {
const childrenJobs = await job.getChildrenValues<SyncState>();

View File

@@ -6,7 +6,7 @@ import type {
IngestionSource,
IngestionCredentials
} from '@open-archiver/types';
import { eq } from 'drizzle-orm';
import { and, eq } from 'drizzle-orm';
import { CryptoService } from './CryptoService';
import { EmailProviderFactory } from './EmailProviderFactory';
import { ingestionQueue } from '../jobs/queues';
@@ -203,6 +203,31 @@ export class IngestionService {
userEmail: string
): Promise<void> {
try {
// Generate a unique message ID for the email. If the email already has a message-id header, use that.
// Otherwise, generate a new one based on the email's hash, source ID, and email ID.
const messageIdHeader = email.headers.get('message-id');
let messageId: string | undefined;
if (Array.isArray(messageIdHeader)) {
messageId = messageIdHeader[0];
} else if (typeof messageIdHeader === 'string') {
messageId = messageIdHeader;
}
if (!messageId) {
messageId = `generated-${createHash('sha256').update(email.eml ?? Buffer.from(email.body, 'utf-8')).digest('hex')}-${source.id}-${email.id}`;
}
// Check if an email with the same message ID has already been imported for the current ingestion source. This is to prevent duplicate imports when an email is present in multiple mailboxes (e.g., "Inbox" and "All Mail").
const existingEmail = await db.query.archivedEmails.findFirst({
where: and(
eq(archivedEmails.messageIdHeader, messageId),
eq(archivedEmails.ingestionSourceId, source.id)
)
});
if (existingEmail) {
logger.info({ messageId, ingestionSourceId: source.id }, 'Skipping duplicate email');
return;
}
console.log('processing email, ', email.id, email.subject);
const emlBuffer = email.eml ?? Buffer.from(email.body, 'utf-8');
const emailHash = createHash('sha256').update(emlBuffer).digest('hex');
@@ -214,9 +239,7 @@ export class IngestionService {
.values({
ingestionSourceId: source.id,
userEmail,
messageIdHeader:
(email.headers['message-id'] as string) ??
`generated-${emailHash}-${source.id}-${email.id}`,
messageIdHeader: messageId,
sentAt: email.receivedAt,
subject: email.subject,
senderName: email.from[0]?.name,

View File

@@ -196,7 +196,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
headers: parsedEmail.headers,
attachments,
receivedAt: parsedEmail.date || new Date(),
};
@@ -260,7 +260,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
headers: parsedEmail.headers,
attachments,
receivedAt: parsedEmail.date || new Date(),
};

View File

@@ -5,7 +5,8 @@ import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser'
export class ImapConnector implements IEmailConnector {
private client: ImapFlow;
private newMaxUid: number = 0;
private newMaxUids: { [mailboxPath: string]: number; } = {};
private isConnected = false;
constructor(private credentials: GenericImapCredentials) {
this.client = new ImapFlow({
@@ -18,12 +19,45 @@ export class ImapConnector implements IEmailConnector {
},
logger: false, // Set to true for verbose logging
});
// Handles client-level errors, like unexpected disconnects, to prevent crashes.
this.client.on('error', (err) => {
console.error('IMAP client error:', err);
this.isConnected = false;
});
}
/**
* Establishes a connection to the IMAP server if not already connected.
*/
private async connect(): Promise<void> {
if (this.isConnected && this.client.usable) {
return;
}
try {
await this.client.connect();
this.isConnected = true;
} catch (err) {
this.isConnected = false;
console.error('IMAP connection failed:', err);
throw err;
}
}
/**
* Disconnects from the IMAP server if the connection is active.
*/
private async disconnect(): Promise<void> {
if (this.isConnected && this.client.usable) {
await this.client.logout();
this.isConnected = false;
}
}
public async testConnection(): Promise<boolean> {
try {
await this.client.connect();
await this.client.logout();
await this.connect();
await this.disconnect();
return true;
} catch (error) {
console.error('Failed to verify IMAP connection:', error);
@@ -50,42 +84,85 @@ export class ImapConnector implements IEmailConnector {
public returnImapUserEmail(): string {
return this.credentials.username;
}
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject | null> {
await this.client.connect();
try {
const mailbox = await this.client.mailboxOpen('INBOX');
const lastUid = syncState?.imap?.maxUid;
// For continuous sync, we start with the last known UID.
// For initial sync, we start at 0 and find the highest UID.
this.newMaxUid = lastUid || 0;
// If it's an initial sync, we need to determine the highest UID in the mailbox
// to correctly set the state, even if we don't fetch anything.
if (!lastUid && mailbox.exists > 0) {
const lastMessage = await this.client.fetchOne(String(mailbox.exists), { uid: true });
if (lastMessage && lastMessage.uid > this.newMaxUid) {
this.newMaxUid = lastMessage.uid;
/**
* Wraps an IMAP operation with a retry mechanism to handle transient network errors.
* @param action The async function to execute.
* @param maxRetries The maximum number of retries.
* @returns The result of the action.
*/
private async withRetry<T>(action: () => Promise<T>, maxRetries = 3): Promise<T> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await this.connect();
return await action();
} catch (err: any) {
console.error(`IMAP operation failed on attempt ${attempt}:`, err.message);
this.isConnected = false; // Force reconnect on next attempt
if (attempt === maxRetries) {
console.error('IMAP operation failed after all retries.');
throw err;
}
// Wait for a short period before retrying
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
}
}
// This line should be unreachable
throw new Error('IMAP operation failed after all retries.');
}
const searchCriteria = lastUid ? { uid: `${lastUid + 1}:*` } : { all: true };
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) {
if (lastUid && msg.uid <= lastUid) {
continue; //in case imapflow returns one email even if it should return no email
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject | null> {
try {
const mailboxes = await this.withRetry(() => this.client.list());
console.log('fetched mailboxes:', mailboxes);
const processableMailboxes = mailboxes.filter(mailbox => {
// filter out trash and all mail emails
if (mailbox.specialUse) {
const specialUse = mailbox.specialUse.toLowerCase();
if (specialUse === '\\junk' || specialUse === '\\trash' || specialUse === '\\all') {
return false;
}
}
// Fallback to checking flags
if (mailbox.flags.has('\\Noselect') || mailbox.flags.has('\\Trash') || mailbox.flags.has('\\Junk') || mailbox.flags.has('\\All')) {
return false;
}
if (msg.uid > this.newMaxUid) {
this.newMaxUid = msg.uid;
}
return true;
});
if (msg.envelope && msg.source) {
yield await this.parseMessage(msg);
for (const mailboxInfo of processableMailboxes) {
const mailboxPath = mailboxInfo.path;
const mailbox = await this.withRetry(() => this.client.mailboxOpen(mailboxPath));
const lastUid = syncState?.imap?.[mailboxPath]?.maxUid;
let currentMaxUid = lastUid || 0;
if (!lastUid && mailbox.exists > 0) {
const lastMessage = await this.client.fetchOne(String(mailbox.exists), { uid: true });
if (lastMessage && lastMessage.uid > currentMaxUid) {
currentMaxUid = lastMessage.uid;
}
}
this.newMaxUids[mailboxPath] = currentMaxUid;
const searchCriteria = lastUid ? { uid: `${lastUid + 1}:*` } : { all: true };
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) {
if (lastUid && msg.uid <= lastUid) {
continue;
}
if (msg.uid > this.newMaxUids[mailboxPath]) {
this.newMaxUids[mailboxPath] = msg.uid;
}
if (msg.envelope && msg.source) {
yield await this.parseMessage(msg);
}
}
}
} finally {
if (this.client.usable) await this.client.logout();
await this.disconnect();
}
}
@@ -113,7 +190,7 @@ export class ImapConnector implements IEmailConnector {
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
headers: parsedEmail.headers,
attachments,
receivedAt: parsedEmail.date || new Date(),
eml: msg.source
@@ -121,10 +198,133 @@ export class ImapConnector implements IEmailConnector {
}
public getUpdatedSyncState(): SyncState {
const imapSyncState: { [mailboxPath: string]: { maxUid: number; }; } = {};
for (const [path, uid] of Object.entries(this.newMaxUids)) {
imapSyncState[path] = { maxUid: uid };
}
return {
imap: {
maxUid: this.newMaxUid
}
imap: imapSyncState
};
}
}
/**
* Gmail:
* fetched mailboxes: [
[0] {
[0] path: 'INBOX',
[0] pathAsListed: 'INBOX',
[0] flags: Set(1) { '\\HasNoChildren' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '',
[0] parent: [],
[0] name: 'INBOX',
[0] subscribed: true,
[0] specialUse: '\\Inbox',
[0] specialUseSource: 'name'
[0] },
[0] {
[0] path: '[Gmail]/Starred',
[0] pathAsListed: '[Gmail]/Starred',
[0] flags: Set(2) { '\\Flagged', '\\HasNoChildren' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'Starred',
[0] subscribed: true,
[0] specialUse: '\\Flagged',
[0] specialUseSource: 'extension'
[0] },
[0] {
[0] path: '[Gmail]/Sent Mail',
[0] pathAsListed: '[Gmail]/Sent Mail',
[0] flags: Set(2) { '\\HasNoChildren', '\\Sent' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'Sent Mail',
[0] subscribed: true,
[0] specialUse: '\\Sent',
[0] specialUseSource: 'extension'
[0] },
[0] {
[0] path: '[Gmail]/Drafts',
[0] pathAsListed: '[Gmail]/Drafts',
[0] flags: Set(2) { '\\Drafts', '\\HasNoChildren' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'Drafts',
[0] subscribed: true,
[0] specialUse: '\\Drafts',
[0] specialUseSource: 'extension'
[0] },
[0] {
[0] path: '[Gmail]/All Mail',
[0] pathAsListed: '[Gmail]/All Mail',
[0] flags: Set(2) { '\\All', '\\HasNoChildren' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'All Mail',
[0] subscribed: true,
[0] specialUse: '\\All',
[0] specialUseSource: 'extension'
[0] },
[0] {
[0] path: '[Gmail]/Spam',
[0] pathAsListed: '[Gmail]/Spam',
[0] flags: Set(2) { '\\HasNoChildren', '\\Junk' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'Spam',
[0] subscribed: true,
[0] specialUse: '\\Junk',
[0] specialUseSource: 'extension'
[0] },
[0] {
[0] path: '[Gmail]/Trash',
[0] pathAsListed: '[Gmail]/Trash',
[0] flags: Set(2) { '\\HasNoChildren', '\\Trash' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'Trash',
[0] subscribed: true,
[0] specialUse: '\\Trash',
[0] specialUseSource: 'extension'
[0] },
[0] {
[0] path: '[Gmail]',
[0] pathAsListed: '[Gmail]',
[0] flags: Set(2) { '\\HasChildren', '\\Noselect' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '',
[0] parent: [],
[0] name: '[Gmail]',
[0] subscribed: true
[0] },
[0] {
[0] path: '[Gmail]/Important',
[0] pathAsListed: '[Gmail]/Important',
[0] flags: Set(2) { '\\HasNoChildren', '\\Important' },
[0] delimiter: '/',
[0] listed: true,
[0] parentPath: '[Gmail]',
[0] parent: [ '[Gmail]' ],
[0] name: 'Important',
[0] subscribed: true
[0] }
[0] ]
*/

View File

@@ -263,7 +263,7 @@ export class MicrosoftConnector implements IEmailConnector {
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
headers: parsedEmail.headers,
attachments,
receivedAt: parsedEmail.date || new Date(),
};

View File

@@ -33,6 +33,7 @@ const worker = new Worker('ingestion', processor, {
},
});
console.log('Ingestion worker started');
process.on('SIGINT', () => worker.close());

View File

@@ -37,8 +37,8 @@ export interface EmailObject {
body: string;
/** The HTML version of the email body, if available. */
html: string;
/** A record of all email headers, where keys are header names and values can be a string or an array of strings. */
headers: Record<string, string | string[]>;
/** A map of all email headers, where keys are header names and values can be a string, an array of strings, or a complex object. */
headers: Map<string, any>;
/** An array of `EmailAttachment` objects found in the email. */
attachments: EmailAttachment[];
/** The date and time when the email was received. */

View File

@@ -10,7 +10,9 @@ export type SyncState = {
};
};
imap?: {
maxUid: number;
[mailboxPath: string]: {
maxUid: number;
};
};
lastSyncTimestamp?: string;
};

File diff suppressed because one or more lines are too long