Allow specifying local file path for mbox/eml/pst (#214)

* Add agents AI doc

* Allow local file path for Mbox file ingestion


---------

Co-authored-by: Wei S. <5291640+wayneshn@users.noreply.github.com>
This commit is contained in:
Jan Berdajs
2026-02-23 14:40:31 +01:00
committed by GitHub
parent b4cd23b27e
commit 7551d4d7c7
6 changed files with 182 additions and 52 deletions

View File

@@ -24,6 +24,35 @@ interface CreateIngestionSourceDto {
}
```
#### Example: Creating an Mbox Import Source with File Upload
```json
{
"name": "My Mbox Import",
"provider": "mbox_import",
"providerConfig": {
"type": "mbox_import",
"uploadedFileName": "emails.mbox",
"uploadedFilePath": "open-archiver/tmp/uuid-emails.mbox"
}
}
```
#### Example: Creating an Mbox Import Source with Local File Path
```json
{
"name": "My Mbox Import",
"provider": "mbox_import",
"providerConfig": {
"type": "mbox_import",
"localFilePath": "/path/to/emails.mbox"
}
}
```
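**Note:** `localFilePath` must point to a file that the server process can read on its own filesystem. When using `localFilePath`, the file will not be deleted after import. When using `uploadedFilePath` (via the upload API), the file will be automatically deleted after import. If both are provided, `localFilePath` takes precedence and the uploaded file is not deleted after import. The same applies to the `pst_import` and `eml_import` providers (which expect a `.pst` file and a `.zip` archive respectively).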
#### Responses
- **201 Created:** The newly created ingestion source.

View File

@@ -219,7 +219,8 @@ export class IngestionService {
if (
(source.credentials.type === 'pst_import' ||
source.credentials.type === 'eml_import') &&
source.credentials.type === 'eml_import' ||
source.credentials.type === 'mbox_import') &&
source.credentials.uploadedFilePath &&
(await storage.exists(source.credentials.uploadedFilePath))
) {

View File

@@ -32,17 +32,52 @@ export class EMLConnector implements IEmailConnector {
this.storage = new StorageService();
}
/**
 * Resolves the path of the archive to import.
 *
 * @returns `localFilePath` when it is set, otherwise `uploadedFilePath`,
 * or an empty string when neither is set.
 */
private getFilePath(): string {
    const { localFilePath, uploadedFilePath } = this.credentials;
    if (localFilePath) {
        return localFilePath;
    }
    return uploadedFilePath || '';
}
/**
 * Picks the human-readable name used for the imported mailbox.
 *
 * Preference order: the uploaded file name as-is, then the base name of
 * `localFilePath` with a trailing `.zip` extension removed, then a
 * timestamp-based fallback.
 *
 * @returns A non-empty display name.
 */
private getDisplayName(): string {
    const { uploadedFileName, localFilePath } = this.credentials;
    if (uploadedFileName) {
        return uploadedFileName;
    }
    if (localFilePath) {
        // Drop empty segments so a trailing slash does not produce an empty name.
        const baseName = localFilePath.split('/').filter(Boolean).pop() ?? '';
        // Strip only the trailing extension; a plain string pattern would remove
        // the first ".zip" found anywhere in the name instead.
        const withoutExtension = baseName.replace(/\.zip$/i, '');
        if (withoutExtension) {
            return withoutExtension;
        }
    }
    // Nothing usable was provided (or the path had no base name): generate one.
    return `eml-import-${new Date().getTime()}`;
}
/**
 * Opens a readable stream over the archive to import.
 *
 * A configured `localFilePath` is read straight from the local filesystem;
 * otherwise the stream is requested from the storage service using the path
 * returned by `getFilePath()`.
 *
 * @returns A readable stream of the archive contents.
 */
private async getFileStream(): Promise<NodeJS.ReadableStream> {
    const localPath = this.credentials.localFilePath;
    return localPath ? createReadStream(localPath) : this.storage.get(this.getFilePath());
}
public async testConnection(): Promise<boolean> {
try {
if (!this.credentials.uploadedFilePath) {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('EML file path not provided.');
}
if (!this.credentials.uploadedFilePath.includes('.zip')) {
if (!filePath.includes('.zip')) {
throw Error('Provided file is not in the ZIP format.');
}
const fileExist = await this.storage.exists(this.credentials.uploadedFilePath);
let fileExist = false;
if (this.credentials.localFilePath) {
try {
await fs.access(this.credentials.localFilePath);
fileExist = true;
} catch {
fileExist = false;
}
} else {
fileExist = await this.storage.exists(filePath);
}
if (!fileExist) {
throw Error('EML file upload not finished yet, please wait.');
throw Error('EML file not found or upload not finished yet, please wait.');
}
return true;
@@ -53,8 +88,7 @@ export class EMLConnector implements IEmailConnector {
}
public async *listAllUsers(): AsyncGenerator<MailboxUser> {
const displayName =
this.credentials.uploadedFileName || `eml-import-${new Date().getTime()}`;
const displayName = this.getDisplayName();
logger.info(`Found potential mailbox: ${displayName}`);
const constructedPrimaryEmail = `${displayName.replace(/ /g, '.').toLowerCase()}@eml.local`;
yield {
@@ -68,7 +102,7 @@ export class EMLConnector implements IEmailConnector {
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject | null> {
const fileStream = await this.storage.get(this.credentials.uploadedFilePath);
const fileStream = await this.getFileStream();
const tempDir = await fs.mkdtemp(join('/tmp', `eml-import-${new Date().getTime()}`));
const unzippedPath = join(tempDir, 'unzipped');
await fs.mkdir(unzippedPath);
@@ -115,13 +149,15 @@ export class EMLConnector implements IEmailConnector {
throw error;
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete EML file after processing.'
);
if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) {
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete EML file after processing.'
);
}
}
}
}

View File

@@ -12,6 +12,7 @@ import { getThreadId } from './helpers/utils';
import { StorageService } from '../StorageService';
import { Readable, Transform } from 'stream';
import { createHash } from 'crypto';
import { promises as fs, createReadStream } from 'fs';
class MboxSplitter extends Transform {
private buffer: Buffer = Buffer.alloc(0);
@@ -60,15 +61,28 @@ export class MboxConnector implements IEmailConnector {
public async testConnection(): Promise<boolean> {
try {
if (!this.credentials.uploadedFilePath) {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('Mbox file path not provided.');
}
if (!this.credentials.uploadedFilePath.includes('.mbox')) {
if (!filePath.includes('.mbox')) {
throw Error('Provided file is not in the MBOX format.');
}
const fileExist = await this.storage.exists(this.credentials.uploadedFilePath);
let fileExist = false;
if (this.credentials.localFilePath) {
try {
await fs.access(this.credentials.localFilePath);
fileExist = true;
} catch {
fileExist = false;
}
} else {
fileExist = await this.storage.exists(filePath);
}
if (!fileExist) {
throw Error('Mbox file upload not finished yet, please wait.');
throw Error('Mbox file not found or upload not finished yet, please wait.');
}
return true;
@@ -78,9 +92,19 @@ export class MboxConnector implements IEmailConnector {
}
}
/**
 * Resolves the path of the mbox file to import.
 *
 * @returns `localFilePath` when it is set, otherwise `uploadedFilePath`,
 * or an empty string when neither is set.
 */
private getFilePath(): string {
    const localPath = this.credentials.localFilePath;
    return localPath ? localPath : this.credentials.uploadedFilePath || '';
}
/**
 * Opens a readable stream over the mbox file to import.
 *
 * Without a `localFilePath` the stream is requested from the storage service
 * using the path returned by `getFilePath()`; with one, the file is read
 * straight from the local filesystem.
 *
 * @returns A readable stream of the mbox contents.
 */
private async getFileStream(): Promise<NodeJS.ReadableStream> {
    const { localFilePath } = this.credentials;
    if (!localFilePath) {
        return this.storage.getStream(this.getFilePath());
    }
    return createReadStream(localFilePath);
}
public async *listAllUsers(): AsyncGenerator<MailboxUser> {
const displayName =
this.credentials.uploadedFileName || `mbox-import-${new Date().getTime()}`;
const displayName = this.getDisplayName();
logger.info(`Found potential mailbox: ${displayName}`);
const constructedPrimaryEmail = `${displayName.replace(/ /g, '.').toLowerCase()}@mbox.local`;
yield {
@@ -90,11 +114,23 @@ export class MboxConnector implements IEmailConnector {
};
}
/**
 * Picks the human-readable name used for the imported mailbox.
 *
 * Preference order: the uploaded file name as-is, then the base name of
 * `localFilePath` with a trailing `.mbox` extension removed, then a
 * timestamp-based fallback.
 *
 * @returns A non-empty display name.
 */
private getDisplayName(): string {
    const { uploadedFileName, localFilePath } = this.credentials;
    if (uploadedFileName) {
        return uploadedFileName;
    }
    if (localFilePath) {
        // Drop empty segments so a trailing slash does not produce an empty name.
        const baseName = localFilePath.split('/').filter(Boolean).pop() ?? '';
        // Strip only the trailing extension; a plain string pattern would remove
        // the first ".mbox" found anywhere in the name instead.
        const withoutExtension = baseName.replace(/\.mbox$/i, '');
        if (withoutExtension) {
            return withoutExtension;
        }
    }
    // Nothing usable was provided (or the path had no base name): generate one.
    return `mbox-import-${new Date().getTime()}`;
}
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject | null> {
const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath);
const filePath = this.getFilePath();
const fileStream = await this.getFileStream();
const mboxSplitter = new MboxSplitter();
const emailStream = fileStream.pipe(mboxSplitter);
@@ -104,22 +140,21 @@ export class MboxConnector implements IEmailConnector {
yield emailObject;
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
{ error, file: filePath },
'Failed to process a single message from mbox file. Skipping.'
);
}
}
// After the stream is fully consumed, delete the file.
// The `for await...of` loop ensures streams are properly closed on completion,
// so we can safely delete the file here without causing a hang.
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete mbox file after processing.'
);
if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) {
try {
await this.storage.delete(filePath);
} catch (error) {
logger.error(
{ error, file: filePath },
'Failed to delete mbox file after processing.'
);
}
}
}

View File

@@ -14,7 +14,7 @@ import { StorageService } from '../StorageService';
import { Readable } from 'stream';
import { createHash } from 'crypto';
import { join } from 'path';
import { createWriteStream, promises as fs } from 'fs';
import { createWriteStream, createReadStream, promises as fs } from 'fs';
// We have to hardcode names for deleted and trash folders here as current lib doesn't support looking into PST properties.
const DELETED_FOLDERS = new Set([
@@ -111,8 +111,19 @@ export class PSTConnector implements IEmailConnector {
this.storage = new StorageService();
}
/**
 * Resolves the path of the PST file to import.
 *
 * @returns The first non-empty value of `localFilePath` and
 * `uploadedFilePath` (in that order), or an empty string when neither is set.
 */
private getFilePath(): string {
    const candidates = [this.credentials.localFilePath, this.credentials.uploadedFilePath];
    for (const candidate of candidates) {
        if (candidate) {
            return candidate;
        }
    }
    return '';
}
/**
 * Opens a readable stream over the PST file to import.
 *
 * A configured `localFilePath` is read straight from the local filesystem;
 * otherwise the stream is requested from the storage service using the path
 * returned by `getFilePath()`.
 *
 * @returns A readable stream of the PST contents.
 */
private async getFileStream(): Promise<NodeJS.ReadableStream> {
    const localPath = this.credentials.localFilePath;
    return localPath
        ? createReadStream(localPath)
        : this.storage.getStream(this.getFilePath());
}
private async loadPstFile(): Promise<{ pstFile: PSTFile; tempDir: string }> {
const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath);
const fileStream = await this.getFileStream();
const tempDir = await fs.mkdtemp(join('/tmp', `pst-import-${new Date().getTime()}`));
const tempFilePath = join(tempDir, 'temp.pst');
@@ -129,15 +140,28 @@ export class PSTConnector implements IEmailConnector {
public async testConnection(): Promise<boolean> {
try {
if (!this.credentials.uploadedFilePath) {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('PST file path not provided.');
}
if (!this.credentials.uploadedFilePath.includes('.pst')) {
if (!filePath.includes('.pst')) {
throw Error('Provided file is not in the PST format.');
}
const fileExist = await this.storage.exists(this.credentials.uploadedFilePath);
let fileExist = false;
if (this.credentials.localFilePath) {
try {
await fs.access(this.credentials.localFilePath);
fileExist = true;
} catch {
fileExist = false;
}
} else {
fileExist = await this.storage.exists(filePath);
}
if (!fileExist) {
throw Error('PST file upload not finished yet, please wait.');
throw Error('PST file not found or upload not finished yet, please wait.');
}
return true;
} catch (error) {
@@ -200,13 +224,15 @@ export class PSTConnector implements IEmailConnector {
if (tempDir) {
await fs.rm(tempDir, { recursive: true, force: true });
}
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete PST file after processing.'
);
if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) {
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete PST file after processing.'
);
}
}
}
}

View File

@@ -72,20 +72,23 @@ export interface Microsoft365Credentials extends BaseIngestionCredentials {
/**
 * Credentials for the `pst_import` provider.
 *
 * The PST file is located either through `uploadedFilePath` (a path inside the
 * storage service) or through `localFilePath` (a path on the server's own
 * filesystem). The PST connector prefers `localFilePath` when both are set, and
 * only deletes the uploaded file after processing when no `localFilePath` is
 * configured.
 */
export interface PSTImportCredentials extends BaseIngestionCredentials {
type: 'pst_import';
uploadedFileName: string;
uploadedFilePath: string;
uploadedFileName?: string;
uploadedFilePath?: string;
localFilePath?: string;
}
/**
 * Credentials for the `eml_import` provider.
 *
 * The ZIP archive is located either through `uploadedFilePath` (a path inside
 * the storage service) or through `localFilePath` (a path on the server's own
 * filesystem). The EML connector prefers `localFilePath` when both are set, and
 * only deletes the uploaded file after processing when no `localFilePath` is
 * configured. `uploadedFileName`, when present, is used as the mailbox display
 * name.
 */
export interface EMLImportCredentials extends BaseIngestionCredentials {
type: 'eml_import';
uploadedFileName: string;
uploadedFilePath: string;
uploadedFileName?: string;
uploadedFilePath?: string;
localFilePath?: string;
}
/**
 * Credentials for the `mbox_import` provider.
 *
 * The mbox file is located either through `uploadedFilePath` (a path inside the
 * storage service) or through `localFilePath` (a path on the server's own
 * filesystem). The Mbox connector prefers `localFilePath` when both are set, and
 * only deletes the uploaded file after processing when no `localFilePath` is
 * configured. `uploadedFileName`, when present, is used as the mailbox display
 * name.
 */
export interface MboxImportCredentials extends BaseIngestionCredentials {
type: 'mbox_import';
uploadedFileName: string;
uploadedFilePath: string;
uploadedFileName?: string;
uploadedFilePath?: string;
localFilePath?: string;
}
// Discriminated union for all possible credential types