mirror of
https://github.com/LogicLabs-OU/OpenArchiver.git
synced 2026-04-06 00:31:57 +02:00
feat(backend): Add BullMQ dashboard for job monitoring
This commit introduces a web-based UI for monitoring and managing background jobs using Bullmq. Key changes: - A new `/api/v1/jobs` endpoint is created, serving the Bull Board dashboard. Access is restricted to authenticated administrators. - All BullMQ queue definitions (`ingestion`, `indexing`, `sync-scheduler`) have been centralized into a new `packages/backend/src/jobs/queues.ts` file. - Workers and services now import queue instances from this central file, improving code organization and removing redundant queue instantiations.
This commit is contained in:
@@ -95,6 +95,7 @@ export default defineConfig({
|
||||
{ text: 'Integrity Check', link: '/api/integrity' },
|
||||
{ text: 'Search', link: '/api/search' },
|
||||
{ text: 'Storage', link: '/api/storage' },
|
||||
{ text: 'Jobs', link: '/api/jobs' },
|
||||
],
|
||||
},
|
||||
{
|
||||
|
||||
128
docs/api/jobs.md
Normal file
128
docs/api/jobs.md
Normal file
@@ -0,0 +1,128 @@
|
||||
# Jobs API
|
||||
|
||||
The Jobs API provides endpoints for monitoring the job queues and the jobs within them.
|
||||
|
||||
## Overview
|
||||
|
||||
Open Archiver uses a job queue system to handle asynchronous tasks like email ingestion and indexing. The system is built on Redis and BullMQ and uses a producer-consumer pattern.
|
||||
|
||||
### Job Statuses
|
||||
|
||||
Jobs can have one of the following statuses:
|
||||
|
||||
- **active:** The job is currently being processed.
|
||||
- **completed:** The job has been completed successfully.
|
||||
- **failed:** The job has failed after all retry attempts.
|
||||
- **delayed:** The job is delayed and will be processed at a later time.
|
||||
- **waiting:** The job is waiting to be processed.
|
||||
- **paused:** The job is paused and will not be processed until it is resumed.
|
||||
|
||||
### Errors
|
||||
|
||||
When a job fails, the `failedReason` and `stacktrace` fields will contain information about the error. The `error` field will also be populated with the `failedReason` for easier access.
|
||||
|
||||
### Job Preservation
|
||||
|
||||
Jobs are preserved for a limited time after they are completed or failed. This means that the job counts and the jobs that you see in the API are for a limited time.
|
||||
|
||||
- **Completed jobs:** The last 1000 completed jobs are preserved.
|
||||
- **Failed jobs:** The last 5000 failed jobs are preserved.
|
||||
|
||||
## Get All Queues
|
||||
|
||||
- **Endpoint:** `GET /v1/jobs/queues`
|
||||
- **Description:** Retrieves a list of all job queues and their job counts.
|
||||
- **Permissions:** `manage:all`
|
||||
- **Responses:**
|
||||
- `200 OK`: Returns a list of queue overviews.
|
||||
- `401 Unauthorized`: If the user is not authenticated.
|
||||
- `403 Forbidden`: If the user does not have the required permissions.
|
||||
|
||||
### Response Body
|
||||
|
||||
```json
|
||||
{
|
||||
"queues": [
|
||||
{
|
||||
"name": "ingestion",
|
||||
"counts": {
|
||||
"active": 0,
|
||||
"completed": 56,
|
||||
"failed": 4,
|
||||
"delayed": 3,
|
||||
"waiting": 0,
|
||||
"paused": 0
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "indexing",
|
||||
"counts": {
|
||||
"active": 0,
|
||||
"completed": 0,
|
||||
"failed": 0,
|
||||
"delayed": 0,
|
||||
"waiting": 0,
|
||||
"paused": 0
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Get Queue Jobs
|
||||
|
||||
- **Endpoint:** `GET /v1/jobs/queues/:queueName`
|
||||
- **Description:** Retrieves a list of jobs within a specific queue, with pagination and filtering by status.
|
||||
- **Permissions:** `manage:all`
|
||||
- **URL Parameters:**
|
||||
- `queueName` (string, required): The name of the queue to retrieve jobs from.
|
||||
- **Query Parameters:**
|
||||
- `status` (string, optional): The status of the jobs to retrieve. Can be one of `active`, `completed`, `failed`, `delayed`, `waiting`, `paused`. Defaults to `failed`.
|
||||
- `page` (number, optional): The page number to retrieve. Defaults to `1`.
|
||||
- `limit` (number, optional): The number of jobs to retrieve per page. Defaults to `10`.
|
||||
- **Responses:**
|
||||
- `200 OK`: Returns a detailed view of the queue, including a paginated list of jobs.
|
||||
- `401 Unauthorized`: If the user is not authenticated.
|
||||
- `403 Forbidden`: If the user does not have the required permissions.
|
||||
- `404 Not Found`: If the specified queue does not exist.
|
||||
|
||||
### Response Body
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "ingestion",
|
||||
"counts": {
|
||||
"active": 0,
|
||||
"completed": 56,
|
||||
"failed": 4,
|
||||
"delayed": 3,
|
||||
"waiting": 0,
|
||||
"paused": 0
|
||||
},
|
||||
"jobs": [
|
||||
{
|
||||
"id": "1",
|
||||
"name": "initial-import",
|
||||
"data": {
|
||||
"ingestionSourceId": "clx1y2z3a0000b4d2e5f6g7h8"
|
||||
},
|
||||
"state": "failed",
|
||||
"failedReason": "Error: Connection timed out",
|
||||
"timestamp": 1678886400000,
|
||||
"processedOn": 1678886401000,
|
||||
"finishedOn": 1678886402000,
|
||||
"attemptsMade": 5,
|
||||
"stacktrace": ["..."],
|
||||
"returnValue": null,
|
||||
"ingestionSourceId": "clx1y2z3a0000b4d2e5f6g7h8",
|
||||
"error": "Error: Connection timed out"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
"currentPage": 1,
|
||||
"totalPages": 1,
|
||||
"totalJobs": 4,
|
||||
"limit": 10
|
||||
}
|
||||
}
|
||||
```
|
||||
@@ -9,7 +9,7 @@
|
||||
"start:oss": "dotenv -- concurrently \"node apps/open-archiver/dist/index.js\" \"pnpm --filter @open-archiver/frontend start\"",
|
||||
"start:enterprise": "dotenv -- concurrently \"node apps/open-archiver-enterprise/dist/index.js\" \"pnpm --filter @open-archiver/frontend start\"",
|
||||
"dev:enterprise": "cross-env VITE_ENTERPRISE_MODE=true dotenv -- pnpm --filter \"@open-archiver/*\" --filter \"open-archiver-enterprise-app\" --parallel dev",
|
||||
"dev:oss": "dotenv -- pnpm --filter \"@open-archiver/frontend\" --filter \"open-archiver-app\" --parallel dev",
|
||||
"dev:oss": "dotenv -- pnpm --filter \"./packages/*\" --filter \"!./packages/@open-archiver/enterprise\" --filter \"open-archiver-app\" --parallel dev",
|
||||
"build": "pnpm --filter \"./packages/*\" --filter \"./apps/*\" build",
|
||||
"start": "dotenv -- pnpm --filter \"open-archiver-app\" --parallel start",
|
||||
"start:workers": "dotenv -- concurrently \"pnpm --filter @open-archiver/backend start:ingestion-worker\" \"pnpm --filter @open-archiver/backend start:indexing-worker\" \"pnpm --filter @open-archiver/backend start:sync-scheduler\"",
|
||||
|
||||
@@ -65,8 +65,6 @@
|
||||
"zod": "^4.1.5"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@bull-board/api": "^6.11.0",
|
||||
"@bull-board/express": "^6.11.0",
|
||||
"@types/archiver": "^6.0.3",
|
||||
"@types/busboy": "^1.5.4",
|
||||
"@types/cors": "^2.8.19",
|
||||
|
||||
42
packages/backend/src/api/controllers/jobs.controller.ts
Normal file
42
packages/backend/src/api/controllers/jobs.controller.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import { Request, Response } from 'express';
|
||||
import { JobsService } from '../../services/JobsService';
|
||||
import {
|
||||
IGetQueueJobsRequestParams,
|
||||
IGetQueueJobsRequestQuery,
|
||||
JobStatus,
|
||||
} from '@open-archiver/types';
|
||||
|
||||
export class JobsController {
|
||||
private jobsService: JobsService;
|
||||
|
||||
constructor() {
|
||||
this.jobsService = new JobsService();
|
||||
}
|
||||
|
||||
public getQueues = async (req: Request, res: Response) => {
|
||||
try {
|
||||
const queues = await this.jobsService.getQueues();
|
||||
res.status(200).json({ queues });
|
||||
} catch (error) {
|
||||
res.status(500).json({ message: 'Error fetching queues', error });
|
||||
}
|
||||
};
|
||||
|
||||
public getQueueJobs = async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { queueName } = req.params as unknown as IGetQueueJobsRequestParams;
|
||||
const { status, page, limit } = req.query as unknown as IGetQueueJobsRequestQuery;
|
||||
const pageNumber = parseInt(page, 10) || 1;
|
||||
const limitNumber = parseInt(limit, 10) || 10;
|
||||
const queueDetails = await this.jobsService.getQueueDetails(
|
||||
queueName,
|
||||
status,
|
||||
pageNumber,
|
||||
limitNumber
|
||||
);
|
||||
res.status(200).json(queueDetails);
|
||||
} catch (error) {
|
||||
res.status(500).json({ message: 'Error fetching queue jobs', error });
|
||||
}
|
||||
};
|
||||
}
|
||||
25
packages/backend/src/api/routes/jobs.routes.ts
Normal file
25
packages/backend/src/api/routes/jobs.routes.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import { Router } from 'express';
|
||||
import { JobsController } from '../controllers/jobs.controller';
|
||||
import { requireAuth } from '../middleware/requireAuth';
|
||||
import { requirePermission } from '../middleware/requirePermission';
|
||||
import { AuthService } from '../../services/AuthService';
|
||||
|
||||
export const createJobsRouter = (authService: AuthService): Router => {
|
||||
const router = Router();
|
||||
const jobsController = new JobsController();
|
||||
|
||||
router.use(requireAuth(authService));
|
||||
|
||||
router.get(
|
||||
'/queues',
|
||||
requirePermission('manage', 'all', 'user.requiresSuperAdminRole'),
|
||||
jobsController.getQueues
|
||||
);
|
||||
router.get(
|
||||
'/queues/:queueName',
|
||||
requirePermission('manage', 'all', 'user.requiresSuperAdminRole'),
|
||||
jobsController.getQueueJobs
|
||||
);
|
||||
|
||||
return router;
|
||||
};
|
||||
@@ -19,6 +19,7 @@ import { createUserRouter } from './routes/user.routes';
|
||||
import { createSettingsRouter } from './routes/settings.routes';
|
||||
import { apiKeyRoutes } from './routes/api-key.routes';
|
||||
import { integrityRoutes } from './routes/integrity.routes';
|
||||
import { createJobsRouter } from './routes/jobs.routes';
|
||||
import { AuthService } from '../services/AuthService';
|
||||
import { AuditService } from '../services/AuditService';
|
||||
import { UserService } from '../services/UserService';
|
||||
@@ -120,6 +121,7 @@ export async function createServer(modules: ArchiverModule[] = []): Promise<Expr
|
||||
const settingsRouter = createSettingsRouter(authService);
|
||||
const apiKeyRouter = apiKeyRoutes(authService);
|
||||
const integrityRouter = integrityRoutes(authService);
|
||||
const jobsRouter = createJobsRouter(authService);
|
||||
|
||||
// Middleware for all other routes
|
||||
app.use((req, res, next) => {
|
||||
@@ -147,19 +149,18 @@ export async function createServer(modules: ArchiverModule[] = []): Promise<Expr
|
||||
app.use(`/${config.api.version}/search`, searchRouter);
|
||||
app.use(`/${config.api.version}/dashboard`, dashboardRouter);
|
||||
app.use(`/${config.api.version}/users`, userRouter);
|
||||
app.use(`/${config.api.version}/settings`, settingsRouter);
|
||||
app.use(`/${config.api.version}/api-keys`, apiKeyRouter);
|
||||
app.use(`/${config.api.version}/integrity`, integrityRouter);
|
||||
app.use(`/${config.api.version}/jobs`, jobsRouter);
|
||||
|
||||
// Load all provided extension modules
|
||||
for (const module of modules) {
|
||||
await module.initialize(app, authService);
|
||||
console.log(`🏢 Enterprise module loaded: ${module.name}`);
|
||||
}
|
||||
|
||||
app.use(`/${config.api.version}/settings`, settingsRouter);
|
||||
app.use(`/${config.api.version}/api-keys`, apiKeyRouter);
|
||||
app.use(`/${config.api.version}/integrity`, integrityRouter);
|
||||
|
||||
|
||||
app.get('/', (req, res) => {
|
||||
res.send('Backend is running!');
|
||||
res.send('Backend is running!!');
|
||||
});
|
||||
|
||||
console.log('✅ Core OSS modules loaded.');
|
||||
|
||||
@@ -13,6 +13,7 @@ import { IndexingService } from '../../services/IndexingService';
|
||||
import { SearchService } from '../../services/SearchService';
|
||||
import { DatabaseService } from '../../services/DatabaseService';
|
||||
import { config } from '../../config';
|
||||
import { indexingQueue } from '../queues';
|
||||
|
||||
|
||||
/**
|
||||
@@ -55,7 +56,7 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
|
||||
if (processedEmail) {
|
||||
emailBatch.push(processedEmail);
|
||||
if (emailBatch.length >= BATCH_SIZE) {
|
||||
await indexingService.indexEmailBatch(emailBatch);
|
||||
await indexingQueue.add('index-email-batch', { emails: emailBatch });
|
||||
emailBatch = [];
|
||||
}
|
||||
}
|
||||
@@ -63,7 +64,7 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
|
||||
}
|
||||
|
||||
if (emailBatch.length > 0) {
|
||||
await indexingService.indexEmailBatch(emailBatch);
|
||||
await indexingQueue.add('index-email-batch', { emails: emailBatch });
|
||||
emailBatch = [];
|
||||
}
|
||||
|
||||
|
||||
107
packages/backend/src/services/JobsService.ts
Normal file
107
packages/backend/src/services/JobsService.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import { Job, Queue } from 'bullmq';
|
||||
import { ingestionQueue, indexingQueue } from '../jobs/queues';
|
||||
import {
|
||||
IJob,
|
||||
IQueueCounts,
|
||||
IQueueDetails,
|
||||
IQueueOverview,
|
||||
JobStatus,
|
||||
} from '@open-archiver/types';
|
||||
|
||||
export class JobsService {
|
||||
private queues: Queue[];
|
||||
|
||||
constructor() {
|
||||
this.queues = [ingestionQueue, indexingQueue];
|
||||
}
|
||||
|
||||
public async getQueues(): Promise<IQueueOverview[]> {
|
||||
const queueOverviews: IQueueOverview[] = [];
|
||||
for (const queue of this.queues) {
|
||||
const counts = await queue.getJobCounts(
|
||||
'active',
|
||||
'completed',
|
||||
'failed',
|
||||
'delayed',
|
||||
'waiting',
|
||||
'paused'
|
||||
);
|
||||
queueOverviews.push({
|
||||
name: queue.name,
|
||||
counts: {
|
||||
active: counts.active || 0,
|
||||
completed: counts.completed || 0,
|
||||
failed: counts.failed || 0,
|
||||
delayed: counts.delayed || 0,
|
||||
waiting: counts.waiting || 0,
|
||||
paused: counts.paused || 0,
|
||||
},
|
||||
});
|
||||
}
|
||||
return queueOverviews;
|
||||
}
|
||||
|
||||
public async getQueueDetails(
|
||||
queueName: string,
|
||||
status: JobStatus,
|
||||
page: number,
|
||||
limit: number
|
||||
): Promise<IQueueDetails> {
|
||||
const queue = this.queues.find((q) => q.name === queueName);
|
||||
if (!queue) {
|
||||
throw new Error(`Queue ${queueName} not found`);
|
||||
}
|
||||
|
||||
const counts = await queue.getJobCounts(
|
||||
'active',
|
||||
'completed',
|
||||
'failed',
|
||||
'delayed',
|
||||
'waiting',
|
||||
'paused'
|
||||
);
|
||||
const start = (page - 1) * limit;
|
||||
const end = start + limit - 1;
|
||||
const jobStatus = status === 'waiting' ? 'wait' : status;
|
||||
const jobs = await queue.getJobs([jobStatus], start, end, true);
|
||||
const totalJobs = await queue.getJobCountByTypes(jobStatus);
|
||||
|
||||
return {
|
||||
name: queue.name,
|
||||
counts: {
|
||||
active: counts.active || 0,
|
||||
completed: counts.completed || 0,
|
||||
failed: counts.failed || 0,
|
||||
delayed: counts.delayed || 0,
|
||||
waiting: counts.waiting || 0,
|
||||
paused: counts.paused || 0,
|
||||
},
|
||||
jobs: await Promise.all(jobs.map((job) => this.formatJob(job))),
|
||||
pagination: {
|
||||
currentPage: page,
|
||||
totalPages: Math.ceil(totalJobs / limit),
|
||||
totalJobs,
|
||||
limit,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private async formatJob(job: Job): Promise<IJob> {
|
||||
const state = await job.getState();
|
||||
return {
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
data: job.data,
|
||||
state: state,
|
||||
failedReason: job.failedReason,
|
||||
timestamp: job.timestamp,
|
||||
processedOn: job.processedOn,
|
||||
finishedOn: job.finishedOn,
|
||||
attemptsMade: job.attemptsMade,
|
||||
stacktrace: job.stacktrace,
|
||||
returnValue: job.returnvalue,
|
||||
ingestionSourceId: job.data.ingestionSourceId,
|
||||
error: state === 'failed' ? job.failedReason : undefined,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -13,12 +13,11 @@ const processor = async (job: any) => {
|
||||
|
||||
const worker = new Worker('indexing', processor, {
|
||||
connection,
|
||||
concurrency: 5,
|
||||
removeOnComplete: {
|
||||
count: 1000, // keep last 1000 jobs
|
||||
count: 100, // keep last 100 jobs
|
||||
},
|
||||
removeOnFail: {
|
||||
count: 5000, // keep last 5000 failed jobs
|
||||
count: 500, // keep last 500 failed jobs
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
@@ -281,10 +281,34 @@
|
||||
"prev": "Zurück",
|
||||
"next": "Weiter",
|
||||
"verification_successful_title": "Überprüfung erfolgreich",
|
||||
"verification_successful_message": "Die Integrität des Audit-Protokolls wurde erfolgreich überprüft.",
|
||||
"verification_successful_message": "Integrität des Audit-Protokolls erfolgreich überprüft.",
|
||||
"verification_failed_title": "Überprüfung fehlgeschlagen",
|
||||
"verification_failed_message": "Die Integritätsprüfung des Audit-Protokolls ist fehlgeschlagen. Bitte überprüfen Sie die Systemprotokolle für weitere Details.",
|
||||
"verification_error_message": "Während der Überprüfung ist ein unerwarteter Fehler aufgetreten. Bitte versuchen Sie es erneut."
|
||||
},
|
||||
"jobs": {
|
||||
"title": "Job-Warteschlangen",
|
||||
"queues": "Job-Warteschlangen",
|
||||
"active": "Aktiv",
|
||||
"completed": "Abgeschlossen",
|
||||
"failed": "Fehlgeschlagen",
|
||||
"delayed": "Verzögert",
|
||||
"waiting": "Wartend",
|
||||
"paused": "Pausiert",
|
||||
"back_to_queues": "Zurück zu den Warteschlangen",
|
||||
"queue_overview": "Warteschlangenübersicht",
|
||||
"jobs": "Jobs",
|
||||
"id": "ID",
|
||||
"name": "Name",
|
||||
"state": "Status",
|
||||
"created_at": "Erstellt am",
|
||||
"processed_at": "Verarbeitet am",
|
||||
"finished_at": "Beendet am",
|
||||
"showing": "Anzeige",
|
||||
"of": "von",
|
||||
"previous": "Zurück",
|
||||
"next": "Weiter",
|
||||
"ingestion_source": "Ingestion-Quelle"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,7 +234,8 @@
|
||||
"users": "Users",
|
||||
"roles": "Roles",
|
||||
"api_keys": "API Keys",
|
||||
"logout": "Logout"
|
||||
"logout": "Logout",
|
||||
"admin": "Admin"
|
||||
},
|
||||
"api_keys_page": {
|
||||
"title": "API Keys",
|
||||
@@ -323,6 +324,31 @@
|
||||
"verification_failed_title": "Verification Failed",
|
||||
"verification_failed_message": "The audit log integrity check failed. Please review the system logs for more details.",
|
||||
"verification_error_message": "An unexpected error occurred during verification. Please try again."
|
||||
},
|
||||
"jobs": {
|
||||
"title": "Job Queues",
|
||||
"queues": "Job Queues",
|
||||
"active": "Active",
|
||||
"completed": "Completed",
|
||||
"failed": "Failed",
|
||||
"delayed": "Delayed",
|
||||
"waiting": "Waiting",
|
||||
"paused": "Paused",
|
||||
"back_to_queues": "Back to Queues",
|
||||
"queue_overview": "Queue Overview",
|
||||
"jobs": "Jobs",
|
||||
"id": "ID",
|
||||
"name": "Name",
|
||||
"state": "State",
|
||||
|
||||
"created_at": "Created At",
|
||||
"processed_at": "Processed At",
|
||||
"finished_at": "Finished At",
|
||||
"showing": "Showing",
|
||||
"of": "of",
|
||||
"previous": "Previous",
|
||||
"next": "Next",
|
||||
"ingestion_source": "Ingestion Source"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,11 +28,11 @@
|
||||
},
|
||||
{ href: '/dashboard/search', label: $t('app.layout.search'), position: 3 },
|
||||
{
|
||||
label: $t('app.layout.settings'),
|
||||
label: $t('app.layout.admin'),
|
||||
subMenu: [
|
||||
{
|
||||
href: '/dashboard/settings/system',
|
||||
label: $t('app.layout.system'),
|
||||
href: '/dashboard/admin/jobs',
|
||||
label: $t('app.jobs.jobs'),
|
||||
},
|
||||
{
|
||||
href: '/dashboard/settings/users',
|
||||
@@ -42,12 +42,22 @@
|
||||
href: '/dashboard/settings/roles',
|
||||
label: $t('app.layout.roles'),
|
||||
},
|
||||
],
|
||||
position: 5,
|
||||
},
|
||||
{
|
||||
label: $t('app.layout.settings'),
|
||||
subMenu: [
|
||||
{
|
||||
href: '/dashboard/settings/system',
|
||||
label: $t('app.layout.system'),
|
||||
},
|
||||
{
|
||||
href: '/dashboard/settings/api-keys',
|
||||
label: $t('app.layout.api_keys'),
|
||||
},
|
||||
],
|
||||
position: 5,
|
||||
position: 6,
|
||||
},
|
||||
];
|
||||
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
import { api } from '$lib/server/api';
|
||||
import { error, type NumericRange } from '@sveltejs/kit';
|
||||
import type { PageServerLoad } from './$types';
|
||||
import type { IGetQueuesResponse } from '@open-archiver/types';
|
||||
|
||||
export const load: PageServerLoad = async (event) => {
|
||||
try {
|
||||
const response = await api('/jobs/queues', event);
|
||||
|
||||
if (!response.ok) {
|
||||
const responseText = await response.json();
|
||||
throw error(response.status as NumericRange<400, 599>, responseText.message || 'Failed to fetch job queues.');
|
||||
}
|
||||
|
||||
const data: IGetQueuesResponse = await response.json();
|
||||
|
||||
return {
|
||||
queues: data.queues,
|
||||
};
|
||||
} catch (e: any) {
|
||||
console.error('Failed to load job queues:', e);
|
||||
throw error(e.status || 500, e.body?.message || 'Failed to load job queues');
|
||||
}
|
||||
};
|
||||
@@ -0,0 +1,58 @@
|
||||
<script lang="ts">
|
||||
import type { PageData } from './$types';
|
||||
import * as Card from '$lib/components/ui/card';
|
||||
import { t } from '$lib/translations';
|
||||
import { Badge } from '$lib/components/ui/badge';
|
||||
|
||||
let { data }: { data: PageData } = $props();
|
||||
let queues = $derived(data.queues);
|
||||
</script>
|
||||
|
||||
<svelte:head>
|
||||
<title>{$t('app.jobs.title')} - Open Archiver</title>
|
||||
</svelte:head>
|
||||
|
||||
<div class="space-y-4">
|
||||
<h1 class="text-2xl font-bold">{$t('app.jobs.queues')}</h1>
|
||||
|
||||
<div class="grid gap-4 md:grid-cols-2 lg:grid-cols-3">
|
||||
{#each queues as queue}
|
||||
<a href={`/dashboard/admin/jobs/${queue.name}`} class="block">
|
||||
<Card.Root class="hover:border-primary">
|
||||
<Card.Header>
|
||||
<Card.Title class="capitalize">{queue.name.split('_').join(' ')}</Card.Title
|
||||
>
|
||||
</Card.Header>
|
||||
<Card.Content class="grid grid-cols-2 gap-2">
|
||||
<div class="flex items-center justify-between">
|
||||
<span class="text-sm font-medium">{$t('app.jobs.active')}</span>
|
||||
<Badge>{queue.counts.active}</Badge>
|
||||
</div>
|
||||
<div class="flex items-center justify-between">
|
||||
<span class="text-sm font-medium">{$t('app.jobs.completed')}</span>
|
||||
<Badge variant="default" class="bg-green-500"
|
||||
>{queue.counts.completed}</Badge
|
||||
>
|
||||
</div>
|
||||
<div class="flex items-center justify-between">
|
||||
<span class="text-sm font-medium">{$t('app.jobs.failed')}</span>
|
||||
<Badge variant="destructive">{queue.counts.failed}</Badge>
|
||||
</div>
|
||||
<div class="flex items-center justify-between">
|
||||
<span class="text-sm font-medium">{$t('app.jobs.delayed')}</span>
|
||||
<Badge variant="secondary">{queue.counts.delayed}</Badge>
|
||||
</div>
|
||||
<div class="flex items-center justify-between">
|
||||
<span class="text-sm font-medium">{$t('app.jobs.waiting')}</span>
|
||||
<Badge variant="outline">{queue.counts.waiting}</Badge>
|
||||
</div>
|
||||
<div class="flex items-center justify-between">
|
||||
<span class="text-sm font-medium">{$t('app.jobs.paused')}</span>
|
||||
<Badge variant="secondary">{queue.counts.paused}</Badge>
|
||||
</div>
|
||||
</Card.Content>
|
||||
</Card.Root>
|
||||
</a>
|
||||
{/each}
|
||||
</div>
|
||||
</div>
|
||||
@@ -0,0 +1,35 @@
|
||||
import { api } from '$lib/server/api';
|
||||
import { error, type NumericRange } from '@sveltejs/kit';
|
||||
import type { PageServerLoad } from './$types';
|
||||
import type { IGetQueueJobsResponse, JobStatus } from '@open-archiver/types';
|
||||
|
||||
export const load: PageServerLoad = async (event) => {
|
||||
const { queueName } = event.params;
|
||||
const status = (event.url.searchParams.get('status') || 'failed') as JobStatus;
|
||||
const page = event.url.searchParams.get('page') || '1';
|
||||
const limit = event.url.searchParams.get('limit') || '10';
|
||||
|
||||
try {
|
||||
const response = await api(
|
||||
`/jobs/queues/${queueName}?status=${status}&page=${page}&limit=${limit}`,
|
||||
event
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
const responseText = await response.json();
|
||||
throw error(
|
||||
response.status as NumericRange<400, 599>,
|
||||
responseText.message || 'Failed to fetch job queue details.'
|
||||
);
|
||||
}
|
||||
|
||||
const data: IGetQueueJobsResponse = await response.json();
|
||||
|
||||
return {
|
||||
queue: data,
|
||||
};
|
||||
} catch (e: any) {
|
||||
console.error('Failed to load job queue details:', e);
|
||||
throw error(e.status || 500, e.body?.message || 'Failed to load job queue details');
|
||||
}
|
||||
};
|
||||
@@ -0,0 +1,163 @@
|
||||
<script lang="ts">
|
||||
import type { PageData } from './$types';
|
||||
import * as Card from '$lib/components/ui/card';
|
||||
import { t } from '$lib/translations';
|
||||
import { Badge } from '$lib/components/ui/badge';
|
||||
import * as Table from '$lib/components/ui/table';
|
||||
import { Button, buttonVariants } from '$lib/components/ui/button';
|
||||
import { goto } from '$app/navigation';
|
||||
import type { JobStatus } from '@open-archiver/types';
|
||||
|
||||
let { data }: { data: PageData } = $props();
|
||||
let queue = $derived(data.queue);
|
||||
|
||||
const jobStatuses: JobStatus[] = [
|
||||
'failed',
|
||||
'active',
|
||||
'completed',
|
||||
'delayed',
|
||||
'waiting',
|
||||
'paused',
|
||||
];
|
||||
|
||||
let selectedStatus: JobStatus | undefined = $state('failed');
|
||||
|
||||
function handleStatusChange(status: JobStatus) {
|
||||
selectedStatus = status;
|
||||
const url = new URL(window.location.href);
|
||||
url.searchParams.set('status', status);
|
||||
url.searchParams.set('page', '1');
|
||||
goto(url.toString(), { invalidateAll: true });
|
||||
}
|
||||
|
||||
function handlePageChange(page: number) {
|
||||
const url = new URL(window.location.href);
|
||||
url.searchParams.set('page', page.toString());
|
||||
goto(url.toString(), { invalidateAll: true });
|
||||
}
|
||||
</script>
|
||||
|
||||
<svelte:head>
|
||||
<title>{queue.name} - {$t('app.jobs.title')} - Open Archiver</title>
|
||||
</svelte:head>
|
||||
|
||||
<div class="space-y-4">
|
||||
<a href="/dashboard/admin/jobs" class="text-primary mb-1 text-sm hover:underline">
|
||||
← {$t('app.jobs.back_to_queues')}
|
||||
</a>
|
||||
<h1 class="text-2xl font-bold capitalize">{queue.name.split('_').join(' ')}</h1>
|
||||
|
||||
<Card.Root>
|
||||
<Card.Header>
|
||||
<Card.Title>{$t('app.jobs.jobs')}</Card.Title>
|
||||
<div class="flex space-x-2">
|
||||
{#each jobStatuses as status}
|
||||
<Button
|
||||
variant={selectedStatus === status ? 'default' : 'outline'}
|
||||
onclick={() => handleStatusChange(status)}
|
||||
class="capitalize"
|
||||
>
|
||||
{status} ({queue.counts[status]})
|
||||
</Button>
|
||||
{/each}
|
||||
</div>
|
||||
</Card.Header>
|
||||
<Card.Content>
|
||||
<Table.Root>
|
||||
<Table.Header>
|
||||
<Table.Row>
|
||||
<Table.Head>{$t('app.jobs.id')}</Table.Head>
|
||||
<Table.Head>{$t('app.jobs.name')}</Table.Head>
|
||||
<Table.Head>{$t('app.jobs.state')}</Table.Head>
|
||||
<Table.Head>{$t('app.jobs.created_at')}</Table.Head>
|
||||
<Table.Head>{$t('app.jobs.processed_at')}</Table.Head>
|
||||
<Table.Head>{$t('app.jobs.finished_at')}</Table.Head>
|
||||
<Table.Head>{$t('app.jobs.ingestion_source')}</Table.Head>
|
||||
</Table.Row>
|
||||
</Table.Header>
|
||||
<Table.Body>
|
||||
{#each queue.jobs as job}
|
||||
<Table.Row>
|
||||
<Table.Cell>{job.id}</Table.Cell>
|
||||
<Table.Cell>{job.name}</Table.Cell>
|
||||
<Table.Cell class="capitalize">
|
||||
{#if job.error}
|
||||
<Button
|
||||
variant="secondary"
|
||||
size="sm"
|
||||
class="cursor-pointer capitalize"
|
||||
onclick={() => {
|
||||
if (job.error) {
|
||||
const el = document.getElementById(
|
||||
`error-${job.id}`
|
||||
);
|
||||
if (el) {
|
||||
el.classList.toggle('hidden');
|
||||
}
|
||||
}
|
||||
}}
|
||||
>
|
||||
{job.state}
|
||||
</Button>
|
||||
{:else}
|
||||
{job.state}
|
||||
{/if}
|
||||
</Table.Cell>
|
||||
<Table.Cell>{new Date(job.timestamp).toLocaleString()}</Table.Cell>
|
||||
<Table.Cell
|
||||
>{job.processedOn
|
||||
? new Date(job.processedOn).toLocaleString()
|
||||
: 'N/A'}</Table.Cell
|
||||
>
|
||||
<Table.Cell
|
||||
>{job.finishedOn
|
||||
? new Date(job.finishedOn).toLocaleString()
|
||||
: 'N/A'}</Table.Cell
|
||||
>
|
||||
<Table.Cell>
|
||||
<a
|
||||
href="/dashboard/archived-emails?ingestionSourceId={job.ingestionSourceId}"
|
||||
>{job.ingestionSourceId || 'N/A'}</a
|
||||
>
|
||||
</Table.Cell>
|
||||
</Table.Row>
|
||||
{#if job.error}
|
||||
<Table.Row id={`error-${job.id}`} class="hidden">
|
||||
<Table.Cell colspan={7}>
|
||||
<pre class="rounded-md bg-gray-100 p-4">{job.error}</pre>
|
||||
</Table.Cell>
|
||||
</Table.Row>
|
||||
{/if}
|
||||
{/each}
|
||||
</Table.Body>
|
||||
</Table.Root>
|
||||
</Card.Content>
|
||||
<Card.Footer class="flex justify-between">
|
||||
<div>
|
||||
<p class="text-muted-foreground text-sm">
|
||||
{$t('app.jobs.showing')}
|
||||
{queue.jobs.length}
|
||||
{$t('app.jobs.of')}
|
||||
{queue.pagination.totalJobs}
|
||||
{$t('app.jobs.jobs')}
|
||||
</p>
|
||||
</div>
|
||||
<div class="flex space-x-2">
|
||||
<Button
|
||||
variant="outline"
|
||||
disabled={queue.pagination.currentPage <= 1}
|
||||
onclick={() => handlePageChange(queue.pagination.currentPage - 1)}
|
||||
>
|
||||
{$t('app.jobs.previous')}
|
||||
</Button>
|
||||
<Button
|
||||
variant="outline"
|
||||
disabled={queue.pagination.currentPage >= queue.pagination.totalPages}
|
||||
onclick={() => handlePageChange(queue.pagination.currentPage + 1)}
|
||||
>
|
||||
{$t('app.jobs.next')}
|
||||
</Button>
|
||||
</div>
|
||||
</Card.Footer>
|
||||
</Card.Root>
|
||||
</div>
|
||||
@@ -11,3 +11,4 @@ export * from './system.types';
|
||||
export * from './audit-log.types';
|
||||
export * from './audit-log.enums';
|
||||
export * from './integrity.types';
|
||||
export * from './jobs.types';
|
||||
|
||||
93
packages/types/src/jobs.types.ts
Normal file
93
packages/types/src/jobs.types.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
/**
|
||||
* Represents the possible statuses of a job in the queue.
|
||||
*/
|
||||
export type JobStatus = 'active' | 'completed' | 'failed' | 'delayed' | 'waiting' | 'paused';
|
||||
|
||||
/**
|
||||
* A detailed representation of a job, providing essential information for monitoring and debugging.
|
||||
*/
|
||||
export interface IJob {
|
||||
id: string | undefined;
|
||||
name: string;
|
||||
data: any;
|
||||
state: string;
|
||||
failedReason: string | undefined;
|
||||
timestamp: number;
|
||||
processedOn: number | undefined;
|
||||
finishedOn: number | undefined;
|
||||
attemptsMade: number;
|
||||
stacktrace: string[];
|
||||
returnValue: any;
|
||||
ingestionSourceId?: string;
|
||||
error?: any;
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds the count of jobs in various states for a single queue.
|
||||
*/
|
||||
export interface IQueueCounts {
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
waiting: number;
|
||||
paused: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a high-level overview of a queue, including its name and job counts.
|
||||
*/
|
||||
export interface IQueueOverview {
|
||||
name: string;
|
||||
counts: IQueueCounts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the pagination details for a list of jobs.
|
||||
*/
|
||||
export interface IPagination {
|
||||
currentPage: number;
|
||||
totalPages: number;
|
||||
totalJobs: number;
|
||||
limit: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a detailed view of a specific queue, including a paginated list of its jobs.
|
||||
*/
|
||||
export interface IQueueDetails {
|
||||
name: string;
|
||||
counts: IQueueCounts;
|
||||
jobs: IJob[];
|
||||
pagination: IPagination;
|
||||
}
|
||||
|
||||
// --- API Request & Response Types ---
|
||||
|
||||
/**
|
||||
* Response body for the endpoint that lists all queues.
|
||||
*/
|
||||
export interface IGetQueuesResponse {
|
||||
queues: IQueueOverview[];
|
||||
}
|
||||
|
||||
/**
|
||||
* URL parameters for the endpoint that retrieves jobs from a specific queue.
|
||||
*/
|
||||
export interface IGetQueueJobsRequestParams {
|
||||
queueName: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query parameters for filtering and paginating jobs within a queue.
|
||||
*/
|
||||
export interface IGetQueueJobsRequestQuery {
|
||||
status: JobStatus;
|
||||
page: string; // Received as a string from query params
|
||||
limit: string; // Received as a string from query params
|
||||
}
|
||||
|
||||
/**
|
||||
* Response body for the endpoint that retrieves jobs from a specific queue.
|
||||
*/
|
||||
export type IGetQueueJobsResponse = IQueueDetails;
|
||||
40
pnpm-lock.yaml
generated
40
pnpm-lock.yaml
generated
@@ -83,6 +83,12 @@ importers:
|
||||
'@azure/msal-node':
|
||||
specifier: ^3.6.3
|
||||
version: 3.6.3
|
||||
'@bull-board/api':
|
||||
specifier: ^6.13.0
|
||||
version: 6.13.0(@bull-board/ui@6.13.0)
|
||||
'@bull-board/express':
|
||||
specifier: ^6.13.0
|
||||
version: 6.13.0
|
||||
'@casl/ability':
|
||||
specifier: ^6.7.3
|
||||
version: 6.7.3
|
||||
@@ -201,12 +207,6 @@ importers:
|
||||
specifier: ^4.1.5
|
||||
version: 4.1.5
|
||||
devDependencies:
|
||||
'@bull-board/api':
|
||||
specifier: ^6.11.0
|
||||
version: 6.11.0(@bull-board/ui@6.11.0)
|
||||
'@bull-board/express':
|
||||
specifier: ^6.11.0
|
||||
version: 6.11.0
|
||||
'@types/archiver':
|
||||
specifier: ^6.0.3
|
||||
version: 6.0.3
|
||||
@@ -652,16 +652,16 @@ packages:
|
||||
resolution: {integrity: sha512-ruv7Ae4J5dUYULmeXw1gmb7rYRz57OWCPM57pHojnLq/3Z1CK2lNSLTCVjxVk1F/TZHwOZZrOWi0ur95BbLxNQ==}
|
||||
engines: {node: '>=6.9.0'}
|
||||
|
||||
'@bull-board/api@6.11.0':
|
||||
resolution: {integrity: sha512-HLbIuXIthrgeVRmN7Vec9/7ZKWx8i1xTC6Nzi//l7ua+Xu5wn6f/aZllUNVzty5ilLTHqWFkfVOwpuN91o7yxA==}
|
||||
'@bull-board/api@6.13.0':
|
||||
resolution: {integrity: sha512-GZ0On0VeL5uZVS1x7UdU90F9GV1kdmHa1955hW3Ow1PmslCY/2YwmvnapVdbvCUSVBqluTfbVZsE9X3h79r1kw==}
|
||||
peerDependencies:
|
||||
'@bull-board/ui': 6.11.0
|
||||
'@bull-board/ui': 6.13.0
|
||||
|
||||
'@bull-board/express@6.11.0':
|
||||
resolution: {integrity: sha512-dYejXl867e3tQElKwUstxzKpkTEJYWy9Cgbw0scYx+MyTvQS7A+gCBVdhdEMGqk3LdUDAzDtvA9cQnCHLC+2Sw==}
|
||||
'@bull-board/express@6.13.0':
|
||||
resolution: {integrity: sha512-PAbzD3dplV2NtN8ETs00bp++pBOD+cVb1BEYltXrjyViA2WluDBVKdlh/2wM+sHbYO2TAMNg8bUtKxGNCmxG7w==}
|
||||
|
||||
'@bull-board/ui@6.11.0':
|
||||
resolution: {integrity: sha512-NB2mYr8l850BOLzytUyeYl8T3M9ZgPDDfT9WTOCVCDPr77kFF7iEM5jSE9AZg86bmZyWAgO/ogOUJaPSCNHY7g==}
|
||||
'@bull-board/ui@6.13.0':
|
||||
resolution: {integrity: sha512-63I6b3nZnKWI5ok6mw/Tk2rIObuzMTY/tLGyO51p0GW4rAImdXxrK6mT7j4SgEuP2B+tt/8L1jU7sLu8MMcCNw==}
|
||||
|
||||
'@casl/ability@6.7.3':
|
||||
resolution: {integrity: sha512-A4L28Ko+phJAsTDhRjzCOZWECQWN2jzZnJPnROWWHjJpyMq1h7h9ZqjwS2WbIUa3Z474X1ZPSgW0f1PboZGC0A==}
|
||||
@@ -5367,23 +5367,23 @@ snapshots:
|
||||
'@babel/helper-string-parser': 7.27.1
|
||||
'@babel/helper-validator-identifier': 7.27.1
|
||||
|
||||
'@bull-board/api@6.11.0(@bull-board/ui@6.11.0)':
|
||||
'@bull-board/api@6.13.0(@bull-board/ui@6.13.0)':
|
||||
dependencies:
|
||||
'@bull-board/ui': 6.11.0
|
||||
'@bull-board/ui': 6.13.0
|
||||
redis-info: 3.1.0
|
||||
|
||||
'@bull-board/express@6.11.0':
|
||||
'@bull-board/express@6.13.0':
|
||||
dependencies:
|
||||
'@bull-board/api': 6.11.0(@bull-board/ui@6.11.0)
|
||||
'@bull-board/ui': 6.11.0
|
||||
'@bull-board/api': 6.13.0(@bull-board/ui@6.13.0)
|
||||
'@bull-board/ui': 6.13.0
|
||||
ejs: 3.1.10
|
||||
express: 5.1.0
|
||||
transitivePeerDependencies:
|
||||
- supports-color
|
||||
|
||||
'@bull-board/ui@6.11.0':
|
||||
'@bull-board/ui@6.13.0':
|
||||
dependencies:
|
||||
'@bull-board/api': 6.11.0(@bull-board/ui@6.11.0)
|
||||
'@bull-board/api': 6.13.0(@bull-board/ui@6.13.0)
|
||||
|
||||
'@casl/ability@6.7.3':
|
||||
dependencies:
|
||||
|
||||
Reference in New Issue
Block a user