Skip to content

Commit

Permalink
feat(tonic): Add client feature flag to support connect_with_connecto…
Browse files Browse the repository at this point in the history
…r in wasm32 targets

This adds a new feature that enables compilation of connect_with_connector and the Endpoint and Channel struct for wasm.

Related: hyperium#491
  • Loading branch information
lucasmerlin committed Jun 17, 2024
1 parent e2c506a commit 4f0596e
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 109 deletions.
22 changes: 19 additions & 3 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,24 @@ transport = [
"dep:async-stream",
"channel",
"dep:h2",
"dep:hyper", "dep:hyper-util", "dep:hyper-timeout",
"dep:hyper",
"dep:hyper-util",
"dep:hyper-timeout",
"dep:socket2",
"dep:tokio", "tokio?/macros", "tokio?/net", "tokio?/time",
"dep:tokio",
"tokio?/macros",
"tokio?/net",
"tokio?/time",
"dep:tower",
]
client = [
"dep:h2",
"hyper/client",
"hyper/http2",
"dep:tokio",
"dep:tower",
"dep:hyper-timeout",
]
channel = []

# [[bench]]
Expand Down Expand Up @@ -72,7 +85,7 @@ async-trait = {version = "0.1.13", optional = true}
async-stream = {version = "0.3", optional = true}
h2 = {version = "0.4", optional = true}
hyper = {version = "1", features = ["full"], optional = true}
hyper-util = { version = ">=0.1.4, <0.2", features = ["full"], optional = true }
hyper-util = { version = ">=0.1.4, <0.2", default-features = false, optional = true }
hyper-timeout = {version = "0.5", optional = true}
socket2 = { version = ">=0.4.7, <0.6.0", optional = true, features = ["all"] }
tokio = {version = "1", default-features = false, optional = true}
Expand All @@ -90,6 +103,9 @@ webpki-roots = { version = "0.26", optional = true }
flate2 = {version = "1.0", optional = true}
zstd = { version = "0.13.0", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4.38"

[dev-dependencies]
bencher = "0.1.5"
quickcheck = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub mod metadata;
pub mod server;
pub mod service;

#[cfg(feature = "transport")]
#[cfg(any(feature = "transport", feature = "client"))]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
pub mod transport;

Expand Down
4 changes: 3 additions & 1 deletion tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ impl Endpoint {
}

/// Create a channel from this config.
#[cfg(feature = "transport")]
pub async fn connect(&self) -> Result<Channel, Error> {
let mut http = HttpConnector::new();
http.enforce_http(false);
Expand All @@ -349,6 +350,7 @@ impl Endpoint {
///
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
#[cfg(feature = "transport")]
pub fn connect_lazy(&self) -> Channel {
let mut http = HttpConnector::new();
http.enforce_http(false);
Expand Down Expand Up @@ -447,7 +449,7 @@ impl From<Uri> for Endpoint {
http2_keep_alive_while_idle: None,
connect_timeout: None,
http2_adaptive_window: None,
executor: SharedExec::tokio(),
executor: SharedExec::default_exec(),
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ pub use endpoint::Endpoint;
#[cfg(feature = "tls")]
pub use tls::ClientTlsConfig;

use super::service::{Connection, DynamicServiceStream, SharedExec};
use super::service::{Connection};
#[cfg(feature = "transport")]
use super::service::{DynamicServiceStream, SharedExec};
use crate::body::BoxBody;
use crate::transport::Executor;
use bytes::Bytes;
Expand Down Expand Up @@ -107,6 +109,7 @@ impl Channel {
///
/// This creates a [`Channel`] that will load balance across all the
/// provided endpoints.
#[cfg(feature = "transport")]
pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
list.for_each(|endpoint| {
Expand All @@ -120,18 +123,20 @@ impl Channel {
/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
#[cfg(feature = "transport")]
pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
where
K: Hash + Eq + Send + Clone + 'static,
{
Self::balance_channel_with_executor(capacity, SharedExec::tokio())
Self::balance_channel_with_executor(capacity, SharedExec::default_exec())
}

/// Balance a list of [`Endpoint`]'s.
///
/// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
///
/// The [`Channel`] will use the given executor to spawn async tasks.
#[cfg(feature = "transport")]
pub fn balance_channel_with_executor<K, E>(
capacity: usize,
executor: E,
Expand Down
3 changes: 3 additions & 0 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
//! [rustls]: https://docs.rs/rustls/0.16.0/rustls/

pub mod channel;
#[cfg(feature = "transport")]
pub mod server;

mod error;
Expand All @@ -102,6 +103,7 @@ mod tls;
#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
pub use self::channel::{Channel, Endpoint};
pub use self::error::Error;
#[cfg(feature = "transport")]
#[doc(inline)]
pub use self::server::Server;
#[doc(inline)]
Expand All @@ -111,6 +113,7 @@ pub(crate) use self::service::ConnectError;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Certificate;
#[cfg(feature = "transport")]
pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter};
pub use hyper::{body::Body, Uri};
#[cfg(feature = "tls")]
Expand Down
55 changes: 49 additions & 6 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
body::{boxed, BoxBody},
transport::{BoxFuture, Endpoint},
};

use http::Uri;
use hyper::rt;
use hyper::{client::conn::http2::Builder, rt::Executor};
Expand All @@ -16,11 +17,19 @@ use tower::load::Load;
use tower::{
layer::Layer,
limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
util::BoxService,
ServiceBuilder, ServiceExt,
ServiceBuilder,
ServiceExt, util::BoxService,
};
use tower::load::Load;
use tower_service::Service;

use crate::{
body::BoxBody,
transport::{BoxFuture, Endpoint},
};

use super::{AddOrigin, grpc_timeout::GrpcTimeout, reconnect::Reconnect, UserAgent};

pub(crate) type Response<B = BoxBody> = http::Response<B>;
pub(crate) type Request<B = BoxBody> = http::Request<B>;

Expand All @@ -43,18 +52,36 @@ impl Connection {
.timer(TokioTimer::new())
.clone();

if let Some(val) = endpoint.http2_keep_alive_timeout {
settings.keep_alive_timeout(val);
if let Some(val) = endpoint.http2_adaptive_window {
settings.http2_adaptive_window(val);
}

if let Some(val) = endpoint.http2_keep_alive_while_idle {
settings.keep_alive_while_idle(val);
#[cfg(feature = "transport")]
{
settings
.http_keep_alive_interval(endpoint.http2_keep_alive_interval);

if let Some(val) = endpoint.http2_keep_alive_timeout {
settings.keep_alive_timeout(val);
}

if let Some(val) = endpoint.http2_keep_alive_while_idle {
settings.keep_alive_while_idle(val);
}
}

if let Some(val) = endpoint.http2_adaptive_window {
settings.adaptive_window(val);
}


#[cfg(target_arch = "wasm32")]
{
settings.executor(wasm::Executor)
// reset streams require `Instant::now` which is not available on wasm
.http_max_concurrent_reset_streams(0);
}

let stack = ServiceBuilder::new()
.layer_fn(|s| {
let origin = endpoint.origin.as_ref().unwrap_or(&endpoint.uri).clone();
Expand Down Expand Up @@ -209,3 +236,19 @@ where
})
}
}

#[cfg(target_arch = "wasm32")]
mod wasm {
use std::future::Future;
use std::pin::Pin;

type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

pub(crate) struct Executor;

impl hyper::rt::Executor<BoxSendFuture> for Executor {
fn execute(&self, fut: BoxSendFuture) {
wasm_bindgen_futures::spawn_local(fut)
}
}
}
24 changes: 22 additions & 2 deletions tonic/src/transport/service/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use std::{future::Future, sync::Arc};

pub(crate) use hyper::rt::Executor;

#[cfg(not(target_arch = "wasm32"))]
#[derive(Copy, Clone)]
struct TokioExec;

#[cfg(not(target_arch = "wasm32"))]
impl<F> Executor<F> for TokioExec
where
F: Future + Send + 'static,
Expand All @@ -16,6 +18,21 @@ where
}
}

#[cfg(target_arch = "wasm32")]
#[derive(Copy, Clone)]
struct WasmBindgenExec;

#[cfg(target_arch = "wasm32")]
impl<F> Executor<F> for WasmBindgenExec
where
F: Future + 'static,
F::Output: 'static,
{
fn execute(&self, fut: F) {
wasm_bindgen_futures::spawn_local(async move {fut.await;});
}
}

#[derive(Clone)]
pub(crate) struct SharedExec {
inner: Arc<dyn Executor<BoxFuture<'static, ()>> + Send + Sync + 'static>,
Expand All @@ -31,8 +48,11 @@ impl SharedExec {
}
}

pub(crate) fn tokio() -> Self {
Self::new(TokioExec)
pub(crate) fn default_exec() -> Self {
#[cfg(not(target_arch = "wasm32"))]
return Self::new(TokioExec);
#[cfg(target_arch = "wasm32")]
Self::new(WasmBindgenExec)
}
}

Expand Down
Loading

0 comments on commit 4f0596e

Please sign in to comment.