mirror of
https://github.com/LogicLabs-OU/OpenArchiver.git
synced 2026-04-06 00:31:57 +02:00
Compare commits
2 Commits
v0.5.0
...
v0.4.2-fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4e50f7c7f | ||
|
|
61f68ca9a6 |
@@ -43,7 +43,16 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
|
||||
const connector = EmailProviderFactory.createConnector(source);
|
||||
const ingestionService = new IngestionService();
|
||||
|
||||
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
|
||||
// Create a callback to check for duplicates without fetching full email content
|
||||
const checkDuplicate = async (messageId: string) => {
|
||||
return await IngestionService.doesEmailExist(messageId, ingestionSourceId);
|
||||
};
|
||||
|
||||
for await (const email of connector.fetchEmails(
|
||||
userEmail,
|
||||
source.syncState,
|
||||
checkDuplicate
|
||||
)) {
|
||||
if (email) {
|
||||
const processedEmail = await ingestionService.processEmail(
|
||||
email,
|
||||
|
||||
@@ -22,7 +22,8 @@ export interface IEmailConnector {
|
||||
testConnection(): Promise<boolean>;
|
||||
fetchEmails(
|
||||
userEmail: string,
|
||||
syncState?: SyncState | null
|
||||
syncState?: SyncState | null,
|
||||
checkDuplicate?: (messageId: string) => Promise<boolean>
|
||||
): AsyncGenerator<EmailObject | null>;
|
||||
getUpdatedSyncState(userEmail?: string): SyncState;
|
||||
listAllUsers(): AsyncGenerator<MailboxUser>;
|
||||
|
||||
@@ -85,7 +85,7 @@ export class IngestionService {
|
||||
|
||||
const decryptedSource = this.decryptSource(newSource);
|
||||
if (!decryptedSource) {
|
||||
await this.delete(newSource.id, actor, actorIp);
|
||||
await this.delete(newSource.id, actor, actorIp, true);
|
||||
throw new Error(
|
||||
'Failed to process newly created ingestion source due to a decryption error.'
|
||||
);
|
||||
@@ -107,7 +107,7 @@ export class IngestionService {
|
||||
}
|
||||
} catch (error) {
|
||||
// If connection fails, delete the newly created source and throw the error.
|
||||
await this.delete(decryptedSource.id, actor, actorIp);
|
||||
await this.delete(decryptedSource.id, actor, actorIp, true);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
@@ -205,8 +205,15 @@ export class IngestionService {
|
||||
return decryptedSource;
|
||||
}
|
||||
|
||||
public static async delete(id: string, actor: User, actorIp: string): Promise<IngestionSource> {
|
||||
checkDeletionEnabled();
|
||||
public static async delete(
|
||||
id: string,
|
||||
actor: User,
|
||||
actorIp: string,
|
||||
force: boolean = false
|
||||
): Promise<IngestionSource> {
|
||||
if (!force) {
|
||||
checkDeletionEnabled();
|
||||
}
|
||||
const source = await this.findById(id);
|
||||
if (!source) {
|
||||
throw new Error('Ingestion source not found');
|
||||
@@ -383,6 +390,25 @@ export class IngestionService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Quickly checks if an email exists in the database by its Message-ID header.
|
||||
* This is used to skip downloading duplicate emails during ingestion.
|
||||
*/
|
||||
public static async doesEmailExist(
|
||||
messageId: string,
|
||||
ingestionSourceId: string
|
||||
): Promise<boolean> {
|
||||
const existingEmail = await db.query.archivedEmails.findFirst({
|
||||
where: and(
|
||||
eq(archivedEmails.messageIdHeader, messageId),
|
||||
eq(archivedEmails.ingestionSourceId, ingestionSourceId)
|
||||
),
|
||||
columns: { id: true },
|
||||
});
|
||||
|
||||
return !!existingEmail;
|
||||
}
|
||||
|
||||
public async processEmail(
|
||||
email: EmailObject,
|
||||
source: IngestionSource,
|
||||
|
||||
@@ -58,7 +58,7 @@ export class EMLConnector implements IEmailConnector {
|
||||
try {
|
||||
const filePath = this.getFilePath();
|
||||
if (!filePath) {
|
||||
throw Error('EML file path not provided.');
|
||||
throw Error('EML Zip file path not provided.');
|
||||
}
|
||||
if (!filePath.includes('.zip')) {
|
||||
throw Error('Provided file is not in the ZIP format.');
|
||||
@@ -77,12 +77,21 @@ export class EMLConnector implements IEmailConnector {
|
||||
}
|
||||
|
||||
if (!fileExist) {
|
||||
throw Error('EML file not found or upload not finished yet, please wait.');
|
||||
if (this.credentials.localFilePath) {
|
||||
throw Error(`EML Zip file not found at path: ${this.credentials.localFilePath}`);
|
||||
} else {
|
||||
throw Error(
|
||||
'Uploaded EML Zip file not found. The upload may not have finished yet, or it failed.'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error({ error, credentials: this.credentials }, 'EML file validation failed.');
|
||||
logger.error(
|
||||
{ error, credentials: this.credentials },
|
||||
'EML Zip file validation failed.'
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,7 +132,8 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
|
||||
*/
|
||||
public async *fetchEmails(
|
||||
userEmail: string,
|
||||
syncState?: SyncState | null
|
||||
syncState?: SyncState | null,
|
||||
checkDuplicate?: (messageId: string) => Promise<boolean>
|
||||
): AsyncGenerator<EmailObject> {
|
||||
const authClient = this.getAuthClient(userEmail, [
|
||||
'https://www.googleapis.com/auth/gmail.readonly',
|
||||
@@ -144,7 +145,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
|
||||
|
||||
// If no sync state is provided for this user, this is an initial import. Get all messages.
|
||||
if (!startHistoryId) {
|
||||
yield* this.fetchAllMessagesForUser(gmail, userEmail);
|
||||
yield* this.fetchAllMessagesForUser(gmail, userEmail, checkDuplicate);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -170,6 +171,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
|
||||
if (messageAdded.message?.id) {
|
||||
try {
|
||||
const messageId = messageAdded.message.id;
|
||||
|
||||
// Optimization: Check for existence before fetching full content
|
||||
if (checkDuplicate && (await checkDuplicate(messageId))) {
|
||||
logger.debug(
|
||||
{ messageId, userEmail },
|
||||
'Skipping duplicate email (pre-check)'
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const metadataResponse = await gmail.users.messages.get({
|
||||
userId: userEmail,
|
||||
id: messageId,
|
||||
@@ -258,8 +269,17 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
|
||||
|
||||
private async *fetchAllMessagesForUser(
|
||||
gmail: gmail_v1.Gmail,
|
||||
userEmail: string
|
||||
userEmail: string,
|
||||
checkDuplicate?: (messageId: string) => Promise<boolean>
|
||||
): AsyncGenerator<EmailObject> {
|
||||
// Capture the history ID at the start to ensure no emails are missed during the import process.
|
||||
// Any emails arriving during this import will be covered by the next sync starting from this point.
|
||||
// Overlaps are handled by the duplicate check.
|
||||
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
|
||||
if (profileResponse.data.historyId) {
|
||||
this.newHistoryId = profileResponse.data.historyId;
|
||||
}
|
||||
|
||||
let pageToken: string | undefined = undefined;
|
||||
do {
|
||||
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> =
|
||||
@@ -277,6 +297,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
|
||||
if (message.id) {
|
||||
try {
|
||||
const messageId = message.id;
|
||||
|
||||
// Optimization: Check for existence before fetching full content
|
||||
if (checkDuplicate && (await checkDuplicate(messageId))) {
|
||||
logger.debug(
|
||||
{ messageId, userEmail },
|
||||
'Skipping duplicate email (pre-check)'
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const metadataResponse = await gmail.users.messages.get({
|
||||
userId: userEmail,
|
||||
id: messageId,
|
||||
@@ -352,12 +382,6 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
|
||||
}
|
||||
pageToken = listResponse.data.nextPageToken ?? undefined;
|
||||
} while (pageToken);
|
||||
|
||||
// After fetching all messages, get the latest history ID to use as the starting point for the next sync.
|
||||
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
|
||||
if (profileResponse.data.historyId) {
|
||||
this.newHistoryId = profileResponse.data.historyId;
|
||||
}
|
||||
}
|
||||
|
||||
public getUpdatedSyncState(userEmail: string): SyncState {
|
||||
|
||||
@@ -142,7 +142,8 @@ export class ImapConnector implements IEmailConnector {
|
||||
|
||||
public async *fetchEmails(
|
||||
userEmail: string,
|
||||
syncState?: SyncState | null
|
||||
syncState?: SyncState | null,
|
||||
checkDuplicate?: (messageId: string) => Promise<boolean>
|
||||
): AsyncGenerator<EmailObject | null> {
|
||||
try {
|
||||
// list all mailboxes first
|
||||
@@ -218,6 +219,22 @@ export class ImapConnector implements IEmailConnector {
|
||||
this.newMaxUids[mailboxPath] = msg.uid;
|
||||
}
|
||||
|
||||
// Optimization: Verify existence using Message-ID from envelope before fetching full body
|
||||
if (checkDuplicate && msg.envelope?.messageId) {
|
||||
const isDuplicate = await checkDuplicate(msg.envelope.messageId);
|
||||
if (isDuplicate) {
|
||||
logger.debug(
|
||||
{
|
||||
mailboxPath,
|
||||
uid: msg.uid,
|
||||
messageId: msg.envelope.messageId,
|
||||
},
|
||||
'Skipping duplicate email (pre-check)'
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
|
||||
|
||||
if (msg.envelope && msg.source) {
|
||||
|
||||
@@ -82,12 +82,21 @@ export class MboxConnector implements IEmailConnector {
|
||||
}
|
||||
|
||||
if (!fileExist) {
|
||||
throw Error('Mbox file not found or upload not finished yet, please wait.');
|
||||
if (this.credentials.localFilePath) {
|
||||
throw Error(`Mbox file not found at path: ${this.credentials.localFilePath}`);
|
||||
} else {
|
||||
throw Error(
|
||||
'Uploaded Mbox file not found. The upload may not have finished yet, or it failed.'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error({ error, credentials: this.credentials }, 'Mbox file validation failed.');
|
||||
logger.error(
|
||||
{ error, credentials: this.credentials },
|
||||
'Mbox file validation failed.'
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,11 +161,20 @@ export class PSTConnector implements IEmailConnector {
|
||||
}
|
||||
|
||||
if (!fileExist) {
|
||||
throw Error('PST file not found or upload not finished yet, please wait.');
|
||||
if (this.credentials.localFilePath) {
|
||||
throw Error(`PST file not found at path: ${this.credentials.localFilePath}`);
|
||||
} else {
|
||||
throw Error(
|
||||
'Uploaded PST file not found. The upload may not have finished yet, or it failed.'
|
||||
);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error({ error, credentials: this.credentials }, 'PST file validation failed.');
|
||||
logger.error(
|
||||
{ error, credentials: this.credentials },
|
||||
'PST file validation failed.'
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user