diff --git a/src/get.rs b/src/get.rs index 8f52f7f6a2..534538315e 100644 --- a/src/get.rs +++ b/src/get.rs @@ -157,7 +157,7 @@ where // 2. Send Request { debug!("sending request"); - let req = Request { id: 1, name: hash }; + let req = Request { name: hash }; let used = postcard::to_slice(&req, &mut out_buffer)?; write_lp(&mut writer, used).await?; diff --git a/src/protocol.rs b/src/protocol.rs index 954b1ed2d2..b42be55d9f 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -37,14 +37,12 @@ impl Handshake { #[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, MaxSize)] pub(crate) struct Request { - pub id: u64, /// blake3 hash pub name: Hash, } #[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)] pub(crate) struct Response { - pub id: u64, pub data: Res, } diff --git a/src/provider.rs b/src/provider.rs index 0af84b6a4f..2d1f7b5598 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -228,7 +228,7 @@ pub enum Event { RequestReceived { /// An unique connection id. connection_id: u64, - /// The request id. + /// An identifier uniquely identifying this transfer request. request_id: u64, /// The hash for which the client wants to receive data. hash: Hash, @@ -237,16 +237,15 @@ pub enum Event { TransferCompleted { /// An unique connection id. connection_id: u64, - /// The request id. + /// An identifier uniquely identifying this transfer request. request_id: u64, }, /// A request was aborted because the client disconnected. TransferAborted { /// The quic connection id. connection_id: u64, - /// The request id. When `None`, the transfer was aborted before or during reading and decoding - /// the transfer request. - request_id: Option, + /// An identifier uniquely identifying this request. + request_id: u64, }, } @@ -414,8 +413,6 @@ async fn transfer_collection( mut writer: quinn::SendStream, // Buffer used when writing to writer. buffer: &mut BytesMut, - // The id of the transfer request. - request_id: u64, // The bao outboard encoded data. outboard: &Bytes, // The actual blob data. @@ -441,7 +438,6 @@ async fn transfer_collection( write_response( &mut writer, buffer, - request_id, Res::FoundCollection { total_blobs_size: c.total_blobs_size, }, @@ -452,11 +448,10 @@ async fn transfer_collection( writer.write_buf(&mut data).await?; for (i, blob) in c.blobs.iter().enumerate() { debug!("writing blob {}/{}", i, c.blobs.len()); - let (status, writer1) = - send_blob(db.clone(), blob.hash, writer, buffer, request_id).await?; + let (status, writer1) = send_blob(db.clone(), blob.hash, writer, buffer).await?; writer = writer1; if SentStatus::NotFound == status { - write_response(&mut writer, buffer, request_id, Res::NotFound).await?; + write_response(&mut writer, buffer, Res::NotFound).await?; writer.finish().await?; return Ok(status); } @@ -466,11 +461,7 @@ async fn transfer_collection( Ok(SentStatus::Sent) } -fn notify_transfer_aborted( - events: broadcast::Sender, - connection_id: u64, - request_id: Option, -) { +fn notify_transfer_aborted(events: broadcast::Sender, connection_id: u64, request_id: u64) { let _ = events.send(Event::TransferAborted { connection_id, request_id, @@ -487,10 +478,14 @@ async fn handle_stream( let mut out_buffer = BytesMut::with_capacity(1024); let mut in_buffer = BytesMut::with_capacity(1024); + // The stream ID index is used to identify this request. Requests only arrive in + // bi-directional RecvStreams initiated by the client, so this uniquely identifies them. + let request_id = reader.id().index(); + // 1. Read Handshake debug!("reading handshake"); if let Err(e) = read_handshake(&mut reader, &mut in_buffer, token).await { - notify_transfer_aborted(events, connection_id, None); + notify_transfer_aborted(events, connection_id, request_id); return Err(e); } @@ -499,17 +494,17 @@ async fn handle_stream( let request = match read_request(reader, &mut in_buffer).await { Ok(r) => r, Err(e) => { - notify_transfer_aborted(events, connection_id, None); + notify_transfer_aborted(events, connection_id, request_id); return Err(e); } }; let hash = request.name; - debug!("got request({})", request.id); + debug!("got request for ({hash})"); let _ = events.send(Event::RequestReceived { connection_id, - request_id: request.id, hash, + request_id, }); // 4. Attempt to find hash @@ -518,8 +513,8 @@ async fn handle_stream( Some(BlobOrCollection::Collection(d)) => d, _ => { debug!("not found {}", hash); - notify_transfer_aborted(events, connection_id, Some(request.id)); - write_response(&mut writer, &mut out_buffer, request.id, Res::NotFound).await?; + notify_transfer_aborted(events, connection_id, request_id); + write_response(&mut writer, &mut out_buffer, Res::NotFound).await?; writer.finish().await?; return Ok(()); @@ -527,18 +522,18 @@ async fn handle_stream( }; // 5. Transfer data! - match transfer_collection(&db, writer, &mut out_buffer, request.id, outboard, data).await { + match transfer_collection(&db, writer, &mut out_buffer, outboard, data).await { Ok(SentStatus::Sent) => { let _ = events.send(Event::TransferCompleted { connection_id, - request_id: request.id, + request_id, }); } Ok(SentStatus::NotFound) => { - notify_transfer_aborted(events, connection_id, Some(request.id)); + notify_transfer_aborted(events, connection_id, request_id); } Err(e) => { - notify_transfer_aborted(events, connection_id, Some(request.id)); + notify_transfer_aborted(events, connection_id, request_id); return Err(e); } } @@ -558,7 +553,6 @@ async fn send_blob( name: Hash, mut writer: W, buffer: &mut BytesMut, - id: u64, ) -> Result<(SentStatus, W)> { match db.get(&name) { Some(BlobOrCollection::Blob(Data { @@ -566,7 +560,7 @@ async fn send_blob( path, size, })) => { - write_response(&mut writer, buffer, id, Res::Found).await?; + write_response(&mut writer, buffer, Res::Found).await?; let path = path.clone(); let outboard = outboard.clone(); let size = *size; @@ -590,7 +584,7 @@ async fn send_blob( Ok((SentStatus::Sent, writer)) } _ => { - write_response(&mut writer, buffer, id, Res::NotFound).await?; + write_response(&mut writer, buffer, Res::NotFound).await?; Ok((SentStatus::NotFound, writer)) } } @@ -766,10 +760,9 @@ pub async fn create_collection(data_sources: Vec) -> Result<(Databas async fn write_response( mut writer: W, buffer: &mut BytesMut, - id: u64, res: Res, ) -> Result<()> { - let response = Response { id, data: res }; + let response = Response { data: res }; // TODO: do not transfer blob data as part of the responses if buffer.len() < 1024 {