Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bandaid for intermittent hanging when hyper::body::to_bytes is called #80

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,16 @@ Once you have installed the prerequisites, go to your enlistment's root director

This should build all of the libraries and executables.

### Tokio Console Support
### <a name="tokio-console-support">Tokio Console Support</a>

Ibeji has support for using the [tokio console](https://github.com/tokio-rs/console) for advanced debugging. To enable this support, you need to build with the `tokio_console` feature enabled and with the `tokio_unstable` config flag for the rust compiler:

```bash
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
```

Read the [tokio console documentation](https://github.com/tokio-rs/console) to learn how to install tokio console and how to run it.

Note that the tokio console will intercept trace-level logs, so these will not be visible when debugging with the tokio console.

## <a name="running-the-tests">Running the Tests</a>
Expand Down
1 change: 1 addition & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
license = "MIT"

[dependencies]
async-std = { workspace = true }
bytes = { workspace = true }
config = { workspace = true }
core-protobuf-data-access = { path = "../protobuf_data_access" }
Expand Down
37 changes: 27 additions & 10 deletions core/common/src/grpc_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use dyn_clone::DynClone;
use futures_core::task::{Context, Poll};
use http::uri::Uri;
use http_body::Body;
use hyper::Method;
use log::warn;
use regex::Regex;
use std::error::Error;
Expand Down Expand Up @@ -154,19 +155,35 @@ where
let interceptor = self.interceptor.clone();

let (service_name, method_name) = Self::retrieve_grpc_names_from_uri(request.uri());
let is_applicable = interceptor.is_applicable(&service_name, &method_name);
let is_applicable = interceptor.is_applicable(&service_name, &method_name)
&& (request.method() == Method::POST);

if is_applicable && interceptor.must_handle_request() {
let (parts, body) = request.into_parts();
let mut body_bytes: Bytes =
match futures::executor::block_on(hyper::body::to_bytes(body)) {
Ok(bytes) => bytes,
Err(err) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
})
}
};

// There is a known issue where hyper::body::to_bytes sometimes hangs in the code below.
// We will use a timeout to break out when this happens. This fix is a bandaid. We will
// implement a better fix after we have upgraded to the latest major version of the hyper crate.
let mut body_bytes: Bytes = match futures::executor::block_on(async {
async_std::future::timeout(
core::time::Duration::from_secs(5),
hyper::body::to_bytes(body),
)
.await
}) {
Ok(Ok(bytes)) => bytes,
Ok(Err(err)) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
});
}
Err(err) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
});
}
};

let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH);
let grpc_header_bytes = body_bytes;
let new_protobuf_message_bytes: Bytes = match interceptor.handle_request(
Expand Down