From c34d0807f15e1add617bca20743fe25bc365e482 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 22 Feb 2023 10:41:05 +0100 Subject: [PATCH] feat: Remove Request.id from the protocol Since we now only issue a single request per stream this field was not used. Using it in the provider to identify requests would introduce complexity and overhead for the bookkeeping of request IDs. Instead remove the field from the protocol. The provider still has a use-case for identifying which request events relate to. It now uses the QUIC stream ID for this, but does not promise so on an API-level. This also simplifies the events since this new request_id is always known, while previously it was only known after having decoded the request message. --- src/get.rs | 2 +- src/protocol.rs | 2 -- src/provider.rs | 55 +++++++++++++++++++++---------------------------- 3 files changed, 25 insertions(+), 34 deletions(-) diff --git a/src/get.rs b/src/get.rs index 8f52f7f6a2..534538315e 100644 --- a/src/get.rs +++ b/src/get.rs @@ -157,7 +157,7 @@ where // 2. Send Request { debug!("sending request"); - let req = Request { id: 1, name: hash }; + let req = Request { name: hash }; let used = postcard::to_slice(&req, &mut out_buffer)?; write_lp(&mut writer, used).await?; diff --git a/src/protocol.rs b/src/protocol.rs index 954b1ed2d2..b42be55d9f 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -37,14 +37,12 @@ impl Handshake { #[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, MaxSize)] pub(crate) struct Request { - pub id: u64, /// blake3 hash pub name: Hash, } #[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)] pub(crate) struct Response { - pub id: u64, pub data: Res, } diff --git a/src/provider.rs b/src/provider.rs index 0af84b6a4f..2d1f7b5598 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -228,7 +228,7 @@ pub enum Event { RequestReceived { /// An unique connection id. connection_id: u64, - /// The request id. + /// An identifier uniquely identifying this transfer request. request_id: u64, /// The hash for which the client wants to receive data. hash: Hash, @@ -237,16 +237,15 @@ pub enum Event { TransferCompleted { /// An unique connection id. connection_id: u64, - /// The request id. + /// An identifier uniquely identifying this transfer request. request_id: u64, }, /// A request was aborted because the client disconnected. TransferAborted { /// The quic connection id. connection_id: u64, - /// The request id. When `None`, the transfer was aborted before or during reading and decoding - /// the transfer request. - request_id: Option, + /// An identifier uniquely identifying this request. + request_id: u64, }, } @@ -414,8 +413,6 @@ async fn transfer_collection( mut writer: quinn::SendStream, // Buffer used when writing to writer. buffer: &mut BytesMut, - // The id of the transfer request. - request_id: u64, // The bao outboard encoded data. outboard: &Bytes, // The actual blob data. @@ -441,7 +438,6 @@ async fn transfer_collection( write_response( &mut writer, buffer, - request_id, Res::FoundCollection { total_blobs_size: c.total_blobs_size, }, @@ -452,11 +448,10 @@ async fn transfer_collection( writer.write_buf(&mut data).await?; for (i, blob) in c.blobs.iter().enumerate() { debug!("writing blob {}/{}", i, c.blobs.len()); - let (status, writer1) = - send_blob(db.clone(), blob.hash, writer, buffer, request_id).await?; + let (status, writer1) = send_blob(db.clone(), blob.hash, writer, buffer).await?; writer = writer1; if SentStatus::NotFound == status { - write_response(&mut writer, buffer, request_id, Res::NotFound).await?; + write_response(&mut writer, buffer, Res::NotFound).await?; writer.finish().await?; return Ok(status); } @@ -466,11 +461,7 @@ async fn transfer_collection( Ok(SentStatus::Sent) } -fn notify_transfer_aborted( - events: broadcast::Sender, - connection_id: u64, - request_id: Option, -) { +fn notify_transfer_aborted(events: broadcast::Sender, connection_id: u64, request_id: u64) { let _ = events.send(Event::TransferAborted { connection_id, request_id, @@ -487,10 +478,14 @@ async fn handle_stream( let mut out_buffer = BytesMut::with_capacity(1024); let mut in_buffer = BytesMut::with_capacity(1024); + // The stream ID index is used to identify this request. Requests only arrive in + // bi-directional RecvStreams initiated by the client, so this uniquely identifies them. + let request_id = reader.id().index(); + // 1. Read Handshake debug!("reading handshake"); if let Err(e) = read_handshake(&mut reader, &mut in_buffer, token).await { - notify_transfer_aborted(events, connection_id, None); + notify_transfer_aborted(events, connection_id, request_id); return Err(e); } @@ -499,17 +494,17 @@ async fn handle_stream( let request = match read_request(reader, &mut in_buffer).await { Ok(r) => r, Err(e) => { - notify_transfer_aborted(events, connection_id, None); + notify_transfer_aborted(events, connection_id, request_id); return Err(e); } }; let hash = request.name; - debug!("got request({})", request.id); + debug!("got request for ({hash})"); let _ = events.send(Event::RequestReceived { connection_id, - request_id: request.id, hash, + request_id, }); // 4. Attempt to find hash @@ -518,8 +513,8 @@ async fn handle_stream( Some(BlobOrCollection::Collection(d)) => d, _ => { debug!("not found {}", hash); - notify_transfer_aborted(events, connection_id, Some(request.id)); - write_response(&mut writer, &mut out_buffer, request.id, Res::NotFound).await?; + notify_transfer_aborted(events, connection_id, request_id); + write_response(&mut writer, &mut out_buffer, Res::NotFound).await?; writer.finish().await?; return Ok(()); @@ -527,18 +522,18 @@ async fn handle_stream( }; // 5. Transfer data! - match transfer_collection(&db, writer, &mut out_buffer, request.id, outboard, data).await { + match transfer_collection(&db, writer, &mut out_buffer, outboard, data).await { Ok(SentStatus::Sent) => { let _ = events.send(Event::TransferCompleted { connection_id, - request_id: request.id, + request_id, }); } Ok(SentStatus::NotFound) => { - notify_transfer_aborted(events, connection_id, Some(request.id)); + notify_transfer_aborted(events, connection_id, request_id); } Err(e) => { - notify_transfer_aborted(events, connection_id, Some(request.id)); + notify_transfer_aborted(events, connection_id, request_id); return Err(e); } } @@ -558,7 +553,6 @@ async fn send_blob( name: Hash, mut writer: W, buffer: &mut BytesMut, - id: u64, ) -> Result<(SentStatus, W)> { match db.get(&name) { Some(BlobOrCollection::Blob(Data { @@ -566,7 +560,7 @@ async fn send_blob( path, size, })) => { - write_response(&mut writer, buffer, id, Res::Found).await?; + write_response(&mut writer, buffer, Res::Found).await?; let path = path.clone(); let outboard = outboard.clone(); let size = *size; @@ -590,7 +584,7 @@ async fn send_blob( Ok((SentStatus::Sent, writer)) } _ => { - write_response(&mut writer, buffer, id, Res::NotFound).await?; + write_response(&mut writer, buffer, Res::NotFound).await?; Ok((SentStatus::NotFound, writer)) } } @@ -766,10 +760,9 @@ pub async fn create_collection(data_sources: Vec) -> Result<(Databas async fn write_response( mut writer: W, buffer: &mut BytesMut, - id: u64, res: Res, ) -> Result<()> { - let response = Response { id, data: res }; + let response = Response { data: res }; // TODO: do not transfer blob data as part of the responses if buffer.len() < 1024 {