diff --git a/package.json b/package.json index 9703dbb..65b3bbb 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "open-archiver", - "version": "0.4.0", + "version": "0.4.1", "private": true, "license": "SEE LICENSE IN LICENSE file", "scripts": { diff --git a/packages/backend/src/jobs/queues.ts b/packages/backend/src/jobs/queues.ts index 188fe79..88e7f2e 100644 --- a/packages/backend/src/jobs/queues.ts +++ b/packages/backend/src/jobs/queues.ts @@ -1,8 +1,13 @@ import { Queue, FlowProducer } from 'bullmq'; import { connection } from '../config/redis'; +import { logger } from '../config/logger'; export const flowProducer = new FlowProducer({ connection }); +flowProducer.on('error', (err) => { + logger.error(err, 'FlowProducer connection error'); +}); + // Default job options const defaultJobOptions = { attempts: 5, @@ -23,7 +28,15 @@ export const ingestionQueue = new Queue('ingestion', { defaultJobOptions, }); +ingestionQueue.on('error', (err) => { + logger.error(err, 'Ingestion queue connection error'); +}); + export const indexingQueue = new Queue('indexing', { connection, defaultJobOptions, }); + +indexingQueue.on('error', (err) => { + logger.error(err, 'Indexing queue connection error'); +}); diff --git a/packages/backend/src/workers/ingestion.worker.ts b/packages/backend/src/workers/ingestion.worker.ts index 8b9ff6a..558624a 100644 --- a/packages/backend/src/workers/ingestion.worker.ts +++ b/packages/backend/src/workers/ingestion.worker.ts @@ -1,5 +1,6 @@ import { Worker } from 'bullmq'; import { connection } from '../config/redis'; +import { logger } from '../config/logger'; import initialImportProcessor from '../jobs/processors/initial-import.processor'; import continuousSyncProcessor from '../jobs/processors/continuous-sync.processor'; import scheduleContinuousSyncProcessor from '../jobs/processors/schedule-continuous-sync.processor'; @@ -25,6 +26,8 @@ const processor = async (job: any) => { const worker = new Worker('ingestion', processor, { connection, + concurrency: 5, + lockDuration: 1000 * 60 * 30, // 30 minutes removeOnComplete: { count: 100, // keep last 100 jobs }, @@ -33,7 +36,11 @@ const worker = new Worker('ingestion', processor, { }, }); -console.log('Ingestion worker started'); +worker.on('error', (err) => { + logger.error(err, 'Ingestion worker connection error'); +}); + +logger.info('Ingestion worker started'); process.on('SIGINT', () => worker.close()); process.on('SIGTERM', () => worker.close()); diff --git a/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte b/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte index 0f750aa..404b853 100644 --- a/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte +++ b/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte @@ -133,7 +133,7 @@ {/if}