forked from ReactiveX/rxjs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathobserveOn-spec.ts
127 lines (108 loc) · 4.7 KB
/
observeOn-spec.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import { observeOn, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { expect } from 'chai';
import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of, Observable, asapScheduler } from 'rxjs';
declare function asDiagram(arg: string): Function;
declare const rxTestScheduler: TestScheduler;
/** @test {observeOn} */
describe('observeOn operator', () => {
asDiagram('observeOn(scheduler)')('should observe on specified scheduler', () => {
const e1 = hot('--a--b--|');
const expected = '--a--b--|';
const sub = '^ !';
expectObservable(e1.pipe(observeOn(rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should observe after specified delay', () => {
const e1 = hot('--a--b--| ');
const expected = '-----a--b--|';
const sub = '^ ! ';
expectObservable(e1.pipe(observeOn(rxTestScheduler, 30))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should observe when source raises error', () => {
const e1 = hot('--a--#');
const expected = '--a--#';
const sub = '^ !';
expectObservable(e1.pipe(observeOn(rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should observe when source is empty', () => {
const e1 = hot('-----|');
const expected = '-----|';
const sub = '^ !';
expectObservable(e1.pipe(observeOn(rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should observe when source does not complete', () => {
const e1 = hot('-----');
const expected = '-----';
const sub = '^ ';
expectObservable(e1.pipe(observeOn(rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should allow unsubscribing early and explicitly', () => {
const e1 = hot('--a--b--|');
const sub = '^ ! ';
const expected = '--a-- ';
const unsub = ' ! ';
const result = e1.pipe(observeOn(rxTestScheduler));
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should not break unsubscription chains when the result is unsubscribed explicitly', () => {
const e1 = hot('--a--b--|');
const sub = '^ ! ';
const expected = '--a-- ';
const unsub = ' ! ';
const result = e1.pipe(
mergeMap((x: string) => of(x)),
observeOn(rxTestScheduler),
mergeMap((x: string) => of(x))
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});
it('should clean up subscriptions created by async scheduling (prevent memory leaks #2244)', (done) => {
//HACK: Deep introspection to make sure we're cleaning up notifications in scheduling.
// as the architecture changes, this test may become brittle.
const results: number[] = [];
// This is to build a scheduled observable with a slightly more stable
// subscription structure, since we're going to hack in to analyze it in this test.
const subscription: any = new Observable<number>(observer => {
let i = 1;
return asapScheduler.schedule(function () {
if (i > 3) {
observer.complete();
} else {
observer.next(i++);
this.schedule();
}
});
})
.pipe(observeOn(asapScheduler))
.subscribe(
x => {
// see #4106 - inner subscriptions are now added to destinations
// so the subscription will contain an ObserveOnSubscriber and a subscription for the scheduled action
expect(subscription._subscriptions.length).to.equal(2);
const actionSubscription = subscription._subscriptions[1];
expect(actionSubscription.state.notification.kind).to.equal('N');
expect(actionSubscription.state.notification.value).to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
// the consumer will have been unsubscribed via Subscriber#_parentSubscription
expect(subscription._subscriptions.length).to.equal(1);
const actionSubscription = subscription._subscriptions[0];
expect(actionSubscription.state.notification.kind).to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
done();
}
);
});
});