From e7bb545cfa5e516497fa3f53d19180e0dc214a27 Mon Sep 17 00:00:00 2001 From: Wayne <5291640+ringoinca@users.noreply.github.com> Date: Tue, 22 Jul 2025 13:49:13 +0300 Subject: [PATCH] Continuous syncing fix --- .../processors/continuous-sync.processor.ts | 9 +++- .../processors/initial-import.processor.ts | 11 ++++- .../processors/process-mailbox.processor.ts | 4 +- .../src/services/EmailProviderFactory.ts | 3 +- .../backend/src/services/IngestionService.ts | 15 +++--- .../ingestion-connectors/ImapConnector.ts | 19 ++++--- .../custom/IngestionSourceForm.svelte | 5 ++ .../src/lib/components/ui/badge/badge.svelte | 49 +++++++++++++++++++ .../src/lib/components/ui/badge/index.ts | 2 + .../routes/dashboard/ingestions/+page.svelte | 33 +++++++++++-- packages/types/src/ingestion.types.ts | 10 ++-- packages/types/tsconfig.tsbuildinfo | 2 +- 12 files changed, 135 insertions(+), 27 deletions(-) create mode 100644 packages/frontend/src/lib/components/ui/badge/badge.svelte create mode 100644 packages/frontend/src/lib/components/ui/badge/index.ts diff --git a/packages/backend/src/jobs/processors/continuous-sync.processor.ts b/packages/backend/src/jobs/processors/continuous-sync.processor.ts index 01a9013..cd0ca84 100644 --- a/packages/backend/src/jobs/processors/continuous-sync.processor.ts +++ b/packages/backend/src/jobs/processors/continuous-sync.processor.ts @@ -4,6 +4,7 @@ import { IContinuousSyncJob } from '@open-archiver/types'; import { EmailProviderFactory } from '../../services/EmailProviderFactory'; import { flowProducer } from '../queues'; import { logger } from '../../config/logger'; +import { ImapConnector } from '../../services/ingestion-connectors/ImapConnector'; export default async (job: Job) => { const { ingestionSourceId } = job.data; @@ -26,12 +27,16 @@ export default async (job: Job) => { const jobs = []; if (!connector.listAllUsers) { // This is for single-mailbox providers like Generic IMAP + let userEmail = 'Default'; + if (connector instanceof ImapConnector) { + userEmail = connector.returnImapUserEmail(); + } jobs.push({ name: 'process-mailbox', queueName: 'ingestion', data: { ingestionSourceId: source.id, - userEmail: 'default' // A placeholder, as it's not needed for IMAP + userEmail: userEmail } }); } else { @@ -43,7 +48,7 @@ export default async (job: Job) => { queueName: 'ingestion', data: { ingestionSourceId: source.id, - userEmail: user.primaryEmail, + userEmail: user.primaryEmail } }); } diff --git a/packages/backend/src/jobs/processors/initial-import.processor.ts b/packages/backend/src/jobs/processors/initial-import.processor.ts index 6b7e521..7cf8f35 100644 --- a/packages/backend/src/jobs/processors/initial-import.processor.ts +++ b/packages/backend/src/jobs/processors/initial-import.processor.ts @@ -33,7 +33,7 @@ export default async (job: Job) => { queueName: 'ingestion', data: { ingestionSourceId, - userEmail: user.primaryEmail + userEmail: user.primaryEmail, } }); userCount++; @@ -62,6 +62,15 @@ export default async (job: Job) => { } else { // For other providers, we might trigger a simpler bulk import directly await new IngestionService().performBulkImport(job.data); + await flowProducer.add({ + name: 'sync-cycle-finished', + queueName: 'ingestion', + data: { + ingestionSourceId, + userCount: 1, + isInitialImport: true + } + }); } logger.info({ ingestionSourceId }, 'Finished initial import master job'); diff --git a/packages/backend/src/jobs/processors/process-mailbox.processor.ts b/packages/backend/src/jobs/processors/process-mailbox.processor.ts index 94e4e56..1647fa6 100644 --- a/packages/backend/src/jobs/processors/process-mailbox.processor.ts +++ b/packages/backend/src/jobs/processors/process-mailbox.processor.ts @@ -27,7 +27,9 @@ export const processMailboxProcessor = async (job: Job; - fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator; + fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator; getUpdatedSyncState(userEmail?: string): SyncState; listAllUsers?(): AsyncGenerator; + returnImapUserEmail?(): string; } export class EmailProviderFactory { diff --git a/packages/backend/src/services/IngestionService.ts b/packages/backend/src/services/IngestionService.ts index 643493a..c895f91 100644 --- a/packages/backend/src/services/IngestionService.ts +++ b/packages/backend/src/services/IngestionService.ts @@ -154,19 +154,20 @@ export class IngestionService { } } else { // For single-mailbox providers, dispatch a single job + // console.log('source.credentials ', source.credentials); await ingestionQueue.add('process-mailbox', { ingestionSourceId: source.id, - userEmail: 'default' // Placeholder, as it's not needed for IMAP + userEmail: source.credentials.type === 'generic_imap' ? source.credentials.username : 'Default' }); } - await IngestionService.update(ingestionSourceId, { - status: 'active', - lastSyncFinishedAt: new Date(), - lastSyncStatusMessage: 'Successfully initiated bulk import for all mailboxes.' - }); - console.log(`Bulk import job dispatch finished for source: ${source.name} (${source.id})`); + // await IngestionService.update(ingestionSourceId, { + // status: 'active', + // lastSyncFinishedAt: new Date(), + // lastSyncStatusMessage: 'Successfully initiated bulk import for all mailboxes.' + // }); + // console.log(`Bulk import job dispatch finished for source: ${source.name} (${source.id})`); } catch (error) { console.error(`Bulk import failed for source: ${source.name} (${source.id})`, error); await IngestionService.update(ingestionSourceId, { diff --git a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts index 62265f5..faf9efa 100644 --- a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts @@ -30,8 +30,10 @@ export class ImapConnector implements IEmailConnector { return false; } } - - public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator { + public returnImapUserEmail(): string { + return this.credentials.username; + } + public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator { await this.client.connect(); try { const mailbox = await this.client.mailboxOpen('INBOX'); @@ -42,17 +44,22 @@ export class ImapConnector implements IEmailConnector { // Determine the highest UID in the mailbox currently. // This ensures that even if no new emails are fetched, the sync state is updated to the latest UID. if (mailbox.exists > 0) { - const highestUidInMailbox = mailbox.uidNext - 1; - if (highestUidInMailbox > this.newMaxUid) { - this.newMaxUid = highestUidInMailbox; + const lastMessage = await this.client.fetchOne(String(mailbox.exists), { uid: true }); + if (lastMessage && lastMessage.uid > this.newMaxUid) { + this.newMaxUid = lastMessage.uid; } } // If lastUid exists, fetch all emails with a UID greater than it. // Otherwise, fetch all emails. const searchCriteria = lastUid ? { uid: `${lastUid + 1}:*` } : { all: true }; - for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) { + // Defensive check: Ensure we do not process emails we should have already synced. + if (lastUid && msg.uid <= lastUid) { + console.warn(`IMAP fetch returned UID ${msg.uid} which is not greater than last synced UID ${lastUid}. Skipping.`); + continue; + } + if (msg.uid > this.newMaxUid) { this.newMaxUid = msg.uid; } diff --git a/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte b/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte index a9d9001..386bed8 100644 --- a/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte +++ b/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte @@ -29,6 +29,11 @@ } }); + $effect(() => { + formData.providerConfig.type = formData.provider; + console.log(formData); + }); + const triggerContent = $derived( providerOptions.find((p) => p.value === formData.provider)?.label ?? 'Select a provider' ); diff --git a/packages/frontend/src/lib/components/ui/badge/badge.svelte b/packages/frontend/src/lib/components/ui/badge/badge.svelte new file mode 100644 index 0000000..7c242f7 --- /dev/null +++ b/packages/frontend/src/lib/components/ui/badge/badge.svelte @@ -0,0 +1,49 @@ + + + + + + {@render children?.()} + diff --git a/packages/frontend/src/lib/components/ui/badge/index.ts b/packages/frontend/src/lib/components/ui/badge/index.ts new file mode 100644 index 0000000..64e0aa9 --- /dev/null +++ b/packages/frontend/src/lib/components/ui/badge/index.ts @@ -0,0 +1,2 @@ +export { default as Badge } from "./badge.svelte"; +export { badgeVariants, type BadgeVariant } from "./badge.svelte"; diff --git a/packages/frontend/src/routes/dashboard/ingestions/+page.svelte b/packages/frontend/src/routes/dashboard/ingestions/+page.svelte index c877a23..f655cd8 100644 --- a/packages/frontend/src/routes/dashboard/ingestions/+page.svelte +++ b/packages/frontend/src/routes/dashboard/ingestions/+page.svelte @@ -9,6 +9,8 @@ import IngestionSourceForm from '$lib/components/custom/IngestionSourceForm.svelte'; import { api } from '$lib/api.client'; import type { IngestionSource, CreateIngestionSourceDto } from '@open-archiver/types'; + import Badge from '$lib/components/ui/badge/badge.svelte'; + import type { BadgeVariant } from '$lib/components/ui/badge/badge.svelte'; let { data }: { data: PageData } = $props(); @@ -88,6 +90,27 @@ } isDialogOpen = false; }; + + function getStatusClasses(status: IngestionSource['status']): string { + switch (status) { + case 'active': + return 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-300'; + case 'paused': + return 'bg-gray-100 text-gray-800 dark:bg-gray-700 dark:text-gray-300'; + case 'error': + return 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-300'; + case 'syncing': + return 'bg-blue-100 text-blue-800 dark:bg-blue-900 dark:text-blue-300'; + case 'importing': + return 'bg-purple-100 text-purple-800 dark:bg-purple-900 dark:text-purple-300'; + case 'pending_auth': + return 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-300'; + case 'auth_success': + return 'bg-gray-100 text-gray-800 dark:bg-gray-700 dark:text-gray-300'; + default: + return 'bg-gray-100 text-gray-800 dark:bg-gray-700 dark:text-gray-300'; + } + }
@@ -115,11 +138,11 @@ {source.name} - {source.provider} - - - {source.status} - + {source.provider.split('_').join(' ')} + + + {source.status.split('_').join(' ')} +