Continuous syncing fix

This commit is contained in:
Wayne
2025-07-22 13:49:13 +03:00
parent 5e42bef8ad
commit e7bb545cfa
12 changed files with 135 additions and 27 deletions

View File

@@ -4,6 +4,7 @@ import { IContinuousSyncJob } from '@open-archiver/types';
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
import { flowProducer } from '../queues';
import { logger } from '../../config/logger';
import { ImapConnector } from '../../services/ingestion-connectors/ImapConnector';
export default async (job: Job<IContinuousSyncJob>) => {
const { ingestionSourceId } = job.data;
@@ -26,12 +27,16 @@ export default async (job: Job<IContinuousSyncJob>) => {
const jobs = [];
if (!connector.listAllUsers) {
// This is for single-mailbox providers like Generic IMAP
let userEmail = 'Default';
if (connector instanceof ImapConnector) {
userEmail = connector.returnImapUserEmail();
}
jobs.push({
name: 'process-mailbox',
queueName: 'ingestion',
data: {
ingestionSourceId: source.id,
userEmail: 'default' // A placeholder, as it's not needed for IMAP
userEmail: userEmail
}
});
} else {
@@ -43,7 +48,7 @@ export default async (job: Job<IContinuousSyncJob>) => {
queueName: 'ingestion',
data: {
ingestionSourceId: source.id,
userEmail: user.primaryEmail,
userEmail: user.primaryEmail
}
});
}

View File

@@ -33,7 +33,7 @@ export default async (job: Job<IInitialImportJob>) => {
queueName: 'ingestion',
data: {
ingestionSourceId,
userEmail: user.primaryEmail
userEmail: user.primaryEmail,
}
});
userCount++;
@@ -62,6 +62,15 @@ export default async (job: Job<IInitialImportJob>) => {
} else {
// For other providers, we might trigger a simpler bulk import directly
await new IngestionService().performBulkImport(job.data);
await flowProducer.add({
name: 'sync-cycle-finished',
queueName: 'ingestion',
data: {
ingestionSourceId,
userCount: 1,
isInitialImport: true
}
});
}
logger.info({ ingestionSourceId }, 'Finished initial import master job');

View File

@@ -27,7 +27,9 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, any,
// Pass the sync state for the entire source, the connector will handle per-user logic if necessary
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
await ingestionService.processEmail(email, source, storageService);
if (email) {
await ingestionService.processEmail(email, source, storageService);
}
}
const newSyncState = connector.getUpdatedSyncState(userEmail);

View File

@@ -13,9 +13,10 @@ import { ImapConnector } from './ingestion-connectors/ImapConnector';
// Define a common interface for all connectors
export interface IEmailConnector {
testConnection(): Promise<boolean>;
fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject>;
fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject | null>;
getUpdatedSyncState(userEmail?: string): SyncState;
listAllUsers?(): AsyncGenerator<any>;
returnImapUserEmail?(): string;
}
export class EmailProviderFactory {

View File

@@ -154,19 +154,20 @@ export class IngestionService {
}
} else {
// For single-mailbox providers, dispatch a single job
// console.log('source.credentials ', source.credentials);
await ingestionQueue.add('process-mailbox', {
ingestionSourceId: source.id,
userEmail: 'default' // Placeholder, as it's not needed for IMAP
userEmail: source.credentials.type === 'generic_imap' ? source.credentials.username : 'Default'
});
}
await IngestionService.update(ingestionSourceId, {
status: 'active',
lastSyncFinishedAt: new Date(),
lastSyncStatusMessage: 'Successfully initiated bulk import for all mailboxes.'
});
console.log(`Bulk import job dispatch finished for source: ${source.name} (${source.id})`);
// await IngestionService.update(ingestionSourceId, {
// status: 'active',
// lastSyncFinishedAt: new Date(),
// lastSyncStatusMessage: 'Successfully initiated bulk import for all mailboxes.'
// });
// console.log(`Bulk import job dispatch finished for source: ${source.name} (${source.id})`);
} catch (error) {
console.error(`Bulk import failed for source: ${source.name} (${source.id})`, error);
await IngestionService.update(ingestionSourceId, {

View File

@@ -30,8 +30,10 @@ export class ImapConnector implements IEmailConnector {
return false;
}
}
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject> {
/**
 * Exposes the username stored in this connector's credentials.
 *
 * @returns The `username` field of the configured credentials.
 */
public returnImapUserEmail(): string {
const { username } = this.credentials;
return username;
}
public async *fetchEmails(userEmail: string, syncState?: SyncState | null): AsyncGenerator<EmailObject | null> {
await this.client.connect();
try {
const mailbox = await this.client.mailboxOpen('INBOX');
@@ -42,17 +44,22 @@ export class ImapConnector implements IEmailConnector {
// Determine the highest UID in the mailbox currently.
// This ensures that even if no new emails are fetched, the sync state is updated to the latest UID.
if (mailbox.exists > 0) {
const highestUidInMailbox = mailbox.uidNext - 1;
if (highestUidInMailbox > this.newMaxUid) {
this.newMaxUid = highestUidInMailbox;
const lastMessage = await this.client.fetchOne(String(mailbox.exists), { uid: true });
if (lastMessage && lastMessage.uid > this.newMaxUid) {
this.newMaxUid = lastMessage.uid;
}
}
// If lastUid exists, fetch all emails with a UID greater than it.
// Otherwise, fetch all emails.
const searchCriteria = lastUid ? { uid: `${lastUid + 1}:*` } : { all: true };
for await (const msg of this.client.fetch(searchCriteria, { envelope: true, source: true, bodyStructure: true, uid: true })) {
// Defensive check: Ensure we do not process emails we should have already synced.
if (lastUid && msg.uid <= lastUid) {
console.warn(`IMAP fetch returned UID ${msg.uid} which is not greater than last synced UID ${lastUid}. Skipping.`);
continue;
}
if (msg.uid > this.newMaxUid) {
this.newMaxUid = msg.uid;
}

View File

@@ -29,6 +29,11 @@
}
});
// Keep the provider-specific config's `type` in step with the provider
// currently selected in the form.
// NOTE(review): the former debug `console.log(formData)` was dropped here because
// it wrote the whole form state to the browser console on every run; that state
// presumably includes the provider credentials — confirm against the form fields.
$effect(() => {
	formData.providerConfig.type = formData.provider;
});
const triggerContent = $derived(
providerOptions.find((p) => p.value === formData.provider)?.label ?? 'Select a provider'
);

View File

@@ -0,0 +1,49 @@
<!-- Badge component: a small pill-style label rendered as a link or a span. -->
<script lang="ts" module>
import { type VariantProps, tv } from 'tailwind-variants';
// Class-name builder for the badge: shared base classes plus one class set per `variant`.
export const badgeVariants = tv({
base: 'focus-visible:border-ring focus-visible:ring-ring/50 aria-invalid:ring-destructive/20 dark:aria-invalid:ring-destructive/40 aria-invalid:border-destructive inline-flex w-fit shrink-0 items-center justify-center gap-1 overflow-hidden whitespace-nowrap rounded-md border px-2 py-0.5 text-xs font-medium transition-[color,box-shadow] focus-visible:ring-[3px] [&>svg]:pointer-events-none [&>svg]:size-3',
variants: {
variant: {
default: 'bg-primary text-primary-foreground [a&]:hover:bg-primary/90 border-transparent',
secondary:
'bg-secondary text-secondary-foreground [a&]:hover:bg-secondary/90 border-transparent',
destructive:
'bg-destructive [a&]:hover:bg-destructive/90 focus-visible:ring-destructive/20 dark:focus-visible:ring-destructive/40 dark:bg-destructive/70 border-transparent text-white',
outline: 'text-foreground [a&]:hover:bg-accent [a&]:hover:text-accent-foreground'
}
},
// Variant applied when none is passed to `badgeVariants`.
defaultVariants: {
variant: 'default'
}
});
// Union of the variant names accepted by `badgeVariants`, derived from its definition.
export type BadgeVariant = VariantProps<typeof badgeVariants>['variant'];
</script>
<script lang="ts">
import type { HTMLAnchorAttributes } from 'svelte/elements';
import { cn, type WithElementRef } from '$lib/utils.js';
// Component props:
// - ref: bindable reference to the rendered element (starts as null)
// - href: when truthy the badge renders as an <a>, otherwise as a <span>
// - class: extra classes combined with the variant classes via `cn`
// - variant: which entry of `badgeVariants` to use (defaults to 'default')
// - children: optional snippet rendered inside the badge
// - restProps: every remaining attribute, spread onto the rendered element
let {
ref = $bindable(null),
href,
class: className,
variant = 'default',
children,
...restProps
}: WithElementRef<HTMLAnchorAttributes> & {
variant?: BadgeVariant;
} = $props();
</script>
<!-- The tag is chosen at render time: an anchor when `href` is set, a span otherwise. -->
<svelte:element
this={href ? 'a' : 'span'}
bind:this={ref}
data-slot="badge"
{href}
class={cn(badgeVariants({ variant }), className)}
{...restProps}
>
{@render children?.()}
</svelte:element>

View File

@@ -0,0 +1,2 @@
// Barrel for the badge component: re-exports the component itself as `Badge`
// together with its variant builder and variant type.
export { default as Badge, badgeVariants, type BadgeVariant } from "./badge.svelte";

View File

@@ -9,6 +9,8 @@
import IngestionSourceForm from '$lib/components/custom/IngestionSourceForm.svelte';
import { api } from '$lib/api.client';
import type { IngestionSource, CreateIngestionSourceDto } from '@open-archiver/types';
import Badge from '$lib/components/ui/badge/badge.svelte';
import type { BadgeVariant } from '$lib/components/ui/badge/badge.svelte';
let { data }: { data: PageData } = $props();
@@ -88,6 +90,27 @@
}
isDialogOpen = false;
};
/**
 * Picks the Tailwind colour classes used to render a status badge.
 *
 * Statuses with no dedicated colour (including `paused` and `auth_success`)
 * fall back to the neutral gray palette.
 *
 * @param status - The ingestion source status to colour.
 * @returns A space-separated class string for light and dark themes.
 */
function getStatusClasses(status: IngestionSource['status']): string {
	const neutralClasses = 'bg-gray-100 text-gray-800 dark:bg-gray-700 dark:text-gray-300';
	// Only statuses with a colour of their own are listed; everything else is neutral.
	const coloredStatuses = new Map<string, string>([
		['active', 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-300'],
		['error', 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-300'],
		['syncing', 'bg-blue-100 text-blue-800 dark:bg-blue-900 dark:text-blue-300'],
		['importing', 'bg-purple-100 text-purple-800 dark:bg-purple-900 dark:text-purple-300'],
		['pending_auth', 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-300']
	]);
	return coloredStatuses.get(status) ?? neutralClasses;
}
</script>
<div class="">
@@ -115,11 +138,11 @@
<Table.Cell>
<a href="/dashboard/archived-emails?ingestionSourceId={source.id}">{source.name}</a>
</Table.Cell>
<Table.Cell>{source.provider}</Table.Cell>
<Table.Cell class=" min-w-24">
<span>
{source.status}
</span>
<Table.Cell class="capitalize">{source.provider.split('_').join(' ')}</Table.Cell>
<Table.Cell class="min-w-24">
<Badge class="{getStatusClasses(source.status)} capitalize">
{source.status.split('_').join(' ')}
</Badge>
</Table.Cell>
<Table.Cell>
<Switch

View File

@@ -26,7 +26,11 @@ export type IngestionStatus =
| 'importing'
| 'auth_success';
export interface GenericImapCredentials {
/**
 * Base shape for provider credentials: every credentials object carries a
 * `type` field naming its ingestion provider.
 */
export interface BaseIngestionCredentials {
/** The ingestion provider these credentials belong to. */
type: IngestionProvider;
}
export interface GenericImapCredentials extends BaseIngestionCredentials {
type: 'generic_imap';
host: string;
port: number;
@@ -35,7 +39,7 @@ export interface GenericImapCredentials {
password?: string;
}
export interface GoogleWorkspaceCredentials {
export interface GoogleWorkspaceCredentials extends BaseIngestionCredentials {
type: 'google_workspace';
/**
* The full JSON content of the Google Service Account key.
@@ -48,7 +52,7 @@ export interface GoogleWorkspaceCredentials {
impersonatedAdminEmail: string;
}
export interface Microsoft365Credentials {
export interface Microsoft365Credentials extends BaseIngestionCredentials {
type: 'microsoft_365';
clientId: string;
clientSecret: string;

File diff suppressed because one or more lines are too long