feat(ingestion): optimize duplicate handling and fix race conditions in Google Workspace

- Implement fast duplicate check (by Message-ID) to skip full content download for existing emails in Google Workspace and IMAP connectors.
- Fix race condition in Google Workspace initial import by capturing `historyId` before listing messages, ensuring no data loss for incoming mail during import.
This commit is contained in:
wayneshn
2026-02-24 18:09:46 +01:00
parent 61f68ca9a6
commit a4e50f7c7f
5 changed files with 82 additions and 12 deletions

View File

@@ -43,7 +43,16 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
const connector = EmailProviderFactory.createConnector(source);
const ingestionService = new IngestionService();
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
// Create a callback to check for duplicates without fetching full email content
const checkDuplicate = async (messageId: string) => {
return await IngestionService.doesEmailExist(messageId, ingestionSourceId);
};
for await (const email of connector.fetchEmails(
userEmail,
source.syncState,
checkDuplicate
)) {
if (email) {
const processedEmail = await ingestionService.processEmail(
email,

View File

@@ -22,7 +22,8 @@ export interface IEmailConnector {
testConnection(): Promise<boolean>;
fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject | null>;
getUpdatedSyncState(userEmail?: string): SyncState;
listAllUsers(): AsyncGenerator<MailboxUser>;

View File

@@ -390,6 +390,25 @@ export class IngestionService {
}
}
/**
* Quickly checks if an email exists in the database by its Message-ID header.
* This is used to skip downloading duplicate emails during ingestion.
*/
public static async doesEmailExist(
messageId: string,
ingestionSourceId: string
): Promise<boolean> {
const existingEmail = await db.query.archivedEmails.findFirst({
where: and(
eq(archivedEmails.messageIdHeader, messageId),
eq(archivedEmails.ingestionSourceId, ingestionSourceId)
),
columns: { id: true },
});
return !!existingEmail;
}
public async processEmail(
email: EmailObject,
source: IngestionSource,

View File

@@ -132,7 +132,8 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
*/
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject> {
const authClient = this.getAuthClient(userEmail, [
'https://www.googleapis.com/auth/gmail.readonly',
@@ -144,7 +145,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
// If no sync state is provided for this user, this is an initial import. Get all messages.
if (!startHistoryId) {
yield* this.fetchAllMessagesForUser(gmail, userEmail);
yield* this.fetchAllMessagesForUser(gmail, userEmail, checkDuplicate);
return;
}
@@ -170,6 +171,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
if (messageAdded.message?.id) {
try {
const messageId = messageAdded.message.id;
// Optimization: Check for existence before fetching full content
if (checkDuplicate && (await checkDuplicate(messageId))) {
logger.debug(
{ messageId, userEmail },
'Skipping duplicate email (pre-check)'
);
continue;
}
const metadataResponse = await gmail.users.messages.get({
userId: userEmail,
id: messageId,
@@ -258,8 +269,17 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
private async *fetchAllMessagesForUser(
gmail: gmail_v1.Gmail,
userEmail: string
userEmail: string,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject> {
// Capture the history ID at the start to ensure no emails are missed during the import process.
// Any emails arriving during this import will be covered by the next sync starting from this point.
// Overlaps are handled by the duplicate check.
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
if (profileResponse.data.historyId) {
this.newHistoryId = profileResponse.data.historyId;
}
let pageToken: string | undefined = undefined;
do {
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> =
@@ -277,6 +297,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
if (message.id) {
try {
const messageId = message.id;
// Optimization: Check for existence before fetching full content
if (checkDuplicate && (await checkDuplicate(messageId))) {
logger.debug(
{ messageId, userEmail },
'Skipping duplicate email (pre-check)'
);
continue;
}
const metadataResponse = await gmail.users.messages.get({
userId: userEmail,
id: messageId,
@@ -352,12 +382,6 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
}
pageToken = listResponse.data.nextPageToken ?? undefined;
} while (pageToken);
// After fetching all messages, get the latest history ID to use as the starting point for the next sync.
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
if (profileResponse.data.historyId) {
this.newHistoryId = profileResponse.data.historyId;
}
}
public getUpdatedSyncState(userEmail: string): SyncState {

View File

@@ -142,7 +142,8 @@ export class ImapConnector implements IEmailConnector {
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject | null> {
try {
// list all mailboxes first
@@ -218,6 +219,22 @@ export class ImapConnector implements IEmailConnector {
this.newMaxUids[mailboxPath] = msg.uid;
}
// Optimization: Verify existence using Message-ID from envelope before fetching full body
if (checkDuplicate && msg.envelope?.messageId) {
const isDuplicate = await checkDuplicate(msg.envelope.messageId);
if (isDuplicate) {
logger.debug(
{
mailboxPath,
uid: msg.uid,
messageId: msg.envelope.messageId,
},
'Skipping duplicate email (pre-check)'
);
continue;
}
}
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
if (msg.envelope && msg.source) {