Implement node eval streaming #4251
Conversation
```rust
/// The StreamState actually holds the data of a Stream.
pub enum StreamState<T> {
    /// An OpenStream is tied directly to a source stream, and will lazily pull
    /// new values out as a reader reaches the end of our already-pulled
    /// data.
    OpenStream {
        source: Box<dyn StreamTrait<Item = T> + Send + Sync + Unpin>,
        pulled: Vec<T>,
    },

    /// A Closed stream state cannot be pushed to, so anyone polling can
    /// read all values at their leisure.
    Closed(Box<[T]>),
}
```
- It's a little bit weird that the pulled data is either in `OpenStream.pulled` or in `Closed.0`. I would hoist that into a shared field.
- The current design doesn't allow freeing memory of the stream, even after all readers have read part of it. That prevents using it with infinite streams. That's not needed right now, but it would be a cool feature to have.

So I would do it this way:
```rust
/// The StreamState actually holds the data of a Stream.
pub struct StreamSegment<T> {
    /// Data in the current segment. More can be pushed into it as it's pulled from the source.
    /// When this reaches a limit, it will create a new segment in `next` rather than pushing more
    /// items to the buffer. This allows deallocating previous segments once all readers have consumed them.
    buffer: Vec<T>,
    /// What follows after this segment.
    /// Might be `None` when the source is ended/closed.
    next: Option<StreamSegmentNext<T>>,
}

enum StreamSegmentNext<T> {
    /// A stream is tied directly to a source stream, and will lazily pull
    /// new values out as a reader reaches the end of our already-pulled
    /// data.
    Source(Box<dyn StreamTrait<Item = T> + Send + Sync + Unpin>),
    /// Another segment is following. This forms a linked list of segments to
    /// allow deallocating memory of previous segments.
    Segment(Arc<Mutex<StreamSegment<T>>>),
}
```
Should streams have a notion of "errored"? If so, we might want to add `StreamSegmentNext::Error(SharedError)`?
> The current design doesn't allow freeing memory of the stream, even after all readers have read part of it. That prevents using it with infinite streams. That's not needed right now, but it would be a cool feature to have.

That would mean the Vc which holds a stream could never be reused, and any tasks which depend on it would need to rerun the node evaluation fully.

I can join the open/closed stream into the same struct by wrapping the `source` in an `Option<dyn StreamTrait>`.
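A minimal sketch of what that merged shape could look like (the field layout and `is_closed` helper are assumptions, and `StreamTrait` is assumed to be an alias for `futures::Stream`, as the snippet above suggests):

```rust
use futures::Stream as StreamTrait;

/// Single struct instead of the Open/Closed enum: the source is dropped
/// (set to `None`) once it is exhausted, which doubles as the "closed" state.
pub struct StreamState<T> {
    /// `Some` while the source can still yield items; `None` once closed.
    source: Option<Box<dyn StreamTrait<Item = T> + Send + Sync + Unpin>>,
    /// Every item pulled from the source so far.
    pulled: Vec<T>,
}

impl<T> StreamState<T> {
    fn is_closed(&self) -> bool {
        self.source.is_none()
    }
}
```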
> Should streams have a notion of "errored"? If so, we might want to add `StreamSegmentNext::Error(SharedError)`?

`Stream` is generic; the wrappers can (and do) handle the error state with a `Result<T>` inside the stream.
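For illustration only, a tiny standalone example of carrying errors in the item type instead of the stream machinery (plain `futures`/`anyhow` here, not the crate's own types):

```rust
use anyhow::{anyhow, Result};
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // The "errored" state is just another item: consumers see it while polling,
    // and the stream type itself stays error-agnostic.
    let items: Vec<Result<u8>> = vec![Ok(1), Ok(2), Err(anyhow!("boom"))];
    let mut s = stream::iter(items);
    while let Some(item) = s.next().await {
        match item {
            Ok(byte) => println!("got {byte}"),
            Err(e) => println!("stream errored: {e}"),
        }
    }
}
```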
```diff
@@ -75,13 +76,11 @@ async fn run_proxy_operation(
         .send(RenderProxyOutgoingMessage::Headers { data: &data })
         .await?;

-    let body = body.await?;
+    let mut body = body.await?.read();
```
When `StreamRead` is merged into `Stream`, you would fork/clone the cached stream to consume it, while leaving the cached stream untouched.
```diff
-    let mut body = body.await?.read();
+    let mut body = body.await?.clone();
```
I don't like this change. The index can't be used on the original value, because it would only screw up all readers if it were ever read from originally. The split between the immutable source and the mutable read iteration isn't any different than having a dedicated iter type.
> The index can't be used on the original value, because it would only screw up all readers if it were ever read from originally.

Rust mutability prevents reading from the cached value, since you don't have a mutable reference to it.

Also, readers won't reference the original stream. They have a cloned stream with a shared `StreamState`. `StreamState` is still immutable in that sense, since we never remove items from `pulled`.
With

```rust
pub struct Stream<T> {
    inner: Arc<Mutex<StreamState<T>>>,
    /// Number of items consumed in the current segment.
    /// Might revert back to 0 when the end of a segment is reached.
    consumed: usize,
}
```
you would need a `&mut Stream` to read from it, which is only possible if you are the only owner.
> Rust mutability prevents reading from the cached value, since you don't have a mutable reference to it.

What I mean is, imagine you clone from an already-mutated stream:

```rust
let mut s = Stream(…);
// This advances `s.index`
s.next().await?;
// Inherits the advanced index
s.clone()
```

So by mutating the index, any clones that want access to the full stream would be broken. I don't see why we want to store this on the struct itself anyway; we don't store an iteration index as part of a `Vec<T>`. If you want to read and iterate, you allocate your own `Iter` to hold your position.
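A minimal (non-async) sketch of that "dedicated iter type" idea: the reader owns its position, and the shared state is never mutated by reads. The names and fields here are illustrative, not the PR's API:

```rust
use std::sync::{Arc, Mutex};

/// Illustrative reader: it holds its own index, so cloning the shared state
/// (the `Arc`) always hands out a fresh view starting at the beginning.
pub struct StreamRead<T> {
    // Stand-in for Arc<Mutex<StreamState<T>>>.
    pulled: Arc<Mutex<Vec<T>>>,
    index: usize,
}

impl<T: Clone> StreamRead<T> {
    pub fn next_item(&mut self) -> Option<T> {
        let pulled = self.pulled.lock().unwrap();
        let item = pulled.get(self.index).cloned();
        if item.is_some() {
            self.index += 1;
        }
        item
    }
}
```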
```rust
pub enum JavaScriptEvaluation {
    /// An Empty is only possible if the evaluation completed successfully, but
    /// didn't send an error nor a value.
    Empty,
    /// A Single is only possible if the evaluation returns either an error or
    /// a value, and never sends an intermediate value.
    Single(EvaluationItem),
    /// A Stream represents a series of intermediate values followed by
    /// either an error or an ending value. A stream is never empty.
    Stream(#[turbo_tasks(trace_ignore, debug_ignore)] JavaScriptStream),
}
```
I think it would be easier to consume when it is always a stream.
```rust
pub struct JavaScriptEvaluation(#[turbo_tasks(trace_ignore, debug_ignore)] JavaScriptStream);
```
Maybe add a `StreamExt::into_single_value()` which gets the only value from the stream and errors when the stream is empty, yields more than one value, or errored.
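A hedged sketch of such a helper written against plain `futures` types (the name, signature, and error messages are assumptions, not the crate's API):

```rust
use anyhow::{bail, Result};
use futures::{Stream, StreamExt};

/// Illustrative: resolve a stream that is expected to yield exactly one value,
/// erroring on an empty stream, an errored item, or extra items.
async fn into_single_value<T>(
    mut stream: impl Stream<Item = Result<T>> + Unpin,
) -> Result<T> {
    let first = match stream.next().await {
        Some(item) => item?,
        None => bail!("expected a single value, but the stream was empty"),
    };
    if stream.next().await.is_some() {
        bail!("expected a single value, but the stream yielded more");
    }
    Ok(first)
}
```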
I tried implementing this, and it just makes everything that consumes a single value (all of our config processing) more difficult to reason about:
```diff
-    let JavaScriptEvaluation::Single(Ok(val)) = &*config_value else {
+    let val = match config_value.into_single().await {
+        Ok(Some(Ok(val))) => val,
+        _ => {
```
hmm?
```diff
-    let JavaScriptEvaluation::Single(Ok(val)) = &*config_value else {
+    let Some(val) = config_value.into_single().await? else {
```
Currently the configs handle errors by defaulting; moving the unwrap into the `?` would prevent that.
I figured out a return value that makes this palatable.

One side effect of this is that any code that handles either a single value or a stream of values must ensure it fully closes the stream so the `NodeJsOperation` is released back to the pool. E.g., the `route` method can receive either a single result for `Rewrite`, or a stream for `Middleware` responses. I can't call `into_single` for this, because I need the individual results for the middleware. For the rewrite response, I just called `read.next().await` a single time. But that left the stream midway, and the process was never released to the pool. I had to add a bit of code to call into the stream one more time.
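Roughly, the extra step amounts to something like this (a sketch with assumed names, against plain `futures` types rather than the PR's exact code):

```rust
use anyhow::{bail, Result};
use futures::{Stream, StreamExt};

/// Illustrative: take the single expected result, then drive the stream to its
/// end so the underlying NodeJsOperation can be released back to the pool.
async fn single_then_drain<T>(
    mut read: impl Stream<Item = Result<T>> + Unpin,
) -> Result<T> {
    let value = match read.next().await {
        Some(item) => item?,
        None => bail!("expected a single result"),
    };
    // Without these extra polls, the stream is left midway and the pooled
    // process is never released.
    while read.next().await.is_some() {}
    Ok(value)
}
```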
Force-pushed from f00e769 to 0248991.
```rust
// need to spawn a new thread to continually pull data out of the process,
// and ferry that along.
let (sender, stream) = JavaScriptStream::new_open(vec![Ok(data.into())]);
tokio::spawn(async move {
```
Having a disconnected tokio task would disconnect any action below from the turbo_tasks `Task`. This has a few effects:

- we can't read Vcs below (currently there are none, but my SourceMap PR will add that, since the process output will be source mapped)
- dropping the `Stream` won't get rid of the task, and it will continue to run independently of the stream.

Instead, any execution should happen when polling the stream (similar to futures). So the `(sender, stream)` pattern won't work well. Instead, implement a stream with `poll_next` that only does something when polled. That sounds a bit involved, but luckily there are macros for that: https://docs.rs/async-stream/latest/async_stream/
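For reference, a minimal example of the linked macro; the loop body stands in for pulling chunks out of the child process:

```rust
use async_stream::stream;
use futures::{pin_mut, StreamExt};

#[tokio::main]
async fn main() {
    // Nothing inside the block runs until a consumer polls the stream, and
    // dropping the stream drops any in-progress work with it.
    let s = stream! {
        for i in 0..3u32 {
            // stand-in for "pull the next chunk out of the process"
            yield i;
        }
    };
    pin_mut!(s);
    while let Some(item) = s.next().await {
        println!("pulled {item}");
    }
}
```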
Done.
Force-pushed from 0248991 to 6d1db9b.
```rust
impl<T: Clone + PartialEq> PartialEq for Stream<T> {
    // A Stream is equal if it's the same internal pointer, or both streams are
    // closed with equivalent values.
```
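The excerpt above is cut off by the diff view; for context, a minimal sketch of the semantics that comment describes, over a simplified state (not the PR's exact code):

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for the real StreamState<T>.
struct State<T> {
    pulled: Vec<T>,
    closed: bool,
}

struct Stream<T> {
    inner: Arc<Mutex<State<T>>>,
}

impl<T: Clone + PartialEq> PartialEq for Stream<T> {
    // Equal if it's the same internal pointer, or both streams are closed
    // with equivalent values.
    fn eq(&self, other: &Self) -> bool {
        if Arc::ptr_eq(&self.inner, &other.inner) {
            return true;
        }
        let (a, b) = (self.inner.lock().unwrap(), other.inner.lock().unwrap());
        a.closed && b.closed && a.pulled == b.pulled
    }
}
```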
This has an interesting behavior where a === b can change after the streams have been read to the end.
Also a === b might change after serialization...
Do we need Eq?
> This has an interesting behavior where a === b can change after the streams have been read to the end.

Yes, but it can only go from false -> true.

> Also a === b might change after serialization...

Not quite, it already needs to be closed before serialization, so it's the same "has the source been fully read" case.

> Do we need Eq?

Isn't that a requirement for use in a Vc?
> Yes, but it can only go from false -> true.

hmm... technically this would be fine for Vcs, but it still feels a bit weird...

> Isn't that a requirement for use in a Vc?

You could opt out of the eq check on celling...
Sounds good...

If you want to merge it, make sure the next.js PR passes CI, merge this one, release a tag, update next.js with the tag, and merge that.
Co-authored-by: Alex Kirszenberg <[email protected]>
Force-pushed from 6d1db9b to 5ad3155.
Fun! This depends on vercel/turborepo#4251 to implement streamed Node evaluations, giving us the ability to support streamed middleware responses. This is just the first step to supporting RSC streaming in Turbopack. I chose to start with this because it requires all the same base logic, and I understand the full router->middleware->HTTP server code path, so it's a lot easier to work on. Fixes WEB-738
### Description

This implements streaming Node evaluation using a fun generic `Stream<T>`, usable within any Vc (I'm kinda surprised it works). Think of it a bit like a `DuplexStream`: it implements both a reader (that can be streaming-read with the `Stream` trait) and a writer (that can be sent across threads). As new values are written to the stream, any readers get woken up to receive the next value.

Included as part of this is a rework of our "bytes" implementations, extracting some logic into a new `turbo-tasks-bytes` crate. Eventually, `Rope` should also move out of `turbo-tasks-fs` into this crate. `BytesValue`s are cheaply clonable (they're just a wrapper around `bytes::Bytes`), making them a perfect low-level type to hold the data being sent between processes and threads. They're also easy to convert to and from a real `Bytes`, so that they can be streamed into our `hyper::Server`.

### Testing Instructions

Paired next.js PR: vercel/next.js#47264

Fixes WEB-243
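For a feel of the writer/reader behavior described above, here is a rough stand-in built from tokio primitives; it is not the crate's API and it skips the caching/multi-reader behavior that `Stream<T>` adds:

```rust
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    // The writer half can be sent to another task/thread; every send wakes readers.
    let (writer, reader) = mpsc::channel::<&'static str>(16);
    tokio::spawn(async move {
        for chunk in ["intermediate", "intermediate", "final"] {
            writer.send(chunk).await.unwrap();
        }
    });

    // The reader half is consumed with the usual Stream combinators.
    let mut body = ReceiverStream::new(reader);
    while let Some(chunk) = body.next().await {
        println!("received {chunk}");
    }
}
```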