Skip to content

Commit

Permalink
fix(codec): Cancelled client streaming handling
Browse files Browse the repository at this point in the history
This PR fixes how client side streaming is handled on the server side
and improves overall source error matching.

Fixes:

- Correctly, detect h2 codes when its wrapped in a hyper error.
- Cancelled requests from the client side during client streaming
  requests correctly return EOF (`None` from `Streaming::message()`)

Closes #848
  • Loading branch information
LucioFranco committed Mar 13, 2023
1 parent 99eae57 commit c74d280
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
8 changes: 6 additions & 2 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ enum State {
Error,
}

#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
enum Direction {
Request,
Response(StatusCode),
Expand Down Expand Up @@ -229,9 +229,13 @@ impl StreamingInner {

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
let chunk = match dbg!(ready!(Pin::new(&mut self.body).poll_data(cx))) {
Some(Ok(d)) => Some(d),
Some(Err(e)) => {
if self.direction == Direction::Request && e.code() == Code::Cancelled {
return Poll::Ready(Ok(None));
}

let _ = std::mem::replace(&mut self.state, State::Error);
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
Expand Down
25 changes: 19 additions & 6 deletions tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,17 @@ impl Status {
// FIXME: bubble this into `transport` and expose generic http2 reasons.
#[cfg(feature = "transport")]
fn from_h2_error(err: Box<h2::Error>) -> Status {
let code = Self::code_from_h2(&err);

let mut status = Self::new(code, format!("h2 protocol error: {}", err));
status.source = Some(Arc::new(*err));
status
}

#[cfg(feature = "transport")]
fn code_from_h2(err: &h2::Error) -> Code {
// See https://github.com/grpc/grpc/blob/3977c30/doc/PROTOCOL-HTTP2.md#errors
let code = match err.reason() {
match err.reason() {
Some(h2::Reason::NO_ERROR)
| Some(h2::Reason::PROTOCOL_ERROR)
| Some(h2::Reason::INTERNAL_ERROR)
Expand All @@ -376,11 +385,7 @@ impl Status {
Some(h2::Reason::INADEQUATE_SECURITY) => Code::PermissionDenied,

_ => Code::Unknown,
};

let mut status = Self::new(code, format!("h2 protocol error: {}", err));
status.source = Some(Arc::new(*err));
status
}
}

#[cfg(feature = "transport")]
Expand Down Expand Up @@ -416,6 +421,14 @@ impl Status {
if err.is_timeout() || err.is_connect() {
return Some(Status::unavailable(err.to_string()));
}

if let Some(h2_err) = err.source().and_then(|e| e.downcast_ref::<h2::Error>()) {
let code = Status::code_from_h2(&h2_err);
let status = Self::new(code, format!("h2 protocol error: {}", err));

return Some(status);
}

None
}

Expand Down

0 comments on commit c74d280

Please sign in to comment.