Compare commits

...

2 Commits

Author SHA1 Message Date
wayneshn
a4e50f7c7f feat(ingestion): optimize duplicate handling and fix race conditions in Google Workspace
- Implement fast duplicate check (by Message-ID) to skip full content download for existing emails in Google Workspace and IMAP connectors.
- Fix race condition in Google Workspace initial import by capturing `historyId` before listing messages, ensuring no data loss for incoming mail during import.
2026-02-24 18:09:46 +01:00
wayneshn
61f68ca9a6 fix(backend): improve ingestion error handling and error messages
This commit introduces a "force delete" mechanism for Ingestion Sources and improves error messages for file-based connectors.

Changes:
- Update `IngestionService.delete` to accept a `force` flag, bypassing the `checkDeletionEnabled` check.
- Use `force` deletion when rolling back failed ingestion source creations (e.g., decryption errors or connection failures) to ensure cleanup even if deletion is globally disabled.
- Enhance error messages in `EMLConnector`, `MboxConnector`, and `PSTConnector` to distinguish between missing local files and failed uploads, providing more specific feedback to the user.
2026-02-24 12:15:49 +01:00
8 changed files with 127 additions and 23 deletions

View File

@@ -43,7 +43,16 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
const connector = EmailProviderFactory.createConnector(source);
const ingestionService = new IngestionService();
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
// Create a callback to check for duplicates without fetching full email content
const checkDuplicate = async (messageId: string) => {
return await IngestionService.doesEmailExist(messageId, ingestionSourceId);
};
for await (const email of connector.fetchEmails(
userEmail,
source.syncState,
checkDuplicate
)) {
if (email) {
const processedEmail = await ingestionService.processEmail(
email,

View File

@@ -22,7 +22,8 @@ export interface IEmailConnector {
testConnection(): Promise<boolean>;
fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject | null>;
getUpdatedSyncState(userEmail?: string): SyncState;
listAllUsers(): AsyncGenerator<MailboxUser>;

View File

@@ -85,7 +85,7 @@ export class IngestionService {
const decryptedSource = this.decryptSource(newSource);
if (!decryptedSource) {
await this.delete(newSource.id, actor, actorIp);
await this.delete(newSource.id, actor, actorIp, true);
throw new Error(
'Failed to process newly created ingestion source due to a decryption error.'
);
@@ -107,7 +107,7 @@ export class IngestionService {
}
} catch (error) {
// If connection fails, delete the newly created source and throw the error.
await this.delete(decryptedSource.id, actor, actorIp);
await this.delete(decryptedSource.id, actor, actorIp, true);
throw error;
}
}
@@ -205,8 +205,15 @@ export class IngestionService {
return decryptedSource;
}
public static async delete(id: string, actor: User, actorIp: string): Promise<IngestionSource> {
checkDeletionEnabled();
public static async delete(
id: string,
actor: User,
actorIp: string,
force: boolean = false
): Promise<IngestionSource> {
if (!force) {
checkDeletionEnabled();
}
const source = await this.findById(id);
if (!source) {
throw new Error('Ingestion source not found');
@@ -383,6 +390,25 @@ export class IngestionService {
}
}
/**
* Quickly checks if an email exists in the database by its Message-ID header.
* This is used to skip downloading duplicate emails during ingestion.
*/
public static async doesEmailExist(
messageId: string,
ingestionSourceId: string
): Promise<boolean> {
const existingEmail = await db.query.archivedEmails.findFirst({
where: and(
eq(archivedEmails.messageIdHeader, messageId),
eq(archivedEmails.ingestionSourceId, ingestionSourceId)
),
columns: { id: true },
});
return !!existingEmail;
}
public async processEmail(
email: EmailObject,
source: IngestionSource,

View File

@@ -58,7 +58,7 @@ export class EMLConnector implements IEmailConnector {
try {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('EML file path not provided.');
throw Error('EML Zip file path not provided.');
}
if (!filePath.includes('.zip')) {
throw Error('Provided file is not in the ZIP format.');
@@ -77,12 +77,21 @@ export class EMLConnector implements IEmailConnector {
}
if (!fileExist) {
throw Error('EML file not found or upload not finished yet, please wait.');
if (this.credentials.localFilePath) {
throw Error(`EML Zip file not found at path: ${this.credentials.localFilePath}`);
} else {
throw Error(
'Uploaded EML Zip file not found. The upload may not have finished yet, or it failed.'
);
}
}
return true;
} catch (error) {
logger.error({ error, credentials: this.credentials }, 'EML file validation failed.');
logger.error(
{ error, credentials: this.credentials },
'EML Zip file validation failed.'
);
throw error;
}
}

View File

@@ -132,7 +132,8 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
*/
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject> {
const authClient = this.getAuthClient(userEmail, [
'https://www.googleapis.com/auth/gmail.readonly',
@@ -144,7 +145,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
// If no sync state is provided for this user, this is an initial import. Get all messages.
if (!startHistoryId) {
yield* this.fetchAllMessagesForUser(gmail, userEmail);
yield* this.fetchAllMessagesForUser(gmail, userEmail, checkDuplicate);
return;
}
@@ -170,6 +171,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
if (messageAdded.message?.id) {
try {
const messageId = messageAdded.message.id;
// Optimization: Check for existence before fetching full content
if (checkDuplicate && (await checkDuplicate(messageId))) {
logger.debug(
{ messageId, userEmail },
'Skipping duplicate email (pre-check)'
);
continue;
}
const metadataResponse = await gmail.users.messages.get({
userId: userEmail,
id: messageId,
@@ -258,8 +269,17 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
private async *fetchAllMessagesForUser(
gmail: gmail_v1.Gmail,
userEmail: string
userEmail: string,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject> {
// Capture the history ID at the start to ensure no emails are missed during the import process.
// Any emails arriving during this import will be covered by the next sync starting from this point.
// Overlaps are handled by the duplicate check.
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
if (profileResponse.data.historyId) {
this.newHistoryId = profileResponse.data.historyId;
}
let pageToken: string | undefined = undefined;
do {
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> =
@@ -277,6 +297,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
if (message.id) {
try {
const messageId = message.id;
// Optimization: Check for existence before fetching full content
if (checkDuplicate && (await checkDuplicate(messageId))) {
logger.debug(
{ messageId, userEmail },
'Skipping duplicate email (pre-check)'
);
continue;
}
const metadataResponse = await gmail.users.messages.get({
userId: userEmail,
id: messageId,
@@ -352,12 +382,6 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
}
pageToken = listResponse.data.nextPageToken ?? undefined;
} while (pageToken);
// After fetching all messages, get the latest history ID to use as the starting point for the next sync.
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
if (profileResponse.data.historyId) {
this.newHistoryId = profileResponse.data.historyId;
}
}
public getUpdatedSyncState(userEmail: string): SyncState {

View File

@@ -142,7 +142,8 @@ export class ImapConnector implements IEmailConnector {
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
syncState?: SyncState | null,
checkDuplicate?: (messageId: string) => Promise<boolean>
): AsyncGenerator<EmailObject | null> {
try {
// list all mailboxes first
@@ -218,6 +219,22 @@ export class ImapConnector implements IEmailConnector {
this.newMaxUids[mailboxPath] = msg.uid;
}
// Optimization: Verify existence using Message-ID from envelope before fetching full body
if (checkDuplicate && msg.envelope?.messageId) {
const isDuplicate = await checkDuplicate(msg.envelope.messageId);
if (isDuplicate) {
logger.debug(
{
mailboxPath,
uid: msg.uid,
messageId: msg.envelope.messageId,
},
'Skipping duplicate email (pre-check)'
);
continue;
}
}
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
if (msg.envelope && msg.source) {

View File

@@ -82,12 +82,21 @@ export class MboxConnector implements IEmailConnector {
}
if (!fileExist) {
throw Error('Mbox file not found or upload not finished yet, please wait.');
if (this.credentials.localFilePath) {
throw Error(`Mbox file not found at path: ${this.credentials.localFilePath}`);
} else {
throw Error(
'Uploaded Mbox file not found. The upload may not have finished yet, or it failed.'
);
}
}
return true;
} catch (error) {
logger.error({ error, credentials: this.credentials }, 'Mbox file validation failed.');
logger.error(
{ error, credentials: this.credentials },
'Mbox file validation failed.'
);
throw error;
}
}

View File

@@ -161,11 +161,20 @@ export class PSTConnector implements IEmailConnector {
}
if (!fileExist) {
throw Error('PST file not found or upload not finished yet, please wait.');
if (this.credentials.localFilePath) {
throw Error(`PST file not found at path: ${this.credentials.localFilePath}`);
} else {
throw Error(
'Uploaded PST file not found. The upload may not have finished yet, or it failed.'
);
}
}
return true;
} catch (error) {
logger.error({ error, credentials: this.credentials }, 'PST file validation failed.');
logger.error(
{ error, credentials: this.credentials },
'PST file validation failed.'
);
throw error;
}
}