forked from transduce/transduce-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transduce-stream.js
57 lines (49 loc) · 1.39 KB
/
transduce-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"use strict"
var util = require('util'),
Transform = require('stream').Transform
module.exports = TransduceStream
/**
 * Bottom-of-the-chain transformer handed to the user's transducer.
 * It treats the stream itself as the accumulator of the reduction.
 */
var streamTransformer = {
  // Nothing to initialise: the stream already exists when stepping starts.
  '@@transducer/init': function(){},

  // Forward one item to the stream's readable side, unless the stream has
  // already been flagged as destroyed. The stream is always handed back.
  '@@transducer/step': function(stream, item){
    if(stream._destroyed){
      return stream
    }
    stream.push(item)
    return stream
  },

  // Completion leaves the stream untouched and returns it as-is.
  '@@transducer/result': function(stream){
    return stream
  }
}
/**
 * Transform stream whose chunks are run through a transducer.
 * May be invoked with or without `new`.
 *
 * @param {Function} transducer - called once with the stream transformer;
 *   its return value is stored and used to step every incoming chunk
 * @param {Object} [options] - passed through unchanged to `Transform`
 */
function TransduceStream(transducer, options){
  var calledWithNew = this instanceof TransduceStream
  if(!calledWithNew){
    // Plain function call: build a real instance on the caller's behalf.
    return new TransduceStream(transducer, options)
  }
  Transform.call(this, options)
  this._transformXf = transducer(streamTransformer)
}
// The declaration above is hoisted, so wiring the prototype chain here is
// equivalent to doing it before the function text.
util.inherits(TransduceStream, Transform)
/**
 * Transform hook: steps one chunk through the transducer.
 * When the step result carries the '@@transducer/reduced' flag, the
 * transducer is completed and the stream is destroyed. Chunks arriving
 * after destruction are dropped. `cb` is always called with no arguments.
 *
 * @param {*} chunk - incoming chunk
 * @param {string} enc - encoding (unused)
 * @param {Function} cb - completion callback
 */
TransduceStream.prototype._transform = function(chunk, enc, cb){
  if(this._destroyed){
    // Already shut down: acknowledge the chunk without processing it.
    cb()
    return
  }

  var stepped = this._transformXf['@@transducer/step'](this, chunk)
  var finishedEarly = stepped && stepped['@@transducer/reduced']
  if(finishedEarly){
    // The transducer signalled early termination.
    this._transformXf['@@transducer/result'](this)
    this.destroy()
  }

  cb()
}
/**
 * Flush hook: completes the transducer and destroys the stream, unless the
 * stream was already destroyed. `cb` is always called with no arguments.
 *
 * @param {Function} cb - completion callback
 */
TransduceStream.prototype._flush = function(cb){
  if(this._destroyed){
    // Completion already ran when the stream was destroyed earlier.
    cb()
    return
  }

  this._transformXf['@@transducer/result'](this)
  this.destroy()
  cb()
}
// from https://github.com/rvagg/through2
/**
 * Flags the stream as destroyed and, on the next tick, emits `err` as an
 * 'error' event (when given) and then ends the stream.
 * Only the first call schedules that work; later calls do nothing more.
 *
 * @param {Error} [err] - emitted as 'error' before `end()` when truthy
 * @returns {TransduceStream} this stream, mirroring the return value of
 *   Node's built-in `stream.destroy()` so that calls can be chained
 */
TransduceStream.prototype.destroy = function(err){
  if(this._destroyed){
    return this
  }
  this._destroyed = true

  var self = this
  // Deferred so the caller's current synchronous work finishes before the
  // stream is ended.
  process.nextTick(function(){
    if(err) self.emit('error', err)
    self.end()
  })
  return this
}