diff --git a/README.md b/README.md index 70150d7..33405a4 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![Redis](https://img.shields.io/badge/Redis-DC382D?style=for-the-badge&logo=redis&logoColor=white)](https://redis.io) [![SvelteKit](https://img.shields.io/badge/SvelteKit-FF3E00?style=for-the-badge&logo=svelte&logoColor=white)](https://svelte.dev/) -**A secure, sovereign, and open-source platform for email archiving and eDiscovery.** +**A secure, sovereign, and open-source platform for email archiving.** Open Archiver provides a robust, self-hosted solution for archiving, storing, indexing, and searching emails from major platforms, including Google Workspace (Gmail), Microsoft 365, PST files, as well as generic IMAP-enabled email inboxes. Use Open Archiver to keep a permanent, tamper-proof record of your communication history, free from vendor lock-in. @@ -48,13 +48,13 @@ Password: openarchiver_demo - Zipped .eml files - Mbox files -- **Secure & Efficient Storage**: Emails are stored in the standard `.eml` format. The system uses deduplication and compression to minimize storage costs. All data is encrypted at rest. +- **Secure & Efficient Storage**: Emails are stored in the standard `.eml` format. The system uses deduplication and compression to minimize storage costs. All files are encrypted at rest. - **Pluggable Storage Backends**: Support both local filesystem storage and S3-compatible object storage (like AWS S3 or MinIO). - **Powerful Search & eDiscovery**: A high-performance search engine indexes the full text of emails and attachments (PDF, DOCX, etc.). - **Thread discovery**: The ability to discover if an email belongs to a thread/conversation and present the context. - **Compliance & Retention**: Define granular retention policies to automatically manage the lifecycle of your data. Place legal holds on communications to prevent deletion during litigation (TBD). - **File Hash and Encryption**: Email and attachment file hash values are stored in the meta database upon ingestion, meaning any attempt to alter the file content will be identified, ensuring legal and regulatory compliance. -- **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when (TBD). +- **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when. ## 🛠️ Tech Stack diff --git a/assets/screenshots/job-queue.png b/assets/screenshots/job-queue.png new file mode 100644 index 0000000..80bc6e3 Binary files /dev/null and b/assets/screenshots/job-queue.png differ diff --git a/docs/user-guides/integrity-check.md b/docs/user-guides/integrity-check.md index cd3ec58..3f69a2f 100644 --- a/docs/user-guides/integrity-check.md +++ b/docs/user-guides/integrity-check.md @@ -1,4 +1,4 @@ -# Email Integrity Check +# Integrity Check Open Archiver allows you to verify the integrity of your archived emails and their attachments. This guide explains how the integrity check works and what the results mean. diff --git a/packages/backend/src/helpers/textExtractor.ts b/packages/backend/src/helpers/textExtractor.ts index 07335ae..e3b02ed 100644 --- a/packages/backend/src/helpers/textExtractor.ts +++ b/packages/backend/src/helpers/textExtractor.ts @@ -47,10 +47,10 @@ function extractTextFromPdf(buffer: Buffer): Promise { } // reduced Timeout for better performance - setTimeout(() => { - logger.warn('PDF parsing timed out'); - finish(''); - }, 5000); + // setTimeout(() => { + // logger.warn('PDF parsing timed out'); + // finish(''); + // }, 5000); }); } diff --git a/packages/backend/src/jobs/processors/process-mailbox.processor.ts b/packages/backend/src/jobs/processors/process-mailbox.processor.ts index 0d95914..817834a 100644 --- a/packages/backend/src/jobs/processors/process-mailbox.processor.ts +++ b/packages/backend/src/jobs/processors/process-mailbox.processor.ts @@ -33,7 +33,6 @@ export const processMailboxProcessor = async (job: Job 0) { - await indexingService.indexEmailBatch(emailBatch); + await indexingQueue.add('index-email-batch', { emails: emailBatch }); + emailBatch = []; } logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox'); diff --git a/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts b/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts index 98c17e8..3a1f88e 100644 --- a/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts +++ b/packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts @@ -49,9 +49,10 @@ export default async (job: Job) => { // if data doesn't have error property, it is a successful job with SyncState const successfulJobs = allChildJobs.filter((v) => !v || !(v as any).error) as SyncState[]; - const finalSyncState = deepmerge( - ...successfulJobs.filter((s) => s && Object.keys(s).length > 0) - ); + const finalSyncState = + successfulJobs.length > 0 + ? deepmerge(...successfulJobs.filter((s) => s && Object.keys(s).length > 0)) + : {}; const source = await IngestionService.findById(ingestionSourceId); let status: IngestionStatus = 'active'; @@ -63,7 +64,9 @@ export default async (job: Job) => { let message: string; // Check for a specific rate-limit message from the successful jobs - const rateLimitMessage = successfulJobs.find((j) => j.statusMessage)?.statusMessage; + const rateLimitMessage = successfulJobs.find( + (j) => j.statusMessage && j.statusMessage.includes('rate limit') + )?.statusMessage; if (failedJobs.length > 0) { status = 'error'; diff --git a/packages/backend/src/services/IndexingService.ts b/packages/backend/src/services/IndexingService.ts index f5b0bc2..0df3eef 100644 --- a/packages/backend/src/services/IndexingService.ts +++ b/packages/backend/src/services/IndexingService.ts @@ -93,21 +93,17 @@ export class IndexingService { const batch = emails.slice(i, i + CONCURRENCY_LIMIT); const batchDocuments = await Promise.allSettled( - batch.map(async ({ email, sourceId, archivedId }) => { + batch.map(async (pendingEmail) => { try { - return await this.createEmailDocumentFromRawForBatch( - email, - sourceId, - archivedId, - email.userEmail || '' - ); + const document = await this.indexEmailById(pendingEmail.archivedEmailId); + if (document) { + return document; + } + return null; } catch (error) { logger.error( { - emailId: archivedId, - sourceId, - userEmail: email.userEmail || '', - rawEmailData: JSON.stringify(email, null, 2), + emailId: pendingEmail.archivedEmailId, error: error instanceof Error ? error.message : String(error), }, 'Failed to create document for email in batch' @@ -118,10 +114,12 @@ export class IndexingService { ); for (const result of batchDocuments) { - if (result.status === 'fulfilled') { + if (result.status === 'fulfilled' && result.value) { rawDocuments.push(result.value); - } else { + } else if (result.status === 'rejected') { logger.error({ error: result.reason }, 'Failed to process email in batch'); + } else { + logger.error({ result: result }, 'Failed to process email in batch, reason unknown.'); } } } @@ -195,10 +193,7 @@ export class IndexingService { } } - /** - * @deprecated - */ - private async indexEmailById(emailId: string): Promise { + private async indexEmailById(emailId: string): Promise { const email = await this.dbService.db.query.archivedEmails.findFirst({ where: eq(archivedEmails.id, emailId), }); @@ -228,13 +223,13 @@ export class IndexingService { emailAttachmentsResult, email.userEmail ); - await this.searchService.addDocuments('emails', [document], 'id'); + return document; } /** * @deprecated */ - private async indexByEmail(pendingEmail: PendingEmail): Promise { + /* private async indexByEmail(pendingEmail: PendingEmail): Promise { const attachments: AttachmentsType = []; if (pendingEmail.email.attachments && pendingEmail.email.attachments.length > 0) { for (const attachment of pendingEmail.email.attachments) { @@ -254,12 +249,12 @@ export class IndexingService { ); // console.log(document); await this.searchService.addDocuments('emails', [document], 'id'); - } + } */ /** * Creates a search document from a raw email object and its attachments. */ - private async createEmailDocumentFromRawForBatch( + /* private async createEmailDocumentFromRawForBatch( email: EmailObject, ingestionSourceId: string, archivedEmailId: string, @@ -333,7 +328,7 @@ export class IndexingService { timestamp: new Date(email.receivedAt).getTime(), ingestionSourceId: ingestionSourceId, }; - } + } */ private async createEmailDocumentFromRaw( email: EmailObject, diff --git a/packages/backend/src/services/IngestionService.ts b/packages/backend/src/services/IngestionService.ts index e59708a..966b3bc 100644 --- a/packages/backend/src/services/IngestionService.ts +++ b/packages/backend/src/services/IngestionService.ts @@ -186,7 +186,7 @@ export class IngestionService { (key) => key !== 'providerConfig' && originalSource[key as keyof IngestionSource] !== - decryptedSource[key as keyof IngestionSource] + decryptedSource[key as keyof IngestionSource] ); if (changedFields.length > 0) { await this.auditService.createAuditLog({ @@ -518,12 +518,8 @@ export class IngestionService { } } - email.userEmail = userEmail; - return { - email, - sourceId: source.id, - archivedId: archivedEmail.id, + archivedEmailId: archivedEmail.id, }; } catch (error) { logger.error({ diff --git a/packages/backend/src/services/StorageService.ts b/packages/backend/src/services/StorageService.ts index 565a41a..7912cb5 100644 --- a/packages/backend/src/services/StorageService.ts +++ b/packages/backend/src/services/StorageService.ts @@ -81,6 +81,79 @@ export class StorageService implements IStorageProvider { return Readable.from(decryptedContent); } + public async getStream(path: string): Promise { + const stream = await this.provider.get(path); + if (!this.encryptionKey) { + return stream; + } + + // For encrypted files, we need to read the prefix and IV first. + // This part still buffers a small, fixed amount of data, which is acceptable. + const prefixAndIvBuffer = await new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let totalLength = 0; + const targetLength = ENCRYPTION_PREFIX.length + 16; + + const onData = (chunk: Buffer) => { + chunks.push(chunk); + totalLength += chunk.length; + if (totalLength >= targetLength) { + stream.removeListener('data', onData); + resolve(Buffer.concat(chunks)); + } + }; + + stream.on('data', onData); + stream.on('error', reject); + stream.on('end', () => { + // Handle cases where the file is smaller than the prefix + IV + if (totalLength < targetLength) { + resolve(Buffer.concat(chunks)); + } + }); + }); + + const prefix = prefixAndIvBuffer.subarray(0, ENCRYPTION_PREFIX.length); + if (!prefix.equals(ENCRYPTION_PREFIX)) { + // File is not encrypted, return a new stream containing the buffered prefix and the rest of the original stream + const combinedStream = new Readable({ + read() { }, + }); + combinedStream.push(prefixAndIvBuffer); + stream.on('data', (chunk) => { + combinedStream.push(chunk); + }); + stream.on('end', () => { + combinedStream.push(null); // No more data + }); + stream.on('error', (err) => { + combinedStream.emit('error', err); + }); + return combinedStream; + } + + try { + const iv = prefixAndIvBuffer.subarray( + ENCRYPTION_PREFIX.length, + ENCRYPTION_PREFIX.length + 16 + ); + const decipher = createDecipheriv(this.algorithm, this.encryptionKey, iv); + + // Push the remaining part of the initial buffer to the decipher + const remainingBuffer = prefixAndIvBuffer.subarray(ENCRYPTION_PREFIX.length + 16); + if (remainingBuffer.length > 0) { + decipher.write(remainingBuffer); + } + + // Pipe the rest of the stream + stream.pipe(decipher); + + return decipher; + } catch (error) { + throw new Error('Failed to decrypt file. It may be corrupted or the key is incorrect.'); + } + } + delete(path: string): Promise { return this.provider.delete(path); } diff --git a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts index 56e8106..c60bad2 100644 --- a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts @@ -131,6 +131,7 @@ export class ImapConnector implements IEmailConnector { } catch (err: any) { logger.error({ err, attempt }, `IMAP operation failed on attempt ${attempt}`); this.isConnected = false; // Force reconnect on next attempt + this.client = this.createClient(); // Create a new client instance for the next retry if (attempt === maxRetries) { logger.error({ err }, 'IMAP operation failed after all retries.'); throw err; diff --git a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts index 1af2596..f160ed7 100644 --- a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts @@ -10,9 +10,46 @@ import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser' import { logger } from '../../config/logger'; import { getThreadId } from './helpers/utils'; import { StorageService } from '../StorageService'; -import { Readable } from 'stream'; +import { Readable, Transform } from 'stream'; import { createHash } from 'crypto'; -import { streamToBuffer } from '../../helpers/streamToBuffer'; + +class MboxSplitter extends Transform { + private buffer: Buffer = Buffer.alloc(0); + private delimiter: Buffer = Buffer.from('\nFrom '); + private firstChunk: boolean = true; + + _transform(chunk: Buffer, encoding: string, callback: Function) { + if (this.firstChunk) { + // Check if the file starts with "From ". If not, prepend it to the first email. + if (chunk.subarray(0, 5).toString() !== 'From ') { + this.push(Buffer.from('From ')); + } + this.firstChunk = false; + } + + let currentBuffer = Buffer.concat([this.buffer, chunk]); + let position; + + while ((position = currentBuffer.indexOf(this.delimiter)) > -1) { + const email = currentBuffer.subarray(0, position); + if (email.length > 0) { + this.push(email); + } + // The next email starts with "From ", which is what the parser expects. + currentBuffer = currentBuffer.subarray(position + 1); + } + + this.buffer = currentBuffer; + callback(); + } + + _flush(callback: Function) { + if (this.buffer.length > 0) { + this.push(this.buffer); + } + callback(); + } +} export class MboxConnector implements IEmailConnector { private storage: StorageService; @@ -57,30 +94,15 @@ export class MboxConnector implements IEmailConnector { userEmail: string, syncState?: SyncState | null ): AsyncGenerator { + const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath); + const mboxSplitter = new MboxSplitter(); + const emailStream = fileStream.pipe(mboxSplitter); + try { - const fileStream = await this.storage.get(this.credentials.uploadedFilePath); - const fileBuffer = await streamToBuffer(fileStream as Readable); - const mboxContent = fileBuffer.toString('utf-8'); - const emailDelimiter = '\nFrom '; - const emails = mboxContent.split(emailDelimiter); - - // The first split part might be empty or part of the first email's header, so we adjust. - if (emails.length > 0 && !mboxContent.startsWith('From ')) { - emails.shift(); // Adjust if the file doesn't start with "From " - } - - logger.info(`Found ${emails.length} potential emails in the mbox file.`); - let emailCount = 0; - - for (const email of emails) { + for await (const emailBuffer of emailStream) { try { - // Re-add the "From " delimiter for the parser, except for the very first email - const emailWithDelimiter = - emailCount > 0 || mboxContent.startsWith('From ') ? `From ${email}` : email; - const emailBuffer = Buffer.from(emailWithDelimiter, 'utf-8'); - const emailObject = await this.parseMessage(emailBuffer, ''); + const emailObject = await this.parseMessage(emailBuffer as Buffer, ''); yield emailObject; - emailCount++; } catch (error) { logger.error( { error, file: this.credentials.uploadedFilePath }, @@ -88,8 +110,31 @@ export class MboxConnector implements IEmailConnector { ); } } - logger.info(`Finished processing mbox file. Total emails processed: ${emailCount}`); } finally { + // Ensure all streams are properly closed before deleting the file. + if (fileStream instanceof Readable) { + fileStream.destroy(); + } + if (emailStream instanceof Readable) { + emailStream.destroy(); + } + // Wait for the streams to fully close to prevent race conditions with file deletion. + await new Promise((resolve) => { + if (fileStream instanceof Readable) { + fileStream.on('close', resolve); + } else { + resolve(true); + } + }); + + await new Promise((resolve) => { + if (emailStream instanceof Readable) { + emailStream.on('close', resolve); + } else { + resolve(true); + } + }); + try { await this.storage.delete(this.credentials.uploadedFilePath); } catch (error) { diff --git a/packages/backend/src/services/ingestion-connectors/PSTConnector.ts b/packages/backend/src/services/ingestion-connectors/PSTConnector.ts index 405d1b3..6964cee 100644 --- a/packages/backend/src/services/ingestion-connectors/PSTConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/PSTConnector.ts @@ -13,15 +13,8 @@ import { getThreadId } from './helpers/utils'; import { StorageService } from '../StorageService'; import { Readable } from 'stream'; import { createHash } from 'crypto'; - -const streamToBuffer = (stream: Readable): Promise => { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - stream.on('data', (chunk) => chunks.push(chunk)); - stream.on('error', reject); - stream.on('end', () => resolve(Buffer.concat(chunks))); - }); -}; +import { join } from 'path'; +import { createWriteStream, promises as fs } from 'fs'; // We have to hardcode names for deleted and trash folders here as current lib doesn't support looking into PST properties. const DELETED_FOLDERS = new Set([ @@ -113,20 +106,25 @@ const JUNK_FOLDERS = new Set([ export class PSTConnector implements IEmailConnector { private storage: StorageService; - private pstFile: PSTFile | null = null; constructor(private credentials: PSTImportCredentials) { this.storage = new StorageService(); } - private async loadPstFile(): Promise { - if (this.pstFile) { - return this.pstFile; - } - const fileStream = await this.storage.get(this.credentials.uploadedFilePath); - const buffer = await streamToBuffer(fileStream as Readable); - this.pstFile = new PSTFile(buffer); - return this.pstFile; + private async loadPstFile(): Promise<{ pstFile: PSTFile; tempDir: string }> { + const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath); + const tempDir = await fs.mkdtemp(join('/tmp', `pst-import-${new Date().getTime()}`)); + const tempFilePath = join(tempDir, 'temp.pst'); + + await new Promise((resolve, reject) => { + const dest = createWriteStream(tempFilePath); + fileStream.pipe(dest); + dest.on('finish', resolve); + dest.on('error', reject); + }); + + const pstFile = new PSTFile(tempFilePath); + return { pstFile, tempDir }; } public async testConnection(): Promise { @@ -156,8 +154,11 @@ export class PSTConnector implements IEmailConnector { */ public async *listAllUsers(): AsyncGenerator { let pstFile: PSTFile | null = null; + let tempDir: string | null = null; try { - pstFile = await this.loadPstFile(); + const loadResult = await this.loadPstFile(); + pstFile = loadResult.pstFile; + tempDir = loadResult.tempDir; const root = pstFile.getRootFolder(); const displayName: string = root.displayName || pstFile.pstFilename || String(new Date().getTime()); @@ -171,10 +172,12 @@ export class PSTConnector implements IEmailConnector { }; } catch (error) { logger.error({ error }, 'Failed to list users from PST file.'); - pstFile?.close(); throw error; } finally { pstFile?.close(); + if (tempDir) { + await fs.rm(tempDir, { recursive: true, force: true }); + } } } @@ -183,16 +186,21 @@ export class PSTConnector implements IEmailConnector { syncState?: SyncState | null ): AsyncGenerator { let pstFile: PSTFile | null = null; + let tempDir: string | null = null; try { - pstFile = await this.loadPstFile(); + const loadResult = await this.loadPstFile(); + pstFile = loadResult.pstFile; + tempDir = loadResult.tempDir; const root = pstFile.getRootFolder(); yield* this.processFolder(root, '', userEmail); } catch (error) { logger.error({ error }, 'Failed to fetch email.'); - pstFile?.close(); throw error; } finally { pstFile?.close(); + if (tempDir) { + await fs.rm(tempDir, { recursive: true, force: true }); + } try { await this.storage.delete(this.credentials.uploadedFilePath); } catch (error) { @@ -281,8 +289,8 @@ export class PSTConnector implements IEmailConnector { emlBuffer ?? Buffer.from(parsedEmail.text || parsedEmail.html || '', 'utf-8') ) .digest('hex')}-${createHash('sha256') - .update(emlBuffer ?? Buffer.from(msg.subject || '', 'utf-8')) - .digest('hex')}-${msg.clientSubmitTime?.getTime()}`; + .update(emlBuffer ?? Buffer.from(msg.subject || '', 'utf-8')) + .digest('hex')}-${msg.clientSubmitTime?.getTime()}`; } return { id: messageId, diff --git a/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte b/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte index 4896272..0f750aa 100644 --- a/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte +++ b/packages/frontend/src/routes/dashboard/admin/jobs/[queueName]/+page.svelte @@ -58,7 +58,7 @@ {$t('app.jobs.jobs')} -
+
{#each jobStatuses as status}