Continuous syncing

This commit is contained in:
Wayne
2025-07-22 01:51:10 +03:00
parent f075cfe32d
commit 3d1feedafb
24 changed files with 1307 additions and 57 deletions

View File

@@ -46,6 +46,17 @@ services:
env_file:
- ./.env
sync-scheduler:
build:
context: ./packages/backend
dockerfile: Dockerfile
command: 'pnpm ts-node-dev --respawn --transpile-only src/jobs/schedulers/sync-scheduler.ts'
depends_on:
- postgres
- redis
env_file:
- ./.env
postgres:
image: postgres:15
ports:

View File

@@ -4,7 +4,7 @@
"scripts": {
"dev": "dotenv -- pnpm --filter \"./packages/*\" --parallel dev",
"build": "pnpm --filter \"./packages/*\" --parallel build",
"start:workers": "dotenv -- concurrently \"pnpm --filter @open-archiver/backend start:ingestion-worker\" \"pnpm --filter @open-archiver/backend start:indexing-worker\""
"start:workers": "dotenv -- concurrently \"pnpm --filter @open-archiver/backend start:ingestion-worker\" \"pnpm --filter @open-archiver/backend start:indexing-worker\" \"pnpm --filter @open-archiver/backend start:sync-scheduler\""
},
"devDependencies": {
"concurrently": "^9.2.0",

View File

@@ -10,6 +10,7 @@
"start": "node dist/index.js",
"start:ingestion-worker": "ts-node-dev --respawn --transpile-only src/workers/ingestion.worker.ts",
"start:indexing-worker": "ts-node-dev --respawn --transpile-only src/workers/indexing.worker.ts",
"start:sync-scheduler": "ts-node-dev --respawn --transpile-only src/jobs/schedulers/sync-scheduler.ts",
"db:generate": "drizzle-kit generate --config=drizzle.config.ts",
"db:push": "drizzle-kit push --config=drizzle.config.ts",
"db:migrate": "ts-node-dev src/database/migrate.ts"

View File

@@ -0,0 +1 @@
-- Adds a per-source JSONB column that persists provider sync cursors
-- between continuous-sync runs (e.g. the Gmail history ID tracked by the
-- Google Workspace connector). NULL means no sync has recorded state yet.
ALTER TABLE "ingestion_sources" ADD COLUMN "sync_state" jsonb;

View File

@@ -0,0 +1,826 @@
{
"id": "bdc9d789-04c7-4d9f-b4ed-00366b0d3603",
"prevId": "4fa75649-1e65-4c61-8cc5-95add8269925",
"version": "7",
"dialect": "postgresql",
"tables": {
"public.archived_emails": {
"name": "archived_emails",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"ingestion_source_id": {
"name": "ingestion_source_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"message_id_header": {
"name": "message_id_header",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sent_at": {
"name": "sent_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true
},
"subject": {
"name": "subject",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sender_name": {
"name": "sender_name",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sender_email": {
"name": "sender_email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"recipients": {
"name": "recipients",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"storage_path": {
"name": "storage_path",
"type": "text",
"primaryKey": false,
"notNull": true
},
"storage_hash_sha256": {
"name": "storage_hash_sha256",
"type": "text",
"primaryKey": false,
"notNull": true
},
"size_bytes": {
"name": "size_bytes",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"is_indexed": {
"name": "is_indexed",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": false
},
"has_attachments": {
"name": "has_attachments",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": false
},
"is_on_legal_hold": {
"name": "is_on_legal_hold",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": false
},
"archived_at": {
"name": "archived_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"archived_emails_ingestion_source_id_ingestion_sources_id_fk": {
"name": "archived_emails_ingestion_source_id_ingestion_sources_id_fk",
"tableFrom": "archived_emails",
"tableTo": "ingestion_sources",
"columnsFrom": [
"ingestion_source_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.attachments": {
"name": "attachments",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"filename": {
"name": "filename",
"type": "text",
"primaryKey": false,
"notNull": true
},
"mime_type": {
"name": "mime_type",
"type": "text",
"primaryKey": false,
"notNull": false
},
"size_bytes": {
"name": "size_bytes",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"content_hash_sha256": {
"name": "content_hash_sha256",
"type": "text",
"primaryKey": false,
"notNull": true
},
"storage_path": {
"name": "storage_path",
"type": "text",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"attachments_content_hash_sha256_unique": {
"name": "attachments_content_hash_sha256_unique",
"nullsNotDistinct": false,
"columns": [
"content_hash_sha256"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.email_attachments": {
"name": "email_attachments",
"schema": "",
"columns": {
"email_id": {
"name": "email_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"attachment_id": {
"name": "attachment_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {
"email_attachments_email_id_archived_emails_id_fk": {
"name": "email_attachments_email_id_archived_emails_id_fk",
"tableFrom": "email_attachments",
"tableTo": "archived_emails",
"columnsFrom": [
"email_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"email_attachments_attachment_id_attachments_id_fk": {
"name": "email_attachments_attachment_id_attachments_id_fk",
"tableFrom": "email_attachments",
"tableTo": "attachments",
"columnsFrom": [
"attachment_id"
],
"columnsTo": [
"id"
],
"onDelete": "restrict",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {
"email_attachments_email_id_attachment_id_pk": {
"name": "email_attachments_email_id_attachment_id_pk",
"columns": [
"email_id",
"attachment_id"
]
}
},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.audit_logs": {
"name": "audit_logs",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "bigserial",
"primaryKey": true,
"notNull": true
},
"timestamp": {
"name": "timestamp",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"actor_identifier": {
"name": "actor_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"action": {
"name": "action",
"type": "text",
"primaryKey": false,
"notNull": true
},
"target_type": {
"name": "target_type",
"type": "text",
"primaryKey": false,
"notNull": false
},
"target_id": {
"name": "target_id",
"type": "text",
"primaryKey": false,
"notNull": false
},
"details": {
"name": "details",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"is_tamper_evident": {
"name": "is_tamper_evident",
"type": "boolean",
"primaryKey": false,
"notNull": false,
"default": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.ediscovery_cases": {
"name": "ediscovery_cases",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'open'"
},
"created_by_identifier": {
"name": "created_by_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"ediscovery_cases_name_unique": {
"name": "ediscovery_cases_name_unique",
"nullsNotDistinct": false,
"columns": [
"name"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.export_jobs": {
"name": "export_jobs",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"case_id": {
"name": "case_id",
"type": "uuid",
"primaryKey": false,
"notNull": false
},
"format": {
"name": "format",
"type": "text",
"primaryKey": false,
"notNull": true
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'pending'"
},
"query": {
"name": "query",
"type": "jsonb",
"primaryKey": false,
"notNull": true
},
"file_path": {
"name": "file_path",
"type": "text",
"primaryKey": false,
"notNull": false
},
"created_by_identifier": {
"name": "created_by_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"completed_at": {
"name": "completed_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
}
},
"indexes": {},
"foreignKeys": {
"export_jobs_case_id_ediscovery_cases_id_fk": {
"name": "export_jobs_case_id_ediscovery_cases_id_fk",
"tableFrom": "export_jobs",
"tableTo": "ediscovery_cases",
"columnsFrom": [
"case_id"
],
"columnsTo": [
"id"
],
"onDelete": "set null",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.legal_holds": {
"name": "legal_holds",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"case_id": {
"name": "case_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"custodian_id": {
"name": "custodian_id",
"type": "uuid",
"primaryKey": false,
"notNull": false
},
"hold_criteria": {
"name": "hold_criteria",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"reason": {
"name": "reason",
"type": "text",
"primaryKey": false,
"notNull": false
},
"applied_by_identifier": {
"name": "applied_by_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"applied_at": {
"name": "applied_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"removed_at": {
"name": "removed_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
}
},
"indexes": {},
"foreignKeys": {
"legal_holds_case_id_ediscovery_cases_id_fk": {
"name": "legal_holds_case_id_ediscovery_cases_id_fk",
"tableFrom": "legal_holds",
"tableTo": "ediscovery_cases",
"columnsFrom": [
"case_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"legal_holds_custodian_id_custodians_id_fk": {
"name": "legal_holds_custodian_id_custodians_id_fk",
"tableFrom": "legal_holds",
"tableTo": "custodians",
"columnsFrom": [
"custodian_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.retention_policies": {
"name": "retention_policies",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false
},
"priority": {
"name": "priority",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"retention_period_days": {
"name": "retention_period_days",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"action_on_expiry": {
"name": "action_on_expiry",
"type": "retention_action",
"typeSchema": "public",
"primaryKey": false,
"notNull": true
},
"is_enabled": {
"name": "is_enabled",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": true
},
"conditions": {
"name": "conditions",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"retention_policies_name_unique": {
"name": "retention_policies_name_unique",
"nullsNotDistinct": false,
"columns": [
"name"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.custodians": {
"name": "custodians",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"email": {
"name": "email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"display_name": {
"name": "display_name",
"type": "text",
"primaryKey": false,
"notNull": false
},
"source_type": {
"name": "source_type",
"type": "ingestion_provider",
"typeSchema": "public",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"custodians_email_unique": {
"name": "custodians_email_unique",
"nullsNotDistinct": false,
"columns": [
"email"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.ingestion_sources": {
"name": "ingestion_sources",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"provider": {
"name": "provider",
"type": "ingestion_provider",
"typeSchema": "public",
"primaryKey": false,
"notNull": true
},
"credentials": {
"name": "credentials",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"status": {
"name": "status",
"type": "ingestion_status",
"typeSchema": "public",
"primaryKey": false,
"notNull": true,
"default": "'pending_auth'"
},
"last_sync_started_at": {
"name": "last_sync_started_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
},
"last_sync_finished_at": {
"name": "last_sync_finished_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
},
"last_sync_status_message": {
"name": "last_sync_status_message",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sync_state": {
"name": "sync_state",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
}
},
"enums": {
"public.retention_action": {
"name": "retention_action",
"schema": "public",
"values": [
"delete_permanently",
"notify_admin"
]
},
"public.ingestion_provider": {
"name": "ingestion_provider",
"schema": "public",
"values": [
"google_workspace",
"microsoft_365",
"generic_imap"
]
},
"public.ingestion_status": {
"name": "ingestion_status",
"schema": "public",
"values": [
"active",
"paused",
"error",
"pending_auth",
"syncing",
"importing",
"auth_success"
]
}
},
"schemas": {},
"sequences": {},
"roles": {},
"policies": {},
"views": {},
"_meta": {
"columns": {},
"schemas": {},
"tables": {}
}
}

View File

@@ -43,6 +43,13 @@
"when": 1752606327253,
"tag": "0005_chunky_sue_storm",
"breakpoints": true
},
{
"idx": 6,
"version": "7",
"when": 1753112018514,
"tag": "0006_majestic_caretaker",
"breakpoints": true
}
]
}

View File

@@ -25,6 +25,7 @@ export const ingestionSources = pgTable('ingestion_sources', {
lastSyncStartedAt: timestamp('last_sync_started_at', { withTimezone: true }),
lastSyncFinishedAt: timestamp('last_sync_finished_at', { withTimezone: true }),
lastSyncStatusMessage: text('last_sync_status_message'),
syncState: jsonb('sync_state'),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
updatedAt: timestamp('updated_at', { withTimezone: true }).notNull().defaultNow()
});

View File

@@ -1,11 +1,78 @@
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { IInitialImportJob } from '@open-archiver/types';
import { IContinuousSyncJob } from '@open-archiver/types';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { flowProducer } from '../queues';
import { logger } from '../../config/logger';
const ingestionService = new IngestionService();
export default async (job: Job<IContinuousSyncJob>) => {
const { ingestionSourceId } = job.data;
logger.info({ ingestionSourceId }, 'Starting continuous sync job.');
export default async (job: Job<IInitialImportJob>) => {
console.log(`Processing continuous sync for ingestion source: ${job.data.ingestionSourceId}`);
// This would be similar to performBulkImport, but would likely use the `since` parameter.
// For now, we'll just log a message.
const source = await IngestionService.findById(ingestionSourceId);
if (!source || source.status !== 'active') {
logger.warn({ ingestionSourceId, status: source?.status }, 'Skipping continuous sync for non-active source.');
return;
}
await IngestionService.update(ingestionSourceId, {
status: 'syncing',
lastSyncStartedAt: new Date(),
});
const connector = EmailProviderFactory.createConnector(source);
try {
const jobs = [];
if (!connector.listAllUsers) {
// This is for single-mailbox providers like Generic IMAP
jobs.push({
name: 'process-mailbox',
queueName: 'ingestion',
data: {
ingestionSourceId: source.id,
userEmail: 'default' // A placeholder, as it's not needed for IMAP
}
});
} else {
// For multi-mailbox providers like Google Workspace and M365
for await (const user of connector.listAllUsers()) {
if (user.primaryEmail) {
jobs.push({
name: 'process-mailbox',
queueName: 'ingestion',
data: {
ingestionSourceId: source.id,
userEmail: user.primaryEmail,
}
});
}
}
}
if (jobs.length > 0) {
await flowProducer.add({
name: 'sync-cycle-finished',
queueName: 'ingestion',
data: {
ingestionSourceId,
isInitialImport: false
},
children: jobs
});
}
// The status will be set back to 'active' by the 'sync-cycle-finished' job
// once all the mailboxes have been processed.
logger.info({ ingestionSourceId }, 'Continuous sync job finished dispatching mailbox jobs.');
} catch (error) {
logger.error({ err: error, ingestionSourceId }, 'Continuous sync job failed.');
await IngestionService.update(ingestionSourceId, {
status: 'error',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: error instanceof Error ? error.message : 'An unknown error occurred during sync.',
});
throw error;
}
};

View File

@@ -3,7 +3,7 @@ import { IngestionService } from '../../services/IngestionService';
import { IInitialImportJob } from '@open-archiver/types';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { GoogleWorkspaceConnector } from '../../services/ingestion-connectors/GoogleWorkspaceConnector';
import { ingestionQueue } from '../queues';
import { flowProducer, ingestionQueue } from '../queues';
import { logger } from '../../config/logger';
export default async (job: Job<IInitialImportJob>) => {
@@ -16,20 +16,49 @@ export default async (job: Job<IInitialImportJob>) => {
throw new Error(`Ingestion source with ID ${ingestionSourceId} not found`);
}
await IngestionService.update(ingestionSourceId, {
status: 'importing',
lastSyncStatusMessage: 'Starting initial import...'
});
const connector = EmailProviderFactory.createConnector(source);
if (connector instanceof GoogleWorkspaceConnector) {
const jobs = [];
let userCount = 0;
for await (const user of connector.listAllUsers()) {
if (user.primaryEmail) {
await ingestionQueue.add('process-mailbox', {
ingestionSourceId,
userEmail: user.primaryEmail
jobs.push({
name: 'process-mailbox',
queueName: 'ingestion',
data: {
ingestionSourceId,
userEmail: user.primaryEmail
}
});
userCount++;
}
}
logger.info({ ingestionSourceId, userCount }, `Enqueued mailbox processing jobs for all users`);
if (jobs.length > 0) {
await flowProducer.add({
name: 'sync-cycle-finished',
queueName: 'ingestion',
data: {
ingestionSourceId,
userCount,
isInitialImport: true
},
children: jobs
});
} else {
// If there are no users, we can consider the import finished and set to active
await IngestionService.update(ingestionSourceId, {
status: 'active',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: 'Initial import complete. No users found.'
});
}
} else {
// For other providers, we might trigger a simpler bulk import directly
await new IngestionService().performBulkImport(job.data);
@@ -38,6 +67,10 @@ export default async (job: Job<IInitialImportJob>) => {
logger.info({ ingestionSourceId }, 'Finished initial import master job');
} catch (error) {
logger.error({ err: error, ingestionSourceId }, 'Error in initial import master job');
await IngestionService.update(ingestionSourceId, {
status: 'error',
lastSyncStatusMessage: `Initial import failed: ${error instanceof Error ? error.message : 'Unknown error'}`
});
throw error;
}
};

View File

@@ -5,6 +5,11 @@ import { logger } from '../../config/logger';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { StorageService } from '../../services/StorageService';
import { IngestionSource, SyncState } from '@open-archiver/types';
import { db } from '../../database';
import { ingestionSources } from '../../database/schema';
import { eq } from 'drizzle-orm';
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any, string>) => {
const { ingestionSourceId, userEmail } = job.data;
@@ -20,9 +25,43 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any,
const ingestionService = new IngestionService();
const storageService = new StorageService();
for await (const email of connector.fetchEmails(userEmail)) {
// Pass the sync state for the entire source, the connector will handle per-user logic if necessary
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
await ingestionService.processEmail(email, source, storageService);
}
const newSyncState = connector.getUpdatedSyncState(userEmail);
// console.log('new sync state: ', newSyncState);
// Atomically update the syncState JSONB field
if (Object.keys(newSyncState).length > 0) {
const currentSource = (await db
.select({ syncState: ingestionSources.syncState })
.from(ingestionSources)
.where(eq(ingestionSources.id, ingestionSourceId))) as IngestionSource[];
const currentSyncState = currentSource[0]?.syncState || {};
const mergedSyncState: SyncState = { ...currentSyncState };
if (newSyncState.google) {
mergedSyncState.google = { ...mergedSyncState.google, ...newSyncState.google };
}
if (newSyncState.microsoft) {
mergedSyncState.microsoft = { ...mergedSyncState.microsoft, ...newSyncState.microsoft };
}
if (newSyncState.imap) {
mergedSyncState.imap = newSyncState.imap;
}
await db
.update(ingestionSources)
.set({
syncState: mergedSyncState,
updatedAt: new Date()
})
.where(eq(ingestionSources.id, ingestionSourceId));
}
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
} catch (error) {
logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox');

View File

@@ -0,0 +1,20 @@
import { Job } from 'bullmq';
import { db } from '../../database';
import { ingestionSources } from '../../database/schema';
import { eq } from 'drizzle-orm';
import { ingestionQueue } from '../queues';
/**
 * Repeating scheduler tick: finds every ingestion source currently in the
 * 'active' state and enqueues a 'continuous-sync' job for it.
 *
 * Sources that are already mid-sync are not in the 'active' state, so the
 * status column itself is what prevents overlapping sync runs for a source.
 */
export default async (job: Job) => {
	console.log('Scheduler running: Looking for active ingestion sources to sync.');

	const sources = await db
		.select({ id: ingestionSources.id })
		.from(ingestionSources)
		.where(eq(ingestionSources.status, 'active'));

	// Enqueue sequentially; each job carries only the source id — the sync
	// processor re-loads the source (and re-checks its status) itself.
	for (const { id } of sources) {
		await ingestionQueue.add('continuous-sync', { ingestionSourceId: id });
	}
};

View File

@@ -0,0 +1,32 @@
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { logger } from '../../config/logger';
interface ISyncCycleFinishedJob {
	ingestionSourceId: string;
	// Mailbox count — only populated by the initial-import flow.
	userCount?: number;
	isInitialImport: boolean;
}

/**
 * Parent job that runs once every child 'process-mailbox' job in a sync
 * cycle has completed. Flips the ingestion source back to 'active' and
 * records a human-readable summary of the finished cycle.
 */
export default async (job: Job<ISyncCycleFinishedJob>) => {
	const { ingestionSourceId, userCount, isInitialImport } = job.data;

	logger.info({ ingestionSourceId }, 'Sync cycle finished, updating status to active.');

	const message = isInitialImport
		? `Initial import finished for ${userCount} mailboxes.`
		: 'Continuous sync cycle finished successfully.';

	try {
		await IngestionService.update(ingestionSourceId, {
			status: 'active',
			lastSyncFinishedAt: new Date(),
			lastSyncStatusMessage: message
		});
		logger.info({ ingestionSourceId }, 'Successfully updated status to active.');
	} catch (error) {
		logger.error({ err: error, ingestionSourceId }, 'Failed to update status to active after sync cycle.');
		// Deliberately swallowed: the import itself is complete, so a failed
		// status write should not fail this job. An admin may need to intervene.
	}
};

View File

@@ -1,6 +1,8 @@
import { Queue } from 'bullmq';
import { Queue, FlowProducer } from 'bullmq';
import { connection } from '../config/redis';
export const flowProducer = new FlowProducer({ connection });
// Default job options
const defaultJobOptions = {
attempts: 5,

View File

@@ -0,0 +1,18 @@
import { ingestionQueue } from '../queues';
/**
 * Registers the repeatable BullMQ job that drives continuous syncing.
 * The 'schedule-continuous-sync' processor then fans out one
 * 'continuous-sync' job per active ingestion source.
 */
const scheduleContinuousSync = async (): Promise<void> => {
	// This job will run every 15 minutes.
	await ingestionQueue.add(
		'schedule-continuous-sync',
		{},
		{
			repeat: {
				// BUG FIX: '* * * * *' fires every minute; the documented intent
				// (and the comments here) is every 15 minutes.
				pattern: '*/15 * * * *',
			},
		}
	);
};

scheduleContinuousSync()
	.then(() => {
		console.log('Continuous sync scheduler started.');
	})
	.catch((error) => {
		// Without this handler a failed registration would surface only as an
		// unhandled promise rejection and the process could appear healthy.
		console.error('Failed to start continuous sync scheduler:', error);
		process.exitCode = 1;
	});

View File

@@ -3,7 +3,8 @@ import type {
GoogleWorkspaceCredentials,
Microsoft365Credentials,
GenericImapCredentials,
EmailObject
EmailObject,
SyncState
} from '@open-archiver/types';
import { GoogleWorkspaceConnector } from './ingestion-connectors/GoogleWorkspaceConnector';
import { MicrosoftConnector } from './ingestion-connectors/MicrosoftConnector';
@@ -12,7 +13,8 @@ import { ImapConnector } from './ingestion-connectors/ImapConnector';
// Define a common interface for all connectors
export interface IEmailConnector {
testConnection(): Promise<boolean>;
fetchEmails(userEmail?: string, since?: Date): AsyncGenerator<EmailObject>;
fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject>;
getUpdatedSyncState(userEmail?: string): SyncState;
listAllUsers?(): AsyncGenerator<any>;
}

View File

@@ -73,7 +73,7 @@ export class IndexingService {
/**
* Indexes an email object directly, creates a search document, and indexes it.
*/
public async indexByEmail(email: EmailObject, ingestionSourceId: string): Promise<void> {
public async indexByEmail(email: EmailObject, ingestionSourceId: string, archivedEmailId: string): Promise<void> {
const attachments: AttachmentsType = [];
if (email.attachments && email.attachments.length > 0) {
for (const attachment of email.attachments) {
@@ -84,7 +84,7 @@ export class IndexingService {
});
}
}
const document = await this.createEmailDocumentFromRaw(email, attachments, ingestionSourceId);
const document = await this.createEmailDocumentFromRaw(email, attachments, ingestionSourceId, archivedEmailId);
await this.searchService.addDocuments('emails', [document], 'id');
}
@@ -94,7 +94,8 @@ export class IndexingService {
private async createEmailDocumentFromRaw(
email: EmailObject,
attachments: AttachmentsType,
ingestionSourceId: string
ingestionSourceId: string,
archivedEmailId: string
): Promise<EmailDocument> {
const extractedAttachments = [];
for (const attachment of attachments) {
@@ -116,7 +117,7 @@ export class IndexingService {
}
}
return {
id: email.id,
id: archivedEmailId,
from: email.from[0]?.address,
to: email.to.map((i) => i.address) || [],
cc: email.cc?.map((i) => i.address) || [],

View File

@@ -25,7 +25,7 @@ export class IngestionService {
const decryptedCredentials = CryptoService.decryptObject<IngestionCredentials>(
source.credentials as string
);
return { ...source, credentials: decryptedCredentials };
return { ...source, credentials: decryptedCredentials } as IngestionSource;
}
public static async create(dto: CreateIngestionSourceDto): Promise<IngestionSource> {
@@ -139,19 +139,34 @@ export class IngestionService {
});
const connector = EmailProviderFactory.createConnector(source);
const storage = new StorageService();
try {
for await (const email of connector.fetchEmails()) {
await this.processEmail(email, source, storage);
if (connector.listAllUsers) {
// For multi-mailbox providers, dispatch a job for each user
for await (const user of connector.listAllUsers()) {
const userEmail = (user as any).primaryEmail;
if (userEmail) {
await ingestionQueue.add('process-mailbox', {
ingestionSourceId: source.id,
userEmail: userEmail,
});
}
}
} else {
// For single-mailbox providers, dispatch a single job
await ingestionQueue.add('process-mailbox', {
ingestionSourceId: source.id,
userEmail: 'default' // Placeholder, as it's not needed for IMAP
});
}
await IngestionService.update(ingestionSourceId, {
status: 'active',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: 'Successfully completed bulk import.'
lastSyncStatusMessage: 'Successfully initiated bulk import for all mailboxes.'
});
console.log(`Bulk import finished for source: ${source.name} (${source.id})`);
console.log(`Bulk import job dispatch finished for source: ${source.name} (${source.id})`);
} catch (error) {
console.error(`Bulk import failed for source: ${source.name} (${source.id})`, error);
await IngestionService.update(ingestionSourceId, {
@@ -236,7 +251,7 @@ export class IngestionService {
const storageService = new StorageService();
const databaseService = new DatabaseService();
const indexingService = new IndexingService(databaseService, searchService, storageService);
await indexingService.indexByEmail(email, source.id);
await indexingService.indexByEmail(email, source.id, archivedEmail.id);
} catch (error) {
logger.error({
message: `Failed to process email ${email.id} for source ${source.id}`,

View File

@@ -3,7 +3,8 @@ import type { admin_directory_v1, gmail_v1, Common } from 'googleapis';
import type {
GoogleWorkspaceCredentials,
EmailObject,
EmailAddress
EmailAddress,
SyncState
} from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { logger } from '../../config/logger';
@@ -16,6 +17,7 @@ import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser'
export class GoogleWorkspaceConnector implements IEmailConnector {
private credentials: GoogleWorkspaceCredentials;
private serviceAccountCreds: { client_email: string; private_key: string; };
private newHistoryId: string | undefined;
constructor(credentials: GoogleWorkspaceCredentials) {
this.credentials = credentials;
@@ -113,32 +115,105 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
}
/**
* Fetches emails for a single user, starting from a specific point in time.
* Fetches emails for a single user, starting from a specific history ID.
* This is ideal for continuous synchronization jobs.
* @param userEmail The email of the user whose mailbox will be read.
* @param since Optional date to fetch emails newer than this timestamp.
* @param syncState Optional state containing the startHistoryId.
* @returns An async generator that yields each raw email object.
*/
public async *fetchEmails(
userEmail: string,
since?: Date
syncState?: SyncState | null
): AsyncGenerator<EmailObject> {
const authClient = this.getAuthClient(userEmail, [
'https://www.googleapis.com/auth/gmail.readonly'
]);
const gmail = google.gmail({ version: 'v1', auth: authClient });
let pageToken: string | undefined = undefined;
const query = since ? `after:${Math.floor(since.getTime() / 1000)}` : '';
const startHistoryId = syncState?.google?.[userEmail]?.historyId;
// If no sync state is provided for this user, this is an initial import. Get all messages.
if (!startHistoryId) {
yield* this.fetchAllMessagesForUser(gmail, userEmail);
return;
}
this.newHistoryId = startHistoryId;
do {
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> =
await gmail.users.messages.list({
userId: 'me', // 'me' refers to the impersonated user
q: query,
pageToken: pageToken
});
const historyResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListHistoryResponse> = await gmail.users.history.list({
userId: 'me',
startHistoryId: this.newHistoryId,
pageToken: pageToken,
historyTypes: ['messageAdded']
});
const histories = historyResponse.data.history;
if (!histories || histories.length === 0) {
return;
}
for (const historyRecord of histories) {
if (historyRecord.messagesAdded) {
for (const messageAdded of historyRecord.messagesAdded) {
if (messageAdded.message?.id) {
const msgResponse = await gmail.users.messages.get({
userId: 'me',
id: messageAdded.message.id,
format: 'RAW'
});
if (msgResponse.data.raw) {
const rawEmail = Buffer.from(msgResponse.data.raw, 'base64url');
const parsedEmail: ParsedMail = await simpleParser(rawEmail);
const attachments = parsedEmail.attachments.map((attachment: Attachment) => ({
filename: attachment.filename || 'untitled',
contentType: attachment.contentType,
size: attachment.size,
content: attachment.content as Buffer
}));
const mapAddresses = (addresses: AddressObject | AddressObject[] | undefined): EmailAddress[] => {
if (!addresses) return [];
const addressArray = Array.isArray(addresses) ? addresses : [addresses];
return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' })));
};
yield {
id: msgResponse.data.id!,
userEmail: userEmail,
eml: rawEmail,
from: mapAddresses(parsedEmail.from),
to: mapAddresses(parsedEmail.to),
cc: mapAddresses(parsedEmail.cc),
bcc: mapAddresses(parsedEmail.bcc),
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
attachments,
receivedAt: parsedEmail.date || new Date(),
};
}
}
}
}
}
pageToken = historyResponse.data.nextPageToken ?? undefined;
if (historyResponse.data.historyId) {
this.newHistoryId = historyResponse.data.historyId;
}
} while (pageToken);
}
private async *fetchAllMessagesForUser(gmail: gmail_v1.Gmail, userEmail: string): AsyncGenerator<EmailObject> {
let pageToken: string | undefined = undefined;
do {
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> = await gmail.users.messages.list({
userId: 'me',
pageToken: pageToken
});
const messages = listResponse.data.messages;
if (!messages || messages.length === 0) {
@@ -150,27 +225,23 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
const msgResponse = await gmail.users.messages.get({
userId: 'me',
id: message.id,
format: 'RAW' // We want the full, raw .eml content
format: 'RAW'
});
if (msgResponse.data.raw) {
// The raw data is base64url encoded, so we need to decode it.
const rawEmail = Buffer.from(msgResponse.data.raw, 'base64url');
const parsedEmail: ParsedMail = await simpleParser(rawEmail);
const attachments = parsedEmail.attachments.map((attachment: Attachment) => ({
filename: attachment.filename || 'untitled',
contentType: attachment.contentType,
size: attachment.size,
content: attachment.content as Buffer
}));
const mapAddresses = (addresses: AddressObject | AddressObject[] | undefined): EmailAddress[] => {
if (!addresses) return [];
const addressArray = Array.isArray(addresses) ? addresses : [addresses];
return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' })));
};
yield {
id: msgResponse.data.id!,
userEmail: userEmail,
@@ -189,8 +260,26 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
}
}
}
pageToken = listResponse.data.nextPageToken ?? undefined;
} while (pageToken);
// After fetching all messages, get the latest history ID to use as the starting point for the next sync.
const profileResponse = await gmail.users.getProfile({ userId: 'me' });
if (profileResponse.data.historyId) {
this.newHistoryId = profileResponse.data.historyId;
}
}
/**
 * Returns the Gmail sync cursor captured during the last fetch, keyed by
 * the mailbox it belongs to, so the caller can persist it for the next
 * incremental sync.
 * @param userEmail Mailbox whose history ID should be recorded.
 * @returns A `SyncState` carrying the new history ID, or an empty object
 *          when no fetch recorded one.
 */
public getUpdatedSyncState(userEmail: string): SyncState {
	const historyId = this.newHistoryId;
	// No history ID was captured (fetchEmails never completed a page) —
	// report no state rather than an entry with an undefined cursor.
	if (!historyId) {
		return {};
	}
	const state: SyncState = {
		google: {
			[userEmail]: { historyId }
		}
	};
	return state;
}
}

View File

@@ -1,10 +1,11 @@
import type { GenericImapCredentials, EmailObject, EmailAddress } from '@open-archiver/types';
import type { GenericImapCredentials, EmailObject, EmailAddress, SyncState } from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { ImapFlow } from 'imapflow';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
export class ImapConnector implements IEmailConnector {
private client: ImapFlow;
private newMaxUid: number = 0;
constructor(private credentials: GenericImapCredentials) {
this.client = new ImapFlow({
@@ -30,14 +31,23 @@ export class ImapConnector implements IEmailConnector {
}
}
public async *fetchEmails(userEmail?: string, since?: Date): AsyncGenerator<EmailObject> {
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject> {
await this.client.connect();
try {
await this.client.mailboxOpen('INBOX');
const searchCriteria = since ? { since } : { all: true };
const lastUid = syncState?.imap?.maxUid;
this.newMaxUid = lastUid || 0;
// If lastUid exists, fetch all emails with a UID greater than it.
// Otherwise, fetch all emails.
const searchCriteria = lastUid ? { uid: `${lastUid + 1}:*` } : { all: true };
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) {
if (msg.uid > this.newMaxUid) {
this.newMaxUid = msg.uid;
}
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true })) {
if (msg.envelope && msg.source) {
const parsedEmail: ParsedMail = await simpleParser(msg.source);
const attachments = parsedEmail.attachments.map((attachment: Attachment) => ({
@@ -73,4 +83,12 @@ export class ImapConnector implements IEmailConnector {
await this.client.logout();
}
}
/**
 * Returns the IMAP sync cursor (highest INBOX UID seen) captured during the
 * last fetchEmails run, for persistence until the next incremental sync.
 *
 * Consistency/robustness fix: the Google and Microsoft connectors return an
 * empty object when no new cursor was captured. Previously this method always
 * returned `{ imap: { maxUid: this.newMaxUid } }`, so calling it before any
 * fetch (newMaxUid still 0) could overwrite a previously stored maxUid with 0
 * and force a full re-fetch. Guard against that here.
 * @returns A `SyncState` with the highest UID observed, or an empty object
 *          when nothing was fetched.
 */
public getUpdatedSyncState(): SyncState {
	if (!this.newMaxUid) {
		return {};
	}
	return {
		imap: {
			maxUid: this.newMaxUid
		}
	};
}
}

View File

@@ -1,4 +1,4 @@
import type { Microsoft365Credentials, EmailObject, EmailAddress } from '@open-archiver/types';
import type { Microsoft365Credentials, EmailObject, EmailAddress, SyncState } from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { ConfidentialClientApplication } from '@azure/msal-node';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
@@ -8,6 +8,7 @@ const GRAPH_API_ENDPOINT = 'https://graph.microsoft.com/v1.0';
export class MicrosoftConnector implements IEmailConnector {
private cca: ConfidentialClientApplication;
private newDeltaToken: string | undefined;
constructor(private credentials: Microsoft365Credentials) {
this.cca = new ConfidentialClientApplication({
@@ -38,23 +39,42 @@ export class MicrosoftConnector implements IEmailConnector {
}
}
public async *fetchEmails(userEmail?: string, since?: Date): AsyncGenerator<EmailObject> {
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject> {
const accessToken = await this.getAccessToken();
const headers = { Authorization: `Bearer ${accessToken}` };
let nextLink: string | undefined = `${GRAPH_API_ENDPOINT}/users/me/messages`;
if (since) {
nextLink += `?$filter=receivedDateTime ge ${since.toISOString()}`;
let nextLink: string | undefined;
const deltaToken = syncState?.microsoft?.[userEmail]?.deltaToken;
if (deltaToken) {
nextLink = `${GRAPH_API_ENDPOINT}/me/mailFolders/allmail/messages/delta?$deltaToken=${deltaToken}`;
} else {
nextLink = `${GRAPH_API_ENDPOINT}/me/mailFolders/allmail/messages/delta`;
}
while (nextLink) {
const res: { data: { value: any[]; '@odata.nextLink'?: string; }; } = await axios.get(
const res: { data: { value: any[]; '@odata.nextLink'?: string; '@odata.deltaLink'?: string; }; } = await axios.get(
nextLink,
{ headers }
);
const messages = res.data.value;
const deltaLink = res.data['@odata.deltaLink'];
if (deltaLink) {
const deltaToken = new URL(deltaLink).searchParams.get('$deltatoken');
if (deltaToken) {
this.newDeltaToken = deltaToken;
}
}
for (const message of messages) {
// Skip if the message is deleted
if (message['@removed']) {
continue;
}
const rawContentRes = await axios.get(
`${GRAPH_API_ENDPOINT}/users/me/messages/${message.id}/$value`,
{ headers }
@@ -96,4 +116,17 @@ export class MicrosoftConnector implements IEmailConnector {
nextLink = res.data['@odata.nextLink'];
}
}
/**
 * Returns the Microsoft Graph delta cursor captured during the last fetch,
 * keyed by the mailbox it belongs to, so the caller can persist it for the
 * next incremental sync.
 * @param userEmail Mailbox whose delta token should be recorded.
 * @returns A `SyncState` carrying the new delta token, or an empty object
 *          when no fetch recorded one.
 */
public getUpdatedSyncState(userEmail: string): SyncState {
	const deltaToken = this.newDeltaToken;
	// No delta link was returned by Graph during the last fetch — report
	// no state rather than an entry with an undefined cursor.
	return deltaToken
		? { microsoft: { [userEmail]: { deltaToken } } }
		: {};
}
}

View File

@@ -2,14 +2,20 @@ import { Worker } from 'bullmq';
import { connection } from '../config/redis';
import initialImportProcessor from '../jobs/processors/initial-import.processor';
import continuousSyncProcessor from '../jobs/processors/continuous-sync.processor';
import scheduleContinuousSyncProcessor from '../jobs/processors/schedule-continuous-sync.processor';
import { processMailboxProcessor } from '../jobs/processors/process-mailbox.processor';
import syncCycleFinishedProcessor from '../jobs/processors/sync-cycle-finished.processor';
const processor = async (job: any) => {
switch (job.name) {
case 'initial-import':
return initialImportProcessor(job);
case 'sync-cycle-finished':
return syncCycleFinishedProcessor(job);
case 'continuous-sync':
return continuousSyncProcessor(job);
case 'schedule-continuous-sync':
return scheduleContinuousSyncProcessor(job);
case 'process-mailbox':
return processMailboxProcessor(job);
default:

View File

@@ -89,7 +89,9 @@
{#if ingestionSources.length > 0}
{#each ingestionSources as source (source.id)}
<Table.Row>
<Table.Cell>{source.name}</Table.Cell>
<Table.Cell>
<a href="/dashboard/archived-emails?ingestionSourceId={source.id}">{source.name}</a>
</Table.Cell>
<Table.Cell>{source.provider}</Table.Cell>
<Table.Cell>{source.status}</Table.Cell>
<Table.Cell>{new Date(source.createdAt).toLocaleDateString()}</Table.Cell>

View File

@@ -1,3 +1,20 @@
/**
 * Provider-specific incremental-sync cursors persisted between sync runs.
 * Only the field matching the source's provider is expected to be populated.
 */
export type SyncState = {
	/** Gmail history cursor per mailbox; consumed by `users.history.list` as `startHistoryId`. */
	google?: {
		[userEmail: string]: {
			historyId: string;
		};
	};
	/** Microsoft Graph delta token per mailbox; appended to the delta query URL. */
	microsoft?: {
		[userEmail: string]: {
			deltaToken: string;
		};
	};
	/** Highest INBOX UID seen so far; the next sync fetches UIDs strictly greater than this. */
	imap?: {
		maxUid: number;
	};
	// NOTE(review): presumably an ISO-8601 string; not written anywhere in
	// the code visible here — confirm a caller actually sets it.
	lastSyncTimestamp?: string;
};
/** Supported email-source backends; selects which connector implementation is used. */
export type IngestionProvider = 'google_workspace' | 'microsoft_365' | 'generic_imap';
export type IngestionStatus =
@@ -51,6 +68,10 @@ export interface IngestionSource {
createdAt: Date;
updatedAt: Date;
credentials: IngestionCredentials;
lastSyncStartedAt?: Date | null;
lastSyncFinishedAt?: Date | null;
lastSyncStatusMessage?: string | null;
syncState?: SyncState | null;
}
export interface CreateIngestionSourceDto {
@@ -67,6 +88,11 @@ export interface UpdateIngestionSourceDto {
lastSyncStartedAt?: Date;
lastSyncFinishedAt?: Date;
lastSyncStatusMessage?: string;
syncState?: SyncState;
}
/** Queue payload for a continuous-sync job: identifies the ingestion source to sync. */
export interface IContinuousSyncJob {
	// UUID of the ingestion_sources row to be synced.
	ingestionSourceId: string;
}
export interface IInitialImportJob {

File diff suppressed because one or more lines are too long