Skip to content

Commit

Permalink
Implement node eval streaming (vercel/turborepo#4251)
Browse files Browse the repository at this point in the history
### Description

This implements streaming Node evaluation using a fun generic
`Stream<T>`, usable within any Vc (I'm kinda surprised it works). Think
of it a bit like a `DuplexStream`: it implements both a reader (that can
be read incrementally via the `Stream` trait) and a writer (that can be
sent across threads). As new values are written to the stream, any
readers get woken up to receive the next value.

Included as part of this is a rework of our "bytes" implementations,
extracting some logic into a new `turbo-tasks-bytes` crate. Eventually,
`Rope` should also move out of `turbo-tasks-fs` into this crate.
`BytesValue`s are cheaply cloneable (they're just a wrapper around
`bytes::Bytes`), making them a perfect low-level type to hold the data
being sent between processes and threads. They're also easy to convert
to and from a real `Bytes`, so that they can be streamed into our
`hyper::Server`.

### Testing Instructions

Paired next.js PR: #47264

Fixes WEB-243

---------

Co-authored-by: Alex Kirszenberg <[email protected]>
  • Loading branch information
jridgewell and alexkirsz authored Mar 23, 2023
1 parent ddc71d0 commit 252c958
Show file tree
Hide file tree
Showing 22 changed files with 567 additions and 974 deletions.
24 changes: 24 additions & 0 deletions crates/turbo-tasks-bytes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "turbo-tasks-bytes"
version = "0.1.0"
description = "TBD"
license = "MPL-2.0"
edition = "2021"

[lib]
bench = false

[dependencies]
anyhow = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
serde_bytes = "0.11.9"
tokio = { workspace = true }
turbo-tasks = { workspace = true }

[dev-dependencies]
serde_test = "1.0.157"

[build-dependencies]
turbo-tasks-build = { workspace = true }
5 changes: 5 additions & 0 deletions crates/turbo-tasks-bytes/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use turbo_tasks_build::generate_register;

/// Build-script entry point.
///
/// Delegates entirely to `turbo_tasks_build::generate_register`.
/// NOTE(review): presumably this emits the `register.rs` file that the crate's
/// `register()` function includes from `OUT_DIR` — confirm against
/// `turbo-tasks-build`.
fn main() {
generate_register();
}
120 changes: 120 additions & 0 deletions crates/turbo-tasks-bytes/src/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::{
ops::Deref,
str::{from_utf8, Utf8Error},
};

use anyhow::Result;
use bytes::Bytes as CBytes;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// Bytes is a thin wrapper around [bytes::Bytes], implementing easy
/// conversion to/from, ser/de support, and Vc containers.
///
/// Cloning clones the wrapped [bytes::Bytes]; the default value wraps
/// `bytes::Bytes::default()`. Serialization is declared as `"custom"` here,
/// and the matching `Serialize`/`Deserialize` impls are written by hand in
/// this file.
///
/// NOTE(review): the wrapped field is marked `trace_ignore`, presumably to
/// exclude it from turbo-tasks tracing — confirm against the
/// `turbo_tasks::value` macro documentation.
#[derive(Clone, Debug, Default)]
#[turbo_tasks::value(transparent, serialization = "custom")]
pub struct Bytes(#[turbo_tasks(trace_ignore)] CBytes);

impl Bytes {
    /// Borrows the wrapped bytes as a string slice.
    ///
    /// # Errors
    ///
    /// Returns the [Utf8Error] produced by [from_utf8] when the bytes are not
    /// valid UTF-8.
    pub fn to_str(&self) -> Result<&'_ str, Utf8Error> {
        let raw: &[u8] = self.0.as_ref();
        from_utf8(raw)
    }
}

impl Serialize for Bytes {
    /// Serializes the wrapped data as a single byte string (rather than as a
    /// sequence of individual `u8` values).
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // This is exactly what `serde_bytes::Bytes` does when serialized: it
        // forwards the slice to `Serializer::serialize_bytes`.
        let raw: &[u8] = &self.0;
        serializer.serialize_bytes(raw)
    }
}

impl<'de> Deserialize<'de> for Bytes {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let bytes = serde_bytes::ByteBuf::deserialize(deserializer)?;
Ok(Bytes(bytes.into_vec().into()))
}
}

impl Deref for Bytes {
type Target = CBytes;
fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Marker trait listing the types `X` that get a `From<X> for Bytes`
/// conversion through the blanket impl over `T: IntoBytes`.
///
/// Unfortunately, we cannot just bound that blanket impl on the more generic
/// `Into<bytes::Bytes>` without running afoul of the `From<X> for X` base
/// case, causing conflicting impls: [Bytes] itself converts into
/// `bytes::Bytes`, so it would match the blanket impl too.
pub trait IntoBytes: Into<CBytes> {}
impl IntoBytes for &'static [u8] {}
impl IntoBytes for &'static str {}
impl IntoBytes for Vec<u8> {}
impl IntoBytes for Box<[u8]> {}
impl IntoBytes for String {}

impl<T: IntoBytes> From<T> for Bytes {
fn from(value: T) -> Self {
Bytes(value.into())
}
}

impl From<CBytes> for Bytes {
fn from(value: CBytes) -> Self {
Bytes(value)
}
}

impl From<Bytes> for CBytes {
fn from(value: Bytes) -> Self {
value.0
}
}

// Unit tests for the `Bytes` wrapper: conversions, serde round-trip, deref
// and UTF-8 decoding.
#[cfg(test)]
mod tests {
use bytes::Bytes as CBytes;
use serde_test::{assert_tokens, Token};

use super::Bytes;
// Test-only comparison against string literals, so the assertions below can
// be written as `assert_eq!(bytes, "foo")`. It defers to the comparison
// provided by the wrapped `bytes::Bytes`.
impl PartialEq<&str> for Bytes {
fn eq(&self, other: &&str) -> bool {
self.0 == other
}
}

// Every type that implements `IntoBytes` converts into the same content.
#[test]
fn into_bytes() {
let s = "foo".to_string();
assert_eq!(Bytes::from(b"foo" as &'static [u8]), "foo");
assert_eq!(Bytes::from("foo"), "foo");
assert_eq!(Bytes::from(s.as_bytes().to_vec()), "foo");
assert_eq!(Bytes::from(s.as_bytes().to_vec().into_boxed_slice()), "foo");
assert_eq!(Bytes::from(s), "foo");
}

// A `Bytes` (de)serializes as a single byte-string token.
#[test]
fn serde() {
let s = Bytes::from("test");
assert_tokens(&s, &[Token::Bytes(b"test")])
}

// Conversions to and from `bytes::Bytes` keep the content intact.
#[test]
fn from_into() {
let b = Bytes::from("foo");
let cb = CBytes::from("foo");
assert_eq!(Bytes::from(cb), "foo");
assert_eq!(CBytes::from(b), "foo");
}

// Dereferencing exposes the wrapped `bytes::Bytes`.
#[test]
fn deref() {
let b = Bytes::from("foo");
assert_eq!(*b, CBytes::from("foo"));
}

#[test]
fn to_str() {
let cb = Bytes::from("foo");
assert_eq!(cb.to_str(), Ok("foo"));

// The emoji is encoded as 4 bytes in UTF-8; keeping only the first 3 leaves
// a truncated sequence, which must be rejected.
let b = Bytes::from("💩".as_bytes()[0..3].to_vec());
assert!(b.to_str().is_err());
}
}
12 changes: 12 additions & 0 deletions crates/turbo-tasks-bytes/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
pub mod bytes;
pub mod stream;

pub use crate::{
bytes::{Bytes, BytesVc},
stream::{Stream, StreamRead},
};

/// Registers this crate with turbo-tasks.
///
/// Calls `turbo_tasks::register()` first, then splices in the contents of the
/// `register.rs` file found in the build's `OUT_DIR`.
/// NOTE(review): that file is presumably produced by the build script's
/// `generate_register()` call — confirm against `turbo-tasks-build`.
pub fn register() {
turbo_tasks::register();
include!(concat!(env!("OUT_DIR"), "/register.rs"));
}
200 changes: 200 additions & 0 deletions crates/turbo-tasks-bytes/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use std::{
fmt,
pin::Pin,
sync::{Arc, Mutex},
task::{Context as TaskContext, Poll},
};

use anyhow::Result;
use futures::{Stream as StreamTrait, StreamExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// Streams allow for streaming values from source to sink.
///
/// A Stream is a cheaply clonable handle: all clones share the same
/// [StreamState] behind an `Arc<Mutex<..>>`. Values are pulled lazily from an
/// optional source stream and remembered, so every reader created through
/// `read()` observes the full sequence from the start.
///
/// NOTE(review): the original description also mentions a writer half that
/// can be sent to another thread and wakes pending readers. No writer API is
/// visible in this file; values only enter through the source passed to
/// `new_open` — confirm where the writer lives.
#[derive(Clone, Debug)]
pub struct Stream<T: Clone> {
// Shared, lock-protected state; cloning the Stream only clones the Arc.
inner: Arc<Mutex<StreamState<T>>>,
}

/// The StreamState actually holds the data of a Stream.
struct StreamState<T> {
// The upstream stream that values are pulled from. `None` means the Stream
// is closed: either it was created closed, or the source reported its end.
source: Option<Box<dyn StreamTrait<Item = T> + Send + Unpin>>,
// Every value obtained so far, in order. Readers index into this buffer.
pulled: Vec<T>,
}

impl<T: Clone> Stream<T> {
/// Constructs a new Stream, and immediately closes it with only the passed
/// values.
pub fn new_closed(pulled: Vec<T>) -> Self {
Self {
inner: Arc::new(Mutex::new(StreamState {
source: None,
pulled,
})),
}
}

/// Crates a new Stream, which will lazily pull from the source stream.
pub fn new_open<S: StreamTrait<Item = T> + Send + Unpin + 'static>(
pulled: Vec<T>,
source: S,
) -> Self {
Self {
inner: Arc::new(Mutex::new(StreamState {
source: Some(Box::new(source)),
pulled,
})),
}
}

/// Returns a [StreamTrait] implementation to poll values out of our Stream.
pub fn read(&self) -> StreamRead<T> {
StreamRead {
source: self.clone(),
index: 0,
}
}

pub async fn into_single(&self) -> SingleValue<T> {
let mut stream = self.read();
let Some(first) = stream.next().await else {
return SingleValue::None;
};

if stream.next().await.is_some() {
return SingleValue::Multiple;
}

SingleValue::Single(first)
}
}

/// The outcome of collapsing a Stream into at most one value, as returned by
/// `Stream::into_single`.
///
/// The derives let callers debug-print, clone and compare results whenever
/// `T` supports the corresponding trait.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SingleValue<T> {
    /// The Stream did not hold a value.
    None,

    /// The Stream held multiple values.
    Multiple,

    /// The Stream held only a single value.
    Single(T),
}

impl<T: Clone, S: StreamTrait<Item = T> + Send + Unpin + 'static> From<S> for Stream<T> {
fn from(source: S) -> Self {
Self::new_open(vec![], source)
}
}

impl<T: Clone> Default for Stream<T> {
fn default() -> Self {
Self::new_closed(vec![])
}
}

impl<T: Clone + PartialEq> PartialEq for Stream<T> {
// A Stream is equal if it's the same internal pointer, or both streams are
// closed with equivalent values.
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner) || {
let left = self.inner.lock().unwrap();
let right = other.inner.lock().unwrap();

match (&*left, &*right) {
(
StreamState {
pulled: a,
source: None,
},
StreamState {
pulled: b,
source: None,
},
) => a == b,
_ => false,
}
}
}
}
// Marker impl: with `T: Eq`, the `PartialEq` comparison above is treated as a
// full equivalence relation.
impl<T: Clone + Eq> Eq for Stream<T> {}

impl<T: Clone + Serialize> Serialize for Stream<T> {
    /// Serializes a closed Stream as the list of values it holds.
    ///
    /// # Errors
    ///
    /// Fails with a custom serializer error when the mutex is poisoned, or
    /// when the Stream still has a source ("cannot serialize open stream").
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        use serde::ser::Error;

        let state = self.inner.lock().map_err(Error::custom)?;
        if state.source.is_some() {
            return Err(Error::custom("cannot serialize open stream"));
        }
        state.pulled.serialize(serializer)
    }
}

/// Deserializes a list of values into a closed Stream holding exactly those
/// values.
impl<'de, T: Clone + Deserialize<'de>> Deserialize<'de> for Stream<T> {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        Vec::<T>::deserialize(deserializer).map(Stream::new_closed)
    }
}

/// Debug output lists only the values pulled so far; the `source` field is
/// left out of the output.
impl<T: Clone + fmt::Debug> fmt::Debug for StreamState<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("StreamState");
        out.field("pulled", &self.pulled);
        out.finish()
    }
}

/// Implements [StreamTrait] over our Stream.
///
/// Each StreamRead keeps its own position, so several readers can consume the
/// same Stream independently.
#[derive(Debug)]
pub struct StreamRead<T: Clone> {
// Position of the next value this reader will yield from the shared
// `pulled` buffer.
index: usize,
// Handle to the Stream being read; shares state with the original.
source: Stream<T>,
}

// Reader implementation: a StreamRead first replays values already stored in
// the shared `pulled` buffer, and only polls the underlying source once it
// has caught up with that buffer.
impl<T: Clone> StreamTrait for StreamRead<T> {
type Item = T;

// Yields a clone of the next value for this reader, `None` once the Stream
// is closed and fully read, or `Pending` while the source has nothing yet.
//
// Panics if the shared mutex is poisoned.
//
// NOTE(review): the mutex stays locked while the source is polled, so other
// readers of the same Stream block for the duration of that poll.
// NOTE(review): on `Pending` no waker is recorded here; wake-ups rely solely
// on the source. With several concurrent readers, a source that only keeps
// the most recently registered waker would leave earlier readers asleep —
// confirm this is acceptable for the sources in use.
fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let index = this.index;
let mut inner = this.source.inner.lock().unwrap();

if let Some(v) = inner.pulled.get(index) {
// If the current reader can be satisfied by a value we've already pulled, then
// just do that.
this.index += 1;
return Poll::Ready(Some(v.clone()));
};

let Some(source) = &mut inner.source else {
// If the source has been closed, there's nothing left to pull.
return Poll::Ready(None);
};

match source.poll_next_unpin(cx) {
// If the source stream is ready to give us a new value, we can immediately store that
// and return it to the caller. Any other readers will be able to read the value from
// the already-pulled data.
Poll::Ready(Some(v)) => {
this.index += 1;
inner.pulled.push(v.clone());
Poll::Ready(Some(v))
}
// If the source stream is finished, then we can transition to the closed state
// to drop the source stream.
Poll::Ready(None) => {
inner.source.take();
Poll::Ready(None)
}
// Else, we need to wait for the source stream to give us a new value. The
// source stream will be responsible for waking the TaskContext.
Poll::Pending => Poll::Pending,
}
}
}
2 changes: 1 addition & 1 deletion crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dynamic_embed_contents = []
anyhow = { workspace = true }
auto-hash-map = { workspace = true }
bitflags = "1.3.2"
bytes = "1.1.0"
bytes = { workspace = true }
concurrent-queue = { workspace = true }
dashmap = { workspace = true }
dunce = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/turbo-tasks-fs/benches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn bench_rope_iteration(c: &mut Criterion) {
}

let rope = root.build();
let _v = rope.read().into_iter().collect::<Vec<_>>();
let _v = rope.read().collect::<Vec<_>>();
})
},
);
Expand Down
Loading

0 comments on commit 252c958

Please sign in to comment.