Compare commits

..

1 Commits

Author SHA1 Message Date
Wayne
2bfec79cd2 feat: Add Mbox ingestion
This commit introduces two major features:

1.  **Mbox File Ingestion:**
    Users can now ingest emails from Mbox files (`.mbox`). A new Mbox connector has been implemented on the backend, and the user interface has been updated to support creating Mbox ingestion sources. Documentation for this new provider has also been added.

Additionally, this commit includes new documentation for upgrading and migrating Open Archiver.
2025-09-16 20:18:11 +03:00
23 changed files with 808 additions and 988 deletions

View File

@@ -19,8 +19,7 @@ DATABASE_URL="postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/$
# Meilisearch
MEILI_MASTER_KEY=aSampleMasterKey
MEILI_HOST=http://meilisearch:7700
# The number of emails to batch together for indexing. Defaults to 500.
MEILI_INDEXING_BATCH=500
# Redis (We use Valkey, which is Redis-compatible and open source)
@@ -61,8 +60,6 @@ RATE_LIMIT_WINDOW_MS=60000
# The maximum number of API requests allowed from an IP within the window. Defaults to 100.
RATE_LIMIT_MAX_REQUESTS=100
# JWT
# IMPORTANT: Change this to a long, random, and secret string in your .env file
JWT_SECRET=a-very-secret-key-that-you-should-change
@@ -73,9 +70,3 @@ JWT_EXPIRES_IN="7d"
# IMPORTANT: Generate a secure, random 32-byte hex string for this
# You can use `openssl rand -hex 32` to generate a key.
ENCRYPTION_KEY=
# Apache Tika Integration
# ONLY active if TIKA_URL is set
TIKA_URL=http://tika:9998

View File

@@ -1,32 +0,0 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: bug
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**System:**
- Open Archiver Version:
**Relevant logs:**
Any relevant logs (Redact sensitive information)
**Additional context**
Add any other context about the problem here.

View File

@@ -1,20 +0,0 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: enhancement
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is.
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@@ -52,13 +52,6 @@ services:
networks:
- open-archiver-net
tika:
image: apache/tika:3.2.2.0-full
container_name: tika
restart: always
networks:
- open-archiver-net
volumes:
pgdata:
driver: local

View File

@@ -100,7 +100,6 @@ export default defineConfig({
items: [
{ text: 'Overview', link: '/services/' },
{ text: 'Storage Service', link: '/services/storage-service' },
{ text: 'OCR Service', link: '/services/ocr-service' },
{
text: 'IAM Service',
items: [{ text: 'IAM Policies', link: '/services/iam-service/iam-policy' }],

View File

@@ -0,0 +1,289 @@
# IAM Policies
This document provides a guide to creating and managing IAM policies in Open Archiver. It is intended for developers and administrators who need to configure granular access control for users and roles.
## Policy Structure
IAM policies are defined as an array of JSON objects, where each object represents a single permission rule. The structure of a policy object is as follows:
```json
{
"action": "read" OR ["read", "create"],
"subject": "ingestion" OR ["ingestion", "dashboard"],
"conditions": {
"field_name": "value"
},
"inverted": false OR true,
}
```
- `action`: The action(s) to be performed on the subject. Can be a single string or an array of strings.
- `subject`: The resource(s) or entity on which the action is to be performed. Can be a single string or an array of strings.
- `conditions`: (Optional) A set of conditions that must be met for the permission to be granted.
- `inverted`: (Optional) When set to `true`, this inverts the rule, turning it from a "can" rule into a "cannot" rule. This is useful for creating exceptions to broader permissions.
## Actions
The following actions are available for use in IAM policies:
- `manage`: A wildcard action that grants all permissions on a subject (`create`, `read`, `update`, `delete`, `search`, `sync`).
- `create`: Allows the user to create a new resource.
- `read`: Allows the user to view a resource.
- `update`: Allows the user to modify an existing resource.
- `delete`: Allows the user to delete a resource.
- `search`: Allows the user to search for resources.
- `sync`: Allows the user to synchronize a resource.
## Subjects
The following subjects are available for use in IAM policies:
- `all`: A wildcard subject that represents all resources.
- `archive`: Represents archived emails.
- `ingestion`: Represents ingestion sources.
- `settings`: Represents system settings.
- `users`: Represents user accounts.
- `roles`: Represents user roles.
- `dashboard`: Represents the dashboard.
## Advanced Conditions with MongoDB-Style Queries
Conditions are the key to creating fine-grained access control rules. They are defined as a JSON object where each key represents a field on the subject, and the value defines the criteria for that field.
All conditions within a single rule are implicitly joined with an **AND** logic. This means that for a permission to be granted, the resource must satisfy _all_ specified conditions.
The power of this system comes from its use of a subset of [MongoDB's query language](https://www.mongodb.com/docs/manual/), which provides a flexible and expressive way to define complex rules. These rules are translated into native queries for both the PostgreSQL database (via Drizzle ORM) and the Meilisearch engine.
### Supported Operators and Examples
Here is a detailed breakdown of the supported operators with examples.
#### `$eq` (Equal)
This is the default operator. If you provide a simple key-value pair, it is treated as an equality check.
```json
// This rule...
{ "status": "active" }
// ...is equivalent to this:
{ "status": { "$eq": "active" } }
```
**Use Case**: Grant access to an ingestion source only if its status is `active`.
#### `$ne` (Not Equal)
Matches documents where the field value is not equal to the specified value.
```json
{ "provider": { "$ne": "pst_import" } }
```
**Use Case**: Allow a user to see all ingestion sources except for PST imports.
#### `$in` (In Array)
Matches documents where the field value is one of the values in the specified array.
```json
{
"id": {
"$in": ["INGESTION_ID_1", "INGESTION_ID_2"]
}
}
```
**Use Case**: Grant an auditor access to a specific list of ingestion sources.
#### `$nin` (Not In Array)
Matches documents where the field value is not one of the values in the specified array.
```json
{ "provider": { "$nin": ["pst_import", "eml_import"] } }
```
**Use Case**: Hide all manual import sources from a specific user role.
#### `$lt` / `$lte` (Less Than / Less Than or Equal)
Matches documents where the field value is less than (`$lt`) or less than or equal to (`$lte`) the specified value. This is useful for numeric or date-based comparisons.
```json
{ "sentAt": { "$lt": "2024-01-01T00:00:00.000Z" } }
```
#### `$gt` / `$gte` (Greater Than / Greater Than or Equal)
Matches documents where the field value is greater than (`$gt`) or greater than or equal to (`$gte`) the specified value.
```json
{ "sentAt": { "$gt": "2024-01-01T00:00:00.000Z" } }
```
#### `$exists`
Matches documents that have (or do not have) the specified field.
```json
// Grant access only if a 'lastSyncStatusMessage' exists
{ "lastSyncStatusMessage": { "$exists": true } }
```
## Inverted Rules: Creating Exceptions with `cannot`
By default, all rules are "can" rules, meaning they grant permissions. However, you can create a "cannot" rule by adding `"inverted": true` to a policy object. This is extremely useful for creating exceptions to broader permissions.
A common pattern is to grant broad access and then use an inverted rule to carve out a specific restriction.
**Use Case**: Grant a user access to all ingestion sources _except_ for one specific source.
This is achieved with two rules:
1. A "can" rule that grants `read` access to the `ingestion` subject.
2. An inverted "cannot" rule that denies `read` access for the specific ingestion `id`.
```json
[
{
"action": "read",
"subject": "ingestion"
},
{
"inverted": true,
"action": "read",
"subject": "ingestion",
"conditions": {
"id": "SPECIFIC_INGESTION_ID_TO_EXCLUDE"
}
}
]
```
## Policy Evaluation Logic
The system evaluates policies by combining all relevant rules for a user. The logic is simple:
- A user has permission if at least one `can` rule allows it.
- A permission is denied if a `cannot` (`"inverted": true`) rule explicitly forbids it, even if a `can` rule allows it. `cannot` rules always take precedence.
### Dynamic Policies with Placeholders
To create dynamic policies that are specific to the current user, you can use the `${user.id}` placeholder in the `conditions` object. This placeholder will be replaced with the ID of the current user at runtime.
## Special Permissions for User and Role Management
It is important to note that while `read` access to `users` and `roles` can be granted granularly, any actions that modify these resources (`create`, `update`, `delete`) are restricted to Super Admins.
A user must have the `{ "action": "manage", "subject": "all" }` permission (typically granted through the Super Admin role) to manage users and roles. This is a security measure to prevent unauthorized changes to user accounts and permissions.
## Policy Examples
Here are several examples based on the default roles in the system, demonstrating how to combine actions, subjects, and conditions to achieve specific access control scenarios.
### Administrator
This policy grants a user full access to all resources using wildcards.
```json
[
{
"action": "manage",
"subject": "all"
}
]
```
### End-User
This policy allows a user to view the dashboard, create new ingestion sources, and fully manage the ingestion sources they own.
```json
[
{
"action": "read",
"subject": "dashboard"
},
{
"action": "create",
"subject": "ingestion"
},
{
"action": "manage",
"subject": "ingestion",
"conditions": {
"userId": "${user.id}"
}
},
{
"action": "manage",
"subject": "archive",
"conditions": {
"ingestionSource.userId": "${user.id}" // also needs to give permission to archived emails created by the user
}
}
]
```
### Global Read-Only Auditor
This policy grants read and search access across most of the application's resources, making it suitable for an auditor who needs to view data without modifying it.
```json
[
{
"action": ["read", "search"],
"subject": ["ingestion", "archive", "dashboard", "users", "roles"]
}
]
```
### Ingestion Admin
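This policy grants full control over all ingestion sources, but no other resources. Note that, as the End-User example shows, access to archived emails is governed by the separate `archive` subject; add a rule for `archive` if this role should also manage archived emails.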
```json
[
{
"action": "manage",
"subject": "ingestion"
}
]
```
### Auditor for Specific Ingestion Sources
This policy demonstrates how to grant access to a specific list of ingestion sources using the `$in` operator.
```json
[
{
"action": ["read", "search"],
"subject": "ingestion",
"conditions": {
"id": {
"$in": ["INGESTION_ID_1", "INGESTION_ID_2"]
}
}
}
]
```
### Limit Access to a Specific Mailbox
This policy grants a user access to a specific ingestion source, but only allows them to see emails belonging to a single user within that source.
This is achieved by defining a single `can` rule: it grants `read` and `search` access to the `archive` subject, but only for emails whose `userEmail` matches the specified address.
```json
[
{
"action": ["read", "search"],
"subject": "archive",
"conditions": {
"userEmail": "user1@example.com"
}
}
]
```

View File

@@ -1,96 +0,0 @@
# OCR Service
The OCR (Optical Character Recognition) and text extraction service is responsible for extracting plain text content from various file formats, such as PDFs, Office documents, and more. This is a crucial component for making email attachments searchable.
## Overview
The system employs a two-pronged approach for text extraction:
1. **Primary Extractor (Apache Tika)**: A powerful and versatile toolkit that can extract text from a wide variety of file formats. It is the recommended method for its superior performance and format support.
2. **Legacy Extractor**: A fallback mechanism that uses a combination of libraries (`pdf2json`, `mammoth`, `xlsx`) for common file types like PDF, DOCX, and XLSX. This is used when Apache Tika is not configured.
The main logic resides in `packages/backend/src/helpers/textExtractor.ts`, which decides which extraction method to use based on the application's configuration.
## Configuration
To enable the primary text extraction method, you must configure the URL of an Apache Tika server instance in your environment variables.
In your `.env` file, set the `TIKA_URL`:
```env
# .env.example
# Apache Tika Integration
# ONLY active if TIKA_URL is set
TIKA_URL=http://tika:9998
```
If `TIKA_URL` is not set, the system will automatically fall back to the legacy extraction methods. The service performs a health check on startup to verify connectivity with the Tika server.
## File Size Limits
To prevent excessive memory usage and processing time, the service imposes a general size limit on files submitted for text extraction. Files larger than the configured limit will be skipped.
- **With Apache Tika**: The maximum file size is **100MB**.
- **With Legacy Fallback**: The maximum file size is **50MB**.
## Supported File Formats
The service's ability to extract text depends on whether it's using Apache Tika or the legacy fallback methods.
### With Apache Tika
When `TIKA_URL` is configured, the service can process a vast range of file formats. Apache Tika is designed for broad compatibility and supports hundreds of file types, including but not limited to:
- Portable Document Format (PDF)
- Microsoft Office formats (DOC, DOCX, PPT, PPTX, XLS, XLSX)
- OpenDocument Formats (ODT, ODS, ODP)
- Rich Text Format (RTF)
- Plain Text (TXT, CSV, JSON, XML, HTML)
- Image formats with OCR capabilities (PNG, JPEG, TIFF)
- Archive formats (ZIP, TAR, GZ)
- Email formats (EML, MSG)
For a complete and up-to-date list, please refer to the official [Apache Tika documentation](https://tika.apache.org/3.2.3/formats.html).
### With Legacy Fallback
When Tika is not configured, text extraction is limited to the following formats:
- `application/pdf` (PDF)
- `application/vnd.openxmlformats-officedocument.wordprocessingml.document` (DOCX)
- `application/vnd.openxmlformats-officedocument.spreadsheetml.sheet` (XLSX)
- Plain text formats such as `text/*`, `application/json`, and `application/xml`.
## Features of the Tika Integration (`OcrService`)
The `OcrService` (`packages/backend/src/services/OcrService.ts`) provides several enhancements to make text extraction efficient and robust.
### Caching
To avoid redundant processing of the same file, the service implements a simple LRU (Least Recently Used) cache.
- **Cache Key**: A SHA-256 hash of the file's buffer is used as the cache key.
- **Functionality**: If a file with the same hash is processed again, the text content is served directly from the cache, saving significant processing time.
- **Statistics**: The service keeps track of cache hits, misses, and the hit rate for performance monitoring.
### Concurrency Management (Semaphore)
Extracting text from large files can be resource-intensive. To prevent the Tika server from being overwhelmed by multiple requests for the _same file_ simultaneously (e.g., during a large import), a semaphore mechanism is used.
- **Functionality**: If a request for a specific file (identified by its hash) is already in progress, any subsequent requests for the same file will wait for the first one to complete and then use its result.
- **Benefit**: This deduplicates parallel processing efforts and reduces unnecessary load on the Tika server.
### Health Check and DNS Fallback
- **Availability Check**: The service includes a `checkTikaAvailability` method to verify that the Tika server is reachable and operational. This check is performed on application startup.
- **DNS Fallback**: For convenience in Docker environments, if the Tika URL uses the hostname `tika` (e.g., `http://tika:9998`), the service will automatically attempt a fallback to `localhost` if the initial connection fails.
## Legacy Fallback Methods
When Tika is not available, the `extractTextLegacy` function in `textExtractor.ts` handles extraction for a limited set of MIME types:
- `application/pdf`: Processed using `pdf2json`. Includes a 50MB size limit and a 5-second timeout to prevent memory issues.
- `application/vnd.openxmlformats-officedocument.wordprocessingml.document` (DOCX): Processed using `mammoth`.
- `application/vnd.openxmlformats-officedocument.spreadsheetml.sheet` (XLSX): Processed using `xlsx`.
- Plain text formats (`text/*`, `application/json`, `application/xml`): Converted directly from the buffer.

View File

@@ -76,19 +76,18 @@ Here is a complete list of environment variables available for configuration:
These variables are used by `docker-compose.yml` to configure the services.
| Variable | Description | Default Value |
| ---------------------- | ---------------------------------------------------- | -------------------------------------------------------- |
| `POSTGRES_DB` | The name of the PostgreSQL database. | `open_archive` |
| `POSTGRES_USER` | The username for the PostgreSQL database. | `admin` |
| `POSTGRES_PASSWORD` | The password for the PostgreSQL database. | `password` |
| `DATABASE_URL` | The connection URL for the PostgreSQL database. | `postgresql://admin:password@postgres:5432/open_archive` |
| `MEILI_MASTER_KEY` | The master key for Meilisearch. | `aSampleMasterKey` |
| `MEILI_HOST` | The host for the Meilisearch service. | `http://meilisearch:7700` |
| `MEILI_INDEXING_BATCH` | The number of emails to batch together for indexing. | `500` |
| `REDIS_HOST` | The host for the Valkey (Redis) service. | `valkey` |
| `REDIS_PORT` | The port for the Valkey (Redis) service. | `6379` |
| `REDIS_PASSWORD` | The password for the Valkey (Redis) service. | `defaultredispassword` |
| `REDIS_TLS_ENABLED` | Enable or disable TLS for Redis. | `false` |
| Variable | Description | Default Value |
| ------------------- | ----------------------------------------------- | -------------------------------------------------------- |
| `POSTGRES_DB` | The name of the PostgreSQL database. | `open_archive` |
| `POSTGRES_USER` | The username for the PostgreSQL database. | `admin` |
| `POSTGRES_PASSWORD` | The password for the PostgreSQL database. | `password` |
| `DATABASE_URL` | The connection URL for the PostgreSQL database. | `postgresql://admin:password@postgres:5432/open_archive` |
| `MEILI_MASTER_KEY` | The master key for Meilisearch. | `aSampleMasterKey` |
| `MEILI_HOST` | The host for the Meilisearch service. | `http://meilisearch:7700` |
| `REDIS_HOST` | The host for the Valkey (Redis) service. | `valkey` |
| `REDIS_PORT` | The port for the Valkey (Redis) service. | `6379` |
| `REDIS_PASSWORD` | The password for the Valkey (Redis) service. | `defaultredispassword` |
| `REDIS_TLS_ENABLED` | Enable or disable TLS for Redis. | `false` |
#### Storage Settings
@@ -115,12 +114,6 @@ These variables are used by `docker-compose.yml` to configure the services.
| `RATE_LIMIT_MAX_REQUESTS` | The maximum number of API requests allowed from an IP within the window. | `100` |
| `ENCRYPTION_KEY` | A 32-byte hex string for encrypting sensitive data in the database. | |
#### Apache Tika Integration
| Variable | Description | Default Value |
| ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------------------ |
| `TIKA_URL` | Optional. The URL of an Apache Tika server for advanced text extraction from attachments. If not set, the application falls back to built-in parsers for PDF, Word, and Excel files. | `http://tika:9998` |
## 3. Run the Application
Once you have configured your `.env` file, you can start all the services using Docker Compose:

View File

@@ -59,7 +59,7 @@
"reflect-metadata": "^0.2.2",
"sqlite3": "^5.1.7",
"tsconfig-paths": "^4.2.0",
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.3/xlsx-0.20.3.tgz",
"xlsx": "^0.18.5",
"yauzl": "^3.2.0",
"zod": "^4.1.5"
},
@@ -74,6 +74,7 @@
"@types/multer": "^2.0.0",
"@types/node": "^24.0.12",
"@types/yauzl": "^2.10.3",
"bull-board": "^2.1.3",
"ts-node-dev": "^2.0.0",
"typescript": "^5.8.3"
}

View File

@@ -1,6 +1,6 @@
import { storage } from './storage';
import { app } from './app';
import { searchConfig, meiliConfig } from './search';
import { searchConfig } from './search';
import { connection as redisConfig } from './redis';
import { apiConfig } from './api';
@@ -8,7 +8,6 @@ export const config = {
storage,
app,
search: searchConfig,
meili: meiliConfig,
redis: redisConfig,
api: apiConfig,
};

View File

@@ -4,9 +4,3 @@ export const searchConfig = {
host: process.env.MEILI_HOST || 'http://127.0.0.1:7700',
apiKey: process.env.MEILI_MASTER_KEY || '',
};
export const meiliConfig = {
indexingBatchSize: process.env.MEILI_INDEXING_BATCH
? parseInt(process.env.MEILI_INDEXING_BATCH)
: 500,
};

View File

@@ -1,10 +1,7 @@
import PDFParser from 'pdf2json';
import mammoth from 'mammoth';
import xlsx from 'xlsx';
import { logger } from '../config/logger';
import { OcrService } from '../services/OcrService';
// Legacy PDF extraction (with improved memory management)
function extractTextFromPdf(buffer: Buffer): Promise<string> {
return new Promise((resolve) => {
const pdfParser = new PDFParser(null, true);
@@ -13,60 +10,34 @@ function extractTextFromPdf(buffer: Buffer): Promise<string> {
const finish = (text: string) => {
if (completed) return;
completed = true;
// explicit cleanup
try {
pdfParser.removeAllListeners();
} catch (e) {
// Ignore cleanup errors
}
pdfParser.removeAllListeners();
resolve(text);
};
pdfParser.on('pdfParser_dataError', (err: any) => {
logger.warn('PDF parsing error:', err?.parserError || 'Unknown error');
finish('');
});
pdfParser.on('pdfParser_dataReady', () => {
try {
const text = pdfParser.getRawTextContent();
finish(text || '');
} catch (err) {
logger.warn('Error getting PDF text content:', err);
finish('');
}
});
pdfParser.on('pdfParser_dataError', () => finish(''));
pdfParser.on('pdfParser_dataReady', () => finish(pdfParser.getRawTextContent()));
try {
pdfParser.parseBuffer(buffer);
} catch (err) {
logger.error('Error parsing PDF buffer:', err);
console.error('Error parsing PDF buffer', err);
finish('');
}
// reduced Timeout for better performance
setTimeout(() => {
logger.warn('PDF parsing timed out');
finish('');
}, 5000);
// Prevent hanging if the parser never emits events
setTimeout(() => finish(''), 10000);
});
}
// Legacy text extraction for various formats
async function extractTextLegacy(buffer: Buffer, mimeType: string): Promise<string> {
export async function extractText(buffer: Buffer, mimeType: string): Promise<string> {
try {
if (mimeType === 'application/pdf') {
// Check PDF size (memory protection)
if (buffer.length > 50 * 1024 * 1024) { // 50MB Limit
logger.warn('PDF too large for legacy extraction, skipping');
return '';
}
return await extractTextFromPdf(buffer);
}
if (mimeType === 'application/vnd.openxmlformats-officedocument.wordprocessingml.document') {
if (
mimeType === 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
) {
const { value } = await mammoth.extractRawText({ buffer });
return value;
}
@@ -79,7 +50,7 @@ async function extractTextLegacy(buffer: Buffer, mimeType: string): Promise<stri
const sheetText = xlsx.utils.sheet_to_txt(sheet);
fullText += sheetText + '\n';
}
return fullText.trim();
return fullText;
}
if (
@@ -89,54 +60,11 @@ async function extractTextLegacy(buffer: Buffer, mimeType: string): Promise<stri
) {
return buffer.toString('utf-8');
}
return '';
} catch (error) {
logger.error(`Error extracting text from attachment with MIME type ${mimeType}:`, error);
// Force garbage collection if available
if (global.gc) {
global.gc();
}
return '';
}
}
// Main extraction function
export async function extractText(buffer: Buffer, mimeType: string): Promise<string> {
// Input validation
if (!buffer || buffer.length === 0) {
return '';
}
if (!mimeType) {
logger.warn('No MIME type provided for text extraction');
return '';
}
// General size limit
const maxSize = process.env.TIKA_URL ? 100 * 1024 * 1024 : 50 * 1024 * 1024; // 100MB for Tika, 50MB for Legacy
if (buffer.length > maxSize) {
logger.warn(`File too large for text extraction: ${buffer.length} bytes (limit: ${maxSize})`);
return '';
}
// Decide between Tika and legacy
const tikaUrl = process.env.TIKA_URL;
if (tikaUrl) {
// Tika decides what it can parse
logger.debug(`Using Tika for text extraction: ${mimeType}`);
const ocrService = new OcrService()
try {
return await ocrService.extractTextWithTika(buffer, mimeType);
} catch (error) {
logger.error({ error }, "OCR text extraction failed, returning empty string")
return ''
}
} else {
// extract using legacy mode
return await extractTextLegacy(buffer, mimeType);
console.error(`Error extracting text from attachment with MIME type ${mimeType}:`, error);
return ''; // Return empty string on failure
}
console.warn(`Unsupported MIME type for text extraction: ${mimeType}`);
return ''; // Return empty string for unsupported types
}

View File

@@ -3,15 +3,14 @@ import { IndexingService } from '../../services/IndexingService';
import { SearchService } from '../../services/SearchService';
import { StorageService } from '../../services/StorageService';
import { DatabaseService } from '../../services/DatabaseService';
import { PendingEmail } from '@open-archiver/types';
const searchService = new SearchService();
const storageService = new StorageService();
const databaseService = new DatabaseService();
const indexingService = new IndexingService(databaseService, searchService, storageService);
export default async function (job: Job<{ emails: PendingEmail[] }>) {
const { emails } = job.data;
console.log(`Indexing email batch with ${emails.length} emails`);
await indexingService.indexEmailBatch(emails);
export default async function (job: Job<{ emailId: string }>) {
const { emailId } = job.data;
console.log(`Indexing email with ID: ${emailId}`);
await indexingService.indexEmailById(emailId);
}

View File

@@ -1,19 +1,9 @@
import { Job } from 'bullmq';
import {
IProcessMailboxJob,
SyncState,
ProcessMailboxError,
PendingEmail,
} from '@open-archiver/types';
import { IProcessMailboxJob, SyncState, ProcessMailboxError } from '@open-archiver/types';
import { IngestionService } from '../../services/IngestionService';
import { logger } from '../../config/logger';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { StorageService } from '../../services/StorageService';
import { IndexingService } from '../../services/IndexingService';
import { SearchService } from '../../services/SearchService';
import { DatabaseService } from '../../services/DatabaseService';
import { config } from '../../config';
/**
* This processor handles the ingestion of emails for a single user's mailbox.
@@ -25,16 +15,9 @@ import { config } from '../../config';
*/
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncState, string>) => {
const { ingestionSourceId, userEmail } = job.data;
const BATCH_SIZE: number = config.meili.indexingBatchSize;
let emailBatch: PendingEmail[] = [];
logger.info({ ingestionSourceId, userEmail }, `Processing mailbox for user`);
const searchService = new SearchService();
const storageService = new StorageService();
const databaseService = new DatabaseService();
const indexingService = new IndexingService(databaseService, searchService, storageService);
try {
const source = await IngestionService.findById(ingestionSourceId);
if (!source) {
@@ -43,38 +26,22 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
const connector = EmailProviderFactory.createConnector(source);
const ingestionService = new IngestionService();
const storageService = new StorageService();
// Pass the sync state for the entire source, the connector will handle per-user logic if necessary
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
if (email) {
const processedEmail = await ingestionService.processEmail(
email,
source,
storageService,
userEmail
);
if (processedEmail) {
emailBatch.push(processedEmail);
if (emailBatch.length >= BATCH_SIZE) {
await indexingService.indexEmailBatch(emailBatch);
emailBatch = [];
}
}
await ingestionService.processEmail(email, source, storageService, userEmail);
}
}
if (emailBatch.length > 0) {
await indexingService.indexEmailBatch(emailBatch);
emailBatch = [];
}
const newSyncState = connector.getUpdatedSyncState(userEmail);
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
// Return the new sync state to be aggregated by the parent flow
return newSyncState;
} catch (error) {
if (emailBatch.length > 0) {
await indexingService.indexEmailBatch(emailBatch);
}
logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox');
const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred';
const processMailboxError: ProcessMailboxError = {

View File

@@ -8,7 +8,6 @@ const scheduleContinuousSync = async () => {
'schedule-continuous-sync',
{},
{
jobId: 'schedule-continuous-sync',
repeat: {
pattern: config.app.syncFrequency,
},

View File

@@ -1,10 +1,4 @@
import {
Attachment,
EmailAddress,
EmailDocument,
EmailObject,
PendingEmail,
} from '@open-archiver/types';
import { Attachment, EmailAddress, EmailDocument, EmailObject } from '@open-archiver/types';
import { SearchService } from './SearchService';
import { StorageService } from './StorageService';
import { extractText } from '../helpers/textExtractor';
@@ -13,7 +7,6 @@ import { archivedEmails, attachments, emailAttachments } from '../database/schem
import { eq } from 'drizzle-orm';
import { streamToBuffer } from '../helpers/streamToBuffer';
import { simpleParser } from 'mailparser';
import { logger } from '../config/logger';
interface DbRecipients {
to: { name: string; address: string }[];
@@ -27,45 +20,14 @@ type AttachmentsType = {
mimeType: string;
}[];
/**
* Sanitizes text content by removing invalid characters that could cause JSON serialization issues
*/
function sanitizeText(text: string): string {
if (!text) return '';
// Remove control characters and invalid UTF-8 sequences
return text
.replace(/\uFFFD/g, '') // Replacement character for invalid UTF-8 sequences
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, '') // Remove control characters
.trim();
}
/**
* Recursively sanitize all string values in an object to prevent JSON issues
*/
function sanitizeObject<T>(obj: T): T {
if (typeof obj === 'string') {
return sanitizeText(obj) as unknown as T;
} else if (Array.isArray(obj)) {
return obj.map(sanitizeObject) as unknown as T;
} else if (obj !== null && typeof obj === 'object') {
const sanitized: any = {};
for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
sanitized[key] = sanitizeObject((obj as any)[key]);
}
}
return sanitized;
}
return obj;
}
export class IndexingService {
private dbService: DatabaseService;
private searchService: SearchService;
private storageService: StorageService;
/**
* Initializes the service with its dependencies.
*/
constructor(
dbService: DatabaseService,
searchService: SearchService,
@@ -77,129 +39,9 @@ export class IndexingService {
}
/**
 * Indexes multiple emails in a single batch operation for better performance.
 *
 * Search documents are built CONCURRENCY_LIMIT emails at a time, sanitized, completed
 * with default field values and checked for JSON-serializability. The surviving
 * documents are sent to the 'emails' index in one addDocuments call. Emails whose
 * document cannot be built or serialized are logged and skipped.
 *
 * @param emails - The processed emails to index; an empty array is a no-op.
 * @throws Re-throws, after logging, any error raised outside per-email document
 * creation (for example by the search service).
 */
public async indexEmailBatch(emails: PendingEmail[]): Promise<void> {
  if (emails.length === 0) {
    return;
  }

  logger.info({ batchSize: emails.length }, 'Starting batch indexing of emails');

  try {
    const CONCURRENCY_LIMIT = 10;
    const rawDocuments: EmailDocument[] = [];

    for (let i = 0; i < emails.length; i += CONCURRENCY_LIMIT) {
      const batch = emails.slice(i, i + CONCURRENCY_LIMIT);

      const batchDocuments = await Promise.allSettled(
        batch.map(async ({ email, sourceId, archivedId }) => {
          try {
            return await this.createEmailDocumentFromRawForBatch(
              email,
              sourceId,
              archivedId,
              email.userEmail || ''
            );
          } catch (error) {
            // Log a compact summary only: serializing the whole raw email would
            // dump attachment contents into the log and could itself throw.
            logger.error(
              {
                emailId: archivedId,
                sourceId,
                userEmail: email.userEmail || '',
                subject: email.subject || '',
                attachmentCount: email.attachments?.length ?? 0,
                error: error instanceof Error ? error.message : String(error),
              },
              'Failed to create document for email in batch'
            );
            throw error;
          }
        })
      );

      for (const result of batchDocuments) {
        if (result.status === 'fulfilled') {
          rawDocuments.push(result.value);
        } else {
          logger.error({ error: result.reason }, 'Failed to process email in batch');
        }
      }
    }

    if (rawDocuments.length === 0) {
      logger.warn('No documents created from email batch');
      return;
    }

    // Sanitize all documents
    const sanitizedDocuments = rawDocuments.map((doc) => sanitizeObject(doc));

    // Ensure all required fields are present
    const completeDocuments = sanitizedDocuments.map((doc) =>
      this.ensureEmailDocumentFields(doc)
    );

    // Validate each document and separate valid from invalid ones
    const validDocuments: EmailDocument[] = [];
    const invalidDocuments: { doc: EmailDocument; reason: string }[] = [];

    for (const doc of completeDocuments) {
      if (this.isValidEmailDocument(doc)) {
        validDocuments.push(doc);
      } else {
        invalidDocuments.push({ doc, reason: 'JSON.stringify failed' });
        logger.warn({ document: doc }, 'Skipping invalid EmailDocument');
      }
    }

    // Log identifying information for invalid documents. These documents are invalid
    // precisely because JSON.stringify threw on them, so they must not be stringified
    // again here: that would throw and fail the whole batch.
    for (const { doc, reason } of invalidDocuments) {
      logger.error(
        {
          emailId: doc.id,
          reason,
        },
        'Invalid EmailDocument details'
      );
    }

    if (validDocuments.length === 0) {
      logger.warn('No valid documents to index in batch.');
      return;
    }

    logger.debug({ documentCount: validDocuments.length }, 'Sending batch to Meilisearch');

    await this.searchService.addDocuments('emails', validDocuments, 'id');

    logger.info(
      {
        batchSize: emails.length,
        successfulDocuments: validDocuments.length,
        failedDocuments: emails.length - validDocuments.length,
        invalidDocuments: invalidDocuments.length,
      },
      'Successfully indexed email batch'
    );
  } catch (error) {
    logger.error(
      {
        batchSize: emails.length,
        error: error instanceof Error ? error.message : String(error),
      },
      'Failed to index email batch'
    );
    throw error;
  }
}
/**
* @deprecated
*/
private async indexEmailById(emailId: string): Promise<void> {
public async indexEmailById(emailId: string): Promise<void> {
const email = await this.dbService.db.query.archivedEmails.findFirst({
where: eq(archivedEmails.id, emailId),
});
@@ -233,14 +75,16 @@ export class IndexingService {
}
/**
* @deprecated
* Indexes an email object directly, creates a search document, and indexes it.
*/
private async indexByEmail(
pendingEmail: PendingEmail
public async indexByEmail(
email: EmailObject,
ingestionSourceId: string,
archivedEmailId: string
): Promise<void> {
const attachments: AttachmentsType = [];
if (pendingEmail.email.attachments && pendingEmail.email.attachments.length > 0) {
for (const attachment of pendingEmail.email.attachments) {
if (email.attachments && email.attachments.length > 0) {
for (const attachment of email.attachments) {
attachments.push({
buffer: attachment.content,
filename: attachment.filename,
@@ -249,96 +93,19 @@ export class IndexingService {
}
}
const document = await this.createEmailDocumentFromRaw(
pendingEmail.email,
email,
attachments,
pendingEmail.sourceId,
pendingEmail.archivedId,
pendingEmail.email.userEmail || ''
ingestionSourceId,
archivedEmailId,
email.userEmail || ''
);
// console.log(document);
await this.searchService.addDocuments('emails', [document], 'id');
}
/**
 * Builds the search document for a raw email object.
 *
 * Attachment text is extracted ATTACHMENT_CONCURRENCY attachments at a time.
 * Attachments that are not eligible for extraction, or whose extraction fails,
 * are left out (failures are logged as warnings). The sanitized email body and the
 * sanitized attachment text are combined into the document body.
 *
 * @param email - The raw email object.
 * @param ingestionSourceId - Stored on the document as ingestionSourceId.
 * @param archivedEmailId - Used as the document id.
 * @param userEmail - Stored on the document as userEmail.
 * @returns The search document for the email.
 */
private async createEmailDocumentFromRawForBatch(
  email: EmailObject,
  ingestionSourceId: string,
  archivedEmailId: string,
  userEmail: string
): Promise<EmailDocument> {
  const ATTACHMENT_CONCURRENCY = 3;
  const extractedAttachments: { filename: string; content: string }[] = [];
  const rawAttachments = email.attachments || [];

  // Extracts one attachment's text; resolves to null when skipped or on failure.
  const extractOne = async (attachment: (typeof rawAttachments)[number]) => {
    try {
      if (!this.shouldExtractText(attachment.contentType)) {
        return null;
      }

      const textContent = await extractText(
        attachment.content,
        attachment.contentType || ''
      );

      return {
        filename: attachment.filename,
        content: textContent || '',
      };
    } catch (error) {
      logger.warn(
        {
          filename: attachment.filename,
          mimeType: attachment.contentType,
          emailId: archivedEmailId,
          error: error instanceof Error ? error.message : String(error),
        },
        'Failed to extract text from attachment'
      );
      return null;
    }
  };

  for (let start = 0; start < rawAttachments.length; start += ATTACHMENT_CONCURRENCY) {
    const group = rawAttachments.slice(start, start + ATTACHMENT_CONCURRENCY);
    const outcomes = await Promise.allSettled(
      group.map((attachment) => extractOne(attachment))
    );

    for (const outcome of outcomes) {
      if (outcome.status === 'fulfilled' && outcome.value) {
        extractedAttachments.push(outcome.value);
      }
    }
  }

  const attachmentText = extractedAttachments
    .map((att) => sanitizeText(att.content))
    .join(' ');
  const bodyText = sanitizeText(email.body || email.html || '');

  // Empty parts are dropped so the separator only appears when both are present.
  const enhancedBody = [bodyText, attachmentText]
    .filter(Boolean)
    .join('\n\n--- Attachments ---\n\n');

  return {
    id: archivedEmailId,
    userEmail: userEmail,
    from: email.from[0]?.address || '',
    to: email.to?.map((addr: EmailAddress) => addr.address) || [],
    cc: email.cc?.map((addr: EmailAddress) => addr.address) || [],
    bcc: email.bcc?.map((addr: EmailAddress) => addr.address) || [],
    subject: email.subject || '',
    body: enhancedBody,
    attachments: extractedAttachments,
    timestamp: new Date(email.receivedAt).getTime(),
    ingestionSourceId: ingestionSourceId,
  };
}
private async createEmailDocumentFromRaw(
email: EmailObject,
attachments: AttachmentsType,
@@ -359,6 +126,7 @@ export class IndexingService {
`Failed to extract text from attachment: ${attachment.filename}`,
error
);
// skip attachment or fail the job
}
}
// console.log('email.userEmail', userEmail);
@@ -377,6 +145,9 @@ export class IndexingService {
};
}
/**
* Creates a search document from a database email record and its attachments.
*/
private async createEmailDocument(
email: typeof archivedEmails.$inferSelect,
attachments: Attachment[],
@@ -410,6 +181,9 @@ export class IndexingService {
};
}
/**
* Extracts text content from a list of attachments.
*/
private async extractAttachmentContents(
attachments: Attachment[]
): Promise<{ filename: string; content: string }[]> {
@@ -428,90 +202,9 @@ export class IndexingService {
`Failed to extract text from attachment: ${attachment.filename}`,
error
);
// skip attachment or fail the job
}
}
return extractedAttachments;
}
/**
 * Decides whether text extraction should be attempted for a MIME type.
 *
 * Always true when the TIKA_URL environment variable is set. Otherwise true only
 * when the lower-cased MIME type contains one of the listed types as a substring.
 *
 * @param mimeType - The attachment's MIME type; a falsy value yields false unless
 * TIKA_URL is set.
 * @returns Whether extraction should be attempted.
 */
private shouldExtractText(mimeType: string): boolean {
  if (process.env.TIKA_URL) {
    return true;
  }

  if (!mimeType) {
    return false;
  }

  // Tika supported mime types: https://tika.apache.org/2.4.1/formats.html
  const extractableTypes = [
    'application/pdf',
    'application/msword',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'application/vnd.ms-excel',
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'application/vnd.ms-powerpoint',
    'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    'text/plain',
    'text/html',
    'application/rss+xml',
    'application/xml',
    'application/json',
    'text/rtf',
    'application/rtf',
    'text/csv',
    'text/tsv',
    'application/csv',
    'image/bpg',
    'image/png',
    'image/vnd.wap.wbmp',
    'image/x-jbig2',
    'image/bmp',
    'image/x-xcf',
    'image/gif',
    'image/x-icon',
    'image/jpeg',
    'image/x-ms-bmp',
    'image/webp',
    'image/tiff',
    'image/svg+xml',
    'application/vnd.apple.pages',
    'application/vnd.apple.numbers',
    'application/vnd.apple.keynote',
    'image/heic',
    'image/heif',
  ];

  // Substring match, so values carrying parameters (e.g. "; charset=...") still match.
  const normalizedMimeType = mimeType.toLowerCase();
  for (const candidate of extractableTypes) {
    if (normalizedMimeType.includes(candidate)) {
      return true;
    }
  }
  return false;
}
/**
 * Ensures all required fields are present in an EmailDocument by substituting
 * defaults for missing or malformed values.
 *
 * Falsy strings fall back to a placeholder ('missing-id', 'unknown') or to '',
 * non-array list fields fall back to [], and the timestamp falls back to the
 * current time.
 *
 * @param doc - A possibly incomplete document.
 * @returns A document with every field populated.
 */
private ensureEmailDocumentFields(doc: Partial<EmailDocument>): EmailDocument {
  // NaN and Infinity are of type 'number' but serialize to null in JSON, so only a
  // finite number counts as a usable timestamp.
  const timestamp =
    typeof doc.timestamp === 'number' && Number.isFinite(doc.timestamp)
      ? doc.timestamp
      : Date.now();

  return {
    id: doc.id || 'missing-id',
    userEmail: doc.userEmail || 'unknown',
    from: doc.from || '',
    to: Array.isArray(doc.to) ? doc.to : [],
    cc: Array.isArray(doc.cc) ? doc.cc : [],
    bcc: Array.isArray(doc.bcc) ? doc.bcc : [],
    subject: doc.subject || '',
    body: doc.body || '',
    attachments: Array.isArray(doc.attachments) ? doc.attachments : [],
    timestamp,
    ingestionSourceId: doc.ingestionSourceId || 'unknown',
  };
}
/**
 * Checks whether the given value can be serialized to JSON.
 *
 * Only serializability is tested (JSON.stringify must not throw); the shape of the
 * document is not inspected. A failure is logged before returning.
 *
 * @param doc - The candidate document.
 * @returns true when JSON.stringify succeeds, false when it throws.
 */
private isValidEmailDocument(doc: unknown): boolean {
  try {
    JSON.stringify(doc);
    return true;
  } catch (error) {
    // A thrown value is not guaranteed to be an Error, so narrow before reading
    // .message.
    logger.error(
      { doc, error: error instanceof Error ? error.message : String(error) },
      'Invalid EmailDocument detected'
    );
    return false;
  }
}
}

View File

@@ -6,7 +6,6 @@ import type {
IngestionSource,
IngestionCredentials,
IngestionProvider,
PendingEmail,
} from '@open-archiver/types';
import { and, desc, eq } from 'drizzle-orm';
import { CryptoService } from './CryptoService';
@@ -303,7 +302,7 @@ export class IngestionService {
source: IngestionSource,
storage: StorageService,
userEmail: string
): Promise<PendingEmail | null> {
): Promise<void> {
try {
// Generate a unique message ID for the email. If the email already has a message-id header, use that.
// Otherwise, generate a new one based on the email's hash, source ID, and email ID.
@@ -332,7 +331,7 @@ export class IngestionService {
{ messageId, ingestionSourceId: source.id },
'Skipping duplicate email'
);
return null;
return;
}
const emlBuffer = email.eml ?? Buffer.from(email.body, 'utf-8');
@@ -399,14 +398,23 @@ export class IngestionService {
.onConflictDoNothing();
}
}
// adding to indexing queue
//Instead: index by email (raw email object, ingestion id)
logger.info({ emailId: archivedEmail.id }, 'Indexing email');
// await indexingQueue.add('index-email', {
// emailId: archivedEmail.id,
// });
const searchService = new SearchService();
const storageService = new StorageService();
const databaseService = new DatabaseService();
const indexingService = new IndexingService(
databaseService,
searchService,
storageService
);
//assign userEmail
email.userEmail = userEmail;
return {
email,
sourceId: source.id,
archivedId: archivedEmail.id,
};
await indexingService.indexByEmail(email, source.id, archivedEmail.id);
} catch (error) {
logger.error({
message: `Failed to process email ${email.id} for source ${source.id}`,
@@ -414,7 +422,6 @@ export class IngestionService {
emailId: email.id,
ingestionSourceId: source.id,
});
return null;
}
}
}

View File

@@ -1,271 +0,0 @@
import crypto from 'crypto';
import { logger } from '../config/logger';
/**
 * Small LRU cache for Tika results that also tracks hit/miss statistics.
 *
 * Recency is tracked through the Map's insertion order: the first key is the least
 * recently used entry, and reading or writing a key moves it to the end.
 */
class TikaCache {
  private cache = new Map<string, string>();
  private maxSize = 50;
  private hits = 0;
  private misses = 0;

  /** Looks up a key, counting a hit or miss; a hit marks the entry as most recent. */
  get(key: string): string | undefined {
    const cached = this.cache.get(key);
    if (cached === undefined) {
      this.misses++;
      return undefined;
    }

    this.hits++;
    // Re-insert so the key becomes the newest entry.
    this.cache.delete(key);
    this.cache.set(key, cached);
    return cached;
  }

  /** Stores a value as the most recent entry, evicting the oldest one when full. */
  set(key: string, value: string): void {
    const wasPresent = this.cache.delete(key);

    // Only a brand-new key can push the cache over its limit.
    if (!wasPresent && this.cache.size >= this.maxSize) {
      for (const oldestKey of this.cache.keys()) {
        this.cache.delete(oldestKey);
        break;
      }
    }

    this.cache.set(key, value);
  }

  /** Returns the current size, capacity and hit/miss counters; hitRate is a percentage. */
  getStats(): { size: number; maxSize: number; hits: number; misses: number; hitRate: number } {
    const lookups = this.hits + this.misses;
    let hitRate = 0;
    if (lookups > 0) {
      hitRate = (this.hits / lookups) * 100;
    }

    return {
      size: this.cache.size,
      maxSize: this.maxSize,
      hits: this.hits,
      misses: this.misses,
      hitRate: Math.round(hitRate * 100) / 100, // 2 decimal places
    };
  }

  /** Empties the cache and zeroes the counters. */
  reset(): void {
    this.cache.clear();
    this.hits = 0;
    this.misses = 0;
  }
}
/**
 * De-duplicates concurrent Tika requests: while an operation for a key is running,
 * further callers asking for the same key wait for that operation's promise
 * instead of starting their own.
 */
class TikaSemaphore {
  private inProgress = new Map<string, Promise<string>>();
  private waitCount = 0;

  /**
   * Runs `operation` for `key`, or joins the operation already running for that key.
   *
   * @param key - Identifies the request (only its first 8 characters are logged).
   * @param operation - Produces the result; invoked only when no request for the
   * key is in flight.
   * @returns The operation's result.
   * @throws Whatever the operation rejects with.
   */
  async acquire(key: string, operation: () => Promise<string>): Promise<string> {
    // Check if a request for this key is already running
    const existingPromise = this.inProgress.get(key);
    if (existingPromise) {
      this.waitCount++;
      logger.debug(`Waiting for in-progress Tika request (${key.slice(0, 8)}...)`);
      try {
        return await existingPromise;
      } finally {
        // clear() may have zeroed the counter while this caller was waiting.
        this.waitCount = Math.max(0, this.waitCount - 1);
      }
    }

    // Start new request
    const promise = this.executeOperation(key, operation);
    this.inProgress.set(key, promise);

    try {
      return await promise;
    } finally {
      // Remove the entry only if it is still ours: after clear(), a newer request
      // for the same key may have been registered and must stay visible.
      if (this.inProgress.get(key) === promise) {
        this.inProgress.delete(key);
      }
    }
  }

  /** Runs the operation, logging a failure before re-throwing it. */
  private async executeOperation(key: string, operation: () => Promise<string>): Promise<string> {
    try {
      return await operation();
    } catch (error) {
      // The in-progress entry is removed by acquire(); here the failure is only logged.
      logger.error(`Tika operation failed for key ${key.slice(0, 8)}...`, error);
      throw error;
    }
  }

  /** Returns the number of running operations and of callers waiting on one. */
  getStats(): { inProgress: number; waitCount: number } {
    return {
      inProgress: this.inProgress.size,
      waitCount: this.waitCount,
    };
  }

  /** Forgets all running operations and resets the wait counter. */
  clear(): void {
    this.inProgress.clear();
    this.waitCount = 0;
  }
}
/**
 * Text extraction through an Apache Tika server whose base URL is read from the
 * TIKA_URL environment variable. Results are cached by content hash, and concurrent
 * requests for the same content share a single Tika call.
 */
export class OcrService {
  private tikaCache = new TikaCache();
  private tikaSemaphore = new TikaSemaphore();

  /**
   * Tika-based text extraction with cache and semaphore.
   *
   * @param buffer - The file content; its SHA-256 hash is the cache key.
   * @param mimeType - Sent as Content-Type ('application/octet-stream' when empty).
   * @returns The trimmed extracted text, or '' when every Tika URL failed.
   * @throws Error when TIKA_URL is not set.
   */
  async extractTextWithTika(buffer: Buffer, mimeType: string): Promise<string> {
    const tikaUrl = process.env.TIKA_URL;
    if (!tikaUrl) {
      throw new Error('TIKA_URL environment variable not set');
    }

    // Drop trailing slashes so "http://tika:9998/" does not yield ".../​/tika".
    const baseUrl = tikaUrl.replace(/\/+$/, '');

    // Cache key: SHA-256 hash of the buffer
    const hash = crypto.createHash('sha256').update(buffer).digest('hex');

    // Cache lookup (before semaphore!)
    const cachedResult = this.tikaCache.get(hash);
    if (cachedResult !== undefined) {
      logger.debug(`Tika cache hit for ${mimeType} (${buffer.length} bytes)`);
      return cachedResult;
    }

    // Use semaphore to deduplicate parallel requests
    return await this.tikaSemaphore.acquire(hash, async () => {
      // No second cache lookup here: when no request is in flight the semaphore
      // starts this callback in the same tick as the lookup above, and otherwise it
      // joins the running request without invoking the callback. A second lookup
      // could therefore never hit and would only double-count cache misses.
      logger.debug(`Executing Tika request for ${mimeType} (${buffer.length} bytes)`);

      // DNS fallback: If "tika" hostname, also try localhost
      const urlsToTry = [
        `${baseUrl}/tika`,
        // Fallback in case of a DNS problem with the "tika" hostname
        ...(baseUrl.includes('://tika:')
          ? [`${baseUrl.replace('://tika:', '://localhost:')}/tika`]
          : []),
      ];

      for (const url of urlsToTry) {
        try {
          logger.debug(`Trying Tika URL: ${url}`);
          const response = await fetch(url, {
            method: 'PUT',
            headers: {
              'Content-Type': mimeType || 'application/octet-stream',
              Accept: 'text/plain',
              Connection: 'close',
            },
            body: buffer,
            signal: AbortSignal.timeout(180000),
          });

          if (!response.ok) {
            logger.warn(
              `Tika extraction failed at ${url}: ${response.status} ${response.statusText}`
            );
            continue; // Try next URL
          }

          const text = await response.text();
          const result = text.trim();

          // Cache result (also empty strings to avoid repeated attempts)
          this.tikaCache.set(hash, result);

          const cacheStats = this.tikaCache.getStats();
          const semaphoreStats = this.tikaSemaphore.getStats();
          logger.debug(
            `Tika extraction successful - Cache: ${cacheStats.hits}H/${cacheStats.misses}M (${cacheStats.hitRate}%) - Semaphore: ${semaphoreStats.inProgress} active, ${semaphoreStats.waitCount} waiting`
          );
          return result;
        } catch (error) {
          logger.warn(
            `Tika extraction error at ${url}:`,
            error instanceof Error ? error.message : 'Unknown error'
          );
          // Continue to next URL
        }
      }

      // All URLs failed - cache this too (as empty string)
      logger.error('All Tika URLs failed');
      this.tikaCache.set(hash, '');
      return '';
    });
  }

  /**
   * Helper function to check Tika availability.
   *
   * @returns true when GET <TIKA_URL>/version answers with an OK status within
   * 5 seconds; false when TIKA_URL is unset, the status is not OK, or the request fails.
   */
  async checkTikaAvailability(): Promise<boolean> {
    const tikaUrl = process.env.TIKA_URL;
    if (!tikaUrl) {
      return false;
    }

    try {
      // Same trailing-slash normalization as in extractTextWithTika.
      const response = await fetch(`${tikaUrl.replace(/\/+$/, '')}/version`, {
        method: 'GET',
        signal: AbortSignal.timeout(5000), // 5 seconds timeout
      });

      if (response.ok) {
        const version = await response.text();
        logger.info(`Tika server available, version: ${version.trim()}`);
        return true;
      }

      return false;
    } catch (error) {
      logger.warn(
        'Tika server not available:',
        error instanceof Error ? error.message : 'Unknown error'
      );
      return false;
    }
  }

  /**
   * Optional: Tika health check on startup. Only logs the outcome; it never throws
   * because of an unavailable server.
   */
  async initializeTextExtractor(): Promise<void> {
    const tikaUrl = process.env.TIKA_URL;

    if (tikaUrl) {
      const isAvailable = await this.checkTikaAvailability();
      if (!isAvailable) {
        logger.error(`Tika server configured but not available at: ${tikaUrl}`);
        logger.error('Text extraction will fall back to legacy methods or fail');
      }
    } else {
      logger.info('Using legacy text extraction methods (pdf2json, mammoth, xlsx)');
      logger.info('Set TIKA_URL environment variable to use Apache Tika for better extraction');
    }
  }

  /** Get cache statistics. */
  getTikaCacheStats(): {
    size: number;
    maxSize: number;
    hits: number;
    misses: number;
    hitRate: number;
  } {
    return this.tikaCache.getStats();
  }

  /** Get semaphore statistics. */
  getTikaSemaphoreStats(): { inProgress: number; waitCount: number } {
    return this.tikaSemaphore.getStats();
  }

  /** Clear cache (e.g. for tests or manual reset). */
  clearTikaCache(): void {
    this.tikaCache.reset();
    this.tikaSemaphore.clear();
    logger.info('Tika cache and semaphore cleared');
  }
}

View File

@@ -1,11 +1,11 @@
import { Worker } from 'bullmq';
import { connection } from '../config/redis';
import indexEmailBatchProcessor from '../jobs/processors/index-email-batch.processor';
import indexEmailProcessor from '../jobs/processors/index-email.processor';
const processor = async (job: any) => {
switch (job.name) {
case 'index-email-batch':
return indexEmailBatchProcessor(job);
case 'index-email':
return indexEmailProcessor(job);
default:
throw new Error(`Unknown job name: ${job.name}`);
}

View File

@@ -15,7 +15,7 @@
"dependencies": {
"@iconify/svelte": "^5.0.1",
"@open-archiver/types": "workspace:*",
"@sveltejs/kit": "^2.38.1",
"@sveltejs/kit": "^2.16.0",
"bits-ui": "^2.8.10",
"clsx": "^2.1.1",
"d3-shape": "^3.2.0",

View File

@@ -211,7 +211,7 @@
<Input id="username" bind:value={formData.providerConfig.username} class="col-span-3" />
</div>
<div class="grid grid-cols-4 items-center gap-4">
<Label for="password" class="text-left">{$t('app.auth.password')}</Label>
<Label for="password" class="text-left">{$t('auth.password')}</Label>
<Input
id="password"
type="password"

View File

@@ -55,16 +55,6 @@ export interface EmailObject {
tags?: string[];
}
/**
 * Represents an email that has been processed and is ready for indexing.
 * This interface defines the shape of the data that is passed to the batch indexing function.
 */
export interface PendingEmail {
/** The raw email object the search document is built from. */
email: EmailObject;
/** ID of the ingestion source the email came from. */
sourceId: string;
/** ID of the archived email record; becomes the id of the search document. */
archivedId: string;
}
// Define the structure of the document to be indexed in Meilisearch
export interface EmailDocument {
id: string; // The unique ID of the email

459
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff