diff --git a/docs/api/ingestion.md b/docs/api/ingestion.md index 4071bc4..c860373 100644 --- a/docs/api/ingestion.md +++ b/docs/api/ingestion.md @@ -24,6 +24,35 @@ interface CreateIngestionSourceDto { } ``` +#### Example: Creating an Mbox Import Source with File Upload + +```json +{ + "name": "My Mbox Import", + "provider": "mbox_import", + "providerConfig": { + "type": "mbox_import", + "uploadedFileName": "emails.mbox", + "uploadedFilePath": "open-archiver/tmp/uuid-emails.mbox" + } +} +``` + +#### Example: Creating an Mbox Import Source with Local File Path + +```json +{ + "name": "My Mbox Import", + "provider": "mbox_import", + "providerConfig": { + "type": "mbox_import", + "localFilePath": "/path/to/emails.mbox" + } +} +``` + +**Note:** `localFilePath` is read directly from the filesystem of the server running the backend, and the file is not deleted after import. A file referenced by `uploadedFilePath` (set via the upload API) is deleted automatically after import. If both fields are provided, `localFilePath` takes precedence and the uploaded file is left in place. The same applies to the `pst_import` and `eml_import` providers. + #### Responses - **201 Created:** The newly created ingestion source. 
diff --git a/packages/backend/src/services/IngestionService.ts b/packages/backend/src/services/IngestionService.ts index 8f95bc2..00f83eb 100644 --- a/packages/backend/src/services/IngestionService.ts +++ b/packages/backend/src/services/IngestionService.ts @@ -219,7 +219,8 @@ export class IngestionService { if ( (source.credentials.type === 'pst_import' || - source.credentials.type === 'eml_import') && + source.credentials.type === 'eml_import' || + source.credentials.type === 'mbox_import') && source.credentials.uploadedFilePath && (await storage.exists(source.credentials.uploadedFilePath)) ) { diff --git a/packages/backend/src/services/ingestion-connectors/EMLConnector.ts b/packages/backend/src/services/ingestion-connectors/EMLConnector.ts index 4209ab3..022c256 100644 --- a/packages/backend/src/services/ingestion-connectors/EMLConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/EMLConnector.ts @@ -32,17 +32,52 @@ export class EMLConnector implements IEmailConnector { this.storage = new StorageService(); } + private getFilePath(): string { + return this.credentials.localFilePath || this.credentials.uploadedFilePath || ''; + } + + private getDisplayName(): string { + if (this.credentials.uploadedFileName) { + return this.credentials.uploadedFileName; + } + if (this.credentials.localFilePath) { + const parts = this.credentials.localFilePath.split('/'); + return parts[parts.length - 1].replace('.zip', ''); + } + return `eml-import-${new Date().getTime()}`; + } + + private async getFileStream(): Promise { + if (this.credentials.localFilePath) { + return createReadStream(this.credentials.localFilePath); + } + return this.storage.get(this.getFilePath()); + } + public async testConnection(): Promise { try { - if (!this.credentials.uploadedFilePath) { + const filePath = this.getFilePath(); + if (!filePath) { throw Error('EML file path not provided.'); } - if (!this.credentials.uploadedFilePath.includes('.zip')) { + if (!filePath.includes('.zip')) { 
throw Error('Provided file is not in the ZIP format.'); } - const fileExist = await this.storage.exists(this.credentials.uploadedFilePath); + + let fileExist = false; + if (this.credentials.localFilePath) { + try { + await fs.access(this.credentials.localFilePath); + fileExist = true; + } catch { + fileExist = false; + } + } else { + fileExist = await this.storage.exists(filePath); + } + if (!fileExist) { - throw Error('EML file upload not finished yet, please wait.'); + throw Error('EML file not found or upload not finished yet, please wait.'); } return true; @@ -53,8 +88,7 @@ export class EMLConnector implements IEmailConnector { } public async *listAllUsers(): AsyncGenerator { - const displayName = - this.credentials.uploadedFileName || `eml-import-${new Date().getTime()}`; + const displayName = this.getDisplayName(); logger.info(`Found potential mailbox: ${displayName}`); const constructedPrimaryEmail = `${displayName.replace(/ /g, '.').toLowerCase()}@eml.local`; yield { @@ -68,7 +102,7 @@ export class EMLConnector implements IEmailConnector { userEmail: string, syncState?: SyncState | null ): AsyncGenerator { - const fileStream = await this.storage.get(this.credentials.uploadedFilePath); + const fileStream = await this.getFileStream(); const tempDir = await fs.mkdtemp(join('/tmp', `eml-import-${new Date().getTime()}`)); const unzippedPath = join(tempDir, 'unzipped'); await fs.mkdir(unzippedPath); @@ -115,13 +149,15 @@ export class EMLConnector implements IEmailConnector { throw error; } finally { await fs.rm(tempDir, { recursive: true, force: true }); - try { - await this.storage.delete(this.credentials.uploadedFilePath); - } catch (error) { - logger.error( - { error, file: this.credentials.uploadedFilePath }, - 'Failed to delete EML file after processing.' 
- ); + if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) { + try { + await this.storage.delete(this.credentials.uploadedFilePath); + } catch (error) { + logger.error( + { error, file: this.credentials.uploadedFilePath }, + 'Failed to delete EML file after processing.' + ); + } } } } diff --git a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts index fa03c42..dab5eec 100644 --- a/packages/backend/src/services/ingestion-connectors/MboxConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/MboxConnector.ts @@ -12,6 +12,7 @@ import { getThreadId } from './helpers/utils'; import { StorageService } from '../StorageService'; import { Readable, Transform } from 'stream'; import { createHash } from 'crypto'; +import { promises as fs, createReadStream } from 'fs'; class MboxSplitter extends Transform { private buffer: Buffer = Buffer.alloc(0); @@ -60,15 +61,28 @@ export class MboxConnector implements IEmailConnector { public async testConnection(): Promise { try { - if (!this.credentials.uploadedFilePath) { + const filePath = this.getFilePath(); + if (!filePath) { throw Error('Mbox file path not provided.'); } - if (!this.credentials.uploadedFilePath.includes('.mbox')) { + if (!filePath.includes('.mbox')) { throw Error('Provided file is not in the MBOX format.'); } - const fileExist = await this.storage.exists(this.credentials.uploadedFilePath); + + let fileExist = false; + if (this.credentials.localFilePath) { + try { + await fs.access(this.credentials.localFilePath); + fileExist = true; + } catch { + fileExist = false; + } + } else { + fileExist = await this.storage.exists(filePath); + } + if (!fileExist) { - throw Error('Mbox file upload not finished yet, please wait.'); + throw Error('Mbox file not found or upload not finished yet, please wait.'); } return true; @@ -78,9 +92,19 @@ export class MboxConnector implements IEmailConnector { } } + 
private getFilePath(): string { + return this.credentials.localFilePath || this.credentials.uploadedFilePath || ''; + } + + private async getFileStream(): Promise { + if (this.credentials.localFilePath) { + return createReadStream(this.credentials.localFilePath); + } + return this.storage.getStream(this.getFilePath()); + } + public async *listAllUsers(): AsyncGenerator { - const displayName = - this.credentials.uploadedFileName || `mbox-import-${new Date().getTime()}`; + const displayName = this.getDisplayName(); logger.info(`Found potential mailbox: ${displayName}`); const constructedPrimaryEmail = `${displayName.replace(/ /g, '.').toLowerCase()}@mbox.local`; yield { @@ -90,11 +114,23 @@ export class MboxConnector implements IEmailConnector { }; } + private getDisplayName(): string { + if (this.credentials.uploadedFileName) { + return this.credentials.uploadedFileName; + } + if (this.credentials.localFilePath) { + const parts = this.credentials.localFilePath.split('/'); + return parts[parts.length - 1].replace('.mbox', ''); + } + return `mbox-import-${new Date().getTime()}`; + } + public async *fetchEmails( userEmail: string, syncState?: SyncState | null ): AsyncGenerator { - const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath); + const filePath = this.getFilePath(); + const fileStream = await this.getFileStream(); const mboxSplitter = new MboxSplitter(); const emailStream = fileStream.pipe(mboxSplitter); @@ -104,22 +140,21 @@ export class MboxConnector implements IEmailConnector { yield emailObject; } catch (error) { logger.error( - { error, file: this.credentials.uploadedFilePath }, + { error, file: filePath }, 'Failed to process a single message from mbox file. Skipping.' ); } } - // After the stream is fully consumed, delete the file. - // The `for await...of` loop ensures streams are properly closed on completion, - // so we can safely delete the file here without causing a hang. 
- try { - await this.storage.delete(this.credentials.uploadedFilePath); - } catch (error) { - logger.error( - { error, file: this.credentials.uploadedFilePath }, - 'Failed to delete mbox file after processing.' - ); + if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) { + try { + await this.storage.delete(filePath); + } catch (error) { + logger.error( + { error, file: filePath }, + 'Failed to delete mbox file after processing.' + ); + } } } diff --git a/packages/backend/src/services/ingestion-connectors/PSTConnector.ts b/packages/backend/src/services/ingestion-connectors/PSTConnector.ts index d843423..d0f6171 100644 --- a/packages/backend/src/services/ingestion-connectors/PSTConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/PSTConnector.ts @@ -14,7 +14,7 @@ import { StorageService } from '../StorageService'; import { Readable } from 'stream'; import { createHash } from 'crypto'; import { join } from 'path'; -import { createWriteStream, promises as fs } from 'fs'; +import { createWriteStream, createReadStream, promises as fs } from 'fs'; // We have to hardcode names for deleted and trash folders here as current lib doesn't support looking into PST properties. 
const DELETED_FOLDERS = new Set([ @@ -111,8 +111,19 @@ export class PSTConnector implements IEmailConnector { this.storage = new StorageService(); } + private getFilePath(): string { + return this.credentials.localFilePath || this.credentials.uploadedFilePath || ''; + } + + private async getFileStream(): Promise { + if (this.credentials.localFilePath) { + return createReadStream(this.credentials.localFilePath); + } + return this.storage.getStream(this.getFilePath()); + } + private async loadPstFile(): Promise<{ pstFile: PSTFile; tempDir: string }> { - const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath); + const fileStream = await this.getFileStream(); const tempDir = await fs.mkdtemp(join('/tmp', `pst-import-${new Date().getTime()}`)); const tempFilePath = join(tempDir, 'temp.pst'); @@ -129,15 +140,28 @@ export class PSTConnector implements IEmailConnector { public async testConnection(): Promise { try { - if (!this.credentials.uploadedFilePath) { + const filePath = this.getFilePath(); + if (!filePath) { throw Error('PST file path not provided.'); } - if (!this.credentials.uploadedFilePath.includes('.pst')) { + if (!filePath.includes('.pst')) { throw Error('Provided file is not in the PST format.'); } - const fileExist = await this.storage.exists(this.credentials.uploadedFilePath); + + let fileExist = false; + if (this.credentials.localFilePath) { + try { + await fs.access(this.credentials.localFilePath); + fileExist = true; + } catch { + fileExist = false; + } + } else { + fileExist = await this.storage.exists(filePath); + } + if (!fileExist) { - throw Error('PST file upload not finished yet, please wait.'); + throw Error('PST file not found or upload not finished yet, please wait.'); } return true; } catch (error) { @@ -200,13 +224,15 @@ export class PSTConnector implements IEmailConnector { if (tempDir) { await fs.rm(tempDir, { recursive: true, force: true }); } - try { - await 
this.storage.delete(this.credentials.uploadedFilePath); - } catch (error) { - logger.error( - { error, file: this.credentials.uploadedFilePath }, - 'Failed to delete PST file after processing.' - ); + if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) { + try { + await this.storage.delete(this.credentials.uploadedFilePath); + } catch (error) { + logger.error( + { error, file: this.credentials.uploadedFilePath }, + 'Failed to delete PST file after processing.' + ); + } } } } diff --git a/packages/types/src/ingestion.types.ts b/packages/types/src/ingestion.types.ts index 83256f9..40ca2c6 100644 --- a/packages/types/src/ingestion.types.ts +++ b/packages/types/src/ingestion.types.ts @@ -72,20 +72,23 @@ export interface Microsoft365Credentials extends BaseIngestionCredentials { export interface PSTImportCredentials extends BaseIngestionCredentials { type: 'pst_import'; - uploadedFileName: string; - uploadedFilePath: string; + uploadedFileName?: string; + uploadedFilePath?: string; + localFilePath?: string; } export interface EMLImportCredentials extends BaseIngestionCredentials { type: 'eml_import'; - uploadedFileName: string; - uploadedFilePath: string; + uploadedFileName?: string; + uploadedFilePath?: string; + localFilePath?: string; } export interface MboxImportCredentials extends BaseIngestionCredentials { type: 'mbox_import'; - uploadedFileName: string; - uploadedFilePath: string; + uploadedFileName?: string; + uploadedFilePath?: string; + localFilePath?: string; } // Discriminated union for all possible credential types