Google Workspace connector

This commit is contained in:
Wayne
2025-07-15 13:58:55 +03:00
parent e909f5e879
commit a73964472c
13 changed files with 334 additions and 112 deletions

View File

@@ -1,14 +1,43 @@
import { Job } from 'bullmq';
import { IngestionService } from '../../services/IngestionService';
import { IInitialImportJob } from '@open-archiver/types';
const ingestionService = new IngestionService();
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { GoogleWorkspaceConnector } from '../../services/ingestion-connectors/GoogleWorkspaceConnector';
import { ingestionQueue } from '../queues';
import { logger } from '../../config/logger';
export default async (job: Job<IInitialImportJob>) => {
const { ingestionSourceId } = job.data;
logger.info({ ingestionSourceId }, 'Starting initial import master job');
try {
console.log(`Processing initial import for ingestion source: ${job.data.ingestionSourceId}`);
await ingestionService.performBulkImport(job.data);
const source = await IngestionService.findById(ingestionSourceId);
if (!source) {
throw new Error(`Ingestion source with ID ${ingestionSourceId} not found`);
}
const connector = EmailProviderFactory.createConnector(source);
if (connector instanceof GoogleWorkspaceConnector) {
let userCount = 0;
for await (const user of connector.listAllUsers()) {
if (user.primaryEmail) {
await ingestionQueue.add('process-mailbox', {
ingestionSourceId,
userEmail: user.primaryEmail
});
userCount++;
}
}
logger.info({ ingestionSourceId, userCount }, `Enqueued mailbox processing jobs for all users`);
} else {
// For other providers, we might trigger a simpler bulk import directly
await new IngestionService().performBulkImport(job.data);
}
logger.info({ ingestionSourceId }, 'Finished initial import master job');
} catch (error) {
console.error(error);
logger.error({ err: error, ingestionSourceId }, 'Error in initial import master job');
throw error;
}
};

View File

@@ -0,0 +1,47 @@
import { Job } from 'bullmq';
import { IProcessMailboxJob, EmailObject } from '@open-archiver/types';
import { IngestionService } from '../../services/IngestionService';
import { logger } from '../../config/logger';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { GoogleWorkspaceConnector } from '../../services/ingestion-connectors/GoogleWorkspaceConnector';
import { StorageService } from '../../services/StorageService';
import { indexingQueue } from '../queues';
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any, string>) => {
const { ingestionSourceId, userEmail } = job.data;
logger.info({ ingestionSourceId, userEmail }, `Processing mailbox for user`);
try {
const source = await IngestionService.findById(ingestionSourceId);
if (!source) {
throw new Error(`Ingestion source with ID ${ingestionSourceId} not found`);
}
const connector = EmailProviderFactory.createConnector(source);
if (connector instanceof GoogleWorkspaceConnector) {
for await (const email of connector.fetchEmails(userEmail)) {
if (!email.raw) {
logger.warn({ emailId: email.id }, 'Skipping email without raw content');
continue;
}
const buffer = Buffer.from(email.raw, 'base64url');
const storageService = new StorageService();
const storagePath = `emails/${ingestionSourceId}/${userEmail}/${email.id}.eml`;
await storageService.put(storagePath, buffer);
await indexingQueue.add('index-email', { emailId: email.id });
}
} else {
logger.warn(
{ ingestionSourceId, userEmail },
'Skipping mailbox processing for non-Google Workspace provider'
);
}
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
} catch (error) {
logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox');
throw error;
}
};

View File

@@ -5,14 +5,15 @@ import type {
GenericImapCredentials,
EmailObject
} from '@open-archiver/types';
import { GoogleConnector } from './ingestion-connectors/GoogleConnector';
import { GoogleWorkspaceConnector } from './ingestion-connectors/GoogleWorkspaceConnector';
import { MicrosoftConnector } from './ingestion-connectors/MicrosoftConnector';
import { ImapConnector } from './ingestion-connectors/ImapConnector';
// Define a common interface for all connectors
export interface IEmailConnector {
testConnection(): Promise<boolean>;
fetchEmails(since?: Date): AsyncGenerator<EmailObject>;
fetchEmails(userEmail?: string, since?: Date): AsyncGenerator<EmailObject>;
listAllUsers?(): AsyncGenerator<any>;
}
export class EmailProviderFactory {
@@ -22,7 +23,7 @@ export class EmailProviderFactory {
switch (source.provider) {
case 'google_workspace':
return new GoogleConnector(credentials as GoogleWorkspaceCredentials);
return new GoogleWorkspaceConnector(credentials as GoogleWorkspaceCredentials);
case 'microsoft_365':
return new MicrosoftConnector(credentials as Microsoft365Credentials);
case 'generic_imap':

View File

@@ -1,92 +0,0 @@
import type { GoogleWorkspaceCredentials, EmailObject, EmailAddress } from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { google } from 'googleapis';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
import { OAuth2Client } from 'google-auth-library';
import type { gmail_v1 } from 'googleapis';
export class GoogleConnector implements IEmailConnector {
private auth: OAuth2Client;
constructor(private credentials: GoogleWorkspaceCredentials) {
this.auth = new google.auth.OAuth2(
this.credentials.clientId,
this.credentials.clientSecret
);
}
public async testConnection(): Promise<boolean> {
try {
// The google-auth-library doesn't have a simple "verify" function.
// A common way to test is to get an access token.
const token = await this.auth.getAccessToken();
return !!token;
} catch (error) {
console.error('Failed to verify Google Workspace connection:', error);
return false;
}
}
public async *fetchEmails(since?: Date): AsyncGenerator<EmailObject> {
const gmail = google.gmail({ version: 'v1', auth: this.auth });
let nextPageToken: string | undefined | null = undefined;
do {
const res: { data: gmail_v1.Schema$ListMessagesResponse; } =
await gmail.users.messages.list({
userId: 'me',
q: since ? `after:${Math.floor(since.getTime() / 1000)}` : '',
pageToken: nextPageToken || undefined,
});
const messages = res.data.messages || [];
for (const message of messages) {
if (message.id) {
const msg = await gmail.users.messages.get({
userId: 'me',
id: message.id,
format: 'raw'
});
if (msg.data.raw) {
const emlBuffer = Buffer.from(msg.data.raw, 'base64');
const parsedEmail: ParsedMail = await simpleParser(emlBuffer);
const attachments = parsedEmail.attachments.map((attachment: Attachment) => ({
filename: attachment.filename || 'untitled',
contentType: attachment.contentType,
size: attachment.size,
content: attachment.content as Buffer
}));
const mapAddresses = (
addresses: AddressObject | AddressObject[] | undefined
): EmailAddress[] => {
if (!addresses) return [];
const addressArray = Array.isArray(addresses)
? addresses
: [addresses];
return addressArray.flatMap(a =>
a.value.map(v => ({ name: v.name, address: v.address || '' }))
);
};
yield {
id: msg.data.id!,
from: mapAddresses(parsedEmail.from),
to: mapAddresses(parsedEmail.to),
cc: mapAddresses(parsedEmail.cc),
bcc: mapAddresses(parsedEmail.bcc),
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
attachments,
receivedAt: parsedEmail.date || new Date(),
eml: emlBuffer
};
}
}
}
nextPageToken = res.data.nextPageToken;
} while (nextPageToken);
}
}

View File

@@ -0,0 +1,175 @@
import { google } from 'googleapis';
import type { admin_directory_v1, gmail_v1, Common } from 'googleapis';
import type {
GoogleWorkspaceCredentials,
EmailObject
} from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { logger } from '../../config/logger';
/**
* A connector for Google Workspace that uses a service account with domain-wide delegation
* to access user data on behalf of users in the domain.
*/
export class GoogleWorkspaceConnector implements IEmailConnector {
private credentials: GoogleWorkspaceCredentials;
private serviceAccountCreds: { client_email: string; private_key: string; };
constructor(credentials: GoogleWorkspaceCredentials) {
this.credentials = credentials;
try {
// Pre-parse the JSON key to catch errors early.
const parsedKey = JSON.parse(this.credentials.serviceAccountKeyJson);
if (!parsedKey.client_email || !parsedKey.private_key) {
throw new Error('Service account key JSON is missing required fields.');
}
this.serviceAccountCreds = {
client_email: parsedKey.client_email,
private_key: parsedKey.private_key
};
} catch (error) {
logger.error({ err: error }, 'Failed to parse Google Service Account JSON');
throw new Error('Invalid Google Service Account JSON key.');
}
}
/**
* Creates an authenticated JWT client capable of impersonating a user.
* @param subject The email address of the user to impersonate.
* @param scopes The OAuth scopes required for the API calls.
* @returns An authenticated JWT client.
*/
private getAuthClient(subject: string, scopes: string[]) {
const jwtClient = new google.auth.JWT({
email: this.serviceAccountCreds.client_email,
key: this.serviceAccountCreds.private_key,
scopes,
subject
});
return jwtClient;
}
/**
* Tests the connection and authentication by attempting to list the first user
* from the directory, impersonating the admin user.
*/
public async testConnection(): Promise<boolean> {
try {
const authClient = this.getAuthClient(this.credentials.impersonatedAdminEmail, [
'https://www.googleapis.com/auth/admin.directory.user.readonly'
]);
const admin = google.admin({
version: 'directory_v1',
auth: authClient
});
// Perform a simple, low-impact read operation to verify credentials.
await admin.users.list({
customer: 'my_customer',
maxResults: 1,
orderBy: 'email'
});
logger.info('Google Workspace connection test successful.');
return true;
} catch (error) {
logger.error({ err: error }, 'Failed to verify Google Workspace connection');
return false;
}
}
/**
* Lists all users in the Google Workspace domain.
* This method handles pagination to retrieve the complete list of users.
* @returns An async generator that yields each user object.
*/
public async *listAllUsers(): AsyncGenerator<admin_directory_v1.Schema$User> {
const authClient = this.getAuthClient(this.credentials.impersonatedAdminEmail, [
'https://www.googleapis.com/auth/admin.directory.user.readonly'
]);
const admin = google.admin({ version: 'directory_v1', auth: authClient });
let pageToken: string | undefined = undefined;
do {
const res: Common.GaxiosResponseWithHTTP2<admin_directory_v1.Schema$Users> = await admin.users.list({
customer: 'my_customer',
maxResults: 500, // Max allowed per page
pageToken: pageToken,
orderBy: 'email'
});
const users = res.data.users;
if (users) {
for (const user of users) {
yield user;
}
}
pageToken = res.data.nextPageToken ?? undefined;
} while (pageToken);
}
/**
* Fetches emails for a single user, starting from a specific point in time.
* This is ideal for continuous synchronization jobs.
* @param userEmail The email of the user whose mailbox will be read.
* @param since Optional date to fetch emails newer than this timestamp.
* @returns An async generator that yields each raw email object.
*/
public async *fetchEmails(
userEmail: string,
since?: Date
): AsyncGenerator<EmailObject> {
const authClient = this.getAuthClient(userEmail, [
'https://www.googleapis.com/auth/gmail.readonly'
]);
const gmail = google.gmail({ version: 'v1', auth: authClient });
let pageToken: string | undefined = undefined;
const query = since ? `after:${Math.floor(since.getTime() / 1000)}` : '';
do {
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> =
await gmail.users.messages.list({
userId: 'me', // 'me' refers to the impersonated user
q: query,
pageToken: pageToken
});
const messages = listResponse.data.messages;
if (!messages || messages.length === 0) {
return;
}
for (const message of messages) {
if (message.id) {
const msgResponse = await gmail.users.messages.get({
userId: 'me',
id: message.id,
format: 'RAW' // We want the full, raw .eml content
});
if (msgResponse.data.raw) {
yield {
id: msgResponse.data.id!,
userEmail: userEmail,
raw: msgResponse.data.raw,
from: [],
to: [],
subject: '',
body: '',
html: '',
headers: {},
attachments: [],
receivedAt: new Date()
};
}
}
}
pageToken = listResponse.data.nextPageToken ?? undefined;
} while (pageToken);
}
}

View File

@@ -30,7 +30,7 @@ export class ImapConnector implements IEmailConnector {
}
}
public async *fetchEmails(since?: Date): AsyncGenerator<EmailObject> {
public async *fetchEmails(userEmail?: string, since?: Date): AsyncGenerator<EmailObject> {
await this.client.connect();
try {
await this.client.mailboxOpen('INBOX');

View File

@@ -38,7 +38,7 @@ export class MicrosoftConnector implements IEmailConnector {
}
}
public async *fetchEmails(since?: Date): AsyncGenerator<EmailObject> {
public async *fetchEmails(userEmail?: string, since?: Date): AsyncGenerator<EmailObject> {
const accessToken = await this.getAccessToken();
const headers = { Authorization: `Bearer ${accessToken}` };

View File

@@ -5,6 +5,7 @@
import { Input } from '$lib/components/ui/input';
import { Label } from '$lib/components/ui/label';
import * as Select from '$lib/components/ui/select';
import { Textarea } from '$lib/components/ui/textarea/index.js';
let {
source = null,
@@ -20,10 +21,12 @@
{ value: 'generic_imap', label: 'Generic IMAP' }
];
let formData = $state({
let formData: CreateIngestionSourceDto = $state({
name: source?.name ?? '',
provider: source?.provider ?? 'google_workspace',
providerConfig: source?.credentials ?? {}
providerConfig: source?.credentials ?? {
type: source?.provider ?? 'google_workspace'
}
});
const triggerContent = $derived(
@@ -55,7 +58,25 @@
</Select.Root>
</div>
{#if formData.provider === 'google_workspace' || formData.provider === 'microsoft_365'}
{#if formData.provider === 'google_workspace'}
<div class="grid grid-cols-4 items-center gap-4">
<Label for="serviceAccountKeyJson" class="text-right">Service Account Key (JSON)</Label>
<Textarea
placeholder="Paste your service account key JSON content"
id="serviceAccountKeyJson"
bind:value={formData.providerConfig.serviceAccountKeyJson}
class="col-span-3 max-h-32"
/>
</div>
<div class="grid grid-cols-4 items-center gap-4">
<Label for="impersonatedAdminEmail" class="text-right">Impersonated Admin Email</Label>
<Input
id="impersonatedAdminEmail"
bind:value={formData.providerConfig.impersonatedAdminEmail}
class="col-span-3"
/>
</div>
{:else if formData.provider === 'microsoft_365'}
<div class="grid grid-cols-4 items-center gap-4">
<Label for="clientId" class="text-right">Client ID</Label>
<Input id="clientId" bind:value={formData.providerConfig.clientId} class="col-span-3" />
@@ -68,10 +89,6 @@
class="col-span-3"
/>
</div>
<div class="grid grid-cols-4 items-center gap-4">
<Label for="redirectUri" class="text-right">Redirect URI</Label>
<Input id="redirectUri" bind:value={formData.providerConfig.redirectUri} class="col-span-3" />
</div>
{:else if formData.provider === 'generic_imap'}
<div class="grid grid-cols-4 items-center gap-4">
<Label for="host" class="text-right">Host</Label>

View File

@@ -0,0 +1,7 @@
import Root from "./textarea.svelte";
export {
Root,
//
Root as Textarea,
};

View File

@@ -0,0 +1,22 @@
<script lang="ts">
import { cn, type WithElementRef, type WithoutChildren } from "$lib/utils.js";
import type { HTMLTextareaAttributes } from "svelte/elements";
let {
ref = $bindable(null),
value = $bindable(),
class: className,
...restProps
}: WithoutChildren<WithElementRef<HTMLTextareaAttributes>> = $props();
</script>
<textarea
bind:this={ref}
data-slot="textarea"
class={cn(
"border-input placeholder:text-muted-foreground focus-visible:border-ring focus-visible:ring-ring/50 aria-invalid:ring-destructive/20 dark:aria-invalid:ring-destructive/40 aria-invalid:border-destructive dark:bg-input/30 field-sizing-content shadow-xs flex min-h-16 w-full rounded-md border bg-transparent px-3 py-2 text-base outline-none transition-[color,box-shadow] focus-visible:ring-[3px] disabled:cursor-not-allowed disabled:opacity-50 md:text-sm",
className
)}
bind:value
{...restProps}
></textarea>

View File

@@ -45,6 +45,10 @@ export interface EmailObject {
receivedAt: Date;
/** An optional buffer containing the full raw EML content of the email, which is useful for archival and compliance purposes. */
eml?: Buffer;
/** The email address of the user whose mailbox this email belongs to. */
userEmail?: string;
/** The full, raw content of the email, typically in base64url-encoded format for APIs like Gmail. */
raw?: string;
}
// Define the structure of the document to be indexed in Meilisearch

View File

@@ -19,8 +19,15 @@ export interface GenericImapCredentials {
export interface GoogleWorkspaceCredentials {
type: 'google_workspace';
clientId: string;
clientSecret: string;
/**
* The full JSON content of the Google Service Account key.
* This should be a stringified JSON object.
*/
serviceAccountKeyJson: string;
/**
* The email of the super-admin user to impersonate for domain-wide operations.
*/
impersonatedAdminEmail: string;
}
export interface Microsoft365Credentials {
@@ -64,3 +71,8 @@ export interface UpdateIngestionSourceDto {
export interface IInitialImportJob {
ingestionSourceId: string;
}
export interface IProcessMailboxJob {
ingestionSourceId: string;
userEmail: string;
}

File diff suppressed because one or more lines are too long