diff --git a/src/lib.rs b/src/lib.rs
index 528e51c08e..574e07b3a4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -56,6 +56,17 @@ mod tests {
         transfer_random_data(file_opts).await
     }
 
+    #[tokio::test]
+    async fn many_files() -> Result<()> {
+        let num_files = [10, 100, 1000, 10000];
+        for num in num_files {
+            println!("NUM_FILES: {}", num);
+            let file_opts = (0..num).map(|i| (i.to_string(), 10)).collect();
+            transfer_random_data(file_opts).await?;
+        }
+        Ok(())
+    }
+
     #[tokio::test]
     async fn sizes() -> Result<()> {
         let sizes = [
diff --git a/src/protocol.rs b/src/protocol.rs
index 1210e374e9..99142ff045 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -40,12 +40,12 @@ pub(crate) struct Request {
     pub name: Hash,
 }
 
-#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
+#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, MaxSize)]
 pub(crate) struct Response {
     pub data: Res,
 }
 
-#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
+#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, MaxSize)]
 pub(crate) enum Res {
     NotFound,
     // If found, a stream of bao data is sent as next message.
diff --git a/src/provider.rs b/src/provider.rs
index 4852072ed6..3f8d26e297 100644
--- a/src/provider.rs
+++ b/src/provider.rs
@@ -24,6 +24,7 @@ use anyhow::{ensure, Context, Result};
 use bytes::{Bytes, BytesMut};
 use futures::future;
 use futures::Stream;
+use postcard::experimental::max_size::MaxSize;
 use quic_rpc::server::RpcChannel;
 use quic_rpc::transport::flume::FlumeConnection;
 use quic_rpc::transport::misc::DummyServerEndpoint;
@@ -922,7 +923,6 @@ async fn create_collection_inner(
     let mut db = HashMap::with_capacity(data_sources.len() + 1);
     let mut blobs = Vec::with_capacity(data_sources.len());
     let mut total_blobs_size: u64 = 0;
-    let mut blobs_encoded_size_estimate = 0;
 
     // compute outboards in parallel, using tokio's blocking thread pool
     let outboards = data_sources.into_iter().map(|data| {
@@ -974,7 +974,6 @@
                 .unwrap_or_default()
                 .to_string()
         });
-        blobs_encoded_size_estimate += name.len() + 32;
         blobs.push(Blob { name, hash });
     }
 
@@ -983,14 +982,8 @@
         blobs,
         total_blobs_size,
     };
-    blobs_encoded_size_estimate += c.name.len();
-
-    // NOTE: we can't use the postcard::MaxSize to estimate the encoding buffer size
-    // because the Collection and Blobs have `String` fields.
-    // So instead, we are tracking the filename + hash sizes of each blob, plus an extra 1024
-    // to account for any postcard encoding data.
-    let mut buffer = BytesMut::zeroed(blobs_encoded_size_estimate + 1024);
-    let data = postcard::to_slice(&c, &mut buffer)?;
+
+    let data = postcard::to_stdvec(&c).context("blob encoding")?;
     let (outboard, hash) = abao::encode::outboard(&data);
     let hash = Hash::from(hash);
     db.insert(
@@ -1009,8 +1002,8 @@ async fn write_response(
     let response = Response { data: res };
 
     // TODO: do not transfer blob data as part of the responses
-    if buffer.len() < 1024 {
-        buffer.resize(1024, 0u8);
+    if buffer.len() < Response::POSTCARD_MAX_SIZE {
+        buffer.resize(Response::POSTCARD_MAX_SIZE, 0u8);
     }
     let used = postcard::to_slice(&response, buffer)?;
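
For context on the `write_response` change: postcard's `MaxSize` derive computes a compile-time upper bound, `POSTCARD_MAX_SIZE`, on a type's encoded length, so the response buffer can be sized once instead of using the hard-coded 1024. A minimal sketch below, using hypothetical stand-in types (not the crate's actual `Res`/`Response` definitions) and assuming postcard's `experimental-derive` and `use-std` features:

```rust
// Cargo.toml (assumed): postcard = { version = "1", features = ["experimental-derive", "use-std"] }
use postcard::experimental::max_size::MaxSize;
use serde::Serialize;

// Hypothetical stand-ins mirroring the shape of the protocol types; every
// field has a statically bounded encoding, so MaxSize can be derived.
#[derive(Serialize, MaxSize)]
enum Res {
    NotFound,
    Found { size: u64 },
}

#[derive(Serialize, MaxSize)]
struct Response {
    data: Res,
}

fn main() -> Result<(), postcard::Error> {
    // POSTCARD_MAX_SIZE is a compile-time worst-case encoded length, so a
    // reusable buffer never needs to grow past it.
    let mut buffer = vec![0u8; Response::POSTCARD_MAX_SIZE];
    let response = Response {
        data: Res::Found { size: 1024 },
    };
    let used = postcard::to_slice(&response, &mut buffer)?;
    println!(
        "encoded {} of at most {} bytes",
        used.len(),
        Response::POSTCARD_MAX_SIZE
    );
    Ok(())
}
```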
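`Collection` and `Blob`, by contrast, contain `String` fields with no static upper bound, so `MaxSize` cannot be derived for them (the point of the removed NOTE comment); `postcard::to_stdvec` sizes its output dynamically instead, which is why the manual `blobs_encoded_size_estimate` bookkeeping can be dropped. A sketch with simplified stand-ins (the real `Hash` is a wrapper type, modeled here as `[u8; 32]`):

```rust
use serde::Serialize;

// Simplified stand-ins: a String has no static upper bound, so MaxSize
// cannot be derived for these types.
#[derive(Serialize)]
struct Blob {
    name: String,
    hash: [u8; 32],
}

#[derive(Serialize)]
struct Collection {
    name: String,
    blobs: Vec<Blob>,
    total_blobs_size: u64,
}

fn main() -> Result<(), postcard::Error> {
    let c = Collection {
        name: "collection".to_string(),
        blobs: vec![Blob {
            name: "example".to_string(),
            hash: [0u8; 32],
        }],
        total_blobs_size: 10,
    };
    // to_stdvec grows the output Vec to exactly the encoded size, replacing
    // the manual name-length + hash-size estimate and its 1024-byte slack.
    let data = postcard::to_stdvec(&c)?;
    println!("collection encoded in {} bytes", data.len());
    Ok(())
}
```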