feat(attachments): De-duplicate attachment content by content hash

This commit refactors attachment handling to allow multiple emails within the same ingestion source to reference attachments with identical content (same hash).

Changes:
- The unique index on the `attachments` table has been changed to a non-unique index to permit duplicate hash/source pairs.
- The ingestion logic is updated to first check for an existing attachment with the same hash and source. If found, it reuses the existing record; otherwise, it creates a new one. This maintains storage de-duplication.
- The email deletion logic is improved to be more robust. It now correctly removes the email-attachment link before checking if the attachment record and its corresponding file can be safely deleted.
This commit is contained in:
Wayne
2025-10-13 15:25:46 +02:00
parent eefe21c4cd
commit 150a9b15c9
6 changed files with 1408 additions and 47 deletions

View File

@@ -0,0 +1,2 @@
DROP INDEX "source_hash_unique";--> statement-breakpoint
CREATE INDEX "source_hash_idx" ON "attachments" USING btree ("ingestion_source_id","content_hash_sha256");

File diff suppressed because it is too large Load Diff

View File

@@ -162,6 +162,13 @@
"when": 1759701622932,
"tag": "0022_complete_triton",
"breakpoints": true
},
{
"idx": 23,
"version": "7",
"when": 1760354094610,
"tag": "0023_swift_swordsman",
"breakpoints": true
}
]
}

View File

@@ -1,5 +1,5 @@
import { relations } from 'drizzle-orm';
import { pgTable, text, uuid, bigint, primaryKey, uniqueIndex } from 'drizzle-orm/pg-core';
import { pgTable, text, uuid, bigint, primaryKey, index } from 'drizzle-orm/pg-core';
import { archivedEmails } from './archived-emails';
import { ingestionSources } from './ingestion-sources';
@@ -17,7 +17,7 @@ export const attachments = pgTable(
}),
},
(table) => [
uniqueIndex('source_hash_unique').on(table.ingestionSourceId, table.contentHashSha256),
index('source_hash_idx').on(table.ingestionSourceId, table.contentHashSha256),
]
);

View File

@@ -213,7 +213,7 @@ export class ArchivedEmailService {
// Load and handle attachments before deleting the email itself
if (email.hasAttachments) {
const emailAttachmentsResult = await db
const attachmentsForEmail = await db
.select({
attachmentId: attachments.id,
storagePath: attachments.storagePath,
@@ -223,37 +223,33 @@ export class ArchivedEmailService {
.where(eq(emailAttachments.emailId, emailId));
try {
for (const attachment of emailAttachmentsResult) {
const [refCount] = await db
.select({ count: count(emailAttachments.emailId) })
for (const attachment of attachmentsForEmail) {
// Delete the link between this email and the attachment record.
await db
.delete(emailAttachments)
.where(
and(
eq(emailAttachments.emailId, emailId),
eq(emailAttachments.attachmentId, attachment.attachmentId)
)
);
// Check if any other emails are linked to this attachment record.
const [recordRefCount] = await db
.select({ count: count() })
.from(emailAttachments)
.where(eq(emailAttachments.attachmentId, attachment.attachmentId));
if (refCount.count === 1) {
// If no other emails are linked to this record, it's safe to delete it and the file.
if (recordRefCount.count === 0) {
await storage.delete(attachment.storagePath);
await db
.delete(emailAttachments)
.where(
and(
eq(emailAttachments.emailId, emailId),
eq(emailAttachments.attachmentId, attachment.attachmentId)
)
);
await db
.delete(attachments)
.where(eq(attachments.id, attachment.attachmentId));
} else {
await db
.delete(emailAttachments)
.where(
and(
eq(emailAttachments.emailId, emailId),
eq(emailAttachments.attachmentId, attachment.attachmentId)
)
);
}
}
} catch {
} catch (error) {
console.error('Failed to delete email attachments', error);
throw new Error('Failed to delete email attachments');
}
}

View File

@@ -20,7 +20,7 @@ import {
attachments as attachmentsSchema,
emailAttachments,
} from '../database/schema';
import { createHash } from 'crypto';
import { createHash, randomUUID } from 'crypto';
import { logger } from '../config/logger';
import { SearchService } from './SearchService';
import { config } from '../config/index';
@@ -456,33 +456,63 @@ export class IngestionService {
const attachmentHash = createHash('sha256')
.update(attachmentBuffer)
.digest('hex');
const attachmentPath = `${config.storage.openArchiverFolderName}/${source.name.replaceAll(' ', '-')}-${source.id}/attachments/${attachment.filename}`;
await storage.put(attachmentPath, attachmentBuffer);
const [newAttachment] = await db
.insert(attachmentsSchema)
.values({
filename: attachment.filename,
mimeType: attachment.contentType,
sizeBytes: attachment.size,
contentHashSha256: attachmentHash,
storagePath: attachmentPath,
ingestionSourceId: source.id,
})
.onConflictDoUpdate({
target: [
attachmentsSchema.ingestionSourceId,
attachmentsSchema.contentHashSha256,
],
set: { filename: attachment.filename },
})
.returning();
// Check if an attachment with the same hash already exists for this source
const existingAttachment = await db.query.attachments.findFirst({
where: and(
eq(attachmentsSchema.contentHashSha256, attachmentHash),
eq(attachmentsSchema.ingestionSourceId, source.id)
),
});
let storagePath: string;
if (existingAttachment) {
// If it exists, reuse the storage path and don't save the file again
storagePath = existingAttachment.storagePath;
logger.info(
{
attachmentHash,
ingestionSourceId: source.id,
reusedPath: storagePath,
},
'Reusing existing attachment file for deduplication.'
);
} else {
// If it's a new attachment, create a unique path and save it
const uniqueId = randomUUID().slice(0, 7);
storagePath = `${config.storage.openArchiverFolderName}/${source.name.replaceAll(' ', '-')}-${source.id}/attachments/${uniqueId}-${attachment.filename}`;
await storage.put(storagePath, attachmentBuffer);
}
let attachmentRecord = existingAttachment;
if (!attachmentRecord) {
// If it's a new attachment, create a unique path and save it
const uniqueId = randomUUID().slice(0, 5);
const storagePath = `${config.storage.openArchiverFolderName}/${source.name.replaceAll(' ', '-')}-${source.id}/attachments/${uniqueId}-${attachment.filename}`;
await storage.put(storagePath, attachmentBuffer);
// Insert a new attachment record
[attachmentRecord] = await db
.insert(attachmentsSchema)
.values({
filename: attachment.filename,
mimeType: attachment.contentType,
sizeBytes: attachment.size,
contentHashSha256: attachmentHash,
storagePath: storagePath,
ingestionSourceId: source.id,
})
.returning();
}
// Link the attachment record (either new or existing) to the email
await db
.insert(emailAttachments)
.values({
emailId: archivedEmail.id,
attachmentId: newAttachment.id,
attachmentId: attachmentRecord.id,
})
.onConflictDoNothing();
}