-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathstream.js
67 lines (60 loc) · 1.78 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/* jshint node:true */
'use strict';
const Transform = require('stream').Transform;
const StringDecoder = require('string_decoder').StringDecoder;
/**
 * Transform stream that receives JSON text in arbitrary chunks and pushes the
 * CSV text produced by the supplied parser.
 *
 * Each incoming chunk is joined with the text left over from the previous
 * call, wrapped in `[` ... `]` when needed, and cut back at `}` boundaries
 * until it parses as a JSON array. The part that could not be parsed yet is
 * buffered and prepended to the next chunk.
 */
class Stream extends Transform {
  /**
   * @param {object} parser - Object providing `_options` (read here for
   *   `headers`, and later for `endOfLine`), a `headers` value that is pushed
   *   before the first CSV chunk, and `parse(json, callback, isStream)`.
   */
  constructor(parser) {
    super();
    this._parser = parser;
    this._options = parser._options;
    // Stored for consumers of the instance; not read anywhere in this class.
    this._headers = this._options.headers || [];
    this._hasHeaders = false;
    this._lastError = null;
    // Text received so far that has not yet parsed as complete JSON.
    this._extra = '';
    // Keeps the bytes of a multi-byte UTF-8 character that straddles two
    // chunks, so the character is decoded once it is complete.
    this._decoder = new StringDecoder('utf8');
  }

  /**
   * Whether the header row has already been pushed.
   * Alias of `_hasHeaders` under the un-prefixed property name.
   * @type {boolean}
   */
  get hasHeaders() {
    return this._hasHeaders;
  }

  set hasHeaders(value) {
    this._hasHeaders = Boolean(value);
  }

  /**
   * Prepends the buffered leftover text to a new chunk and clears the buffer.
   *
   * @param {Buffer|string} chunk - Newly received data.
   * @returns {string} Leftover text (without its leading comma) followed by
   *   the decoded chunk.
   */
  _mergeChunk(chunk) {
    let pending = this._extra || '';
    // The leftover is cut right after a '}', so a leading comma is only the
    // separator between two array elements and must not start the next parse.
    if (pending.charAt(0) === ',') pending = pending.slice(1);
    this._extra = '';
    const text = Buffer.isBuffer(chunk)
      ? this._decoder.write(chunk)
      : chunk.toString();
    return pending + text;
  }

  /**
   * Makes sure the text starts with `[` and ends with `]`.
   *
   * @param {string} data - JSON text, possibly missing its array brackets.
   * @returns {string} The text with any missing bracket added.
   */
  _wrapArray(data) {
    const opened = data.charAt(0) === '[' ? data : `[${data}`;
    return opened.charAt(opened.length - 1) === ']' ? opened : `${opened}]`;
  }

  /**
   * Parses as much of the buffered text as possible and pushes its CSV form.
   *
   * @param {Buffer|string} chunk - Data written to the stream.
   * @param {string} encoding - Unused.
   * @param {function(Error=)} done - Called once the chunk has been handled.
   */
  _transform(chunk, encoding, done) {
    const text = this._mergeChunk(chunk);
    if (!text) return done(this._lastError);

    // Try the whole text first; on failure move the last '}'-delimited
    // segment into the leftover buffer and retry with the shorter prefix.
    const parts = text.split('}');
    let json = null;
    while (json === null && parts.length > 0) {
      try {
        json = JSON.parse(this._wrapArray(parts.join('}')));
        // A successful parse means earlier failures were only incomplete
        // input, so they must not be reported later as a stream error.
        this._lastError = null;
      } catch (ex) {
        this._lastError = ex;
        this._extra = parts.pop() + (this._extra || '');
        // split() consumed the '}' that closed the new last segment.
        if (parts.length > 0) parts[parts.length - 1] += '}';
      }
    }
    // Nothing parsed: everything is buffered until more data arrives.
    if (json === null) return done();

    this._parser.parse(json, (err, csvChunk) => {
      if (err) return done(err);
      if (!this._hasHeaders) {
        this._hasHeaders = true;
        this.push(this._parser.headers);
      }
      this.push(this._options.endOfLine + csvChunk);
      done();
    }, true);
  }
}
// Expose the Stream class as this module's export.
module.exports = Stream;