-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
149 lines (137 loc) · 3.84 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Bit flags OR-ed together in a queue's `_flags` field.
var PROCESSING = 1 << 0; // set while _process() is handing jobs to onData()
var EOF = 1 << 1; // set by end(); push() throws once it is set
var CLOSING = 1 << 2; // set by stop() without an error, or by _tick() on the first error
var CLOSED = 1 << 3; // set by _tick() immediately before it calls onEnd()
// Mask that matches a queue which is closing or already closed.
var CLOSED_CLOSING = CLOSING | CLOSED;
// Constructs a job queue.
//
// concurrency: passed through Queue.parseConcurrency() and stored as
//              `concurrency`; _process() uses it as the cap on `running`.
//
// Throws an Error when called with more than one argument.
var Queue = function(concurrency) {
  if (arguments.length > 1) {
    throw new Error('too many arguments');
  }
  var queue = this;
  queue.concurrency = Queue.parseConcurrency(concurrency);
  // Jobs pushed whose callback has not yet been called (waiting or running).
  queue.length = 0;
  // Jobs handed to onData() whose callback has not yet been called.
  queue.running = 0;
  // First error passed to _tick(), if any; forwarded to onEnd().
  queue.error = undefined;
  // Becomes true once stop() has been accepted.
  queue.stopped = false;
  // Must stay undefined: _tick() throws if anything is assigned to it.
  queue.done = undefined;
  // Bit field of lifecycle flags.
  queue._flags = 0;
  // Jobs waiting to be handed to onData(), in push order.
  queue._array = [];
  // The completion callback given to every onData() call. A closure is used
  // rather than Function.bind() because the original author measured it as
  // significantly faster.
  queue._callbackBound = function(error) {
    queue._callback(error);
  };
};
// Pushes every element of `jobs` onto the queue, in index order, by calling
// push() once per element.
//
// jobs: an Array of jobs.
//
// Throws an Error if `jobs` is not an array. Errors thrown by push() propagate.
Queue.prototype.concat = function(jobs) {
  var self = this;
  // Array.isArray() is used instead of comparing `jobs.constructor` to Array:
  // the constructor property can be shadowed or forged, and differs for arrays
  // created in another realm or by an Array subclass.
  if (!Array.isArray(jobs)) {
    throw new Error('jobs must be an Array');
  }
  // The length is read once up front, so elements appended to `jobs` while
  // pushing are not picked up.
  for (var index = 0, length = jobs.length; index < length; index++) {
    self.push(jobs[index]);
  }
};
// Marks the queue as ended: no further jobs may be pushed.
//
// error: optional; forwarded to _tick() when the queue is not already
//        closing or closed.
//
// Calling end() again after the EOF flag is set does nothing.
Queue.prototype.end = function(error) {
  var queue = this;
  var alreadyEnded = (queue._flags & EOF) !== 0;
  if (alreadyEnded) return;
  queue._flags |= EOF;
  // A closing or closed queue only records EOF; it is not ticked again here.
  if ((queue._flags & CLOSED_CLOSING) === 0) {
    queue._tick(error);
  }
};
// Placeholder job handler; users of the queue are expected to replace it.
// _process() calls onData(job, end) with the next job and the queue's
// completion callback. This default always throws.
Queue.prototype.onData = function(job, end) {
  var message = 'Queue.onData callback must be defined';
  throw new Error(message);
};
// Placeholder completion handler; users of the queue are expected to replace
// it. _tick() calls onEnd(error) once, when the queue closes. This default
// always throws.
Queue.prototype.onEnd = function(error) {
  var message = 'Queue.onEnd callback must be defined';
  throw new Error(message);
};
// Adds one job to the queue and starts processing unless a _process() loop
// is already active.
//
// job: any value; it is later passed to onData() unchanged.
//
// Throws an Error if end() has already been called. If the queue is closing
// or closed (and not ended), the job is silently dropped.
Queue.prototype.push = function(job) {
  var queue = this;
  if ((queue._flags & EOF) !== 0) {
    throw new Error('Queue.push() was called after Queue.end()');
  }
  if ((queue._flags & CLOSED_CLOSING) !== 0) return;
  queue._array.push(job);
  queue.length++;
  // When PROCESSING is set, the active _process() loop will reach this job.
  var isProcessing = (queue._flags & PROCESSING) !== 0;
  if (!isProcessing) {
    queue._process();
  }
};
// Stops the queue, optionally with an error.
//
// error: optional. When truthy it is handed to _tick(), which records it as
//        `error` and sets CLOSING itself. When falsy, CLOSING is set here so
//        the queue closes with `error` left as it was.
//
// Does nothing if the queue is already closing or closed.
Queue.prototype.stop = function(error) {
  var queue = this;
  if ((queue._flags & CLOSED_CLOSING) !== 0) return;
  queue.stopped = true;
  // CLOSING must NOT be set here when an error is supplied: _tick() only
  // stores the error if CLOSING is not yet set.
  if (!error) {
    queue._flags |= CLOSING;
  }
  queue._tick(error);
};
// Completion handler behind `_callbackBound`, i.e. the `end` function that
// onData() receives for each job.
//
// error: optional error reported by the job; forwarded to _tick().
//
// Throws an Error if the queue is already closed when the callback arrives.
Queue.prototype._callback = function(error) {
  var queue = this;
  var isClosed = (queue._flags & CLOSED) !== 0;
  if (isClosed) {
    throw new Error('an onData handler called end() more than once');
  }
  // One job finished: it no longer counts as queued or as running.
  queue.running--;
  queue.length--;
  queue._tick(error);
};
// Hands waiting jobs to onData() until the waiting list is empty, the
// concurrency limit is reached, or the queue starts closing.
//
// The PROCESSING flag is held for the duration of the loop, so a nested call
// (for example from an onData() that completes synchronously and re-enters
// through _callback() and _tick()) returns immediately and leaves dispatching
// to the outer loop.
Queue.prototype._process = function() {
  var queue = this;
  // Nothing to do when closing/closed, or when a loop is already active.
  if ((queue._flags & (CLOSED_CLOSING | PROCESSING)) !== 0) return;
  queue._flags |= PROCESSING;
  while (queue._array.length) {
    // Flags and `running` are re-read on every pass because onData() may
    // change them before returning.
    if ((queue._flags & CLOSED_CLOSING) !== 0) break;
    if (queue.running >= queue.concurrency) break;
    queue.running++;
    queue.onData(queue._array.shift(), queue._callbackBound);
  }
  queue._flags &= ~PROCESSING;
};
// Advances the queue's state after a lifecycle event (end(), stop(), or a job
// completing) and decides whether to finish or keep processing.
//
// error: optional. The first truthy error seen while the queue is not yet
//        closing is stored in `error` and puts the queue into CLOSING.
//
// Outcomes, in order of precedence:
//   - already CLOSED: nothing happens;
//   - CLOSING: once no jobs are running, mark CLOSED and call onEnd(error);
//   - EOF with no jobs left: mark CLOSED and call onEnd(undefined);
//   - otherwise: call _process().
//
// Throws an Error if something was assigned to `done`, or if the queue is
// ended and empty while `running` is non-zero.
Queue.prototype._tick = function(error) {
  var queue = this;
  if ((queue._flags & CLOSED) !== 0) return;
  if (queue.done !== undefined) {
    throw new Error('deprecated use of `queue.done`');
  }
  var closing = (queue._flags & CLOSING) !== 0;
  if (error && !closing) {
    queue._flags |= CLOSING;
    queue.error = error;
    closing = true;
  }
  if (closing) {
    // Wait for running jobs to report back before closing.
    if (queue.running !== 0) return;
    // `error` is undefined here if stop() was called without an error,
    // and defined if an error was reported.
    queue._flags |= CLOSED;
    queue.onEnd(queue.error);
    return;
  }
  var drained = (queue._flags & EOF) !== 0 && queue.length === 0;
  if (!drained) {
    queue._process();
    return;
  }
  // `length` counts running jobs too, so this should be unreachable.
  if (queue.running !== 0) {
    throw new Error('running=' + queue.running + ' !== 0');
  }
  queue._flags |= CLOSED;
  queue.onEnd(undefined);
};
// Validates a user-supplied concurrency setting and returns it as a number.
//
// concurrency:
//   undefined or false -> 1
//   true               -> 1024
//   a number           -> returned unchanged if it equals its own floor and
//                         is at least 1
//
// Throws an Error for any other type, for a number that does not equal its
// own floor (this includes NaN), and for a number below 1.
Queue.parseConcurrency = function(concurrency) {
  // `switch` compares with strict equality, so only these exact values match.
  switch (concurrency) {
    case undefined:
    case false:
      return 1;
    case true:
      return 1024;
  }
  if (typeof concurrency !== 'number') {
    throw new Error('concurrency must be a number');
  }
  var isWhole = Math.floor(concurrency) === concurrency;
  if (!isWhole) {
    throw new Error('concurrency must be an integer');
  }
  if (concurrency < 1) {
    throw new Error('concurrency must be at least 1');
  }
  return concurrency;
};
module.exports = Queue;