Google Workspace ingestion

This commit is contained in:
Wayne
2025-07-15 16:59:22 +03:00
parent a73964472c
commit 636c229a7f
7 changed files with 41 additions and 36 deletions

1
.gitignore vendored
View File

@@ -21,4 +21,3 @@ pnpm-debug.log
# Dev
.dev
.clinerules

View File

@@ -1,11 +1,9 @@
import { Job } from 'bullmq';
import { IProcessMailboxJob, EmailObject } from '@open-archiver/types';
import { IProcessMailboxJob } from '@open-archiver/types';
import { IngestionService } from '../../services/IngestionService';
import { logger } from '../../config/logger';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { GoogleWorkspaceConnector } from '../../services/ingestion-connectors/GoogleWorkspaceConnector';
import { StorageService } from '../../services/StorageService';
import { indexingQueue } from '../queues';
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any, string>) => {
const { ingestionSourceId, userEmail } = job.data;
@@ -19,26 +17,12 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any,
}
const connector = EmailProviderFactory.createConnector(source);
const ingestionService = new IngestionService();
const storageService = new StorageService();
if (connector instanceof GoogleWorkspaceConnector) {
for await (const email of connector.fetchEmails(userEmail)) {
if (!email.raw) {
logger.warn({ emailId: email.id }, 'Skipping email without raw content');
continue;
}
const buffer = Buffer.from(email.raw, 'base64url');
const storageService = new StorageService();
const storagePath = `emails/${ingestionSourceId}/${userEmail}/${email.id}.eml`;
await storageService.put(storagePath, buffer);
await indexingQueue.add('index-email', { emailId: email.id });
}
} else {
logger.warn(
{ ingestionSourceId, userEmail },
'Skipping mailbox processing for non-Google Workspace provider'
);
for await (const email of connector.fetchEmails(userEmail)) {
await ingestionService.processEmail(email, source, storageService);
}
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
} catch (error) {
logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox');

View File

@@ -160,7 +160,7 @@ export class IngestionService {
}
}
private async processEmail(
public async processEmail(
email: EmailObject,
source: IngestionSource,
storage: StorageService

View File

@@ -2,10 +2,12 @@ import { google } from 'googleapis';
import type { admin_directory_v1, gmail_v1, Common } from 'googleapis';
import type {
GoogleWorkspaceCredentials,
EmailObject
EmailObject,
EmailAddress
} from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { logger } from '../../config/logger';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
/**
* A connector for Google Workspace that uses a service account with domain-wide delegation
@@ -152,18 +154,37 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
});
if (msgResponse.data.raw) {
// The raw data is base64url encoded, so we need to decode it.
const rawEmail = Buffer.from(msgResponse.data.raw, 'base64url');
const parsedEmail: ParsedMail = await simpleParser(rawEmail);
const attachments = parsedEmail.attachments.map((attachment: Attachment) => ({
filename: attachment.filename || 'untitled',
contentType: attachment.contentType,
size: attachment.size,
content: attachment.content as Buffer
}));
/**
 * Flattens mailparser address headers into a plain list of `{ name, address }` entries.
 *
 * @param addresses A single parsed address header, an array of them, or
 *   `undefined` when the header is absent.
 * @returns One entry per mailbox found; an empty array when no header was given.
 *
 * Entries that carry a `group` array (RFC 5322 group syntax, e.g.
 * `Team: a@x.org, b@x.org;`) are expanded into their member mailboxes instead of
 * being emitted as a single entry with an empty address, so group members are
 * not lost. Non-group entries without an address fall back to ''.
 */
const mapAddresses = (addresses: AddressObject | AddressObject[] | undefined): EmailAddress[] => {
    if (!addresses) return [];
    const headerList = Array.isArray(addresses) ? addresses : [addresses];
    // Recursive so that group members are flattened into the same result list.
    const flatten = (entries: AddressObject['value']): EmailAddress[] =>
        entries.flatMap(entry =>
            Array.isArray(entry.group)
                ? flatten(entry.group)
                : [{ name: entry.name, address: entry.address || '' }]
        );
    return headerList.flatMap(header => flatten(header.value));
};
yield {
id: msgResponse.data.id!,
userEmail: userEmail,
raw: msgResponse.data.raw,
from: [],
to: [],
subject: '',
body: '',
html: '',
headers: {},
attachments: [],
receivedAt: new Date()
eml: rawEmail,
from: mapAddresses(parsedEmail.from),
to: mapAddresses(parsedEmail.to),
cc: mapAddresses(parsedEmail.cc),
bcc: mapAddresses(parsedEmail.bcc),
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
attachments,
receivedAt: parsedEmail.date || new Date(),
};
}
}

View File

@@ -2,6 +2,7 @@ import { Worker } from 'bullmq';
import { connection } from '../config/redis';
import initialImportProcessor from '../jobs/processors/initial-import.processor';
import continuousSyncProcessor from '../jobs/processors/continuous-sync.processor';
import { processMailboxProcessor } from '../jobs/processors/process-mailbox.processor';
const processor = async (job: any) => {
switch (job.name) {
@@ -9,6 +10,8 @@ const processor = async (job: any) => {
return initialImportProcessor(job);
case 'continuous-sync':
return continuousSyncProcessor(job);
case 'process-mailbox':
return processMailboxProcessor(job);
default:
throw new Error(`Unknown job name: ${job.name}`);
}

View File

@@ -47,8 +47,6 @@ export interface EmailObject {
eml?: Buffer;
/** The email address of the user whose mailbox this email belongs to. */
userEmail?: string;
/** The full, raw content of the email, typically in base64url-encoded format for APIs like Gmail. */
raw?: string;
}
// Define the structure of the document to be indexed in Meilisearch

File diff suppressed because one or more lines are too long