-
-
Notifications
You must be signed in to change notification settings - Fork 368
/
bullMQ.ts
109 lines (93 loc) · 2.73 KB
/
bullMQ.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import { Job, Queue } from 'bullmq';
import {
JobCleanStatus,
JobCounts,
JobStatus,
QueueAdapterOptions,
QueueJobOptions,
Status,
} from '../../typings/app';
import { STATUSES } from '../constants/statuses';
import { BaseAdapter } from './base';
/**
 * Adapter that exposes a BullMQ `Queue` through the `BaseAdapter` interface.
 *
 * Almost every method is a thin delegation to the wrapped queue instance.
 */
export class BullMQAdapter extends BaseAdapter {
  /**
   * @param queue - The BullMQ queue to wrap.
   * @param options - Adapter options, forwarded unchanged to `BaseAdapter`.
   * @throws Error when `queue` is neither an instance of `Queue` nor an object whose
   *   `metaValues.version` string starts with `bullmq` (this includes `null`/`undefined`).
   */
  constructor(
    private readonly queue: Queue,
    options: Partial<QueueAdapterOptions> = {}
  ) {
    const libName = 'bullmq';
    super(libName, options);

    // The version check is a fallback for objects that fail `instanceof`.
    // NOTE(review): presumably this covers queues created by a different copy of the
    // bullmq package - confirm. The optional chain on `queue` itself makes a missing
    // queue fail with the descriptive error below instead of a raw TypeError.
    const isBullMQQueue =
      queue instanceof Queue ||
      `${(queue as Queue | null | undefined)?.metaValues?.version}`.startsWith(libName);

    if (!isBullMQQueue) {
      throw new Error(`You've used the BullMQ adapter with a non-BullMQ queue.`);
    }
  }

  /** Resolves the queue's client and returns the result of its `info()` call. */
  public async getRedisInfo(): Promise<string> {
    const client = await this.queue.client;
    return client.info();
  }

  /** Returns the queue name preceded by the adapter's `prefix`. */
  public getName(): string {
    return `${this.prefix}${this.queue.name}`;
  }

  /**
   * Cleans jobs in the given status via `queue.clean`.
   *
   * @param jobStatus - Status of the jobs to clean.
   * @param graceTimeMs - Grace period passed as the first argument to `queue.clean`.
   */
  public async clean(jobStatus: JobCleanStatus, graceTimeMs: number): Promise<void> {
    // Number.MAX_SAFE_INTEGER is passed as the second argument (the job limit in the
    // BullMQ API) so a single call is effectively uncapped.
    await this.queue.clean(graceTimeMs, Number.MAX_SAFE_INTEGER, jobStatus);
  }

  /**
   * Adds a job to the queue.
   *
   * @param name - Job name.
   * @param data - Job payload, forwarded as-is.
   * @param options - Job options, forwarded as-is.
   * @returns Whatever `queue.add` returns.
   */
  public addJob(name: string, data: any, options: QueueJobOptions) {
    return this.queue.add(name, data, options);
  }

  /** Looks up a single job by id; resolves to `undefined` when the queue returns none. */
  public getJob(id: string): Promise<Job | undefined> {
    return this.queue.getJob(id);
  }

  /**
   * Fetches jobs in any of the given statuses.
   *
   * @param jobStatuses - Statuses to include.
   * @param start - Optional range start, forwarded to `queue.getJobs`.
   * @param end - Optional range end, forwarded to `queue.getJobs`.
   */
  public getJobs(jobStatuses: JobStatus[], start?: number, end?: number): Promise<Job[]> {
    return this.queue.getJobs(jobStatuses, start, end);
  }

  /** Returns the queue's job counts, re-typed as the project's `JobCounts`. */
  public getJobCounts(): Promise<JobCounts> {
    // The double assertion bridges the queue's return type and the project's JobCounts.
    return this.queue.getJobCounts() as unknown as Promise<JobCounts>;
  }

  /** Returns only the `logs` array from the queue's log result for the given job id. */
  public async getJobLogs(id: string): Promise<string[]> {
    const { logs } = await this.queue.getJobLogs(id);
    return logs;
  }

  /** Reports whether the queue is paused. */
  public isPaused(): Promise<boolean> {
    return this.queue.isPaused();
  }

  /** Pauses the queue. */
  public pause(): Promise<void> {
    return this.queue.pause();
  }

  /** Resumes the queue. */
  public resume(): Promise<void> {
    return this.queue.resume();
  }

  /** Empties the queue by calling `queue.drain()`. */
  public empty(): Promise<void> {
    return this.queue.drain();
  }

  /**
   * Promotes all delayed jobs.
   *
   * Uses `queue.promoteJobs()` when the method exists; otherwise fetches the delayed
   * jobs and promotes each one individually.
   */
  public async promoteAll(): Promise<void> {
    // since bullmq 4.6.0
    if (typeof this.queue.promoteJobs === 'function') {
      await this.queue.promoteJobs();
      return;
    }

    const delayedJobs = await this.getJobs([STATUSES.delayed]);
    await Promise.all(delayedJobs.map((job) => job.promote()));
  }

  /**
   * Returns every status this adapter exposes: `latest` followed by all job statuses
   * from `getJobStatuses()`, in the same order.
   */
  public getStatuses(): Status[] {
    // Derived from getJobStatuses() so the two lists cannot drift apart.
    return [STATUSES.latest, ...this.getJobStatuses()];
  }

  /** Returns the job statuses this adapter exposes; a fresh array on every call. */
  public getJobStatuses(): JobStatus[] {
    return [
      STATUSES.active,
      STATUSES.waiting,
      STATUSES.waitingChildren,
      STATUSES.prioritized,
      STATUSES.completed,
      STATUSES.failed,
      STATUSES.delayed,
      STATUSES.paused,
    ];
  }
}