Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rstream] - Reducing complexity of the subscribe method #246

Closed
allforabit opened this issue Aug 29, 2020 · 6 comments
Closed

[rstream] - Reducing complexity of the subscribe method #246

allforabit opened this issue Aug 29, 2020 · 6 comments

Comments

@allforabit
Copy link
Contributor

Currently the subscribe method for subscriptions and streams is quite complicated. I'm wondering if it could be simplified? I always find it hard to remember exactly what it accepts (maybe that's just me!!).

At the moment it's used for 3 distinct things:

  • To transform a stream using a transducer pipeline
  • To transform a stream using a subscription operator (e.g. resolve)
  • To actually do the subscription and kick off the reactive flow

Since the initial version of the package, a transform method has been added so this covers the supplying a transducer pipeline use case. Would it be best to remove this as an option? I know it's a fairly foundational package and this might cause too much of a ripple effect.

I think it would be good to have another method that accepted a number of subscription operations to cover the case of transforming a stream with subscription operators. Rxjs has a method pipe for this. It works a little differently than how subscribe works in rstream in that it is lazy and won't cause upstream subscriptions to fire. This only happens when the subscribe method is invoked.

What do you think @postspectacular ? If you think it's worth exploring I could do a POC.

@postspectacular
Copy link
Member

Hey @allforabit - I do agree that .subscribe() is somewhat more complex than it should be and I'm sorry you're finding it confusing, but IMHO it looks more scary due to the overrides and generics required for the different ways of adding a subscription.

You're right that there're 3 forms of adding a subscription, but I'm not agreeing (or rather understanding) your 3rd option (more on that below). In my mind the 3 forms are as illustrated below (sandbox):

  1. adding a subscription (either a Subscription sub-class or wrapping an ISubscriber object)
  2. adding a transducer as subscription
  3. combining form 1 w/ attached transducer
import { reactive, subscription, trace } from "@thi.ng/rstream";
import { filter, map, mapcat, repeat, sideEffect } from "@thi.ng/transducers";

// pre-seeded stream/subscription
// (everything in rstream is a Subscription, one way or another)
const a = reactive(1);

// form 1a: wrap & add given ISubscriber object as child subscription of `a`
const b1 = a.subscribe(trace("b"));
// form 1b: ...or add Subscription instance directly
const b2 = a.subscribe(subscription({ next(x) { console.log("b2", x); } }));

// form 2: compose & wrap given transducers as Subscription
// and add as child of `a`
// syntax sugar for: a.subscribe(comp(tx1, tx2,...))
const c = a.transform(
  filter((x: number) => x > 0 && x < 10),
  sideEffect((x) => console.log("c", x)),
  mapcat((x: number) => repeat(x, x))
);

// form 3: add subscription w/ own attached transducer
// (which will be pre-applied to incoming values before
// given `next()` receives values...)
const d = c.subscribe({ next(x) { console.log("d", x); } }, map((x: number) => x * 10));

a.next(2);

// output
// b1 1
// b2 1
// c 1
// d 10
// c 2
// d 20
// d 20
// b2 2
// b1 2

As you can (hopefully) see there's a conceptual difference between forms 2 & 3 in that they result in different topologies... In the latter, the transducer is part of the same processing node/unit (d), whereas in form 2 (c) the transformation is the only role of that resulting subscription instance. Anyone subscribing to c will only receive these transformed values. The same is true for any subscribers of d, but there d itself also performs other work...

Also, in all forms/cases (b, c, d), the child subscription will immediately be triggered once it's fully setup & attached to its parent and before .subscribe() returns. So I'm not quite following with your last bullet point (i.e. "To actually do the subscription and kick off the reactive flow" role of .subscribe()), and by extension, also not sure what you mean re: the laziness of the .pipe() method in RxJS. Can you please give a more concrete example? Are you asking for a version where child subscriptions (b,c or d in the example) will only receive a's future value(s) (here 2 in the above example), but not the already present initial value (here: 1) at the time of their subscription?

Sorry for asking, but I've got no hands-on experience with RxJS and no bandwidth to try out myself right now... thanks! :)

@allforabit
Copy link
Contributor Author

Wowza thanks for the super detailed reply! Actually I think my main issue isn't really about the complexity of the subscribe method and more about the complecting (haha nice clojure word) or connecting various streams and running them. In other words, building up a blueprint of a reactive flow first and then running it. I think this idea is actually available in the csp package that you did. From the readme:

// no real work has been executed thus far (only scheduled via promises)
// now kick off entire process by writing file paths into the 1st channel
paths.into(["src/channel.ts", "src/mult.ts", "src/pubsub.ts"]);

This is similar to how it would work in rx where the various streams are piped together to form a blueprint for the computation which is only kicked off when the subscription method is called. I've prepared a small code sandbox to show the differing behaviours:
https://codesandbox.io/s/rx-vs-rs-hm5se?file=/src/rs.ts Just comment out the subscribe method at the bottom of each file to see the differences.

The way that rx acheives this with it's pipe method is having it take a number of functions which take a stream and return stream (what they call an operator). These functions are composed together to form their pipeline. Here's an example of such an operator (from https://netbasal.com/creating-custom-operators-in-rxjs-32f052d69457):

function filterNil() {
  return function<T>(source: Observable<T>): Observable<T> {
    return new Observable(subscriber => {
      source.subscribe({
        next(value) {
          if(value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      })
    });
  }
}

This is just a slight variation on the subs that are supplied with rstream (such as resolve) which just directly return the stream rather than wrapping it in a function that takes a stream and returns one. The actual pipe method is just (https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts):

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
    if (operations.length === 0) {
      return this as any;
    }

    return pipeFromArray(operations)(this);
  }

And pipeFromArray is:

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

Actually it really looks like the same idea as transducers, except it's streams instead of reducers! So really I guess it's just about making function composition easier.

Sorry I'm probably not being overly clear so if there's anything that would help to get across the idea let me know.

@postspectacular
Copy link
Member

Thanks for that, Kevin! I think the missing piece of rstream information might be that .subscribe() is only eager & immediately triggering new child subscribers if and only if the parent already has a value ready. Consider the difference between:

const a = reactive(42);

// child sub will be immediately notified since `a` already has a value
a.subscribe({ next(x) { console.log("asub", x); } });
// asub 42

// vs.

// empty stream
const b = stream();

// child sub will NOT be immediately notified since `b` has NO value just yet
b.subscribe({ next(x) { console.log("bsub", x); } });

// only when pushing a new value into `b`, children will get notified henceforth
b.next(42)
// bsub 42

So for your codesandbox example (💯 thanks for that btw.!) you can achieve that behavior by either not pre-seeding the req$ stream with an URL yet (i.e. only pushing in the URL at the end) or defining $res in a slightly different way/order. See my forked sandbox...

Alternatively, you could also configure the input stream to NOT cache its value, thereby stopping eager subscription processing too...

const c = reactive(42, { cache: false });

// lazy
c.subscribe({ next(x) { console.log("csub", x); } });

c.next(43)
// csub 43

More tomorrow/coming days... Hope that makes more sense for now!

@allforabit
Copy link
Contributor Author

Ok yes that's really nice. In this use case you most likely wouldn't have the stream seeded with a value. I'll play around with that idea and see how I get on. Also I can see there's not much magic with the rxjs pipe method and it's really about composing functions which can be done with comp and similar. I'll let you know if I stumble onto anything interesting :-)

To be honest I've still to really properly dig into either rxjs or rstream and mostly use react.js, redux/xstate and good old fashioned promises to manage my reactive stuff. React is really discouraging using promises though primarily because of lack of cancellation. Streams work really well for this. For instance an rstream subscriptions can be used with useEffect as follows:

() => {
  const s = myStream.subscribe({
    next: setState
  });
 return () => {
    return s.unsubscribe():
  }
}

I think the best thing is to just get stuck in and use rstream more and try to understand how it works instead of trying to force it too much into the mental model I have with rxjs.

@postspectacular
Copy link
Member

@allforabit can this issue be closed now that #281 is in place or still things unclear/unhappy?

@allforabit
Copy link
Contributor Author

@postspectacular I've only looked through the changes and they look really great, the error handling in particular. Brilliant to get clarity on all of that. So yes this ticket can be closed, thanks v much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants