This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Why is this not modelled on top of Observables? #20

Closed
b-strauss opened this issue Feb 9, 2016 · 12 comments


@b-strauss

Is there any reason not to use Observables or Streams for this? As far as I understand, these are push-based APIs. Why do we need two (and in the future, with Streams, potentially three) APIs for a single type of effect?

For all other types of effects I know exactly what to use (T, Promise<T>, Iterator<T>). What's the fourth one? Observable<T>? Stream<T>? AsyncIterator<T>?
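To make the grid in this question concrete, here is a small sketch (not from the thread) of the four shapes side by side. It assumes a runtime with async generators and for-await, and it places AsyncIterator<T> in the "many values, asynchronous" slot, which is how the replies below treat it. Whether Observable<T> belongs in that slot too is exactly what the thread debates.

```javascript
// One value, available now: T
const one = 1;

// One value, available later: Promise<T>
const later = Promise.resolve(1);

// Many values, each pulled synchronously: Iterator<T>
function* many() {
  yield 1;
  yield 2;
  yield 3;
}

// Many values, each pulled with next() and delivered through a promise:
// AsyncIterator<T>
async function* manyLater() {
  yield 1;
  yield 2;
  yield 3;
}

async function demo() {
  const results = [one, await later];
  for (const x of many()) results.push(x);
  for await (const x of manyLater()) results.push(x);
  return results;
}

demo().then(results => console.log(results.join(', ')));
// 1, 1, 1, 2, 3, 1, 2, 3
```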

Maybe I don't see the big picture, but I don't understand why we can't have a unified interface for this.

@RangerMauve

Streams are too heavy for implementing async iterators because of all the other baggage that they carry. However, it would make sense for streams to implement the async iteration protocol so they could be consumed in for-await-of loops.
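Here is a rough sketch of what implementing the async iteration protocol by hand involves, so that any object (a stream, say) can be consumed with for-await. `ChunkSource` and `readAll` are hypothetical names. The requirements are a `[Symbol.asyncIterator]()` method that returns an object whose `next()` returns a promise of `{ value, done }`, plus an optional `return()` for early exit. Node.js readable streams, for example, are async iterable in current versions.

```javascript
// Hypothetical chunk source written without generators: the protocol only
// requires next() to return a Promise of { value, done }.
class ChunkSource {
  constructor(chunks) {
    this.chunks = chunks;
  }
  [Symbol.asyncIterator]() {
    const chunks = this.chunks;
    let index = 0;
    return {
      next() {
        // Simulate asynchronous I/O before handing over the next chunk.
        return new Promise(resolve => {
          setTimeout(() => {
            if (index < chunks.length) {
              resolve({ value: chunks[index++], done: false });
            } else {
              resolve({ value: undefined, done: true });
            }
          }, 0);
        });
      },
      // Called by for-await when the loop exits early (break, return, throw).
      return() {
        index = chunks.length;
        return Promise.resolve({ value: undefined, done: true });
      },
    };
  }
}

async function readAll(source) {
  const received = [];
  for await (const chunk of source) received.push(chunk);
  return received;
}

readAll(new ChunkSource(['a', 'b', 'c'])).then(chunks => console.log(chunks.join()));
// a,b,c
```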

Observables seem more likely to be useful for implementing async iterators, but they're a bit different in that you subscribe to the source and have data pushed to you, whereas iterators are about pulling the data.

Also, extending the existing concepts for regular iterators to be async seems to make sense.

Also, the main difference between Streams and Observables, by the looks of it, is that streams are more focused on binary data processing and have back pressure, whereas Observables are more for reacting to sequences of events over time.

@zenparsing
Member

Great question, thanks!

So the first thing to consider is that async iterators aren't simply push-based. The result of each iteration is delivered asynchronously through a promise, but each iteration is requested by the consumer calling next with (maybe) a value. This setup has some nice properties:

  • It keeps the interfaces for sync iteration and async iteration similar, so that programmers with an understanding of promises and iterators don't have to learn an additional API.
  • Backpressure is inherent, because the consumer doesn't request the next item until they are ready for it.
  • Async generators have all of the power of sync generators (with "throw" and "return" methods) without the consumer having to awkwardly send information back from a next handler (as you might need to if the design were based on Observables).
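Here is a minimal sketch of the pull-based, backpressure-by-default behaviour described in these points. It uses nothing beyond async generators and for-await. The log shows that the producer resumes only when the consumer asks for the next value, so values never pile up behind a slow consumer.

```javascript
// The producer records each time it actually does work.
async function* produce(log) {
  for (let i = 0; i < 3; i++) {
    log.push(`produce ${i}`);
    yield i;
  }
}

async function consume() {
  const log = [];
  for await (const value of produce(log)) {
    log.push(`consume ${value}`);
    // A slow consumer: while it waits, the producer stays paused at its yield.
    await new Promise(resolve => setTimeout(resolve, 10));
  }
  return log;
}

consume().then(log => console.log(log.join(', ')));
// produce 0, consume 0, produce 1, consume 1, produce 2, consume 2
```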

Also, async iterators can easily be adapted to observables:

function toObservable(asyncIterable) {
    return new Observable(observer => {
        let stop = false;
        async function drain() {
            for await (let x of asyncIterable) {
                if (stop) break;
                observer.next(x);
            }
        }
        drain().then(x => observer.complete(x), err => observer.error(err));
        return _=> { stop = true; }
    });
}

The big picture is that support for async iterators is built into the language syntax via async generator functions and for-await. Observables and Streams are library abstractions which sit on top of the language and can expose async iterators for consumption by for-await, similar to how collection APIs expose iterators for consumption by for-of.
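The analogy in this paragraph can be sketched as follows. `Bag` and `Ticker` are hypothetical stand-ins for a collection API and a library-defined asynchronous source. Each one exposes the well-known symbol that for-of or for-await looks for, and nothing more.

```javascript
// Hypothetical collection: defining Symbol.iterator makes it work with for-of.
class Bag {
  constructor(items) {
    this.items = items;
  }
  *[Symbol.iterator]() {
    yield* this.items;
  }
}

// Hypothetical timer-based source: defining Symbol.asyncIterator makes it
// work with for-await.
class Ticker {
  constructor(count, intervalMs) {
    this.count = count;
    this.intervalMs = intervalMs;
  }
  async *[Symbol.asyncIterator]() {
    for (let tick = 0; tick < this.count; tick++) {
      // Wait one interval, then hand out the next tick.
      await new Promise(resolve => setTimeout(resolve, this.intervalMs));
      yield tick;
    }
  }
}

async function main() {
  const fromBag = [];
  for (const item of new Bag(['x', 'y'])) fromBag.push(item);
  const fromTicker = [];
  for await (const tick of new Ticker(3, 5)) fromTicker.push(tick);
  return { fromBag, fromTicker };
}

main().then(({ fromBag, fromTicker }) => console.log(fromBag.join(), fromTicker.join()));
// x,y 0,1,2
```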

Hope that helps!

@b-strauss
Author

This all sounds logical to me, and I think I understand the differences between these APIs. What I have trouble understanding, and this might be a bit naive from a simple API consumer's point of view, is that there is already a system I have used that does all these things (events and I/O) with a unified interface.

Dart:
Asynchronous Programming: Streams
Dart's Stream class
Single-Subscription vs. Broadcast Streams

And there seems to be at least some recognition of this, because the same question, whether there is some kind of unifiable pattern, was raised in 2014 at JSConf.

As I said, my view might be a bit naive, since I don't know the specific implementation details, but for someone with a Dart background it just seems a bit odd that I now have to use three separate APIs to solve problems that are solvable with a single interface in Dart.

@RangerMauve

I think it's due to people having different focuses. Streams in JS are more focused on processing data, like binary data. Observables aim to be as purely functional and low level as possible, which isn't as useful for certain applications. Async iterators seem to be just taking the existing concepts of generators and mixing them with async functions as a logical next step.

Personally, out of all these new specs, I'm most excited about async iterators.

@b-strauss
Author

I'm also looking forward to all of these, it just seems a bit like a lost chance to me to not model these around some unified interface. It feels a bit like Array and NodeList. :/

@zenparsing
Member

> I'm also looking forward to all of these, it just seems a bit like a lost chance to me to not model these around some unified interface. It feels a bit like Array and NodeList.

Feel free to consider async iterators as the unified protocol for async sequences : )

@b-strauss
Author

Yeah, maybe someone will write some kind of abstractions around all of these. :)

Anyway, I will close this issue. Thanks for the explanations.

@drpicox

drpicox commented Jul 19, 2017

> but each iteration is requested by the consumer calling next with (maybe) a value.

That's right. But why? Is it useful?
You could pass another iterator as a parameter instead of sending values back through next.
In fact, the parameter of next is counterintuitive:

function *c(a) {
  a = yield a + 10;
  a = yield a + 20;
  a = yield a + 30;
  a = yield a + 40;
}

const it = c(1);
console.log(it.next(2).value); // Result: 11
console.log(it.next(3).value); // Result: 23  (!?)
console.log(it.next(4).value); // Result: 34

I am something of a noob with JavaScript generators, so any hint would be appreciated (although I have been programming JavaScript for almost 20 years, and I have even written parallel C++ compilers). In fact, apart from emulating async functions, I have never seen any other use.
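One case where the argument of next is genuinely useful is a coroutine that consumes values rather than (or as well as) producing them. The generator is not paused at any yield before the first next() call, so that call's argument is discarded. That is why the example above prints 11 and then 23. The sketch below uses two hypothetical helpers, `primed` and `runningTotal`, and makes that first call up front.

```javascript
// Hypothetical helper: creates the generator and advances it to its first
// yield, so that the argument of every later next() call is received.
function primed(generatorFunction) {
  return (...args) => {
    const generator = generatorFunction(...args);
    generator.next(); // this call's argument would be discarded anyway
    return generator;
  };
}

// A coroutine that consumes values: each number sent in via next() is added
// to the total, and the new total is yielded back out.
function* runningTotal() {
  let total = 0;
  while (true) {
    const value = yield total;
    total += value;
  }
}

const totals = primed(runningTotal)();
console.log(totals.next(5).value);  // 5
console.log(totals.next(10).value); // 15
```

A plain parameter would require the caller to know all the values up front, or to build another iterator for the coroutine to pull from. The two-way form lets the caller choose each value after seeing the previous result.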

Async functions are great: their adoption has been very fast, and people really prefer them to other solutions. They rely on Promises, which are well known and have proven very useful, and they map very well onto Promises.

Observables are also great. Although they carry a huge cognitive load (so many functions, considerations, ...), they have proven to be very useful.

So, because of all this, maybe mapping async iterators onto Observables should be considered. It is very likely (I should write an example) that the cognitive load of Observables would be drastically reduced.

@Jamesernator

@drpicox Observables aren't directly comparable to async generators: async generators do nothing unless .next is called (or unless they impose extra work upon themselves), e.g.:

async function* animationFrames() {
    while (true) {
        // requestAnimationFrame calls resolve with the frame timestamp
        yield new Promise(requestAnimationFrame)
    }
}

// vs
function animationFrames() {
    return new Observable(observer => {
        let frame
        function step() {
            frame = requestAnimationFrame(time => {
                observer.next(time)
                step()
            })
        }
        step()
        // Unsubscribing cancels the pending frame request
        return () => cancelAnimationFrame(frame)
    })
}

If you use an Observable, then every single animation frame will cause the callback to be invoked, which may cause jank or backpressure if the consumer of the frames can't keep up, e.g.:

async function main() {
    for await (const time of animationFrames()) {
        // At worst the user sees a few frames skipped sometimes but
        // this will always be roughly in sync with the frames
        longOperationSometimesSpanningMultipleFrames(time)
    }
}

function main() {
    animationFrames().subscribe({
        // If the long operation slows down the frame rate, frames will
        // appear to happen more slowly: e.g. if the operation sometimes
        // takes 5 animation frames, those 5 frames will still definitely
        // be handled, just late
        next: longOperationSometimesSpanningMultipleFrames
    })
}

This happens because async iterators are lazy by value, whereas Observables are lazy by subscription. That's not to say async generators are better: events where every single event must be responded to are harder to represent with async generators, though not impossible.
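The distinction between lazy by value and lazy by subscription can be sketched with synchronous code for brevity. The same holds for async generators. `subscribeToNumbers` is a hypothetical callback-based source standing in for an Observable. Once subscribed, it emits every value, whereas the generator computes only the values the consumer actually pulls.

```javascript
// Push style (lazy by subscription): nothing runs until you subscribe, but
// then every value is produced and delivered, whether it is wanted or not.
let pushedCount = 0;
function subscribeToNumbers(onValue) {
  for (let i = 0; i < 5; i++) {
    pushedCount++;
    onValue(i);
  }
}

// Pull style (lazy by value): each value is computed only when next() asks.
let pulledCount = 0;
function* numbers() {
  for (let i = 0; i < 5; i++) {
    pulledCount++;
    yield i;
  }
}

// In both cases the consumer only wants the first two values.
const pushed = [];
subscribeToNumbers(value => {
  if (pushed.length < 2) pushed.push(value);
});

const pulled = [];
for (const value of numbers()) {
  pulled.push(value);
  if (pulled.length === 2) break; // stops the generator; no more values computed
}

console.log(pushedCount, pulledCount); // 5 2
```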

By all means use what works for your project. Observables are probably going to be async iterable in some way (and if not, you can always create operators to do it), and Observable.from will almost certainly convert async iterables, so the two should be fairly interoperable.
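Converting the other way, from a push-based source to an async iterator, needs a buffer for values that arrive before anyone calls next. Below is a minimal sketch. It assumes a hypothetical `subscribe(observer)` function that takes `{ next, error, complete }` and returns an unsubscribe function; this is not any specific library's API. The buffer is unbounded, and that is where a push source's lack of backpressure shows up.

```javascript
// Adapts a hypothetical push source, subscribe({ next, error, complete }) =>
// unsubscribe, into an async iterator that for-await can consume.
function toAsyncIterator(subscribe) {
  const buffer = [];  // values pushed before anyone asked for them
  const waiting = []; // next() calls waiting for a value: { resolve, reject }
  let finished = false;
  let failure = null;

  const unsubscribe = subscribe({
    next(value) {
      if (finished || failure) return;
      if (waiting.length > 0) waiting.shift().resolve({ value, done: false });
      else buffer.push(value); // unbounded: nothing slows the producer down
    },
    error(err) {
      failure = err;
      while (waiting.length > 0) waiting.shift().reject(err);
    },
    complete() {
      finished = true;
      while (waiting.length > 0) waiting.shift().resolve({ value: undefined, done: true });
    },
  });

  return {
    next() {
      // Drain buffered values first, then report the end or the error.
      if (buffer.length > 0) return Promise.resolve({ value: buffer.shift(), done: false });
      if (failure) return Promise.reject(failure);
      if (finished) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve, reject) => waiting.push({ resolve, reject }));
    },
    // Called by for-await on early exit: stop listening and drop buffered values.
    return() {
      finished = true;
      buffer.length = 0;
      if (typeof unsubscribe === 'function') unsubscribe();
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

// A synchronous push source: emits three values, then completes.
function threeLetters(observer) {
  observer.next('a');
  observer.next('b');
  observer.next('c');
  observer.complete();
  return () => {};
}

async function collect() {
  const out = [];
  for await (const letter of toAsyncIterator(threeLetters)) out.push(letter);
  return out;
}

collect().then(letters => console.log(letters.join())); // a,b,c
```

Pairing a fast producer with a slow consumer grows this buffer without limit. A real adapter might drop, coalesce, or cap values instead. That is a policy decision, not something the iteration protocol decides for you.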


The value sent to an async generator through next exists so that async generators behave the same as sync generators in this regard: it can be used to implement coroutines that can also await.

Basically, coroutines allow two-way communication, e.g. you yield values out and you can send responses back in. They don't really have the same use cases as iterators, even though they share the same syntax.

When sending a value to .next you're just replacing the location of yield with the value you send back in:

function* myGen() {
    const val = yield 1
    const val2 = yield 2 + val
}

const gen = myGen()
// There's no point that's stopped in the generator for the value 9999
const request1 = gen.next(9999).value
// But after calling gen.next(9999) the generator is stopped at
// the first yield so it can get a response back for that yield
const request2 = gen.next(request1 + 17).value
console.log(request2) // 20 which is 18 (the value sent to the first yield)
                      // plus 2

Coroutines can be useful if you want things to be decoupled from how they're interpreted e.g.:

function* storeInDatabase(data) {
    // The executor need not return any particular implementation
    // of connection
    let connection
    try {
        // If the database did not exist, the executor could even use
        // .throw() to raise an error at this point
        connection = yield ['open', 'the-database-uri.tld/database']
    } catch (err) {
        return false
    }
    // The database might not even need transactions if it's synchronous,
    // so an executor could choose to ignore this
    yield ['beginTransaction', connection]
    for (const { user, value } of data) {
        // The coroutine engine can aggregate these
        // or do any optimizations it might want to
        yield ['setData', user, value]
    }
    yield ['endTransaction', connection]
    // An executor could even choose to do things like revoking the
    // database connection here once the function has finished executing
    return true
}

The only thing async generators add to this is the ability to await promises within the generator, which is probably useful if you need to await things that are the same across platforms.
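The decoupling described above needs an executor to interpret the yielded commands. Below is a minimal sketch. `run`, `saveUsers`, the command names and the in-memory handlers are all hypothetical. The executor awaits each handler's result before calling next(), so handlers can be synchronous or return promises without the coroutine knowing which. A failing handler is reported back into the coroutine via throw().

```javascript
// Coroutine in the style of storeInDatabase above: it only describes the work
// as [command, ...args] arrays and lets an executor decide how to do it.
function* saveUsers(data) {
  let connection;
  try {
    connection = yield ['open', 'memory://users'];
  } catch (err) {
    return false; // the executor reported that opening failed
  }
  yield ['beginTransaction', connection];
  for (const { user, value } of data) {
    yield ['setData', user, value];
  }
  yield ['endTransaction', connection];
  return true;
}

// Hypothetical executor: runs the handler for each yielded command, awaits its
// (possibly asynchronous) result and sends that result back in through next().
async function run(generator, handlers) {
  let step = generator.next();
  while (!step.done) {
    const [command, ...args] = step.value;
    let result;
    try {
      result = await handlers[command](...args);
    } catch (err) {
      // Report the failure at the paused yield so the coroutine can handle it.
      step = generator.throw(err);
      continue;
    }
    step = generator.next(result);
  }
  return step.value;
}

// Hypothetical in-memory "database": some handlers are async, some are not.
const store = new Map();
const handlers = {
  open: async uri => ({ uri }),
  beginTransaction: () => {},
  setData: (user, value) => { store.set(user, value); },
  endTransaction: () => {},
};

run(saveUsers([{ user: 'ann', value: 1 }, { user: 'bob', value: 2 }]), handlers)
  .then(ok => console.log(ok, store.get('ann'), store.get('bob'))); // true 1 2
```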

@drpicox

drpicox commented Jul 20, 2017

Uhmmmm, you made your point with the backpressure.

But the animationFrames example you have shown may be wrong, because the first frame is generated before anyone requests it, so the first call will probably get an obsolete result.

I also do not like the storeInDatabase example: you can achieve the same with an async function by passing the database as a parameter:

async function storeInDatabase(db, data) {
    // The db need not be any particular implementation
    // of connection
    let connection
    try {
        // If the database did not exist, db.open could simply
        // throw (reject) to cause an error at this point
        connection = await db.open('the-database-uri.tld/database');
    } catch (err) {
        return false
    }
    // The database might not even need transactions if it's synchronous,
    // so db could choose to ignore this
    await db.beginTransaction(connection);
    for (const { user, value } of data) {
        // The db engine can aggregate these
        // or do any optimizations it might want to
        await db.setData(user, value)
    }
    await db.endTransaction(connection);
    // A db could even choose to do things like revoking the
    // database connection here once the function has finished executing
    return true
}

I will investigate Observables and backpressure a little more (there is some existing work: https://github.com/ReactiveX/RxJava/wiki/Backpressure). Backpressure exists when Observables are synchronous; maybe now is a good time to fix https://github.com/tc39/proposal-observable.

@Jamesernator

Actually, the first animation frame won't be obsolete: generators don't execute their first section until the first .next call is made, e.g.:

function* gen(x) {
    console.log("Hello!")
    yield x
    console.log(x)
}

const g = gen(10) // Nothing logged yet
g.next() // logs "Hello!"
g.next() // logs 10

// Same thing with async generators, the first part of the generator
// code is not executed until the first call to next

async function* animationFrames() {
    for (let i = 0 ; ; i++) {
        console.log(i)
        yield new Promise(requestAnimationFrame)
    }
}

async function main() {
    const frames = animationFrames()
    // Nothing is printed yet
    await frames.next() // Only here is 0 printed
    await frames.next() // 1
}

main()

I don't really know of any great examples of coroutines, as I haven't used them very much. Perhaps someone who knows more about coroutines would be able to offer better explanations. This blog might offer some insight into the design of generators with two-way communication, though.

@drpicox

drpicox commented Jul 20, 2017

I did not know it. Thanks!

Btw:
I love coroutines and have used them a lot in C and C++ development. The typical use case is lexer/parser coroutines (flex/bison), but the best application I found was emulating system calls in a multicore processor simulator that I built.

You can learn more about C and C++ coroutines here:

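#include <stdio.h>

/* `state` remembers which case label to resume at on the next call */
#define crBegin static int state=0; switch(state) { case 0:
#define crReturn(i,x) do { state=i; return x; case i:; } while (0)
#define crFinish }
int function(void) {
    static int i;  /* locals must be static to survive between calls */
    crBegin;
    for (i = 0; i < 10; i++)
        crReturn(1, i);  /* return i; the next call resumes right here */
    crFinish;
    return -1;  /* reached once the loop is exhausted */
}
int main(void) {
    for (int k = 0; k < 10; k++)
        printf("%d\n", function());  /* prints 0 through 9 */
    return 0;
}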

Maybe my missing concepts come from how this implementation worked; I have seen no usage of function* except to emulate async/await.
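For comparison with the C macros above, here is a sketch of the same 0 to 9 counter as a JavaScript generator. The macros store the resume point in a static `state` variable and require `i` to be static. A generator object keeps both its resume point and its locals, and each call to `counter()` creates an independent instance.

```javascript
// The same counter as a generator: the resume point and the local `i` live
// in the generator object, so no static state is needed.
function* counter() {
  for (let i = 0; i < 10; i++) yield i;
}

const values = [...counter()];
console.log(values.join(' ')); // 0 1 2 3 4 5 6 7 8 9
```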

Maybe function* would be useful to compute lists like head:[...tail], as functional languages do: they only compute things when they are requested, so they can deal with infinite lists.
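Here is a sketch of the lazy, possibly infinite lists mentioned above, using plain generators. `naturals`, `map` and `take` are hypothetical helper names. Nothing is computed until the spread operator or destructuring pulls values, and `take` stops pulling after the requested count, so the infinite `naturals()` never runs away.

```javascript
// An infinite list: each element is computed only when it is requested.
function* naturals() {
  for (let n = 0; ; n++) yield n;
}

// Lazily transform each element of a (possibly infinite) iterable.
function* map(iterable, fn) {
  for (const x of iterable) yield fn(x);
}

// Yield at most `count` elements, then stop pulling from the source.
function* take(iterable, count) {
  if (count <= 0) return;
  for (const x of iterable) {
    yield x;
    if (--count === 0) return;
  }
}

const firstSquares = [...take(map(naturals(), n => n * n), 5)];
console.log(firstSquares.join(', ')); // 0, 1, 4, 9, 16

// Destructuring pulls only as many elements as it binds.
const [head, second] = naturals();
console.log(head, second); // 0 1
```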


5 participants