-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathqueue.ts
70 lines (57 loc) · 1.18 KB
/
queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * Shape of a "deferred": a promise stored alongside a pair of
 * `fulfil` / `reject` callbacks (presumably the ones that settle that
 * promise; nothing in the visible part of this file constructs one).
 */
type Deferred = {
	/** The promise carried by this deferred. */
	promise: Promise<any>;
	/** Success callback; the value is optional. */
	fulfil: (value?: any) => void;
	/** Failure callback; the error is optional. */
	reject: (error?: Error) => void;
};
/**
 * Creates a queue that runs asynchronous tasks with bounded concurrency.
 *
 * Tasks are started in the order they were added, and at most `max` of
 * them are in flight at any time.
 *
 * @param max Maximum number of tasks allowed to run concurrently. Expected
 *   to be a positive integer; with `max <= 0` no task is ever started.
 * @returns An object with two methods:
 *   - `add(fn)` schedules `fn` and returns a promise that settles with the
 *     outcome of `fn`. A synchronous throw from `fn` is reported as a
 *     rejection of that promise. Throws if the queue has been closed.
 *   - `close()` forbids further `add` calls and returns a promise that
 *     resolves once every queued and running task has settled.
 */
export default function queue(max = 4) {
	type Task = {
		fn: () => Promise<any>;
		fulfil: (value: any) => void;
		reject: (reason?: unknown) => void;
	};

	const items: Task[] = [];
	let pending = 0;
	let closed = false;
	// Resolvers of every outstanding `close()` promise. A list (rather than
	// a single slot) so that repeated `close()` calls all get resolved.
	const close_waiters: Array<() => void> = [];

	function dequeue(): void {
		if (pending === 0 && items.length === 0) {
			// Fully drained: release everyone waiting on `close()`.
			for (const release of close_waiters.splice(0)) release();
		}

		// Start as many queued tasks as the concurrency limit allows.
		while (pending < max) {
			const item = items.shift();
			if (!item) return;

			pending += 1;

			let promise: Promise<any>;
			try {
				// `fn` is invoked synchronously. Promise.resolve() keeps a
				// native promise as-is and wraps anything else, so a task
				// that returns a plain value still settles normally.
				promise = Promise.resolve(item.fn());
			} catch (err) {
				// A synchronous throw must take the same path as an async
				// rejection; otherwise `pending` is never decremented and
				// the slot is lost for good.
				promise = Promise.reject(err);
			}

			promise.then(item.fulfil, item.reject).then(() => {
				pending -= 1;
				dequeue();
			});
		}
	}

	return {
		add<T = any>(fn: () => Promise<T>): Promise<T> {
			if (closed) {
				throw new Error(`Cannot add to a closed queue`);
			}
			return new Promise<T>((fulfil, reject) => {
				items.push({ fn, fulfil, reject });
				dequeue();
			});
		},
		close(): Promise<void> {
			closed = true;
			return new Promise<void>((fulfil) => {
				if (pending === 0 && items.length === 0) {
					fulfil();
				} else {
					close_waiters.push(fulfil);
				}
			});
		}
	};
}