From 6a154a8f02ca313bdbf2987bc504c2fd3f64d258 Mon Sep 17 00:00:00 2001 From: Wayne <5291640+ringoinca@users.noreply.github.com> Date: Sat, 2 Aug 2025 12:16:02 +0300 Subject: [PATCH] Handle sync error: remove failed jobs, force sync --- docs/.vitepress/config.mts | 3 +- .../processors/continuous-sync.processor.ts | 30 ++++++++----------- .../processors/initial-import.processor.ts | 14 ++++++++- .../backend/src/services/IngestionService.ts | 21 ++++++++++++- .../routes/dashboard/ingestions/+page.svelte | 3 +- 5 files changed, 50 insertions(+), 21 deletions(-) diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 1ccd9b3..a14024d 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -23,7 +23,8 @@ export default defineConfig({ nav: [ { text: 'Home', link: '/' }, { text: 'Github', link: 'https://github.com/LogicLabs-OU/OpenArchiver' }, - { text: "Website", link: 'https://openarchiver.com/' } + { text: "Website", link: 'https://openarchiver.com/' }, + { text: "Discord", link: 'https://discord.gg/Qpv4BmHp' } ], sidebar: [ { diff --git a/packages/backend/src/jobs/processors/continuous-sync.processor.ts b/packages/backend/src/jobs/processors/continuous-sync.processor.ts index 38d9a6d..78f0f39 100644 --- a/packages/backend/src/jobs/processors/continuous-sync.processor.ts +++ b/packages/backend/src/jobs/processors/continuous-sync.processor.ts @@ -24,22 +24,6 @@ export default async (job: Job) => { try { const jobs = []; - // if (!connector.listAllUsers) { - // // This is for single-mailbox providers like Generic IMAP - // let userEmail = 'Default'; - // if (connector instanceof ImapConnector) { - // userEmail = connector.returnImapUserEmail(); - // } - // jobs.push({ - // name: 'process-mailbox', - // queueName: 'ingestion', - // data: { - // ingestionSourceId: source.id, - // userEmail: userEmail - // } - // }); - // } else { - // For multi-mailbox providers like Google Workspace and M365 for await (const user of connector.listAllUsers()) { if (user.primaryEmail) { jobs.push({ @@ -48,6 +32,14 @@ export default async (job: Job) => { data: { ingestionSourceId: source.id, userEmail: user.primaryEmail + }, + opts: { + removeOnComplete: { + age: 60 * 10 // 10 minutes + }, + removeOnFail: { + age: 60 * 30 // 30 minutes + } } }); } @@ -62,7 +54,11 @@ export default async (job: Job) => { ingestionSourceId, isInitialImport: false }, - children: jobs + children: jobs, + opts: { + removeOnComplete: true, + removeOnFail: true + } }); } diff --git a/packages/backend/src/jobs/processors/initial-import.processor.ts b/packages/backend/src/jobs/processors/initial-import.processor.ts index 14665c6..f0b49bd 100644 --- a/packages/backend/src/jobs/processors/initial-import.processor.ts +++ b/packages/backend/src/jobs/processors/initial-import.processor.ts @@ -33,6 +33,14 @@ export default async (job: Job) => { data: { ingestionSourceId, userEmail: user.primaryEmail, + }, + opts: { + removeOnComplete: { + age: 60 * 10 // 10 minutes + }, + removeOnFail: { + age: 60 * 30 // 30 minutes + } } }); userCount++; @@ -49,7 +57,11 @@ export default async (job: Job) => { userCount, isInitialImport: true }, - children: jobs + children: jobs, + opts: { + removeOnComplete: true, + removeOnFail: true + } }); } else { // If there are no users, we can consider the import finished and set to active diff --git a/packages/backend/src/services/IngestionService.ts b/packages/backend/src/services/IngestionService.ts index e5701f1..6bb3508 100644 --- a/packages/backend/src/services/IngestionService.ts +++ b/packages/backend/src/services/IngestionService.ts @@ -10,6 +10,7 @@ import { and, eq } from 'drizzle-orm'; import { CryptoService } from './CryptoService'; import { EmailProviderFactory } from './EmailProviderFactory'; import { ingestionQueue } from '../jobs/queues'; +import type { JobType } from 'bullmq'; import { StorageService } from './StorageService'; import type { IInitialImportJob, EmailObject } from '@open-archiver/types'; import { archivedEmails, attachments as attachmentsSchema, emailAttachments } from '../database/schema'; @@ -142,11 +143,29 @@ export class IngestionService { public static async triggerForceSync(id: string): Promise { const source = await this.findById(id); - + logger.info({ ingestionSourceId: id }, 'Force syncing started.'); if (!source) { throw new Error('Ingestion source not found'); } + // Clean up existing jobs for this source to break any stuck flows + const jobTypes: JobType[] = ['active', 'waiting', 'failed', 'delayed', 'paused']; + const jobs = await ingestionQueue.getJobs(jobTypes); + for (const job of jobs) { + if (job.data.ingestionSourceId === id) { + try { + await job.remove(); + logger.info({ jobId: job.id, ingestionSourceId: id }, 'Removed stale job during force sync.'); + } catch (error) { + logger.error({ err: error, jobId: job.id }, 'Failed to remove stale job.'); + } + } + } + + // Reset status to 'active' + await this.update(id, { status: 'active', lastSyncStatusMessage: 'Force sync triggered by user.' }); + + await ingestionQueue.add('continuous-sync', { ingestionSourceId: source.id }); } diff --git a/packages/frontend/src/routes/dashboard/ingestions/+page.svelte b/packages/frontend/src/routes/dashboard/ingestions/+page.svelte index 744b5c8..5ecc7c9 100644 --- a/packages/frontend/src/routes/dashboard/ingestions/+page.svelte +++ b/packages/frontend/src/routes/dashboard/ingestions/+page.svelte @@ -179,7 +179,8 @@ openEditDialog(source)} >Edit - handleSync(source.id)}>Sync handleSync(source.id)} + >Force sync openDeleteDialog(source)}