Use Postgres for sync session management

This commit is contained in:
wayneshn
2026-03-18 15:42:41 +01:00
parent 33c3f5acfd
commit db525daceb
21 changed files with 4002 additions and 184 deletions

View File

@@ -13,6 +13,8 @@ ORIGIN=$APP_URL
SYNC_FREQUENCY='* * * * *'
# Set to 'true' to include Junk and Trash folders in the email archive. Defaults to false.
ALL_INCLUSIVE_ARCHIVE=false
# Number of mailbox jobs that run concurrently in the ingestion worker. Increase on servers with more RAM.
INGESTION_WORKER_CONCURRENCY=5
# --- Docker Compose Service Configuration ---
# These variables are used by docker-compose.yml to configure the services. Leave them unchanged if you use Docker services for Postgresql, Valkey (Redis) and Meilisearch. If you decide to use your own instances of these services, you can substitute them with your own connection credentials.

View File

@@ -4,7 +4,14 @@ The Jobs API provides endpoints for monitoring the job queues and the jobs withi
## Overview
Open Archiver uses a job queue system to handle asynchronous tasks like email ingestion and indexing. The system is built on Redis and BullMQ and uses a producer-consumer pattern.
Open Archiver uses a job queue system to handle asynchronous tasks like email ingestion and indexing. The system is built on Redis and BullMQ.
There are two queues:
- **`ingestion`** — handles all email ingestion and sync jobs (`initial-import`, `continuous-sync`, `process-mailbox`, `sync-cycle-finished`, `schedule-continuous-sync`)
- **`indexing`** — handles batched Meilisearch document indexing (`index-email-batch`)
Sync cycle coordination (tracking when all mailboxes in a sync have completed) is managed via the `sync_sessions` database table rather than BullMQ's built-in flow system. This keeps Redis memory usage stable regardless of how many mailboxes are being synced.
### Job Statuses

View File

@@ -0,0 +1,81 @@
# Job Queue Service
This document describes the architecture of the job queue system, including the sync cycle coordination mechanism and relevant configuration options.
## Architecture
The job queue system is built on [BullMQ](https://docs.bullmq.io/) backed by Redis (Valkey). Two worker processes run independently:
- **Ingestion worker** (`ingestion.worker.ts`) — processes the `ingestion` queue
- **Indexing worker** (`indexing.worker.ts`) — processes the `indexing` queue
### Queues
| Queue | Jobs | Purpose |
| ----------- | --------------------------------------------------------------------------------------------------------- | -------------------------------------- |
| `ingestion` | `schedule-continuous-sync`, `continuous-sync`, `initial-import`, `process-mailbox`, `sync-cycle-finished` | Email ingestion and sync orchestration |
| `indexing` | `index-email-batch` | Meilisearch document indexing |
### Job Flow
```
[schedule-continuous-sync] (repeating cron)
└→ [continuous-sync] (per ingestion source)
└→ [process-mailbox] × N (one per user mailbox)
└→ [index-email-batch] (batched, on indexing queue)
└→ [sync-cycle-finished] (dispatched by the last mailbox job)
```
For initial imports, `initial-import` triggers the same `process-mailbox``sync-cycle-finished` flow.
## Sync Cycle Coordination
Sync cycle completion (knowing when all mailboxes in a sync have finished) is coordinated via the `sync_sessions` PostgreSQL table rather than BullMQ's built-in flow/parent-child system.
**Why:** BullMQ's `FlowProducer` stores the entire parent/child relationship in Redis atomically. For large tenants with thousands of mailboxes, this creates large Redis writes and requires loading all child job return values into memory at once for aggregation.
**How it works:**
1. When `initial-import` or `continuous-sync` starts, it creates a `sync_sessions` row with `total_mailboxes = N`.
2. Each `process-mailbox` job atomically increments `completed_mailboxes` or `failed_mailboxes` when it finishes, and merges its `SyncState` into `ingestion_sources.sync_state` using PostgreSQL's `||` jsonb operator.
3. The job that brings `completed + failed` to equal `total` dispatches the `sync-cycle-finished` job.
4. `sync-cycle-finished` reads the aggregated results from the session row and finalizes the source status.
5. The session row is deleted after finalization.
### Session Heartbeat
Each `process-mailbox` job updates `last_activity_at` on the session every time it flushes an email batch to the indexing queue. This prevents the stale session detector from treating an actively processing large mailbox as stuck.
### Stale Session Detection
The `schedule-continuous-sync` job runs `SyncSessionService.cleanStaleSessions()` on every tick. A session is considered stale when `last_activity_at` has not been updated for 30 minutes, indicating the worker that created it has crashed before all mailbox jobs were enqueued.
When a stale session is detected:
1. The associated ingestion source is set to `status: 'error'` with a descriptive message.
2. The session row is deleted.
3. On the next scheduler tick, the source is picked up as an `error` source and a new `continuous-sync` job is dispatched.
Already-ingested emails from the partial sync are preserved. The next sync skips them via duplicate detection (`checkDuplicate()`).
## Configuration
| Environment Variable | Default | Description |
| ------------------------------ | ----------- | ----------------------------------------------------- |
| `SYNC_FREQUENCY` | `* * * * *` | Cron pattern for continuous sync scheduling |
| `INGESTION_WORKER_CONCURRENCY` | `5` | Number of `process-mailbox` jobs that run in parallel |
| `MEILI_INDEXING_BATCH` | `500` | Number of emails per `index-email-batch` job |
### Tuning `INGESTION_WORKER_CONCURRENCY`
Each `process-mailbox` job holds at most one parsed email in memory at a time during the ingestion loop. At typical email sizes (~50KB average), memory pressure per concurrent job is low. Increase this value on servers with more RAM to process multiple mailboxes in parallel and reduce total sync time.
### Tuning `MEILI_INDEXING_BATCH`
Each `index-email-batch` job loads the `.eml` file and all attachments from storage into memory for text extraction before sending to Meilisearch. Reduce this value if the indexing worker experiences memory pressure on deployments with large attachments.
## Resilience
- **Job retries:** All jobs are configured with 5 retry attempts using exponential backoff (starting at 1 second). This handles transient API failures from email providers.
- **Worker crash recovery:** BullMQ detects stalled jobs (no heartbeat within `lockDuration`) and re-queues them automatically. On retry, already-processed emails are skipped via `checkDuplicate()`.
- **Partial sync recovery:** Stale session detection handles the case where a worker crashes mid-dispatch, leaving some mailboxes never enqueued. The source is reset to `error` and the next scheduler tick retries the full sync.

View File

@@ -8,8 +8,8 @@
"build:enterprise": "cross-env VITE_ENTERPRISE_MODE=true pnpm build",
"start:oss": "dotenv -- concurrently \"node apps/open-archiver/dist/index.js\" \"pnpm --filter @open-archiver/frontend start\"",
"start:enterprise": "dotenv -- concurrently \"node apps/open-archiver-enterprise/dist/index.js\" \"pnpm --filter @open-archiver/frontend start\"",
"dev:enterprise": "cross-env VITE_ENTERPRISE_MODE=true dotenv -- pnpm --filter \"@open-archiver/*\" --filter \"open-archiver-enterprise-app\" --parallel dev",
"dev:oss": "dotenv -- pnpm --filter \"./packages/*\" --filter \"!./packages/@open-archiver/enterprise\" --filter \"open-archiver-app\" --parallel dev",
"dev:enterprise": "cross-env VITE_ENTERPRISE_MODE=true dotenv -- pnpm --filter \"@open-archiver/*\" --filter \"open-archiver-enterprise-app\" --parallel dev & pnpm run start:workers:dev",
"dev:oss": "dotenv -- pnpm --filter \"./packages/*\" --filter \"!./packages/@open-archiver/enterprise\" --filter \"open-archiver-app\" --parallel dev & pnpm run start:workers:dev",
"build": "pnpm --filter \"./packages/*\" --filter \"./apps/*\" build",
"start": "dotenv -- pnpm --filter \"open-archiver-app\" --parallel start",
"start:workers": "dotenv -- concurrently \"pnpm --filter @open-archiver/backend start:ingestion-worker\" \"pnpm --filter @open-archiver/backend start:indexing-worker\" \"pnpm --filter @open-archiver/backend start:sync-scheduler\"",

View File

@@ -0,0 +1,12 @@
CREATE TABLE "sync_sessions" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"ingestion_source_id" uuid NOT NULL,
"is_initial_import" boolean DEFAULT false NOT NULL,
"total_mailboxes" integer DEFAULT 0 NOT NULL,
"completed_mailboxes" integer DEFAULT 0 NOT NULL,
"failed_mailboxes" integer DEFAULT 0 NOT NULL,
"error_messages" text[] DEFAULT '{}' NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "sync_sessions" ADD CONSTRAINT "sync_sessions_ingestion_source_id_ingestion_sources_id_fk" FOREIGN KEY ("ingestion_source_id") REFERENCES "public"."ingestion_sources"("id") ON DELETE cascade ON UPDATE no action;

View File

@@ -0,0 +1 @@
ALTER TABLE "sync_sessions" ADD COLUMN "last_activity_at" timestamp with time zone DEFAULT now() NOT NULL;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -190,6 +190,20 @@
"when": 1773326266420,
"tag": "0026_pink_fantastic_four",
"breakpoints": true
},
{
"idx": 27,
"version": "7",
"when": 1773768709477,
"tag": "0027_black_morph",
"breakpoints": true
},
{
"idx": 28,
"version": "7",
"when": 1773770326402,
"tag": "0028_youthful_kitty_pryde",
"breakpoints": true
}
]
}

View File

@@ -9,3 +9,4 @@ export * from './schema/system-settings';
export * from './schema/api-keys';
export * from './schema/audit-logs';
export * from './schema/enums';
export * from './schema/sync-sessions';

View File

@@ -0,0 +1,36 @@
import { boolean, integer, pgTable, text, timestamp, uuid } from 'drizzle-orm/pg-core';
import { ingestionSources } from './ingestion-sources';
import { relations } from 'drizzle-orm';
/**
* Tracks the progress of a single sync cycle (initial import or continuous sync).
* Used as the coordination layer to replace BullMQ FlowProducer parent/child tracking.
* Each process-mailbox job atomically increments completed/failed counters here,
* and the last job to finish dispatches the sync-cycle-finished job.
*/
export const syncSessions = pgTable('sync_sessions', {
id: uuid('id').primaryKey().defaultRandom(),
ingestionSourceId: uuid('ingestion_source_id')
.notNull()
.references(() => ingestionSources.id, { onDelete: 'cascade' }),
isInitialImport: boolean('is_initial_import').notNull().default(false),
totalMailboxes: integer('total_mailboxes').notNull().default(0),
completedMailboxes: integer('completed_mailboxes').notNull().default(0),
failedMailboxes: integer('failed_mailboxes').notNull().default(0),
/** Aggregated error messages from all failed process-mailbox jobs */
errorMessages: text('error_messages').array().notNull().default([]),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
/**
* Updated each time a process-mailbox job reports its result.
* Used to detect genuinely stuck sessions (no activity for N minutes) vs.
* large imports that are still actively running.
*/
lastActivityAt: timestamp('last_activity_at', { withTimezone: true }).notNull().defaultNow(),
});
export const syncSessionsRelations = relations(syncSessions, ({ one }) => ({
ingestionSource: one(ingestionSources, {
fields: [syncSessions.ingestionSourceId],
references: [ingestionSources.id],
}),
}));

View File

@@ -2,7 +2,8 @@ import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { IContinuousSyncJob } from '@open-archiver/types';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { flowProducer } from '../queues';
import { ingestionQueue } from '../queues';
import { SyncSessionService } from '../../services/SyncSessionService';
import { logger } from '../../config/logger';
export default async (job: Job<IContinuousSyncJob>) => {
@@ -26,50 +27,54 @@ export default async (job: Job<IContinuousSyncJob>) => {
const connector = EmailProviderFactory.createConnector(source);
try {
const jobs = [];
// Phase 1: Collect user emails (async generator — no full buffering of job descriptors).
// We need the total count before creating the session so the counter is correct.
const userEmails: string[] = [];
for await (const user of connector.listAllUsers()) {
if (user.primaryEmail) {
jobs.push({
name: 'process-mailbox',
queueName: 'ingestion',
data: {
ingestionSourceId: source.id,
userEmail: user.primaryEmail,
},
opts: {
removeOnComplete: {
age: 60 * 10, // 10 minutes
},
removeOnFail: {
age: 60 * 30, // 30 minutes
},
timeout: 1000 * 60 * 30, // 30 minutes
},
});
userEmails.push(user.primaryEmail);
}
}
// }
if (jobs.length > 0) {
await flowProducer.add({
name: 'sync-cycle-finished',
queueName: 'ingestion',
data: {
ingestionSourceId,
isInitialImport: false,
},
children: jobs,
opts: {
removeOnComplete: true,
removeOnFail: true,
},
if (userEmails.length === 0) {
logger.info(
{ ingestionSourceId },
'No users found during continuous sync, marking active.'
);
await IngestionService.update(ingestionSourceId, {
status: 'active',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: 'Continuous sync complete. No users found.',
});
return;
}
// Phase 2: Create a session BEFORE dispatching any jobs.
const sessionId = await SyncSessionService.create(
ingestionSourceId,
userEmails.length,
false
);
logger.info(
{ ingestionSourceId, userCount: userEmails.length, sessionId },
'Dispatching process-mailbox jobs for continuous sync'
);
// Phase 3: Enqueue individual process-mailbox jobs one at a time.
// No FlowProducer — each job carries the sessionId for DB-based coordination.
for (const userEmail of userEmails) {
await ingestionQueue.add('process-mailbox', {
ingestionSourceId: source.id,
userEmail,
sessionId,
});
}
// The status will be set back to 'active' by the 'sync-cycle-finished' job
// once all the mailboxes have been processed.
logger.info(
{ ingestionSourceId },
{ ingestionSourceId, sessionId },
'Continuous sync job finished dispatching mailbox jobs.'
);
} catch (error) {

View File

@@ -1,8 +1,9 @@
import { Job, FlowChildJob } from 'bullmq';
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { IInitialImportJob, IngestionProvider } from '@open-archiver/types';
import { IInitialImportJob, IngestionStatus } from '@open-archiver/types';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { flowProducer } from '../queues';
import { ingestionQueue } from '../queues';
import { SyncSessionService } from '../../services/SyncSessionService';
import { logger } from '../../config/logger';
export default async (job: Job<IInitialImportJob>) => {
@@ -22,66 +23,55 @@ export default async (job: Job<IInitialImportJob>) => {
const connector = EmailProviderFactory.createConnector(source);
// if (connector instanceof GoogleWorkspaceConnector || connector instanceof MicrosoftConnector) {
const jobs: FlowChildJob[] = [];
let userCount = 0;
// Phase 1: Collect user emails from the provider (async generator — no full buffering
// of FlowChildJob objects). Email strings are tiny (~30 bytes each) compared to the
// old FlowChildJob descriptors (~500 bytes each), and we need the count before we can
// create the session.
const userEmails: string[] = [];
for await (const user of connector.listAllUsers()) {
if (user.primaryEmail) {
jobs.push({
name: 'process-mailbox',
queueName: 'ingestion',
data: {
ingestionSourceId,
userEmail: user.primaryEmail,
},
opts: {
removeOnComplete: {
age: 60 * 10, // 10 minutes
},
removeOnFail: {
age: 60 * 30, // 30 minutes
},
attempts: 1,
// failParentOnFailure: true
},
});
userCount++;
userEmails.push(user.primaryEmail);
}
}
if (jobs.length > 0) {
logger.info(
{ ingestionSourceId, userCount },
'Adding sync-cycle-finished job to the queue'
);
await flowProducer.add({
name: 'sync-cycle-finished',
queueName: 'ingestion',
data: {
ingestionSourceId,
userCount,
isInitialImport: true,
},
children: jobs,
opts: {
removeOnComplete: true,
removeOnFail: true,
},
});
} else {
if (userEmails.length === 0) {
const fileBasedIngestions = IngestionService.returnFileBasedIngestions();
const finalStatus = fileBasedIngestions.includes(source.provider)
const finalStatus: IngestionStatus = fileBasedIngestions.includes(source.provider)
? 'imported'
: 'active';
// If there are no users, we can consider the import finished and set to active
await IngestionService.update(ingestionSourceId, {
status: finalStatus,
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: 'Initial import complete. No users found.',
});
logger.info({ ingestionSourceId }, 'No users found, initial import complete');
return;
}
logger.info({ ingestionSourceId }, 'Finished initial import master job');
// Phase 2: Create a session BEFORE dispatching any jobs to avoid a race condition
// where a process-mailbox job finishes before the session's totalMailboxes is set.
const sessionId = await SyncSessionService.create(
ingestionSourceId,
userEmails.length,
true
);
logger.info(
{ ingestionSourceId, userCount: userEmails.length, sessionId },
'Dispatching process-mailbox jobs for initial import'
);
// Phase 3: Enqueue individual process-mailbox jobs one at a time.
// No FlowProducer, no large atomic Redis write — jobs are enqueued in a loop.
for (const userEmail of userEmails) {
await ingestionQueue.add('process-mailbox', {
ingestionSourceId,
userEmail,
sessionId,
});
}
logger.info({ ingestionSourceId, sessionId }, 'Finished dispatching initial import jobs');
} catch (error) {
logger.error({ err: error, ingestionSourceId }, 'Error in initial import master job');
await IngestionService.update(ingestionSourceId, {

View File

@@ -1,38 +1,29 @@
import { Job } from 'bullmq';
import {
IProcessMailboxJob,
SyncState,
ProcessMailboxError,
PendingEmail,
} from '@open-archiver/types';
import { IProcessMailboxJob, ProcessMailboxError, PendingEmail } from '@open-archiver/types';
import { IngestionService } from '../../services/IngestionService';
import { logger } from '../../config/logger';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { StorageService } from '../../services/StorageService';
import { IndexingService } from '../../services/IndexingService';
import { SearchService } from '../../services/SearchService';
import { DatabaseService } from '../../services/DatabaseService';
import { config } from '../../config';
import { indexingQueue } from '../queues';
import { indexingQueue, ingestionQueue } from '../queues';
import { SyncSessionService } from '../../services/SyncSessionService';
/**
* This processor handles the ingestion of emails for a single user's mailbox.
* If an error occurs during processing (e.g., an API failure),
* it catches the exception and returns a structured error object instead of throwing.
* This prevents a single failed mailbox from halting the entire sync cycle for all users.
* The parent 'sync-cycle-finished' job is responsible for inspecting the results of all
* 'process-mailbox' jobs, aggregating successes, and reporting detailed failures.
* Handles ingestion of emails for a single user's mailbox.
*
* On completion, it reports its result to SyncSessionService using an atomic DB counter.
* If this is the last mailbox job in the session, it dispatches the 'sync-cycle-finished' job.
* This replaces the BullMQ FlowProducer parent/child pattern, avoiding the memory and Redis
* overhead of loading all children's return values at once.
*/
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncState, string>) => {
const { ingestionSourceId, userEmail } = job.data;
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob>) => {
const { ingestionSourceId, userEmail, sessionId } = job.data;
const BATCH_SIZE: number = config.meili.indexingBatchSize;
let emailBatch: PendingEmail[] = [];
logger.info({ ingestionSourceId, userEmail }, `Processing mailbox for user`);
logger.info({ ingestionSourceId, userEmail, sessionId }, `Processing mailbox for user`);
const searchService = new SearchService();
const storageService = new StorageService();
const databaseService = new DatabaseService();
try {
const source = await IngestionService.findById(ingestionSourceId);
@@ -43,7 +34,7 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
const connector = EmailProviderFactory.createConnector(source);
const ingestionService = new IngestionService();
// Create a callback to check for duplicates without fetching full email content
// Pre-check for duplicates without fetching full email content
const checkDuplicate = async (messageId: string) => {
return await IngestionService.doesEmailExist(messageId, ingestionSourceId);
};
@@ -65,6 +56,12 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
if (emailBatch.length >= BATCH_SIZE) {
await indexingQueue.add('index-email-batch', { emails: emailBatch });
emailBatch = [];
// Heartbeat: a single large mailbox can take hours to process.
// Without this, cleanStaleSessions() would see no activity on the
// session and incorrectly mark it as stale after 30 minutes.
// We piggyback on the existing batch flush cadence — no extra DB
// writes beyond what we'd do anyway.
await SyncSessionService.heartbeat(sessionId);
}
}
}
@@ -77,8 +74,26 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
const newSyncState = connector.getUpdatedSyncState(userEmail);
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
return newSyncState;
// Report success to the session and check if this is the last job
const { isLast, totalFailed } = await SyncSessionService.recordMailboxResult(
sessionId,
newSyncState
);
if (isLast) {
logger.info(
{ ingestionSourceId, sessionId },
'Last mailbox job completed, dispatching sync-cycle-finished'
);
await ingestionQueue.add('sync-cycle-finished', {
ingestionSourceId,
sessionId,
isInitialImport: false,
});
}
} catch (error) {
// Flush any buffered emails before reporting failure
if (emailBatch.length > 0) {
await indexingQueue.add('index-email-batch', { emails: emailBatch });
emailBatch = [];
@@ -90,6 +105,33 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
error: true,
message: `Failed to process mailbox for ${userEmail}: ${errorMessage}`,
};
return processMailboxError;
// Report failure to the session — this still counts towards the total
try {
const { isLast } = await SyncSessionService.recordMailboxResult(
sessionId,
processMailboxError
);
if (isLast) {
logger.info(
{ ingestionSourceId, sessionId },
'Last mailbox job (with error) completed, dispatching sync-cycle-finished'
);
await ingestionQueue.add('sync-cycle-finished', {
ingestionSourceId,
sessionId,
isInitialImport: false,
});
}
} catch (sessionError) {
logger.error(
{ err: sessionError, sessionId },
'Failed to record mailbox error in sync session'
);
}
// Do not re-throw — a single failed mailbox should not mark the BullMQ job as failed
// and trigger retries that would double-count against the session counter.
}
};

View File

@@ -3,17 +3,36 @@ import { db } from '../../database';
import { ingestionSources } from '../../database/schema';
import { or, eq } from 'drizzle-orm';
import { ingestionQueue } from '../queues';
import { SyncSessionService } from '../../services/SyncSessionService';
import { logger } from '../../config/logger';
export default async (job: Job) => {
console.log('Scheduler running: Looking for active or error ingestion sources to sync.');
// find all sources that have the status of active or error for continuous syncing.
logger.info({}, 'Scheduler running: checking for stale sessions and active sources to sync.');
// Step 1: Clean up any stale sync sessions from previous crashed runs.
// A session is stale when lastActivityAt hasn't been updated in 30 minutes —
// meaning no process-mailbox job has reported back, indicating the worker crashed
// after creating the session but before all jobs were enqueued.
// This sets the associated ingestion source to 'error' so Step 2 picks it up.
try {
await SyncSessionService.cleanStaleSessions();
} catch (error) {
// Log but don't abort — stale session cleanup is best-effort
logger.error({ err: error }, 'Error during stale session cleanup in scheduler');
}
// Step 2: Find all sources with status 'active' or 'error' for continuous syncing.
// Sources previously stuck in 'importing'/'syncing' due to a crash will now appear
// as 'error' (set by cleanStaleSessions above) and will be picked up here for retry.
const sourcesToSync = await db
.select({ id: ingestionSources.id })
.from(ingestionSources)
.where(or(eq(ingestionSources.status, 'active'), eq(ingestionSources.status, 'error')));
logger.info({ count: sourcesToSync.length }, 'Dispatching continuous-sync jobs for sources');
for (const source of sourcesToSync) {
// The status field on the ingestion source is used to prevent duplicate syncs.
// The status field on the ingestion source prevents duplicate concurrent syncs.
await ingestionQueue.add('continuous-sync', { ingestionSourceId: source.id });
}
};

View File

@@ -1,103 +1,74 @@
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { SyncSessionService } from '../../services/SyncSessionService';
import { logger } from '../../config/logger';
import {
SyncState,
ProcessMailboxError,
IngestionStatus,
IngestionProvider,
} from '@open-archiver/types';
import { db } from '../../database';
import { ingestionSources } from '../../database/schema';
import { eq } from 'drizzle-orm';
import { deepmerge } from 'deepmerge-ts';
import { IngestionStatus } from '@open-archiver/types';
interface ISyncCycleFinishedJob {
ingestionSourceId: string;
userCount?: number; // Optional, as it's only relevant for the initial import
sessionId: string;
isInitialImport: boolean;
}
/**
* This processor runs after all 'process-mailbox' jobs for a sync cycle have completed.
* It is responsible for aggregating the results and finalizing the sync status.
* It inspects the return values of all child jobs to identify successes and failures.
*
* If any child jobs returned an error object, this processor will:
* 1. Mark the overall ingestion status as 'error'.
* 2. Aggregate the detailed error messages from all failed jobs.
* 3. Save the sync state from any jobs that *did* succeed, preserving partial progress.
*
* If all child jobs succeeded, it marks the ingestion as 'active' and saves the final
* aggregated sync state from all children.
* Finalizes a sync cycle after all process-mailbox jobs have completed.
*
* This processor no longer uses BullMQ's job.getChildrenValues() or deepmerge.
* Instead, it reads the aggregated results from the sync_sessions table in PostgreSQL,
* where each process-mailbox job has already atomically recorded its outcome and
* incrementally merged its SyncState into ingestion_sources.sync_state.
*/
export default async (job: Job<ISyncCycleFinishedJob, any, string>) => {
const { ingestionSourceId, userCount, isInitialImport } = job.data;
export default async (job: Job<ISyncCycleFinishedJob>) => {
const { ingestionSourceId, sessionId, isInitialImport } = job.data;
logger.info(
{ ingestionSourceId, userCount, isInitialImport },
{ ingestionSourceId, sessionId, isInitialImport },
'Sync cycle finished job started'
);
try {
const childrenValues = await job.getChildrenValues<SyncState | ProcessMailboxError>();
const allChildJobs = Object.values(childrenValues);
// if data has error property, it is a failed job
const failedJobs = allChildJobs.filter(
(v) => v && (v as any).error
) as ProcessMailboxError[];
// if data doesn't have error property, it is a successful job with SyncState
const successfulJobs = allChildJobs.filter((v) => !v || !(v as any).error) as SyncState[];
const session = await SyncSessionService.findById(sessionId);
const finalSyncState = deepmerge(
...successfulJobs.filter((s) => s && Object.keys(s).length > 0)
) as SyncState;
const source = await IngestionService.findById(ingestionSourceId);
let status: IngestionStatus = 'active';
let message: string;
const fileBasedIngestions = IngestionService.returnFileBasedIngestions();
const source = await IngestionService.findById(ingestionSourceId);
if (fileBasedIngestions.includes(source.provider)) {
status = 'imported';
}
let message: string;
// Check for a specific rate-limit message from the successful jobs
const rateLimitMessage = successfulJobs.find(
(j) => j.statusMessage && j.statusMessage.includes('rate limit')
)?.statusMessage;
if (failedJobs.length > 0) {
if (session.failedMailboxes > 0) {
status = 'error';
const errorMessages = failedJobs.map((j) => j.message).join('\n');
message = `Sync cycle completed with ${failedJobs.length} error(s):\n${errorMessages}`;
const errorMessages = session.errorMessages.join('\n');
message = `Sync cycle completed with ${session.failedMailboxes} error(s):\n${errorMessages}`;
logger.error(
{ ingestionSourceId, errors: errorMessages },
{ ingestionSourceId, sessionId, errors: errorMessages },
'Sync cycle finished with errors.'
);
} else if (rateLimitMessage) {
message = rateLimitMessage;
logger.warn({ ingestionSourceId, message }, 'Sync cycle paused due to rate limiting.');
} else {
message = 'Continuous sync cycle finished successfully.';
if (isInitialImport) {
message = `Initial import finished for ${userCount} mailboxes.`;
}
logger.info({ ingestionSourceId }, 'Successfully updated status and final sync state.');
message = isInitialImport
? `Initial import finished for ${session.completedMailboxes} mailboxes.`
: 'Continuous sync cycle finished successfully.';
logger.info({ ingestionSourceId, sessionId }, 'Sync cycle finished successfully.');
}
await db
.update(ingestionSources)
.set({
status,
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: message,
syncState: finalSyncState,
})
.where(eq(ingestionSources.id, ingestionSourceId));
// syncState was already merged incrementally by each process-mailbox job via
// SyncSessionService.recordMailboxResult() — no deepmerge needed here.
await IngestionService.update(ingestionSourceId, {
status,
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: message,
});
// Clean up the session row
await SyncSessionService.finalize(sessionId);
logger.info({ ingestionSourceId, sessionId, status }, 'Sync cycle finalized');
} catch (error) {
logger.error(
{ err: error, ingestionSourceId },
{ err: error, ingestionSourceId, sessionId },
'An unexpected error occurred while finalizing the sync cycle.'
);
await IngestionService.update(ingestionSourceId, {

View File

@@ -1,8 +1,6 @@
import { Queue, FlowProducer } from 'bullmq';
import { Queue } from 'bullmq';
import { connection } from '../config/redis';
export const flowProducer = new FlowProducer({ connection });
// Default job options
const defaultJobOptions = {
attempts: 5,

View File

@@ -0,0 +1,260 @@
import { db } from '../database';
import { syncSessions, ingestionSources } from '../database/schema';
import { eq, lt, sql } from 'drizzle-orm';
import type { SyncState, ProcessMailboxError } from '@open-archiver/types';
import { logger } from '../config/logger';
export interface SyncSessionRecord {
id: string;
ingestionSourceId: string;
isInitialImport: boolean;
totalMailboxes: number;
completedMailboxes: number;
failedMailboxes: number;
errorMessages: string[];
createdAt: Date;
lastActivityAt: Date;
}
export interface MailboxResultOutcome {
/** True if this was the last mailbox job in the session (should trigger finalization) */
isLast: boolean;
totalCompleted: number;
totalFailed: number;
errorMessages: string[];
}
export class SyncSessionService {
/**
* Creates a new sync session for a given ingestion source and returns its ID.
* Must be called before any process-mailbox jobs are dispatched.
*/
public static async create(
ingestionSourceId: string,
totalMailboxes: number,
isInitialImport: boolean
): Promise<string> {
const [session] = await db
.insert(syncSessions)
.values({
ingestionSourceId,
totalMailboxes,
isInitialImport,
completedMailboxes: 0,
failedMailboxes: 0,
errorMessages: [],
})
.returning({ id: syncSessions.id });
logger.info(
{ sessionId: session.id, ingestionSourceId, totalMailboxes, isInitialImport },
'Sync session created'
);
return session.id;
}
/**
* Atomically records the result of a single process-mailbox job.
* Increments either completedMailboxes or failedMailboxes depending on the result.
* If the result is a successful SyncState, it is merged into the ingestion source's
* syncState column using PostgreSQL's jsonb merge operator.
*
* Returns whether this was the last mailbox job in the session.
*/
public static async recordMailboxResult(
sessionId: string,
result: SyncState | ProcessMailboxError
): Promise<MailboxResultOutcome> {
const isError = (result as ProcessMailboxError).error === true;
// Atomically increment the appropriate counter and append error message if needed.
// The RETURNING clause ensures we get the post-update values to check if this is the last job.
const [updated] = await db
.update(syncSessions)
.set({
completedMailboxes: isError
? syncSessions.completedMailboxes
: sql`${syncSessions.completedMailboxes} + 1`,
failedMailboxes: isError
? sql`${syncSessions.failedMailboxes} + 1`
: syncSessions.failedMailboxes,
errorMessages: isError
? sql`array_append(${syncSessions.errorMessages}, ${(result as ProcessMailboxError).message})`
: syncSessions.errorMessages,
// Touch lastActivityAt on every result so the stale-session detector
// knows this session is still alive, regardless of how long it has been running.
lastActivityAt: new Date(),
})
.where(eq(syncSessions.id, sessionId))
.returning({
completedMailboxes: syncSessions.completedMailboxes,
failedMailboxes: syncSessions.failedMailboxes,
totalMailboxes: syncSessions.totalMailboxes,
errorMessages: syncSessions.errorMessages,
ingestionSourceId: syncSessions.ingestionSourceId,
});
if (!updated) {
throw new Error(`Sync session ${sessionId} not found when recording mailbox result.`);
}
// If the result is a successful SyncState with actual content, merge it into the
// ingestion source's syncState column using PostgreSQL's || jsonb merge operator.
// This is done incrementally per mailbox to avoid the large deepmerge at the end.
if (!isError) {
const syncState = result as SyncState;
if (Object.keys(syncState).length > 0) {
await db
.update(ingestionSources)
.set({
syncState: sql`COALESCE(${ingestionSources.syncState}, '{}'::jsonb) || ${JSON.stringify(syncState)}::jsonb`,
})
.where(eq(ingestionSources.id, updated.ingestionSourceId));
}
}
const totalProcessed = updated.completedMailboxes + updated.failedMailboxes;
const isLast = totalProcessed >= updated.totalMailboxes;
logger.info(
{
sessionId,
completed: updated.completedMailboxes,
failed: updated.failedMailboxes,
total: updated.totalMailboxes,
isLast,
},
'Mailbox result recorded'
);
return {
isLast,
totalCompleted: updated.completedMailboxes,
totalFailed: updated.failedMailboxes,
errorMessages: updated.errorMessages,
};
}
/**
* Fetches a sync session by its ID.
*/
public static async findById(sessionId: string): Promise<SyncSessionRecord> {
const [session] = await db
.select()
.from(syncSessions)
.where(eq(syncSessions.id, sessionId));
if (!session) {
throw new Error(`Sync session ${sessionId} not found.`);
}
return session;
}
/**
* Updates lastActivityAt for the session without changing any counters.
* Should be called periodically during a long-running process-mailbox job
* to prevent cleanStaleSessions() from incorrectly treating an actively
* processing mailbox as stale.
*
*/
public static async heartbeat(sessionId: string): Promise<void> {
try {
console.log('heatbeat, ', sessionId);
await db
.update(syncSessions)
.set({ lastActivityAt: new Date() })
.where(eq(syncSessions.id, sessionId));
} catch (error) {
logger.warn({ err: error, sessionId }, 'Failed to update session heartbeat');
}
}
/**
* Deletes a sync session after finalization to keep the table clean.
*/
public static async finalize(sessionId: string): Promise<void> {
await db.delete(syncSessions).where(eq(syncSessions.id, sessionId));
logger.info({ sessionId }, 'Sync session finalized and deleted');
}
/**
* Finds all sync sessions that are stale and marks the associated ingestion source
* as 'error', then deletes the orphaned session row.
*
* Staleness is determined by lastActivityAt — the timestamp updated every time a
* process-mailbox job reports a result. This correctly handles large imports that run
* for many hours: as long as mailboxes are actively completing, lastActivityAt stays
* fresh and the session is never considered stale.
*
* A session is stale when:
* completedMailboxes + failedMailboxes < totalMailboxes
* AND lastActivityAt < (now - thresholdMs)
*
* Default threshold: 30 minutes of inactivity. This covers the crash scenario where
* the processor died after creating the session but before all process-mailbox jobs
* were enqueued — those jobs will never report back, causing permanent inactivity.
*
* Once cleaned up, the source is set to 'error' so the next scheduler tick will
* re-queue a continuous-sync job.
*/
public static async cleanStaleSessions(
thresholdMs: number = 30 * 60 * 1000 // 30 minutes of inactivity
): Promise<void> {
const cutoffTime = new Date(Date.now() - thresholdMs);
// Find sessions with no recent activity (regardless of how old they are)
const staleSessions = await db
.select()
.from(syncSessions)
.where(lt(syncSessions.lastActivityAt, cutoffTime));
for (const session of staleSessions) {
const totalProcessed = session.completedMailboxes + session.failedMailboxes;
if (totalProcessed >= session.totalMailboxes) {
// Session finished but was never finalized (e.g., sync-cycle-finished job
// was lost) — clean it up silently without touching the source status.
await db.delete(syncSessions).where(eq(syncSessions.id, session.id));
logger.warn(
{ sessionId: session.id, ingestionSourceId: session.ingestionSourceId },
'Cleaned up completed-but-unfinalized stale sync session'
);
continue;
}
// Session is genuinely stuck — no mailbox activity for the threshold period.
const inactiveMinutes = Math.round(
(Date.now() - session.lastActivityAt.getTime()) / 60000
);
logger.warn(
{
sessionId: session.id,
ingestionSourceId: session.ingestionSourceId,
totalMailboxes: session.totalMailboxes,
completedMailboxes: session.completedMailboxes,
failedMailboxes: session.failedMailboxes,
inactiveMinutes,
},
'Stale sync session detected — marking source as error and cleaning up'
);
await db
.update(ingestionSources)
.set({
status: 'error',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: `Sync interrupted: no activity for ${inactiveMinutes} minutes. ${session.completedMailboxes} of ${session.totalMailboxes} mailboxes completed. Will retry on next sync cycle.`,
})
.where(eq(ingestionSources.id, session.ingestionSourceId));
await db.delete(syncSessions).where(eq(syncSessions.id, session.id));
logger.info(
{ sessionId: session.id, ingestionSourceId: session.ingestionSourceId },
'Stale sync session cleaned up, source set to error for retry'
);
}
}
}

View File

@@ -25,6 +25,10 @@ const processor = async (job: any) => {
const worker = new Worker('ingestion', processor, {
connection,
// Configurable via INGESTION_WORKER_CONCURRENCY env var. Tune based on available RAM.
concurrency: process.env.INGESTION_WORKER_CONCURRENCY
? parseInt(process.env.INGESTION_WORKER_CONCURRENCY, 10)
: 5,
removeOnComplete: {
count: 100, // keep last 100 jobs
},

View File

@@ -30,11 +30,23 @@ const handleRequest: RequestHandler = async ({ request, params, fetch }) => {
const response = await fetch(proxyRequest);
return response;
} catch (error) {
} catch (error: any) {
console.error('Proxy request failed:', error);
// Handle SvelteKit HttpError (e.g. from request.arrayBuffer() exceeding BODY_SIZE_LIMIT)
// Or other types of errors, formatting them into the standard ApiErrorResponse
const statusCode = error?.status || 500;
const message =
error?.body?.message || error?.message || 'Failed to connect to the backend service.';
return json(
{ message: `Failed to connect to the backend service. ${JSON.stringify(error)}` },
{ status: 500 }
{
status: 'error',
statusCode: statusCode,
message: message,
errors: null,
},
{ status: statusCode }
);
}
};

View File

@@ -149,6 +149,8 @@ export interface IInitialImportJob {
export interface IProcessMailboxJob {
ingestionSourceId: string;
userEmail: string;
/** ID of the SyncSession tracking this sync cycle's progress */
sessionId: string;
}
export interface IPstProcessingJob {