diff --git a/benchmark/src/client.rs b/benchmark/src/client.rs index e98553f5..25e7cdf6 100644 --- a/benchmark/src/client.rs +++ b/benchmark/src/client.rs @@ -370,16 +370,18 @@ impl Client { builder = builder.raw_cfg_int(key, arg.get_int_value() as i32); } } + // Check https://github.com/grpc/grpc/issues/31465. + builder = builder.enable_retry(false); if cfg.has_security_params() { let params = cfg.get_security_params(); if !params.get_server_host_override().is_empty() { builder = builder .override_ssl_target(params.get_server_host_override().to_owned()); } - builder.secure_connect(addr, proto_util::create_test_channel_credentials()) - } else { - builder.connect(addr) + builder = + builder.set_credentials(proto_util::create_test_channel_credentials()); } + builder.connect(addr) }); let client_type = cfg.get_client_type(); diff --git a/benchmark/src/main.rs b/benchmark/src/main.rs index 30e0f489..b907d88b 100644 --- a/benchmark/src/main.rs +++ b/benchmark/src/main.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use benchmark::{init_log, Worker}; use clap::{App, Arg}; use futures_channel::oneshot; -use grpc::{Environment, ServerBuilder}; +use grpc::{Environment, ServerBuilder, ServerCredentials}; use grpc_proto::testing::services_grpc::create_worker_service; use rand::Rng; @@ -40,13 +40,13 @@ fn main() { let service = create_worker_service(worker); let mut server = ServerBuilder::new(env) .register_service(service) - .bind("[::]", port) .build() .unwrap(); + let port = server + .add_listening_port(&format!("[::]:{port}"), ServerCredentials::insecure()) + .unwrap(); - for (host, port) in server.bind_addrs() { - info!("listening on {}:{}", host, port); - } + info!("listening on [::]:{}", port); server.start(); diff --git a/benchmark/src/server.rs b/benchmark/src/server.rs index 7ff33d6e..5839dfc3 100644 --- a/benchmark/src/server.rs +++ b/benchmark/src/server.rs @@ -4,6 +4,7 @@ use std::ffi::CString; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use grpc::ServerCredentials; use grpc_proto::testing::control::{ServerConfig, ServerStatus, ServerType}; use grpc_proto::testing::services_grpc::create_benchmark_service; use grpc_proto::testing::stats::ServerStats; @@ -17,6 +18,7 @@ use crate::util::{self, CpuRecorder}; pub struct Server { server: GrpcServer, + port: u16, recorder: CpuRecorder, keep_running: Arc, } @@ -60,20 +62,19 @@ impl Server { } builder = builder.channel_args(ch_builder.build_args()); } - builder = if cfg.has_security_params() { - builder.bind_with_cred( - "[::]", - cfg.get_port() as u16, - proto_util::create_test_server_credentials(), - ) + let mut s = builder.build().unwrap(); + let creds = if cfg.has_security_params() { + proto_util::create_test_server_credentials() } else { - builder.bind("[::]", cfg.get_port() as u16) + ServerCredentials::insecure() }; - - let mut s = builder.build().unwrap(); + let port = s + .add_listening_port(&format!("[::]:{}", cfg.get_port()), creds) + .unwrap(); s.start(); Ok(Server { server: s, + port, recorder: CpuRecorder::new(), keep_running: keep_running1, }) @@ -98,7 +99,7 @@ impl Server { pub fn get_status(&self) -> ServerStatus { let mut status = ServerStatus::default(); - status.set_port(i32::from(self.server.bind_addrs().next().unwrap().1)); + status.set_port(self.port as i32); status.set_cores(util::cpu_num_cores() as i32); status } diff --git a/grpc-sys/bindings/bindings.rs b/grpc-sys/bindings/bindings.rs index f18ad44b..c804d203 100644 --- a/grpc-sys/bindings/bindings.rs +++ b/grpc-sys/bindings/bindings.rs @@ -132,12 +132,10 @@ pub const GRPC_WRITE_BUFFER_HINT: u32 = 1; pub const GRPC_WRITE_NO_COMPRESS: u32 = 2; pub const GRPC_WRITE_THROUGH: u32 = 4; pub const GRPC_WRITE_USED_MASK: u32 = 7; -pub const GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST: u32 = 16; pub const GRPC_INITIAL_METADATA_WAIT_FOR_READY: u32 = 32; -pub const GRPC_INITIAL_METADATA_CACHEABLE_REQUEST: u32 = 64; pub const GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET: u32 = 128; pub const GRPC_INITIAL_METADATA_CORKED: u32 = 256; -pub const GRPC_INITIAL_METADATA_USED_MASK: u32 = 500; +pub const GRPC_INITIAL_METADATA_USED_MASK: u32 = 420; pub const GRPC_CQ_CURRENT_VERSION: u32 = 2; pub const GRPC_CQ_VERSION_MINIMUM_FOR_CALLBACKABLE: u32 = 2; pub const GRPC_MAX_COMPLETION_QUEUE_PLUCKERS: u32 = 6; @@ -226,6 +224,9 @@ pub struct grpc_slice_refcount { #[doc = ""] #[doc = "If the slice does not have a refcount, it represents an inlined small piece"] #[doc = "of data that is copied by value."] +#[doc = ""] +#[doc = "As a special case, a slice can be given refcount == uintptr_t(1), meaning"] +#[doc = "that the slice represents external data that is not refcounted."] #[repr(C)] #[derive(Copy, Clone)] pub struct grpc_slice { @@ -474,13 +475,6 @@ extern "C" { extern "C" { pub fn grpc_slice_malloc_large(length: usize) -> grpc_slice; } -extern "C" { - #[doc = " Intern a slice:"] - #[doc = ""] - #[doc = "The return value for two invocations of this function with the same sequence"] - #[doc = "of bytes is a slice which points to the same memory."] - pub fn grpc_slice_intern(slice: grpc_slice) -> grpc_slice; -} extern "C" { #[doc = " Create a slice by copying a string."] #[doc = "Does not preserve null terminators."] @@ -555,12 +549,6 @@ extern "C" { extern "C" { pub fn grpc_empty_slice() -> grpc_slice; } -extern "C" { - pub fn grpc_slice_default_hash_impl(s: grpc_slice) -> u32; -} -extern "C" { - pub fn grpc_slice_default_eq_impl(a: grpc_slice, b: grpc_slice) -> ::std::os::raw::c_int; -} extern "C" { pub fn grpc_slice_eq(a: grpc_slice, b: grpc_slice) -> ::std::os::raw::c_int; } @@ -596,9 +584,6 @@ extern "C" { #[doc = "if it's not found"] pub fn grpc_slice_slice(haystack: grpc_slice, needle: grpc_slice) -> ::std::os::raw::c_int; } -extern "C" { - pub fn grpc_slice_hash(s: grpc_slice) -> u32; -} extern "C" { #[doc = " Do two slices point at the same memory, with the same length"] #[doc = "If a or b is inlined, actually compares data"] @@ -1934,18 +1919,38 @@ extern "C" { #[doc = "to non-experimental or remove it."] pub fn grpc_channel_reset_connect_backoff(channel: *mut grpc_channel); } +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct grpc_channel_credentials { + _unused: [u8; 0], +} extern "C" { - #[doc = " Create a client channel to 'target'. Additional channel level configuration"] - #[doc = "MAY be provided by grpc_channel_args, though the expectation is that most"] - #[doc = "clients will want to simply pass NULL. The user data in 'args' need only"] - #[doc = "live through the invocation of this function. However, if any args of the"] - #[doc = "'pointer' type are passed, then the referenced vtable must be maintained"] - #[doc = "by the caller until grpc_channel_destroy terminates. See grpc_channel_args"] - #[doc = "definition for more on this."] - pub fn grpc_insecure_channel_create( + #[doc = " Releases a channel credentials object."] + #[doc = "The creator of the credentials object is responsible for its release."] + pub fn grpc_channel_credentials_release(creds: *mut grpc_channel_credentials); +} +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct grpc_server_credentials { + _unused: [u8; 0], +} +extern "C" { + #[doc = " Releases a server_credentials object."] + #[doc = "The creator of the server_credentials object is responsible for its release."] + pub fn grpc_server_credentials_release(creds: *mut grpc_server_credentials); +} +extern "C" { + #[doc = " Creates a secure channel using the passed-in credentials. Additional"] + #[doc = "channel level configuration MAY be provided by grpc_channel_args, though"] + #[doc = "the expectation is that most clients will want to simply pass NULL. The"] + #[doc = "user data in 'args' need only live through the invocation of this function."] + #[doc = "However, if any args of the 'pointer' type are passed, then the referenced"] + #[doc = "vtable must be maintained by the caller until grpc_channel_destroy"] + #[doc = "terminates. See grpc_channel_args definition for more on this."] + pub fn grpc_channel_create( target: *const ::std::os::raw::c_char, + creds: *mut grpc_channel_credentials, args: *const grpc_channel_args, - reserved: *mut ::std::os::raw::c_void, ) -> *mut grpc_channel; } extern "C" { @@ -2128,12 +2133,13 @@ extern "C" { ); } extern "C" { - #[doc = " Add a HTTP2 over plaintext over tcp listener."] + #[doc = " Add a HTTP2 over an encrypted link over tcp listener."] #[doc = "Returns bound port number on success, 0 on failure."] #[doc = "REQUIRES: server not started"] - pub fn grpc_server_add_insecure_http2_port( + pub fn grpc_server_add_http2_port( server: *mut grpc_server, addr: *const ::std::os::raw::c_char, + creds: *mut grpc_server_credentials, ) -> ::std::os::raw::c_int; } extern "C" { @@ -2283,30 +2289,31 @@ extern "C" { ) -> *mut grpc_channel; } extern "C" { - #[doc = " Create a client channel to 'target' using file descriptor 'fd'. The 'target'"] - #[doc = "argument will be used to indicate the name for this channel. See the comment"] - #[doc = "for grpc_insecure_channel_create for description of 'args' argument."] - pub fn grpc_insecure_channel_create_from_fd( + #[doc = " Create a secure channel to 'target' using file descriptor 'fd' and passed-in"] + #[doc = "credentials. The 'target' argument will be used to indicate the name for"] + #[doc = "this channel. Note that this API currently only supports insecure channel"] + #[doc = "credentials. Using other types of credentials will result in a failure."] + pub fn grpc_channel_create_from_fd( target: *const ::std::os::raw::c_char, fd: ::std::os::raw::c_int, + creds: *mut grpc_channel_credentials, args: *const grpc_channel_args, ) -> *mut grpc_channel; } extern "C" { - #[doc = " Add the connected communication channel based on file descriptor 'fd' to the"] - #[doc = "'server'. The 'fd' must be an open file descriptor corresponding to a"] - #[doc = "connected socket. Events from the file descriptor may come on any of the"] - #[doc = "server completion queues (i.e completion queues registered via the"] - #[doc = "grpc_server_register_completion_queue API)."] - #[doc = ""] - #[doc = "The 'reserved' pointer MUST be NULL."] - #[doc = ""] + #[doc = " Add the connected secure communication channel based on file descriptor 'fd'"] + #[doc = "to the 'server' and server credentials 'creds'. The 'fd' must be an open file"] + #[doc = "descriptor corresponding to a connected socket. Events from the file"] + #[doc = "descriptor may come on any of the server completion queues (i.e completion"] + #[doc = "queues registered via the grpc_server_register_completion_queue API)."] + #[doc = "Note that this API currently only supports inseure server credentials"] + #[doc = "Using other types of credentials will result in a failure."] #[doc = "TODO(hork): add channel_args to this API to allow endpoints and transports"] #[doc = "created in this function to participate in the resource quota feature."] - pub fn grpc_server_add_insecure_channel_from_fd( + pub fn grpc_server_add_channel_from_fd( server: *mut grpc_server, - reserved: *mut ::std::os::raw::c_void, fd: ::std::os::raw::c_int, + creds: *mut grpc_server_credentials, ); } #[repr(u32)] @@ -2529,16 +2536,6 @@ extern "C" { #[doc = "The creator of the credentials object is responsible for its release."] pub fn grpc_call_credentials_release(creds: *mut grpc_call_credentials); } -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct grpc_channel_credentials { - _unused: [u8; 0], -} -extern "C" { - #[doc = " Releases a channel credentials object."] - #[doc = "The creator of the credentials object is responsible for its release."] - pub fn grpc_channel_credentials_release(creds: *mut grpc_channel_credentials); -} extern "C" { #[doc = " Creates default credentials to connect to a google gRPC service."] #[doc = "WARNING: Do NOT use this credentials to connect to a non-google service as"] @@ -2905,31 +2902,6 @@ extern "C" { reserved: *mut ::std::os::raw::c_void, ) -> *mut grpc_call_credentials; } -extern "C" { - #[doc = " Creates a secure channel using the passed-in credentials. Additional"] - #[doc = "channel level configuration MAY be provided by grpc_channel_args, though"] - #[doc = "the expectation is that most clients will want to simply pass NULL. The"] - #[doc = "user data in 'args' need only live through the invocation of this function."] - #[doc = "However, if any args of the 'pointer' type are passed, then the referenced"] - #[doc = "vtable must be maintained by the caller until grpc_channel_destroy"] - #[doc = "terminates. See grpc_channel_args definition for more on this."] - pub fn grpc_secure_channel_create( - creds: *mut grpc_channel_credentials, - target: *const ::std::os::raw::c_char, - args: *const grpc_channel_args, - reserved: *mut ::std::os::raw::c_void, - ) -> *mut grpc_channel; -} -#[repr(C)] -#[derive(Debug, Copy, Clone)] -pub struct grpc_server_credentials { - _unused: [u8; 0], -} -extern "C" { - #[doc = " Releases a server_credentials object."] - #[doc = "The creator of the server_credentials object is responsible for its release."] - pub fn grpc_server_credentials_release(creds: *mut grpc_server_credentials); -} #[repr(C)] #[derive(Debug, Copy, Clone)] pub struct grpc_ssl_server_certificate_config { @@ -3046,16 +3018,6 @@ extern "C" { options: *mut grpc_ssl_server_credentials_options, ) -> *mut grpc_server_credentials; } -extern "C" { - #[doc = " Add a HTTP2 over an encrypted link over tcp listener."] - #[doc = "Returns bound port number on success, 0 on failure."] - #[doc = "REQUIRES: server not started"] - pub fn grpc_server_add_secure_http2_port( - server: *mut grpc_server, - addr: *const ::std::os::raw::c_char, - creds: *mut grpc_server_credentials, - ) -> ::std::os::raw::c_int; -} extern "C" { #[doc = " Sets a credentials to a call. Can only be called on the client side before"] #[doc = "grpc_call_start_batch."] @@ -3668,9 +3630,9 @@ pub struct grpc_authorization_policy_provider { } extern "C" { #[doc = " EXPERIMENTAL - Subject to change."] - #[doc = " Creates a grpc_authorization_policy_provider using SDK authorization policy"] + #[doc = " Creates a grpc_authorization_policy_provider using gRPC authorization policy"] #[doc = " from static string."] - #[doc = " - authz_policy is the input SDK authorization policy."] + #[doc = " - authz_policy is the input gRPC authorization policy."] #[doc = " - code is the error status code on failure. On success, it equals"] #[doc = " GRPC_STATUS_OK."] #[doc = " - error_details contains details about the error if any. If the"] @@ -3684,9 +3646,9 @@ extern "C" { } extern "C" { #[doc = " EXPERIMENTAL - Subject to change."] - #[doc = " Creates a grpc_authorization_policy_provider by watching for SDK"] + #[doc = " Creates a grpc_authorization_policy_provider by watching for gRPC"] #[doc = " authorization policy changes in filesystem."] - #[doc = " - authz_policy is the file path of SDK authorization policy."] + #[doc = " - authz_policy is the file path of gRPC authorization policy."] #[doc = " - refresh_interval_sec is the amount of time the internal thread would wait"] #[doc = " before checking for file updates."] #[doc = " - code is the error status code on failure. On success, it equals"] @@ -3709,6 +3671,19 @@ extern "C" { provider: *mut grpc_authorization_policy_provider, ); } +extern "C" { + #[doc = " EXPERIMENTAL API - Subject to change."] + #[doc = " Configures a grpc_tls_credentials_options object with tls session key"] + #[doc = " logging capability. TLS channels using these credentials have tls session"] + #[doc = " key logging enabled."] + #[doc = " - options is the grpc_tls_credentials_options object"] + #[doc = " - path is a string pointing to the location where TLS session keys would be"] + #[doc = " stored."] + pub fn grpc_tls_credentials_options_set_tls_session_key_log_file_path( + options: *mut grpc_tls_credentials_options, + path: *const ::std::os::raw::c_char, + ); +} #[repr(u32)] #[doc = " The severity of a log message - use the #defines below when calling into"] #[doc = "gpr_log to additionally supply file and line data"] diff --git a/grpc-sys/grpc b/grpc-sys/grpc index 38a9cd9d..996605a5 160000 --- a/grpc-sys/grpc +++ b/grpc-sys/grpc @@ -1 +1 @@ -Subproject commit 38a9cd9d632e4eba70790525abf38674251eb2a0 +Subproject commit 996605a5e62f3f00043ac8d3ebca84523bc2dd76 diff --git a/health/tests/health_check.rs b/health/tests/health_check.rs index 9cb5ffda..c968b989 100644 --- a/health/tests/health_check.rs +++ b/health/tests/health_check.rs @@ -48,11 +48,12 @@ fn setup() -> (Server, HealthService, HealthClient) { let health_service = create_health(service.clone()); let mut server = ServerBuilder::new(env.clone()) .register_service(health_service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let (_, port) = server.bind_addrs().next().unwrap(); let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); let client = HealthClient::new(ch); diff --git a/interop/src/bin/client.rs b/interop/src/bin/client.rs index ac72c8af..20e11d16 100644 --- a/interop/src/bin/client.rs +++ b/interop/src/bin/client.rs @@ -70,17 +70,16 @@ fn main() { .unwrap(); let env = Arc::new(Environment::new(1)); - let builder = ChannelBuilder::new(env).override_ssl_target(host_override.to_owned()); - let channel = if use_tls { + let mut builder = ChannelBuilder::new(env).override_ssl_target(host_override.to_owned()); + if use_tls { let creds = if use_test_ca { util::create_test_channel_credentials() } else { ChannelCredentialsBuilder::new().build() }; - builder.secure_connect(&format!("{}:{}", host, port), creds) - } else { - builder.connect(&format!("{}:{}", host, port)) - }; + builder = builder.set_credentials(creds); + } + let channel = builder.connect(&format!("{}:{}", host, port)); let client = Client::new(channel); futures_executor::block_on(run_test(client, case)).unwrap(); diff --git a/interop/src/bin/server.rs b/interop/src/bin/server.rs index 11f48d9e..e14ed6eb 100644 --- a/interop/src/bin/server.rs +++ b/interop/src/bin/server.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use clap::{App, Arg}; use futures_executor::block_on; -use grpc::{Environment, ServerBuilder}; +use grpc::{Environment, ServerBuilder, ServerCredentials}; use grpc_proto::testing::test_grpc::create_test_service; use grpc_proto::util; use interop::InteropTestService; @@ -37,7 +37,7 @@ fn main() { ) .get_matches(); let host = matches.value_of("host").unwrap_or("127.0.0.1"); - let port: u16 = matches.value_of("port").unwrap_or("8080").parse().unwrap(); + let mut port: u16 = matches.value_of("port").unwrap_or("8080").parse().unwrap(); let use_tls: bool = matches .value_of("use_tls") .unwrap_or("false") @@ -46,19 +46,19 @@ fn main() { let env = Arc::new(Environment::new(2)); let service = create_test_service(InteropTestService); - let mut builder = ServerBuilder::new(env).register_service(service); - - builder = if use_tls { - let creds = util::create_test_server_credentials(); - builder.bind_with_cred(host, port, creds) + let mut server = ServerBuilder::new(env) + .register_service(service) + .build() + .unwrap(); + let creds = if use_tls { + util::create_test_server_credentials() } else { - builder.bind(host, port) + ServerCredentials::insecure() }; - - let mut server = builder.build().unwrap(); - for (host, port) in server.bind_addrs() { - info!("listening on {}:{}", host, port); - } + port = server + .add_listening_port(&format!("{host}:{port}"), creds) + .unwrap(); + info!("listening on {}:{}", host, port); server.start(); block_on(futures_util::future::pending::<()>()); diff --git a/interop/tests/tests.rs b/interop/tests/tests.rs index 0b9ecb0a..868df7f5 100644 --- a/interop/tests/tests.rs +++ b/interop/tests/tests.rs @@ -11,32 +11,26 @@ macro_rules! mk_test { let env = Arc::new(Environment::new(2)); let service = create_test_service(InteropTestService); - let mut builder = ServerBuilder::new(env.clone()).register_service(service); - - builder = if $use_tls { - let creds = util::create_test_server_credentials(); - builder.bind_with_cred("127.0.0.1", 0, creds) + let mut server = ServerBuilder::new(env.clone()) + .register_service(service) + .build() + .unwrap(); + let creds = if $use_tls { + util::create_test_server_credentials() } else { - builder.bind("127.0.0.1", 0) + grpcio::ServerCredentials::insecure() }; - - let mut server = builder.build().unwrap(); + let port = server.add_listening_port("127.0.0.1:0", creds).unwrap(); server.start(); - let builder = + let mut builder = ChannelBuilder::new(env.clone()).override_ssl_target("foo.test.google.fr"); - let channel = { - let (host, port) = server.bind_addrs().next().unwrap(); - if $use_tls { - let creds = util::create_test_channel_credentials(); - builder.secure_connect(&format!("{}:{}", host, port), creds) - } else { - builder.connect(&format!("{}:{}", host, port)) - } - }; - + if $use_tls { + let creds = util::create_test_channel_credentials(); + builder = builder.set_credentials(creds); + } + let channel = builder.connect(&format!("127.0.0.1:{port}")); let client = Client::new(channel); - block_on(client.$func()).unwrap(); } }; diff --git a/src/call/client.rs b/src/call/client.rs index 1032ad54..a183ad4b 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -42,16 +42,6 @@ pub struct CallOption { } impl CallOption { - /// Signal that the call is idempotent. - pub fn idempotent(mut self, is_idempotent: bool) -> CallOption { - change_flag( - &mut self.call_flags, - grpc_sys::GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST, - is_idempotent, - ); - self - } - /// Signal that the call should not return UNAVAILABLE before it has started. pub fn wait_for_ready(mut self, wait_for_ready: bool) -> CallOption { change_flag( @@ -62,16 +52,6 @@ impl CallOption { self } - /// Signal that the call is cacheable. gRPC is free to use GET verb. - pub fn cacheable(mut self, cacheable: bool) -> CallOption { - change_flag( - &mut self.call_flags, - grpc_sys::GRPC_INITIAL_METADATA_CACHEABLE_REQUEST, - cacheable, - ); - self - } - /// Set write flags. pub fn write_flags(mut self, write_flags: WriteFlags) -> CallOption { self.write_flags = write_flags; diff --git a/src/channel.rs b/src/channel.rs index b477a8d9..0bbe96bd 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -21,8 +21,8 @@ use crate::env::Environment; use crate::error::Result; use crate::task::CallTag; use crate::task::Kicker; -use crate::CallOption; use crate::ResourceQuota; +use crate::{CallOption, ChannelCredentials}; pub use crate::grpc_sys::{ grpc_compression_algorithm as CompressionAlgorithms, @@ -73,6 +73,7 @@ pub enum LbPolicy { pub struct ChannelBuilder { env: Arc, options: HashMap, Options>, + credentials: Option, } impl ChannelBuilder { @@ -81,6 +82,7 @@ impl ChannelBuilder { ChannelBuilder { env, options: HashMap::new(), + credentials: None, } } @@ -416,6 +418,19 @@ impl ChannelBuilder { self } + /// Enables retry functionality. Defaults to true. When enabled, transparent + /// retries will be performed as appropriate, and configurable retries are + /// enabled when they are configured via the service config. For details, see: + /// https://github.com/grpc/proposal/blob/master/A6-client-retries.md + /// NOTE: Hedging functionality is not yet implemented. + pub fn enable_retry(mut self, enable: bool) -> ChannelBuilder { + self.options.insert( + Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_RETRIES), + Options::Integer(enable as i32), + ); + self + } + /// Set a raw integer configuration. /// /// This method is only for bench usage, users should use the encapsulated API instead. @@ -481,18 +496,21 @@ impl ChannelBuilder { self.build_args() } - /// Build an insecure [`Channel`] that connects to a specific address. + /// Build an [`Channel`] that connects to a specific address. pub fn connect(mut self, addr: &str) -> Channel { let args = self.prepare_connect_args(); let addr = CString::new(addr).unwrap(); let addr_ptr = addr.as_ptr(); + let mut creds = self + .credentials + .unwrap_or_else(ChannelCredentials::insecure); let channel = - unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) }; + unsafe { grpcio_sys::grpc_channel_create(addr_ptr, creds.as_mut_ptr(), args.args) }; unsafe { Channel::new(self.env.pick_cq(), self.env, channel) } } - /// Build an insecure [`Channel`] taking over an established connection from + /// Build an [`Channel`] taking over an established connection from /// a file descriptor. The target string given is purely informative to /// describe the endpoint of the connection. Takes ownership of the given /// file descriptor and will close it when the connection is closed. @@ -509,7 +527,12 @@ impl ChannelBuilder { let args = self.prepare_connect_args(); let target = CString::new(target).unwrap(); let target_ptr = target.as_ptr(); - let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args); + // Actually only insecure credentials are supported currently. + let mut creds = self + .credentials + .unwrap_or_else(ChannelCredentials::insecure); + let channel = + grpcio_sys::grpc_channel_create_from_fd(target_ptr, fd, creds.as_mut_ptr(), args.args); Channel::new(self.env.pick_cq(), self.env, channel) } @@ -519,13 +542,10 @@ impl ChannelBuilder { mod secure_channel { use std::borrow::Cow; use std::ffi::CString; - use std::ptr; - - use crate::grpc_sys; use crate::ChannelCredentials; - use super::{Channel, ChannelBuilder, Options}; + use super::{ChannelBuilder, Options}; const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0"; @@ -544,21 +564,10 @@ mod secure_channel { self } - /// Build a secure [`Channel`] that connects to a specific address. - pub fn secure_connect(mut self, addr: &str, mut creds: ChannelCredentials) -> Channel { - let args = self.prepare_connect_args(); - let addr = CString::new(addr).unwrap(); - let addr_ptr = addr.as_ptr(); - let channel = unsafe { - grpc_sys::grpc_secure_channel_create( - creds.as_mut_ptr(), - addr_ptr, - args.args, - ptr::null_mut(), - ) - }; - - unsafe { Channel::new(self.env.pick_cq(), self.env, channel) } + /// Set the credentials used to build the connection. + pub fn set_credentials(mut self, creds: ChannelCredentials) -> ChannelBuilder { + self.credentials = Some(creds); + self } } } diff --git a/src/error.rs b/src/error.rs index 2d65eb23..8abe3bb7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use std::ffi::CString; use std::{error, fmt, result}; use crate::call::RpcStatus; @@ -24,7 +25,7 @@ pub enum Error { /// Failed to shutdown. ShutdownFailed, /// Failed to bind. - BindFail(String, u16), + BindFail(CString), /// gRPC completion queue is shutdown. QueueShutdown, /// Failed to create Google default credentials. diff --git a/src/lib.rs b/src/lib.rs index 970b6f3b..0e5d2251 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,6 @@ mod error; mod log_util; mod metadata; mod quota; -#[cfg(feature = "_secure")] mod security; mod server; mod task; @@ -76,11 +75,7 @@ pub use crate::error::{Error, Result}; pub use crate::log_util::redirect_log; pub use crate::metadata::{Metadata, MetadataBuilder, MetadataIter}; pub use crate::quota::ResourceQuota; -#[cfg(feature = "_secure")] -pub use crate::security::{ - CertificateRequestType, ChannelCredentials, ChannelCredentialsBuilder, ServerCredentials, - ServerCredentialsBuilder, ServerCredentialsFetcher, -}; +pub use crate::security::*; pub use crate::server::{ CheckResult, Server, ServerBuilder, ServerChecker, Service, ServiceBuilder, ShutdownFuture, }; diff --git a/src/security/credentials.rs b/src/security/credentials.rs index 78c54cda..1627fa24 100644 --- a/src/security/credentials.rs +++ b/src/security/credentials.rs @@ -8,9 +8,9 @@ use crate::error::{Error, Result}; use crate::grpc_sys::grpc_ssl_certificate_config_reload_status::{self, *}; use crate::grpc_sys::grpc_ssl_client_certificate_request_type::*; use crate::grpc_sys::{ - self, grpc_channel_credentials, grpc_server_credentials, - grpc_ssl_client_certificate_request_type, grpc_ssl_server_certificate_config, + self, grpc_ssl_client_certificate_request_type, grpc_ssl_server_certificate_config, }; +use crate::{ChannelCredentials, ServerCredentials}; #[repr(u32)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] @@ -102,7 +102,7 @@ pub(crate) unsafe extern "C" fn server_cert_fetcher_wrapper( panic!("fetcher user_data must be set up!"); } let f: &mut dyn ServerCredentialsFetcher = - (&mut *(user_data as *mut Box)).as_mut(); + (*(user_data as *mut Box)).as_mut(); let result = f.fetch(); match result { Ok(Some(builder)) => { @@ -185,15 +185,14 @@ impl ServerCredentialsBuilder { /// Finalize the [`ServerCredentialsBuilder`] and build the [`ServerCredentials`]. pub fn build(self) -> ServerCredentials { - let credentials = unsafe { + unsafe { let opt = grpcio_sys::grpc_ssl_server_credentials_create_options_using_config( self.cer_request_type.to_native(), self.build_config(), ); - grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt) - }; - - ServerCredentials { creds: credentials } + let credentials = grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt); + ServerCredentials::from_raw(credentials) + } } } @@ -209,29 +208,28 @@ impl Drop for ServerCredentialsBuilder { } } -/// Server-side SSL credentials. -/// -/// Use [`ServerCredentialsBuilder`] to build a [`ServerCredentials`]. -pub struct ServerCredentials { - creds: *mut grpc_server_credentials, -} - -unsafe impl Send for ServerCredentials {} - impl ServerCredentials { - pub(crate) unsafe fn frow_raw(creds: *mut grpc_server_credentials) -> ServerCredentials { - ServerCredentials { creds } - } - - pub fn as_mut_ptr(&mut self) -> *mut grpc_server_credentials { - self.creds - } -} - -impl Drop for ServerCredentials { - fn drop(&mut self) { + /// Creates the credentials using a certificate config fetcher. Use this + /// method to reload the certificates and keys of the SSL server without + /// interrupting the operation of the server. Initial certificate config will be + /// fetched during server initialization. + pub fn with_fetcher( + fetcher: Box, + cer_request_type: CertificateRequestType, + ) -> Self { + let fetcher_wrap = Box::new(fetcher); + let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap); unsafe { - grpc_sys::grpc_server_credentials_release(self.creds); + let opt = grpcio_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher( + cer_request_type.to_native(), + Some(server_cert_fetcher_wrapper), + fetcher_wrap_ptr as _, + ); + let mut creds = ServerCredentials::from_raw( + grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt), + ); + creds._fetcher = Some(Box::from_raw(fetcher_wrap_ptr)); + creds } } } @@ -331,19 +329,7 @@ impl Drop for ChannelCredentialsBuilder { } } -/// Client-side SSL credentials. -/// -/// Use [`ChannelCredentialsBuilder`] or [`ChannelCredentials::google_default_credentials`] to -/// build a [`ChannelCredentials`]. -pub struct ChannelCredentials { - creds: *mut grpc_channel_credentials, -} - impl ChannelCredentials { - pub fn as_mut_ptr(&mut self) -> *mut grpc_channel_credentials { - self.creds - } - /// Try to build a [`ChannelCredentials`] to authenticate with Google OAuth credentials. pub fn google_default_credentials() -> Result { // Initialize the runtime here. Because this is an associated method @@ -360,9 +346,3 @@ impl ChannelCredentials { } } } - -impl Drop for ChannelCredentials { - fn drop(&mut self) { - unsafe { grpc_sys::grpc_channel_credentials_release(self.creds) } - } -} diff --git a/src/security/mod.rs b/src/security/mod.rs index f2c2badc..c9354616 100644 --- a/src/security/mod.rs +++ b/src/security/mod.rs @@ -1,10 +1,81 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +#[cfg(feature = "_secure")] mod credentials; +use grpcio_sys::{grpc_channel_credentials, grpc_server_credentials}; + +#[cfg(feature = "_secure")] pub use self::credentials::{ - CertificateRequestType, ChannelCredentials, ChannelCredentialsBuilder, ServerCredentials, - ServerCredentialsBuilder, ServerCredentialsFetcher, + CertificateRequestType, ChannelCredentialsBuilder, ServerCredentialsBuilder, + ServerCredentialsFetcher, }; -pub(crate) use self::credentials::server_cert_fetcher_wrapper; +/// Client-side SSL credentials. +/// +/// Use [`ChannelCredentialsBuilder`] or [`ChannelCredentials::google_default_credentials`] to +/// build a [`ChannelCredentials`]. +pub struct ChannelCredentials { + creds: *mut grpc_channel_credentials, +} + +impl ChannelCredentials { + pub fn as_mut_ptr(&mut self) -> *mut grpc_channel_credentials { + self.creds + } + + /// Creates an insecure channel credentials object. + pub fn insecure() -> ChannelCredentials { + unsafe { + let creds = grpcio_sys::grpc_insecure_credentials_create(); + ChannelCredentials { creds } + } + } +} + +impl Drop for ChannelCredentials { + fn drop(&mut self) { + unsafe { grpcio_sys::grpc_channel_credentials_release(self.creds) } + } +} + +/// Server-side SSL credentials. +/// +/// Use [`ServerCredentialsBuilder`] to build a [`ServerCredentials`]. +pub struct ServerCredentials { + creds: *mut grpc_server_credentials, + // Double allocation to get around C call. + #[cfg(feature = "_secure")] + _fetcher: Option>>, +} + +unsafe impl Send for ServerCredentials {} + +impl ServerCredentials { + /// Creates an insecure server credentials object. + pub fn insecure() -> ServerCredentials { + unsafe { + let creds = grpcio_sys::grpc_insecure_server_credentials_create(); + ServerCredentials::from_raw(creds) + } + } + pub(crate) unsafe fn from_raw(creds: *mut grpc_server_credentials) -> ServerCredentials { + ServerCredentials { + creds, + #[cfg(feature = "_secure")] + _fetcher: None, + } + } + + pub fn as_mut_ptr(&mut self) -> *mut grpc_server_credentials { + self.creds + } +} + +impl Drop for ServerCredentials { + fn drop(&mut self) { + unsafe { + grpcio_sys::grpc_server_credentials_release(self.creds); + } + } +} diff --git a/src/server.rs b/src/server.rs index c79a2f55..042217e5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,13 +2,13 @@ use std::cell::UnsafeCell; use std::collections::HashMap; +use std::ffi::CString; use std::fmt::{self, Debug, Formatter}; use std::future::Future; -use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use crate::grpc_sys::{self, grpc_call_error, grpc_server}; @@ -21,8 +21,8 @@ use crate::cq::CompletionQueue; use crate::env::Environment; use crate::error::{Error, Result}; use crate::task::{CallTag, CqFuture}; -use crate::RpcContext; use crate::RpcStatus; +use crate::{RpcContext, ServerCredentials}; const DEFAULT_REQUEST_SLOTS_PER_CQ: usize = 1024; @@ -65,106 +65,6 @@ where } } -/// Given a host and port, creates a string of the form "host:port" or -/// "[host]:port", depending on whether the host is an IPv6 literal. -fn join_host_port(host: &str, port: u16) -> String { - if host.starts_with("unix:") | host.starts_with("unix-abstract:") { - format!("{}\0", host) - } else if let Ok(ip) = host.parse::() { - format!("{}\0", SocketAddr::new(ip, port)) - } else { - format!("{}:{}\0", host, port) - } -} - -#[cfg(feature = "_secure")] -mod imp { - use super::join_host_port; - use crate::grpc_sys::{self, grpc_server}; - use crate::security::ServerCredentialsFetcher; - use crate::ServerCredentials; - - pub struct Binder { - pub host: String, - pub port: u16, - cred: Option, - // Double allocation to get around C call. - #[allow(clippy::redundant_allocation)] - _fetcher: Option>>, - } - - impl Binder { - pub fn new(host: String, port: u16) -> Binder { - let cred = None; - Binder { - host, - port, - cred, - _fetcher: None, - } - } - - #[allow(clippy::redundant_allocation)] - pub fn with_cred( - host: String, - port: u16, - cred: ServerCredentials, - _fetcher: Option>>, - ) -> Binder { - let cred = Some(cred); - Binder { - host, - port, - cred, - _fetcher, - } - } - - pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 { - let addr = join_host_port(&self.host, self.port); - let port = match self.cred.take() { - None => grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _), - Some(mut cert) => grpc_sys::grpc_server_add_secure_http2_port( - server, - addr.as_ptr() as _, - cert.as_mut_ptr(), - ), - }; - port as u16 - } - } -} - -#[cfg(not(feature = "_secure"))] -mod imp { - use super::join_host_port; - use crate::grpc_sys::{self, grpc_server}; - - pub struct Binder { - pub host: String, - pub port: u16, - } - - impl Binder { - pub fn new(host: String, port: u16) -> Binder { - Binder { host, port } - } - - pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 { - let addr = join_host_port(&self.host, self.port); - grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _) as u16 - } - } -} - -use self::imp::Binder; - -impl Debug for Binder { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "Binder {{ host: {}, port: {} }}", self.host, self.port) - } -} - /// [`Service`] factory in order to configure the properties. /// /// Use it to build a service which can be registered to a server. @@ -299,7 +199,6 @@ pub struct Service { /// [`Server`] factory in order to configure the properties. pub struct ServerBuilder { env: Arc, - binders: Vec, args: Option, slots_per_cq: usize, handlers: HashMap<&'static [u8], BoxHandler>, @@ -311,7 +210,6 @@ impl ServerBuilder { pub fn new(env: Arc) -> ServerBuilder { ServerBuilder { env, - binders: Vec::new(), args: None, slots_per_cq: DEFAULT_REQUEST_SLOTS_PER_CQ, handlers: HashMap::new(), @@ -319,14 +217,6 @@ impl ServerBuilder { } } - /// Bind to an address. - /// - /// This function can be called multiple times to bind to multiple ports. - pub fn bind>(mut self, host: S, port: u16) -> ServerBuilder { - self.binders.push(Binder::new(host.into(), port)); - self - } - /// Add additional configuration for each incoming channel. pub fn channel_args(mut self, args: ChannelArgs) -> ServerBuilder { self.args = Some(args); @@ -356,22 +246,13 @@ impl ServerBuilder { } /// Finalize the [`ServerBuilder`] and build the [`Server`]. - pub fn build(mut self) -> Result { + pub fn build(self) -> Result { let args = self .args .as_ref() .map_or_else(ptr::null, ChannelArgs::as_ptr); unsafe { let server = grpc_sys::grpc_server_create(args, ptr::null_mut()); - for binder in self.binders.iter_mut() { - let bind_port = binder.bind(server); - if bind_port == 0 { - grpc_sys::grpc_server_destroy(server); - return Err(Error::BindFail(binder.host.clone(), binder.port)); - } - binder.port = bind_port; - } - for cq in self.env.completion_queues() { let cq_ref = cq.borrow()?; grpc_sys::grpc_server_register_completion_queue( @@ -385,8 +266,8 @@ impl ServerBuilder { env: self.env, core: Arc::new(ServerCore { server, + creds: Mutex::new(Vec::new()), shutdown: AtomicBool::new(false), - binders: self.binders, slots_per_cq: self.slots_per_cq, }), handlers: self.handlers, @@ -396,66 +277,9 @@ impl ServerBuilder { } } -#[cfg(feature = "_secure")] -mod secure_server { - use super::{Binder, ServerBuilder}; - use crate::grpc_sys; - use crate::security::{ - server_cert_fetcher_wrapper, CertificateRequestType, ServerCredentials, - ServerCredentialsFetcher, - }; - - impl ServerBuilder { - /// Bind to an address with credentials for secure connection. - /// - /// This function can be called multiple times to bind to multiple ports. - pub fn bind_with_cred>( - mut self, - host: S, - port: u16, - c: ServerCredentials, - ) -> ServerBuilder { - self.binders - .push(Binder::with_cred(host.into(), port, c, None)); - self - } - - /// Bind to an address for secure connection. - /// - /// The required credentials will be fetched using provided `fetcher`. This - /// function can be called multiple times to bind to multiple ports. - pub fn bind_with_fetcher>( - mut self, - host: S, - port: u16, - fetcher: Box, - cer_request_type: CertificateRequestType, - ) -> ServerBuilder { - let fetcher_wrap = Box::new(fetcher); - let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap); - let (sc, fb) = unsafe { - let opt = grpc_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher( - cer_request_type.to_native(), - Some(server_cert_fetcher_wrapper), - fetcher_wrap_ptr as _, - ); - ( - ServerCredentials::frow_raw( - grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt), - ), - Box::from_raw(fetcher_wrap_ptr), - ) - }; - self.binders - .push(Binder::with_cred(host.into(), port, sc, Some(fb))); - self - } - } -} - struct ServerCore { server: *mut grpc_server, - binders: Vec, + creds: Mutex>, slots_per_cq: usize, shutdown: AtomicBool, } @@ -606,9 +430,34 @@ impl Server { } } - /// Get binded addresses pairs. - pub fn bind_addrs(&self) -> impl ExactSizeIterator { - self.core.binders.iter().map(|b| (&b.host, b.port)) + /// Try binding the server to the given `addr` endpoint (eg, localhost:1234, + /// 192.168.1.1:31416, [::1]:27182, etc.). + /// + /// It can be invoked multiple times. Should be used before starting the server. + /// + /// # Return + /// + /// The bound port is returned on success. + pub fn add_listening_port( + &mut self, + addr: impl Into, + mut creds: ServerCredentials, + ) -> Result { + // There is no Null in UTF-8 string. + let addr = CString::new(addr.into()).unwrap(); + let port = unsafe { + grpcio_sys::grpc_server_add_http2_port( + self.core.server, + addr.as_ptr() as _, + creds.as_mut_ptr(), + ) as u16 + }; + if port != 0 { + self.core.creds.lock().unwrap().push(creds); + Ok(port) + } else { + Err(Error::BindFail(addr)) + } } /// Add an rpc channel for an established connection represented as a file @@ -621,8 +470,9 @@ impl Server { /// this call, the socket must not be accessed (read / written / closed) /// by other code. #[cfg(unix)] - pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) { - grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd) + pub unsafe fn add_channel_from_fd(&mut self, fd: ::std::os::raw::c_int) { + let mut creds = ServerCredentials::insecure(); + grpcio_sys::grpc_server_add_channel_from_fd(self.core.server, fd, creds.as_mut_ptr()) } } @@ -642,29 +492,11 @@ impl Drop for Server { impl Debug for Server { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "Server {:?}", self.core.binders) - } -} - -#[cfg(test)] -mod tests { - use super::join_host_port; - - #[test] - fn test_join_host_port() { - let tbl = vec![ - ("localhost", 0u16, "localhost:0\0"), - ("127.0.0.1", 100u16, "127.0.0.1:100\0"), - ("::1", 0u16, "[::1]:0\0"), - ( - "fe80::7376:45d5:fb08:61e3", - 10028u16, - "[fe80::7376:45d5:fb08:61e3]:10028\0", - ), - ]; - - for (h, p, e) in &tbl { - assert_eq!(join_host_port(h, *p), e.to_owned()); - } + write!( + f, + "Server {{ handlers: {}, checkers: {} }}", + self.handlers.len(), + self.checkers.len() + ) } } diff --git a/tests-and-examples/examples/hello_world/server.rs b/tests-and-examples/examples/hello_world/server.rs index ed44aab1..1bf2d976 100644 --- a/tests-and-examples/examples/hello_world/server.rs +++ b/tests-and-examples/examples/hello_world/server.rs @@ -13,7 +13,10 @@ use std::{io, thread}; use futures_channel::oneshot; use futures_executor::block_on; use futures_util::future::{FutureExt as _, TryFutureExt as _}; -use grpcio::{ChannelBuilder, Environment, ResourceQuota, RpcContext, ServerBuilder, UnarySink}; +use grpcio::{ + ChannelBuilder, Environment, ResourceQuota, RpcContext, ServerBuilder, ServerCredentials, + UnarySink, +}; use grpcio_proto::example::helloworld::{HelloReply, HelloRequest}; use grpcio_proto::example::helloworld_grpc::{create_greeter, Greeter}; @@ -38,20 +41,21 @@ fn main() { let _guard = log_util::init_log(None); let env = Arc::new(Environment::new(1)); let service = create_greeter(GreeterService); + let addr = "127.0.0.1:50051"; let quota = ResourceQuota::new(Some("HelloServerQuota")).resize_memory(1024 * 1024); let ch_builder = ChannelBuilder::new(env.clone()).set_resource_quota(quota); let mut server = ServerBuilder::new(env) .register_service(service) - .bind("127.0.0.1", 50_051) .channel_args(ch_builder.build_args()) .build() .unwrap(); + server + .add_listening_port(addr, ServerCredentials::insecure()) + .unwrap(); server.start(); - for (host, port) in server.bind_addrs() { - info!("listening on {}:{}", host, port); - } + info!("listening on {addr}"); let (tx, rx) = oneshot::channel(); thread::spawn(move || { info!("Press ENTER to exit..."); diff --git a/tests-and-examples/examples/load_balancing/server.rs b/tests-and-examples/examples/load_balancing/server.rs index 7b42697e..6492f1d6 100644 --- a/tests-and-examples/examples/load_balancing/server.rs +++ b/tests-and-examples/examples/load_balancing/server.rs @@ -14,7 +14,8 @@ use futures_channel::oneshot; use futures_executor::block_on; use futures_util::future::{FutureExt as _, TryFutureExt as _}; use grpcio::{ - ChannelBuilder, Environment, ResourceQuota, RpcContext, Server, ServerBuilder, UnarySink, + ChannelBuilder, Environment, ResourceQuota, RpcContext, Server, ServerBuilder, + ServerCredentials, UnarySink, }; use grpcio_proto::example::helloworld::{HelloReply, HelloRequest}; @@ -38,7 +39,7 @@ impl Greeter for GreeterService { } } -fn build_server(env: Arc, port: u16) -> Server { +fn build_server(env: Arc, mut port: u16) -> Server { let service = create_greeter(GreeterService { name: format!("{}", port), }); @@ -47,14 +48,14 @@ fn build_server(env: Arc, port: u16) -> Server { let mut server = ServerBuilder::new(env) .register_service(service) - .bind("127.0.0.1", port) .channel_args(ch_builder.build_args()) .build() .unwrap(); + port = server + .add_listening_port(&format!("127.0.0.1:{port}"), ServerCredentials::insecure()) + .unwrap(); server.start(); - for (host, port) in server.bind_addrs() { - info!("listening on {}:{}", host, port); - } + info!("listening on 127.0.0.1:{port}"); server } diff --git a/tests-and-examples/examples/route_guide/server.rs b/tests-and-examples/examples/route_guide/server.rs index 24a6cd26..e1441732 100644 --- a/tests-and-examples/examples/route_guide/server.rs +++ b/tests-and-examples/examples/route_guide/server.rs @@ -134,6 +134,7 @@ impl RouteGuide for RouteGuideService { } fn main() { + let addr = "127.0.0.1:50051"; let _guard = log_util::init_log(None); let env = Arc::new(Environment::new(2)); let instance = RouteGuideService { @@ -143,13 +144,13 @@ fn main() { let service = create_route_guide(instance); let mut server = ServerBuilder::new(env) .register_service(service) - .bind("127.0.0.1", 50_051) .build() .unwrap(); + server + .add_listening_port(addr, ServerCredentials::insecure()) + .unwrap(); server.start(); - for (host, port) in server.bind_addrs() { - info!("listening on {}:{}", host, port); - } + info!("listening on {addr}"); let (tx, rx) = oneshot::channel(); thread::spawn(move || { info!("Press ENTER to exit..."); diff --git a/tests-and-examples/tests/cases/auth_context.rs b/tests-and-examples/tests/cases/auth_context.rs index dfb554f8..c5dd3f35 100644 --- a/tests-and-examples/tests/cases/auth_context.rs +++ b/tests-and-examples/tests/cases/auth_context.rs @@ -59,11 +59,12 @@ fn test_auth_context() { .build(); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind_with_cred("127.0.0.1", 0, server_credentials) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", server_credentials) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; let (client_crt, client_key) = read_cert_pair("client1").unwrap(); let client_credentials = ChannelCredentialsBuilder::new() @@ -72,7 +73,8 @@ fn test_auth_context() { .build(); let ch = ChannelBuilder::new(env) .override_ssl_target("rust.test.google.fr") - .secure_connect(&format!("127.0.0.1:{}", port), client_credentials); + .set_credentials(client_credentials) + .connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let mut req = HelloRequest::default(); @@ -102,16 +104,17 @@ fn test_auth_context() { fn test_no_crash_on_insecure() { let env = Arc::new(EnvBuilder::new().build()); let (tx, rx) = mpsc::channel(); - let service = create_greeter(GreeterService { tx: tx }); + let service = create_greeter(GreeterService { tx }); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let mut req = HelloRequest::default(); @@ -121,7 +124,7 @@ fn test_no_crash_on_insecure() { assert_eq!(resp.get_message(), "hello world"); // Test auth_context keys - let _empty_keys: mpsc::RecvTimeoutError = rx - .recv_timeout(Duration::from_secs(1)) - .expect_err("Received auth context even though not authenticated"); + let ctx_map = rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap(); + assert_eq!(ctx_map.get("transport_security_type").unwrap(), "insecure"); + assert_eq!(ctx_map.get("security_level").unwrap(), "TSI_SECURITY_NONE"); } diff --git a/tests-and-examples/tests/cases/cancel.rs b/tests-and-examples/tests/cases/cancel.rs index 946e2b2b..89cef28f 100644 --- a/tests-and-examples/tests/cases/cancel.rs +++ b/tests-and-examples/tests/cases/cancel.rs @@ -128,12 +128,13 @@ fn prepare_suite() -> (CancelService, RouteGuideClient, Server) { let service = CancelService::new(); let mut server = ServerBuilder::new(env.clone()) .register_service(create_route_guide(service.clone())) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = RouteGuideClient::new(ch); (service, client, server) } diff --git a/tests-and-examples/tests/cases/credential.rs b/tests-and-examples/tests/cases/credential.rs index 28f4d1bd..c397fd26 100644 --- a/tests-and-examples/tests/cases/credential.rs +++ b/tests-and-examples/tests/cases/credential.rs @@ -4,7 +4,8 @@ use futures_util::future::{FutureExt as _, TryFutureExt as _}; use grpcio::{ CertificateRequestType, ChannelBuilder, ChannelCredentialsBuilder, EnvBuilder, RpcContext, - ServerBuilder, ServerCredentialsBuilder, ServerCredentialsFetcher, UnarySink, + ServerBuilder, ServerCredentials, ServerCredentialsBuilder, ServerCredentialsFetcher, + UnarySink, }; use grpcio_proto::example::helloworld::*; use std::sync::atomic::{AtomicBool, Ordering}; @@ -79,28 +80,29 @@ fn test_reload_new() { let env = Arc::new(EnvBuilder::new().build()); let service = create_greeter(GreeterService); let switch = Arc::new(AtomicBool::new(false)); + let server_creds = ServerCredentials::with_fetcher( + Box::new(DataReload { + switch: switch.clone(), + }), + CertificateRequestType::DontRequestClientCertificate, + ); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind_with_fetcher( - "127.0.0.1", - 0, - Box::new(DataReload { - switch: switch.clone(), - }), - CertificateRequestType::DontRequestClientCertificate, - ) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", server_creds) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; // To connect the server whose CN is "*.test.google.com.au". - let cred = ChannelCredentialsBuilder::new() + let creds = ChannelCredentialsBuilder::new() .root_cert(read_single_crt("ca").unwrap().into()) .build(); let ch = ChannelBuilder::new(env.clone()) .override_ssl_target("rust.test.google.com.au") - .secure_connect(&format!("127.0.0.1:{}", port.clone()), cred); + .set_credentials(creds) + .connect(&format!("127.0.0.1:{port}")); let client1 = GreeterClient::new(ch); let mut req = HelloRequest::default(); req.set_name("world".to_owned()); @@ -109,12 +111,13 @@ fn test_reload_new() { // To connect the server whose CN is "*.test.google.fr". switch.store(true, Ordering::Relaxed); - let cred = ChannelCredentialsBuilder::new() + let creds = ChannelCredentialsBuilder::new() .root_cert(read_single_crt("ca").unwrap().into()) .build(); let ch = ChannelBuilder::new(env.clone()) .override_ssl_target("rust.test.google.fr") - .secure_connect(&format!("127.0.0.1:{}", port.clone()), cred); + .set_credentials(creds) + .connect(&format!("127.0.0.1:{}", port.clone())); let client2 = GreeterClient::new(ch); let mut req = HelloRequest::default(); req.set_name("world".to_owned()); @@ -132,27 +135,28 @@ fn test_reload_new() { fn test_reload_fail() { let env = Arc::new(EnvBuilder::new().build()); let service = create_greeter(GreeterService); + let server_creds = ServerCredentials::with_fetcher( + Box::new(DataReloadFail { + initial: AtomicBool::new(false), + }), + CertificateRequestType::DontRequestClientCertificate, + ); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind_with_fetcher( - "127.0.0.1", - 0, - Box::new(DataReloadFail { - initial: AtomicBool::new(false), - }), - CertificateRequestType::DontRequestClientCertificate, - ) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", server_creds) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let cred = ChannelCredentialsBuilder::new() + let creds = ChannelCredentialsBuilder::new() .root_cert(read_single_crt("ca").unwrap().into()) .build(); let ch = ChannelBuilder::new(env) .override_ssl_target("rust.test.google.fr") - .secure_connect(&format!("127.0.0.1:{}", port), cred); + .set_credentials(creds) + .connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); for _ in 0..10 { diff --git a/tests-and-examples/tests/cases/kick.rs b/tests-and-examples/tests/cases/kick.rs index 6f93b12a..422ba07e 100644 --- a/tests-and-examples/tests/cases/kick.rs +++ b/tests-and-examples/tests/cases/kick.rs @@ -48,12 +48,13 @@ fn test_kick() { let service = create_greeter(GreeterService { tx: tx.clone() }); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let mut req = HelloRequest::default(); req.set_name("world".to_owned()); @@ -202,12 +203,13 @@ fn test_deadlock() { let service = create_greeter(DeadLockService { reporter: tx }); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let mut req = HelloRequest::default(); req.set_name("world".to_owned()); diff --git a/tests-and-examples/tests/cases/metadata.rs b/tests-and-examples/tests/cases/metadata.rs index 4deaa47f..53c441d6 100644 --- a/tests-and-examples/tests/cases/metadata.rs +++ b/tests-and-examples/tests/cases/metadata.rs @@ -73,12 +73,13 @@ fn test_metadata() { let service = create_route_guide(GreeterService); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = RouteGuideClient::new(ch); let mut builder = MetadataBuilder::with_capacity(3); @@ -119,12 +120,13 @@ fn test_rich_error() { let service = create_greeter(GreeterService); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let mut req = HelloRequest::default(); diff --git a/tests-and-examples/tests/cases/misc.rs b/tests-and-examples/tests/cases/misc.rs index 25ace7ec..552655ba 100644 --- a/tests-and-examples/tests/cases/misc.rs +++ b/tests-and-examples/tests/cases/misc.rs @@ -60,12 +60,13 @@ fn test_peer() { let service = create_greeter(PeerService); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let req = HelloRequest::default(); @@ -127,14 +128,15 @@ fn test_soundness() { }; let mut server = ServerBuilder::new(env.clone()) .register_service(create_greeter(service)) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; let spawn_reqs = |env| -> JoinHandle<()> { - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let mut resps = Vec::with_capacity(3000); thread::spawn(move || { @@ -173,9 +175,11 @@ mod unix_domain_socket { let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind(path, 0) .build() .unwrap(); + server + .add_listening_port(path, ServerCredentials::insecure()) + .unwrap(); server.start(); let ch = ChannelBuilder::new(env).connect(path); let client = GreeterClient::new(ch); @@ -213,12 +217,13 @@ fn test_shutdown_when_exists_grpc_call() { let service = create_greeter(SleepService(true)); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let req = HelloRequest::default(); @@ -241,12 +246,13 @@ fn test_custom_checker_server_side() { let mut server = ServerBuilder::new(env.clone()) .add_checker(checker) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = GreeterClient::new(ch); let req = HelloRequest::default(); @@ -289,12 +295,13 @@ fn test_connectivity() { let service = create_greeter(PeerService); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env.clone()).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env.clone()).connect(&format!("127.0.0.1:{port}")); assert!(block_on(ch.wait_for_connected(Duration::from_secs(3)))); assert_eq!( ch.check_connectivity_state(false), @@ -350,9 +357,11 @@ fn test_connectivity() { let service = create_greeter(PeerService); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("localhost", port) .build() .unwrap(); + server + .add_listening_port(&format!("localhost:{port}"), ServerCredentials::insecure()) + .unwrap(); server.start(); assert!(block_on(ch.wait_for_connected(Duration::from_secs(3)))); assert_eq!( @@ -375,11 +384,12 @@ fn test_channelz() { let service = create_greeter(PeerService); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; let mut res = None; channelz::get_servers(0, |s| { res = Some(s.to_string()); @@ -398,7 +408,7 @@ fn test_channelz() { assert_eq!(res, Some(String::new())); res = None; - let ch = ChannelBuilder::new(env.clone()).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env.clone()).connect(&format!("127.0.0.1:{port}")); assert!(block_on(ch.wait_for_connected(Duration::from_secs(3)))); channelz::get_top_channels(0, |s| { res = Some(s.to_string()); diff --git a/tests-and-examples/tests/cases/stream.rs b/tests-and-examples/tests/cases/stream.rs index a0a29104..a522d923 100644 --- a/tests-and-examples/tests/cases/stream.rs +++ b/tests-and-examples/tests/cases/stream.rs @@ -11,7 +11,7 @@ use futures_util::{ }; use grpcio::{ ChannelBuilder, ClientStreamingSink, DuplexSink, EnvBuilder, RequestStream, RpcContext, - ServerBuilder, ServerStreamingSink, UnarySink, WriteFlags, + ServerBuilder, ServerCredentials, ServerStreamingSink, UnarySink, WriteFlags, }; use grpcio_proto::example::route_guide::*; @@ -84,12 +84,13 @@ fn test_client_send_all() { let service = create_route_guide(RouteGuideService {}); let mut server = ServerBuilder::new(env.clone()) .register_service(service) - .bind("127.0.0.1", 0) .build() .unwrap(); + let port = server + .add_listening_port("127.0.0.1:0", ServerCredentials::insecure()) + .unwrap(); server.start(); - let port = server.bind_addrs().next().unwrap().1; - let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port)); + let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{port}")); let client = RouteGuideClient::new(ch); let exec_test_f = async move {