
Idiomatic Way to Explicitly Tell an Underlying Transform Stream to Flush to the Sink #960

Open
sebmarkbage opened this issue Nov 6, 2018 · 12 comments

@sebmarkbage commented Nov 6, 2018

Sometimes transform streams keep internal state and maintain a buffer window, and they wait to flush that buffer to the readable side until the window is full. Compression and encryption libraries are examples of this.

This can cause problems, however, when the source stream has to wait a long time for the next piece of data while the receiver could already do something useful with what has been buffered. For example, generating an HTML document that can be displayed in partial pieces, or sending a list of chat messages while waiting for the next message. In that case, the source knows that now is a good time to flush to the underlying sink, even if that makes inefficient use of the buffer window.

In Node.js's zlib module this is solved by exposing an explicit flush() method on the Writable. I believe .NET has a similar convention for all of its Streams.
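
For reference, a minimal sketch of that Node.js convention (the sink here is a trivial stand-in Writable rather than a real socket or response):

const zlib = require('zlib');
const { Writable } = require('stream');

// Stand-in sink; in practice this would be a socket or an HTTP response.
const sink = new Writable({
  write(chunk, encoding, callback) { callback(); }
});

const gzip = zlib.createGzip();
gzip.pipe(sink);

gzip.write('<!doctype html><p>first visible piece</p>');
// Push whatever the compressor has buffered downstream right now,
// without ending the stream.
gzip.flush();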

I haven't seen an idiomatic way to solve this with Streams.

This gets particularly awkward with service workers because there is no WritableStream for Responses, so there isn't really anything to call flush() on. If the idea is to stick with ReadableStream as the only interface, then this use case would need to be modeled by having the ReadableStream communicate the flush intent somehow.

@ricea (Collaborator) commented Nov 7, 2018

Currently the way you'd do this is by having the zlib transform decide for itself to flush based on a timeout. This is obviously not good, as the transform has no context for deciding whether to flush or not.
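
Roughly, that workaround looks something like this (a hypothetical transform where the real compression is elided and buffered chunks are just concatenated; error handling omitted):

const timeoutFlushingTransform = (timeoutMs) => {
  let buffer = [];
  let timer = null;
  return new TransformStream({
    transform(chunk, controller) {
      buffer.push(chunk);
      clearTimeout(timer);
      // Flush on a timer, with no knowledge of whether the source is actually idle.
      timer = setTimeout(() => {
        if (buffer.length) controller.enqueue(buffer.join(''));
        buffer = [];
      }, timeoutMs);
    },
    flush(controller) {
      clearTimeout(timer);
      if (buffer.length) controller.enqueue(buffer.join(''));
    }
  });
};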

I assume we'd want this to work in a pipe. Consider this pipe:

source
  .pipeThrough(a)
  .pipeThrough(zlib)
  .pipeThrough(b)
  .pipeTo(destination);

Who is responsible for issuing the signal? Only source knows that it doesn't expect to receive new data for a while, but only destination knows that it wants chunks ASAP.

I'm going to assume we get around that problem by telling source in some out-of-band fashion that we want it to issue flushes.

Sketching what the API would look like: TransformStream already has something called flush(), so we'd have to use a different name; sync() seems as good as any. I expect the underlying source would call controller.sync() to indicate that it isn't expecting to have new chunks for a while. pipeTo() would convert this to a call to writer.sync().

In order for the sync signal to pass through a without change, the default behaviour if sync() is not supplied should be to call controller.sync(). Or maybe the sync signal should always pass through a TransformStream regardless of whether sync() is defined or not.

zlib would have an implementation of sync() that flushes any currently buffered data, and passes on the sync signal. If destination was something like a buffered file, it might use the sync signal to flush any buffers to disk.
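
A purely hypothetical sketch of that shape (none of these sync() hooks exist today, and produceChunk, sourceIsIdle, compressor and the file-buffer helpers are placeholders):

const source = new ReadableStream({
  async pull(controller) {
    controller.enqueue(await produceChunk());   // placeholder producer
    if (sourceIsIdle()) {                       // placeholder condition
      controller.sync();                        // "no new chunks for a while"
    }
  }
});

const zlib = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(compressor.write(chunk));    // placeholder compressor
  },
  sync(controller) {
    controller.enqueue(compressor.flushBuffered()); // emit the partial block
    controller.sync();                              // pass the signal on
  }
});

const destination = new WritableStream({
  write(chunk) { return appendToFileBuffer(chunk); },  // placeholder
  sync() { return flushFileBufferToDisk(); }           // placeholder
});

source.pipeThrough(zlib).pipeTo(destination);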

This is a lot of additional complexity, and we'd need some compelling use cases to justify it.

@sebmarkbage (Author)

My use case is React server rendering. TBH, all my current use cases where gzip/encryption comes up are about using web Streams as the API on the server instead of Node.js streams, while being able to use the same API in a service worker.

It would be a blocker for adopting the Streams API on the server, but not in a browser client or service worker. I'm not sure how strong the motivation is to support that use case. One possible solution would be to fork/patch the server versions of this API, e.g. in Node.js.

Others might have more compelling use cases on the client though.

@ricea (Collaborator) commented Nov 7, 2018

Server-side use cases are still important. I don't want a permanent fork of the standard on the server side, but a temporary addition to the API might be a good way to explore how well it works in the server environment.

@domenic (Member) commented Nov 7, 2018

@sebmarkbage could you give a code example of the scenario in question? Is the scenario something like this?

event.respondWith(new Response(someReadableStream.pipeThrough(someBufferingTransform)));

And when you say "the source knows that now is a good time to flush to the underlying sink", is "the source" in this example someReadableStream or is it the author of the above line of code?

@sebmarkbage (Author)

Yes, specifically it would be something like:

let reactStream = React.renderToReadableStream(reactComponents);
event.respondWith(new Response(reactStream.pipeThrough(gzipTransform)));

Possibly with an extra .pipeThrough(encryptionTransform).

The reactStream is the source that knows that it's a good time to flush. Concretely, if the React components deep in the tree are waiting for I/O and there is nothing more in the queue to process, then that's a good time to flush what we have so far.

@domenic (Member) commented Nov 7, 2018

Thanks. I was vaguely hoping we could do this as part of the piping process. E.g. using pipeThrough(zlib, { signal }) where signal is a subclass of AbortSignal (call it PipeSignal) whose corresponding PipeController has a method like requestFlush(). But, that would put the responsibility in the hands of the author of the service worker code, whereas you've clarified that it would be better propagating automatically from the readable stream.
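
For concreteness, a hypothetical sketch of that shape (PipeController, its signal, and requestFlush() are invented names and do not exist):

const pipeController = new PipeController();   // hypothetical
event.respondWith(new Response(
  reactStream.pipeThrough(gzipTransform, { signal: pipeController.signal })
));

// Later, the service worker author decides buffered output should go out now:
pipeController.requestFlush();                 // hypothetical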

In that case @ricea's sketch seems like the obvious design. I wonder if generalizing it so that you could propagate more messages (of which "flush" or "sync" was just one) would make it solve enough use cases to be worth the extra complexity.

@ricea (Collaborator) commented Oct 15, 2019

Related: ricea/compressstream-explainer#3.

@ricea (Collaborator) commented Jan 30, 2020

This is useful for WebCodecs: https://github.com/WICG/web-codecs/blob/master/explainer.md. So I'm considering increasing the priority of this request.

In addition to passing a sync down a pipe from the source, it might also be useful to have a method on TransformStream for starting a sync at a particular point in a pipe.
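
Hypothetically, something like this (no such method exists on TransformStream today; compressingTransformer is a placeholder):

const ts = new TransformStream(compressingTransformer);  // placeholder transformer
source.pipeThrough(ts).pipeTo(destination);

// Some out-of-band code path decides this point in the pipe should drain now:
ts.sync();   // hypothetical method on the TransformStream object itself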

@MattiasBuelens can I ask for your opinion on this as another implementer?

@sandersdan

It's probably worth laying out the goals that WebCodecs has for a flush implementation:

  • Flush is used primarily at end-of-stream, where flush completion is an important signal to the rendering pipeline. For looping content, completion of such a flush triggers a backwards seek.
  • Without reset (Reset for TransformStream (and WritableStream?) #1026), flush would also be used as a seek barrier. There could be multiple in flight, and being able to attach data (the seek target) would be very useful.
  • WebCodecs can specify its own flush message. The main advantage of specifying flush in Streams would be interoperability with other Streams users, and it would be unfortunate to design our own, incompatible thing.

@MattiasBuelens (Collaborator)

I think sync() would be a good addition. I haven't yet run into a use case for it myself though, so I don't have any examples other than the ones you've already mentioned.

I'm not sure about the name. I would have liked flush(), but that already has a meaning for transform streams. yield() could work, but that could get confusing with the yield keyword (although the same could be said of Promise.catch(), Promise.finally() and Iterator.return()). Anyway, this might just be me bikeshedding. 😅

@ricea (Collaborator) commented Feb 4, 2020

Bikeshedding in moderation is welcome 😸

My current position is that the use cases are sufficiently compelling to add sync() methods to ReadableStream and TransformStream controllers, underlying transformers and sinks, and WritableStream writers.
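
As a tiny hypothetical illustration of the writer-level piece of that (writer.sync() does not exist; destination and htmlChunk are stand-ins):

const writer = destination.getWriter();
await writer.write(htmlChunk);
// Ask the sink (and anything buffering upstream of it) to flush what it has:
await writer.sync();   // hypothetical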

I'm worried that adding sync() methods to ReadableStream and TransformStream objects, which work even when locked, would make offloading pipes onto another thread unduly difficult. We could consider that option separately.

@gaearon commented Nov 18, 2021

More context on how we're using this in React: reactwg/react-18#91
