Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/5487 reentrancy in take, takeWhile, and first #6396

Merged
merged 6 commits into from
Mar 6, 2023

Conversation

benlesh
Copy link
Member

@benlesh benlesh commented May 10, 2021

  • fix(first/take): behave properly with reentrant errors and completions

    • Resolves an issue with both first and take where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
    • Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

    Fixes first/tap operators should unsubscribe before emit next value #5487

  • fix(takeWhile): reentrant errors and completions behave properly

    • Resolves an issue where a reentrant error notification would short circuit the completion.
    • Adds additional tests.

    Related first/tap operators should unsubscribe before emit next value #5487

  • chore: Add reentrant error test for takeUntil

@benlesh benlesh requested a review from cartant May 10, 2021 23:55
@benlesh benlesh changed the title Fix/5487 reentrancy in take et al Fix/5487 reentrancy in take, takeWhile, and first May 10, 2021
@benlesh
Copy link
Member Author

benlesh commented May 11, 2021

Note: also fixes elementAt (because it uses take).

// that here.
if (seen < count) {
subscriber.error(err);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of interest: is there any particular reason, why blocking notifications over unsubscribing before the last next notification was used?

(besides the failing delayWhen test)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response. I'm not sure I understand your question here.

src/internal/operators/takeWhile.ts Outdated Show resolved Hide resolved
@cartant
Copy link
Collaborator

cartant commented May 11, 2021

I don't believe that the changes made to take are the right ones. The changes don't solve all of the problems. (I've not looked at the changes made to takeWhile, but if they just ignore notifications, I suspect they have the same problems as those made to take.)

I mentioned that the primary reason I consider the behaviour described in the issue to be a bug is because it does not behave as I'd expect with side effects. Specifically, with upstream side effects. I would expect take to do what it says and take the specified number of values from the upstream source. But in the reentrant scenario, it does not. Consider this code:

const subject = new BehaviorSubject<number>(1);
subject
  .pipe(
    tap((value) => console.log(`side effect: ${value}`)),
    take(1),
    tap((value) => subject.next(value + 1))
  )
  .subscribe((value) => console.log(`output: ${value}`));

This will output:

side effect: 1
side effect: 2
output: 1

So it doesn't take one value from the upstream source; it takes two. And the snippet could be tweaked to make it take many, many more.

This problem - and the problem in the original issue - can be solved by having the operator explicitly unsubscribe itself:

operate((source, subscriber) => {
  let seen = 0;
  const operatorSubscriber = new OperatorSubscriber(
    subscriber,
    (value) => {
      if (++seen >= count) {
        operatorSubscriber.unsubscribe();
        subscriber.next(value);
        subscriber.complete();
      } else {
        subscriber.next(value);
      }
    }
  );
  source.subscribe(operatorSubscriber);
});

I don't see why this wouldn't be a suitable solution:

  • A subscriber explicitly unsubscribing itself is something that already happens in the codebase:

    const subscriber = new SafeSubscriber<T>({
    next: (value) => {
    resolve(value);
    subscriber.unsubscribe();
    },

  • In numerous places - e.g. in share - deliberate actions are taken before notifying destinations.

    if (resetOnComplete) {
    reset();
    }
    dest.complete();

  • A general principle in the codebase is that resources should be released as soon as possible. With take, as soon as the specified number of values have been taken, there is nothing more that's needed from the source. After that, no notifications from the source have any meaning, so there is no reason that I can see for the operator to not unsubscribe immediately - releasing the resource.

  • There are other operators that continue emitting notifications after unsubscribing from their sources. All of the flattening and higher-order operators can do this.

  • There's only a small number of operators that have this behaviour - where something other than a complete or error notification means that their interest in listening to their source ends: elementAt, find, findIndex, isEmpty, take, takeUntil and takeWhile.

    IMO, each of these operators should unsubscribe from its source as soon as its specific condition is met and it has no further interest in source notifications.

The change suggested above would effect different behaviour, but the change in this PR will alter behaviour anyway.

@cartant
Copy link
Collaborator

cartant commented May 11, 2021

Hmm, having written this ☝ I've thought of a reason why it's not a good idea. For observable sources that emit values that are dependent upon the lifecycle of the source, this won't work - as the source would be torn down before the last taken value is nexted. We should probably add a test for this scenario, if it's something that we want to make sure we don't break.

Copy link
Contributor

@Enuvid Enuvid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cartant @benlesh @backbone87

Also i am not sure that fix of this problem right now will not affect already written code which can depend on this behaviour. Seems that is a Breaking Change.

I mean do 'unsubscribe source before emit last value next'. So maybe make sense to fix it in major release.

@backbone87
Copy link
Contributor

backbone87 commented May 11, 2021

I agree with (almost) everything @cartant wrote.

For observable sources that emit values that are dependent upon the lifecycle of the source

But this kind of observable would not work with a lot of operators. besides that, one could argue that it is bad practice to tie the lifecycle of values to the lifecycle of the observable.

Specifically, with upstream side effects. I would expect take to do what it says and take the specified number of values from the upstream source.

i dont think this guarantee can be made or should be made. notification behavior can only be guaranteed downstream. how upstream is handled is not part of the observable contract, imho. also according to http://reactivex.io/documentation/contract.html (if this is still a reference) it is perfectly fine for an observable to notify after unsubscribe.

edit: I still favor @cartant's version of the fix

@benlesh
Copy link
Member Author

benlesh commented May 11, 2021

Still thinking about this solution.

With @cartant's solution, I'm not sure how finalize would behave, since it's tied to unsubscription/teardown. What would happen then, in this case?

const subject = new Subject<number>();
merge(of(1), subject).pipe(
  finalize(() => console.log('finalize 1')),
  take(1),
  finalize(() => console.log('finalize 2')),
  tap(() => subject.next(2))
)
.subscribe({
  next: console.log,
  complete: () => console.log('done')
})

I think I'd expect:

1
done
finalize 1
finalize 2

But with @cartant's fix I think we'd get:

finalize 1
1
done
finalize 2

@benlesh
Copy link
Member Author

benlesh commented May 11, 2021

Yeah... unsubscribe() is the wrong tool here. We're not interested in tearing down until after complete (as that's what we always do).. what we want to do is make it isStopped, but we don't have anything exposed for that.

So what we'd want would be more something like:

operate((source, subscriber) => {
  let seen = 0;
  const operatorSubscriber = new OperatorSubscriber(
    subscriber,
    (value) => {
      if (++seen >= count) {
        operatorSubscriber.stop(); // New thing that flags `isStopped` up the chain, but doesn't teardown
        subscriber.next(value);
        subscriber.complete();
      } else {
        subscriber.next(value);
      }
    }
  );
  source.subscribe(operatorSubscriber);
});

... and I think the added complexity of the "proper fix" here is too high for the payoff, given I don't think this issue is common and honestly it reads like a code smell more than anything.

Longer term, this quirk is sort of an argument in favor of scheduling all the things. It's not something I like at all, TBH. But it would solve this problem extremely cleanly and make it all predictable. (I really don't like it, but it's an argument in favor of that, IMO)

@kwonoj
Copy link
Member

kwonoj commented May 11, 2021

Separate to discussions about scheduling or not - synchronous reentrant in rxjs is always undefined behavior and we discourage to do it. Is there specifics around this bug need to be treated differently? or will we trying to fix other synchronous reentrant?

@benlesh
Copy link
Member Author

benlesh commented May 11, 2021

FWIW: I think @cartant and I are in violent agreement:

Hmm, having written this ☝ I've thought of a reason why it's not a good idea. For observable sources that emit values that are dependent upon the lifecycle of the source, this won't work - as the source would be torn down before the last taken value is nexted. We should probably add a test for this scenario, if it's something that we want to make sure we don't break.

@backbone87
Copy link
Contributor

backbone87 commented May 11, 2021

i dont think that

finalize 1
1
done
finalize 2

is an unexpected result. from the upstream's points of view, that is exactly what should happen, no? from downstream we dont care. as I understand take's semantics: take at most 2 next notifications and then complete (immediately). there is no error condition after 2 next notifications were received. pipelines/operators are not transparent to subscribers, they dont give any guarantees, when or if they subscribe to something upstream or when or if they unsubscribe from something upstream. the subscriber can not know.

do we consider these 2 statements as correct?

  • from downstream's point of view operators are barriers for notifications from upstream (next, complete, error)
  • from upstream's point of view operators are barriers for notifications from downstream (subscribe, unsubscribe)

@benlesh
Copy link
Member Author

benlesh commented May 12, 2021

finalize 1
1
done
finalize 2

It is completely unexpected. 1 and done should happen before finalize 1. Finalization should always happen after completion. Not before.

I'm still thinking about this.

@benlesh
Copy link
Member Author

benlesh commented May 12, 2021

Yeah... hmm.. in RxJS 6 and 7 (not sure about others), this is all kinds of weird/wrong:

let n = 0;

const subject = new Subject<number>();

merge(subject, of(n++))
  .pipe(
    finalize(() => {
      console.log('finalized');
    }),
    take(10),
    tap(() => subject.next(n++))
  )
  .subscribe({
    next: console.log,
    complete: () => console.log('done')
  });
  
// outputs 
// 9
// done
// finalized

@benlesh
Copy link
Member Author

benlesh commented May 12, 2021

Okay... after some thought, and discussion with @cartant, I think the correct thing to do is to have it unsubscribe as soon as it knows it's done.

The principle behind this is simply that:

  1. All operators create observables that are the consumers of a source.
  2. The consumer should unsubscribe as soon as it knows it no longer wants any values from the source.

That said... this is a BREAKING CHANGE and we cannot have it until version 8.

@benlesh benlesh added the 8.x Issues and PRs for version 8.x label May 12, 2021
// Because this is reentrant, the last "taken" value of 5 is followed by `complete()`
// which shuts off all value emissions. Otherwise, this would be
// [5, 'done', 4, 3, 2, 1, 'finalize source', 'finalize result']
expect(results).to.deep.equal([5, 'done', 'finalize source', 'finalize result']);
Copy link
Contributor

@backbone87 backbone87 May 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one is weird. is this considered expected/ok behavior? guess the solution would be to subject.pipe(observeOn(asapScheduler))

spec/operators/takeWhile-spec.ts Show resolved Hide resolved
});
});

it('should not emit errors sent from the source *after* it found the first value in reentrant scenarios', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does not fail on master, but idk how to even test this one

spec/operators/takeWhile-spec.ts Show resolved Hide resolved
@benlesh benlesh force-pushed the fix/5487-reentrancy-in-take-et-al branch from 23fb964 to 27a23c3 Compare January 4, 2022 14:24
@benlesh benlesh force-pushed the fix/5487-reentrancy-in-take-et-al branch from 27a23c3 to d6e2c08 Compare December 3, 2022 18:59
@benlesh benlesh force-pushed the fix/5487-reentrancy-in-take-et-al branch from d6e2c08 to f684e9a Compare January 20, 2023 03:11
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion notification, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before the takeWhile notifier is notified.
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after `take` or `first` notice a completion, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before `take` or `first` notice the completion.
- Removes a function that had no real purpose, as the only check in it was impossible to hit.
- Adds some comments.
- adds test, fixed by changes to `take`.

Related ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion from `elementAt`, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before `elementAt` finds the element at the index.
@benlesh benlesh force-pushed the fix/5487-reentrancy-in-take-et-al branch from f684e9a to bf72a43 Compare January 20, 2023 03:40
Copy link
Member

@jakovljevic-mladen jakovljevic-mladen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@benlesh benlesh merged commit 79314f1 into ReactiveX:master Mar 6, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
8.x Issues and PRs for version 8.x
Projects
None yet
Development

Successfully merging this pull request may close these issues.

first/tap operators should unsubscribe before emit next value
6 participants