# Job Queue Service
This document describes the architecture of the job queue system, including the sync cycle coordination mechanism and relevant configuration options.
## Architecture
The job queue system is built on BullMQ backed by Redis (Valkey). Two worker processes run independently:
- **Ingestion worker** (`ingestion.worker.ts`) — processes the `ingestion` queue
- **Indexing worker** (`indexing.worker.ts`) — processes the `indexing` queue
### Queues
| Queue | Jobs | Purpose |
|---|---|---|
| `ingestion` | `schedule-continuous-sync`, `continuous-sync`, `initial-import`, `process-mailbox`, `sync-cycle-finished` | Email ingestion and sync orchestration |
| `indexing` | `index-email-batch` | Meilisearch document indexing |
### Job Flow
```
[schedule-continuous-sync] (repeating cron)
 └→ [continuous-sync] (per ingestion source)
     └→ [process-mailbox] × N (one per user mailbox)
         └→ [index-email-batch] (batched, on indexing queue)
         └→ [sync-cycle-finished] (dispatched by the last mailbox job)
```
For initial imports, `initial-import` triggers the same `process-mailbox` → `sync-cycle-finished` flow.
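The fan-out step above (one `process-mailbox` job per mailbox) can be sketched as a pure helper. `buildMailboxJobs` and its payload shape are illustrative assumptions, not the project's actual code; only `Queue.addBulk` in the trailing comment is a real BullMQ method.

```typescript
// Hypothetical payload for a process-mailbox job; field names are assumed.
interface MailboxJob {
  name: "process-mailbox";
  data: { sourceId: string; sessionId: string; mailbox: string };
}

// Build one job per mailbox discovered for the ingestion source.
function buildMailboxJobs(
  sourceId: string,
  sessionId: string,
  mailboxes: string[],
): MailboxJob[] {
  return mailboxes.map((mailbox) => ({
    name: "process-mailbox",
    data: { sourceId, sessionId, mailbox },
  }));
}

// In the worker these would be enqueued in one round trip, e.g.:
//   await ingestionQueue.addBulk(buildMailboxJobs(sourceId, sessionId, mailboxes));
```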
## Sync Cycle Coordination
Sync cycle completion (knowing when all mailboxes in a sync have finished) is coordinated via the `sync_sessions` PostgreSQL table rather than BullMQ's built-in flow/parent-child system.

**Why:** BullMQ's `FlowProducer` stores the entire parent/child relationship in Redis atomically. For large tenants with thousands of mailboxes, this creates large Redis writes and requires loading all child job return values into memory at once for aggregation.

**How it works:**
- When `initial-import` or `continuous-sync` starts, it creates a `sync_sessions` row with `total_mailboxes = N`.
- Each `process-mailbox` job atomically increments `completed_mailboxes` or `failed_mailboxes` when it finishes, and merges its `SyncState` into `ingestion_sources.sync_state` using PostgreSQL's `||` jsonb operator.
- The job that brings `completed + failed` up to `total` dispatches the `sync-cycle-finished` job.
- `sync-cycle-finished` reads the aggregated results from the session row and finalizes the source status.
- The session row is deleted after finalization.
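The completion check in the steps above can be sketched as follows. The counters are assumed to come back from a single atomic `UPDATE … RETURNING` on `sync_sessions` (shown as a comment; the project's actual query may differ), so exactly one job observes the final state:

```typescript
// The increment would happen atomically in SQL, e.g. (illustrative):
//   UPDATE sync_sessions
//      SET completed_mailboxes = completed_mailboxes + 1
//    WHERE id = $1
//   RETURNING completed_mailboxes, failed_mailboxes, total_mailboxes;

interface SessionCounters {
  completed: number;
  failed: number;
  total: number;
}

// True only for the one job whose increment brings completed + failed up to
// total; that job dispatches sync-cycle-finished.
function isCycleComplete({ completed, failed, total }: SessionCounters): boolean {
  return completed + failed === total;
}
```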
### Session Heartbeat
Each `process-mailbox` job updates `last_activity_at` on the session every time it flushes an email batch to the indexing queue. This prevents the stale session detector from treating an actively processing large mailbox as stuck.
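A minimal sketch of the heartbeat, assuming a node-postgres-style client; `touchSession` and the `db.query` shape are stand-ins for whatever client the project actually uses:

```typescript
// Called after each batch flush so last_activity_at tracks real progress.
async function touchSession(
  db: { query: (sql: string, params: unknown[]) => Promise<unknown> },
  sessionId: string,
): Promise<void> {
  await db.query(
    "UPDATE sync_sessions SET last_activity_at = now() WHERE id = $1",
    [sessionId],
  );
}
```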
### Stale Session Detection
The `schedule-continuous-sync` job runs `SyncSessionService.cleanStaleSessions()` on every tick. A session is considered stale when `last_activity_at` has not been updated for 30 minutes, indicating that the worker that created it crashed before all mailbox jobs were enqueued.
When a stale session is detected:
- The associated ingestion source is set to `status: 'error'` with a descriptive message.
- The session row is deleted.
- On the next scheduler tick, the source is picked up as an `error` source and a new `continuous-sync` job is dispatched.
Already-ingested emails from the partial sync are preserved; the next sync skips them via duplicate detection (`checkDuplicate()`).
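The staleness predicate itself is simple; this sketch hard-codes the 30-minute threshold from the text (the real `cleanStaleSessions()` likely applies it in a SQL `WHERE` clause instead):

```typescript
const STALE_AFTER_MS = 30 * 60 * 1000; // 30 minutes

// A session is stale once last_activity_at is older than the threshold.
function isStale(lastActivityAt: Date, now: Date = new Date()): boolean {
  return now.getTime() - lastActivityAt.getTime() > STALE_AFTER_MS;
}
```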
## Configuration
| Environment Variable | Default | Description |
|---|---|---|
| `SYNC_FREQUENCY` | `* * * * *` | Cron pattern for continuous sync scheduling |
| `INGESTION_WORKER_CONCURRENCY` | `5` | Number of `process-mailbox` jobs that run in parallel |
| `MEILI_INDEXING_BATCH` | `500` | Number of emails per `index-email-batch` job |
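As an example, the defaults in the table above could be overridden in a `.env` file or shell profile; the values here are arbitrary illustrations, not recommendations:

```shell
export SYNC_FREQUENCY="*/5 * * * *"      # continuous sync every 5 minutes
export INGESTION_WORKER_CONCURRENCY=10   # more mailboxes in parallel (needs more RAM)
export MEILI_INDEXING_BATCH=250          # smaller batches for attachment-heavy mail
```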
### Tuning `INGESTION_WORKER_CONCURRENCY`
Each `process-mailbox` job holds at most one parsed email in memory at a time during the ingestion loop. At typical email sizes (~50 KB average), memory pressure per concurrent job is low. Increase this value on servers with more RAM to process more mailboxes in parallel and reduce total sync time.
### Tuning `MEILI_INDEXING_BATCH`
Each `index-email-batch` job loads the `.eml` file and all attachments from storage into memory for text extraction before sending documents to Meilisearch. Reduce this value if the indexing worker experiences memory pressure on deployments with large attachments.
## Resilience
- **Job retries:** All jobs are configured with 5 retry attempts using exponential backoff (starting at 1 second). This handles transient API failures from email providers.
- **Worker crash recovery:** BullMQ detects stalled jobs (no heartbeat within `lockDuration`) and re-queues them automatically. On retry, already-processed emails are skipped via `checkDuplicate()`.
- **Partial sync recovery:** Stale session detection handles the case where a worker crashes mid-dispatch, leaving some mailboxes never enqueued. The source is reset to `error` and the next scheduler tick retries the full sync.
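The retry policy described above maps onto BullMQ's standard job options. This is a sketch of what the queues' `defaultJobOptions` might look like: `attempts` and `backoff` are real BullMQ options, but the exact object the project passes is an assumption.

```typescript
// 5 attempts with exponential backoff starting at 1 second: 1s, 2s, 4s, 8s.
const defaultJobOptions = {
  attempts: 5,
  backoff: { type: "exponential" as const, delay: 1_000 },
};

// Usage (BullMQ): new Queue("ingestion", { connection, defaultJobOptions });
```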