diff --git a/tonic-web/src/call.rs b/tonic-web/src/call.rs index 353e644e5..b9fd628a5 100644 --- a/tonic-web/src/call.rs +++ b/tonic-web/src/call.rs @@ -245,36 +245,41 @@ where let mut me = self.as_mut(); loop { - let buf = ready!(me.as_mut().poll_decode(cx)); - - return if let Some(Ok(incoming_buf)) = buf { - let buf = &mut me.as_mut().project().buf; - - buf.put(incoming_buf); - - match find_trailers(&buf[..]) { - FindTrailers::Trailer(len) => { - // Extract up to len of where the trailers are at - let msg_buf = buf.copy_to_bytes(len); - match decode_trailers_frame(buf.split().freeze()) { - Ok(Some(trailers)) => { - self.project().trailers.replace(trailers); - } - Err(e) => return Poll::Ready(Some(Err(e))), - _ => {} - } + let incoming_buf = match ready!(me.as_mut().poll_decode(cx)) { + Some(Ok(incoming_buf)) => incoming_buf, + None => { + // TODO: Consider eofing here? + // Even if the buffer has more data, this will hit the eof branch + // of decode in tonic + return Poll::Ready(None); + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + }; + + let buf = &mut me.as_mut().project().buf; - if msg_buf.has_remaining() { - return Poll::Ready(Some(Ok(msg_buf))); - } else { - return Poll::Ready(None); + buf.put(incoming_buf); + + return match find_trailers(&buf[..])? { + FindTrailers::Trailer(len) => { + // Extract up to len of where the trailers are at + let msg_buf = buf.copy_to_bytes(len); + match decode_trailers_frame(buf.split().freeze()) { + Ok(Some(trailers)) => { + self.project().trailers.replace(trailers); } + Err(e) => return Poll::Ready(Some(Err(e))), + _ => {} + } + + if msg_buf.has_remaining() { + Poll::Ready(Some(Ok(msg_buf))) + } else { + Poll::Ready(None) } - FindTrailers::IncompleteBuf => continue, - FindTrailers::Done => Poll::Ready(Some(Ok(buf.split().freeze()))), } - } else { - Poll::Ready(buf) + FindTrailers::IncompleteBuf => continue, + FindTrailers::Done(len) => Poll::Ready(Some(Ok(buf.split_to(len).freeze()))), }; } } @@ -421,7 +426,7 @@ fn make_trailers_frame(trailers: HeaderMap) -> Vec { /// its location in the original buf. If `None` is returned we did /// not find a trailers in this buffer either because its incomplete /// or the buffer jsut contained grpc message frames. -fn find_trailers(buf: &[u8]) -> FindTrailers { +fn find_trailers(buf: &[u8]) -> Result { let mut len = 0; let mut temp_buf = &buf[..]; @@ -429,13 +434,17 @@ fn find_trailers(buf: &[u8]) -> FindTrailers { // To check each frame, there must be at least GRPC_HEADER_SIZE // amount of bytes available otherwise the buffer is incomplete. if temp_buf.is_empty() || temp_buf.len() < GRPC_HEADER_SIZE { - return FindTrailers::Done; + return Ok(FindTrailers::Done(len)); } let header = temp_buf.get_u8(); if header == GRPC_WEB_TRAILERS_BIT { - return FindTrailers::Trailer(len); + return Ok(FindTrailers::Trailer(len)); + } + + if !(header == 0 || header == 1) { + return Err(Status::internal("Invalid header bit {} expected 0 or 1")); } let msg_len = temp_buf.get_u32(); @@ -445,7 +454,7 @@ fn find_trailers(buf: &[u8]) -> FindTrailers { // If the msg len of a non-grpc-web trailer frame is larger than // the overall buffer we know within that buffer there are no trailers. if len > buf.len() { - return FindTrailers::IncompleteBuf; + return Ok(FindTrailers::IncompleteBuf); } temp_buf = &buf[len as usize..]; @@ -456,11 +465,13 @@ fn find_trailers(buf: &[u8]) -> FindTrailers { enum FindTrailers { Trailer(usize), IncompleteBuf, - Done, + Done(usize), } #[cfg(test)] mod tests { + use tonic::Code; + use super::*; #[test] @@ -507,7 +518,7 @@ mod tests { 128, 0, 0, 0, 15, 103, 114, 112, 99, 45, 115, 116, 97, 116, 117, 115, 58, 48, 13, 10, ]; - let out = find_trailers(&buf[..]); + let out = find_trailers(&buf[..]).unwrap(); assert_eq!(out, FindTrailers::Trailer(0)); } @@ -524,7 +535,7 @@ mod tests { 15, 103, 114, 112, 99, 45, 115, 116, 97, 116, 117, 115, 58, 48, 13, 10, ]; - let out = find_trailers(&buf[..]); + let out = find_trailers(&buf[..]).unwrap(); assert_eq!(out, FindTrailers::Trailer(81)); @@ -566,8 +577,17 @@ mod tests { 105, 116, 116, 101, 110, 32, 98, 121, 32, ]; - let out = find_trailers(&buf[..]); + let out = find_trailers(&buf[..]).unwrap(); assert_eq!(out, FindTrailers::IncompleteBuf); } + + #[test] + #[ignore] + fn find_trailers_buffered_incomplete_buf_bug() { + let buf = std::fs::read("tests/incomplete-buf-bug.bin").unwrap(); + let out = find_trailers(&buf[..]).unwrap_err(); + + assert_eq!(out.code(), Code::Internal); + } } diff --git a/tonic-web/tests/incomplete-buf-bug.bin b/tonic-web/tests/incomplete-buf-bug.bin new file mode 100644 index 000000000..19cb7417b Binary files /dev/null and b/tonic-web/tests/incomplete-buf-bug.bin differ diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 4c1068271..38644b6e5 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -253,7 +253,7 @@ impl StreamingInner { } else { // FIXME: improve buf usage. if self.buf.has_remaining() { - trace!("unexpected EOF decoding stream"); + trace!("unexpected EOF decoding stream, state: {:?}", self.state); Err(Status::new( Code::Internal, "Unexpected EOF decoding stream.".to_string(),