Microsoft connector

This commit is contained in:
Wayne
2025-07-22 18:48:03 +03:00
parent 1b81647ff4
commit 5b967836b1
5 changed files with 181 additions and 87 deletions

View File

@@ -19,10 +19,12 @@
"@aws-sdk/client-s3": "^3.844.0",
"@aws-sdk/lib-storage": "^3.844.0",
"@azure/msal-node": "^3.6.3",
"@microsoft/microsoft-graph-client": "^3.0.7",
"@open-archiver/types": "workspace:*",
"axios": "^1.10.0",
"bcryptjs": "^3.0.2",
"bullmq": "^5.56.3",
"cross-fetch": "^4.1.0",
"dotenv": "^17.2.0",
"drizzle-orm": "^0.44.2",
"express": "^5.1.0",
@@ -49,6 +51,7 @@
"@bull-board/express": "^6.11.0",
"@types/express": "^5.0.3",
"@types/mailparser": "^3.4.6",
"@types/microsoft-graph": "^2.40.1",
"@types/node": "^24.0.12",
"bull-board": "^2.1.3",
"drizzle-kit": "^0.31.4",

View File

@@ -1,122 +1,208 @@
import type { Microsoft365Credentials, EmailObject, EmailAddress, SyncState } from '@open-archiver/types';
import 'cross-fetch/polyfill';
import type {
Microsoft365Credentials,
EmailObject,
EmailAddress,
SyncState
} from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { ConfidentialClientApplication } from '@azure/msal-node';
import { logger } from '../../config/logger';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
import axios from 'axios';
const GRAPH_API_ENDPOINT = 'https://graph.microsoft.com/v1.0';
import { ConfidentialClientApplication, Configuration, LogLevel } from '@azure/msal-node';
import { Client } from '@microsoft/microsoft-graph-client';
import type { User } from 'microsoft-graph';
import type { AuthProvider } from '@microsoft/microsoft-graph-client';
/**
* A connector for Microsoft 365 that uses the Microsoft Graph API with client credentials (app-only)
* to access data on behalf of the organization.
*/
export class MicrosoftConnector implements IEmailConnector {
private cca: ConfidentialClientApplication;
private credentials: Microsoft365Credentials;
private graphClient: Client;
private newDeltaToken: string | undefined;
constructor(private credentials: Microsoft365Credentials) {
this.cca = new ConfidentialClientApplication({
constructor(credentials: Microsoft365Credentials) {
this.credentials = credentials;
const msalConfig: Configuration = {
auth: {
clientId: this.credentials.clientId,
authority: `https://login.microsoftonline.com/${this.credentials.tenantId}`,
clientSecret: this.credentials.clientSecret,
},
});
}
private async getAccessToken(): Promise<string> {
const result = await this.cca.acquireTokenByClientCredential({
scopes: ['https://graph.microsoft.com/.default'],
});
if (!result?.accessToken) {
throw new Error('Failed to acquire access token');
}
return result.accessToken;
system: {
loggerOptions: {
loggerCallback(loglevel, message, containsPii) {
if (containsPii) return;
switch (loglevel) {
case LogLevel.Error:
logger.error(message);
return;
case LogLevel.Warning:
logger.warn(message);
return;
case LogLevel.Info:
logger.info(message);
return;
case LogLevel.Verbose:
logger.debug(message);
return;
}
},
piiLoggingEnabled: false,
logLevel: LogLevel.Warning,
}
}
};
const msalClient = new ConfidentialClientApplication(msalConfig);
const authProvider: AuthProvider = async (done) => {
try {
const response = await msalClient.acquireTokenByClientCredential({
scopes: ['https://graph.microsoft.com/.default'],
});
if (!response?.accessToken) {
throw new Error('Failed to acquire access token.');
}
done(null, response.accessToken);
} catch (error) {
logger.error({ err: error }, 'Failed to acquire Microsoft Graph access token');
done(error, null);
}
};
this.graphClient = Client.init({ authProvider });
}
/**
* Tests the connection and authentication by attempting to list the first user
* from the directory.
* Any error raised along the way is logged and reported as `false` instead of being rethrown.
* @returns `true` when the token call and the `/users` request both succeed, `false` otherwise.
*/
public async testConnection(): Promise<boolean> {
try {
// NOTE(review): the Graph request below presumably authenticates through the client's own
// auth provider, which would make this explicit token fetch redundant — confirm.
await this.getAccessToken();
// Request at most one user; the response body is discarded, only success matters.
await this.graphClient.api('/users').top(1).get();
logger.info('Microsoft 365 connection test successful.');
return true;
} catch (error) {
// NOTE(review): the same failure is reported twice, once on the console and once through the logger.
console.error('Failed to verify Microsoft 365 connection:', error);
logger.error({ err: error }, 'Failed to verify Microsoft 365 connection');
return false;
}
}
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject> {
const accessToken = await this.getAccessToken();
const headers = { Authorization: `Bearer ${accessToken}` };
/**
* Lists all users in the Microsoft 365 tenant.
* Requests `/users` selecting only `id`, `userPrincipalName` and `displayName`, then keeps
* following `@odata.nextLink` until a page arrives without one.
* @returns An async generator that yields each user object.
* @throws Rethrows any error raised by a request, after logging it.
*/
public async *listAllUsers(): AsyncGenerator<User> {
let request = this.graphClient.api('/users').select('id,userPrincipalName,displayName');
// NOTE(review): `nextLink` is declared here but never assigned or read inside this method.
let nextLink: string | undefined;
try {
let response = await request.get();
while (response) {
// Hand out every user on the current page before asking for the next page.
for (const user of response.value) {
yield user;
}
if (response['@odata.nextLink']) {
// The next-link value is passed to `api()` unchanged to fetch the following page.
response = await this.graphClient.api(response['@odata.nextLink']).get();
} else {
// No next link on this page: pagination is finished.
break;
}
}
} catch (error) {
logger.error({ err: error }, 'Failed to list all users from Microsoft 365');
throw error;
}
}
/**
* Fetches emails for a single user, using a delta query for continuous sync.
* @param userEmail The user principal name or ID of the user.
* @param syncState Optional state containing the deltaToken.
* @returns An async generator that yields each raw email object.
*/
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject> {
const deltaToken = syncState?.microsoft?.[userEmail]?.deltaToken;
let requestUrl = deltaToken
? deltaToken
: `/users/${userEmail}/messages/delta`;
if (deltaToken) {
nextLink = `${GRAPH_API_ENDPOINT}/me/mailFolders/allmail/messages/delta?$deltaToken=${deltaToken}`;
} else {
nextLink = `${GRAPH_API_ENDPOINT}/me/mailFolders/allmail/messages/delta`;
}
try {
while (requestUrl) {
const response = await this.graphClient.api(requestUrl).get();
while (nextLink) {
const res: { data: { value: any[]; '@odata.nextLink'?: string; '@odata.deltaLink'?: string; }; } = await axios.get(
nextLink,
{ headers }
);
const messages = res.data.value;
const deltaLink = res.data['@odata.deltaLink'];
if (deltaLink) {
const deltaToken = new URL(deltaLink).searchParams.get('$deltatoken');
if (deltaToken) {
this.newDeltaToken = deltaToken;
for (const message of response.value) {
if (message.id && !(message as any)['@removed']) {
const rawEmail = await this.getRawEmail(userEmail, message.id);
if (rawEmail) {
yield await this.parseEmail(rawEmail, message.id, userEmail);
}
}
}
}
for (const message of messages) {
// Skip if the message is deleted
if (message['@removed']) {
continue;
if (response['@odata.deltaLink']) {
this.newDeltaToken = response['@odata.deltaLink'];
}
const rawContentRes = await axios.get(
`${GRAPH_API_ENDPOINT}/users/me/messages/${message.id}/$value`,
{ headers }
);
const emlBuffer = Buffer.from(rawContentRes.data, 'utf-8');
const parsedEmail: ParsedMail = await simpleParser(emlBuffer);
const attachments = parsedEmail.attachments.map((attachment: Attachment) => ({
filename: attachment.filename || 'untitled',
contentType: attachment.contentType,
size: attachment.size,
content: attachment.content as Buffer
}));
const mapAddresses = (
addresses: AddressObject | AddressObject[] | undefined
): EmailAddress[] => {
if (!addresses) return [];
const addressArray = Array.isArray(addresses) ? addresses : [addresses];
return addressArray.flatMap(a =>
a.value.map(v => ({ name: v.name, address: v.address || '' }))
);
};
yield {
id: message.id,
from: mapAddresses(parsedEmail.from),
to: mapAddresses(parsedEmail.to),
cc: mapAddresses(parsedEmail.cc),
bcc: mapAddresses(parsedEmail.bcc),
subject: parsedEmail.subject || '',
body: parsedEmail.text || '',
html: parsedEmail.html || '',
headers: parsedEmail.headers as any,
attachments,
receivedAt: parsedEmail.date || new Date(),
eml: emlBuffer
};
requestUrl = response['@odata.nextLink'];
}
nextLink = res.data['@odata.nextLink'];
} catch (error) {
logger.error({ err: error, userEmail }, 'Failed to fetch emails from Microsoft 365');
throw error;
}
}
/**
 * Downloads the raw content of a single message through the Graph `$value` endpoint.
 *
 * Failures are logged and reported as `null` instead of being thrown, so callers are
 * expected to check the result before using it.
 *
 * @param userEmail The user principal name or ID of the mailbox owner.
 * @param messageId The Graph ID of the message to download.
 * @returns The message bytes, or `null` when the download failed.
 */
private async getRawEmail(userEmail: string, messageId: string): Promise<Buffer | null> {
    try {
        // Encode both path segments: a guest UPN contains '#', which would otherwise start a
        // URL fragment and cut the request path short.
        const path = `/users/${encodeURIComponent(userEmail)}/messages/${encodeURIComponent(messageId)}/$value`;
        const stream: AsyncIterable<Uint8Array | string> = await this.graphClient.api(path).getStream();
        const chunks: Uint8Array[] = [];
        for await (const chunk of stream) {
            // Buffer.concat rejects strings, so convert any string chunk to bytes first.
            chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
        }
        return Buffer.concat(chunks);
    } catch (error) {
        logger.error({ err: error, userEmail, messageId }, 'Failed to fetch raw email content.');
        return null;
    }
}
/**
 * Parses raw email bytes with mailparser's `simpleParser` and converts the result into
 * an `EmailObject`.
 *
 * @param rawEmail The raw message bytes; also stored unchanged in the result's `eml` field.
 * @param messageId Identifier stored as the result's `id`.
 * @param userEmail Mailbox owner stored as the result's `userEmail`.
 * @returns The populated email object. Missing subject, text and HTML become empty strings,
 *          and a missing date falls back to the current time.
 */
private async parseEmail(rawEmail: Buffer, messageId: string, userEmail: string): Promise<EmailObject> {
    const mail: ParsedMail = await simpleParser(rawEmail);

    // Flattens one address header (a single object, a list of them, or nothing) into a flat list.
    const toEmailAddresses = (field: AddressObject | AddressObject[] | undefined): EmailAddress[] => {
        if (!field) {
            return [];
        }
        const groups = Array.isArray(field) ? field : [field];
        const flattened: EmailAddress[] = [];
        for (const group of groups) {
            for (const entry of group.value) {
                flattened.push({ name: entry.name, address: entry.address || '' });
            }
        }
        return flattened;
    };

    const attachments = [];
    for (const part of mail.attachments as Attachment[]) {
        attachments.push({
            // Unnamed attachments get a placeholder filename.
            filename: part.filename || 'untitled',
            contentType: part.contentType,
            size: part.size,
            content: part.content as Buffer
        });
    }

    const email: EmailObject = {
        id: messageId,
        userEmail: userEmail,
        eml: rawEmail,
        from: toEmailAddresses(mail.from),
        to: toEmailAddresses(mail.to),
        cc: toEmailAddresses(mail.cc),
        bcc: toEmailAddresses(mail.bcc),
        subject: mail.subject || '',
        body: mail.text || '',
        html: mail.html || '',
        headers: mail.headers as any,
        attachments,
        receivedAt: mail.date || new Date(),
    };
    return email;
}
public getUpdatedSyncState(userEmail: string): SyncState {
if (!this.newDeltaToken) {
return {};

View File

@@ -101,6 +101,10 @@
class="col-span-3"
/>
</div>
<div class="grid grid-cols-4 items-center gap-4">
<Label for="tenantId" class="text-right">Tenant ID</Label>
<Input id="tenantId" bind:value={formData.providerConfig.tenantId} class="col-span-3" />
</div>
{:else if formData.provider === 'generic_imap'}
<div class="grid grid-cols-4 items-center gap-4">
<Label for="host" class="text-right">Host</Label>

View File

@@ -56,6 +56,7 @@ export interface Microsoft365Credentials extends BaseIngestionCredentials {
type: 'microsoft_365';
clientId: string;
clientSecret: string;
tenantId: string;
}
// Discriminated union for all possible credential types

File diff suppressed because one or more lines are too long