[rstream] Revised error handling logic #281

Closed
9 of 10 tasks
postspectacular opened this issue Mar 9, 2021 · 1 comment

Comments

@postspectacular
Member

postspectacular commented Mar 9, 2021

About

The existing error handling logic in thi.ng/rstream has been somewhat peculiar (to say the least) and has been the source of various confusions and unexpected outcomes. PR #279 aims to change that (among other things), but I thought it best to use this separate issue to document the existing problems, as well as the new logic being implemented:

Subscriptions are state machines

The following diagram illustrates the various states a Subscription (as base type for all other constructs in rstream) can be in:

[Diagram: subscription state machine]

A subscription starts in an IDLE state, goes into ACTIVE once the first child sub is attached (and vice versa in the child), and then, depending on the nature of the upstream source, the presence of errors or forced unsubscribing, eventually ends in either the ERROR or UNSUBSCRIBED state. The DONE state will only be temporarily active (during the execution of the .done() handler) iff the upstream source is finite and completes naturally.

Child subscriptions can only be added whilst the parent sub is in IDLE or ACTIVE states. Likewise for child subs themselves. In all other cases .subscribe() will throw an illegal state error.
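To make these state rules concrete, here's a minimal, hypothetical TypeScript sketch. It is not rstream's implementation: the `SubState` union, `canSubscribe()`, `isFinal()` and `ToySub` are invented names (rstream exports its own `State` enum), and only the two rules stated above are modeled: attaching a child activates both sides, and `.subscribe()` throws outside the IDLE/ACTIVE states.

```typescript
// Hypothetical model of the subscription states described above.
// rstream exports its own `State` enum; these names are illustrative only.
type SubState = "IDLE" | "ACTIVE" | "DONE" | "UNSUBSCRIBED" | "ERROR";

// Child subs may only be attached while a sub is IDLE or ACTIVE
const canSubscribe = (s: SubState): boolean => s === "IDLE" || s === "ACTIVE";

// ERROR and UNSUBSCRIBED are final; DONE is only entered temporarily
const isFinal = (s: SubState): boolean => s === "ERROR" || s === "UNSUBSCRIBED";

class ToySub {
  state: SubState = "IDLE";
  children: ToySub[] = [];

  subscribe(child: ToySub): ToySub {
    if (!canSubscribe(this.state)) {
      throw new Error(`illegal state: ${this.state}`);
    }
    this.children.push(child);
    // attaching a child activates both parent and child
    this.state = "ACTIVE";
    child.state = "ACTIVE";
    return child;
  }
}

const root = new ToySub();
const child = root.subscribe(new ToySub());
console.log(root.state, child.state); // ACTIVE ACTIVE

root.state = "ERROR";
try {
  root.subscribe(new ToySub());
} catch (e) {
  console.log(String(e)); // Error: illegal state: ERROR
}
```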

The error handling discussed for the rest of this issue is about how to deal with errors in subscription handlers & value processing/transformations. The state machine is just useful to better understand the implications of a sub going into the ERROR state...

Issues w/ existing error handling

The code below creates a simple linear topology (pipeline) of 4 nodes:

Example 1

import { State, stream, Subscription, trace } from "@thi.ng/rstream";

const a = stream<number>();

const a2 = a.subscribe({
  next(x) {
    throw new Error(`eek ${x}`);
  }
});

const a3 = a2.subscribe(trace("a3"));

const a4 = a3.subscribe(trace("a4"));

Recursive error propagation

Once we push a new value into a, the a2.next() handler will receive that value and throw an error. Since a2 itself has no error handler, the error will be caught by the wrapping subscription's own error handler, which then attempts to recursively call the error handlers of all its child subscriptions (which of course is objectively wrong, although the approach did make some sense, depending on the use case):

// trigger error
a.next(1);

// output:
// a4 1
// a3 1
// a3 error Error: eek 1
// a4 error Error: eek 1

It's obvious that this is far from correct behavior, and various secondary problems arise from it. In short, this is a consequence of the way child subscriptions have been implemented so far: the a3 & a4 subs have nothing directly to do with the error in a2, yet their (optional) error handlers are being invoked recursively. Even worse, there's no logic to ensure an error is only handled once (e.g. by the first downstream handler found), and there's also currently no provision to detect whether an error handler could successfully respond to (or recover from) a given error. This causes all downstream nodes (from a2 onwards) to go into the ERROR state, which then stops them from receiving any future values:

const state = (sub: Subscription<any, any>) => State[sub.getState()];

// only `a` itself is still usable now...
console.log(state(a), state(a2), state(a3), state(a4));
// ACTIVE ERROR ERROR ERROR
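The following deliberately simplified model reproduces these end states. `OldNode` and its methods are hypothetical and only capture the core flaw of the old scheme: a caught error is pushed depth-first into every descendant, each of which then enters ERROR. It does not replicate the exact log ordering of the old implementation.

```typescript
// Hypothetical model of the OLD propagation scheme (not rstream code):
// a caught error is forwarded depth-first to every descendant.
type Status = "ACTIVE" | "ERROR";

class OldNode {
  status: Status = "ACTIVE";
  children: OldNode[] = [];
  id: string;
  handler?: (x: number) => void;

  constructor(id: string, handler?: (x: number) => void) {
    this.id = id;
    this.handler = handler;
  }

  add(child: OldNode): OldNode {
    this.children.push(child);
    return child;
  }

  next(x: number): void {
    if (this.status !== "ACTIVE") return;
    try {
      this.handler?.(x);
    } catch (e) {
      this.error(e);
      return;
    }
    for (const c of this.children) c.next(x);
  }

  // every descendant enters ERROR, although the failure happened upstream
  error(e: unknown): void {
    this.status = "ERROR";
    for (const c of this.children) c.error(e);
  }
}

const a = new OldNode("a");
const a2 = a.add(new OldNode("a2", (x) => { throw new Error(`eek ${x}`); }));
const a3 = a2.add(new OldNode("a3"));
const a4 = a3.add(new OldNode("a4"));

a.next(1);
console.log([a, a2, a3, a4].map((n) => n.status).join(" "));
// ACTIVE ERROR ERROR ERROR
```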

Transducer error handling

Errors can be caused not only during execution of a sub's next() handler, but also whilst executing a sub's optional transducer. For example, replacing the above a2 with the following yields the same error behavior as above:

import { map } from "@thi.ng/transducers";

const a2 = a.transform<number>(map((x) => { throw x * 10; }));

Teardown with/without error

Depending on each subscription's configuration (via closeIn, closeOut options), triggering an error state can (and with default options it should) cause a recursive partial teardown of the upstream data flow branch the sub in question belongs to. At the very least, a subscription detaches itself (via .unsubscribe()) upon entering the error state.

The closeOut mode of a parent sub dictates the parent's behavior when a child sub unsubscribes. By default, the parent itself unsubscribes from its parent once the last child has disconnected (i.e. CloseMode.LAST). However, in the current implementation this behavior is not fully respected.
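A sketch of the parent-side decision, assuming rstream's close modes are NEVER, FIRST and LAST. String literals stand in for the real `CloseMode` enum, and `parentShouldClose()` is a hypothetical helper, not part of the library:

```typescript
// Assumed mirror of rstream's close modes (NEVER / FIRST / LAST);
// string literals stand in for the real `CloseMode` enum.
type CloseMode = "NEVER" | "FIRST" | "LAST";

/**
 * Hypothetical helper: should a parent detach from its own parent after
 * one of its children unsubscribed, leaving `remaining` children attached?
 */
function parentShouldClose(mode: CloseMode, remaining: number): boolean {
  switch (mode) {
    case "NEVER":
      return false; // keep running even without children
    case "FIRST":
      return true; // the first departing child closes the parent
    case "LAST":
      return remaining === 0; // default: close once no children are left
  }
}

console.log(parentShouldClose("LAST", 1)); // false
console.log(parentShouldClose("LAST", 0)); // true
```

With LAST, this decision has to be re-applied at every level for the teardown to walk all the way upstream; the current implementation stops after the first parent.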

With logging enabled, this teardown logic becomes more visible:

Example 2

setLogger(new ConsoleLogger("rstream"));

const a = stream<number>({ id: "a" });
const a2 = a.subscribe({ next(x) { throw x; } }, { id: "a2" });

// dummy subs (no user error handlers given)
const a3 = a2.subscribe({}, { id: "a3" });
const a4 = a3.subscribe({}, { id: "a4" });

// trigger
a.next(1);

// [WARN] rstream: a4 unhandled error: 1
// [DEBUG] rstream: a4 unsubscribing...
// [DEBUG] rstream: a4 unsub start self
// [DEBUG] rstream: a3 unsub start a4
// [DEBUG] rstream: a3 unsub child a4
// [DEBUG] rstream: a4 cleanup

For comparison, replacing the above line a.next(1) with a4.unsubscribe() yields similar output (minus the error handling messages). In both cases, only a4's parent (a3) is impacted and the teardown doesn't propagate any further (wrong!). Note that this doesn't affect the logical behavior of the entire a...a4 stream construct (a2, a3, a4 won't receive any future values), but the point of teardown is to free resources/memory, and this isn't handled properly (already fixed by #279).

// [DEBUG] rstream: a4 unsub start self
// [DEBUG] rstream: a3 unsub start a4
// [DEBUG] rstream: a3 unsub child a4
// [DEBUG] rstream: a4 cleanup

Revised error handling logic

The changes introduced in #279 are addressing all of the above issues:

  • make error handling more understandable & predictable (w.r.t. the scope of responsible error handlers)
  • allow error handlers to indicate (un)successful handling (via boolean return value)
  • avoid the recursive (and wrong) depth-first propagation of errors
  • provide a new option for transducer error handling (i.e. for .transform() child subs)
  • simplify/fix the recursive teardown logic related to .unsubscribe() (due to errors or without)

Using the same code from Example 2 above, the new behavior produces the following output:

// [DEBUG] rstream: a dispatch 1
// [DEBUG] rstream: a2 dispatch 1
// [WARN] rstream: a2 unhandled error: 1
// [DEBUG] rstream: a2 unsub self
// [DEBUG] rstream: a unsub child a2
// [DEBUG] rstream: a unsub self
// [DEBUG] rstream: a cancel

As shown, due to a2's lack of a user-provided error handler, the wrapping subscription will catch the error, put a2 into the ERROR state and then unsubscribe a2 from its parent (a). Because a2 was a's only child, a itself will shut down (due to the default CloseMode.LAST). The resulting states for all 4 subs are:

a: UNSUBSCRIBED a2: ERROR a3: ACTIVE a4: ACTIVE

Error handlers

The most important aspect of the new logic is that only a single error handler will ever be used, and it's always either the user-provided handler or, as a fallback, that of the wrapping Subscription type. The above result states also show that only a2 is put into the ERROR state, essentially constituting a cut in the data flow.

Furthermore, error handlers MUST return a boolean result to indicate whether the error could be successfully handled/recovered from (true) or not (false). In the latter case, the wrapping sub's own error handler will be used, which then puts the sub into the ERROR state. If the (user) handler returns true, the sub remains ACTIVE.

reactive(42).subscribe({
  next(x) { throw x; },
  error(e) { console.warn("eek", e); return true; }
});

// [DEBUG] rstream: stream-0 dispatch 42
// [DEBUG] rstream: $sub-1 dispatch 42
// [DEBUG] rstream: $sub-1 attempting wrapped error handler
// eek 42
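The dispatch rule can be summarized in a few lines. This is a hypothetical sketch (`resolveError()` and its parameters are invented names, not rstream API): the user handler is tried first, and only if it is missing or returns false does the wrapping sub's fallback run and the sub end up in ERROR. Child subs are never involved.

```typescript
// Hypothetical sketch of the revised rule (names are illustrative, not rstream's API).
type ErrorHandler = (e: unknown) => boolean;
type SubState = "ACTIVE" | "ERROR";

function resolveError(
  e: unknown,
  userHandler: ErrorHandler | undefined,
  fallback: (e: unknown) => void,
): SubState {
  // the user-provided handler gets the first chance to recover
  if (userHandler !== undefined && userHandler(e)) return "ACTIVE";
  // missing handler or `false` result: the wrapping sub's handler takes over
  fallback(e);
  return "ERROR";
}

const warnings: string[] = [];
const fallback = (e: unknown) => {
  warnings.push(`unhandled error: ${String(e)}`);
};

console.log(resolveError(42, () => true, fallback)); // ACTIVE
console.log(resolveError(42, () => false, fallback)); // ERROR
console.log(resolveError(42, undefined, fallback)); // ERROR
console.log(warnings.length); // 2
```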

Transducer error handling

For transducer-only subscriptions created via .transform() or .map(), an optional error handler can be provided via the final options object arg:

reactive(42).map((x) => { throw x; }, { error(e) { console.warn("eek", e); return true; } })
// [DEBUG] rstream: stream-4 dispatch 42
// [DEBUG] rstream: xform-5 attempting wrapped error handler
// eek 42

The .map() and .transform() methods are merely syntax sugar for this type of .subscribe() call:

reactive(42).subscribe(
  // error handler-only child sub
  { error(e) { console.warn("eek", e); return true; } },
  // possibly composed transducer given as part of options
  { xform: map((x) => { throw x; }) }
);
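The sugar boils down to: run the transducer, and if it throws, consult the optional error handler from the options. The sketch below models that with a tiny, hypothetical transducer implementation; `Reducer`, `Xform`, `map`, `XformOpts` and `XformSub` are stand-ins, not the thi.ng/transducers or rstream types.

```typescript
// Minimal, hypothetical transducer model (far simpler than thi.ng/transducers)
type Reducer<Acc, In> = (acc: Acc, x: In) => Acc;
type Xform<A, B> = <Acc>(rf: Reducer<Acc, B>) => Reducer<Acc, A>;

function map<A, B>(f: (x: A) => B): Xform<A, B> {
  return <Acc>(rf: Reducer<Acc, B>): Reducer<Acc, A> =>
    (acc, x) => rf(acc, f(x));
}

interface XformOpts<A, B> {
  xform: Xform<A, B>;
  error?: (e: unknown) => boolean;
}

class XformSub<A, B> {
  state: "ACTIVE" | "ERROR" = "ACTIVE";
  out: B[] = [];
  private opts: XformOpts<A, B>;
  private step: Reducer<B[], A>;

  constructor(opts: XformOpts<A, B>) {
    this.opts = opts;
    // innermost reducer simply collects the transformed values
    this.step = opts.xform<B[]>((acc, x) => {
      acc.push(x);
      return acc;
    });
  }

  next(x: A): void {
    if (this.state !== "ACTIVE") return;
    try {
      this.step(this.out, x);
    } catch (e) {
      // transducer errors go to the optional handler; none or `false` => ERROR
      if (!this.opts.error?.(e)) this.state = "ERROR";
    }
  }
}

const doubled = new XformSub<number, number>({ xform: map((x: number) => x * 2) });
doubled.next(21);
console.log(doubled.out[0]); // 42

const recovering = new XformSub<number, number>({
  xform: map((x: number): number => { throw x; }),
  error(e) { console.warn("eek", e); return true; },
});
recovering.next(42); // warns: eek 42
console.log(recovering.state); // ACTIVE
```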

True recursive teardown

Regarding recursive upstream teardown, again replacing a.next(1) with a4.unsubscribe() will now tear down the entire dataflow pipeline (in this case):

// [DEBUG] rstream: a4 unsub self
// [DEBUG] rstream: a3 unsub child a4
// [DEBUG] rstream: a3 unsub self
// [DEBUG] rstream: a2 unsub child a3
// [DEBUG] rstream: a2 unsub self
// [DEBUG] rstream: a unsub child a2
// [DEBUG] rstream: a unsub self
// [DEBUG] rstream: a cancel

Result states:

a: UNSUBSCRIBED a2: UNSUBSCRIBED a3: UNSUBSCRIBED a4: UNSUBSCRIBED

The teardown also works in the opposite direction, e.g. here triggering a.done() in place of a4.unsubscribe(). First, .done() is invoked in a depth-first manner, which then causes the same recursive upstream teardown once the last node in the pipeline (a4) has been reached...

// [DEBUG] rstream: a cancel
// [DEBUG] rstream: a entering done()
// [DEBUG] rstream: a2 entering done()
// [DEBUG] rstream: a3 entering done()
// [DEBUG] rstream: a4 entering done()
// [DEBUG] rstream: a4 unsub self
// [DEBUG] rstream: a3 unsub child a4
// [DEBUG] rstream: a3 unsub self
// [DEBUG] rstream: a2 unsub child a3
// [DEBUG] rstream: a2 unsub self
// [DEBUG] rstream: a unsub child a2
// [DEBUG] rstream: a unsub self
// [DEBUG] rstream: a4 exiting done()
// [DEBUG] rstream: a3 exiting done()
// [DEBUG] rstream: a2 exiting done()
// [DEBUG] rstream: a exiting done()

Result states:

a: UNSUBSCRIBED a2: UNSUBSCRIBED a3: UNSUBSCRIBED a4: UNSUBSCRIBED
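Both teardown sequences can be reproduced with a small hypothetical model, assuming every node follows the default CloseMode.LAST. `PipeNode`, `complete()` and the log strings are illustrative only (a root's source cancellation is modeled as a one-off `cancel()`), but running it yields the same event order as the two debug logs above.

```typescript
// Hypothetical model of the revised teardown (not rstream internals).
// Every node behaves like the default CloseMode.LAST.
type NodeState = "ACTIVE" | "DONE" | "UNSUBSCRIBED";

class PipeNode {
  state: NodeState = "ACTIVE";
  children: PipeNode[] = [];
  id: string;
  parent?: PipeNode;
  private log: string[];
  private cancelled = false;

  constructor(id: string, log: string[], parent?: PipeNode) {
    this.id = id;
    this.log = log;
    this.parent = parent;
    parent?.children.push(this);
  }

  // root only: the (finite) source has completed
  complete(): void {
    this.cancel();
    this.done();
  }

  done(): void {
    this.log.push(`${this.id} entering done()`);
    this.state = "DONE";
    // depth-first; iterate over a copy since children detach along the way
    for (const c of [...this.children]) c.done();
    // a node not yet torn down via its children (here: the leaf a4) detaches itself
    if (this.state === "DONE") this.unsubscribe();
    this.log.push(`${this.id} exiting done()`);
  }

  unsubscribe(): void {
    this.log.push(`${this.id} unsub self`);
    this.state = "UNSUBSCRIBED";
    if (this.parent) this.parent.removeChild(this);
    else this.cancel(); // a root stream cancels its source
  }

  private cancel(): void {
    if (this.cancelled) return;
    this.cancelled = true;
    this.log.push(`${this.id} cancel`);
  }

  private removeChild(child: PipeNode): void {
    this.log.push(`${this.id} unsub child ${child.id}`);
    this.children = this.children.filter((c) => c !== child);
    if (this.children.length === 0) this.unsubscribe(); // CloseMode.LAST
  }
}

const pipeline = (log: string[]) => {
  const a = new PipeNode("a", log);
  const a2 = new PipeNode("a2", log, a);
  const a3 = new PipeNode("a3", log, a2);
  const a4 = new PipeNode("a4", log, a3);
  return [a, a2, a3, a4] as const;
};

// 1) teardown triggered by the leaf
const log1: string[] = [];
const p1 = pipeline(log1);
p1[3].unsubscribe();

// 2) teardown triggered by the source completing
const log2: string[] = [];
const p2 = pipeline(log2);
p2[0].complete();

console.log(log1.join("\n"));
console.log(log2.join("\n"));
```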

Structural internal differences

There are several key interfaces used in thi.ng/rstream related to subscriptions:

  • ISubscriber - describes the shape of a raw user subscription object of {next, error, done} handlers. Such an object can be given to .subscribe(), but will always be wrapped in a real ISubscription instance
  • ISubscription - the interface describing key aspects of a Subscription, i.e. a combined interface of IDeref, ISubscriber, ISubscribable, ITransformable. This interface is fully implemented by the Subscription class, which also acts as base class for other subscription types (e.g. Stream, StreamSync, PubSub etc.)
  • ISubscribable - merely describes the contract of the various signatures of the .subscribe() method
  • ITransformable - describes the contract of the various sigs of the .transform() method

The current implementation of Subscription doesn't differentiate between an optionally given ISubscriber (which is wrapped by this subscription instance) and other child subscriptions attached via .subscribe(). It's primarily this lack of distinction which is responsible for the semi-erroneous behavior(s) described in the beginning.

In the rewritten Subscription (a9e4040), a wrapped ISubscriber is handled (and stored) separately from other child subs, allowing for all the improvements listed above.

This new distinction also enables user error handling for stream source constructs, and the stream() factory function now supports an optional error handler:

const a = stream(
  // stream source function
  (s) => { s.next(1); throw 23; },
  // options w/ err handler
  { error(e) { console.log("eeek", e); return false; } }
);
// stream only starts when first sub is attached
const a2 = a.subscribe(trace("a2"));

// [DEBUG] rstream: stream-0 dispatch 1
// [DEBUG] rstream: $sub-1 dispatch 1
// a2 1
// eeek 23
// [WARN] rstream: stream-0 unhandled error: 23
// [DEBUG] rstream: stream-0 unsub self

a.getState() === State.ERROR
// true
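The separation of the wrapped subscriber from the child subs can be sketched as follows. `Sub` and `UserSubscriber` are hypothetical stand-ins for `Subscription` and `ISubscriber`, not the actual classes: the wrapped subscriber's own `error()` handler is the only one consulted, an unhandled error only cuts off its own node, and the parent then closes per CloseMode.LAST. Run against the Example 2 topology, it produces the result states listed earlier (a: UNSUBSCRIBED, a2: ERROR, a3/a4: ACTIVE).

```typescript
// Hypothetical model (not rstream's source): the wrapped user subscriber is
// stored apart from child subs, so an error never leaks into the children.
type St = "ACTIVE" | "ERROR" | "UNSUBSCRIBED";

interface UserSubscriber {
  next?(x: number): void;
  error?(e: unknown): boolean;
}

class Sub {
  state: St = "ACTIVE";
  children: Sub[] = [];
  id: string;
  private wrapped: UserSubscriber;
  private parent?: Sub;

  constructor(id: string, wrapped: UserSubscriber = {}, parent?: Sub) {
    this.id = id;
    this.wrapped = wrapped;
    this.parent = parent;
    parent?.children.push(this);
  }

  next(x: number): void {
    if (this.state !== "ACTIVE") return;
    try {
      this.wrapped.next?.(x);
    } catch (e) {
      // only the wrapped subscriber's own handler is consulted
      if (!this.wrapped.error?.(e)) this.fail();
      return;
    }
    for (const c of this.children) c.next(x);
  }

  unsubscribe(): void {
    this.state = "UNSUBSCRIBED";
    this.parent?.detach(this);
  }

  private fail(): void {
    this.state = "ERROR";
    this.parent?.detach(this);
  }

  private detach(child: Sub): void {
    this.children = this.children.filter((c) => c !== child);
    if (this.children.length === 0) this.unsubscribe(); // CloseMode.LAST
  }
}

const a = new Sub("a");
const a2 = new Sub("a2", { next(x) { throw x; } }, a);
const a3 = new Sub("a3", {}, a2);
const a4 = new Sub("a4", {}, a3);

a.next(1);
console.log([a, a2, a3, a4].map((s) => `${s.id}: ${s.state}`).join(" "));
// a: UNSUBSCRIBED a2: ERROR a3: ACTIVE a4: ACTIVE
```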

Summary

The diagrams hopefully illustrate the differences between the old and new approaches. In both cases, we use code from Example 1 (with an additional sibling of a3) and show the states after an uncaught error occurred in a2:

[Diagrams: Old vs. New subscription states after an uncaught error in a2]

The lack of connections between a3 -> a4 (in the old version) and a -> a2 (in the new version) indicates that the connection between these nodes/subs has been severed...


  • update above examples & log output
  • explain differences
    • wrapped subscribers
    • error handling responsibility
    • error propagation
    • teardown logic
  • new features (e.g. error handler for .transform() subs)
  • add diagrams
  • FSM details
  • write documentation
postspectacular added a commit that referenced this issue Mar 9, 2021
- add `ErrorHandler` type, update to return boolean
- update `ISubscribable`, `ITransformable` to only refer to `ISubscription`
  interface (rather than `Subscription` class itself)
- refactor `Subscription.next()`, add `.dispatchXform()`
- update various error handlers (add return values)
- update tests
postspectacular added a commit that referenced this issue Mar 10, 2021
- replace old `Subscription` class w/ what was recently `Sub2` (removed)
- update/fix done(), subscribe()/unsubscribe() logic
- update related constructs (Stream, StreamSync, MetaStream, etc.)
- update Stream ctor (and factory fns) to support error handler opts arg
- update Timeout error dispatch
- fix typehints
@postspectacular
Member Author

Closing for now, since already implemented & published. Feel free to reopen or create new issue(s)...
