-
-
Notifications
You must be signed in to change notification settings - Fork 153
/
Copy pathmetastream.ts
177 lines (169 loc) · 4.96 KB
/
metastream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import { assert, Fn } from "@thi.ng/api";
import { CloseMode, CommonOpts, ISubscription, State } from "./api";
import { Subscription } from "./subscription";
import { optsWithID } from "./utils/idgen";
export interface MetaStreamOpts extends CommonOpts {
/**
* If true, emits the last received value from the metastream's
* current child stream (if any) when the metastream's parent is
* calling `.done()`.
*
* @defaultValue false
*/
emitLast: boolean;
}
/**
* Returns a {@link Subscription} which transforms each incoming value
* into a new {@link Stream}, subscribes to it (via an hidden / internal
* subscription) and then only passes values from that stream to its own
* subscribers.
*
* @remarks
* If a new value is received, the metastream first unsubscribes from
* any still active stream, before creating and subscribing to the new
* stream. Hence this stream type is useful for cases where streams need
* to be dynamically created & inserted into an existing dataflow
* topology.
*
* The user supplied `factory` function will be called for each incoming
* value and is responsible for creating the new stream instances. If
* the function returns null/undefined, no further action will be taken
* (acts like a filter transducer).
*
* The factory function does NOT need to create *new* streams, but can
* merely return other existing streams, and so making the meta stream
* act like a switch with arbitrary criteria.
*
* If the meta stream itself is the only subscriber to existing input
* streams, you'll need to configure the input's
* {@link CommonOpts.closeOut} option to keep them alive and support
* dynamic switching between them.
*
* @example
* ```ts
* // transform each received odd number into a stream
* // producing 3 copies of that number in the metastream
* // even numbers are ignored
* a = metastream(
* (x) => (x & 1)
* ? fromIterable(tx.repeat(x, 3), { delay: 100 })
* : null
* );
*
* a.subscribe(trace())
* a.next(23)
*
* // 23
* // 23
* // 23
*
* a.next(42) // ignored by factory fn
*
* a.next(43)
* // 43
* // 43
* // 43
* ```
*
* @example
* ```ts
* // infinite inputs
* a = fromIterable(
* tx.repeat("a"),
* { delay: 1000, closeOut: CloseMode.NEVER }
* );
* b = fromIterable(
* tx.repeat("b"),
* { delay: 1000, closeOut: CloseMode.NEVER }
* );
*
* // stream selector / switch
* m = metaStream((x) => x ? a : b);
* m.subscribe(trace("meta from: "));
*
* m.next(true);
* // meta from: a
*
* m.next(false);
* // meta from: b
*
* m.next(true);
* // meta from: a
* ```
*
* @param factory -
* @param id -
*/
export const metaStream = <A, B>(
factory: Fn<A, Subscription<B, B>>,
opts?: Partial<MetaStreamOpts>
) => new MetaStream(factory, opts);
/**
* @see {@link metaStream} for reference & examples.
*/
export class MetaStream<A, B> extends Subscription<A, B> {
factory: Fn<A, Subscription<B, B>>;
stream?: Subscription<B, B>;
sub?: ISubscription<B, B>;
emitLast: boolean;
doneRequested: boolean;
constructor(
factory: Fn<A, Subscription<B, B>>,
opts: Partial<MetaStreamOpts> = {}
) {
super(undefined, optsWithID("metastram", opts));
this.factory = factory;
this.emitLast = opts.emitLast === true;
this.doneRequested = false;
}
next(x: A) {
if (this.state < State.DONE) {
if (this.stream) {
this.stream.unsubscribe(this.sub);
}
let stream = this.factory(x);
if (stream) {
this.stream = stream;
this.sub = this.stream.subscribe({
next: (x) => {
stream === this.stream && super.dispatch(x);
this.doneRequested && this.done();
},
done: () => {
this.stream!.unsubscribe(this.sub);
if (stream === this.stream) {
this.stream = undefined;
this.sub = undefined;
}
},
error: (e) => super.error(e),
__owner: this,
});
}
}
}
done() {
if (this.emitLast && !this.doneRequested) {
this.doneRequested = true;
} else {
if (this.stream) {
this.detach(true);
}
this.closeIn !== CloseMode.NEVER && super.done();
}
}
unsubscribe(sub?: ISubscription<B, any>) {
if (this.stream && (!sub || this.subs.length === 1)) {
this.detach(!sub);
}
return super.unsubscribe(sub);
}
protected detach(force: boolean) {
if (force || this.closeOut !== CloseMode.NEVER) {
assert(!!this.stream, "input stream already removed");
this.stream!.unsubscribe(this.sub);
delete this.stream;
delete this.sub;
}
}
}