-
-
Notifications
You must be signed in to change notification settings - Fork 368
/
queues.ts
111 lines (96 loc) · 3.19 KB
/
queues.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import {
AppJob,
AppQueue,
BullBoardRequest,
ControllerHandlerReturnType,
JobCounts,
JobStatus,
Pagination,
QueueJob,
Status,
} from '../../typings/app';
import { BaseAdapter } from '../queueAdapters/base';
/**
 * Converts a queue job into the `AppJob` shape returned to the client.
 *
 * The job is first serialised with `job.toJSON()`. The `data`, `name` and
 * `returnValue` fields are passed through the adapter's `format` hook; every
 * other field is copied from the serialised job as-is.
 *
 * @param job - The job to convert; only its `toJSON()` method is used.
 * @param queue - The adapter whose `format` hook is applied to the job fields.
 * @returns The formatted job. `isFailed` is true when the job has a truthy
 *   `failedReason` or at least one truthy stacktrace entry.
 */
export const formatJob = (job: QueueJob, queue: BaseAdapter): AppJob => {
  const json = job.toJSON();
  const {
    id,
    timestamp,
    processedOn,
    processedBy,
    finishedOn,
    progress,
    attemptsMade,
    delay,
    failedReason,
    opts,
  } = json;

  // Falsy stacktrace entries are dropped; a missing stacktrace becomes an empty list.
  const stacktrace = json.stacktrace ? json.stacktrace.filter(Boolean) : [];
  const hasStacktrace = Array.isArray(stacktrace) && stacktrace.length > 0;

  return {
    id,
    timestamp,
    processedOn,
    processedBy,
    finishedOn,
    progress,
    attempts: attemptsMade,
    delay,
    failedReason,
    stacktrace,
    opts,
    data: queue.format('data', json.data),
    // The name formatter receives the whole serialised job plus a default value.
    name: queue.format('name', json, json.name || ''),
    returnValue: queue.format('returnValue', json.returnvalue),
    isFailed: Boolean(failedReason) || hasStacktrace,
  };
};
/**
 * Computes the page count and the zero-based, inclusive job index range for
 * one page of jobs.
 *
 * - With more than one status (the combined view), the result is always a
 *   single page starting at index 0, regardless of `currentPage`.
 * - With a single status, the page count is `ceil(count / jobsPerPage)` and the
 *   range is the window for `currentPage`.
 *
 * @param statuses - Statuses being listed.
 * @param counts - Job count per status.
 * @param currentPage - 1-based page number. Non-finite values and values below
 *   1 are treated as the first page; fractional values are rounded down.
 * @param jobsPerPage - Page size. NOTE(review): assumed to be positive; the
 *   visible caller falls back to 10 when the query value is missing or zero.
 * @returns The page count and index range. A status with no usable count entry
 *   (or an empty status list) yields a page count of 0 instead of NaN.
 */
function getPagination(
  statuses: JobStatus[],
  counts: JobCounts,
  currentPage: number,
  jobsPerPage: number
): Pagination {
  const isLatestStatus = statuses.length > 1;
  if (isLatestStatus) {
    // The combined view is never paged, so no total needs to be computed.
    return {
      pageCount: 1,
      range: { start: 0, end: jobsPerPage - 1 },
    };
  }

  // An unknown status (or an empty list) has no count entry; treat it as zero
  // so the page count never becomes NaN.
  const [status] = statuses;
  const count = status === undefined ? undefined : counts[status];
  const total = typeof count === 'number' && Number.isFinite(count) ? count : 0;

  // Clamp to the first page so the range can never start at a negative index.
  const page = Number.isFinite(currentPage) && currentPage >= 1 ? Math.floor(currentPage) : 1;
  const start = (page - 1) * jobsPerPage;

  return {
    pageCount: Math.ceil(total / jobsPerPage),
    range: { start, end: start + jobsPerPage - 1 },
  };
}
/**
 * Parses a query-string value as a positive integer.
 * Returns `fallback` when the value is missing, non-numeric, or below 1;
 * fractional values are rounded down.
 */
function toPositiveInt(value: unknown, fallback: number): number {
  const parsed = Math.floor(Number(value));
  return Number.isFinite(parsed) && parsed >= 1 ? parsed : fallback;
}

/**
 * Decodes the `activeQueue` query value.
 * Returns `undefined` when the value is absent, and the raw string when it
 * contains a malformed percent-escape (where `decodeURIComponent` would throw).
 */
function decodeActiveQueueName(raw: unknown): string | undefined {
  if (raw === undefined || raw === null) {
    return undefined;
  }
  const text = String(raw);
  try {
    return decodeURIComponent(text);
  } catch {
    return text;
  }
}

/**
 * Builds the client-facing description of every registered queue.
 *
 * Jobs are fetched only for the queue named by `query.activeQueue`; every
 * other queue is returned with an empty `jobs` list. For the active queue, a
 * specific `query.status` limits the listing to that status, while a missing,
 * empty, non-string or `'latest'` status lists all of the queue's job statuses.
 *
 * @param pairs - `[queueName, adapter]` entries to describe.
 * @param query - Request query; reads `activeQueue`, `status`, `page`
 *   (default 1) and `jobsPerPage` (default 10).
 * @returns One `AppQueue` per entry, in the same order as `pairs`.
 */
async function getAppQueues(
  pairs: [string, BaseAdapter][],
  query: Record<string, any>
): Promise<AppQueue[]> {
  // These values are the same for every queue, so parse them once up front.
  const activeQueueName = decodeActiveQueueName(query.activeQueue);
  const jobsPerPage = toPositiveInt(query.jobsPerPage, 10);
  const currentPage = toPositiveInt(query.page, 1);
  const requestedStatus =
    typeof query.status === 'string' && query.status !== '' && query.status !== 'latest'
      ? (query.status as JobStatus)
      : undefined;

  return Promise.all(
    pairs.map(async ([queueName, queue]) => {
      const isActiveQueue = activeQueueName === queueName;
      const jobStatuses = queue.getJobStatuses();
      const status =
        isActiveQueue && requestedStatus !== undefined ? [requestedStatus] : jobStatuses;

      // The two lookups are independent, so run them concurrently.
      const [counts, isPaused] = await Promise.all([queue.getJobCounts(), queue.isPaused()]);

      const pagination = getPagination(status, counts, currentPage, jobsPerPage);
      const jobs = isActiveQueue
        ? await queue.getJobs(status, pagination.range.start, pagination.range.end)
        : [];

      return {
        name: queueName,
        description: queue.getDescription() || undefined,
        statuses: queue.getStatuses(),
        counts: counts as Record<Status, number>,
        // Falsy entries in the fetched job list are skipped before formatting.
        jobs: jobs.filter(Boolean).map((job) => formatJob(job, queue)),
        pagination,
        readOnlyMode: queue.readOnlyMode,
        allowRetries: queue.allowRetries,
        allowCompletedRetries: queue.allowCompletedRetries,
        isPaused,
        type: queue.type,
      };
    })
  );
}
/**
 * Request handler that lists every registered queue.
 *
 * @param request - Supplies the queue registry (`queues`) and the optional
 *   request `query`, which defaults to an empty object.
 * @returns A response whose body holds the `queues` array; the array is empty
 *   when no queues are registered.
 */
export async function queuesHandler({
  queues: bullBoardQueues,
  query = {},
}: BullBoardRequest): Promise<ControllerHandlerReturnType> {
  const registered = Array.from(bullBoardQueues.entries());

  // Nothing registered: respond with an empty list without building any queue data.
  if (registered.length === 0) {
    return { body: { queues: [] } };
  }

  const queues = await getAppQueues(registered, query);
  return { body: { queues } };
}