diff --git a/packages/backend/src/database/migrations/0009_late_lenny_balinger.sql b/packages/backend/src/database/migrations/0009_late_lenny_balinger.sql new file mode 100644 index 0000000..39c1b06 --- /dev/null +++ b/packages/backend/src/database/migrations/0009_late_lenny_balinger.sql @@ -0,0 +1,2 @@ +ALTER TABLE "archived_emails" ADD COLUMN "thread_id" text;--> statement-breakpoint +CREATE INDEX "thread_id_idx" ON "archived_emails" USING btree ("thread_id"); \ No newline at end of file diff --git a/packages/backend/src/database/migrations/meta/0009_snapshot.json b/packages/backend/src/database/migrations/meta/0009_snapshot.json new file mode 100644 index 0000000..a516965 --- /dev/null +++ b/packages/backend/src/database/migrations/meta/0009_snapshot.json @@ -0,0 +1,854 @@ +{ + "id": "701eda75-451a-4a6d-87e3-b6658fca65da", + "prevId": "86b6960e-1936-4543-846f-a2d24d6dc5d1", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.archived_emails": { + "name": "archived_emails", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "thread_id": { + "name": "thread_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "ingestion_source_id": { + "name": "ingestion_source_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "user_email": { + "name": "user_email", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "message_id_header": { + "name": "message_id_header", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "sent_at": { + "name": "sent_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "subject": { + "name": "subject", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "sender_name": { + "name": "sender_name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "sender_email": { + "name": "sender_email", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "recipients": { + "name": "recipients", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "storage_path": { + "name": "storage_path", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "storage_hash_sha256": { + "name": "storage_hash_sha256", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "size_bytes": { + "name": "size_bytes", + "type": "bigint", + "primaryKey": false, + "notNull": true + }, + "is_indexed": { + "name": "is_indexed", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "has_attachments": { + "name": "has_attachments", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "is_on_legal_hold": { + "name": "is_on_legal_hold", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": false + }, + "archived_at": { + "name": "archived_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "thread_id_idx": { + "name": "thread_id_idx", + "columns": [ + { + "expression": "thread_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "archived_emails_ingestion_source_id_ingestion_sources_id_fk": { + "name": "archived_emails_ingestion_source_id_ingestion_sources_id_fk", + "tableFrom": "archived_emails", + "tableTo": "ingestion_sources", + "columnsFrom": [ + "ingestion_source_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.attachments": { + "name": "attachments", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "filename": { + "name": "filename", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "mime_type": { + "name": "mime_type", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "size_bytes": { + "name": "size_bytes", + "type": "bigint", + "primaryKey": false, + "notNull": true + }, + "content_hash_sha256": { + "name": "content_hash_sha256", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "storage_path": { + "name": "storage_path", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "attachments_content_hash_sha256_unique": { + "name": "attachments_content_hash_sha256_unique", + "nullsNotDistinct": false, + "columns": [ + "content_hash_sha256" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.email_attachments": { + "name": "email_attachments", + "schema": "", + "columns": { + "email_id": { + "name": "email_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "attachment_id": { + "name": "attachment_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": { + "email_attachments_email_id_archived_emails_id_fk": { + "name": "email_attachments_email_id_archived_emails_id_fk", + "tableFrom": "email_attachments", + "tableTo": "archived_emails", + "columnsFrom": [ + "email_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "email_attachments_attachment_id_attachments_id_fk": { + "name": "email_attachments_attachment_id_attachments_id_fk", + "tableFrom": "email_attachments", + "tableTo": "attachments", + "columnsFrom": [ + "attachment_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "restrict", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "email_attachments_email_id_attachment_id_pk": { + "name": "email_attachments_email_id_attachment_id_pk", + "columns": [ + "email_id", + "attachment_id" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.audit_logs": { + "name": "audit_logs", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "bigserial", + "primaryKey": true, + "notNull": true + }, + "timestamp": { + "name": "timestamp", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "actor_identifier": { + "name": "actor_identifier", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "action": { + "name": "action", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "target_type": { + "name": "target_type", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "target_id": { + "name": "target_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "details": { + "name": "details", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "is_tamper_evident": { + "name": "is_tamper_evident", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.ediscovery_cases": { + "name": "ediscovery_cases", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'open'" + }, + "created_by_identifier": { + "name": "created_by_identifier", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "ediscovery_cases_name_unique": { + "name": "ediscovery_cases_name_unique", + "nullsNotDistinct": false, + "columns": [ + "name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.export_jobs": { + "name": "export_jobs", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "case_id": { + "name": "case_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "format": { + "name": "format", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "query": { + "name": "query", + "type": "jsonb", + "primaryKey": false, + "notNull": true + }, + "file_path": { + "name": "file_path", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_by_identifier": { + "name": "created_by_identifier", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "export_jobs_case_id_ediscovery_cases_id_fk": { + "name": "export_jobs_case_id_ediscovery_cases_id_fk", + "tableFrom": "export_jobs", + "tableTo": "ediscovery_cases", + "columnsFrom": [ + "case_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.legal_holds": { + "name": "legal_holds", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "case_id": { + "name": "case_id", + "type": "uuid", + "primaryKey": false, + "notNull": true + }, + "custodian_id": { + "name": "custodian_id", + "type": "uuid", + "primaryKey": false, + "notNull": false + }, + "hold_criteria": { + "name": "hold_criteria", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "reason": { + "name": "reason", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "applied_by_identifier": { + "name": "applied_by_identifier", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "applied_at": { + "name": "applied_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "removed_at": { + "name": "removed_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "legal_holds_case_id_ediscovery_cases_id_fk": { + "name": "legal_holds_case_id_ediscovery_cases_id_fk", + "tableFrom": "legal_holds", + "tableTo": "ediscovery_cases", + "columnsFrom": [ + "case_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "legal_holds_custodian_id_custodians_id_fk": { + "name": "legal_holds_custodian_id_custodians_id_fk", + "tableFrom": "legal_holds", + "tableTo": "custodians", + "columnsFrom": [ + "custodian_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.retention_policies": { + "name": "retention_policies", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "priority": { + "name": "priority", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "retention_period_days": { + "name": "retention_period_days", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "action_on_expiry": { + "name": "action_on_expiry", + "type": "retention_action", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "is_enabled": { + "name": "is_enabled", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": true + }, + "conditions": { + "name": "conditions", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "retention_policies_name_unique": { + "name": "retention_policies_name_unique", + "nullsNotDistinct": false, + "columns": [ + "name" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.custodians": { + "name": "custodians", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "display_name": { + "name": "display_name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "source_type": { + "name": "source_type", + "type": "ingestion_provider", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "custodians_email_unique": { + "name": "custodians_email_unique", + "nullsNotDistinct": false, + "columns": [ + "email" + ] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.ingestion_sources": { + "name": "ingestion_sources", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "uuid", + "primaryKey": true, + "notNull": true, + "default": "gen_random_uuid()" + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "provider": { + "name": "provider", + "type": "ingestion_provider", + "typeSchema": "public", + "primaryKey": false, + "notNull": true + }, + "credentials": { + "name": "credentials", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "ingestion_status", + "typeSchema": "public", + "primaryKey": false, + "notNull": true, + "default": "'pending_auth'" + }, + "last_sync_started_at": { + "name": "last_sync_started_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "last_sync_finished_at": { + "name": "last_sync_finished_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "last_sync_status_message": { + "name": "last_sync_status_message", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "sync_state": { + "name": "sync_state", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": { + "public.retention_action": { + "name": "retention_action", + "schema": "public", + "values": [ + "delete_permanently", + "notify_admin" + ] + }, + "public.ingestion_provider": { + "name": "ingestion_provider", + "schema": "public", + "values": [ + "google_workspace", + "microsoft_365", + "generic_imap" + ] + }, + "public.ingestion_status": { + "name": "ingestion_status", + "schema": "public", + "values": [ + "active", + "paused", + "error", + "pending_auth", + "syncing", + "importing", + "auth_success" + ] + } + }, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/packages/backend/src/database/migrations/meta/_journal.json b/packages/backend/src/database/migrations/meta/_journal.json index 2346e3e..0ca8395 100644 --- a/packages/backend/src/database/migrations/meta/_journal.json +++ b/packages/backend/src/database/migrations/meta/_journal.json @@ -64,6 +64,13 @@ "when": 1753370737317, "tag": "0008_eminent_the_spike", "breakpoints": true + }, + { + "idx": 9, + "version": "7", + "when": 1754337938241, + "tag": "0009_late_lenny_balinger", + "breakpoints": true } ] } \ No newline at end of file diff --git a/packages/backend/src/database/schema/archived-emails.ts b/packages/backend/src/database/schema/archived-emails.ts index 9495b68..0ec8072 100644 --- a/packages/backend/src/database/schema/archived-emails.ts +++ b/packages/backend/src/database/schema/archived-emails.ts @@ -1,27 +1,36 @@ import { relations } from 'drizzle-orm'; -import { boolean, jsonb, pgTable, text, timestamp, uuid, bigint } from 'drizzle-orm/pg-core'; +import { boolean, jsonb, pgTable, text, timestamp, uuid, bigint, index } from 'drizzle-orm/pg-core'; import { ingestionSources } from './ingestion-sources'; -export const archivedEmails = pgTable('archived_emails', { - id: uuid('id').primaryKey().defaultRandom(), - ingestionSourceId: uuid('ingestion_source_id') - .notNull() - .references(() => ingestionSources.id, { onDelete: 'cascade' }), - userEmail: text('user_email').notNull(), - messageIdHeader: text('message_id_header'), - sentAt: timestamp('sent_at', { withTimezone: true }).notNull(), - subject: text('subject'), - senderName: text('sender_name'), - senderEmail: text('sender_email').notNull(), - recipients: jsonb('recipients'), - storagePath: text('storage_path').notNull(), - storageHashSha256: text('storage_hash_sha256').notNull(), - sizeBytes: bigint('size_bytes', { mode: 'number' }).notNull(), - isIndexed: boolean('is_indexed').notNull().default(false), - hasAttachments: boolean('has_attachments').notNull().default(false), - isOnLegalHold: boolean('is_on_legal_hold').notNull().default(false), - archivedAt: timestamp('archived_at', { withTimezone: true }).notNull().defaultNow(), -}); +export const archivedEmails = pgTable( + 'archived_emails', + { + id: uuid('id').primaryKey().defaultRandom(), + threadId: text('thread_id'), + ingestionSourceId: uuid('ingestion_source_id') + .notNull() + .references(() => ingestionSources.id, { onDelete: 'cascade' }), + userEmail: text('user_email').notNull(), + messageIdHeader: text('message_id_header'), + sentAt: timestamp('sent_at', { withTimezone: true }).notNull(), + subject: text('subject'), + senderName: text('sender_name'), + senderEmail: text('sender_email').notNull(), + recipients: jsonb('recipients'), + storagePath: text('storage_path').notNull(), + storageHashSha256: text('storage_hash_sha256').notNull(), + sizeBytes: bigint('size_bytes', { mode: 'number' }).notNull(), + isIndexed: boolean('is_indexed').notNull().default(false), + hasAttachments: boolean('has_attachments').notNull().default(false), + isOnLegalHold: boolean('is_on_legal_hold').notNull().default(false), + archivedAt: timestamp('archived_at', { withTimezone: true }).notNull().defaultNow(), + }, + (table) => { + return { + threadIdIdx: index('thread_id_idx').on(table.threadId) + }; + } +); export const archivedEmailsRelations = relations(archivedEmails, ({ one }) => ({ ingestionSource: one(ingestionSources, { diff --git a/packages/backend/src/services/ArchivedEmailService.ts b/packages/backend/src/services/ArchivedEmailService.ts index 4456b55..3492a03 100644 --- a/packages/backend/src/services/ArchivedEmailService.ts +++ b/packages/backend/src/services/ArchivedEmailService.ts @@ -1,7 +1,7 @@ -import { count, desc, eq } from 'drizzle-orm'; +import { count, desc, eq, asc } from 'drizzle-orm'; import { db } from '../database'; import { archivedEmails, attachments, emailAttachments } from '../database/schema'; -import type { PaginatedArchivedEmails, ArchivedEmail, Recipient } from '@open-archiver/types'; +import type { PaginatedArchivedEmails, ArchivedEmail, Recipient, ThreadEmail } from '@open-archiver/types'; import { StorageService } from './StorageService'; import type { Readable } from 'stream'; @@ -11,6 +11,8 @@ interface DbRecipients { bcc: { name: string; address: string; }[]; } + + async function streamToBuffer(stream: Readable): Promise { return new Promise((resolve, reject) => { const chunks: Buffer[] = []; @@ -75,6 +77,21 @@ export class ArchivedEmailService { return null; } + let threadEmails: ThreadEmail[] = []; + + if (email.threadId) { + threadEmails = await db.query.archivedEmails.findMany({ + where: eq(archivedEmails.threadId, email.threadId), + orderBy: [asc(archivedEmails.sentAt)], + columns: { + id: true, + subject: true, + sentAt: true, + senderEmail: true, + }, + }); + } + const storage = new StorageService(); const rawStream = await storage.get(email.storagePath); const raw = await streamToBuffer(rawStream as Readable); @@ -82,7 +99,8 @@ export class ArchivedEmailService { const mappedEmail = { ...email, recipients: this.mapRecipients(email.recipients), - raw + raw, + thread: threadEmails }; if (email.hasAttachments) { diff --git a/packages/backend/src/services/IngestionService.ts b/packages/backend/src/services/IngestionService.ts index 963e5a4..4398bf7 100644 --- a/packages/backend/src/services/IngestionService.ts +++ b/packages/backend/src/services/IngestionService.ts @@ -297,6 +297,7 @@ export class IngestionService { .values({ ingestionSourceId: source.id, userEmail, + threadId: email.threadId, messageIdHeader: messageId, sentAt: email.receivedAt, subject: email.subject, diff --git a/packages/backend/src/services/ingestion-connectors/GoogleWorkspaceConnector.ts b/packages/backend/src/services/ingestion-connectors/GoogleWorkspaceConnector.ts index 66d3bd7..db1df77 100644 --- a/packages/backend/src/services/ingestion-connectors/GoogleWorkspaceConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/GoogleWorkspaceConnector.ts @@ -9,7 +9,8 @@ import type { } from '@open-archiver/types'; import type { IEmailConnector } from '../EmailProviderFactory'; import { logger } from '../../config/logger'; -import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser'; +import { simpleParser, ParsedMail, Attachment, AddressObject, Headers } from 'mailparser'; +import { getThreadId } from './utils'; /** * A connector for Google Workspace that uses a service account with domain-wide delegation @@ -54,6 +55,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector { return jwtClient; } + /** * Tests the connection and authentication by attempting to list the first user * from the directory, impersonating the admin user. @@ -150,7 +152,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector { do { const historyResponse: Common.GaxiosResponseWithHTTP2 = await gmail.users.history.list({ - userId: 'me', + userId: userEmail, startHistoryId: this.newHistoryId, pageToken: pageToken, historyTypes: ['messageAdded'] @@ -167,7 +169,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector { if (messageAdded.message?.id) { try { const msgResponse = await gmail.users.messages.get({ - userId: 'me', + userId: userEmail, id: messageAdded.message.id, format: 'RAW' }); @@ -186,8 +188,11 @@ export class GoogleWorkspaceConnector implements IEmailConnector { const addressArray = Array.isArray(addresses) ? addresses : [addresses]; return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' }))); }; + const threadId = getThreadId(parsedEmail.headers); + console.log('threadId', threadId); yield { id: msgResponse.data.id!, + threadId, userEmail: userEmail, eml: rawEmail, from: mapAddresses(parsedEmail.from), @@ -226,7 +231,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector { let pageToken: string | undefined = undefined; do { const listResponse: Common.GaxiosResponseWithHTTP2 = await gmail.users.messages.list({ - userId: 'me', + userId: userEmail, pageToken: pageToken }); @@ -239,7 +244,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector { if (message.id) { try { const msgResponse = await gmail.users.messages.get({ - userId: 'me', + userId: userEmail, id: message.id, format: 'RAW' }); @@ -258,8 +263,11 @@ export class GoogleWorkspaceConnector implements IEmailConnector { const addressArray = Array.isArray(addresses) ? addresses : [addresses]; return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' }))); }; + const threadId = getThreadId(parsedEmail.headers); + console.log('threadId', threadId); yield { id: msgResponse.data.id!, + threadId, userEmail: userEmail, eml: rawEmail, from: mapAddresses(parsedEmail.from), @@ -287,7 +295,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector { } while (pageToken); // After fetching all messages, get the latest history ID to use as the starting point for the next sync. - const profileResponse = await gmail.users.getProfile({ userId: 'me' }); + const profileResponse = await gmail.users.getProfile({ userId: userEmail }); if (profileResponse.data.historyId) { this.newHistoryId = profileResponse.data.historyId; } diff --git a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts index 5429e53..8c05bc7 100644 --- a/packages/backend/src/services/ingestion-connectors/ImapConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/ImapConnector.ts @@ -1,8 +1,9 @@ import type { GenericImapCredentials, EmailObject, EmailAddress, SyncState, MailboxUser } from '@open-archiver/types'; import type { IEmailConnector } from '../EmailProviderFactory'; import { ImapFlow } from 'imapflow'; -import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser'; +import { simpleParser, ParsedMail, Attachment, AddressObject, Headers } from 'mailparser'; import { logger } from '../../config/logger'; +import { getThreadId } from './utils'; export class ImapConnector implements IEmailConnector { private client: ImapFlow; @@ -192,8 +193,11 @@ export class ImapConnector implements IEmailConnector { return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' }))); }; + const threadId = getThreadId(parsedEmail.headers); + return { id: msg.uid.toString(), + threadId: threadId, from: mapAddresses(parsedEmail.from), to: mapAddresses(parsedEmail.to), cc: mapAddresses(parsedEmail.cc), diff --git a/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts b/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts index 1d82f64..072650a 100644 --- a/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts +++ b/packages/backend/src/services/ingestion-connectors/MicrosoftConnector.ts @@ -200,13 +200,17 @@ export class MicrosoftConnector implements IEmailConnector { while (requestUrl) { try { - const response = await this.graphClient.api(requestUrl).get(); + const response = await this.graphClient.api(requestUrl) + .select('id,conversationId,@removed') + .get(); for (const message of response.value) { if (message.id && !(message)['@removed']) { const rawEmail = await this.getRawEmail(userEmail, message.id); if (rawEmail) { - yield await this.parseEmail(rawEmail, message.id, userEmail); + const emailObject = await this.parseEmail(rawEmail, message.id, userEmail); + emailObject.threadId = message.conversationId; // Add conversationId as threadId + yield emailObject; } } } diff --git a/packages/backend/src/services/ingestion-connectors/utils.ts b/packages/backend/src/services/ingestion-connectors/utils.ts new file mode 100644 index 0000000..a4197d2 --- /dev/null +++ b/packages/backend/src/services/ingestion-connectors/utils.ts @@ -0,0 +1,47 @@ + +import type { Headers } from 'mailparser'; + +function getHeaderValue(header: any): string | undefined { + if (typeof header === 'string') { + return header; + } + if (Array.isArray(header)) { + return getHeaderValue(header[0]); + } + if (typeof header === 'object' && header !== null && 'value' in header) { + return getHeaderValue(header.value); + } + return undefined; +} + +export function getThreadId(headers: Headers): string | undefined { + + const referencesHeader = headers.get('references'); + + if (referencesHeader) { + const references = getHeaderValue(referencesHeader); + if (references) { + return references.split(' ')[0].trim(); + } + } + + const inReplyToHeader = headers.get('in-reply-to'); + + if (inReplyToHeader) { + const inReplyTo = getHeaderValue(inReplyToHeader); + if (inReplyTo) { + return inReplyTo.trim(); + } + } + + const messageIdHeader = headers.get('message-id'); + + if (messageIdHeader) { + const messageId = getHeaderValue(messageIdHeader); + if (messageId) { + return messageId.trim(); + } + } + console.warn('No thread ID found, returning undefined'); + return undefined; +} diff --git a/packages/frontend/src/lib/components/custom/EmailThread.svelte b/packages/frontend/src/lib/components/custom/EmailThread.svelte new file mode 100644 index 0000000..8a8e0b6 --- /dev/null +++ b/packages/frontend/src/lib/components/custom/EmailThread.svelte @@ -0,0 +1,62 @@ + + +
+
+ {#if thread} + {#each thread as item, i (item.id)} +
+ + + +

+ {#if item.id !== currentEmailId} + { + e.preventDefault(); + goto(`/dashboard/archived-emails/${item.id}`, { + invalidateAll: true + }); + }}>{item.subject || 'No Subject'} + {:else} + {item.subject || 'No Subject'} + {/if} +

+
+ From: {item.senderEmail} + +
+
+ {/each} + {/if} +
+
diff --git a/packages/frontend/src/routes/dashboard/archived-emails/[id]/+page.svelte b/packages/frontend/src/routes/dashboard/archived-emails/[id]/+page.svelte index 78c1cd2..684db3b 100644 --- a/packages/frontend/src/routes/dashboard/archived-emails/[id]/+page.svelte +++ b/packages/frontend/src/routes/dashboard/archived-emails/[id]/+page.svelte @@ -3,11 +3,13 @@ import { Button } from '$lib/components/ui/button'; import * as Card from '$lib/components/ui/card'; import EmailPreview from '$lib/components/custom/EmailPreview.svelte'; + import EmailThread from '$lib/components/custom/EmailThread.svelte'; import { api } from '$lib/api.client'; import { browser } from '$app/environment'; let { data }: { data: PageData } = $props(); - const { email } = data; + let email = $derived(data.email); + async function download(path: string, filename: string) { if (!browser) return; @@ -41,7 +43,7 @@ {email.subject || 'No Subject'} - From: {email.senderName || email.senderEmail} | Sent: {new Date( + From: {email.senderEmail || email.senderName} | Sent: {new Date( email.sentAt ).toLocaleString()} @@ -79,7 +81,7 @@ -
+
Actions @@ -90,6 +92,17 @@ > + + {#if email.thread && email.thread.length > 1} + + + Email thread + + + + + + {/if}
{:else} diff --git a/packages/frontend/src/routes/dashboard/ingestions/+page.svelte b/packages/frontend/src/routes/dashboard/ingestions/+page.svelte index 3a2568e..433f4a9 100644 --- a/packages/frontend/src/routes/dashboard/ingestions/+page.svelte +++ b/packages/frontend/src/routes/dashboard/ingestions/+page.svelte @@ -31,7 +31,7 @@ setAlert({ type: 'warning', title: 'Demo mode', - message: 'Editing is now allowed in demo mode.', + message: 'Editing is not allowed in demo mode.', duration: 5000, show: true }); diff --git a/packages/types/src/archived-emails.types.ts b/packages/types/src/archived-emails.types.ts index b346853..048fa56 100644 --- a/packages/types/src/archived-emails.types.ts +++ b/packages/types/src/archived-emails.types.ts @@ -17,6 +17,14 @@ export interface Attachment { storagePath: string; } + +export interface ThreadEmail { + id: string; //the archivedemail id + subject: string | null; + sentAt: Date; + senderEmail: string; +} + /** * Represents a single archived email. */ @@ -39,6 +47,7 @@ export interface ArchivedEmail { archivedAt: Date; attachments?: Attachment[]; raw?: Buffer; + thread?: ThreadEmail[]; } /** diff --git a/packages/types/src/email.types.ts b/packages/types/src/email.types.ts index 40d1e37..c067b36 100644 --- a/packages/types/src/email.types.ts +++ b/packages/types/src/email.types.ts @@ -23,6 +23,8 @@ export interface EmailAttachment { export interface EmailObject { /** A unique identifier for the email, typically assigned by the source provider. */ id: string; + /** An optional identifier for the email thread, used to group related emails. */ + threadId?: string; /** An array of `EmailAddress` objects representing the sender(s). */ from: EmailAddress[]; /** An array of `EmailAddress` objects representing the primary recipient(s). */