Atomically update syncState

This commit is contained in:
Wayne
2025-07-22 22:45:32 +03:00
parent 36fcaa0475
commit e67cf33d5f
3 changed files with 68 additions and 32 deletions

View File

@@ -5,10 +5,9 @@ import { logger } from '../../config/logger';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { StorageService } from '../../services/StorageService';
import { IngestionSource, SyncState } from '@open-archiver/types';
import { db } from '../../database';
import { ingestionSources } from '../../database/schema';
import { eq } from 'drizzle-orm';
import { eq, sql } from 'drizzle-orm';
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any, string>) => {
const { ingestionSourceId, userEmail } = job.data;
@@ -33,35 +32,39 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any,
}
const newSyncState = connector.getUpdatedSyncState(userEmail);
// console.log('new sync state: ', newSyncState);
// Atomically update the syncState JSONB field
if (Object.keys(newSyncState).length > 0) {
const currentSource = (await db
.select({ syncState: ingestionSources.syncState })
.from(ingestionSources)
.where(eq(ingestionSources.id, ingestionSourceId))) as IngestionSource[];
const currentSyncState = currentSource[0]?.syncState || {};
// Atomically update the syncState JSONB field to prevent race conditions
const provider = Object.keys(newSyncState)[0] as keyof typeof newSyncState | undefined;
const mergedSyncState: SyncState = { ...currentSyncState };
if (provider && newSyncState[provider]) {
let path: (string | number)[];
let userState: any;
if (newSyncState.google) {
mergedSyncState.google = { ...mergedSyncState.google, ...newSyncState.google };
}
if (newSyncState.microsoft) {
mergedSyncState.microsoft = { ...mergedSyncState.microsoft, ...newSyncState.microsoft };
}
if (newSyncState.imap) {
mergedSyncState.imap = newSyncState.imap;
if (provider === 'imap') {
path = ['imap'];
userState = newSyncState.imap;
} else {
// Handles 'google' and 'microsoft'
path = [provider, userEmail];
userState = (newSyncState[provider] as any)?.[userEmail];
}
await db
.update(ingestionSources)
.set({
syncState: mergedSyncState,
updatedAt: new Date()
})
.where(eq(ingestionSources.id, ingestionSourceId));
if (userState) {
await db
.update(ingestionSources)
.set({
syncState: sql`jsonb_set(
COALESCE(${ingestionSources.syncState}, '{}'::jsonb),
'{${sql.raw(path.join(','))}}',
${JSON.stringify(userState)}::jsonb,
true
)`,
updatedAt: new Date()
})
.where(eq(ingestionSources.id, ingestionSourceId));
} else {
logger.warn({ ingestionSourceId, userEmail, provider }, `No sync state found for user under provider`);
}
}
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
} catch (error) {

View File

@@ -136,9 +136,16 @@ export class MicrosoftConnector implements IEmailConnector {
syncState?: SyncState | null
): AsyncGenerator<EmailObject> {
const deltaToken = syncState?.microsoft?.[userEmail]?.deltaToken;
let requestUrl = deltaToken
? deltaToken
: `/users/${userEmail}/mailFolders/AllItems/messages/delta`;
if (!deltaToken) {
// Initial sync: fetch all messages and set the initial delta token.
yield* this.fetchAllMessagesAndSetDeltaToken(userEmail);
return;
}
// Continuous sync: use the existing delta token to fetch changes.
this.newDeltaToken = deltaToken; // Preserve the token in case there are no new messages.
let requestUrl: string | undefined = deltaToken;
try {
while (requestUrl) {
@@ -165,6 +172,30 @@ export class MicrosoftConnector implements IEmailConnector {
}
}
private async *fetchAllMessagesAndSetDeltaToken(userEmail: string): AsyncGenerator<EmailObject> {
let requestUrl: string | undefined = `/users/${userEmail}/mailFolders/AllItems/messages/delta`;
this.newDeltaToken = undefined; // Ensure it starts clean for initial sync
while (requestUrl) {
const response = await this.graphClient.api(requestUrl).get();
for (const message of response.value) {
if (message.id && !(message as any)['@removed']) {
const rawEmail = await this.getRawEmail(userEmail, message.id);
if (rawEmail) {
yield await this.parseEmail(rawEmail, message.id, userEmail);
}
}
}
if (response['@odata.deltaLink']) {
this.newDeltaToken = response['@odata.deltaLink'];
}
requestUrl = response['@odata.nextLink'];
}
}
private async getRawEmail(userEmail: string, messageId: string): Promise<Buffer | null> {
try {
const response = await this.graphClient.api(`/users/${userEmail}/messages/${messageId}/$value`).getStream();

View File

@@ -107,9 +107,11 @@
<Table.Row>
<Table.Cell>{new Date(email.sentAt).toLocaleString()}</Table.Cell>
<Table.Cell>
<a href={`/dashboard/archived-emails/${email.id}`}>
{email.subject}
</a>
<div class="max-w-100 truncate">
<a href={`/dashboard/archived-emails/${email.id}`}>
{email.subject}
</a>
</div>
</Table.Cell>
<Table.Cell>{email.senderEmail}</Table.Cell>
<Table.Cell>{email.hasAttachments ? 'Yes' : 'No'}</Table.Cell>