Skip to content

Commit

Permalink
refactor: Have consumers unsubscribe when they are done in take and t…
Browse files Browse the repository at this point in the history
…akeWhile
  • Loading branch information
benlesh committed Dec 3, 2022
1 parent b5f35f8 commit d6e2c08
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 71 deletions.
4 changes: 2 additions & 2 deletions spec/observables/dom/webSocket-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,11 +715,11 @@ describe('webSocket', () => {
expect(results).to.deep.equal([
'A next',
'B next',
'A complete',
'A unsub',
'A complete',
'B next',
'B complete',
'B unsub',
'B complete',
]);
});
});
Expand Down
4 changes: 2 additions & 2 deletions spec/operators/delay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ describe('delay', () => {
return delayed;
}),
skip(1),
take(2),
tap({
next() {
const [[subscriber]] = subscribeSpy.args;
Expand All @@ -249,7 +248,8 @@ describe('delay', () => {
complete() {
expect(counts).to.deep.equal([1, 1]);
},
})
}),
take(2)
);

expectObservable(result).toBe(expected);
Expand Down
30 changes: 28 additions & 2 deletions spec/operators/take-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { merge, Observable, of, Subject } from 'rxjs';
import { mergeMap, take, tap } from 'rxjs/operators';
import { config, merge, Observable, of, Subject } from 'rxjs';
import { finalize, mergeMap, take, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -250,4 +250,30 @@ describe('take', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should properly handle reentrancy when taking many values', () => {
const results: any[] = [];
const subject = new Subject<number>();

config.onStoppedNotification = (x) => results.push(x);

merge(subject, of(1))
.pipe(
finalize(() => results.push('finalize source')),
take(5),
tap((n) => subject.next(n + 1)),
finalize(() => results.push('finalize result'))
)
.subscribe({
next: (n) => results.push(n),
complete: () => results.push('done'),
});

// Because this is reentrant, the last "taken" value of 5 is followed by `complete()`
// which shuts off all value emissions. Otherwise, this would be
// [5, 'done', 4, 3, 2, 1, 'finalize source', 'finalize result']
expect(results).to.deep.equal([5, 'done', 'finalize source', 'finalize result']);
});

afterEach(() => (config.onStoppedNotification = null));
});
42 changes: 42 additions & 0 deletions spec/operators/takeWhile-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,48 @@ describe('takeWhile', () => {
});
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios in next', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------a----(b|)';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
takeWhile((x) => x !== 'b', true),
tap((x) => {
if (x === 'b') {
subject.error(new Error('reentrant shennanigans'));
}
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit completes sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
const source = cold('-------a----b----c---|');
const expected = ' -------a----(b|)';
const subs = ' ^-----------!';

const result = merge(source, subject).pipe(
takeWhile((x) => x !== 'b', true),
tap((x) => {
if (x === 'b') {
subject.complete();
}
})
);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const subject = new Subject();
Expand Down
47 changes: 10 additions & 37 deletions src/internal/operators/take.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,15 @@ export function take<T>(count: number): MonoTypeOperatorFunction<T> {
() => EMPTY
: operate((source, subscriber) => {
let seen = 0;
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
// Increment the number of values we have seen,
// then check it against the allowed count to see
// if we are still letting values through.
if (++seen <= count) {
subscriber.next(value);
// If we have met or passed our allowed count,
// we need to complete. We have to do <= here,
// because re-entrant code will increment `seen` twice.
if (count <= seen) {
subscriber.complete();
}
}
},
() => {
// If seen === count or higher, then we've already taken all of
// the values we were supposed to, and the complete we're getting here
// is from reentrant code racing our `complete` above. We want to stop
// that here.
if (seen < count) {
subscriber.complete();
}
},
(err) => {
// If seen === count or higher, then we've already taken all of
// the values we were supposed to, and the error we're getting here
// is from reentrant code racing our `complete` above. We want to stop
// that here.
if (seen < count) {
subscriber.error(err);
}
}
)
);
const operatorSubscriber = createOperatorSubscriber<T>(subscriber, (value) => {
if (++seen < count) {
subscriber.next(value);
} else {
operatorSubscriber.unsubscribe();
subscriber.next(value);
subscriber.complete();
}
});
source.subscribe(operatorSubscriber);
});
}
40 changes: 12 additions & 28 deletions src/internal/operators/takeWhile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,34 +55,18 @@ export function takeWhile<T>(predicate: (value: T, index: number) => boolean, in
export function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive = false): MonoTypeOperatorFunction<T> {
return operate((source, subscriber) => {
let index = 0;
// Whether or not we're still taking values.
let taking = true;
source.subscribe(
createOperatorSubscriber(
subscriber,
(value) => {
taking = predicate(value, index++);
if (taking || inclusive) {
// If we're still taking, Or if this was "inclusive", meaning
// we found the "last" value, then we send the value along.
subscriber.next(value);
}
if (!taking) {
// We're no longer taking values after calling the predicate above
// time to complete the result.
subscriber.complete();
}
},
undefined,
(err) => {
// If we're not currently taking values,
// then the code that has reached here is reentrant, and
// racing our `subscriber.complete()` above. We want to stop it.
if (taking) {
subscriber.error(err);
}
const operatorSubscriber = createOperatorSubscriber<T>(subscriber, (value) => {
if (predicate(value, index++)) {
subscriber.next(value);
} else {
operatorSubscriber.unsubscribe();
if (inclusive) {
subscriber.next(value);
}
)
);
subscriber.complete();
}
});

source.subscribe(operatorSubscriber);
});
}

0 comments on commit d6e2c08

Please sign in to comment.