This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Input Wanted: Move to using AbortSignal rather than Subscriptions #5545

Closed
benlesh opened this issue Jun 29, 2020 · 20 comments

@benlesh
Member

benlesh commented Jun 29, 2020

Now that it looks like Node will be getting AbortController/AbortSignal I think it's time we seriously start planning moving RxJS toward using AbortSignal.

I have done some research and work on a naive implementation here: https://github.com/benlesh/trex

CONS:

  • This will be a huge breaking change that requires a lot of lead time and long planning to execute.
  • AbortSignal uses EventTarget (addEventListener and removeEventListener), which is really gross.
  • The word "Abort" and its variations everywhere is a little unpalatable, IMO, but we didn't really have a say in the naming of the thing, I guess.

PROS:

  • We can drastically simplify internal code and APIs
  • We no longer have to ship a Subscription type
  • We can also simplify external APIs
  • We are being good citizens with the rest of the JS ecosystem

Proposed (Future) API:

Here is where I'd like to go with this:

  1. Observable subscribe and forEach both accept an optional AbortSignal argument.
  2. Observable's initialization function will have a second argument to it that is the AbortSignal.
  3. The (very) slow phase out of Subscription.

Observable initialization function changes:

The idea here would be to pass the signal into the initialization function to allow users to check it directly for whether or not it had been aborted, and also to allow them to register teardown for the producer.

In the asynchronous case:

const onePerSecond = new Observable((subscriber) => {
  let n = 0;
  const id = setInterval(() => subscriber.next(n++), 1000);
  return () => {
    clearInterval(id);
  }
});

// becomes

const onePerSecond = new Observable((subscriber, signal) => {
  let n = 0;
  const id = setInterval(() => subscriber.next(n++), 1000);
  signal.addEventListener('abort', () => clearInterval(id));
});

In the synchronous case:

const range1To100 = new Observable((subscriber) => {
  for (let i = 1; i <= 100 && !subscriber.closed; i++) {
    subscriber.next(i);
  }
  subscriber.complete();
});

// becomes

const range1To100 = new Observable((subscriber, signal) => {
  for (let i = 1; i <= 100 && !signal.aborted; i++) {
    subscriber.next(i);
  }
  subscriber.complete();
});
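To make the proposed shape concrete, here is a minimal sketch of an Observable-like class whose initialization function receives a signal. It is not the trex or RxJS implementation: `SketchObservable`, `SketchObserver`, and `SketchSubscriber` are hypothetical names, and the choice to create one internal `AbortController` per subscription (so that producer teardown also runs on complete or error) is an assumption made only for illustration.

```typescript
// Hypothetical stand-ins, not RxJS types.
interface SketchObserver<T> {
  next?(value: T): void;
  error?(err: unknown): void;
  complete?(): void;
}
interface SketchSubscriber<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
type SketchInit<T> = (subscriber: SketchSubscriber<T>, signal: AbortSignal) => void;

class SketchObservable<T> {
  private readonly init: SketchInit<T>;

  constructor(init: SketchInit<T>) {
    this.init = init;
  }

  subscribe(observer: SketchObserver<T>, signal?: AbortSignal): void {
    if (signal?.aborted) return; // already cancelled: never start the producer

    // Assumption: an inner controller lets complete/error trigger producer teardown too.
    const inner = new AbortController();
    const onOuterAbort = () => inner.abort();
    signal?.addEventListener('abort', onOuterAbort, { once: true });

    const finish = () => {
      signal?.removeEventListener('abort', onOuterAbort);
      inner.abort();
    };

    this.init(
      {
        next: (value) => { if (!inner.signal.aborted) observer.next?.(value); },
        error: (err) => { if (!inner.signal.aborted) { finish(); observer.error?.(err); } },
        complete: () => { if (!inner.signal.aborted) { finish(); observer.complete?.(); } },
      },
      inner.signal,
    );
  }
}

// Usage: the asynchronous example from above, cancelled before the first tick.
let intervalCleared = false;
const onePerSecondSketch = new SketchObservable<number>((subscriber, signal) => {
  let n = 0;
  const id = setInterval(() => subscriber.next(n++), 1000);
  signal.addEventListener('abort', () => { clearInterval(id); intervalCleared = true; }, { once: true });
});

const sketchController = new AbortController();
onePerSecondSketch.subscribe({ next: (value) => console.log(value) }, sketchController.signal);
sketchController.abort(); // runs the producer's abort listener synchronously
```

Because `abort()` dispatches its event synchronously, the interval is cleared as soon as `sketchController.abort()` returns.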

Observable subscribe and forEach functions changes:

Here we can finally move to a subscribe signature that only accepts an observer, and a forEach that accepts the function. Both will be cancellable by virtue of the fact that an AbortSignal can be passed. The only downside I see to this is most people still like to just pass a function, and in the case of an error with forEach(fn) there will be an unhandled rejection if the user does not handle possible errors coming off of it. (honestly, this might be an upside, now that I think about it, forcing people to actually handle errors). forEach might become the more popular method of subscribing to Observable, finally.

Old API:

subscribe(nextHandler: (value: T) => void): Subscription;
subscribe(observer: Partial<Observer<T>>): Subscription;
forEach(nextHandler: (value: T) => void): Promise<void>;

New API:

subscribe(observer: Partial<Observer<T>>, signal?: AbortSignal): void;
forEach(nextHandler: (value: T) => void, signal?: AbortSignal): Promise<void>;

Note that aborting a forEach will result in the returned promise being rejected with an AbortError. That is important so we don't leave promises hanging.
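As a rough illustration of that rejection behaviour, the sketch below implements a standalone `forEach`-style helper. Everything here is hypothetical: `SketchSource` stands in for an Observable, `forEachSketch` is not an RxJS function, and the error is a plain `Error` whose `name` is set to `'AbortError'` rather than whatever error type a real implementation would choose.

```typescript
// Hypothetical stand-in for an Observable: a function that pushes to an observer.
interface ForEachObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
type SketchSource<T> = (observer: ForEachObserver<T>, signal: AbortSignal) => void;

// Assumption: a plain Error named 'AbortError' is good enough for this sketch.
function makeAbortError(): Error {
  const err = new Error('The operation was aborted');
  err.name = 'AbortError';
  return err;
}

function forEachSketch<T>(
  source: SketchSource<T>,
  nextHandler: (value: T) => void,
  signal?: AbortSignal,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(makeAbortError());
      return;
    }
    const inner = new AbortController();
    const onAbort = () => {
      inner.abort();
      reject(makeAbortError()); // never leave the promise hanging
    };
    signal?.addEventListener('abort', onAbort, { once: true });

    const finish = (settle: () => void) => {
      signal?.removeEventListener('abort', onAbort);
      inner.abort();
      settle();
    };

    source(
      {
        next: (value) => {
          if (inner.signal.aborted) return;
          try {
            nextHandler(value);
          } catch (err) {
            finish(() => reject(err)); // errors thrown by the handler reject the promise
          }
        },
        error: (err) => finish(() => reject(err)),
        complete: () => finish(() => resolve()),
      },
      inner.signal,
    );
  });
}

// Usage: one source that completes, one that is aborted.
const collected: number[] = [];
const oneTwoThree: SketchSource<number> = (observer) => {
  [1, 2, 3].forEach((v) => observer.next(v));
  observer.complete();
};
const completedRun = forEachSketch(oneTwoThree, (v) => collected.push(v));

const neverEmits: SketchSource<number> = () => {};
const forEachController = new AbortController();
const abortedRun = forEachSketch(neverEmits, () => {}, forEachController.signal);
forEachController.abort();
abortedRun.catch((err: Error) => console.log(err.name)); // logs "AbortError"
```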

Other thoughts

Most, if not all of these changes could be non-breaking new features. Which I see as great. Both Subscription and AbortSignal could live alongside of each other, and could be converted back and forth. It's not until we decided to move away from Subscription that these would become breaking changes.

What I want in this thread:

Your thoughts, questions, and concerns. I think it would be good to see what we can plan out as far as moving this direction.

@benlesh benlesh changed the title Move to using AbortSignal rather than Subscriptions Input Wanted: Move to using AbortSignal rather than Subscriptions Jun 29, 2020
@benlesh benlesh added AGENDA ITEM Flagged for discussion at core team meetings type: discussion labels Jun 29, 2020
@benlesh
Member Author

benlesh commented Jun 29, 2020

cc @benjamingr

@cartant
Collaborator

cartant commented Jun 29, 2020

One of my first questions would be: what guarantees does AbortSignal offer regarding the order in which its event listeners are called? (Leaving this here, for myself, as a reminder to think about it later.)

@benjamingr
Contributor

I am generally in favor of this and think it's a good way towards a unified API and this makes converting promise APIs to Rx (where appropriate) much easier which is great.

@benjamingr
Contributor

Also, let me know if there is anything we (Node) can do to help with this transition.

One of my first questions would be: what guarantees does AbortSignal offer regarding the order in which its event listeners are called? (Leaving this here, for myself, as a reminder to think about it later.)

AbortSignal is an EventTarget, event listeners are guaranteed to be called in order and without duplicates + throwing an error has uncaught exception semantics (mostly) so it's important that internal listeners inside Rx don't throw.
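The ordering and de-duplication guarantees can be checked with a few lines against a plain `AbortController` (no RxJS involved). The variable names are just for illustration.

```typescript
const orderController = new AbortController();
const calls: string[] = [];

const firstListener = () => { calls.push('first'); };
const secondListener = () => { calls.push('second'); };

orderController.signal.addEventListener('abort', firstListener);
orderController.signal.addEventListener('abort', secondListener);
// Registering the same function again (same type and options) is a no-op.
orderController.signal.addEventListener('abort', firstListener);

orderController.abort(); // listeners run synchronously, in registration order

console.log(calls); // [ 'first', 'second' ]
```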

@kwonoj
Member

kwonoj commented Jun 30, 2020

This is quite interesting, and a lot of major changes and discussion would be involved. Just a few initial random thoughts, in no particular order.

  • High level, I think we can move forward to this once widely used runtimes can support this.

  • Thinking about past experience, wide adoption of a new runtime version that supports a new API takes a really long time. If we were to support AbortController once Node.js has stable API support, we would probably have to support both the old Subscription and the new AbortController for at least a few major releases.

  • I'm in favor of treating the deprecation of the fn-based subscribe handler (subscribe(nextHandler: (value: T) => void): Subscription;) separately from this proposal. First, it's unlikely we'll have an AbortController-only release sooner than other deprecations; second, having two major breaking changes in a single shot seems quite aggressive. I still hope we could preserve the fn-based subscribe handler behavior, but regardless we can probably discuss that separately, and even make an earlier breaking major for the signature change only.

  • I haven't used AbortController in real work yet and may be missing a lot of details. I'm curious whether we could make subscribe(nextHandler: (value: T) => void): AbortController (or => abort()) instead of accepting a signal as a param? maybe I'm not fully aware of idiomatic patterns though

  • What will be the future of subscription.add?

@benlesh
Member Author

benlesh commented Jun 30, 2020

once widely used runtimes can support this.

Right now it's supported in every major browser, and node support is in experimental stages. There are already multiple AbortSignal polyfills.

maybe I'm not fully aware of idiomatic patterns though

Idiomatic patterns with controller/signal or other token-based paradigms are that you create a controller, then pass its signal to whatever API you might want to cancel. (See its use in fetch, or .NET's CancellationTokenSource, which is an AbortController by another name, for examples.)

@kwonoj
Member

kwonoj commented Jun 30, 2020

node support is in experimental stages

Yes, this is what I meant about runtimes: only once Node has stable support and that support is widely used (e.g. once Node 12 LTS and the like reach EOL) can we properly deprecate the current Subscription.

@benjamingr
Contributor

Yes, this is what I meant about runtimes: only once Node has stable support and that support is widely used (e.g. once Node 12 LTS and the like reach EOL) can we properly deprecate the current Subscription.

Note that it might be obvious but there is a polyfill for old Node versions.

@kwonoj
Member

kwonoj commented Jul 1, 2020

there is a polyfill for old Node versions.

Yes, my personal take is that I would like to avoid deprecating the existing implementation in favor of a new implementation that requires a polyfill. For stable runtime support I'd choose to support the existing implementation for a sufficient time. For one thing, we took a long time to support IE11, and making breaking changes on widely used runtimes seems like a bit of an opposite move.

@voliva
Contributor

voliva commented Jul 2, 2020

I'd like to suggest an alternative for using the AbortController:

I'm not too happy with how this API looks like for Observable. I think it makes total sense for fetch in the DOM API, because it returns a Promise and those are not cancellable by definition, so having the ability to pass in an AbortController signal in one of the options is probably the best alternative:

const controller = new AbortController();
fetch('resource', { signal: controller.signal });

// later on...
controller.abort();

But in the API proposed above, I think it's inconvenient for both producers and consumers when we compare it to the current API:

  • Producers will have to add an event listener to the signal and manage its cleanup once the observable completes/cancels. With the current API, it's much easier because all they need to do is return a clean-up function.
  • Consumers will need to create an AbortController and pass it in through a second parameter when they subscribe if they want to cancel it. With the current API, it's just a bit more convenient because you just receive a Subscription you can call unsubscribe if you need to.

Wouldn't it be better if we made an API that keeps the pros we have with the existing API? What I'd like to suggest is:

const onePerSecond = new Observable((subscriber) => {
  let n = 0;
  const id = setInterval(() => subscriber.next(n++), 1000);
  return () => {
    clearInterval(id);
  }
});

const abortController = onePerSecond.subscribe(...);

// later on
abortController.abort();

To let consumers reuse AbortControllers, maybe it could be done with an operator?

const abortController = onePerSecond.subscribe(...);
range1To100.pipe(
   takeUntilSignal(abortController.signal)
).subscribe(...);

// later on
abortController.abort(); // Will cancel both subscriptions.

If there's more than one takeUntilSignal operator in a stream, then any of those signals will cause that stream to cancel.
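A possible shape for such an operator is sketched below, without depending on RxJS. All names are hypothetical (`TakeObserver`, `TakeSource`, `takeUntilSignal` as written here), the source is modelled as a function returning an unsubscribe callback, and the sketch assumes the operator completes the stream on abort, as `takeUntil` does; a real operator might choose differently.

```typescript
// Minimal stand-ins so the sketch runs on its own; not RxJS types.
interface TakeObserver<T> {
  next(value: T): void;
  complete(): void;
}
type TakeSource<T> = (observer: TakeObserver<T>) => () => void;

function takeUntilSignal<T>(signal: AbortSignal): (source: TakeSource<T>) => TakeSource<T> {
  return (source) => (observer) => {
    if (signal.aborted) {
      observer.complete();
      return () => {};
    }
    let done = false;
    let unsubscribe: () => void = () => {};

    const finish = () => {
      if (done) return;
      done = true;
      signal.removeEventListener('abort', onAbort);
      unsubscribe();
    };
    const onAbort = () => {
      finish();
      observer.complete(); // assumption: abort completes the stream
    };
    signal.addEventListener('abort', onAbort, { once: true });

    unsubscribe = source({
      next: (value) => { if (!done) observer.next(value); },
      complete: () => { if (!done) { finish(); observer.complete(); } },
    });
    if (done) unsubscribe(); // the source completed synchronously during subscribe
    return finish;
  };
}

// Usage: two subscriptions share one signal; a single abort stops both.
const tickListeners = new Set<TakeObserver<number>>();
const ticks: TakeSource<number> = (observer) => {
  tickListeners.add(observer);
  return () => { tickListeners.delete(observer); };
};
const emitTick = (value: number) => tickListeners.forEach((l) => l.next(value));

const sharedController = new AbortController();
const seenA: number[] = [];
const seenB: number[] = [];
let completions = 0;
const untilAborted = takeUntilSignal<number>(sharedController.signal);

untilAborted(ticks)({ next: (v) => seenA.push(v), complete: () => { completions++; } });
untilAborted(ticks)({ next: (v) => seenB.push(v), complete: () => { completions++; } });

emitTick(1);
sharedController.abort(); // cancels both subscriptions
emitTick(2);              // nobody is listening any more
```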

Or as an alternative, I think that keeping the signal in subscribe/forEach as an optional parameter also makes sense.

That's just my suggestion. I think that although this is better from a "rxjs library" consumer point of view, we still need to weigh in some possible cons. I'm sure I'm missing lots of internal details on both rxjs and AbortController, so it's quite hard for me to identify them.

@benjamingr
Contributor

I'm not too happy with how this API looks like for Observable. I think it makes total sense for fetch in the DOM API, because it returns a Promise and those are not cancellable by definition, so having the ability to pass in an AbortController signal in one of the options is probably the best alternative:

FWIW the reason promises do this is that the language folks at TC39 wanted an invariant for promises where once you have a promise (a subscription in this case) you don't also have the capability of "action at a distance" where you can "abort" the subscription.

In C#, Rx observables (IIRC it's been like 5 years) take a cancel token:

        // copied from SO after googling 
        Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token);

So it's not like there is no prior art :]

This is actually pretty convenient since in most cases the token is just passed along - the inconvenient part (separating observable subscriptions from the ability to cancel them) causes an extra argument to be passed around - I think that's quite intentional.

Personally I would have preferred promise cancellation (with a promise type Task that enables that) + observable cancellation but I think that in this ecosystem Rx using AbortController/AbortSignal makes a lot of sense. Just to address some of the concerns:

Producers will have to add an event listener to the signal and manage its cleanup once the observable completes/cancels. With the current API, it's much easier because all they need to do is return a clean-up function.

Rx can provide sugar for this (allow the user to return a function and make that a listener to abort on the provided signal). But I think it's still pretty short to write and the producer side isn't the hard one for this (usually).
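One way that sugar could look, as a sketch only: the helper below calls an initializer and, if it returns a function, registers that function as a one-shot `abort` listener. `runWithTeardownSugar`, `Teardown`, and `SugarInit` are hypothetical names and not part of RxJS.

```typescript
type Teardown = () => void;
type SugarInit = (signal: AbortSignal) => Teardown | void;

// Hypothetical helper: wires a returned teardown function to the signal.
function runWithTeardownSugar(init: SugarInit, signal: AbortSignal): void {
  const teardown = init(signal);
  if (typeof teardown !== 'function') return; // nothing to clean up
  if (signal.aborted) {
    teardown(); // aborted during init: clean up right away
    return;
  }
  // { once: true } removes the listener automatically after it fires.
  signal.addEventListener('abort', teardown, { once: true });
}

// Usage: the producer keeps the familiar "return a cleanup function" style.
let teardownCalls = 0;
const sugarController = new AbortController();
runWithTeardownSugar(() => {
  const id = setInterval(() => {}, 1000);
  return () => {
    clearInterval(id);
    teardownCalls++;
  };
}, sugarController.signal);

sugarController.abort();
```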

Consumers will need to create an AbortController and pass it in through a second parameter when they subscribe if they want to cancel it.

In most cases I suspect they will just forward abort signals they got from elsewhere rather than create their own - but yes.

const abortController = onePerSecond.subscribe(...);

// later on
abortController.abort();

That's explicitly making observables stronger (but composition harder) since it loses the possibility of knowing the source won't be cancelled (which may be fine).

@voliva
Contributor

voliva commented Jul 3, 2020

FWIW the reason promises do this is that the language folks at TC39 wanted an invariant for promises where once you have a promise (a subscription in this case) you don't also have the capability of "action at a distance" where you can "abort" the subscription.

Yep - I also think it's nice that we're moving towards a unified way of semantically representing cancelling "tasks".

However, with my suggestion I'm presenting a way that I feel is more developer-friendly, which is what I think abstractions like Observable should aim for. I agree that the way that Observable interops with the rest of the ecosystem should be by using standard conventions, but that shouldn't stop it from making it easier for developers to use it.

In terms of composability of abort signals, I added a possible example of how can that be addressed by using an API that feels natural to Observables. I’m sure that we can find an API that internally would use AbortController, without having to sacrifice the ergonomics of Observable.

@benjamingr
Contributor

I don't think I explained myself well:

  • Making observable's .subscribe return an AbortController is a very significant change because it couples subscriptions with cancellations (like today). It's closer to the old API.
  • In 95% of cases I presume people are going to want to compose tokens rather than create new controllers. I am guessing that in your API structure people are going to have a ton of controller.signal.addEventListener('abort', () => someOtherController.abort()), which is a lot more code than just doing .forEach(loop, token).
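The difference between forwarding a signal and linking controllers by hand can be sketched with plain `AbortController`s. `linkControllers` and `watchSignal` are hypothetical helpers written only for this comparison.

```typescript
// Manual linking: needed when an API hands back its own controller.
function linkControllers(parent: AbortSignal, child: AbortController): void {
  if (parent.aborted) {
    child.abort();
    return;
  }
  parent.addEventListener('abort', () => child.abort(), { once: true });
}

// Stand-in for any consumer that accepts a signal.
function watchSignal(signal: AbortSignal, log: string[], label: string): void {
  signal.addEventListener('abort', () => { log.push(label); }, { once: true });
}

const parentController = new AbortController();
const stopped: string[] = [];

// Forwarding: the consumer simply receives the signal that was passed along.
watchSignal(parentController.signal, stopped, 'forwarded');

// Linking: an extra controller has to be wired to the parent signal first.
const childController = new AbortController();
linkControllers(parentController.signal, childController);
watchSignal(childController.signal, stopped, 'linked');

parentController.abort();
console.log(stopped); // [ 'forwarded', 'linked' ]
```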

I think a lot of code looks like this: (people already using async functions and want to mix cancellation between Rx):

async function doSomething(param, token) {
  const val = await somePromiseOpMaybeFetch(param, { token });
  await getSomeStreamBasedOn(val).forEach((value) => {
     handle(value);
  }, token); // adding the token here is super easy and intuitive, if I get a controller I'm mostly stuck
}

So I guess what I'm saying is that the API Ben is proposing is making it easier for users and harder for library authors

@voliva
Contributor

voliva commented Jul 3, 2020

Sorry if I didn't explain myself. I'm also talking from a user point of view, not from a library author point of view. In my original suggestion, I said:

If there's more than one takeUntilSignal operator in a stream, then any of those signals will cause that stream to cancel.
Or as an alternative, I think that keeping the signal in subscribe/forEach as an optional parameter also makes sense.

With that I meant that I already liked the proposal of passing in a signal to subscribe and forEach. So my suggestion already supports your example (because it's one of the points I agree that it doesn't look bad from a user point of view). But at the same time I also propose an additional way of doing it:

async function doSomething(param, token) {
  const val = await somePromiseOpMaybeFetch(param, { token });
  await getSomeStreamBasedOn(val).pipe(
     takeUntilSignal(token)
  ).forEach((value) => {
     handle(value);
  });
}

Which is something that also feels natural within the streams mindset. But again, I think that both ways are interesting, and they are compatible with each other.

I'd like to point out that the advantage on the operator route though, is that you can theoretically compose more signals and it will allow you to represent more complex interactions between streams.

@benjamingr
Contributor

Which is something that also feels natural within the streams mindset. But again, I think that both ways are interesting, and they are compatible with each other.

I assumed (I might be wrong) that passing the signal to forEach would implicitly pass the token to each of the operators and propagate automatically. So for example one wouldn't need to pass map the signal.

In the above API - how does cancellation propagate across operators? (Assuming operators need the capability to be aware of this)

@voliva
Contributor

voliva commented Jul 3, 2020

Thanks for pointing this out, now I realise I was missing important examples. With my suggestion, we would keep the existing user-oriented API. As an example, let's implement map:

const map =
    <T, R>(mapFn: (value: T) => R) =>
    (source: Observable<T>) =>
        new Observable<R>(obs => source.subscribe({
            next: value => obs.next(mapFn(value)),
            error: obs.error,
            complete: obs.complete
        }))

Notice how this example would work as-is with the current API as well, which also implies that migration of old code wouldn't be as dramatic.

How does propagation of signals work here with my suggestion? The Observable constructor is receiving the token/signal returned from the inner .subscribe, that it will call to cancel that inner subscription when the next operator triggers the outer signal. All of this is happening internally in the definition of Observable, and so it's kept completely transparent from the user. However, AbortController is still a first-class citizen.

Let's break it down:

  • When pipe builds the stream, map receives source and creates a new Observable(.. instance, let's call it observableInstance from now on.
  • When a new observer subscribes, observableInstance passes that observer to the obs parameter of the inner function.
  • map calls source.subscribe(, receiving an AbortController.
  • It returns this AbortController back to the observableInstance.
  • observableInstance receives that AbortController and returns another AbortController back to the caller of its .subscribe (maybe it could be the same, idk, that would be an implementational detail of Observable)
  • [now everything is working as expected - values are getting mapped]
  • signal is dispatched to cancel the stream.
  • observableInstance receives the signal event
  • observableInstance takes the AbortController returned by the inner function and call .abort() (or maybe if Observable didn't create another AbortController that's already done by default and no action is needed - Again, implementational detail of Observable)
  • this .abort() is triggering the signal of the inner .subscribe(, which will keep propagating the signal to every parent.

It's consistent with how pipe works, in the sense that every operator is creating a new Observable that listens on the observable returned by the operator above. So you'd get this chain of signals by following the same rule.

This way, in general the only pieces that need to deal with AbortController are internal bits of Observable, subscribe and forEach, and building your own operators would keep being fairly easy.

Let's compare implementing an operator a bit more complex dealing with multiple subscriptions, switchMap. With the initial proposal, where new Observable() doesn't accept a cleanup function and you get a signal parameter instead, we would get something like:

const switchMap =
    <T, R>(mapFn: (value: T) => Observable<R>) =>
    (source: Observable<T>) =>
        new Observable<R>((obs, signal) => {
            // Controller for the currently active inner subscription, if any
            let innerController: AbortController | null = null;
            source.subscribe({
                next: value => {
                    // Cancel the previous inner subscription before switching
                    innerController?.abort();
                    innerController = new AbortController();
                    mapFn(value).subscribe({
                        next: obs.next,
                        error: obs.error,
                    }, innerController.signal);
                },
                error: obs.error,
                complete: obs.complete
            }, signal);

            const cleanup = () => {
                signal.removeEventListener('abort', cleanup);
                obs.complete();
                innerController?.abort();
            }
            signal.addEventListener('abort', cleanup);
            /* Q: if `source` completes or errors, does `signal` emit abort?
                If it doesn't, then the listener to `signal` also needs to be cleaned
                up in those cases */
        });

With my suggestion, where cleanup is handled by Observable:

const switchMap =
    <T, R>(mapFn: (value: T) => Observable<R>) =>
    (source: Observable<T>) =>
        new Observable<R>(obs => {
            // Controller for the currently active inner subscription, if any
            let innerController: AbortController | null = null;
            const outerController = source.subscribe({
                next: value => {
                    // Cancel the previous inner subscription before switching
                    innerController?.abort();
                    innerController = mapFn(value).subscribe({
                        next: obs.next,
                        error: obs.error,
                    }, outerController.signal)
                },
                error: obs.error,
                complete: obs.complete
            });
            return outerController;
        });

And I'm aware that these are not a perfect example, because it's something that doesn't directly apply to users (as it's an internal operator of rxjs), but in my projects I've had to build my custom operators, some of them simple and others more complex, and I think it gives a picture how it simplifies most of the work for producers, as they don't have to work with event listeners.

@benjamingr
Contributor

benjamingr commented Jul 3, 2020

I am aware that not returning an AbortController makes creating new operators slightly more ergonomic but the fundamental tidbit is whether or not you want to couple RxJS observables with the capability to cancel/abort them - or you want the two to be two separate capabilities.

I think there is a third way (in case RxJS doesn't want to separate the two capabilities like promises did) which is:

  • Keep returning (regular Rx unsubscribe) disposers (initially) from observables
  • Also support AbortSignal in forEach/subscribe

(another random note is RxJS probably doesn't want to create an AbortController for each observable subscription)
(as a random nit: the example can be made shorter with small changes by using the fact event listeners support a {once: true} parameter)

@SanderElias

@benlesh Question!
This sync sample:

const range1To100 = new Observable((subscriber, signal) => {
  for (let i = 1; i <= 100 && !signal.aborted; i++) {
    subscriber.next(i);
  }
  subscriber.complete();
});

How would it be possible to trigger the abort signal? The loop will 100% saturate the main event loop right?
What am I missing?
I understand it can be triggered from the inside (whatever is triggered by the next). But looking at this it seems possible to trigger from the outside too?

@cartant
Collaborator

cartant commented Aug 13, 2020

@SanderElias The subscriber can signal the abort in the next handler.
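A small sketch of that case, using a plain function in place of an Observable (`rangeSketch` is a hypothetical name): the next handler calls `abort()` partway through, and because `signal.aborted` flips synchronously, the loop condition sees it on the very next iteration.

```typescript
// Stand-in for the synchronous range producer from the question.
function rangeSketch(next: (n: number) => void, signal: AbortSignal): void {
  for (let i = 1; i <= 100 && !signal.aborted; i++) {
    next(i);
  }
}

const rangeController = new AbortController();
const seen: number[] = [];

rangeSketch((n) => {
  seen.push(n);
  if (n === 5) rangeController.abort(); // abort from inside the next handler
}, rangeController.signal);

console.log(seen); // [ 1, 2, 3, 4, 5 ]
```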

@thw0rted

thw0rted commented Apr 1, 2021

I've been reading this and the related issues, and just wanted to make sure that somebody considers interactions between the proposed designs and toPromise / firstValueFrom / lastValueFrom. I wrote this recently (for rxjs 6):

function observableToPromise<T>(obs: Observable<T>, signal?: AbortSignal): Promise<T> {
    if (!signal) { return obs.toPromise(); }
    // Reject immediately if the signal has already fired. Use `EmptyError`
    // because that's what `first` will fail with per the note below
    if (signal.aborted) { return Promise.reject(new EmptyError()); }

    const stop = fromEvent(signal, "abort").pipe(take(1));

    // Note that `takeUntil` will cause the observable to complete when the
    // Signal fires, but `first` will fail with EmptyError if there wasn't a
    // value, which will reject out of the returned Promise.
    return obs.pipe(takeUntil(stop), first()).toPromise();
}

I think it does what it says on the tin, and as the comments point out, the returned promise rejects when aborted. I believe the current toPromise resolves with undefined instead of rejecting if it completes early (that's a whole other thread, or maybe several threads) but the value-from methods both reject with EmptyError as well. Does the brave new AbortSignal world mean that callers will have to handle rejections from both EmptyError and AbortError?

@benlesh benlesh removed the AGENDA ITEM Flagged for discussion at core team meetings label Apr 20, 2021
@benlesh benlesh closed this as completed May 4, 2021
@ReactiveX ReactiveX locked and limited conversation to collaborators May 4, 2021

8 participants