-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
108 lines (81 loc) · 2.29 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
const { Duplex } = require('readable-stream')
const endOfStream = require('end-of-stream')
const once = require('once')
// Shared do-nothing callback (not referenced by the code visible in this file).
const noop = () => {}
// Sentinel stored in `_substreams` by `ignoreStream`. `_write` compares against
// it by identity so data for ignored names is dropped without a warning.
const IGNORE_SUBSTREAM = {}
/**
 * Object-mode Duplex that multiplexes several named substreams.
 *
 * Chunks written to this stream are expected to look like `{ name, data }`;
 * `data` is pushed into the substream registered under `name`. Substreams
 * push `{ name, data }` envelopes back out through this stream's readable
 * side.
 */
class ObjectMultiplex extends Duplex {
  /**
   * @param {object} [_opts] - Options forwarded to the Duplex constructor.
   *   `objectMode` is always forced to `true`.
   */
  constructor (_opts = {}) {
    const opts = Object.assign({}, _opts, {
      objectMode: true,
    })
    super(opts)
    // Prototype-less dictionary: names such as "constructor" or "toString"
    // must not resolve to members inherited from Object.prototype.
    this._substreams = Object.create(null)
  }

  /**
   * Creates and registers a substream for `name`.
   *
   * @param {string} name - Non-empty identifier of the substream.
   * @returns {Substream} The newly created substream.
   * @throws {Error} If `name` is empty or already registered/ignored.
   */
  createStream (name) {
    this._assertNameAvailable(name)
    // create substream
    const substream = new Substream({ parent: this, name: name })
    this._substreams[name] = substream
    // destroy the substream once the parent stream is reported as ended
    anyStreamEnd(this, (err) => {
      substream.destroy(err)
    })
    return substream
  }

  /**
   * Marks `name` as ignored: incoming data for it is dropped silently
   * instead of triggering the orphaned-data warning.
   *
   * @param {string} name - Non-empty identifier to ignore.
   * @throws {Error} If `name` is empty or already registered/ignored.
   */
  ignoreStream (name) {
    this._assertNameAvailable(name)
    this._substreams[name] = IGNORE_SUBSTREAM
  }

  /**
   * Shared validation for `createStream` and `ignoreStream`.
   *
   * @param {string} name - Candidate substream name.
   * @throws {Error} If `name` is empty or already present in `_substreams`.
   */
  _assertNameAvailable (name) {
    if (!name) throw new Error('ObjectMultiplex - name must not be empty')
    if (this._substreams[name]) {
      // template literal so the offending name actually appears in the message
      throw new Error(`ObjectMultiplex - Substream for name "${name}" already exists`)
    }
  }

  // stream plumbing

  /** No-op: data reaches the readable side via `push` calls from substreams. */
  _read () {}

  /**
   * Routes one incoming `{ name, data }` chunk to its substream.
   * Malformed and orphaned chunks are logged with `console.warn` and dropped.
   *
   * @param {{name: string, data: *}} chunk - Incoming envelope.
   * @param {string} encoding - Unused.
   * @param {Function} callback - Invoked once the chunk has been handled.
   */
  _write (chunk, encoding, callback) {
    // a nullish chunk has no name and is treated as malformed, not a crash
    const name = chunk == null ? undefined : chunk.name
    if (!name) {
      console.warn(`ObjectMultiplex - malformed chunk without name "${chunk}"`)
      return callback()
    }
    // get corresponding substream
    const substream = this._substreams[name]
    if (!substream) {
      console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
      return callback()
    }
    // push data into substream (ignored names are dropped silently)
    if (substream !== IGNORE_SUBSTREAM) {
      substream.push(chunk.data)
    }
    callback()
  }
}
/**
 * One named channel of a multiplexer. Everything written to it is wrapped in
 * a `{ name, data }` envelope and pushed onto the parent stream.
 */
class Substream extends Duplex {
  /**
   * @param {{parent: object, name: string}} options - `parent` is the stream
   *   that receives the envelopes; `name` tags every envelope.
   */
  constructor ({ parent, name }) {
    super({ objectMode: true })
    Object.assign(this, { _parent: parent, _name: name })
  }

  /** No-op: the parent pushes data into this stream directly. */
  _read () {}

  /**
   * Wraps `chunk` with this substream's name and hands it to the parent.
   */
  _write (chunk, enc, callback) {
    const envelope = { name: this._name, data: chunk }
    this._parent.push(envelope)
    callback()
  }
}
module.exports = ObjectMultiplex
// util
/**
 * Invokes `_cb` at most once, when either side of `stream` is reported as
 * finished by `endOfStream`.
 */
function anyStreamEnd (stream, _cb) {
  const notifyOnce = once(_cb)
  // First registration disables the readable check, second the writable one.
  for (const side of ['readable', 'writable']) {
    endOfStream(stream, { [side]: false }, notifyOnce)
  }
}