indexing service

This commit is contained in:
Wayne
2025-07-14 20:00:39 +03:00
parent a305bb5006
commit 497b7217c5
24 changed files with 322 additions and 32 deletions

View File

@@ -80,4 +80,4 @@ Please read our `CONTRIBUTING.md` file for more details on our code of conduct a
## License
This project is licensed under the MIT License - see the `LICENSE` file for details.
This project is licensed under the AGPL-3.0 License.

View File

@@ -4,9 +4,10 @@
"scripts": {
"dev": "dotenv -- pnpm --filter \"./packages/*\" --parallel dev",
"build": "pnpm --filter \"./packages/*\" --parallel build",
"start:workers": "dotenv -- pnpm --filter \"./packages/backend\" start:ingestion-worker && -- pnpm --filter \"./packages/backend\" start:indexing-worker"
"start:workers": "dotenv -- concurrently \"pnpm --filter @open-archive/backend start:ingestion-worker\" \"pnpm --filter @open-archive/backend start:indexing-worker\""
},
"devDependencies": {
"concurrently": "^9.2.0",
"dotenv-cli": "8.0.0",
"typescript": "5.8.3"
},

View File

@@ -0,0 +1 @@
1.15.2

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1 @@
7709d764-566c-4d16-bdd4-622f9e94474c

Binary file not shown.

Binary file not shown.

View File

@@ -31,13 +31,17 @@
"imapflow": "^1.0.191",
"jose": "^6.0.11",
"mailparser": "^3.7.4",
"mammoth": "^1.9.1",
"meilisearch": "^0.51.0",
"pdf2json": "^3.1.6",
"pg": "^8.16.3",
"pino": "^9.7.0",
"pino-pretty": "^13.0.0",
"postgres": "^3.4.7",
"reflect-metadata": "^0.2.2",
"sqlite3": "^5.1.7",
"tsconfig-paths": "^4.2.0"
"tsconfig-paths": "^4.2.0",
"xlsx": "^0.18.5"
},
"devDependencies": {
"@bull-board/api": "^6.11.0",

View File

@@ -15,6 +15,7 @@ export class StorageController {
try {
const fileExists = await this.storageService.exists(filePath);
if (!fileExists) {
console.log(filePath);
res.status(404).send('File not found');
return;
}

View File

@@ -1,7 +1,11 @@
import { storage } from './storage';
import { app } from './app';
import { searchConfig } from './search';
import { connection as redisConfig } from './redis';
// Central application configuration, aggregated from the per-concern
// modules so consumers import a single `config` object.
export const config = {
storage,
app,
search: searchConfig,
redis: redisConfig,
};

View File

@@ -8,5 +8,7 @@ export const connection = {
port: (process.env.REDIS_PORT && parseInt(process.env.REDIS_PORT, 10)) || 6379,
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null,
tls: {} // Enable TLS for Upstash
tls: {
rejectUnauthorized: false
}
};

View File

@@ -0,0 +1,6 @@
import 'dotenv/config';
// Meilisearch connection settings, sourced from the environment with
// local-development fallbacks.
const meiliHost = process.env.MEILI_HOST || 'http://127.0.0.1:7700';
const meiliApiKey = process.env.MEILI_MASTER_KEY || '';

export const searchConfig = {
  host: meiliHost,
  apiKey: meiliApiKey,
};

View File

@@ -0,0 +1,8 @@
/**
 * Drains a readable stream and resolves with its full contents as one Buffer.
 *
 * Bug fix: streams with an encoding set (or in object mode carrying strings)
 * emit string chunks; the previous version pushed them into a Buffer[] and
 * Buffer.concat would throw. Non-Buffer chunks are now converted explicitly.
 *
 * @param stream - The readable stream to consume.
 * @returns Promise resolving with the concatenated contents, or rejecting
 *          with the stream's 'error' event payload.
 */
export function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
  return new Promise((resolve, reject) => {
    const chunks: Buffer[] = [];
    stream.on('data', (chunk) => {
      // 'data' may deliver a string when an encoding is set on the stream.
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
    });
    stream.on('error', reject);
    stream.on('end', () => resolve(Buffer.concat(chunks)));
  });
}

View File

@@ -0,0 +1,68 @@
import PDFParser from 'pdf2json';
import mammoth from 'mammoth';
import xlsx from 'xlsx';
function extractTextFromPdf(buffer: Buffer): Promise<string> {
return new Promise((resolve, reject) => {
const pdfParser = new PDFParser(null, true);
pdfParser.on('pdfParser_dataError', (errData: any) =>
reject(new Error(errData.parserError))
);
pdfParser.on('pdfParser_dataReady', () => {
resolve(pdfParser.getRawTextContent());
});
pdfParser.parseBuffer(buffer);
});
}
/**
 * Extracts plain text from a file buffer based on its MIME type.
 *
 * Supported: PDF, DOCX, XLSX, and text-like payloads (text/*, JSON, XML,
 * plus RFC 6839 structured-syntax suffixes such as application/ld+json
 * and image/svg+xml — a backward-compatible generalization).
 *
 * @param buffer - Raw file contents.
 * @param mimeType - MIME type used to pick the extraction strategy.
 * @returns Extracted text; an empty string on failure or for unsupported
 *          types so indexing can continue without attachment text.
 */
export async function extractText(
  buffer: Buffer,
  mimeType: string
): Promise<string> {
  try {
    if (mimeType === 'application/pdf') {
      return await extractTextFromPdf(buffer);
    }
    if (
      mimeType ===
      'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    ) {
      const { value } = await mammoth.extractRawText({ buffer });
      return value;
    }
    if (
      mimeType ===
      'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    ) {
      const workbook = xlsx.read(buffer, { type: 'buffer' });
      let fullText = '';
      for (const sheetName of workbook.SheetNames) {
        const sheet = workbook.Sheets[sheetName];
        fullText += xlsx.utils.sheet_to_txt(sheet) + '\n';
      }
      return fullText;
    }
    if (
      mimeType.startsWith('text/') ||
      mimeType === 'application/json' ||
      mimeType === 'application/xml' ||
      // RFC 6839 structured-syntax suffixes are text-based and safe to
      // decode directly (e.g. application/ld+json, image/svg+xml).
      mimeType.endsWith('+json') ||
      mimeType.endsWith('+xml')
    ) {
      return buffer.toString('utf-8');
    }
  } catch (error) {
    console.error(
      `Error extracting text from attachment with MIME type ${mimeType}:`,
      error
    );
    return ''; // Return empty string on failure
  }
  console.warn(`Unsupported MIME type for text extraction: ${mimeType}`);
  return ''; // Return empty string for unsupported types
}

View File

@@ -1,24 +1,15 @@
import { Job } from 'bullmq';
import { IndexingService } from '../../services/IndexingService';
import { SearchService } from '../../services/SearchService';
import { StorageService } from '../../services/StorageService';
// import { SearchService } from '../../services/SearchService';
import DatabaseService from '../../services/DatabaseService';
// const storageService = new StorageService();
// const searchService = new SearchService();
const searchService = new SearchService();
const storageService = new StorageService();
const indexingService = new IndexingService(DatabaseService, searchService, storageService);
interface IIndexEmailJob {
filePath: string;
ingestionSourceId: string;
export default async function (job: Job<{ emailId: string; }>) {
const { emailId } = job.data;
console.log(`Indexing email with ID: ${emailId}`);
await indexingService.indexEmailById(emailId);
}
export default async (job: Job<IIndexEmailJob>) => {
const { filePath, ingestionSourceId } = job.data;
console.log(`Processing index-email for file: ${filePath}`);
// TODO:
// 1. Read the email file from storage.
// 2. Parse the email content.
// 3. Index the email in the search engine.
// const emailContent = await storageService.get(filePath);
// await searchService.indexDocument(ingestionSourceId, emailContent);
};

View File

@@ -0,0 +1,7 @@
import { db } from '../database';
/**
 * Singleton wrapper that exposes the shared database handle, allowing other
 * services to receive the database as a constructor dependency.
 */
class DatabaseService {
  public db = db;
}

const databaseService = new DatabaseService();

export default databaseService;

View File

@@ -0,0 +1,131 @@
import { Attachment } from '@open-archive/types';
import { SearchService } from './SearchService';
import { StorageService } from './StorageService';
import { extractText } from '../helpers/textExtractor';
import DatabaseService from './DatabaseService';
import { archivedEmails, attachments, emailAttachments } from '../database/schema';
import { eq } from 'drizzle-orm';
import { streamToBuffer } from '../helpers/streamToBuffer';
// Shape of the document stored in the Meilisearch 'emails' index.
interface EmailDocument {
id: string; // The unique ID of the email; used as the index primary key
from: string; // Sender email address
to: string[]; // Recipient addresses
cc: string[]; // CC addresses
bcc: string[]; // BCC addresses
subject: string; // Empty string when the email has no subject
body: string; // Text extracted from the stored email body
attachments: {
filename: string;
content: string; // Extracted text from the attachment
}[];
timestamp: number; // sentAt as epoch milliseconds
// other metadata
}
// Shape of the 'recipients' JSON column on archivedEmails (cast target in
// createEmailDocument); any group may be absent — TODO confirm against the
// ingestion writer.
interface DbRecipients {
to: { name: string; address: string; }[];
cc: { name: string; address: string; }[];
bcc: { name: string; address: string; }[];
}
/**
 * Builds Meilisearch documents for archived emails and submits them to the
 * search index: email metadata from the database, the raw body from storage,
 * and extracted text from each attachment.
 */
export class IndexingService {
  // Parameter properties replace the hand-written field/assignment
  // boilerplate; the constructor signature is unchanged.
  constructor(
    private readonly dbService: typeof DatabaseService,
    private readonly searchService: SearchService,
    private readonly storageService: StorageService
  ) {}

  /**
   * Indexes a single archived email by its ID.
   *
   * @param emailId - Primary key of the archivedEmails row to index.
   * @throws Error when no email with the given ID exists.
   */
  public async indexEmailById(emailId: string): Promise<void> {
    const email = await this.dbService.db.query.archivedEmails.findFirst({
      where: eq(archivedEmails.id, emailId),
    });
    if (!email) {
      throw new Error(`Email with ID ${emailId} not found for indexing.`);
    }

    // Only hit the join table when the row claims attachments exist.
    let emailAttachmentsResult: Attachment[] = [];
    if (email.hasAttachments) {
      emailAttachmentsResult = await this.dbService.db
        .select({
          id: attachments.id,
          filename: attachments.filename,
          mimeType: attachments.mimeType,
          sizeBytes: attachments.sizeBytes,
          contentHashSha256: attachments.contentHashSha256,
          storagePath: attachments.storagePath,
        })
        .from(emailAttachments)
        .innerJoin(attachments, eq(emailAttachments.attachmentId, attachments.id))
        .where(eq(emailAttachments.emailId, emailId));
    }

    const document = await this.createEmailDocument(email, emailAttachmentsResult);
    await this.searchService.addDocuments('emails', [document], 'id');
  }

  /**
   * Assembles the searchable document for one email: body text from storage
   * plus extracted text for each attachment.
   */
  private async createEmailDocument(
    email: typeof archivedEmails.$inferSelect,
    attachments: Attachment[]
  ): Promise<EmailDocument> {
    // Attachment extraction and body download are independent — run them
    // concurrently instead of sequentially.
    const [attachmentContents, emailBodyBuffer] = await Promise.all([
      this.extractAttachmentContents(attachments),
      this.storageService
        .get(email.storagePath)
        .then((stream) => streamToBuffer(stream)),
    ]);
    // NOTE(review): the stored object appears to be the raw .eml (see
    // ingestion), but it is decoded as plain text here — confirm whether
    // MIME parsing of the body is intended.
    const emailBodyText = await extractText(emailBodyBuffer, 'text/plain');

    const recipients = email.recipients as DbRecipients;
    return {
      id: email.id,
      from: email.senderEmail,
      to: recipients.to?.map((r) => r.address) || [],
      cc: recipients.cc?.map((r) => r.address) || [],
      bcc: recipients.bcc?.map((r) => r.address) || [],
      subject: email.subject || '',
      body: emailBodyText,
      attachments: attachmentContents,
      timestamp: new Date(email.sentAt).getTime(),
    };
  }

  /**
   * Extracts text from each attachment, in parallel. Attachments whose
   * download or extraction fails are logged and skipped so one bad file
   * cannot block indexing of the whole email.
   */
  private async extractAttachmentContents(
    attachments: Attachment[]
  ): Promise<{ filename: string; content: string; }[]> {
    const extracted = await Promise.all(
      attachments.map(async (attachment) => {
        try {
          const fileStream = await this.storageService.get(attachment.storagePath);
          const fileBuffer = await streamToBuffer(fileStream);
          const content = await extractText(fileBuffer, attachment.mimeType || '');
          return { filename: attachment.filename, content };
        } catch (error) {
          console.error(
            `Failed to extract text from attachment: ${attachment.filename}`,
            error
          );
          return null; // Skip this attachment; indexing continues.
        }
      })
    );
    return extracted.filter(
      (a): a is { filename: string; content: string } => a !== null
    );
  }
}

View File

@@ -166,7 +166,7 @@ export class IngestionService {
storage: StorageService
): Promise<void> {
try {
console.log('processing email, ', email.id);
console.log('processing email, ', email.id, email.subject);
const emlBuffer = email.eml ?? Buffer.from(email.body, 'utf-8');
const emailHash = createHash('sha256').update(emlBuffer).digest('hex');
const emailPath = `email-archive/${source.name.replaceAll(' ', '-')}-${source.id}/emails/${email.id}.eml`;
@@ -223,12 +223,11 @@ export class IngestionService {
});
}
}
// Uncomment when indexing feature is done
// await indexingQueue.add('index-email', {
// filePath: emailPath,
// ingestionSourceId: source.id
// });
// adding to indexing queue
console.log('adding to indexing queue');
await indexingQueue.add('index-email', {
emailId: archivedEmail.id,
});
} catch (error) {
logger.error({
message: `Failed to process email ${email.id} for source ${source.id}`,

View File

@@ -0,0 +1,54 @@
import { Index, MeiliSearch } from 'meilisearch';
import { config } from '../config';
/**
 * Thin wrapper around the Meilisearch client, configured from the shared
 * application config (host + API key).
 */
export class SearchService {
  private client: MeiliSearch;

  constructor() {
    this.client = new MeiliSearch({
      host: config.search.host,
      apiKey: config.search.apiKey,
    });
  }

  /**
   * Returns a handle to the named index. Meilisearch creates indexes lazily,
   * so no existence check is performed here.
   */
  public async getIndex<T extends Record<string, any>>(name: string): Promise<Index<T>> {
    return this.client.index<T>(name);
  }

  /**
   * Adds documents to an index, optionally setting its primary key first.
   *
   * @param indexName - Name of the target index.
   * @param documents - Documents to submit.
   * @param primaryKey - When given, applied to the index before the add.
   * @returns The enqueued-task response from Meilisearch.
   */
  public async addDocuments<T extends Record<string, any>>(
    indexName: string,
    documents: T[],
    primaryKey?: string
  ) {
    const index = await this.getIndex<T>(indexName);
    if (primaryKey) {
      // Bug fix: this update was previously fire-and-forget, so the
      // subsequent addDocuments call could race the primary-key change
      // (and rejections went unhandled). Await it before adding documents.
      await index.update({ primaryKey });
    }
    return index.addDocuments(documents);
  }

  /** Runs a search query against the named index. */
  public async search<T extends Record<string, any>>(indexName: string, query: string, options?: any) {
    const index = await this.getIndex<T>(indexName);
    return index.search(query, options);
  }

  // Add other methods as needed (e.g., deleteDocuments, updateSettings)

  /**
   * Applies the searchable/filterable/sortable attribute settings for the
   * 'emails' index. Intended to be run at setup time, before searching.
   */
  public async configureEmailIndex() {
    const index = await this.getIndex('emails');
    await index.updateSettings({
      searchableAttributes: [
        'subject',
        'body',
        'from',
        'to',
        'cc',
        'bcc',
        'attachments.filename',
        'attachments.content',
      ],
      filterableAttributes: ['from', 'to', 'cc', 'bcc', 'timestamp'],
      sortableAttributes: ['timestamp'],
    });
  }
}

View File

@@ -2,7 +2,19 @@ import { Worker } from 'bullmq';
import { connection } from '../config/redis';
import indexEmailProcessor from '../jobs/processors/index-email.processor';
const worker = new Worker('indexing', indexEmailProcessor, { connection });
/**
 * Routes indexing-queue jobs to their handler by job name. Unknown job
 * names reject so BullMQ marks the job as failed.
 */
const processor = async (job: any) => {
  if (job.name === 'index-email') {
    return indexEmailProcessor(job);
  }
  throw new Error(`Unknown job name: ${job.name}`);
};
const worker = new Worker('indexing', processor, {
connection,
concurrency: 5
});
console.log('Indexing worker started');

View File

@@ -8,7 +8,7 @@
let { data }: { data: PageData } = $props();
const { email } = data;
console.log(email);
async function download(path: string, filename: string) {
if (!browser) return;