-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
143 lines (123 loc) · 3.28 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
'use strict';
const { PassThrough } = require('stream');
const debug = require('debug')('stream-collect');
/**
* When a listener for an event named collect is added start collecting the data
*/
const addListener = function addListener(name) {
if (name !== 'collect') {
return;
}
if (this._collecting) {
// Don't add more than once
return;
}
debug('adding collect method', this._readableState.objectMode, this._readableState.encoding);
let collected;
if (this._readableState.objectMode) {
collected = [];
} else if (this._readableState.encoding === null) {
collected = Buffer.from('');
} else {
collected = '';
}
this
.on('readable', () => {
let chunk;
while ((chunk = this.read()) !== null) {
debug('data', chunk);
if (chunk !== null) {
if (this._readableState.objectMode) {
collected.push(chunk);
} else if (this._readableState.encoding === null) {
collected = Buffer.concat([collected, chunk]);
} else {
collected += chunk;
}
}
}
})
.on('end', () => this.emit('collect', collected));
this._collecting = true;
};
/**
* Add the collect event to the stream
* @param {Stream} stream
*/
function addToStream(stream) {
// Don't add more than once
if (stream.listeners('addListener').includes(addListener)) {
return stream;
}
stream.on('newListener', addListener);
return stream;
}
/**
* Returns a PassThrough stream with the collect event added and a collect method
*/
class Collect extends PassThrough {
constructor(options) {
super(options);
addToStream(this);
this._collected = null;
}
collect() {
if (!this._collected) {
this._collected = new Promise((resolve, reject) => {
this.on('collect', resolve);
this.on('error', reject);
});
}
return this._collected;
}
/** @depreacted */
then(resolve, reject) {
return this.collect().then(resolve, reject);
}
/** @depreacted */
catch(reject) {
return this.collect().then(null, reject);
}
}
/**
* A CollectStreamObject set to objectMode
*/
class CollectObjects extends Collect {
constructor(options = {}) {
options.objectMode = true;
super(options);
}
}
/**
* Collect all data in a stream and return as a promise or in a callback
* @param {Stream} stream A stream
* @param {String} [encoding] Stream encoding - optional
* @param {Function} cb Callback, the first argument will be the data
* @returns {Promise}
*/
function collect(stream, encoding, cb = () => {}) {
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
return stream
.pipe(new Collect({ encoding, objectMode: stream._readableState.objectMode }))
.collect()
.then((data) => {
cb(null, data);
return data;
})
.catch((e) => {
cb(e);
throw e;
});
}
module.exports = collect;
collect.addToStream = addToStream;
collect.Collect = Collect;
collect.CollectObjects = CollectObjects;
// deprecated names
collect.PassThrough = function (options) { return new Collect(options); };
collect.stream = collect.PassThrough;
collect.PassThroughObject = function (options) { return new CollectObjects(options); };
collect.objectStream = collect.PassThroughObject;