Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic JSON-over-GRPC wrapper to examples/rest-grpc-multiplex #1082

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/rest-grpc-multiplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ publish = false
axum = { path = "../../axum" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
once_cell = "1.12"
prost = "0.10"
serde = {version = "1", features = ["derive"]}
serde_json = {version = "1"}
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.7" }
tower = { version = "0.4", features = ["full"] }
Expand Down
5 changes: 4 additions & 1 deletion examples/rest-grpc-multiplex/build.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
fn main() {
tonic_build::compile_protos("proto/helloworld.proto").unwrap();
tonic_build::configure()
// make the gRPC message structs serializable to support the json_wrap_grpc function
.type_attribute(".", r#"#[derive(serde::Serialize, serde::Deserialize)]"#)
.compile(&["proto/helloworld.proto"], &["proto"]).unwrap();
}
61 changes: 54 additions & 7 deletions examples/rest-grpc-multiplex/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
//! cd examples && cargo run -p example-rest-grpc-multiplex
//! ```

use self::multiplex_service::MultiplexService;
use axum::{routing::get, Router};
use self::multiplex_service::{GrpcErrorAsJson, MultiplexService};
use axum::{extract::Json, routing::get, Router};
use once_cell::sync::OnceCell;
use proto::{
greeter_server::{Greeter, GreeterServer},
HelloReply, HelloRequest,
};
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tonic::{Response as TonicResponse, Status};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -20,8 +24,9 @@ mod proto {
tonic::include_proto!("helloworld");
}

#[derive(Default)]
struct GrpcServiceImpl {}
struct GrpcServiceImpl {
static_name: &'static str
}

#[tonic::async_trait]
impl Greeter for GrpcServiceImpl {
Expand All @@ -32,19 +37,59 @@ impl Greeter for GrpcServiceImpl {
tracing::info!("Got a request from {:?}", request.remote_addr());

let reply = HelloReply {
message: format!("Hello {}!", request.into_inner().name),
message: format!("Hello {}, my name is {}.", request.into_inner().name, self.static_name),
};

Ok(TonicResponse::new(reply))
}
}

/// axum Handler only takes one parameter (request),
/// but tokio impls take two parameters (&self + request).
/// tokio provides &self from an Arc stored in the service,
/// so create GRPC_SERVICE as a static, pass it to tokio's GreeterServer::from_arc
/// and statically reference it in the json_wrap_grpc.
/// This ensures that all endpoints reference the same service.
static GRPC_SERVICE: OnceCell<Arc<GrpcServiceImpl>> = OnceCell::new();

/// Given a gRPC RPC implementation function,
/// produce a closure that can be used as an axum Handler that:
/// 1. deserializes JSON into the request type
/// 2. calls the gRPC RPC implementation
/// 3. serializes the response back to JSON
fn json_wrap_grpc<'a, 'r, F, ReqT, ResT> (grpc_impl_func: F)
-> impl FnOnce(Json<ReqT>)
-> Pin<Box<dyn Future<Output = Result<Json<ResT>, GrpcErrorAsJson>> + Send + 'a>> + Clone + Send + Sized + 'static
where
F: FnOnce(&'r GrpcServiceImpl, tonic::Request<ReqT>)
-> Pin<Box<dyn Future<Output = Result<tonic::Response<ResT>, tonic::Status>> + Send + 'r>> + Clone + Send + Sync + 'static,
for<'de> ReqT: serde::Deserialize<'de> + Send + 'a,
ResT: serde::Serialize
Comment on lines +61 to +67
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate what you're trying to do but we cannot merge something this complicated as an example. That is just gonna confuse people. So unless you're able to find a simpler solution I don't think we can merge this.

Copy link
Author

@muusbolla muusbolla Jun 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I understand. Now that I've made the PR at least people who are looking to do this can find it from the discussion.

{
move |Json(req): Json<ReqT>| {
Box::pin((|Json(req): Json<ReqT>| async move {
let r = grpc_impl_func(GRPC_SERVICE.get().unwrap(), tonic::Request::new(req)).await;
match r {
Ok(r) => Ok(Json(r.into_inner())),
Err(e) => Err(GrpcErrorAsJson(e))
}
})(Json(req)))
}
}

async fn web_root() -> &'static str {
"Hello, World!"
}

#[tokio::main]
async fn main() {
match GRPC_SERVICE.set(Arc::new(GrpcServiceImpl {
static_name: "HAL 9000"
})) {
Ok(_) => {}
Err(_) => { panic!("GRPC_HANDLER created twice"); }
}

// initialize tracing
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
Expand All @@ -55,10 +100,12 @@ async fn main() {
.init();

// build the rest service
let rest = Router::new().route("/", get(web_root));
let rest = Router::new()
.route("/", get(web_root))
.route("/Hello", axum::routing::any(json_wrap_grpc(GrpcServiceImpl::say_hello)));

// build the grpc service
let grpc = GreeterServer::new(GrpcServiceImpl::default());
let grpc = GreeterServer::from_arc(GRPC_SERVICE.get().unwrap().clone());

// combine them into one service
let service = MultiplexService::new(rest, grpc);
Expand Down
49 changes: 49 additions & 0 deletions examples/rest-grpc-multiplex/src/multiplex_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,52 @@ fn is_grpc_request<B>(req: &Request<B>) -> bool {
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}

// wrapper type to convert GRPC errors to HTTP responses with JSON body
pub struct GrpcErrorAsJson(pub tonic::Status);

#[derive(serde::Serialize)]
struct GrpcStatus<'a> {
grpc_error_code: i32,
grpc_error_description: &'a str,
message: &'a str
}

impl axum::response::IntoResponse for GrpcErrorAsJson {
fn into_response(self) -> axum::response::Response {
let json_status = GrpcStatus {
grpc_error_code: self.0.code() as i32,
grpc_error_description: self.0.code().description(),
message: self.0.message()
};
let response_body = serde_json::to_string(&json_status).unwrap();

// https://chromium.googlesource.com/external/github.com/grpc/grpc/+/refs/tags/v1.21.4-pre1/doc/statuscodes.md
let code = match self.0.code() {
tonic::Code::Ok => hyper::StatusCode::OK,
tonic::Code::Cancelled => hyper::StatusCode::from_u16(499u16).unwrap(),
tonic::Code::Unknown => hyper::StatusCode::INTERNAL_SERVER_ERROR,
tonic::Code::InvalidArgument => hyper::StatusCode::BAD_REQUEST,
tonic::Code::DeadlineExceeded => hyper::StatusCode::GATEWAY_TIMEOUT,
tonic::Code::NotFound => hyper::StatusCode::NOT_FOUND,
tonic::Code::AlreadyExists => hyper::StatusCode::CONFLICT,
tonic::Code::PermissionDenied => hyper::StatusCode::FORBIDDEN,
tonic::Code::ResourceExhausted => hyper::StatusCode::TOO_MANY_REQUESTS,
tonic::Code::FailedPrecondition => hyper::StatusCode::BAD_REQUEST,
tonic::Code::Aborted => hyper::StatusCode::CONFLICT,
tonic::Code::OutOfRange => hyper::StatusCode::BAD_REQUEST,
tonic::Code::Unimplemented => hyper::StatusCode::NOT_IMPLEMENTED,
tonic::Code::Internal => hyper::StatusCode::INTERNAL_SERVER_ERROR,
tonic::Code::Unavailable => hyper::StatusCode::SERVICE_UNAVAILABLE,
tonic::Code::DataLoss => hyper::StatusCode::INTERNAL_SERVER_ERROR,
tonic::Code::Unauthenticated => hyper::StatusCode::UNAUTHORIZED
};

let mut response = (code, response_body).into_response();
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
hyper::header::HeaderValue::from_static("application/json"),
);
response
}
}