Google workspace / MS 365 duplicate check, avoid extra API call when previous ingestion fails

This commit is contained in:
wayneshn
2026-03-19 17:12:34 +01:00
parent db525daceb
commit 3db04f9525
7 changed files with 1753 additions and 13 deletions

View File

@@ -0,0 +1,2 @@
ALTER TABLE "archived_emails" ADD COLUMN "provider_message_id" text;--> statement-breakpoint
CREATE INDEX "provider_msg_source_idx" ON "archived_emails" USING btree ("provider_message_id","ingestion_source_id");

File diff suppressed because it is too large Load Diff

View File

@@ -204,6 +204,13 @@
"when": 1773770326402,
"tag": "0028_youthful_kitty_pryde",
"breakpoints": true
},
{
"idx": 29,
"version": "7",
"when": 1773927678269,
"tag": "0029_lethal_brood",
"breakpoints": true
}
]
}

View File

@@ -12,6 +12,9 @@ export const archivedEmails = pgTable(
.references(() => ingestionSources.id, { onDelete: 'cascade' }),
userEmail: text('user_email').notNull(),
messageIdHeader: text('message_id_header'),
/** The provider-specific message ID (e.g., Gmail API ID, Graph API ID).
* Used by the pre-fetch duplicate check to avoid unnecessary API calls during retries. */
providerMessageId: text('provider_message_id'),
sentAt: timestamp('sent_at', { withTimezone: true }).notNull(),
subject: text('subject'),
senderName: text('sender_name'),
@@ -27,7 +30,10 @@ export const archivedEmails = pgTable(
path: text('path'),
tags: jsonb('tags'),
},
(table) => [index('thread_id_idx').on(table.threadId)]
(table) => [
index('thread_id_idx').on(table.threadId),
index('provider_msg_source_idx').on(table.providerMessageId, table.ingestionSourceId),
]
);
export const archivedEmailsRelations = relations(archivedEmails, ({ one }) => ({

View File

@@ -8,7 +8,7 @@ import type {
IngestionProvider,
PendingEmail,
} from '@open-archiver/types';
import { and, desc, eq } from 'drizzle-orm';
import { and, desc, eq, or } from 'drizzle-orm';
import { CryptoService } from './CryptoService';
import { EmailProviderFactory } from './EmailProviderFactory';
import { ingestionQueue } from '../jobs/queues';
@@ -392,8 +392,9 @@ export class IngestionService {
}
/**
* Quickly checks if an email exists in the database by its Message-ID header.
* This is used to skip downloading duplicate emails during ingestion.
* Pre-fetch duplicate check to avoid unnecessary API calls during ingestion.
* Checks both providerMessageId (for Google/Microsoft API IDs) and
* messageIdHeader (for IMAP/PST/EML/Mbox RFC Message-IDs and pre-migration rows).
*/
public static async doesEmailExist(
messageId: string,
@@ -401,12 +402,14 @@ export class IngestionService {
): Promise<boolean> {
const existingEmail = await db.query.archivedEmails.findFirst({
where: and(
eq(archivedEmails.messageIdHeader, messageId),
eq(archivedEmails.ingestionSourceId, ingestionSourceId)
eq(archivedEmails.ingestionSourceId, ingestionSourceId),
or(
eq(archivedEmails.providerMessageId, messageId),
eq(archivedEmails.messageIdHeader, messageId)
)
),
columns: { id: true },
});
return !!existingEmail;
}
@@ -463,6 +466,7 @@ export class IngestionService {
userEmail,
threadId: email.threadId,
messageIdHeader: messageId,
providerMessageId: email.id,
sentAt: email.receivedAt,
subject: email.subject,
senderName: email.from[0]?.name,

View File

@@ -225,7 +225,6 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
);
};
const threadId = getThreadId(parsedEmail.headers);
console.log('threadId', threadId);
yield {
id: msgResponse.data.id!,
threadId,
@@ -348,7 +347,6 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
);
};
const threadId = getThreadId(parsedEmail.headers);
console.log('threadId', threadId);
yield {
id: msgResponse.data.id!,
threadId,

View File

@@ -136,7 +136,8 @@ export class MicrosoftConnector implements IEmailConnector {
*/
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject> {
this.newDeltaTokens = syncState?.microsoft?.[userEmail]?.deltaTokens || {};
@@ -152,7 +153,8 @@ export class MicrosoftConnector implements IEmailConnector {
userEmail,
folder.id,
folder.path,
this.newDeltaTokens[folder.id]
this.newDeltaTokens[folder.id],
checkDuplicate
);
}
}
@@ -214,7 +216,8 @@ export class MicrosoftConnector implements IEmailConnector {
userEmail: string,
folderId: string,
path: string,
deltaToken?: string
deltaToken?: string,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject> {
let requestUrl: string | undefined;
@@ -235,6 +238,15 @@ export class MicrosoftConnector implements IEmailConnector {
for (const message of response.value) {
if (message.id && !message['@removed']) {
// Skip fetching raw content for already-imported messages
if (checkDuplicate && (await checkDuplicate(message.id))) {
logger.debug(
{ messageId: message.id, userEmail },
'Skipping duplicate email (pre-check)'
);
continue;
}
const rawEmail = await this.getRawEmail(userEmail, message.id);
if (rawEmail) {
const emailObject = await this.parseEmail(
@@ -243,7 +255,7 @@ export class MicrosoftConnector implements IEmailConnector {
userEmail,
path
);
emailObject.threadId = message.conversationId; // Add conversationId as threadId
emailObject.threadId = message.conversationId;
yield emailObject;
}
}