Skip to content

Commit

Permalink
feat(transport): Make transport server and channel independent (#1630)
Browse files Browse the repository at this point in the history
* feat(transport): Make transport server and channel independent

* chore(channel): Move BoxFuture to channel module

* chore(tls): Move server part of tls service to server module

* chore(server): Move io service to server module
  • Loading branch information
tottoto authored Jun 20, 2024
1 parent 58b4443 commit 654289f
Show file tree
Hide file tree
Showing 20 changed files with 142 additions and 129 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ jobs:
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- uses: Swatinem/rust-cache@v2
- run: cargo hack udeps --workspace --each-feature ${{ matrix.option }}
- run: cargo hack udeps --workspace --exclude-features tls --each-feature ${{ matrix.option }}
- run: cargo udeps --package tonic --features tls,transport
- run: cargo udeps --package tonic --features tls,server
- run: cargo udeps --package tonic --features tls,channel

check:
runs-on: ${{ matrix.os }}
Expand All @@ -81,6 +84,8 @@ jobs:
- uses: Swatinem/rust-cache@v2
- name: Check features
run: cargo hack check --workspace --no-private --each-feature --no-dev-deps
- name: Check tonic feature powerset
run: cargo hack check --package tonic --feature-powerset --depth 2
- name: Check all targets
run: cargo check --workspace --all-targets --all-features

Expand Down
18 changes: 10 additions & 8 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,31 @@ version = "0.11.0"
codegen = ["dep:async-trait"]
gzip = ["dep:flate2"]
zstd = ["dep:zstd"]
default = ["channel", "codegen", "prost"]
default = ["transport", "codegen", "prost"]
prost = ["dep:prost"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:tokio", "tokio?/rt", "tokio?/macros"]
tls = ["dep:rustls-pemfile", "dep:tokio-rustls", "dep:tokio", "tokio?/rt", "tokio?/macros"]
tls-roots = ["tls-roots-common", "dep:rustls-native-certs"]
tls-roots-common = ["tls", "channel"]
tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"]
router = ["dep:axum"]
transport = [
server = [
"router",
"dep:async-stream",
"dep:h2",
"dep:hyper", "dep:hyper-util",
"dep:hyper", "hyper?/server",
"dep:hyper-util", "hyper-util?/service", "hyper-util?/server-auto",
"dep:socket2",
"dep:tokio", "tokio?/macros", "tokio?/net", "tokio?/time",
"dep:tower", "tower?/util", "tower?/limit",
]
channel = [
"transport",
"dep:hyper", "hyper?/client",
"dep:hyper-util", "hyper-util?/client-legacy",
"dep:tower", "tower?/balance", "tower?/buffer", "tower?/discover", "tower?/load", "tower?/make",
"dep:tower", "tower?/balance", "tower?/buffer", "tower?/discover", "tower?/limit",
"dep:tokio", "tokio?/time",
"dep:hyper-timeout",
]
transport = ["server", "channel"]

# [[bench]]
# name = "bench_main"
Expand Down Expand Up @@ -76,8 +78,8 @@ async-trait = {version = "0.1.13", optional = true}
# transport
async-stream = {version = "0.3", optional = true}
h2 = {version = "0.4", optional = true}
hyper = {version = "1", features = ["http1", "http2", "server"], optional = true}
hyper-util = { version = ">=0.1.4, <0.2", features = ["service", "server-auto", "tokio"], optional = true }
hyper = {version = "1", features = ["http1", "http2"], optional = true}
hyper-util = { version = ">=0.1.4, <0.2", features = ["tokio"], optional = true }
socket2 = { version = ">=0.4.7, <0.6.0", optional = true, features = ["all"] }
tokio = {version = "1", default-features = false, optional = true}
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
9 changes: 5 additions & 4 deletions tonic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
//!
//! # Feature Flags
//!
//! - `transport`: Enables just the full featured server portion of the `channel` feature.
//! - `channel`: Enables the fully featured, batteries included client and server
//! - `transport`: Enables the fully featured, batteries included client and server
//! implementation based on [`hyper`], [`tower`] and [`tokio`]. Enabled by default.
//! - `server`: Enables just the full featured server portion of the `transport` feature.
//! - `channel`: Enables just the full featured channel portion of the `transport` feature.
//! - `codegen`: Enables all the required exports and optional dependencies required
//! for [`tonic-build`]. Enabled by default.
//! - `tls`: Enables the `rustls` based TLS options for the `transport` feature. Not
Expand Down Expand Up @@ -100,8 +101,8 @@ pub mod metadata;
pub mod server;
pub mod service;

#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(any(feature = "server", feature = "channel"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server", feature = "channel"))))]
pub mod transport;

mod extensions;
Expand Down
22 changes: 11 additions & 11 deletions tonic/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::metadata::{MetadataMap, MetadataValue};
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
use crate::transport::server::TcpConnectInfo;
#[cfg(feature = "tls")]
#[cfg(all(feature = "server", feature = "tls"))]
use crate::transport::server::TlsConnectInfo;
use http::Extensions;
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
use std::net::SocketAddr;
#[cfg(feature = "tls")]
#[cfg(all(feature = "server", feature = "tls"))]
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "tls")]
#[cfg(all(feature = "server", feature = "tls"))]
use tokio_rustls::rustls::pki_types::CertificateDer;
use tokio_stream::Stream;

Expand Down Expand Up @@ -211,8 +211,8 @@ impl<T> Request<T> {
/// This will return `None` if the `IO` type used
/// does not implement `Connected` or when using a unix domain socket.
/// This currently only works on the server side.
#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub fn local_addr(&self) -> Option<SocketAddr> {
let addr = self
.extensions()
Expand All @@ -234,8 +234,8 @@ impl<T> Request<T> {
/// This will return `None` if the `IO` type used
/// does not implement `Connected` or when using a unix domain socket.
/// This currently only works on the server side.
#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub fn remote_addr(&self) -> Option<SocketAddr> {
let addr = self
.extensions()
Expand All @@ -258,8 +258,8 @@ impl<T> Request<T> {
/// and is mostly used for mTLS. This currently only returns
/// `Some` on the server side of the `transport` server with
/// TLS enabled connections.
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[cfg(all(feature = "server", feature = "tls"))]
#[cfg_attr(docsrs, doc(all(feature = "server", feature = "tls")))]
pub fn peer_certs(&self) -> Option<Arc<Vec<CertificateDer<'static>>>> {
self.extensions()
.get::<TlsConnectInfo<TcpConnectInfo>>()
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct AxumBodyService<S> {
service: S,
}

pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

impl<S> Service<Request<axum::body::Body>> for AxumBodyService<S>
where
Expand Down
25 changes: 12 additions & 13 deletions tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ impl Status {
Status::new(Code::Unauthenticated, message)
}

#[cfg_attr(not(feature = "transport"), allow(dead_code))]
pub(crate) fn from_error_generic(
err: impl Into<Box<dyn Error + Send + Sync + 'static>>,
) -> Status {
Expand All @@ -316,7 +315,6 @@ impl Status {
///
/// Inspects the error source chain for recognizable errors, including statuses, HTTP2, and
/// hyper, and attempts to maps them to a `Status`, or else returns an Unknown `Status`.
#[cfg_attr(not(feature = "transport"), allow(dead_code))]
pub fn from_error(err: Box<dyn Error + Send + Sync + 'static>) -> Status {
Status::try_from_error(err).unwrap_or_else(|err| {
let mut status = Status::new(Code::Unknown, err.to_string());
Expand All @@ -342,7 +340,7 @@ impl Status {
Err(err) => err,
};

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
let err = match err.downcast::<h2::Error>() {
Ok(h2) => {
return Ok(Status::from_h2_error(h2));
Expand All @@ -359,7 +357,7 @@ impl Status {
}

// FIXME: bubble this into `transport` and expose generic http2 reasons.
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn from_h2_error(err: Box<h2::Error>) -> Status {
let code = Self::code_from_h2(&err);

Expand All @@ -368,7 +366,7 @@ impl Status {
status
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn code_from_h2(err: &h2::Error) -> Code {
// See https://github.com/grpc/grpc/blob/3977c30/doc/PROTOCOL-HTTP2.md#errors
match err.reason() {
Expand All @@ -388,7 +386,7 @@ impl Status {
}
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn to_h2_error(&self) -> h2::Error {
// conservatively transform to h2 error codes...
let reason = match self.code {
Expand All @@ -404,7 +402,7 @@ impl Status {
///
/// Returns Some if there's a way to handle the error, or None if the information from this
/// hyper error, but perhaps not its source, should be ignored.
#[cfg(feature = "transport")]
#[cfg(any(feature = "server", feature = "channel"))]
fn from_hyper_error(err: &hyper::Error) -> Option<Status> {
// is_timeout results from hyper's keep-alive logic
// (https://docs.rs/hyper/0.14.11/src/hyper/error.rs.html#192-194). Per the grpc spec
Expand All @@ -420,6 +418,7 @@ impl Status {
return Some(Status::cancelled(err.to_string()));
}

#[cfg(feature = "server")]
if let Some(h2_err) = err.source().and_then(|e| e.downcast_ref::<h2::Error>()) {
let code = Status::code_from_h2(h2_err);
let status = Self::new(code, format!("h2 protocol error: {}", err));
Expand Down Expand Up @@ -607,7 +606,7 @@ fn find_status_in_source_chain(err: &(dyn Error + 'static)) -> Option<Status> {
});
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
if let Some(timeout) = err.downcast_ref::<crate::transport::TimeoutExpired>() {
return Some(Status::cancelled(timeout.to_string()));
}
Expand All @@ -624,7 +623,7 @@ fn find_status_in_source_chain(err: &(dyn Error + 'static)) -> Option<Status> {
return Some(Status::unavailable(connect.to_string()));
}

#[cfg(feature = "transport")]
#[cfg(any(feature = "server", feature = "channel"))]
if let Some(hyper) = err
.downcast_ref::<hyper::Error>()
.and_then(Status::from_hyper_error)
Expand Down Expand Up @@ -671,14 +670,14 @@ fn invalid_header_value_byte<Error: fmt::Display>(err: Error) -> Status {
)
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
impl From<h2::Error> for Status {
fn from(err: h2::Error) -> Self {
Status::from_h2_error(Box::new(err))
}
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
impl From<Status> for h2::Error {
fn from(status: Status) -> Self {
status.to_h2_error()
Expand Down Expand Up @@ -927,7 +926,7 @@ mod tests {
}

#[test]
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn from_error_h2() {
use std::error::Error as _;

Expand All @@ -944,7 +943,7 @@ mod tests {
}

#[test]
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn to_h2_error() {
let orig = Status::new(Code::Cancelled, "stop eet!");
let err = orig.to_h2_error();
Expand Down
3 changes: 2 additions & 1 deletion tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tower::{
Service,
};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<BoxBody>, crate::Error>>;

const DEFAULT_BUFFER_SIZE: usize = 1024;
Expand Down Expand Up @@ -186,7 +187,7 @@ impl Channel {
D: Discover<Service = Connection> + Unpin + Send + 'static,
D::Error: Into<crate::Error>,
D::Key: Hash + Send + Clone,
E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static,
E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
{
let svc = Balance::new(discover);

Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/add_origin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::transport::BoxFuture;
use crate::transport::channel::BoxFuture;
use http::uri::Authority;
use http::uri::Scheme;
use http::{Request, Uri};
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
use crate::{
body::{boxed, BoxBody},
transport::{service::GrpcTimeout, BoxFuture, Endpoint},
transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
};
use http::Uri;
use hyper::rt;
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/connector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::BoxedIo;
#[cfg(feature = "tls")]
use super::TlsConnector;
use crate::transport::BoxFuture;
use crate::transport::channel::BoxFuture;
use http::Uri;
use std::fmt;
use std::task::{Context, Poll};
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::transport::BoxFuture;
use crate::transport::channel::BoxFuture;
use std::{future::Future, sync::Arc};

pub(crate) use hyper::rt::Executor;
Expand Down
11 changes: 6 additions & 5 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
#[cfg(feature = "channel")]
pub mod channel;
#[cfg(feature = "server")]
pub mod server;

mod error;
Expand All @@ -104,13 +105,16 @@ mod tls;
pub use self::channel::{Channel, Endpoint};
pub use self::error::Error;
#[doc(inline)]
#[cfg(feature = "server")]
pub use self::server::Server;
#[doc(inline)]
pub use self::service::grpc_timeout::TimeoutExpired;

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Certificate;
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter};
pub use hyper::{body::Body, Uri};
#[cfg(feature = "tls")]
Expand All @@ -119,12 +123,9 @@ pub use tokio_rustls::rustls::pki_types::CertificateDer;
#[cfg(all(feature = "channel", feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "channel", feature = "tls"))))]
pub use self::channel::ClientTlsConfig;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[cfg(all(feature = "server", feature = "tls"))]
#[cfg_attr(docsrs, doc(all(feature = "server", feature = "tls")))]
pub use self::server::ServerTlsConfig;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Identity;

#[cfg(feature = "channel")]
use crate::service::router::BoxFuture;
3 changes: 1 addition & 2 deletions tonic/src/transport/server/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::{Connected, Server};
use crate::transport::service::ServerIo;
use super::{service::ServerIo, Connected, Server};
use std::{
net::{SocketAddr, TcpListener as StdTcpListener},
pin::{pin, Pin},
Expand Down
Loading

0 comments on commit 654289f

Please sign in to comment.