Skip to content

Commit

Permalink
Web client (#1738)
Browse files Browse the repository at this point in the history
* `linera-rpc`: implement WebSocket client, modulo Send-ness

* Implement non-`Send` versions of RPC traits

* `linera-rpc`: deduplicate implementation by cunning use of import renaming

* `linera-rpc`: add transport test

* `linera-rpc`: use `cfg-if` to simplify implementation of the `grpc::transport` module

* `linera-rpc`: note the issue for the Web client not respecting its transport options

* `linera-rpc`: refer to `async_trait` unqualified

* `linera-core`: use explicit trait bounds for `ValidatorNode` with `Send` `NotificationStream`
  • Loading branch information
Twey authored Mar 22, 2024
1 parent 810ad97 commit 8ce69b8
Show file tree
Hide file tree
Showing 30 changed files with 350 additions and 117 deletions.
81 changes: 79 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ custom_debug_derive = "0.6.1"
dashmap = "5.5.3"
derive_more = "0.99.17"
dirs = "5.0.1"
ed25519 = "2.2"
ed25519-dalek = { version = "2.1.1", features = ["batch", "serde"] }
either = "1.10.0"
frunk = "0.4.2"
Expand Down Expand Up @@ -117,6 +116,7 @@ tonic-build = { version = "0.11", default-features = false }
tonic-health = "0.11"
tonic-reflection = "0.11"
tonic-web = "0.11"
tonic-web-wasm-client = "0.5.1"
tokio = "1.36.0"
tokio-stream = "0.1.14"
tokio-test = "0.4.3"
Expand All @@ -126,6 +126,8 @@ tower-http = "0.5.2"
tower = "0.4.13"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", default-features = false, features = ["env-filter"] }
trait-variant = "0.1.1"
wasm-bindgen-test = "0.3.42"
wasm-encoder = "0.24.1"
wasmer = { version = "=3.1.1", features = ["singlepass"] }
wasmer-middlewares = "=3.1.1"
Expand All @@ -142,7 +144,6 @@ linera-chain = { version = "0.9.0", path = "./linera-chain" }
linera-core = { version = "0.9.0", path = "./linera-core", default-features = false }
linera-execution = { version = "0.9.0", path = "./linera-execution", default-features = false }
linera-indexer = { path = "./linera-indexer/lib" }
linera-indexer-example = { path = "./linera-indexer/example" }
linera-indexer-graphql-client = { path = "./linera-indexer/graphql-client" }
linera-indexer-plugins = { path = "./linera-indexer/plugins" }
linera-rpc = { version = "0.9.0", path = "./linera-rpc" }
Expand Down
12 changes: 12 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,26 @@
let
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);
nonRustDeps = with pkgs; [
# for building
clang
jq
kubernetes-helm
kind
kubectl
libclang.lib
libiconv
nodejs
openssl
protobuf
pkg-config
rocksdb

# for testing
jq
kubernetes-helm
kind
kubectl

# for Wasm testing
chromium
chromedriver
wasm-pack
];
rustBuildToolchain = (pkgs.rust-bin.fromRustupToolchainFile
./toolchains/build/rust-toolchain.toml);
Expand Down
1 change: 1 addition & 0 deletions linera-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ tokio.workspace = true
tokio-stream.workspace = true
tonic.workspace = true
tracing.workspace = true
trait-variant.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
linera-storage-service.workspace = true
Expand Down
23 changes: 13 additions & 10 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
},
local_node::{LocalNodeClient, LocalNodeError},
node::{
CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
ValidatorNodeProvider,
CrossChainMessageDelivery, LocalValidatorNode, LocalValidatorNodeProvider, NodeError,
NotificationStream, ValidatorNodeProvider,
},
notifier::Notifier,
updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
Expand Down Expand Up @@ -272,7 +272,7 @@ enum ReceiveCertificateMode {

impl<P, S> ChainClient<P, S>
where
P: ValidatorNodeProvider + Sync,
P: LocalValidatorNodeProvider + Sync,
S: Storage + Clone + Send + Sync + 'static,
ViewError: From<S::ContextError>,
{
Expand Down Expand Up @@ -722,7 +722,7 @@ where
mut node_client: LocalNodeClient<S>,
) -> Result<(ValidatorName, u64, Vec<Certificate>), NodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
// Retrieve newly received certificates from this validator.
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_nth(tracker);
Expand Down Expand Up @@ -2003,16 +2003,19 @@ impl<P, S> Clone for ArcChainClient<P, S> {
}
}

impl<P, S> ArcChainClient<P, S> {
pub fn new(client: ChainClient<P, S>) -> Self {
Self(Arc::new(Mutex::new(client)))
}
}

impl<P, S> ArcChainClient<P, S>
where
P: ValidatorNodeProvider + Sync,
<<P as ValidatorNodeProvider>::Node as crate::node::ValidatorNode>::NotificationStream: Send,
S: Storage + Clone + Send + Sync + 'static,
ViewError: From<S::ContextError>,
{
pub fn new(client: ChainClient<P, S>) -> Self {
Self(Arc::new(Mutex::new(client)))
}

async fn local_chain_info(
&self,
chain_id: ChainId,
Expand Down Expand Up @@ -2040,7 +2043,7 @@ where
async fn process_notification(
&self,
name: ValidatorName,
node: P::Node,
node: <P as ValidatorNodeProvider>::Node,
mut local_node: LocalNodeClient<S>,
notification: Notification,
) {
Expand Down Expand Up @@ -2198,7 +2201,7 @@ where
pub async fn find_received_certificates_from_validator(
&self,
name: ValidatorName,
node: P::Node,
node: <P as ValidatorNodeProvider>::Node,
node_client: LocalNodeClient<S>,
) -> Result<(), ChainClientError> {
let ((committees, max_epoch), chain_id, current_tracker) = {
Expand Down
20 changes: 10 additions & 10 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse},
node::{NotificationStream, ValidatorNode},
node::{LocalValidatorNode, NotificationStream},
notifier::Notifier,
worker::{Notification, ValidatorWorker, WorkerError, WorkerState},
};
Expand Down Expand Up @@ -182,7 +182,7 @@ where
certificates: Vec<Certificate>,
) -> Option<Box<ChainInfo>>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
let mut info = None;
for certificate in certificates {
Expand Down Expand Up @@ -268,7 +268,7 @@ where
target_next_block_height: BlockHeight,
) -> Result<Box<ChainInfo>, LocalNodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
validators.shuffle(&mut rand::thread_rng());
Expand Down Expand Up @@ -306,7 +306,7 @@ where
blob_locations: impl IntoIterator<Item = (BytecodeLocation, ChainId)>,
) -> Result<Vec<HashedValue>, LocalNodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
let mut blobs = vec![];
let mut tasks = vec![];
Expand Down Expand Up @@ -344,7 +344,7 @@ where
location: BytecodeLocation,
) -> Result<Option<HashedValue>, LocalNodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
match storage.read_value(location.certificate_hash).await {
Ok(blob) => return Ok(Some(blob)),
Expand Down Expand Up @@ -387,7 +387,7 @@ where
stop: BlockHeight,
) -> Result<(), LocalNodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
let limit = u64::from(stop)
.checked_sub(u64::from(start))
Expand Down Expand Up @@ -421,7 +421,7 @@ where
chain_id: ChainId,
) -> Result<Box<ChainInfo>, LocalNodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
let futures = validators
.into_iter()
Expand All @@ -446,7 +446,7 @@ where
chain_id: ChainId,
) -> Result<(), LocalNodeError>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
let local_info = self.local_chain_info(chain_id).await?;
let range = BlockHeightRange {
Expand Down Expand Up @@ -506,7 +506,7 @@ where
location: BytecodeLocation,
) -> Option<HashedValue>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
// Sequentially try each validator in random order.
validators.shuffle(&mut rand::thread_rng());
Expand All @@ -527,7 +527,7 @@ where
location: BytecodeLocation,
) -> Option<HashedValue>
where
A: ValidatorNode + Send + Sync + 'static + Clone,
A: LocalValidatorNode + Clone + 'static,
{
let query = ChainInfoQuery::new(chain_id).with_blob(location.certificate_hash);
if let Ok(response) = node.handle_chain_info_query(query).await {
Expand Down
Loading

0 comments on commit 8ce69b8

Please sign in to comment.