diff --git a/Cargo.lock b/Cargo.lock index 92b5099468fc..6baacabc02eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3126,6 +3126,7 @@ dependencies = [ "tonic", "tracing", "tracing-subscriber", + "trait-variant", ] [[package]] @@ -3292,6 +3293,7 @@ dependencies = [ "async-trait", "bincode", "bytes", + "cfg-if", "cfg_aliases", "clap", "dashmap", @@ -3321,8 +3323,10 @@ dependencies = [ "tonic-build", "tonic-health", "tonic-reflection", + "tonic-web-wasm-client", "tower", "tracing", + "wasm-bindgen-test", ] [[package]] @@ -6170,6 +6174,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-web-wasm-client" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79bb296fba9974fbc7ea2f1364a661c0e7acb7ebfca648343e5a4ac44ec9a0ec" +dependencies = [ + "base64 0.21.7", + "byteorder", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "js-sys", + "pin-project", + "thiserror", + "tonic", + "tower-service", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", +] + [[package]] name = "tower" version = "0.4.13" @@ -6301,6 +6329,17 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "trait-variant" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45e1a477061e97925d81a2f89fb73b2b8038e6baa5a0023bad380ac23b5f4fa6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -6586,9 +6625,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.39" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" dependencies = [ "cfg-if", "js-sys", @@ -6625,6 +6664,31 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "wasm-bindgen-test" +version = "0.3.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9bf62a58e0780af3e852044583deee40983e5886da43a271dd772379987667b" +dependencies = [ + "console_error_panic_hook", + "js-sys", + "scoped-tls", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test-macro", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f89739351a2e03cb94beb799d47fb2cac01759b40ec441f7de39b00cbf7ef0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "wasm-encoder" version = "0.24.1" @@ -6665,6 +6729,19 @@ dependencies = [ "wasmparser 0.107.0", ] +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmer" version = "3.1.1" diff --git a/Cargo.toml b/Cargo.toml index f9d7d953c3ca..4f59d5328ae6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,6 @@ custom_debug_derive = "0.6.1" dashmap = "5.5.3" derive_more = "0.99.17" dirs = "5.0.1" -ed25519 = "2.2" ed25519-dalek = { version = "2.1.1", features = ["batch", "serde"] } either = "1.10.0" frunk = "0.4.2" @@ -117,6 +116,7 @@ tonic-build = { version = "0.11", default-features = false } tonic-health = "0.11" tonic-reflection = "0.11" tonic-web = "0.11" +tonic-web-wasm-client = "0.5.1" tokio = "1.36.0" tokio-stream = "0.1.14" tokio-test = "0.4.3" @@ -126,6 +126,8 @@ tower-http = "0.5.2" tower = "0.4.13" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", default-features = false, features = ["env-filter"] } +trait-variant = "0.1.1" +wasm-bindgen-test = "0.3.42" wasm-encoder = "0.24.1" wasmer = { version = "=3.1.1", features = ["singlepass"] } wasmer-middlewares = "=3.1.1" @@ -142,7 +144,6 @@ linera-chain = { version = "0.9.0", path = "./linera-chain" } linera-core = { version = "0.9.0", path = "./linera-core", default-features = false } linera-execution = { version = "0.9.0", path = "./linera-execution", default-features = false } linera-indexer = { path = "./linera-indexer/lib" } -linera-indexer-example = { path = "./linera-indexer/example" } linera-indexer-graphql-client = { path = "./linera-indexer/graphql-client" } linera-indexer-plugins = { path = "./linera-indexer/plugins" } linera-rpc = { version = "0.9.0", path = "./linera-rpc" } diff --git a/examples/Cargo.lock b/examples/Cargo.lock index d7ce6511cc14..a7312b01ad30 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -1893,6 +1893,7 @@ dependencies = [ "tokio-stream", "tonic", "tracing", + "trait-variant", ] [[package]] @@ -3892,6 +3893,17 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "trait-variant" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45e1a477061e97925d81a2f89fb73b2b8038e6baa5a0023bad380ac23b5f4fa6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/flake.nix b/flake.nix index 2fd5fd4994bf..24a62f65e479 100644 --- a/flake.nix +++ b/flake.nix @@ -18,11 +18,8 @@ let cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml); nonRustDeps = with pkgs; [ + # for building clang - jq - kubernetes-helm - kind - kubectl libclang.lib libiconv nodejs @@ -30,6 +27,17 @@ protobuf pkg-config rocksdb + + # for testing + jq + kubernetes-helm + kind + kubectl + + # for Wasm testing + chromium + chromedriver + wasm-pack ]; rustBuildToolchain = (pkgs.rust-bin.fromRustupToolchainFile ./toolchains/build/rust-toolchain.toml); diff --git a/linera-core/Cargo.toml b/linera-core/Cargo.toml index 39504a5e7fbc..00c2673a088d 100644 --- a/linera-core/Cargo.toml +++ b/linera-core/Cargo.toml @@ -72,6 +72,7 @@ tokio.workspace = true tokio-stream.workspace = true tonic.workspace = true tracing.workspace = true +trait-variant.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] linera-storage-service.workspace = true diff --git a/linera-core/src/client.rs b/linera-core/src/client.rs index f3ed2eb1e0ba..c4cdae144786 100644 --- a/linera-core/src/client.rs +++ b/linera-core/src/client.rs @@ -8,8 +8,8 @@ use crate::{ }, local_node::{LocalNodeClient, LocalNodeError}, node::{ - CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode, - ValidatorNodeProvider, + CrossChainMessageDelivery, LocalValidatorNode, LocalValidatorNodeProvider, NodeError, + NotificationStream, ValidatorNodeProvider, }, notifier::Notifier, updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater}, @@ -272,7 +272,7 @@ enum ReceiveCertificateMode { impl<P, S> ChainClient<P, S> where - P: ValidatorNodeProvider + Sync, + P: LocalValidatorNodeProvider + Sync, S: Storage + Clone + Send + Sync + 'static, ViewError: From<S::ContextError>, { @@ -722,7 +722,7 @@ where mut node_client: LocalNodeClient<S>, ) -> Result<(ValidatorName, u64, Vec<Certificate>), NodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { // Retrieve newly received certificates from this validator. let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_nth(tracker); @@ -2003,16 +2003,19 @@ impl<P, S> Clone for ArcChainClient<P, S> { } } +impl<P, S> ArcChainClient<P, S> { + pub fn new(client: ChainClient<P, S>) -> Self { + Self(Arc::new(Mutex::new(client))) + } +} + impl<P, S> ArcChainClient<P, S> where P: ValidatorNodeProvider + Sync, + <<P as ValidatorNodeProvider>::Node as crate::node::ValidatorNode>::NotificationStream: Send, S: Storage + Clone + Send + Sync + 'static, ViewError: From<S::ContextError>, { - pub fn new(client: ChainClient<P, S>) -> Self { - Self(Arc::new(Mutex::new(client))) - } - async fn local_chain_info( &self, chain_id: ChainId, @@ -2040,7 +2043,7 @@ where async fn process_notification( &self, name: ValidatorName, - node: P::Node, + node: <P as ValidatorNodeProvider>::Node, mut local_node: LocalNodeClient<S>, notification: Notification, ) { @@ -2198,7 +2201,7 @@ where pub async fn find_received_certificates_from_validator( &self, name: ValidatorName, - node: P::Node, + node: <P as ValidatorNodeProvider>::Node, node_client: LocalNodeClient<S>, ) -> Result<(), ChainClientError> { let ((committees, max_epoch), chain_id, current_tracker) = { diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index 3e6f99b3646b..0703e539c3dd 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -4,7 +4,7 @@ use crate::{ data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse}, - node::{NotificationStream, ValidatorNode}, + node::{LocalValidatorNode, NotificationStream}, notifier::Notifier, worker::{Notification, ValidatorWorker, WorkerError, WorkerState}, }; @@ -182,7 +182,7 @@ where certificates: Vec<Certificate>, ) -> Option<Box<ChainInfo>> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { let mut info = None; for certificate in certificates { @@ -268,7 +268,7 @@ where target_next_block_height: BlockHeight, ) -> Result<Box<ChainInfo>, LocalNodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { // Sequentially try each validator in random order. validators.shuffle(&mut rand::thread_rng()); @@ -306,7 +306,7 @@ where blob_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>, ) -> Result<Vec<HashedValue>, LocalNodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { let mut blobs = vec![]; let mut tasks = vec![]; @@ -344,7 +344,7 @@ where location: BytecodeLocation, ) -> Result<Option<HashedValue>, LocalNodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { match storage.read_value(location.certificate_hash).await { Ok(blob) => return Ok(Some(blob)), @@ -387,7 +387,7 @@ where stop: BlockHeight, ) -> Result<(), LocalNodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { let limit = u64::from(stop) .checked_sub(u64::from(start)) @@ -421,7 +421,7 @@ where chain_id: ChainId, ) -> Result<Box<ChainInfo>, LocalNodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { let futures = validators .into_iter() @@ -446,7 +446,7 @@ where chain_id: ChainId, ) -> Result<(), LocalNodeError> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { let local_info = self.local_chain_info(chain_id).await?; let range = BlockHeightRange { @@ -506,7 +506,7 @@ where location: BytecodeLocation, ) -> Option<HashedValue> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { // Sequentially try each validator in random order. validators.shuffle(&mut rand::thread_rng()); @@ -527,7 +527,7 @@ where location: BytecodeLocation, ) -> Option<HashedValue> where - A: ValidatorNode + Send + Sync + 'static + Clone, + A: LocalValidatorNode + Clone + 'static, { let query = ChainInfoQuery::new(chain_id).with_blob(location.certificate_hash); if let Ok(response) = node.handle_chain_info_query(query).await { diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index a799572eefc1..dfe39a52b10b 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -6,8 +6,7 @@ use crate::{ data_types::{ChainInfoQuery, ChainInfoResponse}, worker::{Notification, WorkerError}, }; -use async_trait::async_trait; -use futures::Stream; +use futures::stream::{BoxStream, LocalBoxStream, Stream}; use linera_base::{ crypto::CryptoError, data_types::{ArithmeticError, BlockHeight}, @@ -24,11 +23,12 @@ use linera_execution::{ use linera_version::VersionInfo; use linera_views::views::ViewError; use serde::{Deserialize, Serialize}; -use std::pin::Pin; use thiserror::Error; /// A pinned [`Stream`] of Notifications. -pub type NotificationStream = Pin<Box<dyn Stream<Item = Notification> + Send>>; +pub type NotificationStream = BoxStream<'static, Notification>; +/// A pinned [`Stream`] of Notifications, without the `Send` constraint. +pub type LocalNotificationStream = LocalBoxStream<'static, Notification>; /// Whether to wait for the delivery of outgoing cross-chain messages. #[derive(Debug, Default, Clone, Copy)] @@ -39,8 +39,10 @@ pub enum CrossChainMessageDelivery { } /// How to communicate with a validator node. -#[async_trait] -pub trait ValidatorNode { +#[trait_variant::make(ValidatorNode: Send)] +pub trait LocalValidatorNode { + type NotificationStream: Stream<Item = Notification> + Unpin; + /// Proposes a new block. async fn handle_block_proposal( &mut self, @@ -72,13 +74,16 @@ pub trait ValidatorNode { async fn get_version_info(&mut self) -> Result<VersionInfo, NodeError>; /// Subscribes to receiving notifications for a collection of chains. - async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError>; + async fn subscribe( + &mut self, + chains: Vec<ChainId>, + ) -> Result<Self::NotificationStream, NodeError>; } /// Turn an address into a validator node. #[allow(clippy::result_large_err)] -pub trait ValidatorNodeProvider { - type Node: ValidatorNode + Clone + Send + Sync + 'static; +pub trait LocalValidatorNodeProvider { + type Node: LocalValidatorNode + Clone + 'static; fn make_node(&self, address: &str) -> Result<Self::Node, NodeError>; @@ -104,6 +109,19 @@ pub trait ValidatorNodeProvider { } } +pub trait ValidatorNodeProvider: + LocalValidatorNodeProvider<Node = <Self as ValidatorNodeProvider>::Node> +{ + type Node: ValidatorNode + Send + Sync + Clone + 'static; +} + +impl<T: LocalValidatorNodeProvider> ValidatorNodeProvider for T +where + T::Node: ValidatorNode + Send + Sync + Clone + 'static, +{ + type Node = <T as LocalValidatorNodeProvider>::Node; +} + /// Error type for node queries. /// /// This error is meant to be serialized over the network and aggregated by clients (i.e. diff --git a/linera-core/src/unit_tests/client_test_utils.rs b/linera-core/src/unit_tests/client_test_utils.rs index 366be398e32f..ccfb605cc608 100644 --- a/linera-core/src/unit_tests/client_test_utils.rs +++ b/linera-core/src/unit_tests/client_test_utils.rs @@ -2,9 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - client::{ChainClient, ChainClientBuilder, ValidatorNodeProvider}, + client::{ChainClient, ChainClientBuilder}, data_types::*, - node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode}, + node::{ + CrossChainMessageDelivery, LocalValidatorNodeProvider, NodeError, NotificationStream, + ValidatorNode, + }, notifier::Notifier, worker::{Notification, ValidatorWorker, WorkerState}, }; @@ -90,12 +93,13 @@ pub struct LocalValidatorClient<S> { client: Arc<Mutex<LocalValidator<S>>>, } -#[async_trait] impl<S> ValidatorNode for LocalValidatorClient<S> where S: Storage + Clone + Send + Sync + 'static, ViewError: From<S::ContextError>, { + type NotificationStream = NotificationStream; + async fn handle_block_proposal( &mut self, proposal: BlockProposal, @@ -316,7 +320,7 @@ where #[derive(Clone)] pub struct NodeProvider<S>(BTreeMap<ValidatorName, Arc<Mutex<LocalValidator<S>>>>); -impl<S> ValidatorNodeProvider for NodeProvider<S> +impl<S> LocalValidatorNodeProvider for NodeProvider<S> where S: Storage + Clone + Send + Sync + 'static, ViewError: From<S::ContextError>, diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index 02bb78a0dce6..88e5bdddb456 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -4,9 +4,9 @@ use crate::{ data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse}, - node::{CrossChainMessageDelivery, NodeError, ValidatorNode}, + node::{CrossChainMessageDelivery, LocalValidatorNode, NodeError}, }; -use futures::{future, StreamExt}; +use futures::{future, Future, StreamExt}; use linera_base::{ data_types::{BlockHeight, Round}, identifiers::ChainId, @@ -73,15 +73,16 @@ pub enum CommunicationError<E: fmt::Debug> { /// Tries to stop early when a quorum is reached. If `grace_period` is not zero, other validators /// are given this much additional time to contribute to the result, as a fraction of how long it /// took to reach the quorum. -pub async fn communicate_with_quorum<'a, A, V, K, F, G>( +pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>( validator_clients: &'a [(ValidatorName, A)], committee: &Committee, group_by: G, execute: F, ) -> Result<(K, Vec<V>), CommunicationError<NodeError>> where - A: ValidatorNode + Send + Sync + 'static + Clone, - F: Fn(ValidatorName, A) -> future::BoxFuture<'a, Result<V, NodeError>> + Clone, + A: LocalValidatorNode + Clone + 'static, + F: Clone + Fn(ValidatorName, A) -> R, + R: Future<Output = Result<V, NodeError>> + 'a, G: Fn(&V) -> K, K: Hash + PartialEq + Eq + Clone + 'static, V: 'static, @@ -160,7 +161,7 @@ where impl<A, S> ValidatorUpdater<A, S> where - A: ValidatorNode + Clone + Send + Sync + 'static, + A: LocalValidatorNode + Clone + 'static, S: Storage + Clone + Send + Sync + 'static, ViewError: From<S::ContextError>, { diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index be6aaff8fca0..ee39ce21f15e 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -904,7 +904,7 @@ pub struct Bytecode { impl Bytecode { /// Creates a new [`Bytecode`] instance using the provided `bytes`. - #[cfg(any(test, with_wasm_runtime))] + #[allow(dead_code)] pub(crate) fn new(bytes: Vec<u8>) -> Self { Bytecode { bytes } } diff --git a/linera-rpc/Cargo.toml b/linera-rpc/Cargo.toml index ca7a61070aa6..be5831920c1f 100644 --- a/linera-rpc/Cargo.toml +++ b/linera-rpc/Cargo.toml @@ -45,6 +45,7 @@ anyhow.workspace = true async-trait.workspace = true bincode.workspace = true bytes.workspace = true +cfg-if.workspace = true clap.workspace = true dashmap.workspace = true ed25519-dalek.workspace = true @@ -80,7 +81,11 @@ test-strategy.workspace = true tonic = { workspace = true, features = ["prost", "codegen", "transport"] } [target.'cfg(target_arch = "wasm32")'.dependencies] -tonic = { workspace = true, features = ["prost", "codegen"] } +tonic = { workspace = true, features = ["codegen", "prost"] } +tonic-web-wasm-client.workspace = true + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen-test.workspace = true [build-dependencies] cfg_aliases.workspace = true diff --git a/linera-rpc/src/client.rs b/linera-rpc/src/client.rs index 894a3796d8a4..a922378fb362 100644 --- a/linera-rpc/src/client.rs +++ b/linera-rpc/src/client.rs @@ -1,8 +1,6 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use async_trait::async_trait; - use crate::grpc::GrpcClient; #[cfg(with_simple_network)] use crate::simple::SimpleClient; @@ -10,8 +8,15 @@ use linera_base::identifiers::ChainId; use linera_chain::data_types::{BlockProposal, Certificate, HashedValue, LiteCertificate}; use linera_core::{ data_types::{ChainInfoQuery, ChainInfoResponse}, - node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode}, + node::{CrossChainMessageDelivery, NodeError}, +}; + +#[cfg(web)] +use linera_core::node::{ + LocalNotificationStream as NotificationStream, LocalValidatorNode as ValidatorNode, }; +#[cfg(not(web))] +use linera_core::node::{NotificationStream, ValidatorNode}; #[derive(Clone)] pub enum Client { @@ -33,8 +38,9 @@ impl From<SimpleClient> for Client { } } -#[async_trait] impl ValidatorNode for Client { + type NotificationStream = NotificationStream; + async fn handle_block_proposal( &mut self, proposal: BlockProposal, @@ -102,7 +108,10 @@ impl ValidatorNode for Client { } } - async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> { + async fn subscribe( + &mut self, + chains: Vec<ChainId>, + ) -> Result<Self::NotificationStream, NodeError> { Ok(match self { Client::Grpc(grpc_client) => Box::pin(grpc_client.subscribe(chains).await?), diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index 1e73c2a7431f..7bfcd0867107 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -5,21 +5,31 @@ use super::{ api::{ chain_info_result::Inner, validator_node_client::ValidatorNodeClient, SubscriptionRequest, }, - GrpcError, GrpcProtoConversionError, GRPC_MAX_MESSAGE_SIZE, + transport, GrpcError, GRPC_MAX_MESSAGE_SIZE, }; use crate::{ - config::ValidatorPublicNetworkConfig, mass_client, node_provider::NodeOptions, - HandleCertificateRequest, HandleLiteCertificateRequest, RpcMessage, + config::ValidatorPublicNetworkConfig, node_provider::NodeOptions, HandleCertificateRequest, + HandleLiteCertificateRequest, }; -use async_trait::async_trait; use futures::{future, stream, StreamExt}; use linera_base::identifiers::ChainId; use linera_chain::data_types; use linera_core::{ - node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode}, + node::{CrossChainMessageDelivery, NodeError}, worker::Notification, }; +#[cfg(web)] +use linera_core::node::{ + LocalNotificationStream as NotificationStream, LocalValidatorNode as ValidatorNode, +}; +#[cfg(not(web))] +use { + super::GrpcProtoConversionError, + crate::{mass_client, RpcMessage}, + linera_core::node::{NotificationStream, ValidatorNode}, +}; + use linera_version::VersionInfo; use std::{iter, time::Duration}; @@ -31,7 +41,7 @@ use tracing::{debug, info, instrument, warn}; #[derive(Clone)] pub struct GrpcClient { address: String, - client: ValidatorNodeClient<tonic::transport::Channel>, + client: ValidatorNodeClient<transport::Channel>, notification_retry_delay: Duration, notification_retries: u32, } @@ -42,10 +52,9 @@ impl GrpcClient { options: NodeOptions, ) -> Result<Self, GrpcError> { let address = network.http_address(); - let channel = tonic::transport::Channel::from_shared(address.clone())? - .connect_timeout(options.send_timeout) - .timeout(options.recv_timeout) - .connect_lazy(); + + let channel = + transport::create_channel(address.clone(), &transport::Options::from(&options))?; let client = ValidatorNodeClient::new(channel) .max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE) .max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE); @@ -126,8 +135,9 @@ macro_rules! client_delegate { }}; } -#[async_trait] impl ValidatorNode for GrpcClient { + type NotificationStream = NotificationStream; + #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))] async fn handle_block_proposal( &mut self, @@ -175,7 +185,10 @@ impl ValidatorNode for GrpcClient { } #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))] - async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> { + async fn subscribe( + &mut self, + chains: Vec<ChainId>, + ) -> Result<Self::NotificationStream, NodeError> { let notification_retry_delay = self.notification_retry_delay; let notification_retries = self.notification_retries; let mut retry_count = 0; @@ -250,7 +263,8 @@ impl ValidatorNode for GrpcClient { } } -#[async_trait] +#[cfg(not(web))] +#[async_trait::async_trait] impl mass_client::MassClient for GrpcClient { #[tracing::instrument(skip_all, err)] async fn send( diff --git a/linera-rpc/src/grpc/mod.rs b/linera-rpc/src/grpc/mod.rs index d2040662232c..6b7a3bcaf5de 100644 --- a/linera-rpc/src/grpc/mod.rs +++ b/linera-rpc/src/grpc/mod.rs @@ -5,9 +5,9 @@ mod client; mod conversions; mod node_provider; pub mod pool; - #[cfg(with_server)] mod server; +pub mod transport; pub use client::*; pub use conversions::*; @@ -24,7 +24,7 @@ pub mod api { #[derive(thiserror::Error, Debug)] pub enum GrpcError { #[error("failed to connect to address: {0}")] - ConnectionFailed(#[from] tonic::transport::Error), + ConnectionFailed(#[from] transport::Error), #[error("failed to communicate cross-chain queries: {0}")] CrossChain(#[from] tonic::Status), diff --git a/linera-rpc/src/grpc/node_provider.rs b/linera-rpc/src/grpc/node_provider.rs index 376284e0d9cc..f5a832f55712 100644 --- a/linera-rpc/src/grpc/node_provider.rs +++ b/linera-rpc/src/grpc/node_provider.rs @@ -5,8 +5,7 @@ use super::GrpcClient; use crate::{config::ValidatorPublicNetworkConfig, node_provider::NodeOptions}; -use linera_core::node::{NodeError, ValidatorNodeProvider}; - +use linera_core::node::{LocalValidatorNodeProvider, NodeError}; use std::str::FromStr as _; #[derive(Copy, Clone)] @@ -18,7 +17,7 @@ impl GrpcNodeProvider { } } -impl ValidatorNodeProvider for GrpcNodeProvider { +impl LocalValidatorNodeProvider for GrpcNodeProvider { type Node = GrpcClient; fn make_node(&self, address: &str) -> anyhow::Result<Self::Node, NodeError> { diff --git a/linera-rpc/src/grpc/pool.rs b/linera-rpc/src/grpc/pool.rs index 32c6138aaec9..0c02b6fbb3f6 100644 --- a/linera-rpc/src/grpc/pool.rs +++ b/linera-rpc/src/grpc/pool.rs @@ -1,48 +1,37 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use super::GrpcError; +use super::{transport, GrpcError}; use dashmap::DashMap; use std::time::Duration; -use tonic::transport::{Channel, Endpoint}; -/// A pool of transport channels to be used by Grpc. +/// A pool of transport channels to be used by gRPC. #[derive(Clone, Default)] pub struct GrpcConnectionPool { - connect_timeout: Option<Duration>, - timeout: Option<Duration>, - channels: DashMap<String, Channel>, + options: transport::Options, + channels: DashMap<String, transport::Channel>, } impl GrpcConnectionPool { pub fn with_connect_timeout(mut self, connect_timeout: impl Into<Option<Duration>>) -> Self { - self.connect_timeout = connect_timeout.into(); + self.options.connect_timeout = connect_timeout.into(); self } pub fn with_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self { - self.timeout = timeout.into(); + self.options.timeout = timeout.into(); self } /// Obtains a channel for the current address. Either clones an existing one (thereby /// reusing the connection), or creates one if needed. New channels do not create a /// connection immediately. - pub fn channel(&self, address: String) -> Result<Channel, GrpcError> { - let channel = self + pub fn channel(&self, address: String) -> Result<transport::Channel, GrpcError> { + Ok(self .channels .entry(address.clone()) - .or_try_insert_with(|| { - let mut endpoint = Endpoint::from_shared(address)?; - if let Some(timeout) = self.connect_timeout { - endpoint = endpoint.connect_timeout(timeout); - } - if let Some(timeout) = self.timeout { - endpoint = endpoint.timeout(timeout); - } - Ok::<_, GrpcError>(endpoint.connect_lazy()) - })?; - Ok(channel.clone()) + .or_try_insert_with(|| transport::create_channel(address, &self.options))? + .clone()) } } diff --git a/linera-rpc/src/grpc/transport.rs b/linera-rpc/src/grpc/transport.rs new file mode 100644 index 000000000000..0013c5923616 --- /dev/null +++ b/linera-rpc/src/grpc/transport.rs @@ -0,0 +1,46 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::NodeOptions; + +#[derive(Clone, Debug, Default)] +pub struct Options { + pub connect_timeout: Option<std::time::Duration>, + pub timeout: Option<std::time::Duration>, +} + +impl From<&'_ NodeOptions> for Options { + fn from(node_options: &NodeOptions) -> Self { + Self { + connect_timeout: Some(node_options.send_timeout), + timeout: Some(node_options.recv_timeout), + } + } +} + +cfg_if::cfg_if! { + if #[cfg(web)] { + pub use tonic_web_wasm_client::{Client as Channel, Error}; + + pub fn create_channel(address: String, _options: &Options) -> Result<Channel, Error> { + // TODO(#1817): this should respect `options` + Ok(tonic_web_wasm_client::Client::new(address)) + } + } else { + pub use tonic::transport::{Channel, Error}; + + pub fn create_channel( + address: String, + options: &Options, + ) -> Result<Channel, Error> { + let mut endpoint = tonic::transport::Endpoint::from_shared(address)?; + if let Some(timeout) = options.connect_timeout { + endpoint = endpoint.connect_timeout(timeout); + } + if let Some(timeout) = options.timeout { + endpoint = endpoint.timeout(timeout); + } + Ok(endpoint.connect_lazy()) + } + } +} diff --git a/linera-rpc/src/lib.rs b/linera-rpc/src/lib.rs index 98604a419200..54f05c69467b 100644 --- a/linera-rpc/src/lib.rs +++ b/linera-rpc/src/lib.rs @@ -21,6 +21,7 @@ pub mod simple; pub mod grpc; pub use message::RpcMessage; +pub use node_provider::NodeOptions; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] #[cfg_attr(with_testing, derive(Eq, PartialEq))] diff --git a/linera-rpc/src/mass_client.rs b/linera-rpc/src/mass_client.rs index c8e267afebaf..faf56b62b05c 100644 --- a/linera-rpc/src/mass_client.rs +++ b/linera-rpc/src/mass_client.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::RpcMessage; + use async_trait::async_trait; use thiserror::Error; @@ -10,7 +11,7 @@ pub enum MassClientError { #[error("io error: {0}")] Io(#[from] std::io::Error), #[error("tonic transport: {0}")] - Tonic(#[from] tonic::transport::Error), + TonicTransport(#[from] crate::grpc::transport::Error), #[error("conversion error: {0}")] Conversion(#[from] crate::grpc::GrpcProtoConversionError), #[error("error while making a remote call: {0}")] @@ -18,7 +19,7 @@ pub enum MassClientError { } #[async_trait] -pub trait MassClient: Send + Sync { +pub trait MassClient { async fn send( &mut self, requests: Vec<RpcMessage>, diff --git a/linera-rpc/src/node_provider.rs b/linera-rpc/src/node_provider.rs index 9c381cf31816..102ebf377174 100644 --- a/linera-rpc/src/node_provider.rs +++ b/linera-rpc/src/node_provider.rs @@ -5,7 +5,7 @@ use crate::simple::SimpleNodeProvider; use crate::{client::Client, grpc::GrpcNodeProvider}; -use linera_core::node::{NodeError, ValidatorNodeProvider}; +use linera_core::node::{LocalValidatorNodeProvider, NodeError}; use std::time::Duration; @@ -28,7 +28,7 @@ impl NodeProvider { } } -impl ValidatorNodeProvider for NodeProvider { +impl LocalValidatorNodeProvider for NodeProvider { type Node = Client; fn make_node(&self, address: &str) -> anyhow::Result<Self::Node, NodeError> { diff --git a/linera-rpc/src/simple/client.rs b/linera-rpc/src/simple/client.rs index b2bd7db07a4d..1fd922ae9c92 100644 --- a/linera-rpc/src/simple/client.rs +++ b/linera-rpc/src/simple/client.rs @@ -3,24 +3,26 @@ // SPDX-License-Identifier: Apache-2.0 use super::{codec, transport::TransportProtocol}; + use crate::{ config::ValidatorPublicNetworkPreConfig, mass_client, HandleCertificateRequest, HandleLiteCertificateRequest, RpcMessage, }; -use async_trait::async_trait; -use futures::{sink::SinkExt, stream::StreamExt}; + use linera_base::identifiers::ChainId; use linera_chain::data_types::{BlockProposal, Certificate, HashedValue, LiteCertificate}; use linera_core::{ data_types::{ChainInfoQuery, ChainInfoResponse}, node::{CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode}, }; - use linera_version::VersionInfo; -use std::time::Duration; +use async_trait::async_trait; +use futures::{sink::SinkExt, stream::StreamExt}; use tokio::time; +use std::{future::Future, time::Duration}; + #[derive(Clone)] pub struct SimpleClient { network: ValidatorPublicNetworkPreConfig<TransportProtocol>, @@ -68,8 +70,9 @@ impl SimpleClient { } } -#[async_trait] impl ValidatorNode for SimpleClient { + type NotificationStream = NotificationStream; + /// Initiates a new block. async fn handle_block_proposal( &mut self, @@ -116,10 +119,12 @@ impl ValidatorNode for SimpleClient { self.query(query.into()).await } - async fn subscribe(&mut self, _chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> { - Err(NodeError::SubscriptionError { - transport: self.network.protocol.to_string(), - }) + fn subscribe( + &mut self, + _chains: Vec<ChainId>, + ) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send { + let transport = self.network.protocol.to_string(); + async { Err(NodeError::SubscriptionError { transport }) } } async fn get_version_info(&mut self) -> Result<VersionInfo, NodeError> { diff --git a/linera-rpc/src/simple/node_provider.rs b/linera-rpc/src/simple/node_provider.rs index 6f29b4065783..9f642b4cd797 100644 --- a/linera-rpc/src/simple/node_provider.rs +++ b/linera-rpc/src/simple/node_provider.rs @@ -5,7 +5,7 @@ use super::SimpleClient; use crate::{config::ValidatorPublicNetworkPreConfig, node_provider::NodeOptions}; -use linera_core::node::{NodeError, ValidatorNodeProvider}; +use linera_core::node::{LocalValidatorNodeProvider, NodeError}; use std::str::FromStr as _; @@ -19,7 +19,7 @@ impl SimpleNodeProvider { } } -impl ValidatorNodeProvider for SimpleNodeProvider { +impl LocalValidatorNodeProvider for SimpleNodeProvider { type Node = SimpleClient; fn make_node(&self, address: &str) -> Result<Self::Node, NodeError> { diff --git a/linera-rpc/tests/transport.rs b/linera-rpc/tests/transport.rs new file mode 100644 index 000000000000..33fa2637c4a1 --- /dev/null +++ b/linera-rpc/tests/transport.rs @@ -0,0 +1,34 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(web)] +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + +#[cfg_attr(web, wasm_bindgen_test::wasm_bindgen_test)] +#[cfg_attr(not(web), tokio::test(flavor = "current_thread"))] +#[ignore] +// this test currently must be run manually, as it requires a Linera proxy to be running on 127.0.0.1:9000. +async fn client() { + use linera_core::node::LocalValidatorNode as _; + use linera_rpc::config::*; + use std::time::Duration; + + let network_config = ValidatorPublicNetworkPreConfig { + protocol: NetworkProtocol::Grpc(TlsConfig::ClearText), + host: "127.0.0.1".into(), + port: 9000, + }; + + let node_options = linera_rpc::node_provider::NodeOptions { + send_timeout: Duration::from_millis(100), + recv_timeout: Duration::from_millis(100), + notification_retry_delay: Duration::from_millis(100), + notification_retries: 5, + }; + + let _ = linera_rpc::grpc::GrpcClient::new(network_config, node_options) + .unwrap() + .get_version_info() + .await + .unwrap(); +} diff --git a/linera-service/src/chain_listener.rs b/linera-service/src/chain_listener.rs index 5e41acea7114..cee7309e3a01 100644 --- a/linera-service/src/chain_listener.rs +++ b/linera-service/src/chain_listener.rs @@ -12,7 +12,7 @@ use linera_base::{ use linera_chain::data_types::OutgoingMessage; use linera_core::{ client::{ArcChainClient, ChainClient}, - node::ValidatorNodeProvider, + node::{ValidatorNode, ValidatorNodeProvider}, worker::{Notification, Reason}, }; use linera_execution::{Message, SystemMessage}; @@ -61,6 +61,7 @@ pub struct ChainListener<P, S> { impl<P, S> ChainListener<P, S> where P: ValidatorNodeProvider + Send + Sync + 'static, + <<P as ValidatorNodeProvider>::Node as ValidatorNode>::NotificationStream: Send, S: Storage + Clone + Send + Sync + 'static, ViewError: From<S::ContextError>, { diff --git a/linera-service/src/faucet.rs b/linera-service/src/faucet.rs index 641a2c600954..cb82fb8abd33 100644 --- a/linera-service/src/faucet.rs +++ b/linera-service/src/faucet.rs @@ -113,6 +113,7 @@ where impl<P, S, C> MutationRoot<P, S, C> where P: ValidatorNodeProvider + Send + Sync + 'static, + <P as ValidatorNodeProvider>::Node: Sync, S: Storage + Clone + Send + Sync + 'static, C: ClientContext<P> + Send + 'static, ViewError: From<S::ContextError>, @@ -126,6 +127,7 @@ where impl<P, S, C> MutationRoot<P, S, C> where P: ValidatorNodeProvider + Send + Sync + 'static, + <P as ValidatorNodeProvider>::Node: Sync, S: Storage + Clone + Send + Sync + 'static, C: ClientContext<P> + Send + 'static, ViewError: From<S::ContextError>, diff --git a/linera-service/src/linera/client_context.rs b/linera-service/src/linera/client_context.rs index a19e80baf382..5d8d74b48e03 100644 --- a/linera-service/src/linera/client_context.rs +++ b/linera-service/src/linera/client_context.rs @@ -766,10 +766,10 @@ impl ClientContext { responses } - fn make_validator_mass_clients(&self) -> Vec<Box<dyn MassClient>> { + fn make_validator_mass_clients(&self) -> Vec<Box<dyn MassClient + Send>> { let mut validator_clients = Vec::new(); for config in &self.wallet_state.genesis_config().committee.validators { - let client: Box<dyn MassClient> = match config.network.protocol { + let client: Box<dyn MassClient + Send> = match config.network.protocol { NetworkProtocol::Simple(protocol) => { let network = config.network.clone_with_protocol(protocol); Box::new(SimpleMassClient::new( diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 47f995a58c52..a31e30baeb62 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -20,7 +20,7 @@ use linera_core::{ client::ChainClientError, data_types::{ChainInfoQuery, ClientOutcome}, local_node::LocalNodeClient, - node::ValidatorNodeProvider, + node::LocalValidatorNodeProvider, notifier::Notifier, worker::WorkerState, }; diff --git a/linera-service/src/node_service.rs b/linera-service/src/node_service.rs index ba5dccb41c67..bca9e67f025d 100644 --- a/linera-service/src/node_service.rs +++ b/linera-service/src/node_service.rs @@ -29,7 +29,7 @@ use linera_chain::{data_types::HashedValue, ChainStateView}; use linera_core::{ client::{ArcChainClient, ChainClient, ChainClientError}, data_types::{ClientOutcome, RoundTimeout}, - node::{NotificationStream, ValidatorNodeProvider}, + node::{NotificationStream, ValidatorNode, ValidatorNodeProvider}, worker::{Notification, Reason}, }; use linera_execution::{ @@ -884,6 +884,7 @@ impl<P, S: Clone, C> Clone for NodeService<P, S, C> { impl<P, S, C> NodeService<P, S, C> where P: ValidatorNodeProvider + Send + Sync + 'static, + <<P as ValidatorNodeProvider>::Node as ValidatorNode>::NotificationStream: Send, S: Storage + Clone + Send + Sync + 'static, C: ClientContext<P> + Send + 'static, ViewError: From<S::ContextError>, diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index f62eddaf43bd..ce3e203785b8 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -8,8 +8,8 @@ use linera_core::{ client::ChainClient, data_types::{ChainInfoQuery, ChainInfoResponse}, node::{ - CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode, - ValidatorNodeProvider, + CrossChainMessageDelivery, LocalValidatorNodeProvider, NodeError, NotificationStream, + ValidatorNode, }, }; use linera_execution::committee::Committee; @@ -28,8 +28,9 @@ use linera_views::{ #[derive(Clone)] struct DummyValidatorNode; -#[async_trait] impl ValidatorNode for DummyValidatorNode { + type NotificationStream = NotificationStream; + async fn handle_block_proposal( &mut self, _: BlockProposal, @@ -72,7 +73,7 @@ impl ValidatorNode for DummyValidatorNode { struct DummyValidatorNodeProvider; -impl ValidatorNodeProvider for DummyValidatorNodeProvider { +impl LocalValidatorNodeProvider for DummyValidatorNodeProvider { type Node = DummyValidatorNode; fn make_node(&self, _: &str) -> Result<Self::Node, NodeError> {