Job queue management setup

This commit is contained in:
Wayne
2025-07-12 12:39:41 +03:00
parent 7ed8d78d73
commit f4d48a4e5a
13 changed files with 189 additions and 21 deletions

View File

@@ -66,7 +66,7 @@ Open Archive is built on a modern, scalable, and maintainable technology stack:
This command will build the necessary Docker images and start all the services (frontend, backend, database, etc.) in the background.
4. **Access the application:**
Once the services are running, you can access the Open Archive web interface by navigating to `http://localhost:5173` in your web browser.
Once the services are running, you can access the Open Archive web interface by navigating to `http://localhost:3000` in your web browser.
## Contributing

View File

@@ -24,11 +24,22 @@ services:
env_file:
- ./.env
worker:
ingestion-worker:
build:
context: ./packages/backend
dockerfile: Dockerfile
command: 'pnpm run start:worker'
command: 'pnpm ts-node-dev --respawn --transpile-only src/workers/ingestion.worker.ts'
depends_on:
- postgres
- redis
env_file:
- ./.env
# Worker consuming the 'indexing' queue (index-email jobs).
# NOTE(review): added env_file to match the ingestion-worker service — the
# worker's Redis connection reads REDIS_HOST/REDIS_PORT from the environment,
# so without it the container falls back to localhost:6379 and cannot reach
# the redis service.
indexing-worker:
  build:
    context: ./packages/backend
    dockerfile: Dockerfile
  command: 'pnpm ts-node-dev --respawn --transpile-only src/workers/indexing.worker.ts'
  depends_on:
    - postgres
    - redis
  env_file:
    - ./.env

View File

@@ -8,7 +8,8 @@
"build": "tsc",
"prestart": "npm run build",
"start": "node dist/index.js",
"start:worker": "ts-node-dev --respawn --transpile-only src/workers/ingestion.worker.ts",
"start:ingestion-worker": "ts-node-dev --respawn --transpile-only src/workers/ingestion.worker.ts",
"start:indexing-worker": "ts-node-dev --respawn --transpile-only src/workers/indexing.worker.ts",
"db:generate": "drizzle-kit generate --config=drizzle.config.ts",
"db:push": "drizzle-kit push --config=drizzle.config.ts",
"db:migrate": "ts-node-dev src/database/migrate.ts"
@@ -36,8 +37,11 @@
"tsconfig-paths": "^4.2.0"
},
"devDependencies": {
"@bull-board/api": "^6.11.0",
"@bull-board/express": "^6.11.0",
"@types/express": "^5.0.3",
"@types/node": "^24.0.12",
"bull-board": "^2.1.3",
"drizzle-kit": "^0.31.4",
"ts-node-dev": "^2.0.0",
"typescript": "^5.8.3"

View File

@@ -0,0 +1,10 @@
import 'dotenv/config';

/**
 * Shared Redis connection options used by every BullMQ queue and worker.
 *
 * `maxRetriesPerRequest: null` is required by BullMQ for blocking
 * connections.
 *
 * @see https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/guide/connections.md
 */
const rawPort = process.env.REDIS_PORT;
const parsedPort = rawPort ? Number.parseInt(rawPort, 10) : Number.NaN;

export const connection = {
    // Falls back to localhost when REDIS_HOST is unset or empty.
    host: process.env.REDIS_HOST || 'localhost',
    // Falls back to the default Redis port when REDIS_PORT is unset,
    // empty, zero, or not a number (same behavior as the original
    // `(env && parseInt(env, 10)) || 6379` expression).
    port: Number.isNaN(parsedPort) || parsedPort === 0 ? 6379 : parsedPort,
    maxRetriesPerRequest: null
};

View File

@@ -0,0 +1,11 @@
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { IInitialImportJob } from '@open-archive/types';

// Instantiated now so the eventual real implementation mirrors the
// initial-import processor; the stub below does not use it yet.
const ingestionService = new IngestionService();

/**
 * Stub processor for 'continuous-sync' jobs on the ingestion queue.
 * The real implementation is expected to resemble performBulkImport
 * but restricted to messages newer than a `since` cutoff; for now it
 * only logs that the job was received.
 */
const continuousSyncProcessor = async (job: Job<IInitialImportJob>): Promise<void> => {
    console.log(`Processing continuous sync for ingestion source: ${job.data.ingestionSourceId}`);
};

export default continuousSyncProcessor;

View File

@@ -0,0 +1,24 @@
import { Job } from 'bullmq';
import { StorageService } from '../../services/StorageService';
// import { SearchService } from '../../services/SearchService';

// const storageService = new StorageService();
// const searchService = new SearchService();

/** Payload of an 'index-email' job enqueued on the indexing queue. */
interface IIndexEmailJob {
    filePath: string;
    ingestionSourceId: string;
}

/**
 * Stub processor for 'index-email' jobs. Planned pipeline:
 *   1. read the stored .eml file from storage,
 *   2. parse the email content,
 *   3. index the document in the search engine, e.g.:
 *      const emailContent = await storageService.get(filePath);
 *      await searchService.indexDocument(ingestionSourceId, emailContent);
 * For now it only logs the file being processed.
 */
const indexEmailProcessor = async (job: Job<IIndexEmailJob>): Promise<void> => {
    const { filePath, ingestionSourceId } = job.data;
    console.log(`Processing index-email for file: ${filePath}`);
};

export default indexEmailProcessor;

View File

@@ -0,0 +1,10 @@
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { IInitialImportJob } from '@open-archive/types';

const ingestionService = new IngestionService();

/**
 * Processor for 'initial-import' jobs on the ingestion queue.
 * Logs the job, then delegates the whole bulk import (fetch, store,
 * enqueue indexing) to IngestionService.performBulkImport; any error
 * it throws propagates so BullMQ can retry the job.
 */
const initialImportProcessor = async (job: Job<IInitialImportJob>): Promise<void> => {
    console.log(`Processing initial import for ingestion source: ${job.data.ingestionSourceId}`);
    await ingestionService.performBulkImport(job.data);
};

export default initialImportProcessor;

View File

@@ -0,0 +1,27 @@
import { Queue } from 'bullmq';
import { connection } from '../config/redis';

/**
 * Options applied to every job added to our queues: up to 5 attempts
 * with exponential backoff starting at 1s, retaining the last 1000
 * completed and 5000 failed jobs for inspection.
 */
const defaultJobOptions = {
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: { count: 1000 },
    removeOnFail: { count: 5000 }
};

/** Creates a queue bound to the shared Redis connection with the defaults above. */
const createQueue = (name: string): Queue => new Queue(name, { connection, defaultJobOptions });

/** Queue for bulk and continuous mailbox ingestion jobs. */
export const ingestionQueue = createQueue('ingestion');

/** Queue for per-email search-indexing jobs. */
export const indexingQueue = createQueue('indexing');

View File

@@ -7,24 +7,12 @@ import type {
IngestionCredentials
} from '@open-archive/types';
import { eq } from 'drizzle-orm';
import { Queue } from 'bullmq';
import { CryptoService } from './CryptoService';
import { EmailProviderFactory } from './EmailProviderFactory';
import { ingestionQueue, indexingQueue } from '../jobs/queues';
import { StorageService } from './StorageService';
import type { IInitialImportJob } from '@open-archive/types';
// This assumes you have a BullMQ queue instance exported from somewhere
// In a real setup, this would be injected or imported from a central place.
let initialImportQueue: Queue;
// TODO: Initialize and connect to the actual BullMQ queue.
// For now, we'll use a mock for demonstration purposes.
if (process.env.NODE_ENV !== 'production') {
initialImportQueue = {
add: async (name: string, data: any) => {
console.log(`Mock Queue: Job '${name}' added with data:`, data);
return Promise.resolve({} as any);
}
} as Queue;
}
export class IngestionService {
private static decryptSource(source: typeof ingestionSources.$inferSelect): IngestionSource {
@@ -111,8 +99,52 @@ export class IngestionService {
public static async triggerInitialImport(id: string): Promise<IngestionSource> {
const source = await this.findById(id);
await initialImportQueue.add('initial-import', { ingestionSourceId: source.id });
await ingestionQueue.add('initial-import', { ingestionSourceId: source.id });
return await this.update(id, { status: 'syncing' });
}
/**
 * Runs a full bulk import for one ingestion source: fetches every email
 * from the provider, stores each as an .eml blob, and enqueues one
 * 'index-email' job per message. Invoked by the ingestion worker for
 * 'initial-import' jobs.
 *
 * @param job - Job payload carrying the ingestion source id.
 * @throws Error when the ingestion source cannot be found; also re-throws
 * any provider/storage failure so BullMQ can retry the job.
 */
public async performBulkImport(job: IInitialImportJob): Promise<void> {
const { ingestionSourceId } = job;
const source = await IngestionService.findById(ingestionSourceId);
if (!source) {
throw new Error(`Ingestion source ${ingestionSourceId} not found.`);
}
console.log(`Starting bulk import for source: ${source.name} (${source.id})`);
// Mark the source as syncing before any provider I/O so the UI reflects
// progress even if the import takes a long time.
await IngestionService.update(ingestionSourceId, {
status: 'syncing',
lastSyncStartedAt: new Date()
});
const connector = EmailProviderFactory.createConnector(source);
const storage = new StorageService();
try {
// Stream emails from the provider; each is persisted and then handed to
// the indexing queue so search indexing runs in a separate worker.
for await (const email of connector.fetchEmails()) {
const filePath = `${source.id}/${email.id}.eml`;
await storage.put(filePath, Buffer.from(email.body, 'utf-8'));
await indexingQueue.add('index-email', {
filePath,
ingestionSourceId: source.id
});
}
// All emails processed: record success on the source.
await IngestionService.update(ingestionSourceId, {
status: 'active',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: 'Successfully completed bulk import.'
});
console.log(`Bulk import finished for source: ${source.name} (${source.id})`);
} catch (error) {
console.error(`Bulk import failed for source: ${source.name} (${source.id})`, error);
// Persist the failure on the source, then re-throw so BullMQ applies the
// queue's retry/backoff policy. NOTE(review): a partially-completed import
// leaves already-stored emails in place — presumably idempotent on retry
// since file paths are derived from email ids; confirm with StorageService.
await IngestionService.update(ingestionSourceId, {
status: 'error',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: error instanceof Error ? error.message : 'An unknown error occurred.'
});
throw error; // Re-throw to allow BullMQ to handle the job failure
}
}
}

View File

@@ -0,0 +1,10 @@
import { Worker } from 'bullmq';
import { connection } from '../config/redis';
import indexEmailProcessor from '../jobs/processors/index-email.processor';

// Worker consuming the 'indexing' queue (jobs enqueued by performBulkImport).
const worker = new Worker('indexing', indexEmailProcessor, { connection });

console.log('Indexing worker started');

// BUG FIX: registering a SIGINT/SIGTERM handler suppresses Node's default
// exit behavior, and `worker.close()` returns a Promise that was neither
// awaited nor followed by an exit — so the process would hang on Ctrl+C.
// Await the close (letting in-flight jobs finish) and then exit explicitly.
const shutdown = async (): Promise<void> => {
    await worker.close();
    process.exit(0);
};
process.on('SIGINT', () => void shutdown());
process.on('SIGTERM', () => void shutdown());

View File

@@ -0,0 +1,22 @@
import { Worker, Job } from 'bullmq';
import { connection } from '../config/redis';
import initialImportProcessor from '../jobs/processors/initial-import.processor';
import continuousSyncProcessor from '../jobs/processors/continuous-sync.processor';

/**
 * Dispatches ingestion-queue jobs to the processor matching the job name.
 * Typed as `Job` instead of `any` so processors receive a real BullMQ job.
 *
 * @throws Error for job names this worker does not recognize, so the job
 * is marked failed rather than silently completing.
 */
const processor = async (job: Job): Promise<unknown> => {
    switch (job.name) {
        case 'initial-import':
            return initialImportProcessor(job);
        case 'continuous-sync':
            return continuousSyncProcessor(job);
        default:
            throw new Error(`Unknown job name: ${job.name}`);
    }
};

const worker = new Worker('ingestion', processor, { connection });

console.log('Ingestion worker started');

// BUG FIX: registering a SIGINT/SIGTERM handler suppresses Node's default
// exit behavior, and `worker.close()` returns a Promise that was neither
// awaited nor followed by an exit — so the process would hang on Ctrl+C.
// Await the close (letting in-flight jobs finish) and then exit explicitly.
const shutdown = async (): Promise<void> => {
    await worker.close();
    process.exit(0);
};
process.on('SIGINT', () => void shutdown());
process.on('SIGTERM', () => void shutdown());

View File

@@ -57,4 +57,11 @@ export interface UpdateIngestionSourceDto {
provider?: IngestionProvider;
status?: IngestionStatus;
providerConfig?: Record<string, any>;
lastSyncStartedAt?: Date;
lastSyncFinishedAt?: Date;
lastSyncStatusMessage?: string;
}
/**
 * Payload for 'initial-import' (and 'continuous-sync') jobs on the
 * ingestion queue; carries only the id of the source to import.
 */
export interface IInitialImportJob {
// Primary key of the ingestion source row to bulk-import.
ingestionSourceId: string;
}

File diff suppressed because one or more lines are too long