Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(Observable): Fix Observable.subscribe to add operator TeardownLog…
Browse files Browse the repository at this point in the history
…ic to returned Subscription.
PSanetra committed Jan 26, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 3a11d25 commit 7c61e57
Showing 3 changed files with 40 additions and 12 deletions.
21 changes: 21 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { map } from '../src/internal/operators/map';
import { noop } from '../src/internal/util/noop';
import { NEVER } from '../src/internal/observable/never';
import { Subscriber } from '../src/internal/Subscriber';
import { Operator } from '../src/internal/Operator';

declare const asDiagram: any, rxTestScheduler: any;
const Observable = Rx.Observable;
@@ -697,6 +700,24 @@ describe('Observable.lift', () => {
}
}

it('should return Observable which calls TeardownLogic of operator on unsubscription', (done) => {

const myOperator: Operator<any, any> = {
call: (subscriber: Subscriber<any>, source: any) => {
const subscription = source.subscribe((x: any) => subscriber.next(x));
return () => {
subscription.unsubscribe();
done();
};
}
};

NEVER.lift(myOperator)
.subscribe()
.unsubscribe();

});

it('should be overrideable in a custom Observable type that composes', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
2 changes: 1 addition & 1 deletion src/internal/Observable.ts
Original file line number Diff line number Diff line change
@@ -204,7 +204,7 @@ export class Observable<T> implements Subscribable<T> {
const sink = toSubscriber(observerOrNext, error, complete);

if (operator) {
operator.call(sink, this.source);
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
29 changes: 18 additions & 11 deletions src/internal/Subscription.ts
Original file line number Diff line number Diff line change
@@ -168,14 +168,15 @@ export class Subscription implements SubscriptionLike {
}
}

// Optimize for the common case when adding the first subscription.
const subscriptions = this._subscriptions;
if (subscriptions) {
subscriptions.push(subscription);
} else {
this._subscriptions = [subscription];
if (subscription._addParent(this)) {
// Optimize for the common case when adding the first subscription.
const subscriptions = this._subscriptions;
if (subscriptions) {
subscriptions.push(subscription);
} else {
this._subscriptions = [subscription];
}
}
subscription._addParent(this);

return subscription;
}
@@ -197,20 +198,26 @@ export class Subscription implements SubscriptionLike {
}

/** @internal */
private _addParent(parent: Subscription) {
private _addParent(parent: Subscription): boolean {
let { _parent, _parents } = this;
if (!_parent || _parent === parent) {
// If we don't have a parent, or the new parent is the same as the
// current parent, then set this._parent to the new parent.
if (_parent === parent) {
// If the new parent is the same as the current parent, then do nothing.
return false;
} else if (!_parent) {
// If we don't have a parent, then set this._parent to the new parent.
this._parent = parent;
return true;
} else if (!_parents) {
// If there's already one parent, but not multiple, allocate an Array to
// store the rest of the parent Subscriptions.
this._parents = [parent];
return true;
} else if (_parents.indexOf(parent) === -1) {
// Only add the new parent to the _parents list if it's not already there.
_parents.push(parent);
return true;
}
return false;
}
}

0 comments on commit 7c61e57

Please sign in to comment.