diff --git a/.env.example b/.env.example index c0182a4..d6c551f 100644 --- a/.env.example +++ b/.env.example @@ -19,7 +19,8 @@ DATABASE_URL="postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/$ # Meilisearch MEILI_MASTER_KEY=aSampleMasterKey MEILI_HOST=http://meilisearch:7700 - +# The number of emails to batch together for indexing. Defaults to 500. +MEILI_INDEXING_BATCH=500 # Redis (We use Valkey, which is Redis-compatible and open source) @@ -60,6 +61,8 @@ RATE_LIMIT_WINDOW_MS=60000 # The maximum number of API requests allowed from an IP within the window. Defaults to 100. RATE_LIMIT_MAX_REQUESTS=100 + + # JWT # IMPORTANT: Change this to a long, random, and secret string in your .env file JWT_SECRET=a-very-secret-key-that-you-should-change @@ -70,3 +73,9 @@ JWT_EXPIRES_IN="7d" # IMPORTANT: Generate a secure, random 32-byte hex string for this # You can use `openssl rand -hex 32` to generate a key. ENCRYPTION_KEY= + +# Apache Tika Integration +# ONLY active if TIKA_URL is set +TIKA_URL=http://tika:9998 + + diff --git a/docker-compose.yml b/docker-compose.yml index d00714c..ab0f1f2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -52,6 +52,13 @@ services: networks: - open-archiver-net + tika: + image: apache/tika:3.2.2.0-full + container_name: tika + restart: always + networks: + - open-archiver-net + volumes: pgdata: driver: local diff --git a/packages/backend/src/config/index.ts b/packages/backend/src/config/index.ts index 506d065..a2a63e1 100644 --- a/packages/backend/src/config/index.ts +++ b/packages/backend/src/config/index.ts @@ -1,6 +1,6 @@ import { storage } from './storage'; import { app } from './app'; -import { searchConfig } from './search'; +import { searchConfig, meiliConfig } from './search'; import { connection as redisConfig } from './redis'; import { apiConfig } from './api'; @@ -8,6 +8,7 @@ export const config = { storage, app, search: searchConfig, + meili: meiliConfig, redis: redisConfig, api: apiConfig, }; diff --git a/packages/backend/src/config/search.ts b/packages/backend/src/config/search.ts index d79632c..8bf317b 100644 --- a/packages/backend/src/config/search.ts +++ b/packages/backend/src/config/search.ts @@ -4,3 +4,9 @@ export const searchConfig = { host: process.env.MEILI_HOST || 'http://127.0.0.1:7700', apiKey: process.env.MEILI_MASTER_KEY || '', }; + +export const meiliConfig = { + indexingBatchSize: process.env.MEILI_INDEXING_BATCH + ? parseInt(process.env.MEILI_INDEXING_BATCH) + : 500, +}; diff --git a/packages/backend/src/helpers/textExtractor.ts b/packages/backend/src/helpers/textExtractor.ts index 7a3eb76..f82c301 100644 --- a/packages/backend/src/helpers/textExtractor.ts +++ b/packages/backend/src/helpers/textExtractor.ts @@ -1,7 +1,10 @@ import PDFParser from 'pdf2json'; import mammoth from 'mammoth'; import xlsx from 'xlsx'; +import { logger } from '../config/logger'; +import { OcrService } from '../services/OcrService'; +// Legacy PDF extraction (with improved memory management) function extractTextFromPdf(buffer: Buffer): Promise { return new Promise((resolve) => { const pdfParser = new PDFParser(null, true); @@ -10,34 +13,60 @@ function extractTextFromPdf(buffer: Buffer): Promise { const finish = (text: string) => { if (completed) return; completed = true; - pdfParser.removeAllListeners(); + + // explicit cleanup + try { + pdfParser.removeAllListeners(); + } catch (e) { + // Ignore cleanup errors + } + resolve(text); }; - pdfParser.on('pdfParser_dataError', () => finish('')); - pdfParser.on('pdfParser_dataReady', () => finish(pdfParser.getRawTextContent())); + pdfParser.on('pdfParser_dataError', (err: any) => { + logger.warn('PDF parsing error:', err?.parserError || 'Unknown error'); + finish(''); + }); + + pdfParser.on('pdfParser_dataReady', () => { + try { + const text = pdfParser.getRawTextContent(); + finish(text || ''); + } catch (err) { + logger.warn('Error getting PDF text content:', err); + finish(''); + } + }); try { pdfParser.parseBuffer(buffer); } catch (err) { - console.error('Error parsing PDF buffer', err); + logger.error('Error parsing PDF buffer:', err); finish(''); } - // Prevent hanging if the parser never emits events - setTimeout(() => finish(''), 10000); + // reduced Timeout for better performance + setTimeout(() => { + logger.warn('PDF parsing timed out'); + finish(''); + }, 5000); }); } -export async function extractText(buffer: Buffer, mimeType: string): Promise { +// Legacy text extraction for various formats +async function extractTextLegacy(buffer: Buffer, mimeType: string): Promise { try { if (mimeType === 'application/pdf') { + // Check PDF size (memory protection) + if (buffer.length > 50 * 1024 * 1024) { // 50MB Limit + logger.warn('PDF too large for legacy extraction, skipping'); + return ''; + } return await extractTextFromPdf(buffer); } - if ( - mimeType === 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' - ) { + if (mimeType === 'application/vnd.openxmlformats-officedocument.wordprocessingml.document') { const { value } = await mammoth.extractRawText({ buffer }); return value; } @@ -50,7 +79,7 @@ export async function extractText(buffer: Buffer, mimeType: string): Promise { + // Input validation + if (!buffer || buffer.length === 0) { + return ''; } - console.warn(`Unsupported MIME type for text extraction: ${mimeType}`); - return ''; // Return empty string for unsupported types + if (!mimeType) { + logger.warn('No MIME type provided for text extraction'); + return ''; + } + + // General size limit + const maxSize = process.env.TIKA_URL ? 100 * 1024 * 1024 : 50 * 1024 * 1024; // 100MB for Tika, 50MB for Legacy + if (buffer.length > maxSize) { + logger.warn(`File too large for text extraction: ${buffer.length} bytes (limit: ${maxSize})`); + return ''; + } + + // Decide between Tika and legacy + const tikaUrl = process.env.TIKA_URL; + + if (tikaUrl) { + // Tika decides what it can parse + logger.debug(`Using Tika for text extraction: ${mimeType}`); + const ocrService = new OcrService() + try { + return await ocrService.extractTextWithTika(buffer, mimeType); + } catch (error) { + logger.error({ error }, "OCR text extraction failed, returning empty string") + return '' + } + } else { + // extract using legacy mode + return await extractTextLegacy(buffer, mimeType); + } } diff --git a/packages/backend/src/jobs/processors/index-email.processor.ts b/packages/backend/src/jobs/processors/index-email-batch.processor.ts similarity index 65% rename from packages/backend/src/jobs/processors/index-email.processor.ts rename to packages/backend/src/jobs/processors/index-email-batch.processor.ts index d40946e..dea7b5d 100644 --- a/packages/backend/src/jobs/processors/index-email.processor.ts +++ b/packages/backend/src/jobs/processors/index-email-batch.processor.ts @@ -3,14 +3,15 @@ import { IndexingService } from '../../services/IndexingService'; import { SearchService } from '../../services/SearchService'; import { StorageService } from '../../services/StorageService'; import { DatabaseService } from '../../services/DatabaseService'; +import { PendingEmail } from '@open-archiver/types'; const searchService = new SearchService(); const storageService = new StorageService(); const databaseService = new DatabaseService(); const indexingService = new IndexingService(databaseService, searchService, storageService); -export default async function (job: Job<{ emailId: string }>) { - const { emailId } = job.data; - console.log(`Indexing email with ID: ${emailId}`); - await indexingService.indexEmailById(emailId); +export default async function (job: Job<{ emails: PendingEmail[] }>) { + const { emails } = job.data; + console.log(`Indexing email batch with ${emails.length} emails`); + await indexingService.indexEmailBatch(emails); } diff --git a/packages/backend/src/jobs/processors/process-mailbox.processor.ts b/packages/backend/src/jobs/processors/process-mailbox.processor.ts index a183d2a..ce94001 100644 --- a/packages/backend/src/jobs/processors/process-mailbox.processor.ts +++ b/packages/backend/src/jobs/processors/process-mailbox.processor.ts @@ -1,9 +1,19 @@ import { Job } from 'bullmq'; -import { IProcessMailboxJob, SyncState, ProcessMailboxError } from '@open-archiver/types'; +import { + IProcessMailboxJob, + SyncState, + ProcessMailboxError, + PendingEmail, +} from '@open-archiver/types'; import { IngestionService } from '../../services/IngestionService'; import { logger } from '../../config/logger'; import { EmailProviderFactory } from '../../services/EmailProviderFactory'; import { StorageService } from '../../services/StorageService'; +import { IndexingService } from '../../services/IndexingService'; +import { SearchService } from '../../services/SearchService'; +import { DatabaseService } from '../../services/DatabaseService'; +import { config } from '../../config'; + /** * This processor handles the ingestion of emails for a single user's mailbox. @@ -15,9 +25,16 @@ import { StorageService } from '../../services/StorageService'; */ export const processMailboxProcessor = async (job: Job) => { const { ingestionSourceId, userEmail } = job.data; + const BATCH_SIZE: number = config.meili.indexingBatchSize; + let emailBatch: PendingEmail[] = []; logger.info({ ingestionSourceId, userEmail }, `Processing mailbox for user`); + const searchService = new SearchService(); + const storageService = new StorageService(); + const databaseService = new DatabaseService(); + const indexingService = new IndexingService(databaseService, searchService, storageService); + try { const source = await IngestionService.findById(ingestionSourceId); if (!source) { @@ -26,22 +43,38 @@ export const processMailboxProcessor = async (job: Job= BATCH_SIZE) { + await indexingService.indexEmailBatch(emailBatch); + emailBatch = []; + } + } } } + if (emailBatch.length > 0) { + await indexingService.indexEmailBatch(emailBatch); + emailBatch = []; + } + const newSyncState = connector.getUpdatedSyncState(userEmail); - logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`); - - // Return the new sync state to be aggregated by the parent flow return newSyncState; } catch (error) { + if (emailBatch.length > 0) { + await indexingService.indexEmailBatch(emailBatch); + } + logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox'); const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred'; const processMailboxError: ProcessMailboxError = { diff --git a/packages/backend/src/jobs/schedulers/sync-scheduler.ts b/packages/backend/src/jobs/schedulers/sync-scheduler.ts index a3706bf..90a2edc 100644 --- a/packages/backend/src/jobs/schedulers/sync-scheduler.ts +++ b/packages/backend/src/jobs/schedulers/sync-scheduler.ts @@ -8,6 +8,7 @@ const scheduleContinuousSync = async () => { 'schedule-continuous-sync', {}, { + jobId: 'schedule-continuous-sync', repeat: { pattern: config.app.syncFrequency, }, diff --git a/packages/backend/src/services/IndexingService.ts b/packages/backend/src/services/IndexingService.ts index d947f83..1b4bba8 100644 --- a/packages/backend/src/services/IndexingService.ts +++ b/packages/backend/src/services/IndexingService.ts @@ -1,4 +1,10 @@ -import { Attachment, EmailAddress, EmailDocument, EmailObject } from '@open-archiver/types'; +import { + Attachment, + EmailAddress, + EmailDocument, + EmailObject, + PendingEmail, +} from '@open-archiver/types'; import { SearchService } from './SearchService'; import { StorageService } from './StorageService'; import { extractText } from '../helpers/textExtractor'; @@ -7,6 +13,7 @@ import { archivedEmails, attachments, emailAttachments } from '../database/schem import { eq } from 'drizzle-orm'; import { streamToBuffer } from '../helpers/streamToBuffer'; import { simpleParser } from 'mailparser'; +import { logger } from '../config/logger'; interface DbRecipients { to: { name: string; address: string }[]; @@ -20,14 +27,45 @@ type AttachmentsType = { mimeType: string; }[]; +/** + * Sanitizes text content by removing invalid characters that could cause JSON serialization issues + */ +function sanitizeText(text: string): string { + if (!text) return ''; + + // Remove control characters and invalid UTF-8 sequences + return text + .replace(/\uFFFD/g, '') // Replacement character for invalid UTF-8 sequences + .replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, '') // Remove control characters + .trim(); +} + +/** + * Recursively sanitize all string values in an object to prevent JSON issues + */ +function sanitizeObject(obj: T): T { + if (typeof obj === 'string') { + return sanitizeText(obj) as unknown as T; + } else if (Array.isArray(obj)) { + return obj.map(sanitizeObject) as unknown as T; + } else if (obj !== null && typeof obj === 'object') { + const sanitized: any = {}; + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key)) { + sanitized[key] = sanitizeObject((obj as any)[key]); + } + } + return sanitized; + } + return obj; +} + + export class IndexingService { private dbService: DatabaseService; private searchService: SearchService; private storageService: StorageService; - /** - * Initializes the service with its dependencies. - */ constructor( dbService: DatabaseService, searchService: SearchService, @@ -39,9 +77,129 @@ export class IndexingService { } /** - * Fetches an email by its ID from the database, creates a search document, and indexes it. + * Index multiple emails in a single batch operation for better performance */ - public async indexEmailById(emailId: string): Promise { + public async indexEmailBatch(emails: PendingEmail[]): Promise { + if (emails.length === 0) { + return; + } + + logger.info({ batchSize: emails.length }, 'Starting batch indexing of emails'); + + try { + const CONCURRENCY_LIMIT = 10; + const rawDocuments: EmailDocument[] = []; + + for (let i = 0; i < emails.length; i += CONCURRENCY_LIMIT) { + const batch = emails.slice(i, i + CONCURRENCY_LIMIT); + + const batchDocuments = await Promise.allSettled( + batch.map(async ({ email, sourceId, archivedId }) => { + try { + return await this.createEmailDocumentFromRawForBatch( + email, + sourceId, + archivedId, + email.userEmail || '' + ); + } catch (error) { + logger.error( + { + emailId: archivedId, + sourceId, + userEmail: email.userEmail || '', + rawEmailData: JSON.stringify(email, null, 2), + error: error instanceof Error ? error.message : String(error), + }, + 'Failed to create document for email in batch' + ); + throw error; + } + }) + ); + + for (const result of batchDocuments) { + if (result.status === 'fulfilled') { + rawDocuments.push(result.value); + } else { + logger.error({ error: result.reason }, 'Failed to process email in batch'); + } + } + } + + if (rawDocuments.length === 0) { + logger.warn('No documents created from email batch'); + return; + } + + // Sanitize all documents + const sanitizedDocuments = rawDocuments.map((doc) => sanitizeObject(doc)); + + // Ensure all required fields are present + const completeDocuments = sanitizedDocuments.map((doc) => + this.ensureEmailDocumentFields(doc) + ); + + // Validate each document and separate valid from invalid ones + const validDocuments: EmailDocument[] = []; + const invalidDocuments: { doc: any; reason: string }[] = []; + + for (const doc of completeDocuments) { + if (this.isValidEmailDocument(doc)) { + validDocuments.push(doc); + } else { + invalidDocuments.push({ doc, reason: 'JSON.stringify failed' }); + logger.warn({ document: doc }, 'Skipping invalid EmailDocument'); + } + } + + // Log detailed information for invalid documents + if (invalidDocuments.length > 0) { + for (const { doc } of invalidDocuments) { + logger.error( + { + emailId: doc.id, + document: JSON.stringify(doc, null, 2), + }, + 'Invalid EmailDocument details' + ); + } + } + + if (validDocuments.length === 0) { + logger.warn('No valid documents to index in batch.'); + return; + } + + logger.debug({ documentCount: validDocuments.length }, 'Sending batch to Meilisearch'); + + await this.searchService.addDocuments('emails', validDocuments, 'id'); + + logger.info( + { + batchSize: emails.length, + successfulDocuments: validDocuments.length, + failedDocuments: emails.length - validDocuments.length, + invalidDocuments: invalidDocuments.length, + }, + 'Successfully indexed email batch' + ); + } catch (error) { + logger.error( + { + batchSize: emails.length, + error: error instanceof Error ? error.message : String(error), + }, + 'Failed to index email batch' + ); + throw error; + } + } + + /** + * @deprecated + */ + private async indexEmailById(emailId: string): Promise { const email = await this.dbService.db.query.archivedEmails.findFirst({ where: eq(archivedEmails.id, emailId), }); @@ -75,16 +233,14 @@ export class IndexingService { } /** - * Indexes an email object directly, creates a search document, and indexes it. + * @deprecated */ - public async indexByEmail( - email: EmailObject, - ingestionSourceId: string, - archivedEmailId: string + private async indexByEmail( + pendingEmail: PendingEmail ): Promise { const attachments: AttachmentsType = []; - if (email.attachments && email.attachments.length > 0) { - for (const attachment of email.attachments) { + if (pendingEmail.email.attachments && pendingEmail.email.attachments.length > 0) { + for (const attachment of pendingEmail.email.attachments) { attachments.push({ buffer: attachment.content, filename: attachment.filename, @@ -93,19 +249,96 @@ export class IndexingService { } } const document = await this.createEmailDocumentFromRaw( - email, + pendingEmail.email, attachments, - ingestionSourceId, - archivedEmailId, - email.userEmail || '' + pendingEmail.sourceId, + pendingEmail.archivedId, + pendingEmail.email.userEmail || '' ); // console.log(document); await this.searchService.addDocuments('emails', [document], 'id'); } + /** * Creates a search document from a raw email object and its attachments. */ + private async createEmailDocumentFromRawForBatch( + email: EmailObject, + ingestionSourceId: string, + archivedEmailId: string, + userEmail: string + ): Promise { + const extractedAttachments: { filename: string; content: string }[] = []; + + if (email.attachments && email.attachments.length > 0) { + const ATTACHMENT_CONCURRENCY = 3; + + for (let i = 0; i < email.attachments.length; i += ATTACHMENT_CONCURRENCY) { + const attachmentBatch = email.attachments.slice(i, i + ATTACHMENT_CONCURRENCY); + + const attachmentResults = await Promise.allSettled( + attachmentBatch.map(async (attachment) => { + try { + if (!this.shouldExtractText(attachment.contentType)) { + return null; + } + + const textContent = await extractText( + attachment.content, + attachment.contentType || '' + ); + + return { + filename: attachment.filename, + content: textContent || '', + }; + } catch (error) { + logger.warn( + { + filename: attachment.filename, + mimeType: attachment.contentType, + emailId: archivedEmailId, + error: error instanceof Error ? error.message : String(error), + }, + 'Failed to extract text from attachment' + ); + return null; + } + }) + ); + + for (const result of attachmentResults) { + if (result.status === 'fulfilled' && result.value) { + extractedAttachments.push(result.value); + } + } + } + } + + const allAttachmentText = extractedAttachments + .map((att) => sanitizeText(att.content)) + .join(' '); + + const enhancedBody = [sanitizeText(email.body || email.html || ''), allAttachmentText] + .filter(Boolean) + .join('\n\n--- Attachments ---\n\n'); + + return { + id: archivedEmailId, + userEmail: userEmail, + from: email.from[0]?.address || '', + to: email.to?.map((addr: EmailAddress) => addr.address) || [], + cc: email.cc?.map((addr: EmailAddress) => addr.address) || [], + bcc: email.bcc?.map((addr: EmailAddress) => addr.address) || [], + subject: email.subject || '', + body: enhancedBody, + attachments: extractedAttachments, + timestamp: new Date(email.receivedAt).getTime(), + ingestionSourceId: ingestionSourceId, + }; + } + private async createEmailDocumentFromRaw( email: EmailObject, attachments: AttachmentsType, @@ -126,7 +359,6 @@ export class IndexingService { `Failed to extract text from attachment: ${attachment.filename}`, error ); - // skip attachment or fail the job } } // console.log('email.userEmail', userEmail); @@ -145,9 +377,6 @@ export class IndexingService { }; } - /** - * Creates a search document from a database email record and its attachments. - */ private async createEmailDocument( email: typeof archivedEmails.$inferSelect, attachments: Attachment[], @@ -181,9 +410,6 @@ export class IndexingService { }; } - /** - * Extracts text content from a list of attachments. - */ private async extractAttachmentContents( attachments: Attachment[] ): Promise<{ filename: string; content: string }[]> { @@ -202,9 +428,90 @@ export class IndexingService { `Failed to extract text from attachment: ${attachment.filename}`, error ); - // skip attachment or fail the job } } return extractedAttachments; } + + private shouldExtractText(mimeType: string): boolean { + if (process.env.TIKA_URL) { + return true; + } + + if (!mimeType) return false; + // Tika supported mime types: https://tika.apache.org/2.4.1/formats.html + const extractableTypes = [ + 'application/pdf', + 'application/msword', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'application/vnd.ms-excel', + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'application/vnd.ms-powerpoint', + 'application/vnd.openxmlformats-officedocument.presentationml.presentation', + 'text/plain', + 'text/html', + 'application/rss+xml', + 'application/xml', + 'application/json', + 'text/rtf', + 'application/rtf', + 'text/csv', + 'text/tsv', + 'application/csv', + 'image/bpg', + 'image/png', + 'image/vnd.wap.wbmp', + 'image/x-jbig2', + 'image/bmp', + 'image/x-xcf', + 'image/gif', + 'image/x-icon', + 'image/jpeg', + 'image/x-ms-bmp', + 'image/webp', + 'image/tiff', + 'image/svg+xml', + 'application/vnd.apple.pages', + 'application/vnd.apple.numbers', + 'application/vnd.apple.keynote', + 'image/heic', + 'image/heif', + ]; + + + + return extractableTypes.some((type) => mimeType.toLowerCase().includes(type)); + } + + /** + * Ensures all required fields are present in EmailDocument + */ + private ensureEmailDocumentFields(doc: Partial): EmailDocument { + return { + id: doc.id || 'missing-id', + userEmail: doc.userEmail || 'unknown', + from: doc.from || '', + to: Array.isArray(doc.to) ? doc.to : [], + cc: Array.isArray(doc.cc) ? doc.cc : [], + bcc: Array.isArray(doc.bcc) ? doc.bcc : [], + subject: doc.subject || '', + body: doc.body || '', + attachments: Array.isArray(doc.attachments) ? doc.attachments : [], + timestamp: typeof doc.timestamp === 'number' ? doc.timestamp : Date.now(), + ingestionSourceId: doc.ingestionSourceId || 'unknown', + }; + } + + /** + * Validates if the given object is a valid EmailDocument that can be serialized to JSON + */ + private isValidEmailDocument(doc: any): boolean { + try { + JSON.stringify(doc); + return true; + } catch (error) { + logger.error({ doc, error: (error as Error).message }, 'Invalid EmailDocument detected'); + return false; + } + } } diff --git a/packages/backend/src/services/IngestionService.ts b/packages/backend/src/services/IngestionService.ts index 1818c8e..e0eeb02 100644 --- a/packages/backend/src/services/IngestionService.ts +++ b/packages/backend/src/services/IngestionService.ts @@ -6,6 +6,7 @@ import type { IngestionSource, IngestionCredentials, IngestionProvider, + PendingEmail, } from '@open-archiver/types'; import { and, desc, eq } from 'drizzle-orm'; import { CryptoService } from './CryptoService'; @@ -302,7 +303,7 @@ export class IngestionService { source: IngestionSource, storage: StorageService, userEmail: string - ): Promise { + ): Promise { try { // Generate a unique message ID for the email. If the email already has a message-id header, use that. // Otherwise, generate a new one based on the email's hash, source ID, and email ID. @@ -331,7 +332,7 @@ export class IngestionService { { messageId, ingestionSourceId: source.id }, 'Skipping duplicate email' ); - return; + return null; } const emlBuffer = email.eml ?? Buffer.from(email.body, 'utf-8'); @@ -398,23 +399,14 @@ export class IngestionService { .onConflictDoNothing(); } } - // adding to indexing queue - //Instead: index by email (raw email object, ingestion id) - logger.info({ emailId: archivedEmail.id }, 'Indexing email'); - // await indexingQueue.add('index-email', { - // emailId: archivedEmail.id, - // }); - const searchService = new SearchService(); - const storageService = new StorageService(); - const databaseService = new DatabaseService(); - const indexingService = new IndexingService( - databaseService, - searchService, - storageService - ); - //assign userEmail + email.userEmail = userEmail; - await indexingService.indexByEmail(email, source.id, archivedEmail.id); + + return { + email, + sourceId: source.id, + archivedId: archivedEmail.id, + }; } catch (error) { logger.error({ message: `Failed to process email ${email.id} for source ${source.id}`, @@ -422,6 +414,7 @@ export class IngestionService { emailId: email.id, ingestionSourceId: source.id, }); + return null; } } } diff --git a/packages/backend/src/services/OcrService.ts b/packages/backend/src/services/OcrService.ts new file mode 100644 index 0000000..1aceca7 --- /dev/null +++ b/packages/backend/src/services/OcrService.ts @@ -0,0 +1,271 @@ +import crypto from 'crypto'; +import { logger } from '../config/logger'; + +// Simple LRU cache for Tika results with statistics +class TikaCache { + private cache = new Map(); + private maxSize = 50; + private hits = 0; + private misses = 0; + + get(key: string): string | undefined { + const value = this.cache.get(key); + if (value !== undefined) { + this.hits++; + // LRU: Move element to the end + this.cache.delete(key); + this.cache.set(key, value); + } else { + this.misses++; + } + return value; + } + + set(key: string, value: string): void { + // If already exists, delete first + if (this.cache.has(key)) { + this.cache.delete(key); + } + // If cache is full, remove oldest element + else if (this.cache.size >= this.maxSize) { + const firstKey = this.cache.keys().next().value; + if (firstKey !== undefined) { + this.cache.delete(firstKey); + } + } + + this.cache.set(key, value); + } + + getStats(): { size: number; maxSize: number; hits: number; misses: number; hitRate: number } { + const total = this.hits + this.misses; + const hitRate = total > 0 ? (this.hits / total) * 100 : 0; + return { + size: this.cache.size, + maxSize: this.maxSize, + hits: this.hits, + misses: this.misses, + hitRate: Math.round(hitRate * 100) / 100 // 2 decimal places + }; + } + + reset(): void { + this.cache.clear(); + this.hits = 0; + this.misses = 0; + } +} + +// Semaphore for running Tika requests +class TikaSemaphore { + private inProgress = new Map>(); + private waitCount = 0; + + async acquire(key: string, operation: () => Promise): Promise { + // Check if a request for this key is already running + const existingPromise = this.inProgress.get(key); + if (existingPromise) { + this.waitCount++; + logger.debug(`Waiting for in-progress Tika request (${key.slice(0, 8)}...)`); + try { + return await existingPromise; + } finally { + this.waitCount--; + } + } + + // Start new request + const promise = this.executeOperation(key, operation); + this.inProgress.set(key, promise); + + try { + return await promise; + } finally { + // Remove promise from map when finished + this.inProgress.delete(key); + } + } + + private async executeOperation(key: string, operation: () => Promise): Promise { + try { + return await operation(); + } catch (error) { + // Remove promise from map even on errors + logger.error(`Tika operation failed for key ${key.slice(0, 8)}...`, error); + throw error; + } + } + + getStats(): { inProgress: number; waitCount: number } { + return { + inProgress: this.inProgress.size, + waitCount: this.waitCount + }; + } + + clear(): void { + this.inProgress.clear(); + this.waitCount = 0; + } +} + +export class OcrService { + private tikaCache = new TikaCache(); + private tikaSemaphore = new TikaSemaphore(); + + // Tika-based text extraction with cache and semaphore + async extractTextWithTika(buffer: Buffer, mimeType: string): Promise { + const tikaUrl = process.env.TIKA_URL; + if (!tikaUrl) { + throw new Error('TIKA_URL environment variable not set'); + } + + // Cache key: SHA-256 hash of the buffer + const hash = crypto.createHash('sha256').update(buffer).digest('hex'); + + // Cache lookup (before semaphore!) + const cachedResult = this.tikaCache.get(hash); + if (cachedResult !== undefined) { + logger.debug(`Tika cache hit for ${mimeType} (${buffer.length} bytes)`); + return cachedResult; + } + + // Use semaphore to deduplicate parallel requests + return await this.tikaSemaphore.acquire(hash, async () => { + // Check cache again (might have been filled by parallel request) + const cachedAfterWait = this.tikaCache.get(hash); + if (cachedAfterWait !== undefined) { + logger.debug(`Tika cache hit after wait for ${mimeType} (${buffer.length} bytes)`); + return cachedAfterWait; + } + + logger.debug(`Executing Tika request for ${mimeType} (${buffer.length} bytes)`); + + // DNS fallback: If "tika" hostname, also try localhost + const urlsToTry = [ + `${tikaUrl}/tika`, + // Fallback falls DNS-Problem mit "tika" hostname + ...(tikaUrl.includes('://tika:') + ? [`${tikaUrl.replace('://tika:', '://localhost:')}/tika`] + : []) + ]; + + for (const url of urlsToTry) { + try { + logger.debug(`Trying Tika URL: ${url}`); + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Content-Type': mimeType || 'application/octet-stream', + Accept: 'text/plain', + Connection: 'close' + }, + body: buffer, + signal: AbortSignal.timeout(180000) + }); + + if (!response.ok) { + logger.warn( + `Tika extraction failed at ${url}: ${response.status} ${response.statusText}` + ); + continue; // Try next URL + } + + const text = await response.text(); + const result = text.trim(); + + // Cache result (also empty strings to avoid repeated attempts) + this.tikaCache.set(hash, result); + + const cacheStats = this.tikaCache.getStats(); + const semaphoreStats = this.tikaSemaphore.getStats(); + logger.debug( + `Tika extraction successful - Cache: ${cacheStats.hits}H/${cacheStats.misses}M (${cacheStats.hitRate}%) - Semaphore: ${semaphoreStats.inProgress} active, ${semaphoreStats.waitCount} waiting` + ); + + return result; + } catch (error) { + logger.warn( + `Tika extraction error at ${url}:`, + error instanceof Error ? error.message : 'Unknown error' + ); + // Continue to next URL + } + } + + // All URLs failed - cache this too (as empty string) + logger.error('All Tika URLs failed'); + this.tikaCache.set(hash, ''); + return ''; + }); + } + + // Helper function to check Tika availability + async checkTikaAvailability(): Promise { + const tikaUrl = process.env.TIKA_URL; + if (!tikaUrl) { + return false; + } + + try { + const response = await fetch(`${tikaUrl}/version`, { + method: 'GET', + signal: AbortSignal.timeout(5000) // 5 seconds timeout + }); + + if (response.ok) { + const version = await response.text(); + logger.info(`Tika server available, version: ${version.trim()}`); + return true; + } + + return false; + } catch (error) { + logger.warn( + 'Tika server not available:', + error instanceof Error ? error.message : 'Unknown error' + ); + return false; + } + } + + // Optional: Tika health check on startup + async initializeTextExtractor(): Promise { + const tikaUrl = process.env.TIKA_URL; + + if (tikaUrl) { + const isAvailable = await this.checkTikaAvailability(); + if (!isAvailable) { + logger.error(`Tika server configured but not available at: ${tikaUrl}`); + logger.error('Text extraction will fall back to legacy methods or fail'); + } + } else { + logger.info('Using legacy text extraction methods (pdf2json, mammoth, xlsx)'); + logger.info('Set TIKA_URL environment variable to use Apache Tika for better extraction'); + } + } + + // Get cache statistics + getTikaCacheStats(): { + size: number; + maxSize: number; + hits: number; + misses: number; + hitRate: number; + } { + return this.tikaCache.getStats(); + } + + // Get semaphore statistics + getTikaSemaphoreStats(): { inProgress: number; waitCount: number } { + return this.tikaSemaphore.getStats(); + } + + // Clear cache (e.g. for tests or manual reset) + clearTikaCache(): void { + this.tikaCache.reset(); + this.tikaSemaphore.clear(); + logger.info('Tika cache and semaphore cleared'); + } +} + diff --git a/packages/backend/src/workers/indexing.worker.ts b/packages/backend/src/workers/indexing.worker.ts index 56686b9..3fcbf25 100644 --- a/packages/backend/src/workers/indexing.worker.ts +++ b/packages/backend/src/workers/indexing.worker.ts @@ -1,11 +1,11 @@ import { Worker } from 'bullmq'; import { connection } from '../config/redis'; -import indexEmailProcessor from '../jobs/processors/index-email.processor'; +import indexEmailBatchProcessor from '../jobs/processors/index-email-batch.processor'; const processor = async (job: any) => { switch (job.name) { - case 'index-email': - return indexEmailProcessor(job); + case 'index-email-batch': + return indexEmailBatchProcessor(job); default: throw new Error(`Unknown job name: ${job.name}`); } diff --git a/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte b/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte index 7a46f5c..32e0f22 100644 --- a/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte +++ b/packages/frontend/src/lib/components/custom/IngestionSourceForm.svelte @@ -211,7 +211,7 @@
- +