Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove Request.id from the protocol #782

Merged
merged 1 commit into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ where
// 2. Send Request
{
debug!("sending request");
let req = Request { id: 1, name: hash };
let req = Request { name: hash };

let used = postcard::to_slice(&req, &mut out_buffer)?;
write_lp(&mut writer, used).await?;
Expand Down
2 changes: 0 additions & 2 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ impl Handshake {

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone, MaxSize)]
pub(crate) struct Request {
pub id: u64,
/// blake3 hash
pub name: Hash,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq, Clone)]
pub(crate) struct Response {
pub id: u64,
pub data: Res,
}

Expand Down
55 changes: 24 additions & 31 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub enum Event {
RequestReceived {
/// An unique connection id.
connection_id: u64,
/// The request id.
/// An identifier uniquely identifying this transfer request.
request_id: u64,
/// The hash for which the client wants to receive data.
hash: Hash,
Expand All @@ -237,16 +237,15 @@ pub enum Event {
TransferCompleted {
/// An unique connection id.
connection_id: u64,
/// The request id.
/// An identifier uniquely identifying this transfer request.
request_id: u64,
},
/// A request was aborted because the client disconnected.
TransferAborted {
/// The quic connection id.
connection_id: u64,
/// The request id. When `None`, the transfer was aborted before or during reading and decoding
/// the transfer request.
request_id: Option<u64>,
/// An identifier uniquely identifying this request.
request_id: u64,
},
}

Expand Down Expand Up @@ -414,8 +413,6 @@ async fn transfer_collection(
mut writer: quinn::SendStream,
// Buffer used when writing to writer.
buffer: &mut BytesMut,
// The id of the transfer request.
request_id: u64,
// The bao outboard encoded data.
outboard: &Bytes,
// The actual blob data.
Expand All @@ -441,7 +438,6 @@ async fn transfer_collection(
write_response(
&mut writer,
buffer,
request_id,
Res::FoundCollection {
total_blobs_size: c.total_blobs_size,
},
Expand All @@ -452,11 +448,10 @@ async fn transfer_collection(
writer.write_buf(&mut data).await?;
for (i, blob) in c.blobs.iter().enumerate() {
debug!("writing blob {}/{}", i, c.blobs.len());
let (status, writer1) =
send_blob(db.clone(), blob.hash, writer, buffer, request_id).await?;
let (status, writer1) = send_blob(db.clone(), blob.hash, writer, buffer).await?;
writer = writer1;
if SentStatus::NotFound == status {
write_response(&mut writer, buffer, request_id, Res::NotFound).await?;
write_response(&mut writer, buffer, Res::NotFound).await?;
writer.finish().await?;
return Ok(status);
}
Expand All @@ -466,11 +461,7 @@ async fn transfer_collection(
Ok(SentStatus::Sent)
}

fn notify_transfer_aborted(
events: broadcast::Sender<Event>,
connection_id: u64,
request_id: Option<u64>,
) {
fn notify_transfer_aborted(events: broadcast::Sender<Event>, connection_id: u64, request_id: u64) {
let _ = events.send(Event::TransferAborted {
connection_id,
request_id,
Expand All @@ -487,10 +478,14 @@ async fn handle_stream(
let mut out_buffer = BytesMut::with_capacity(1024);
let mut in_buffer = BytesMut::with_capacity(1024);

// The stream ID index is used to identify this request. Requests only arrive in
// bi-directional RecvStreams initiated by the client, so this uniquely identifies them.
let request_id = reader.id().index();

// 1. Read Handshake
debug!("reading handshake");
if let Err(e) = read_handshake(&mut reader, &mut in_buffer, token).await {
notify_transfer_aborted(events, connection_id, None);
notify_transfer_aborted(events, connection_id, request_id);
return Err(e);
}

Expand All @@ -499,17 +494,17 @@ async fn handle_stream(
let request = match read_request(reader, &mut in_buffer).await {
Ok(r) => r,
Err(e) => {
notify_transfer_aborted(events, connection_id, None);
notify_transfer_aborted(events, connection_id, request_id);
return Err(e);
}
};

let hash = request.name;
debug!("got request({})", request.id);
debug!("got request for ({hash})");
let _ = events.send(Event::RequestReceived {
connection_id,
request_id: request.id,
hash,
request_id,
});

// 4. Attempt to find hash
Expand All @@ -518,27 +513,27 @@ async fn handle_stream(
Some(BlobOrCollection::Collection(d)) => d,
_ => {
debug!("not found {}", hash);
notify_transfer_aborted(events, connection_id, Some(request.id));
write_response(&mut writer, &mut out_buffer, request.id, Res::NotFound).await?;
notify_transfer_aborted(events, connection_id, request_id);
write_response(&mut writer, &mut out_buffer, Res::NotFound).await?;
writer.finish().await?;

return Ok(());
}
};

// 5. Transfer data!
match transfer_collection(&db, writer, &mut out_buffer, request.id, outboard, data).await {
match transfer_collection(&db, writer, &mut out_buffer, outboard, data).await {
Ok(SentStatus::Sent) => {
let _ = events.send(Event::TransferCompleted {
connection_id,
request_id: request.id,
request_id,
});
}
Ok(SentStatus::NotFound) => {
notify_transfer_aborted(events, connection_id, Some(request.id));
notify_transfer_aborted(events, connection_id, request_id);
}
Err(e) => {
notify_transfer_aborted(events, connection_id, Some(request.id));
notify_transfer_aborted(events, connection_id, request_id);
return Err(e);
}
}
Expand All @@ -558,15 +553,14 @@ async fn send_blob<W: AsyncWrite + Unpin + Send + 'static>(
name: Hash,
mut writer: W,
buffer: &mut BytesMut,
id: u64,
) -> Result<(SentStatus, W)> {
match db.get(&name) {
Some(BlobOrCollection::Blob(Data {
outboard,
path,
size,
})) => {
write_response(&mut writer, buffer, id, Res::Found).await?;
write_response(&mut writer, buffer, Res::Found).await?;
let path = path.clone();
let outboard = outboard.clone();
let size = *size;
Expand All @@ -590,7 +584,7 @@ async fn send_blob<W: AsyncWrite + Unpin + Send + 'static>(
Ok((SentStatus::Sent, writer))
}
_ => {
write_response(&mut writer, buffer, id, Res::NotFound).await?;
write_response(&mut writer, buffer, Res::NotFound).await?;
Ok((SentStatus::NotFound, writer))
}
}
Expand Down Expand Up @@ -766,10 +760,9 @@ pub async fn create_collection(data_sources: Vec<DataSource>) -> Result<(Databas
async fn write_response<W: AsyncWrite + Unpin>(
mut writer: W,
buffer: &mut BytesMut,
id: u64,
res: Res,
) -> Result<()> {
let response = Response { id, data: res };
let response = Response { data: res };

// TODO: do not transfer blob data as part of the responses
if buffer.len() < 1024 {
Expand Down