-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream-subject.js
134 lines (107 loc) · 2.99 KB
/
stream-subject.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
'use strict'
const Rx = require('rxjs/Rx')
const Subject = Rx.Subject
const AnonymousSubject = require('rxjs/Subject').AnonymousSubject
const Subscriber = Rx.Subscriber
const Subscription = Rx.Subscription
const ReplaySubject = Rx.ReplaySubject
const Observable = Rx.Observable
module.exports = class StreamSubject extends AnonymousSubject {
constructor (optionsOrSource, destination) {
if (destination instanceof Observable) {
super(destination, optionsOrSource)
} else {
super()
this._output = new Subject()
Object.assign(this, optionsOrSource)
this.destination = new ReplaySubject()
}
}
// lift (operator) {
// console.log('lifting', this)
// const subj = new StreamSubject(this, this.destination)
// subj.operator = operator
// return subj
// }
unsubscribe () {
if (this.stream) {
this.stream.destroy && this.stream.destroy()
this.stream.end && this.stream.end()
this.stream = null
}
super.unsubscribe()
if (!this.source) {
this.destination = new ReplaySubject()
}
}
_connectStream () {
const stream = this.createStream()
this.stream = stream
const subscription = new Subscription(() => {
this.stream = null
if (stream) {
stream.destroy && stream.destroy()
stream.end && stream.end()
}
})
const observer = this._output
const createSubscription = () => {
if (this.openObserver) {
this.openObserver.next(this.stream)
}
const ender = () => {
if (this.closingObserver) {
this.closingObserver.next(undefined)
}
this.stream.destroy && this.stream.destroy()
this.destination = new ReplaySubject()
this.stream = null
}
const queue = this.destination
this.destination = Subscriber.create(
(x) => this.stream.writable && this.stream.write(x),
ender,
ender
)
if (queue && queue instanceof ReplaySubject) {
subscription.add(queue.subscribe(this.destination))
}
}
if (this.openEvent) {
stream.on(this.openEvent, createSubscription)
} else {
createSubscription()
}
stream.on('error', (err) => observer.error(err))
stream.on(this.endEvent, () => {
const closeObserver = this.closeObserver
if (closeObserver) {
closeObserver.next(undefined)
}
observer.complete()
})
stream.on(this.dataEvent, (chunk) => {
observer.next(chunk)
})
}
_subscribe (subscriber) {
const source = this.source
if (source) {
return source.subscribe(subscriber)
}
if (!this.stream) {
this._connectStream()
}
const subscription = new Subscription()
subscription.add(this._output.subscribe(subscriber))
subscription.add(() => {
let stream = this.stream
if (stream) {
stream.destroy && stream.destroy()
stream.end && stream.end()
stream = null
}
})
return subscription
}
}