Thread discovery

This commit is contained in:
Wayne
2025-08-05 13:34:51 +03:00
parent f484f72994
commit f2a5b29105
15 changed files with 1077 additions and 37 deletions

View File

@@ -0,0 +1,2 @@
ALTER TABLE "archived_emails" ADD COLUMN "thread_id" text;--> statement-breakpoint
CREATE INDEX "thread_id_idx" ON "archived_emails" USING btree ("thread_id");

View File

@@ -0,0 +1,854 @@
{
"id": "701eda75-451a-4a6d-87e3-b6658fca65da",
"prevId": "86b6960e-1936-4543-846f-a2d24d6dc5d1",
"version": "7",
"dialect": "postgresql",
"tables": {
"public.archived_emails": {
"name": "archived_emails",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"thread_id": {
"name": "thread_id",
"type": "text",
"primaryKey": false,
"notNull": false
},
"ingestion_source_id": {
"name": "ingestion_source_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"user_email": {
"name": "user_email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"message_id_header": {
"name": "message_id_header",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sent_at": {
"name": "sent_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true
},
"subject": {
"name": "subject",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sender_name": {
"name": "sender_name",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sender_email": {
"name": "sender_email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"recipients": {
"name": "recipients",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"storage_path": {
"name": "storage_path",
"type": "text",
"primaryKey": false,
"notNull": true
},
"storage_hash_sha256": {
"name": "storage_hash_sha256",
"type": "text",
"primaryKey": false,
"notNull": true
},
"size_bytes": {
"name": "size_bytes",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"is_indexed": {
"name": "is_indexed",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": false
},
"has_attachments": {
"name": "has_attachments",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": false
},
"is_on_legal_hold": {
"name": "is_on_legal_hold",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": false
},
"archived_at": {
"name": "archived_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {
"thread_id_idx": {
"name": "thread_id_idx",
"columns": [
{
"expression": "thread_id",
"isExpression": false,
"asc": true,
"nulls": "last"
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {
"archived_emails_ingestion_source_id_ingestion_sources_id_fk": {
"name": "archived_emails_ingestion_source_id_ingestion_sources_id_fk",
"tableFrom": "archived_emails",
"tableTo": "ingestion_sources",
"columnsFrom": [
"ingestion_source_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.attachments": {
"name": "attachments",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"filename": {
"name": "filename",
"type": "text",
"primaryKey": false,
"notNull": true
},
"mime_type": {
"name": "mime_type",
"type": "text",
"primaryKey": false,
"notNull": false
},
"size_bytes": {
"name": "size_bytes",
"type": "bigint",
"primaryKey": false,
"notNull": true
},
"content_hash_sha256": {
"name": "content_hash_sha256",
"type": "text",
"primaryKey": false,
"notNull": true
},
"storage_path": {
"name": "storage_path",
"type": "text",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"attachments_content_hash_sha256_unique": {
"name": "attachments_content_hash_sha256_unique",
"nullsNotDistinct": false,
"columns": [
"content_hash_sha256"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.email_attachments": {
"name": "email_attachments",
"schema": "",
"columns": {
"email_id": {
"name": "email_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"attachment_id": {
"name": "attachment_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {
"email_attachments_email_id_archived_emails_id_fk": {
"name": "email_attachments_email_id_archived_emails_id_fk",
"tableFrom": "email_attachments",
"tableTo": "archived_emails",
"columnsFrom": [
"email_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"email_attachments_attachment_id_attachments_id_fk": {
"name": "email_attachments_attachment_id_attachments_id_fk",
"tableFrom": "email_attachments",
"tableTo": "attachments",
"columnsFrom": [
"attachment_id"
],
"columnsTo": [
"id"
],
"onDelete": "restrict",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {
"email_attachments_email_id_attachment_id_pk": {
"name": "email_attachments_email_id_attachment_id_pk",
"columns": [
"email_id",
"attachment_id"
]
}
},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.audit_logs": {
"name": "audit_logs",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "bigserial",
"primaryKey": true,
"notNull": true
},
"timestamp": {
"name": "timestamp",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"actor_identifier": {
"name": "actor_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"action": {
"name": "action",
"type": "text",
"primaryKey": false,
"notNull": true
},
"target_type": {
"name": "target_type",
"type": "text",
"primaryKey": false,
"notNull": false
},
"target_id": {
"name": "target_id",
"type": "text",
"primaryKey": false,
"notNull": false
},
"details": {
"name": "details",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"is_tamper_evident": {
"name": "is_tamper_evident",
"type": "boolean",
"primaryKey": false,
"notNull": false,
"default": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.ediscovery_cases": {
"name": "ediscovery_cases",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'open'"
},
"created_by_identifier": {
"name": "created_by_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"ediscovery_cases_name_unique": {
"name": "ediscovery_cases_name_unique",
"nullsNotDistinct": false,
"columns": [
"name"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.export_jobs": {
"name": "export_jobs",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"case_id": {
"name": "case_id",
"type": "uuid",
"primaryKey": false,
"notNull": false
},
"format": {
"name": "format",
"type": "text",
"primaryKey": false,
"notNull": true
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'pending'"
},
"query": {
"name": "query",
"type": "jsonb",
"primaryKey": false,
"notNull": true
},
"file_path": {
"name": "file_path",
"type": "text",
"primaryKey": false,
"notNull": false
},
"created_by_identifier": {
"name": "created_by_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"completed_at": {
"name": "completed_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
}
},
"indexes": {},
"foreignKeys": {
"export_jobs_case_id_ediscovery_cases_id_fk": {
"name": "export_jobs_case_id_ediscovery_cases_id_fk",
"tableFrom": "export_jobs",
"tableTo": "ediscovery_cases",
"columnsFrom": [
"case_id"
],
"columnsTo": [
"id"
],
"onDelete": "set null",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.legal_holds": {
"name": "legal_holds",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"case_id": {
"name": "case_id",
"type": "uuid",
"primaryKey": false,
"notNull": true
},
"custodian_id": {
"name": "custodian_id",
"type": "uuid",
"primaryKey": false,
"notNull": false
},
"hold_criteria": {
"name": "hold_criteria",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"reason": {
"name": "reason",
"type": "text",
"primaryKey": false,
"notNull": false
},
"applied_by_identifier": {
"name": "applied_by_identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"applied_at": {
"name": "applied_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"removed_at": {
"name": "removed_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
}
},
"indexes": {},
"foreignKeys": {
"legal_holds_case_id_ediscovery_cases_id_fk": {
"name": "legal_holds_case_id_ediscovery_cases_id_fk",
"tableFrom": "legal_holds",
"tableTo": "ediscovery_cases",
"columnsFrom": [
"case_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"legal_holds_custodian_id_custodians_id_fk": {
"name": "legal_holds_custodian_id_custodians_id_fk",
"tableFrom": "legal_holds",
"tableTo": "custodians",
"columnsFrom": [
"custodian_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.retention_policies": {
"name": "retention_policies",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false
},
"priority": {
"name": "priority",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"retention_period_days": {
"name": "retention_period_days",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"action_on_expiry": {
"name": "action_on_expiry",
"type": "retention_action",
"typeSchema": "public",
"primaryKey": false,
"notNull": true
},
"is_enabled": {
"name": "is_enabled",
"type": "boolean",
"primaryKey": false,
"notNull": true,
"default": true
},
"conditions": {
"name": "conditions",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"retention_policies_name_unique": {
"name": "retention_policies_name_unique",
"nullsNotDistinct": false,
"columns": [
"name"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.custodians": {
"name": "custodians",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"email": {
"name": "email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"display_name": {
"name": "display_name",
"type": "text",
"primaryKey": false,
"notNull": false
},
"source_type": {
"name": "source_type",
"type": "ingestion_provider",
"typeSchema": "public",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"custodians_email_unique": {
"name": "custodians_email_unique",
"nullsNotDistinct": false,
"columns": [
"email"
]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.ingestion_sources": {
"name": "ingestion_sources",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "uuid",
"primaryKey": true,
"notNull": true,
"default": "gen_random_uuid()"
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"provider": {
"name": "provider",
"type": "ingestion_provider",
"typeSchema": "public",
"primaryKey": false,
"notNull": true
},
"credentials": {
"name": "credentials",
"type": "text",
"primaryKey": false,
"notNull": false
},
"status": {
"name": "status",
"type": "ingestion_status",
"typeSchema": "public",
"primaryKey": false,
"notNull": true,
"default": "'pending_auth'"
},
"last_sync_started_at": {
"name": "last_sync_started_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
},
"last_sync_finished_at": {
"name": "last_sync_finished_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
},
"last_sync_status_message": {
"name": "last_sync_status_message",
"type": "text",
"primaryKey": false,
"notNull": false
},
"sync_state": {
"name": "sync_state",
"type": "jsonb",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
}
},
"enums": {
"public.retention_action": {
"name": "retention_action",
"schema": "public",
"values": [
"delete_permanently",
"notify_admin"
]
},
"public.ingestion_provider": {
"name": "ingestion_provider",
"schema": "public",
"values": [
"google_workspace",
"microsoft_365",
"generic_imap"
]
},
"public.ingestion_status": {
"name": "ingestion_status",
"schema": "public",
"values": [
"active",
"paused",
"error",
"pending_auth",
"syncing",
"importing",
"auth_success"
]
}
},
"schemas": {},
"sequences": {},
"roles": {},
"policies": {},
"views": {},
"_meta": {
"columns": {},
"schemas": {},
"tables": {}
}
}

View File

@@ -64,6 +64,13 @@
"when": 1753370737317,
"tag": "0008_eminent_the_spike",
"breakpoints": true
},
{
"idx": 9,
"version": "7",
"when": 1754337938241,
"tag": "0009_late_lenny_balinger",
"breakpoints": true
}
]
}

View File

@@ -1,27 +1,36 @@
import { relations } from 'drizzle-orm';
import { boolean, jsonb, pgTable, text, timestamp, uuid, bigint } from 'drizzle-orm/pg-core';
import { boolean, jsonb, pgTable, text, timestamp, uuid, bigint, index } from 'drizzle-orm/pg-core';
import { ingestionSources } from './ingestion-sources';
export const archivedEmails = pgTable('archived_emails', {
id: uuid('id').primaryKey().defaultRandom(),
ingestionSourceId: uuid('ingestion_source_id')
.notNull()
.references(() => ingestionSources.id, { onDelete: 'cascade' }),
userEmail: text('user_email').notNull(),
messageIdHeader: text('message_id_header'),
sentAt: timestamp('sent_at', { withTimezone: true }).notNull(),
subject: text('subject'),
senderName: text('sender_name'),
senderEmail: text('sender_email').notNull(),
recipients: jsonb('recipients'),
storagePath: text('storage_path').notNull(),
storageHashSha256: text('storage_hash_sha256').notNull(),
sizeBytes: bigint('size_bytes', { mode: 'number' }).notNull(),
isIndexed: boolean('is_indexed').notNull().default(false),
hasAttachments: boolean('has_attachments').notNull().default(false),
isOnLegalHold: boolean('is_on_legal_hold').notNull().default(false),
archivedAt: timestamp('archived_at', { withTimezone: true }).notNull().defaultNow(),
});
/**
 * Drizzle schema for the `archived_emails` table.
 *
 * Each row describes one archived message: where it came from
 * (`ingestionSourceId`, `userEmail`), its envelope metadata, where the raw
 * message is stored (`storagePath` / `storageHashSha256`), and a few status
 * flags. `threadId` is nullable and is indexed so that all messages sharing a
 * thread identifier can be looked up together.
 */
export const archivedEmails = pgTable(
	'archived_emails',
	{
		id: uuid('id').primaryKey().defaultRandom(),
		// Nullable thread identifier; rows with the same value are treated as one thread.
		threadId: text('thread_id'),
		// Owning ingestion source; deleting the source cascades to its emails.
		ingestionSourceId: uuid('ingestion_source_id')
			.notNull()
			.references(() => ingestionSources.id, { onDelete: 'cascade' }),
		userEmail: text('user_email').notNull(),
		messageIdHeader: text('message_id_header'),
		sentAt: timestamp('sent_at', { withTimezone: true }).notNull(),
		subject: text('subject'),
		senderName: text('sender_name'),
		senderEmail: text('sender_email').notNull(),
		recipients: jsonb('recipients'),
		storagePath: text('storage_path').notNull(),
		storageHashSha256: text('storage_hash_sha256').notNull(),
		sizeBytes: bigint('size_bytes', { mode: 'number' }).notNull(),
		isIndexed: boolean('is_indexed').notNull().default(false),
		hasAttachments: boolean('has_attachments').notNull().default(false),
		isOnLegalHold: boolean('is_on_legal_hold').notNull().default(false),
		archivedAt: timestamp('archived_at', { withTimezone: true }).notNull().defaultNow(),
	},
	// Table-level extras: a single index on `thread_id` named `thread_id_idx`.
	(table) => ({
		threadIdIdx: index('thread_id_idx').on(table.threadId),
	})
);
export const archivedEmailsRelations = relations(archivedEmails, ({ one }) => ({
ingestionSource: one(ingestionSources, {

View File

@@ -1,7 +1,7 @@
import { count, desc, eq } from 'drizzle-orm';
import { count, desc, eq, asc } from 'drizzle-orm';
import { db } from '../database';
import { archivedEmails, attachments, emailAttachments } from '../database/schema';
import type { PaginatedArchivedEmails, ArchivedEmail, Recipient } from '@open-archiver/types';
import type { PaginatedArchivedEmails, ArchivedEmail, Recipient, ThreadEmail } from '@open-archiver/types';
import { StorageService } from './StorageService';
import type { Readable } from 'stream';
@@ -11,6 +11,8 @@ interface DbRecipients {
bcc: { name: string; address: string; }[];
}
async function streamToBuffer(stream: Readable): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
@@ -75,6 +77,21 @@ export class ArchivedEmailService {
return null;
}
let threadEmails: ThreadEmail[] = [];
if (email.threadId) {
threadEmails = await db.query.archivedEmails.findMany({
where: eq(archivedEmails.threadId, email.threadId),
orderBy: [asc(archivedEmails.sentAt)],
columns: {
id: true,
subject: true,
sentAt: true,
senderEmail: true,
},
});
}
const storage = new StorageService();
const rawStream = await storage.get(email.storagePath);
const raw = await streamToBuffer(rawStream as Readable);
@@ -82,7 +99,8 @@ export class ArchivedEmailService {
const mappedEmail = {
...email,
recipients: this.mapRecipients(email.recipients),
raw
raw,
thread: threadEmails
};
if (email.hasAttachments) {

View File

@@ -297,6 +297,7 @@ export class IngestionService {
.values({
ingestionSourceId: source.id,
userEmail,
threadId: email.threadId,
messageIdHeader: messageId,
sentAt: email.receivedAt,
subject: email.subject,

View File

@@ -9,7 +9,8 @@ import type {
} from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { logger } from '../../config/logger';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
import { simpleParser, ParsedMail, Attachment, AddressObject, Headers } from 'mailparser';
import { getThreadId } from './utils';
/**
* A connector for Google Workspace that uses a service account with domain-wide delegation
@@ -54,6 +55,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
return jwtClient;
}
/**
* Tests the connection and authentication by attempting to list the first user
* from the directory, impersonating the admin user.
@@ -150,7 +152,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
do {
const historyResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListHistoryResponse> = await gmail.users.history.list({
userId: 'me',
userId: userEmail,
startHistoryId: this.newHistoryId,
pageToken: pageToken,
historyTypes: ['messageAdded']
@@ -167,7 +169,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
if (messageAdded.message?.id) {
try {
const msgResponse = await gmail.users.messages.get({
userId: 'me',
userId: userEmail,
id: messageAdded.message.id,
format: 'RAW'
});
@@ -186,8 +188,11 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
const addressArray = Array.isArray(addresses) ? addresses : [addresses];
return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' })));
};
const threadId = getThreadId(parsedEmail.headers);
console.log('threadId', threadId);
yield {
id: msgResponse.data.id!,
threadId,
userEmail: userEmail,
eml: rawEmail,
from: mapAddresses(parsedEmail.from),
@@ -226,7 +231,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
let pageToken: string | undefined = undefined;
do {
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> = await gmail.users.messages.list({
userId: 'me',
userId: userEmail,
pageToken: pageToken
});
@@ -239,7 +244,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
if (message.id) {
try {
const msgResponse = await gmail.users.messages.get({
userId: 'me',
userId: userEmail,
id: message.id,
format: 'RAW'
});
@@ -258,8 +263,11 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
const addressArray = Array.isArray(addresses) ? addresses : [addresses];
return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' })));
};
const threadId = getThreadId(parsedEmail.headers);
console.log('threadId', threadId);
yield {
id: msgResponse.data.id!,
threadId,
userEmail: userEmail,
eml: rawEmail,
from: mapAddresses(parsedEmail.from),
@@ -287,7 +295,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
} while (pageToken);
// After fetching all messages, get the latest history ID to use as the starting point for the next sync.
const profileResponse = await gmail.users.getProfile({ userId: 'me' });
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
if (profileResponse.data.historyId) {
this.newHistoryId = profileResponse.data.historyId;
}

View File

@@ -1,8 +1,9 @@
import type { GenericImapCredentials, EmailObject, EmailAddress, SyncState, MailboxUser } from '@open-archiver/types';
import type { IEmailConnector } from '../EmailProviderFactory';
import { ImapFlow } from 'imapflow';
import { simpleParser, ParsedMail, Attachment, AddressObject } from 'mailparser';
import { simpleParser, ParsedMail, Attachment, AddressObject, Headers } from 'mailparser';
import { logger } from '../../config/logger';
import { getThreadId } from './utils';
export class ImapConnector implements IEmailConnector {
private client: ImapFlow;
@@ -192,8 +193,11 @@ export class ImapConnector implements IEmailConnector {
return addressArray.flatMap(a => a.value.map(v => ({ name: v.name, address: v.address || '' })));
};
const threadId = getThreadId(parsedEmail.headers);
return {
id: msg.uid.toString(),
threadId: threadId,
from: mapAddresses(parsedEmail.from),
to: mapAddresses(parsedEmail.to),
cc: mapAddresses(parsedEmail.cc),

View File

@@ -200,13 +200,17 @@ export class MicrosoftConnector implements IEmailConnector {
while (requestUrl) {
try {
const response = await this.graphClient.api(requestUrl).get();
const response = await this.graphClient.api(requestUrl)
.select('id,conversationId,@removed')
.get();
for (const message of response.value) {
if (message.id && !(message)['@removed']) {
const rawEmail = await this.getRawEmail(userEmail, message.id);
if (rawEmail) {
yield await this.parseEmail(rawEmail, message.id, userEmail);
const emailObject = await this.parseEmail(rawEmail, message.id, userEmail);
emailObject.threadId = message.conversationId; // Add conversationId as threadId
yield emailObject;
}
}
}

View File

@@ -0,0 +1,47 @@
import type { Headers } from 'mailparser';
/**
 * Reduces a parsed header value to a single string.
 *
 * Accepts the shapes handled below: a plain string, an array (only its first
 * entry is considered), or an object carrying a `value` property (unwrapped
 * recursively). Anything else yields `undefined`.
 *
 * @param header - The raw value returned by `headers.get(...)`.
 * @returns The first string found, or `undefined` when there is none.
 */
function getHeaderValue(header: unknown): string | undefined {
	if (typeof header === 'string') {
		return header;
	}
	if (Array.isArray(header)) {
		// Multi-valued header: the first entry wins.
		return getHeaderValue(header[0]);
	}
	if (typeof header === 'object' && header !== null && 'value' in header) {
		return getHeaderValue((header as { value: unknown }).value);
	}
	return undefined;
}

/**
 * Derives a thread identifier from a message's headers.
 *
 * Lookup order:
 *  1. `References` - the first message ID in the list;
 *  2. `In-Reply-To` - the trimmed header value;
 *  3. `Message-ID` - the message's own ID, so a thread starter gets an ID that
 *     replies referencing it will resolve to.
 *
 * A header that is missing, or empty after trimming, is skipped so the next
 * candidate is tried instead of returning an empty string.
 *
 * @param headers - Parsed headers, looked up by lower-case header name.
 * @returns The thread identifier, or `undefined` when none of the headers
 *          provides a usable value.
 */
export function getThreadId(headers: Headers): string | undefined {
	// Message IDs in `References` may be separated by any whitespace (spaces,
	// tabs, folded line breaks), so split on whitespace runs rather than on a
	// single literal space.
	const firstReference = getHeaderValue(headers.get('references'))?.trim().split(/\s+/)[0];
	if (firstReference) {
		return firstReference;
	}

	for (const name of ['in-reply-to', 'message-id']) {
		const value = getHeaderValue(headers.get(name))?.trim();
		if (value) {
			return value;
		}
	}

	console.warn('No thread ID found, returning undefined');
	return undefined;
}

View File

@@ -0,0 +1,62 @@
<script lang="ts">
	import { goto } from '$app/navigation';
	import type { ArchivedEmail } from '@open-archiver/types';

	// `thread` is the list of emails to render; `currentEmailId` marks the entry
	// that is shown as plain bold text instead of a link.
	let {
		thread,
		currentEmailId
	}: {
		thread: ArchivedEmail['thread'];
		currentEmailId: string;
	} = $props();

	/**
	 * Navigates to another email of the thread, forcing all load functions to
	 * re-run (`invalidateAll`).
	 *
	 * Modified or non-primary clicks (Ctrl/Cmd/Shift/Alt, middle button) are
	 * left to the browser so "open in new tab/window" keeps working; only a
	 * plain left click is intercepted.
	 */
	function openEmail(event: MouseEvent, emailId: string) {
		if (
			event.button !== 0 ||
			event.metaKey ||
			event.ctrlKey ||
			event.shiftKey ||
			event.altKey
		) {
			return;
		}
		event.preventDefault();
		goto(`/dashboard/archived-emails/${emailId}`, {
			invalidateAll: true
		});
	}
</script>

<div>
	<div class="relative border-l-2 border-gray-200 pl-6">
		{#if thread}
			{#each thread as item (item.id)}
				<div class="mb-8">
					<span
						class="absolute -left-3 flex h-6 w-6 items-center justify-center rounded-full bg-gray-200 ring-8 ring-white"
					>
						<!-- Decorative timeline marker; hidden from assistive technology. -->
						<svg
							class="h-3 w-3 text-gray-600"
							fill="currentColor"
							viewBox="0 0 20 20"
							xmlns="http://www.w3.org/2000/svg"
							aria-hidden="true"
							><path
								fill-rule="evenodd"
								d="M6 2a1 1 0 00-1 1v1H4a2 2 0 00-2 2v10a2 2 0 002 2h12a2 2 0 002-2V6a2 2 0 00-2-2h-1V3a1 1 0 10-2 0v1H7V3a1 1 0 00-1-1zm0 5a1 1 0 000 2h8a1 1 0 100-2H6z"
								clip-rule="evenodd"
							></path></svg
						>
					</span>
					<h4
						class:font-bold={item.id === currentEmailId}
						class="text-md mb-2 {item.id !== currentEmailId
							? 'text-blue-500 hover:underline'
							: 'text-gray-900'}"
					>
						{#if item.id !== currentEmailId}
							<a
								href="/dashboard/archived-emails/{item.id}"
								onclick={(e) => openEmail(e, item.id)}>{item.subject || 'No Subject'}</a
							>
						{:else}
							{item.subject || 'No Subject'}
						{/if}
					</h4>
					<div class="flex flex-col space-y-2 text-sm font-normal leading-none text-gray-400">
						<span>From: {item.senderEmail}</span>
						<time>{new Date(item.sentAt).toLocaleString()}</time>
					</div>
				</div>
			{/each}
		{/if}
	</div>
</div>

View File

@@ -3,11 +3,13 @@
import { Button } from '$lib/components/ui/button';
import * as Card from '$lib/components/ui/card';
import EmailPreview from '$lib/components/custom/EmailPreview.svelte';
import EmailThread from '$lib/components/custom/EmailThread.svelte';
import { api } from '$lib/api.client';
import { browser } from '$app/environment';
let { data }: { data: PageData } = $props();
const { email } = data;
let email = $derived(data.email);
async function download(path: string, filename: string) {
if (!browser) return;
@@ -41,7 +43,7 @@
<Card.Header>
<Card.Title>{email.subject || 'No Subject'}</Card.Title>
<Card.Description>
From: {email.senderName || email.senderEmail} | Sent: {new Date(
From: {email.senderEmail || email.senderName} | Sent: {new Date(
email.sentAt
).toLocaleString()}
</Card.Description>
@@ -79,7 +81,7 @@
</Card.Content>
</Card.Root>
</div>
<div class="col-span-3 md:col-span-1">
<div class="col-span-3 space-y-6 md:col-span-1">
<Card.Root>
<Card.Header>
<Card.Title>Actions</Card.Title>
@@ -90,6 +92,17 @@
>
</Card.Content>
</Card.Root>
{#if email.thread && email.thread.length > 1}
<Card.Root>
<Card.Header>
<Card.Title>Email thread</Card.Title>
</Card.Header>
<Card.Content>
<EmailThread thread={email.thread} currentEmailId={email.id} />
</Card.Content>
</Card.Root>
{/if}
</div>
</div>
{:else}

View File

@@ -31,7 +31,7 @@
setAlert({
type: 'warning',
title: 'Demo mode',
message: 'Editing is now allowed in demo mode.',
message: 'Editing is not allowed in demo mode.',
duration: 5000,
show: true
});

View File

@@ -17,6 +17,14 @@ export interface Attachment {
storagePath: string;
}
/**
 * Summary of one email within a thread, as exposed through
 * `ArchivedEmail.thread`.
 */
export interface ThreadEmail {
/** The archived email's id. */
id: string;
/** Subject line; `null` when the email has none. */
subject: string | null;
/** When the email was sent. */
sentAt: Date;
/** Address of the sender. */
senderEmail: string;
}
/**
* Represents a single archived email.
*/
@@ -39,6 +47,7 @@ export interface ArchivedEmail {
archivedAt: Date;
attachments?: Attachment[];
raw?: Buffer;
thread?: ThreadEmail[];
}
/**

View File

@@ -23,6 +23,8 @@ export interface EmailAttachment {
export interface EmailObject {
/** A unique identifier for the email, typically assigned by the source provider. */
id: string;
/** An optional identifier for the email thread, used to group related emails. */
threadId?: string;
/** An array of `EmailAddress` objects representing the sender(s). */
from: EmailAddress[];
/** An array of `EmailAddress` objects representing the primary recipient(s). */