Microsoft 365 syncState fix

This commit is contained in:
Wayne
2025-07-23 14:26:32 +03:00
parent 6b820e80c9
commit 7bd1b2d77a
3 changed files with 76 additions and 48 deletions

View File

@@ -11,7 +11,7 @@ import { logger } from '../../config/logger';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
import { ConfidentialClientApplication, Configuration, LogLevel } from '@azure/msal-node';
import { Client } from '@microsoft/microsoft-graph-client';
import type { User } from 'microsoft-graph';
import type { User, MailFolder } from 'microsoft-graph';
import type { AuthProvider } from '@microsoft/microsoft-graph-client';
/**
@@ -21,10 +21,12 @@ import type { AuthProvider } from '@microsoft/microsoft-graph-client';
export class MicrosoftConnector implements IEmailConnector {
private credentials: Microsoft365Credentials;
private graphClient: Client;
private newDeltaToken: string | undefined;
// Store delta tokens for each folder during a sync operation.
private newDeltaTokens: { [folderId: string]: string; };
constructor(credentials: Microsoft365Credentials) {
this.credentials = credentials;
this.newDeltaTokens = {}; // Initialize as an empty object
const msalConfig: Configuration = {
auth: {
@@ -126,29 +128,78 @@ export class MicrosoftConnector implements IEmailConnector {
}
/**
* Fetches emails for a single user, using a delta query for continuous sync.
* Fetches emails for a single user by iterating through all mail folders and
* performing a delta query on each.
* @param userEmail The user principal name or ID of the user.
* @param syncState Optional state containing the deltaToken.
* @param syncState Optional state containing the deltaTokens for each folder.
* @returns An async generator that yields each raw email object.
*/
public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject> {
const deltaToken = syncState?.microsoft?.[userEmail]?.deltaToken;
if (!deltaToken) {
// Initial sync: fetch all messages and set the initial delta token.
yield* this.fetchAllMessagesAndSetDeltaToken(userEmail);
return;
}
// Continuous sync: use the existing delta token to fetch changes.
this.newDeltaToken = deltaToken; // Preserve the token in case there are no new messages.
let requestUrl: string | undefined = deltaToken;
this.newDeltaTokens = syncState?.microsoft?.[userEmail]?.deltaTokens || {};
try {
while (requestUrl) {
const folders = this.listAllFolders(userEmail);
for await (const folder of folders) {
if (folder.id) {
logger.info({ userEmail, folderId: folder.id, folderName: folder.displayName }, 'Syncing folder');
yield* this.syncFolder(userEmail, folder.id, this.newDeltaTokens[folder.id]);
}
}
} catch (error) {
logger.error({ err: error, userEmail }, 'Failed to fetch emails from Microsoft 365');
throw error;
}
}
/**
* Lists all mail folders for a given user.
* @param userEmail The user principal name or ID.
* @returns An async generator that yields each mail folder.
*/
private async *listAllFolders(userEmail: string): AsyncGenerator<MailFolder> {
let requestUrl: string | undefined = `/users/${userEmail}/mailFolders`;
while (requestUrl) {
try {
const response = await this.graphClient.api(requestUrl).get();
for (const folder of response.value as MailFolder[]) {
yield folder;
}
requestUrl = response['@odata.nextLink'];
} catch (error) {
logger.error({ err: error, userEmail }, 'Failed to list mail folders');
throw error; // Stop if we can't list folders
}
}
}
/**
* Performs a delta sync on a single mail folder.
* @param userEmail The user's email.
* @param folderId The ID of the folder to sync.
* @param deltaToken The existing delta token for this folder, if any.
* @returns An async generator that yields email objects.
*/
private async *syncFolder(
userEmail: string,
folderId: string,
deltaToken?: string
): AsyncGenerator<EmailObject> {
let requestUrl: string | undefined;
if (deltaToken) {
// Continuous sync
requestUrl = deltaToken;
} else {
// Initial sync
requestUrl = `/users/${userEmail}/mailFolders/${folderId}/messages/delta`;
}
while (requestUrl) {
try {
const response = await this.graphClient.api(requestUrl).get();
for (const message of response.value) {
@@ -161,38 +212,15 @@ export class MicrosoftConnector implements IEmailConnector {
}
if (response['@odata.deltaLink']) {
this.newDeltaToken = response['@odata.deltaLink'];
this.newDeltaTokens[folderId] = response['@odata.deltaLink'];
}
requestUrl = response['@odata.nextLink'];
} catch (error) {
logger.error({ err: error, userEmail, folderId }, 'Failed to sync mail folder');
// Continue to the next folder if one fails
return;
}
} catch (error) {
logger.error({ err: error, userEmail }, 'Failed to fetch emails from Microsoft 365');
throw error;
}
}
private async *fetchAllMessagesAndSetDeltaToken(userEmail: string): AsyncGenerator<EmailObject> {
let requestUrl: string | undefined = `/users/${userEmail}/mailFolders/AllItems/messages/delta`;
this.newDeltaToken = undefined; // Ensure it starts clean for initial sync
while (requestUrl) {
const response = await this.graphClient.api(requestUrl).get();
for (const message of response.value) {
if (message.id && !(message as any)['@removed']) {
const rawEmail = await this.getRawEmail(userEmail, message.id);
if (rawEmail) {
yield await this.parseEmail(rawEmail, message.id, userEmail);
}
}
}
if (response['@odata.deltaLink']) {
this.newDeltaToken = response['@odata.deltaLink'];
}
requestUrl = response['@odata.nextLink'];
}
}
@@ -242,13 +270,13 @@ export class MicrosoftConnector implements IEmailConnector {
}
public getUpdatedSyncState(userEmail: string): SyncState {
if (!this.newDeltaToken) {
if (Object.keys(this.newDeltaTokens).length === 0) {
return {};
}
return {
microsoft: {
[userEmail]: {
deltaToken: this.newDeltaToken
deltaTokens: this.newDeltaTokens
}
}
};

View File

@@ -6,7 +6,7 @@ export type SyncState = {
};
microsoft?: {
[userEmail: string]: {
deltaToken: string;
deltaTokens: { [folderId: string]: string; };
};
};
imap?: {

File diff suppressed because one or more lines are too long