diff --git a/packages/backend/src/jobs/processors/process-mailbox.processor.ts b/packages/backend/src/jobs/processors/process-mailbox.processor.ts index 1647fa6..0c6b18d 100644 --- a/packages/backend/src/jobs/processors/process-mailbox.processor.ts +++ b/packages/backend/src/jobs/processors/process-mailbox.processor.ts @@ -5,10 +5,9 @@ import { logger } from '../../config/logger'; import { EmailProviderFactory } from '../../services/EmailProviderFactory'; import { StorageService } from '../../services/StorageService'; -import { IngestionSource, SyncState } from '@open-archiver/types'; import { db } from '../../database'; import { ingestionSources } from '../../database/schema'; -import { eq } from 'drizzle-orm'; +import { eq, sql } from 'drizzle-orm'; export const processMailboxProcessor = async (job: Job) => { const { ingestionSourceId, userEmail } = job.data; @@ -33,35 +32,39 @@ export const processMailboxProcessor = async (job: Job 0) { - const currentSource = (await db - .select({ syncState: ingestionSources.syncState }) - .from(ingestionSources) - .where(eq(ingestionSources.id, ingestionSourceId))) as IngestionSource[]; - const currentSyncState = currentSource[0]?.syncState || {}; + // Atomically update the syncState JSONB field to prevent race conditions + const provider = Object.keys(newSyncState)[0] as keyof typeof newSyncState | undefined; - const mergedSyncState: SyncState = { ...currentSyncState }; + if (provider && newSyncState[provider]) { + let path: (string | number)[]; + let userState: any; - if (newSyncState.google) { - mergedSyncState.google = { ...mergedSyncState.google, ...newSyncState.google }; - } - if (newSyncState.microsoft) { - mergedSyncState.microsoft = { ...mergedSyncState.microsoft, ...newSyncState.microsoft }; - } - if (newSyncState.imap) { - mergedSyncState.imap = newSyncState.imap; + if (provider === 'imap') { + path = ['imap']; + userState = newSyncState.imap; + } else { + // Handles 'google' and 'microsoft' + path = [provider, userEmail]; + userState = (newSyncState[provider] as any)?.[userEmail]; } - await db - .update(ingestionSources) - .set({ - syncState: mergedSyncState, - updatedAt: new Date() - }) - .where(eq(ingestionSources.id, ingestionSourceId)); + if (userState) { + await db + .update(ingestionSources) + .set({ + syncState: sql`jsonb_set( + COALESCE(${ingestionSources.syncState}, '{}'::jsonb), + '{${sql.raw(path.join(','))}}', + ${JSON.stringify(userState)}::jsonb, + true + )`, + updatedAt: new Date() + }) + .where(eq(ingestionSources.id, ingestionSourceId)); + } else { + logger.warn({ ingestionSourceId, userEmail, provider }, `No sync state found for user under provider`); + } } logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`); } catch (error) { diff --git a/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts b/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts index 38e69cb..468ac50 100644 --- a/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts @@ -136,9 +136,16 @@ export class MicrosoftConnector implements IEmailConnector { syncState?: SyncState | null ): AsyncGenerator { const deltaToken = syncState?.microsoft?.[userEmail]?.deltaToken; - let requestUrl = deltaToken - ? deltaToken - : `/users/${userEmail}/mailFolders/AllItems/messages/delta`; + + if (!deltaToken) { + // Initial sync: fetch all messages and set the initial delta token. + yield* this.fetchAllMessagesAndSetDeltaToken(userEmail); + return; + } + + // Continuous sync: use the existing delta token to fetch changes. + this.newDeltaToken = deltaToken; // Preserve the token in case there are no new messages. + let requestUrl: string | undefined = deltaToken; try { while (requestUrl) { @@ -165,6 +172,30 @@ export class MicrosoftConnector implements IEmailConnector { } } + private async *fetchAllMessagesAndSetDeltaToken(userEmail: string): AsyncGenerator { + let requestUrl: string | undefined = `/users/${userEmail}/mailFolders/AllItems/messages/delta`; + this.newDeltaToken = undefined; // Ensure it starts clean for initial sync + + while (requestUrl) { + const response = await this.graphClient.api(requestUrl).get(); + + for (const message of response.value) { + if (message.id && !(message as any)['@removed']) { + const rawEmail = await this.getRawEmail(userEmail, message.id); + if (rawEmail) { + yield await this.parseEmail(rawEmail, message.id, userEmail); + } + } + } + + if (response['@odata.deltaLink']) { + this.newDeltaToken = response['@odata.deltaLink']; + } + + requestUrl = response['@odata.nextLink']; + } + } + private async getRawEmail(userEmail: string, messageId: string): Promise { try { const response = await this.graphClient.api(`/users/${userEmail}/messages/${messageId}/$value`).getStream(); diff --git a/packages/frontend/src/routes/dashboard/archived-emails/+page.svelte b/packages/frontend/src/routes/dashboard/archived-emails/+page.svelte index df22e43..32bcd1f 100644 --- a/packages/frontend/src/routes/dashboard/archived-emails/+page.svelte +++ b/packages/frontend/src/routes/dashboard/archived-emails/+page.svelte @@ -107,9 +107,11 @@ {new Date(email.sentAt).toLocaleString()} - - {email.subject} - + {email.senderEmail} {email.hasAttachments ? 'Yes' : 'No'}