diff --git a/.github/actions/prerequisite-nightly/action.yml b/.github/actions/prerequisite-nightly/action.yml index dd1985141..d6cf3b7a3 100644 --- a/.github/actions/prerequisite-nightly/action.yml +++ b/.github/actions/prerequisite-nightly/action.yml @@ -18,6 +18,8 @@ runs: token: ${{ inputs.token }} - name: Install Rust Nightly - uses: ./.github/actions/install-rust-nightly + uses: dtolnay/rust-toolchain@nightly with: - version: ${{ inputs.version }} \ No newline at end of file + toolchain: ${{ inputs.version }} + components: rustfmt, clippy + target: wasm32-unknown-unknown diff --git a/.github/workflows/ci-main.yml b/.github/workflows/ci-main.yml index 339049b5c..0f4a232ed 100644 --- a/.github/workflows/ci-main.yml +++ b/.github/workflows/ci-main.yml @@ -27,7 +27,7 @@ jobs: - name: Check Build run: | bash ./scripts/cmd-all build check "--release" - + - name: Clippy -- Libraries and Binaries run: | bash ./scripts/cmd-all clippy "clippy --lib --bins" "--release -- -W clippy::all -A clippy::style -A forgetting_copy_types -A forgetting_references" diff --git a/Cargo.lock b/Cargo.lock index 7cac70b58..4bf487d47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1539,7 +1539,20 @@ dependencies = [ "futures-core", "prost 0.12.6", "prost-types 0.12.6", - "tonic", + "tonic 0.10.2", + "tracing-core", +] + +[[package]] +name = "console-api" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +dependencies = [ + "futures-core", + "prost 0.12.6", + "prost-types 0.12.6", + "tonic 0.11.0", "tracing-core", ] @@ -1549,7 +1562,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" dependencies = [ - "console-api", + "console-api 0.6.0", "crossbeam-channel", "crossbeam-utils", "futures-task", @@ -1561,7 +1574,32 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.10.2", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.18", +] + +[[package]] +name = "console-subscriber" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +dependencies = [ + "console-api 0.7.0", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime 2.1.0", + "prost 0.12.6", + "prost-types 0.12.6", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.11.0", "tracing", "tracing-core", "tracing-subscriber 0.3.18", @@ -8122,7 +8160,6 @@ dependencies = [ "spacewalk-runtime-standalone-mainnet", "spacewalk-runtime-standalone-testnet", "spacewalk-standalone", - "substrate-stellar-sdk", "subxt", "subxt-client", "tempdir", @@ -11538,10 +11575,11 @@ version = "1.0.11" dependencies = [ "async-std", "base64 0.13.1", - "env_logger 0.9.3", + "console-subscriber 0.3.0", "err-derive", "hex", "hmac 0.12.1", + "ntest", "rand 0.8.5", "serde", "serde_json", @@ -11660,7 +11698,7 @@ dependencies = [ [[package]] name = "substrate-stellar-sdk" version = "0.3.0" -source = "git+https://github.com/pendulum-chain/substrate-stellar-sdk?branch=polkadot-v1.1.0#9b8e2b77b6c6a63e8e837d1e8f2b42b09d49a943" +source = "git+https://github.com/pendulum-chain/substrate-stellar-sdk?branch=polkadot-v1.1.0#6ff899abd7127d59261d7a5d06283a2e5913c6d2" dependencies = [ "base64 0.13.1", "hex", @@ -12325,6 +12363,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-timeout", + "percent-encoding 2.3.1", + "pin-project", + "prost 0.12.6", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -12775,9 +12840,8 @@ dependencies = [ "bincode", "cfg-if 1.0.0", "clap 3.2.25", - "console-subscriber", + "console-subscriber 0.2.0", "env_logger 0.9.3", - "err-derive", "flate2", "frame-support", "futures 0.3.30", diff --git a/clients/runtime/Cargo.toml b/clients/runtime/Cargo.toml index c5391eb4f..5cc6506ab 100644 --- a/clients/runtime/Cargo.toml +++ b/clients/runtime/Cargo.toml @@ -64,7 +64,6 @@ testchain-runtime = { package = "spacewalk-runtime-standalone-testnet", path = " mainnet-runtime = { package = "spacewalk-runtime-standalone-mainnet", path = "../../testchain/runtime/mainnet", optional = true } # Substrate Stellar Dependencies -substrate-stellar-sdk = { git = "https://github.com/pendulum-chain/substrate-stellar-sdk", branch = "polkadot-v1.1.0" } wallet = { path = "../wallet" } diff --git a/clients/runtime/src/rpc.rs b/clients/runtime/src/rpc.rs index fb64f02a9..807002eb8 100644 --- a/clients/runtime/src/rpc.rs +++ b/clients/runtime/src/rpc.rs @@ -11,7 +11,7 @@ use subxt::{ client::OnlineClient, events::StaticEvent, rpc_params, - storage::{address::Yes, StorageAddress}, + storage::{address::Yes, Storage, StorageAddress}, tx::TxPayload, utils::Static, Error as BasicError, @@ -163,7 +163,7 @@ impl SpacewalkParachain { pub aux: ImportedAux, } - let _head = self.get_finalized_block_hash().await.unwrap(); + let _head = self.get_finalized_block_hash().await.expect("failed to get head"); let _: CreatedBlock = self .rpc @@ -175,7 +175,7 @@ impl SpacewalkParachain { /// This function is used in tests to finalize the current block. #[cfg(feature = "testing-utils")] pub async fn manual_finalize(&self) { - let head = self.get_finalized_block_hash().await.unwrap(); + let head = self.get_finalized_block_hash().await.expect("failed to get head"); let _: bool = self .rpc @@ -271,11 +271,9 @@ impl SpacewalkParachain { // For getting the nonce, use latest, possibly non-finalized block. let storage_key = metadata::storage().system().account(&self.account_id); let on_chain_nonce = self - .api - .storage() - .at_latest() + .get_latest_storage() .await - .unwrap() + .expect("Failed to get storage from the latest block hash") .fetch(&storage_key) .await .transpose() @@ -293,7 +291,7 @@ impl SpacewalkParachain { where Address: StorageAddress, { - Ok(self.api.storage().at_latest().await.unwrap().fetch(&address).await?) + Ok(self.get_latest_storage().await?.fetch(&address).await?) } async fn query_finalized_or_error
( @@ -313,13 +311,22 @@ impl SpacewalkParachain { where Address: StorageAddress, { - Ok(self.api.storage().at_latest().await.unwrap().fetch_or_default(&address).await?) + Ok(self.get_latest_storage().await?.fetch_or_default(&address).await?) } pub async fn get_finalized_block_hash(&self) -> Result, Error> { Ok(Some(self.api.backend().latest_finalized_block_ref().await?.hash())) } + async fn get_latest_storage( + &self, + ) -> Result>, Error> { + self.api.storage().at_latest().await.map_err(|e| { + log::error!("Failed to get storage from the latest block hash"); + Error::SubxtRuntimeError(e) + }) + } + /// Subscribe to new parachain blocks. pub async fn on_block(&self, on_block: F) -> Result<(), Error> where @@ -427,17 +434,17 @@ impl SpacewalkParachain { self.api .tx() .create_signed_with_nonce(&call, &signer, nonce.into(), Default::default()) - .unwrap() + .expect("failed to create a signed extrinsic") .submit_and_watch() .await - .unwrap(); + .expect("failed to submit extrinsic to chain"); // now call with outdated nonce let result = self .api .tx() .create_signed_with_nonce(&call, &signer, 0, Default::default()) - .unwrap() + .expect("failed to create a signed extrinsic") .submit_and_watch() .await; @@ -462,17 +469,17 @@ impl SpacewalkParachain { self.api .tx() .create_signed_with_nonce(&call, &signer, nonce.into(), Default::default()) - .unwrap() + .expect("failed to create a signed extrinsic") .submit() .await - .unwrap(); + .expect("failed to submit extrinsic to chain"); // should call with the same nonce let result = self .api .tx() .create_signed_with_nonce(&call, &signer, nonce.into(), Default::default()) - .unwrap() + .expect("failed to create a signed extrinsic") .submit_and_watch() .await; @@ -499,7 +506,7 @@ pub trait UtilFuncs { impl UtilFuncs for SpacewalkParachain { async fn get_current_chain_height(&self) -> Result { let height_query = metadata::storage().system().number(); - let height = self.api.storage().at_latest().await.unwrap().fetch(&height_query).await?; + let height = self.get_latest_storage().await?.fetch(&height_query).await?; match height { Some(height) => Ok(height), None => Err(Error::BlockNotFound), @@ -510,13 +517,13 @@ impl UtilFuncs for SpacewalkParachain { self.native_currency_id } - fn is_this_vault(&self, vault_id: &VaultId) -> bool { - &vault_id.account_id == self.get_account_id() - } - fn get_account_id(&self) -> &AccountId { &self.account_id } + + fn is_this_vault(&self, vault_id: &VaultId) -> bool { + &vault_id.account_id == self.get_account_id() + } } #[async_trait] @@ -594,7 +601,7 @@ impl VaultRegistryPallet for SpacewalkParachain { async fn get_all_vaults(&self) -> Result, Error> { let mut vaults = Vec::new(); let key_addr = metadata::storage().vault_registry().vaults_iter(); - let mut iter = self.api.storage().at_latest().await.unwrap().iter(key_addr).await?; + let mut iter = self.get_latest_storage().await?.iter(key_addr).await?; while let Ok((_, account)) = iter.next().await.ok_or(Error::VaultNotFound)? { if let VaultStatus::Active(..) = account.status { vaults.push(account); @@ -779,7 +786,7 @@ impl CollateralBalancesPallet for SpacewalkParachain { async fn get_native_balance_for_id(&self, id: &AccountId) -> Result { let query = metadata::storage().system().account(id); - let result = self.api.storage().at_latest().await.unwrap().fetch(&query).await?; + let result = self.get_latest_storage().await?.fetch(&query).await?; Ok(result.map(|x| x.data.free).unwrap_or_default()) } @@ -790,7 +797,7 @@ impl CollateralBalancesPallet for SpacewalkParachain { ) -> Result { let query = metadata::storage().tokens().accounts(&id, &Static(currency_id)); - let result = self.api.storage().at_latest().await.unwrap().fetch(&query).await?; + let result = self.get_latest_storage().await?.fetch(&query).await?; Ok(result.map(|x| x.free).unwrap_or_default()) } @@ -805,7 +812,7 @@ impl CollateralBalancesPallet for SpacewalkParachain { ) -> Result { let query = metadata::storage().tokens().accounts(&id, &Static(currency_id)); - let result = self.api.storage().at_latest().await.unwrap().fetch(&query).await?; + let result = self.get_latest_storage().await?.fetch(&query).await?; Ok(result.map(|x| x.reserved).unwrap_or_default()) } @@ -1074,7 +1081,7 @@ impl IssuePallet for SpacewalkParachain { let mut issue_requests = Vec::new(); let key_addr = metadata::storage().issue().issue_requests_iter(); - let mut iter = self.api.storage().at_latest().await.unwrap().iter(key_addr).await?; + let mut iter = self.get_latest_storage().await?.iter(key_addr).await?; while let Some(result) = iter.next().await { let (issue_id, request) = result?; diff --git a/clients/runtime/src/types.rs b/clients/runtime/src/types.rs index 966609cb9..b9308e30f 100644 --- a/clients/runtime/src/types.rs +++ b/clients/runtime/src/types.rs @@ -6,6 +6,7 @@ pub use subxt::ext::sp_core::sr25519::Pair as KeyPair; use subxt::utils::Static; use crate::{metadata, Config, SpacewalkRuntime, SS58_PREFIX}; +use primitives::stellar::PublicKey; pub type AccountId = subxt::utils::AccountId32; pub type Address = subxt::ext::sp_runtime::MultiAddress; @@ -17,8 +18,6 @@ pub type H256 = subxt::ext::sp_core::H256; pub type SpacewalkSigner = subxt::tx::PairSigner; pub type FixedU128 = sp_arithmetic::FixedU128; -pub use substrate_stellar_sdk as stellar; - pub type IssueId = H256; pub type StellarPublicKeyRaw = [u8; 32]; @@ -139,19 +138,15 @@ pub mod currency_id { Asset::AlphaNum4 { code, issuer } => Ok(format!( "Stellar({:?}:{:?})", from_utf8(code).unwrap_or_default(), - from_utf8( - stellar::PublicKey::from_binary(*issuer).to_encoding().as_slice() - ) - .unwrap_or_default() + from_utf8(PublicKey::from_binary(*issuer).to_encoding().as_slice()) + .unwrap_or_default() ) .replace('\"', "")), Asset::AlphaNum12 { code, issuer } => Ok(format!( "Stellar({:?}:{:?})", from_utf8(code).unwrap_or_default(), - from_utf8( - stellar::PublicKey::from_binary(*issuer).to_encoding().as_slice() - ) - .unwrap_or_default() + from_utf8(PublicKey::from_binary(*issuer).to_encoding().as_slice()) + .unwrap_or_default() ) .replace('\"', "")), }, diff --git a/clients/service/src/lib.rs b/clients/service/src/lib.rs index c9650e623..5df74e54c 100644 --- a/clients/service/src/lib.rs +++ b/clients/service/src/lib.rs @@ -10,7 +10,10 @@ use async_trait::async_trait; use futures::{future::Either, Future, FutureExt}; use governor::{Quota, RateLimiter}; use nonzero_ext::*; -use tokio::{sync::RwLock, time::sleep}; +use tokio::{ + sync::{broadcast::error::TryRecvError, RwLock}, + time::sleep, +}; pub use warp; pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig}; @@ -170,17 +173,38 @@ impl ConnectionManager { } } -pub async fn wait_or_shutdown(shutdown_tx: ShutdownSender, future2: F) -> Result<(), E> +pub async fn wait_or_shutdown( + shutdown_tx: ShutdownSender, + future2: F, + // a consumer that receives a precheck signal to start a task. + precheck_signal: Option>, +) -> Result<(), E> where F: Future>, { + if let Some(mut precheck_signal) = precheck_signal { + loop { + match precheck_signal.try_recv() { + // Received a signal to start the task + Ok(_) => break, + Err(TryRecvError::Empty) => + tracing::trace!("wait_or_shutdown precheck signal: waiting..."), + // Precheck signal failed. Cannot start the task. + Err(e) => { + tracing::error!("Error receiving precheck signal: {:?}", e); + return Ok(()); + }, + } + } + } + match run_cancelable(shutdown_tx.subscribe(), future2).await { TerminationStatus::Cancelled => { - tracing::trace!("Received shutdown signal"); + tracing::trace!("wait_or_shutdown(): Received shutdown signal"); Ok(()) }, TerminationStatus::Completed(res) => { - tracing::trace!("Sending shutdown signal"); + tracing::trace!("wait_or_shutdown(): Sending shutdown signal"); let _ = shutdown_tx.send(()); res }, diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index fc0cde651..f223d30cb 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -10,9 +10,10 @@ name = "stellar_relay_lib" path = "src/lib.rs" [dev-dependencies] -env_logger = "0.9.0" +ntest = "0.9.0" serial_test = "0.9.0" wallet = { path = "../wallet", features = ["testing-utils"] } +console-subscriber = { version = "0.3.0" } [dependencies] hex = "0.4.3" diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 6e3283dc7..ed2a489be 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -1,14 +1,14 @@ use stellar_relay_lib::{ connect_to_stellar_overlay_network, sdk::types::{ScpStatementPledges, StellarMessage}, - StellarOverlayConfig, + Error, StellarOverlayConfig, }; use wallet::keys::get_source_secret_key_from_env; #[tokio::main] async fn main() -> Result<(), Box> { - env_logger::init(); + console_subscriber::init(); let args: Vec = std::env::args().collect(); let arg_network = if args.len() > 1 { &args[1] } else { "testnet" }; @@ -22,33 +22,48 @@ async fn main() -> Result<(), Box> { let secret_key = get_source_secret_key_from_env(arg_network == "mainnet"); - let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?; - - while let Ok(Some(msg)) = overlay_connection.listen() { - match msg { - StellarMessage::ScpMessage(msg) => { - let node_id = msg.statement.node_id.to_encoding(); - let node_id = base64::encode(&node_id); - let slot = msg.statement.slot_index; - - let stmt_type = match msg.statement.pledges { - ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare", - ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm", - ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", - ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", - }; - tracing::info!( - "{} sent StellarMessage of type {} for ledger {}", - node_id, - stmt_type, - slot - ); + let mut overlay_connection = connect_to_stellar_overlay_network(cfg, secret_key).await?; + + loop { + match overlay_connection.listen().await { + Ok(Some(msg)) => match msg { + StellarMessage::Hello(_) => { + tracing::info!("received Hello message"); + }, + + StellarMessage::ScpMessage(msg) => { + let node_id = msg.statement.node_id.to_encoding(); + let node_id = base64::encode(&node_id); + let slot = msg.statement.slot_index; + + let stmt_type = match msg.statement.pledges { + ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare", + ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm", + ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", + ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", + }; + tracing::info!( + "{} sent StellarMessage of type {} for ledger {}", + node_id, + stmt_type, + slot + ); + }, + _ => { + let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await; + }, + }, + Ok(None) => {}, + Err(Error::Timeout) => { + tracing::warn!("took more than a second to respond"); }, - _ => { - let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await; + Err(e) => { + tracing::error!("Error: {:?}", e); + break }, } } + tracing::info!("ooops, connection stopped "); Ok(()) } diff --git a/clients/stellar-relay-lib/src/config.rs b/clients/stellar-relay-lib/src/config.rs index a00d3953f..40753c0c7 100644 --- a/clients/stellar-relay-lib/src/config.rs +++ b/clients/stellar-relay-lib/src/config.rs @@ -35,13 +35,16 @@ impl StellarOverlayConfig { #[allow(dead_code)] pub(crate) fn node_info(&self) -> NodeInfo { - self.node_info.clone().into() + NodeInfo::new(&self.node_info) } #[allow(dead_code)] - pub(crate) fn connection_info(&self, secret_key: &str) -> Result { + pub(crate) fn connection_info( + &self, + secret_key_as_string: String, + ) -> Result { let cfg = &self.connection_info; - let secret_key = SecretKey::from_encoding(secret_key)?; + let secret_key = SecretKey::from_encoding(secret_key_as_string)?; let public_key = secret_key.get_public().to_encoding(); let public_key = std::str::from_utf8(&public_key).unwrap(); @@ -128,9 +131,9 @@ impl ConnectionInfoCfg { /// Returns the `StellarOverlayConnection` if connection is a success, otherwise an Error pub async fn connect_to_stellar_overlay_network( cfg: StellarOverlayConfig, - secret_key: &str, + secret_key_as_string: String, ) -> Result { - let conn_info = cfg.connection_info(secret_key)?; + let conn_info = cfg.connection_info(secret_key_as_string)?; let local_node = cfg.node_info; StellarOverlayConnection::connect(local_node.into(), conn_info).await diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 80cbeaade..07c0f0b49 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -282,7 +282,7 @@ mod test { let cfg = StellarOverlayConfig::try_from_path(cfg_file_path).expect("should create a config"); let node_info = cfg.node_info(); - let conn_info = cfg.connection_info(&secret_key).expect("should create a connection info"); + let conn_info = cfg.connection_info(secret_key).expect("should create a connection info"); // this is a channel to communicate with the connection/config (this needs renaming) let connector = Connector::start(node_info.clone(), conn_info.clone()) @@ -333,7 +333,7 @@ mod test { cert: new_auth_cert, nonce: [0; 32], }; - connector.set_remote(RemoteInfo::new(&hello)); + connector.set_remote(RemoteInfo::new(hello)); assert!(connector.remote().is_some()); } @@ -357,7 +357,7 @@ mod test { cert: new_auth_cert, nonce: [0; 32], }; - connector.set_remote(RemoteInfo::new(&hello)); + connector.set_remote(RemoteInfo::new(hello)); assert_eq!(connector.remote().unwrap().sequence(), 0); connector.increment_remote_sequence().unwrap(); @@ -385,7 +385,7 @@ mod test { cert: new_auth_cert, nonce: [0; 32], }; - let remote = RemoteInfo::new(&hello); + let remote = RemoteInfo::new(hello); let remote_nonce = remote.nonce(); connector.set_remote(remote.clone()); diff --git a/clients/stellar-relay-lib/src/connection/connector/message_creation.rs b/clients/stellar-relay-lib/src/connection/connector/message_creation.rs index e7473976f..0bb1ce5d1 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_creation.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_creation.rs @@ -3,7 +3,10 @@ use crate::connection::{ Connector, Error, }; use substrate_stellar_sdk::{ - types::{AuthenticatedMessage, AuthenticatedMessageV0, HmacSha256Mac, StellarMessage}, + compound_types::LimitedString, + types::{ + AuthenticatedMessage, AuthenticatedMessageV0, ErrorCode, HmacSha256Mac, StellarMessage, + }, XdrCodec, }; @@ -75,3 +78,14 @@ impl Connector { ) } } + +/// Create our own error to send over to the user/outsider. +pub(crate) fn crate_specific_error() -> StellarMessage { + let error = "Stellar Relay Error".as_bytes().to_vec(); + let error = substrate_stellar_sdk::types::Error { + code: ErrorCode::ErrMisc, + msg: LimitedString::new(error).expect("should return a valid LimitedString"), + }; + + StellarMessage::ErrorMsg(error) +} diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index dab70ba58..ec72d6474 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -70,7 +70,7 @@ impl Connector { match msg { StellarMessage::Hello(hello) => { // update the node info based on the hello message - self.process_hello_message(hello)?; + self.process_hello_message(hello.clone())?; self.got_hello(); @@ -80,6 +80,8 @@ impl Connector { self.send_auth_message(self.local().node().overlay_version).await?; } info!("process_stellar_message(): Hello message processed successfully"); + // Pass the hello message to the user/outsider. To signal that the Overlay is ready. + return Ok(Some(StellarMessage::Hello(hello))); }, StellarMessage::Auth(_) => { @@ -132,7 +134,7 @@ impl Connector { return Err(Error::AuthCertInvalid); } - let remote_info = RemoteInfo::new(&hello); + let remote_info = RemoteInfo::new(hello); let shared_key = self.get_shared_key(remote_info.pub_key_ecdh()); self.set_hmac_keys(HMacKeys::new( diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index 98d901b52..07694e157 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,4 +1,7 @@ -use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; +use crate::connection::{ + connector::message_creation::crate_specific_error, xdr_converter::get_xdr_message_length, + Connector, Error, Xdr, +}; use async_std::io::ReadExt; use std::time::Duration; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; @@ -6,7 +9,7 @@ use tokio::{ sync::{mpsc, mpsc::error::TryRecvError}, time::timeout, }; -use tracing::{debug, error, info, trace, warn}; +use tracing::{error, info, trace, warn}; /// The waiting time for reading messages from stream. static READ_TIMEOUT_IN_SECS: u64 = 60; @@ -39,30 +42,44 @@ pub(crate) async fn poll_messages_from_stellar( error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => {}, + Err(TryRecvError::Empty) => { + // there's no message from user; wait for the next iteration + tokio::task::yield_now().await; + }, } // check for messages from Stellar Node. - let xdr = match read_message_from_stellar(&mut connector).await { - Err(e) => { + // if reading took too much time, flag it as "disconnected" + let xdr = match timeout( + Duration::from_secs(READ_TIMEOUT_IN_SECS), + read_message_from_stellar(&mut connector), + ) + .await + { + Ok(Ok(xdr)) => xdr, + Ok(Err(e)) => { error!("poll_messages_from_stellar(): {e:?}"); - break; + break + }, + Err(_) => { + error!("poll_messages_from_stellar(): timed out"); + break }, - Ok(xdr) => xdr, }; match connector.process_raw_message(xdr).await { - Ok(Some(stellar_msg)) => - // push message to user - { - if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await { + Ok(Some(stellar_msg)) => { + // push message to user + let stellar_msg_as_base64_xdr = stellar_msg.to_base64_xdr(); + if let Err(e) = send_to_user_sender.send(stellar_msg).await { warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", - String::from_utf8(stellar_msg.to_base64_xdr()) - .unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr())) - ); + String::from_utf8(stellar_msg_as_base64_xdr.clone()) + .unwrap_or_else(|_| format!("{stellar_msg_as_base64_xdr:?}")) + ); } + tokio::task::yield_now().await; }, - Ok(None) => {}, + Ok(None) => tokio::task::yield_now().await, Err(e) => { error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); break; @@ -70,12 +87,20 @@ pub(crate) async fn poll_messages_from_stellar( } } + // push error to user + if let Err(e) = send_to_user_sender.send(crate_specific_error()).await { + warn!( + "poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", + e + ); + } + // make sure to shutdown the connector connector.stop(); send_to_node_receiver.close(); drop(send_to_user_sender); - debug!("poll_messages_from_stellar(): stopped."); + info!("poll_messages_from_stellar(): stopped."); } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node @@ -90,13 +115,8 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result continue, + match connector.tcp_stream.read(&mut buff_for_reading).await { + Ok(0) => continue, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. @@ -112,7 +132,7 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result Result continue, - Ok(Some(xdr)) => return Ok(xdr), + Ok(false) => continue, + Ok(true) => return Ok(readbuf), Err(e) => { trace!("read_message_from_stellar(): ERROR: {e:?}"); - return Err(e); + return Err(e) }, } }, - Ok(Ok(size)) => { + Ok(size) => { // The next few bytes was read. Add it to the readbuf. lack_bytes_from_prev = lack_bytes_from_prev.saturating_sub(size); readbuf.append(&mut buff_for_reading); @@ -136,30 +156,30 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result continue, - Ok(Some(xdr)) => return Ok(xdr), + Ok(false) => continue, + Ok(true) => return Ok(readbuf), Err(e) => { trace!("read_message_from_stellar(): ERROR: {e:?}"); - return Err(e); + return Err(e) }, } }, - Ok(Err(e)) => { + Err(e) => { trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); - return Err(Error::ReadFailed(e.to_string())); - }, - Err(_) => { - trace!("read_message_from_stellar(): reading time elapsed."); - return Err(Error::Timeout); + return Err(Error::ReadFailed(e.to_string())) }, } } } -/// Returns Xdr when all bytes from the stream have successfully been converted; else None. +/// Returns true when all bytes from the stream have successfully been converted; else false. /// This reads a number of bytes based on the expected message length. /// /// # Arguments @@ -168,12 +188,12 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result, xpect_msg_len: usize, -) -> Result, Error> { +) -> Result { let actual_msg_len = connector .tcp_stream .read(readbuf) @@ -182,7 +202,7 @@ async fn read_message( // only when the message has the exact expected size bytes, should we send to user. if actual_msg_len == xpect_msg_len { - return Ok(Some(readbuf.clone())); + return Ok(true) } // The next bytes are remnants from the previous stellar message. @@ -193,10 +213,10 @@ async fn read_message( "read_message(): received only partial message. Need {lack_bytes_from_prev} bytes to complete." ); - Ok(None) + Ok(false) } -/// Returns Xdr when all bytes from the stream have successfully been converted; else None. +/// Returns true when all bytes from the stream have successfully been converted; else false. /// Reads a continuation of bytes that belong to the previous message /// /// # Arguments @@ -204,11 +224,11 @@ async fn read_message( /// Stellar Node /// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message /// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -async fn read_unfinished_message( +async fn is_reading_unfinished_message_complete( connector: &mut Connector, lack_bytes_from_prev: &mut usize, readbuf: &mut Vec, -) -> Result, Error> { +) -> Result { // let's read the continuation number of bytes from the previous message. let mut cont_buf = vec![0; *lack_bytes_from_prev]; @@ -223,7 +243,7 @@ async fn read_unfinished_message( trace!("read_unfinished_message(): received continuation from the previous message."); readbuf.append(&mut cont_buf); - return Ok(Some(readbuf.clone())); + return Ok(true) } // this partial message is not enough to complete the previous message. @@ -236,5 +256,5 @@ async fn read_unfinished_message( ); } - Ok(None) + Ok(false) } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 70cc1f4cc..8b676342f 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,14 +1,13 @@ use async_std::io::WriteExt; use std::time::Duration; -use substrate_stellar_sdk::types::{MessageType, StellarMessage}; +use substrate_stellar_sdk::{ + types::{MessageType, StellarMessage}, + StellarTypeToBase64String, +}; use tokio::time::timeout; use tracing::debug; -use crate::connection::{ - handshake::create_auth_message, - helper::{time_now, to_base64_xdr_string}, - Connector, Error, -}; +use crate::connection::{handshake::create_auth_message, helper::time_now, Connector, Error}; impl Connector { pub async fn send_to_node(&mut self, msg: StellarMessage) -> Result<(), Error> { @@ -28,7 +27,7 @@ impl Connector { pub async fn send_hello_message(&mut self) -> Result<(), Error> { let msg = self.create_hello_message(time_now())?; - debug!("send_hello_message(): Sending Hello Message: {}", to_base64_xdr_string(&msg)); + debug!("send_hello_message(): Sending Hello Message: {}", msg.as_base64_encoded_string()); self.send_to_node(msg).await } @@ -38,7 +37,7 @@ impl Connector { local_overlay_version: u32, ) -> Result<(), Error> { let msg = create_auth_message(local_overlay_version); - debug!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg)); + debug!("send_auth_message(): Sending Auth Message: {}", msg.as_base64_encoded_string()); return self.send_to_node(create_auth_message(local_overlay_version)).await; } diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 6c049cd0f..0bffe0f6f 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -35,8 +35,3 @@ pub fn error_to_string(e: Error) -> String { format!("Error{{ code:{:?} message:{msg} }}", e.code) } - -pub fn to_base64_xdr_string(msg: &T) -> String { - let xdr = msg.to_base64_xdr(); - String::from_utf8(xdr.clone()).unwrap_or(format!("{:?}", xdr)) -} diff --git a/clients/stellar-relay-lib/src/node/mod.rs b/clients/stellar-relay-lib/src/node/mod.rs index bd6fc642a..c44667c3d 100644 --- a/clients/stellar-relay-lib/src/node/mod.rs +++ b/clients/stellar-relay-lib/src/node/mod.rs @@ -19,6 +19,19 @@ pub struct NodeInfo { pub network_id: NetworkId, } +impl NodeInfo { + pub(crate) fn new(cfg: &NodeInfoCfg) -> Self { + let network: &Network = if cfg.is_pub_net { &PUBLIC_NETWORK } else { &TEST_NETWORK }; + NodeInfo { + ledger_version: cfg.ledger_version, + overlay_version: cfg.overlay_version, + overlay_min_version: cfg.overlay_min_version, + version_str: cfg.version_str.clone(), + network_id: *network.get_id(), + } + } +} + impl Debug for NodeInfo { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("NodeInfo") diff --git a/clients/stellar-relay-lib/src/node/remote.rs b/clients/stellar-relay-lib/src/node/remote.rs index ee6752d15..580dcb932 100644 --- a/clients/stellar-relay-lib/src/node/remote.rs +++ b/clients/stellar-relay-lib/src/node/remote.rs @@ -36,11 +36,11 @@ impl Debug for RemoteInfo { } impl RemoteInfo { - pub fn new(hello: &Hello) -> Self { + pub fn new(hello: Hello) -> Self { RemoteInfo { sequence: 0, - pub_key_ecdh: hello.cert.pubkey.clone(), - pub_key: hello.peer_id.clone(), + pub_key_ecdh: hello.cert.pubkey, + pub_key: hello.peer_id, nonce: hello.nonce, node: NodeInfo { ledger_version: hello.ledger_version, diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index 6ff44320a..4f1f6bad8 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -1,23 +1,19 @@ -use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; +use substrate_stellar_sdk::types::StellarMessage; use tokio::sync::{ mpsc, - mpsc::{ - error::{SendError, TryRecvError}, - Sender, - }, + mpsc::{error::SendError, Sender}, }; use tracing::{error, info}; use crate::{ connection::{poll_messages_from_stellar, ConnectionInfo, Connector}, - helper::error_to_string, node::NodeInfo, Error, }; /// Used to send/receive messages to/from Stellar Node pub struct StellarOverlayConnection { - sender: mpsc::Sender, + sender: Sender, receiver: mpsc::Receiver, } @@ -44,6 +40,17 @@ impl StellarOverlayConnection { let connector = Connector::start(local_node_info, conn_info).await?; + #[cfg(tokio_unstable)] + tokio::task::Builder::new() + .name("Poll Stellar Messages") + .spawn(poll_messages_from_stellar( + connector, + send_to_user_sender, + send_to_node_receiver, + )) + .expect("Failed to spawn poll_messages_from_stellar"); + + #[cfg(not(tokio_unstable))] tokio::spawn(poll_messages_from_stellar( connector, send_to_user_sender, @@ -56,26 +63,16 @@ impl StellarOverlayConnection { }) } - pub fn listen(&mut self) -> Result, Error> { - loop { - if !self.is_alive() { - return Err(Error::Disconnected); - } - - match self.receiver.try_recv() { - Ok(StellarMessage::ErrorMsg(e)) => { - error!("listen(): received error message: {e:?}"); - if e.code == ErrorCode::ErrConf || e.code == ErrorCode::ErrAuth { - return Err(Error::ConnectionFailed(error_to_string(e))); - } - - return Ok(None); - }, - Ok(msg) => return Ok(Some(msg)), - Err(TryRecvError::Disconnected) => return Err(Error::Disconnected), - Err(TryRecvError::Empty) => continue, - } + /// Listens for upcoming messages from Stellar Node via a receiver. + /// The sender pair can be found in [fn + /// poll_messages_from_stellar](../src/connection/connector/message_reader.rs) + pub async fn listen(&mut self) -> Result, Error> { + if !self.is_alive() { + error!("listen(): sender half of overlay has closed."); + return Err(Error::Disconnected) } + + Ok(self.receiver.recv().await) } pub fn is_alive(&mut self) -> bool { diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index e635a3493..b9402f54e 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -1,7 +1,7 @@ use crate::{ connection::ConnectionInfo, node::NodeInfo, StellarOverlayConfig, StellarOverlayConnection, }; -use async_std::{future::timeout, sync::Mutex}; +use async_std::sync::Mutex; use serial_test::serial; use std::{sync::Arc, thread::sleep, time::Duration}; use substrate_stellar_sdk::{ @@ -31,10 +31,7 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) { let cfg = StellarOverlayConfig::try_from_path(&path).expect("should be able to extract config"); - ( - cfg.node_info(), - cfg.connection_info(&secret_key(is_mainnet)).expect("should return conn info"), - ) + (cfg.node_info(), cfg.connection_info(secret_key(is_mainnet)).expect("should return conn info")) } #[tokio::test(flavor = "multi_thread")] @@ -43,23 +40,33 @@ async fn stellar_overlay_should_receive_scp_messages() { let (node_info, conn_info) = overlay_infos(false); let overlay_connection = Arc::new(Mutex::new( - StellarOverlayConnection::connect(node_info, conn_info).await.unwrap(), + StellarOverlayConnection::connect(node_info, conn_info) + .await + .expect("should connect"), )); + + // to check if we receive any scp message from stellar node + let (sender, receiver) = tokio::sync::oneshot::channel(); let ov_conn = overlay_connection.clone(); let scps_vec = Arc::new(Mutex::new(vec![])); let scps_vec_clone = scps_vec.clone(); - - timeout(Duration::from_secs(300), async move { + tokio::spawn(async move { let mut ov_conn_locked = ov_conn.lock().await; - if let Ok(Some(msg)) = ov_conn_locked.listen() { - scps_vec_clone.lock().await.push(msg); - - ov_conn_locked.stop(); + loop { + if let Some(msg) = ov_conn_locked.listen().await.expect("should return a message") { + scps_vec_clone.lock().await.push(msg); + sender.send(()).unwrap(); + break; + } } + + ov_conn_locked.stop(); }) .await - .expect("time has elapsed"); + .expect("should finish"); + + let _ = receiver.await.expect("should receive a message"); //assert //ensure that we receive some scp message from stellar node @@ -68,6 +75,7 @@ async fn stellar_overlay_should_receive_scp_messages() { #[tokio::test(flavor = "multi_thread")] #[serial] +#[ntest::timeout(300_000)] // timeout at 5 minutes async fn stellar_overlay_should_receive_tx_set() { //arrange fn get_tx_set_hash(x: &ScpStatementExternalize) -> Hash { @@ -86,10 +94,10 @@ async fn stellar_overlay_should_receive_tx_set() { let tx_set_hashes_clone = tx_set_hashes.clone(); let actual_tx_set_hashes_clone = actual_tx_set_hashes.clone(); - timeout(Duration::from_secs(500), async move { - let mut ov_conn_locked = ov_conn.lock().await; + let mut ov_conn_locked = ov_conn.lock().await; - while let Ok(Some(msg)) = ov_conn_locked.listen() { + loop { + if let Ok(Some(msg)) = ov_conn_locked.listen().await { match msg { StellarMessage::ScpMessage(msg) => { if let ScpStatementPledges::ScpStExternalize(stmt) = &msg.statement.pledges { @@ -114,9 +122,7 @@ async fn stellar_overlay_should_receive_tx_set() { _ => {}, } } - }) - .await - .expect("time has elapsed"); + } //ensure that we receive some tx set from stellar node let expected_hashes = tx_set_hashes.lock().await; @@ -130,14 +136,24 @@ async fn stellar_overlay_should_receive_tx_set() { #[tokio::test(flavor = "multi_thread")] #[serial] +#[ntest::timeout(300_000)] // timeout at 5 minutes async fn stellar_overlay_disconnect_works() { let (node_info, conn_info) = overlay_infos(false); let mut overlay_connection = StellarOverlayConnection::connect(node_info.clone(), conn_info).await.unwrap(); - // let it run for a second, before disconnecting. - sleep(Duration::from_secs(1)); + loop { + if let Some(message) = overlay_connection.listen().await.expect("should return a message") { + match message { + // fail the test case if an error message is received + StellarMessage::ErrorMsg(_) => panic!("Error message received: {:?}", message), + // it means it has received a message from stellar node + _ => break, + } + } + } + overlay_connection.stop(); // let the disconnection call pass for a few seconds, before checking its status. diff --git a/clients/vault/Cargo.toml b/clients/vault/Cargo.toml index 8c7688a78..fb523ed18 100644 --- a/clients/vault/Cargo.toml +++ b/clients/vault/Cargo.toml @@ -66,7 +66,6 @@ sp-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "rel sp-std = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.1.0" } parking_lot = "0.12.1" -err-derive = "0.3.1" flate2 = "1.0" [dev-dependencies] diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json b/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json index 7a93a15aa..7eabe4eb4 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_frankfurt.json @@ -7,7 +7,7 @@ "ledger_version": 21, "overlay_version": 34, "overlay_min_version": 32, - "version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)", + "version_str": "stellar-core 21.2.0 (d78f48eacabb51753e34443de7618b956e61c59f)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json b/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json index 697f731af..d2e9aa416 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_iowa.json @@ -7,7 +7,7 @@ "ledger_version": 21, "overlay_version": 34, "overlay_min_version": 32, - "version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)", + "version_str": "stellar-core 21.2.0 (d78f48eacabb51753e34443de7618b956e61c59f)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json b/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json index 504213c9e..2e2c23670 100644 --- a/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json +++ b/clients/vault/resources/config/mainnet/stellar_relay_config_singapore.json @@ -7,7 +7,7 @@ "ledger_version": 21, "overlay_version": 34, "overlay_min_version": 32, - "version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)", + "version_str": "stellar-core 21.2.0 (d78f48eacabb51753e34443de7618b956e61c59f)", "is_pub_net": true }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json index 44e908d25..070253e8c 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest1.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 21, - "overlay_version": 34, - "overlay_min_version": 32, - "version_str": "stellar-core 21.2.0 (d78f48eacabb51753e34443de7618b956e61c59f)", + "overlay_version": 35, + "overlay_min_version": 33, + "version_str": "stellar-core 21.3.1 (4ede19620438bcd136276cdc8d4ed1f2c3b64624)", "is_pub_net": false }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json index 3b431ef78..003d98e12 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest2.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 21, - "overlay_version": 34, - "overlay_min_version": 32, - "version_str": "stellar-core 21.2.0 (d78f48eacabb51753e34443de7618b956e61c59f)", + "overlay_version": 35, + "overlay_min_version": 33, + "version_str": "stellar-core 21.3.1 (4ede19620438bcd136276cdc8d4ed1f2c3b64624)", "is_pub_net": false }, "stellar_history_archive_urls": [ diff --git a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json index 404ded3bb..9e6cbaf2d 100644 --- a/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json +++ b/clients/vault/resources/config/testnet/stellar_relay_config_sdftest3.json @@ -5,9 +5,9 @@ }, "node_info": { "ledger_version": 21, - "overlay_version": 34, - "overlay_min_version": 32, - "version_str": "stellar-core 21.2.0 (d78f48eacabb51753e34443de7618b956e61c59f)", + "overlay_version": 35, + "overlay_min_version": 33, + "version_str": "stellar-core 21.3.1 (4ede19620438bcd136276cdc8d4ed1f2c3b64624)", "is_pub_net": false }, "stellar_history_archive_urls": [ diff --git a/clients/vault/src/error.rs b/clients/vault/src/error.rs index 7a68dca40..6307250ba 100644 --- a/clients/vault/src/error.rs +++ b/clients/vault/src/error.rs @@ -1,4 +1,3 @@ -use jsonrpc_core_client::RpcError; use sp_std::str::Utf8Error; use thiserror::Error; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; @@ -21,8 +20,6 @@ pub enum Error { #[error("Faucet url not set")] FaucetUrlNotSet, - #[error("RPC error: {0}")] - RpcError(#[from] RpcError), #[error("RuntimeError: {0}")] RuntimeError(#[from] RuntimeError), #[error("BroadcastStreamRecvError: {0}")] diff --git a/clients/vault/src/issue.rs b/clients/vault/src/issue.rs index 6ceee56aa..e1c445aa3 100644 --- a/clients/vault/src/issue.rs +++ b/clients/vault/src/issue.rs @@ -14,7 +14,7 @@ use wallet::{ types::FilterWith, LedgerTxEnvMap, Slot, SlotTask, SlotTaskStatus, TransactionResponse, }; -use crate::{oracle::OracleAgent, ArcRwLock, Error, Event}; +use crate::{oracle::OracleAgent, tokio_spawn, ArcRwLock, Error, Event}; fn is_vault(p1: &PublicKey, p2_raw: [u8; 32]) -> bool { return *p1.as_binary() == p2_raw; @@ -26,6 +26,7 @@ pub(crate) async fn initialize_issue_set( issue_set: &ArcRwLock, memos_to_issue_ids: &ArcRwLock, ) -> Result<(), Error> { + tracing::debug!("initialize_issue_set(): started"); let (mut issue_set, mut memos_to_issue_ids, requests) = future::join3( issue_set.write(), memos_to_issue_ids.write(), @@ -60,6 +61,7 @@ pub async fn listen_for_issue_requests( issues: ArcRwLock, memos_to_issue_ids: ArcRwLock, ) -> Result<(), ServiceError> { + tracing::info!("listen_for_issue_requests(): started"); // Use references to prevent 'moved closure' errors let parachain_rpc = ¶chain_rpc; let vault_public_key = &vault_public_key; @@ -113,6 +115,7 @@ pub async fn listen_for_issue_cancels( issues: ArcRwLock, memos_to_issue_ids: ArcRwLock, ) -> Result<(), ServiceError> { + tracing::info!("listen_for_issue_cancels(): started"); let issues = &issues; let memos_to_issue_ids = &memos_to_issue_ids; @@ -145,6 +148,7 @@ pub async fn listen_for_executed_issues( issues: ArcRwLock, memos_to_issue_ids: ArcRwLock, ) -> Result<(), ServiceError> { + tracing::info!("listen_for_executed_issues(): started"); let issues = &issues; let memos_to_issue_ids = &memos_to_issue_ids; @@ -253,6 +257,7 @@ pub async fn process_issues_requests( issues: ArcRwLock, memos_to_issue_ids: ArcRwLock, ) -> Result<(), ServiceError> { + tracing::info!("process_issue_requests(): started"); // collects all the tasks that are executed or about to be executed. let mut processed_map = HashMap::new(); @@ -267,15 +272,18 @@ pub async fn process_issues_requests( continue; }; - tokio::spawn(execute_issue( - parachain_rpc.clone(), - tx_env.clone(), - issues.clone(), - memos_to_issue_ids.clone(), - oracle_agent.clone(), - *slot, - sender, - )); + tokio_spawn( + "execute_issue", + execute_issue( + parachain_rpc.clone(), + tx_env.clone(), + issues.clone(), + memos_to_issue_ids.clone(), + oracle_agent.clone(), + *slot, + sender, + ), + ); } // Give 5 seconds interval before starting again. diff --git a/clients/vault/src/lib.rs b/clients/vault/src/lib.rs index f9499e71d..dfab04ec0 100644 --- a/clients/vault/src/lib.rs +++ b/clients/vault/src/lib.rs @@ -59,3 +59,17 @@ cfg_if::cfg_if! { pub type DecimalsLookupImpl = primitives::AmplitudeDecimalsLookup; } } + +pub fn tokio_spawn(_task_name: &str, future: F) -> tokio::task::JoinHandle +where + F: std::future::Future + Send + 'static, + F::Output: Send + 'static, +{ + cfg_if::cfg_if! { + if #[cfg(all(tokio_unstable, feature = "allow-debugger"))] { + tokio::task::Builder::new().name(_task_name).spawn(future).unwrap() + } else { + tokio::spawn(future) + } + } +} diff --git a/clients/vault/src/main.rs b/clients/vault/src/main.rs index 5a2d13f7e..a9099fdff 100644 --- a/clients/vault/src/main.rs +++ b/clients/vault/src/main.rs @@ -20,7 +20,7 @@ use signal_hook_tokio::Signals; use vault::{ metrics::{self, increment_restart_counter}, process::PidFile, - Error, VaultService, VaultServiceConfig, ABOUT, AUTHORS, NAME, VERSION, + tokio_spawn, Error, VaultService, VaultServiceConfig, ABOUT, AUTHORS, NAME, VERSION, }; #[derive(Parser)] @@ -70,7 +70,7 @@ async fn catch_signals( where F: Future>> + Send + 'static, { - let blocking_task = tokio::task::spawn(future); + let blocking_task = tokio_spawn("blocking task", future); tokio::select! { res = blocking_task => { return res?; @@ -117,7 +117,7 @@ async fn start() -> Result<(), ServiceError> { ); let prometheus_port = opts.monitoring.prometheus_port; - tokio::task::spawn(async move { + tokio_spawn("Prometheus", async move { warp::serve(metrics_route) .run(SocketAddr::new(prometheus_host.into(), prometheus_port)) .await; @@ -149,7 +149,11 @@ async fn start() -> Result<(), ServiceError> { #[tokio::main] async fn main() { #[cfg(feature = "allow-debugger")] - console_subscriber::init(); + console_subscriber::ConsoleLayer::builder() + .with_default_env() + .publish_interval(std::time::Duration::from_secs(2)) + .event_buffer_capacity(1024 * 500) + .init(); let exit_code = if let Err(err) = start().await { tracing::error!("Exiting: {}", err); @@ -164,17 +168,21 @@ async fn main() { mod tests { use std::{thread, time::Duration}; + use vault::tokio_spawn; + use super::*; #[tokio::test] async fn test_vault_termination_signal() { let termination_signals = &[SIGHUP, SIGTERM, SIGINT, SIGQUIT]; for sig in termination_signals { - let task = - tokio::spawn(catch_signals(Signals::new(termination_signals).unwrap(), async { + let task = tokio_spawn( + "catch signals", + catch_signals(Signals::new(termination_signals).unwrap(), async { tokio::time::sleep(Duration::from_millis(100_000)).await; Ok(()) - })); + }), + ); // Wait for the signals iterator to be polled // This `sleep` is based on the test case in `signal-hook-tokio` itself: // https://github.com/vorner/signal-hook/blob/a9e5ca5e46c9c8e6de89ff1b3ce63c5ff89cd708/signal-hook-tokio/tests/tests.rs#L50 diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index dc72c8982..366086f78 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -1,30 +1,109 @@ use std::{sync::Arc, time::Duration}; +use primitives::stellar::StellarTypeToBase64String; use tokio::{ - sync::{mpsc, mpsc::error::TryRecvError, RwLock}, - time::{sleep, timeout}, + sync::RwLock, + time::{sleep, timeout, Instant}, }; use runtime::ShutdownSender; use stellar_relay_lib::{ - connect_to_stellar_overlay_network, helper::to_base64_xdr_string, sdk::types::StellarMessage, - StellarOverlayConfig, + connect_to_stellar_overlay_network, sdk::types::StellarMessage, StellarOverlayConfig, + StellarOverlayConnection, }; -use crate::oracle::{ - collector::ScpMessageCollector, errors::Error, types::StellarMessageSender, AddTxSet, Proof, +use crate::{ + oracle::{ + collector::ScpMessageCollector, errors::Error, types::StellarMessageSender, AddTxSet, Proof, + }, + ArcRwLock, }; use wallet::Slot; +/// The interval to check if we are still receiving messages from Stellar Relay +const STELLAR_RELAY_HEALTH_CHECK_IN_SECS: u64 = 600; + pub struct OracleAgent { - collector: Arc>, + pub collector: ArcRwLock, pub is_public_network: bool, /// sends message directly to Stellar Node - message_sender: Option, + message_sender: StellarMessageSender, + overlay_conn: ArcRwLock, /// sends an entire Vault shutdown shutdown_sender: ShutdownSender, - /// sends a 'stop' signal to `StellarOverlayConnection` poll - overlay_conn_end_signal: mpsc::Sender<()>, +} + +impl OracleAgent { + // the interval for every build_proof retry + const BUILD_PROOF_INTERVAL: u64 = 10; + + pub async fn new( + config: &StellarOverlayConfig, + secret_key_as_string: String, + shutdown_sender: ShutdownSender, + ) -> Result { + let is_public_network = config.is_public_network(); + + let collector = Arc::new(RwLock::new(ScpMessageCollector::new( + is_public_network, + config.stellar_history_archive_urls(), + ))); + + let overlay_conn = + connect_to_stellar_overlay_network(config.clone(), secret_key_as_string).await?; + let message_sender = overlay_conn.sender(); + + let overlay_conn = Arc::new(RwLock::new(overlay_conn)); + + Ok(OracleAgent { + collector, + is_public_network, + message_sender, + overlay_conn, + shutdown_sender, + }) + } + + /// This method returns the proof for a given slot or an error if the proof cannot be provided. + /// The agent will try every possible way to get the proof before returning an error. + pub async fn get_proof(&self, slot: Slot) -> Result { + let collector = self.collector.clone(); + + #[cfg(test)] + let timeout_seconds = 180; + + #[cfg(not(test))] + let timeout_seconds = 60; + + timeout(Duration::from_secs(timeout_seconds), async move { + loop { + tracing::debug!("get_proof(): attempt to build proof for slot {slot}"); + let collector = collector.read().await; + match collector.build_proof(slot, &self.message_sender).await { + None => { + drop(collector); + // give enough interval for every retry + sleep(Duration::from_secs(OracleAgent::BUILD_PROOF_INTERVAL)).await; + continue + }, + Some(proof) => { + tracing::info!("get_proof(): Successfully build proof for slot {slot}"); + tracing::trace!(" with proof: {proof:?}"); + return Ok(proof) + }, + } + } + }) + .await + .map_err(|_| { + Error::ProofTimeout(format!("Timeout elapsed for building proof of slot {slot}")) + })? + } + + #[cfg(any(test, feature = "integration"))] + pub async fn is_stellar_running(&self) -> bool { + self.collector.read().await.last_slot_index() > 0 + } } /// listens to data to collect the scp messages and txsets. @@ -57,146 +136,84 @@ async fn handle_message( Ok(()) } -/// Start the connection to the Stellar Node. -/// Returns an `OracleAgent` that will handle incoming messages from Stellar Node, -/// and to send messages to Stellar Node -pub async fn start_oracle_agent( - config: StellarOverlayConfig, - secret_key: &str, +pub async fn listen_for_stellar_messages( + oracle_agent: Arc, shutdown_sender: ShutdownSender, -) -> Result { - let is_public_network = config.is_public_network(); - - tracing::info!("start_oracle_agent(): Starting connection to Stellar overlay network..."); - - let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key).await?; - // use StellarOverlayConnection's sender to send message to Stellar - let sender = overlay_conn.sender(); - - let collector = Arc::new(RwLock::new(ScpMessageCollector::new( - config.is_public_network(), - config.stellar_history_archive_urls(), - ))); - let collector_clone = collector.clone(); - - let shutdown_sender_clone = shutdown_sender.clone(); - // disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar - // Node - let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2); - - tokio::spawn(async move { - let sender_clone = overlay_conn.sender(); - loop { - match disconnect_signal_receiver.try_recv() { - // if a disconnect signal was sent, disconnect from Stellar. - Ok(_) | Err(TryRecvError::Disconnected) => { - tracing::info!("start_oracle_agent(): disconnect overlay..."); - break; - }, - Err(TryRecvError::Empty) => {}, - } +) -> Result<(), service::Error> { + tracing::info!( + "listen_for_stellar_messages(): Starting connection to Stellar overlay network..." + ); + + let mut overlay_conn = oracle_agent.overlay_conn.write().await; + + // log a new message received. + let health_check_interval = Duration::from_secs(STELLAR_RELAY_HEALTH_CHECK_IN_SECS); + + let mut next_time = Instant::now() + health_check_interval; + loop { + let collector = oracle_agent.collector.clone(); + + match overlay_conn.listen().await { + Ok(None) => {}, + Ok(Some(StellarMessage::ErrorMsg(e))) => { + tracing::error!( + "listen_for_stellar_messages(): received error message from Stellar: {e:?}" + ); + break + }, + Ok(Some(msg)) => { + if Instant::now() >= next_time { + tracing::info!("listen_for_stellar_messages(): health check: received message from Stellar"); + next_time += health_check_interval; + } - // listen for messages from Stellar - match overlay_conn.listen() { - Ok(Some(msg)) => { - let msg_as_str = to_base64_xdr_string(&msg); - if let Err(e) = - handle_message(msg, collector_clone.clone(), &sender_clone).await - { - tracing::error!( - "start_oracle_agent(): failed to handle message: {msg_as_str}: {e:?}" - ); - } - }, - Ok(None) => {}, - // connection got lost - Err(e) => { - tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); - - if let Err(e) = shutdown_sender_clone.send(()) { - tracing::error!( - "start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}" - ); - } - break; - }, - } + if let Err(e) = + handle_message(msg.clone(), collector.clone(), &oracle_agent.message_sender) + .await + { + let msg_as_str = msg.as_base64_encoded_string(); + tracing::error!("listen_for_stellar_messages(): failed to handle message: {msg_as_str}: {e:?}"); + } + }, + // connection got lost + Err(e) => { + tracing::error!("listen_for_stellar_messages(): encounter error in overlay: {e:?}"); + break + }, } + } - tracing::info!("start_oracle_agent(): shutting down overlay connection"); - // shutdown the overlay connection - overlay_conn.stop(); - }); - - Ok(OracleAgent { - collector, - is_public_network, - message_sender: Some(sender), - shutdown_sender, - overlay_conn_end_signal: disconnect_signal_sender, - }) -} - -impl OracleAgent { - /// This method returns the proof for a given slot or an error if the proof cannot be provided. - /// The agent will try every possible way to get the proof before returning an error. - pub async fn get_proof(&self, slot: Slot) -> Result { - let sender = self - .message_sender - .clone() - .ok_or_else(|| Error::Uninitialized("MessageSender".to_string()))?; - - let collector = self.collector.clone(); - - #[cfg(test)] - let timeout_seconds = 180; + if let Err(e) = shutdown_sender.send(()) { + tracing::error!( + "listen_for_stellar_messages(): Failed to send shutdown signal in thread: {e:?}" + ); + } - #[cfg(not(test))] - let timeout_seconds = 60; + tracing::info!("listen_for_stellar_messages(): shutting down overlay connection"); + overlay_conn.stop(); - timeout(Duration::from_secs(timeout_seconds), async move { - loop { - let stellar_sender = sender.clone(); - let collector = collector.read().await; - match collector.build_proof(slot, &stellar_sender).await { - None => { - drop(collector); - // give 10 seconds interval for every retry - sleep(Duration::from_secs(10)).await; - continue; - }, - Some(proof) => { - tracing::info!("get_proof(): Successfully build proof for slot {slot}"); - tracing::trace!(" with proof: {proof:?}"); - return Ok(proof); - }, - } - } - }) - .await - .map_err(|_| { - Error::ProofTimeout(format!("Timeout elapsed for building proof of slot {slot}")) - })? - } + Ok(()) +} +#[cfg(any(test, feature = "integration"))] +pub async fn start_oracle_agent( + cfg: StellarOverlayConfig, + vault_stellar_secret: String, + shutdown_sender: ShutdownSender, +) -> Arc { + let oracle_agent = Arc::new( + OracleAgent::new(&cfg, vault_stellar_secret, shutdown_sender.clone()) + .await + .expect("should work"), + ); - pub async fn last_slot_index(&self) -> Slot { - self.collector.read().await.last_slot_index() - } + tokio::spawn(listen_for_stellar_messages(oracle_agent.clone(), shutdown_sender)); - pub async fn remove_data(&self, slot: &Slot) { - self.collector.read().await.remove_data(slot); + while !oracle_agent.is_stellar_running().await { + sleep(Duration::from_millis(500)).await; } - /// Stops listening for new SCP messages. - pub async fn shutdown(&self) { - tracing::debug!("shutdown(): Shutting down OracleAgent..."); - if let Err(e) = self.overlay_conn_end_signal.send(()).await { - tracing::error!( - "shutdown(): Failed to send overlay conn end signal in OracleAgent: {:?}", - e - ); - } - } + tracing::info!("start_oracle_agent(): Stellar overlay network is running"); + oracle_agent } #[cfg(test)] @@ -222,20 +239,18 @@ mod tests { // We use a random secret key to avoid conflicts with other tests. let agent = start_oracle_agent( specific_stellar_relay_config(true, 0), - &get_random_secret_key(), + get_random_secret_key(), shutdown_sender, ) - .await - .expect("Failed to start agent"); + .await; - let mut latest_slot = 0; - while latest_slot == 0 { - sleep(Duration::from_secs(1)).await; - latest_slot = agent.last_slot_index().await; - } - latest_slot += 1; - // let's wait for envelopes and txset to be available for creating a proof - sleep(Duration::from_secs(5)).await; + let latest_slot = loop { + let slot = agent.collector.read().await.last_slot_index(); + if slot > 0 { + break slot + } + sleep(Duration::from_millis(500)).await; + }; let proof_result = agent.get_proof(latest_slot).await; assert!(proof_result.is_ok(), "Failed to get proof for slot: {}", latest_slot); @@ -254,13 +269,11 @@ mod tests { let shutdown_sender = ShutdownSender::new(); let agent = start_oracle_agent( specific_stellar_relay_config(is_public_network, 1), - &get_source_secret_key_from_env(is_public_network), + get_source_secret_key_from_env(is_public_network), shutdown_sender, ) - .await - .expect("Failed to start agent"); + .await; - sleep(Duration::from_secs(5)).await; // This slot should be archived on the public network let target_slot = 44041116; let proof = agent.get_proof(target_slot).await.expect("should return a proof"); @@ -295,13 +308,11 @@ mod tests { let shutdown_sender = ShutdownSender::new(); let agent = start_oracle_agent( modified_config, - &get_source_secret_key_from_env(is_public_network), + get_source_secret_key_from_env(is_public_network), shutdown_sender, ) - .await - .expect("Failed to start agent"); + .await; - sleep(Duration::from_secs(5)).await; // This slot should be archived on the public network let target_slot = 44041116; let proof = agent.get_proof(target_slot).await.expect("should return a proof"); @@ -327,16 +338,14 @@ mod tests { let shutdown = ShutdownSender::new(); let agent = start_oracle_agent( modified_config, - &get_source_secret_key_from_env(is_public_network), + get_source_secret_key_from_env(is_public_network), shutdown, ) - .await - .expect("Failed to start agent"); + .await; // This slot should be archived on the public network let target_slot = 44041116; - tracing::info!("let's sleep for 3 seconds,to get the network up and running"); - sleep(Duration::from_secs(3)).await; + let proof_result = agent.get_proof(target_slot).await; assert!(matches!(proof_result, Err(Error::ProofTimeout(_)))); diff --git a/clients/vault/src/oracle/collector/collector.rs b/clients/vault/src/oracle/collector/collector.rs index b8b57bb66..7cb1b399d 100644 --- a/clients/vault/src/oracle/collector/collector.rs +++ b/clients/vault/src/oracle/collector/collector.rs @@ -1,7 +1,7 @@ use std::{default::Default, sync::Arc}; use parking_lot::{lock_api::RwLockReadGuard, RawRwLock, RwLock}; -use stellar_relay_lib::helper::to_base64_xdr_string; +use primitives::stellar::StellarTypeToBase64String; use stellar_relay_lib::sdk::{ network::{Network, PUBLIC_NETWORK, TEST_NETWORK}, @@ -158,7 +158,7 @@ impl ScpMessageCollector { tracing::debug!("Collecting SCPEnvelopes for slot {slot}: success"); tracing::trace!( "Collecting SCPEnvelopes for slot {slot}: the scp envelope: {}", - to_base64_xdr_string(&scp_envelope.statement) + scp_envelope.statement.as_base64_encoded_string() ); envelopes_map.insert(slot, vec![scp_envelope]); } diff --git a/clients/vault/src/oracle/collector/handler.rs b/clients/vault/src/oracle/collector/handler.rs index 3eb7c3c7f..493ca7de9 100644 --- a/clients/vault/src/oracle/collector/handler.rs +++ b/clients/vault/src/oracle/collector/handler.rs @@ -3,14 +3,14 @@ use crate::oracle::{ errors::Error, types::StellarMessageSender, }; -use stellar_relay_lib::{ - helper::to_base64_xdr_string, - sdk::types::{ScpEnvelope, ScpStatementPledges, StellarMessage}, +use primitives::stellar::{ + types::{ScpEnvelope, ScpStatementPledges, StellarMessage}, + StellarTypeToBase64String, }; // Handling SCPEnvelopes impl ScpMessageCollector { - /// handles incoming ScpEnvelope. + /// Handles incoming ScpEnvelope. Return slot if it was saved /// /// # Arguments /// @@ -20,15 +20,19 @@ impl ScpMessageCollector { &mut self, env: ScpEnvelope, message_sender: &StellarMessageSender, - ) -> Result<(), Error> { + ) -> Result, Error> { let slot = env.statement.slot_index; // we are only interested with `ScpStExternalize`. Other messages are ignored. if let ScpStatementPledges::ScpStExternalize(stmt) = &env.statement.pledges { tracing::trace!( "Handling Incoming ScpEnvelopes for slot {slot}: SCPStExternalize found: {}", - to_base64_xdr_string(stmt) + stmt.as_base64_encoded_string() ); + + if self.last_slot_index() == 0 { + tracing::info!("handle_envelope(): for slot {slot}: first SCPStExternalize found"); + } // set the last scpenvenvelope with ScpStExternalize message self.set_last_slot_index(slot); @@ -48,10 +52,10 @@ impl ScpMessageCollector { // insert/add the externalized message to map. self.add_scp_envelope(slot, env); + Ok(Some(slot)) } else { self.remove_data(&slot); + Ok(None) } - - Ok(()) } } diff --git a/clients/vault/src/oracle/collector/proof_builder.rs b/clients/vault/src/oracle/collector/proof_builder.rs index 89dbeb2e0..7e3551e17 100644 --- a/clients/vault/src/oracle/collector/proof_builder.rs +++ b/clients/vault/src/oracle/collector/proof_builder.rs @@ -16,16 +16,6 @@ use crate::oracle::{ ScpArchiveStorage, ScpMessageCollector, TransactionsArchiveStorage, }; -/// Returns true if the SCP messages for a given slot are still recoverable from the overlay -/// because the slot is not too far back. -fn check_slot_still_recoverable_from_overlay(last_slot_index: Slot, slot: Slot) -> bool { - let recoverable_point = last_slot_index.saturating_sub(MAX_SLOTS_TO_REMEMBER); - log::trace!( - "check_slot_still_recoverable_from_overlay(): Proof Building for slot {slot}: Last Slot to refer to overlay: {recoverable_point}" - ); - last_slot_index != 0 && slot > recoverable_point -} - /// The Proof of Transactions that needed to be processed #[derive(Debug, Eq, PartialEq)] pub struct Proof { @@ -66,47 +56,30 @@ impl Proof { // handle missing envelopes impl ScpMessageCollector { - /// fetch envelopes not found in the collector - async fn fetch_missing_envelopes(&self, slot: Slot, sender: &StellarMessageSender) { - // If the current slot is still in the range of 'remembered' slots - if check_slot_still_recoverable_from_overlay(self.last_slot_index(), slot) { - tracing::debug!( - "fetch_missing_envelopes(): Proof Building for slot {slot}: fetching missing envelopes from Stellar Node..." - ); - self.ask_node_for_envelopes(slot, sender).await; - } else { - tracing::debug!( - "fetch_missing_envelopes(): Proof Building for slot {slot}: fetching missing envelopes from Archive Node..." + /// Returns the Proof + /// + /// # Arguments + /// + /// * `slot` - the slot where the txset is to get. + /// * `sender` - used to send messages to Stellar Node + pub async fn build_proof(&self, slot: Slot, sender: &StellarMessageSender) -> Option { + if self.last_slot_index() == 0 { + tracing::warn!( + "build_proof(): Proof Building for slot {slot}: last_slot_index is still 0, not yet ready to build proofs." ); - self.ask_archive_for_envelopes(slot).await; + return None; } - } - - /// fetches envelopes from the stellar node - async fn ask_node_for_envelopes(&self, slot: Slot, sender: &StellarMessageSender) { - // for this slot to be processed, we must put this in our watch list. - let slot = match u32::try_from(slot) { - Ok(slot) => slot, - Err(e) => { - tracing::error!( - "ask_node_for_envelopes(): Proof Building for slot {slot:} failed to convert slot value into u32 datatype: {e:?}" - ); - return; - }, - }; - if let Err(e) = sender.send(StellarMessage::GetScpState(slot)).await { - tracing::error!( - "ask_node_for_envelopes(): Proof Building for slot {slot}: failed to send `GetScpState` message: {e:?}" + let Some(envelopes) = self.get_envelopes(slot, sender).await else { + // return early if we don't have enough envelopes + tracing::warn!( + "build_proof(): Couldn't build proof for slot {slot} due to missing envelopes" ); - return; - } - tracing::info!("ask_node_for_envelopes(): Proof Building for slot {slot}: requesting to StellarNode for messages..."); - } + return None; + }; - /// fetches envelopes from the archive - async fn ask_archive_for_envelopes(&self, slot: Slot) { - tokio::spawn(self.get_envelopes_from_horizon_archive(slot)); + let tx_set = self.get_txset(slot, sender).await?; + Some(Proof { slot, envelopes, tx_set }) } /// Returns either a list of ScpEnvelopes @@ -136,11 +109,51 @@ impl ScpMessageCollector { } // forcefully retrieve envelopes - self.fetch_missing_envelopes(slot, sender).await; + self._get_envelopes(slot, sender).await; return None; } + /// fetch envelopes not found in the collector + async fn _get_envelopes(&self, slot: Slot, sender: &StellarMessageSender) { + tracing::debug!("_get_envelopes(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}",self.last_slot_index()); + // If the current slot is still in the range of 'remembered' slots, retrieve the envelopes + // from the overlay network + if check_slot_still_recoverable_from_overlay(self.last_slot_index(), slot) { + tracing::debug!( + "_get_envelopes(): Proof Building for slot {slot}: fetching missing envelopes from Stellar Node..." + ); + self.ask_overlay_for_envelopes(slot, sender).await; + + return; + } + + tracing::info!( + "_get_envelopes(): Proof Building for slot {slot}: fetching from Archive Node..." + ); + + self.get_envelopes_from_horizon_archive(slot).await + } + + /// fetches envelopes from the stellar node + async fn ask_overlay_for_envelopes(&self, slot: Slot, sender: &StellarMessageSender) { + // for this slot to be processed, we must put this in our watch list. + let Ok(slot) = u32::try_from(slot) else { + tracing::error!( + "ask_overlay_for_envelopes(): Proof Building for slot {slot:} failed to convert slot value into u32 datatype" + ); + return; + }; + + if let Err(e) = sender.send(StellarMessage::GetScpState(slot)).await { + tracing::error!( + "ask_overlay_for_envelopes(): Proof Building for slot {slot}: failed to send `GetScpState` message: {e:?}" + ); + return; + } + tracing::info!("ask_overlay_for_envelopes(): Proof Building for slot {slot}: requesting to StellarNode for messages..."); + } + /// Returns a TransactionSet if a txset is found; None if the slot does not have a txset /// /// # Arguments @@ -158,11 +171,12 @@ impl ScpMessageCollector { match tx_set { Some(res) => Some(res), None => { + tracing::debug!("get_txset(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}",self.last_slot_index()); // If the current slot is still in the range of 'remembered' slots if check_slot_still_recoverable_from_overlay(self.last_slot_index(), slot) { - self.fetch_missing_txset_from_overlay(slot, sender).await; + self.ask_overlay_for_txset(slot, sender).await; } else { - tokio::spawn(self.get_txset_from_horizon_archive(slot)); + self.get_txset_from_horizon_archive(slot).await; } tracing::warn!("get_txset(): Proof Building for slot {slot}: no txset found"); @@ -173,36 +187,16 @@ impl ScpMessageCollector { /// Send message to overlay network to fetch the missing txset _if_ we already have the txset /// hash for it. If we don't have the hash, we can't fetch it from the overlay network. - async fn fetch_missing_txset_from_overlay(&self, slot: Slot, sender: &StellarMessageSender) { + async fn ask_overlay_for_txset(&self, slot: Slot, sender: &StellarMessageSender) { // we need the txset hash to create the message. if let Some(txset_hash) = self.get_txset_hash_by_slot(&slot) { - tracing::debug!("fetch_missing_txset_from_overlay(): Proof Building for slot {slot}: Fetching TxSet from overlay..."); + tracing::debug!("ask_overlay_for_txset(): Proof Building for slot {slot}: Fetching TxSet from overlay..."); if let Err(error) = sender.send(StellarMessage::GetTxSet(txset_hash)).await { - tracing::error!("fetch_missing_txset_from_overlay(): Proof Building for slot {slot}: failed to send GetTxSet message to overlay {:?}", error); + tracing::error!("ask_overlay_for_txset(): Proof Building for slot {slot}: failed to send GetTxSet message to overlay {:?}", error); } } } - /// Returns the Proof - /// - /// # Arguments - /// - /// * `slot` - the slot where the txset is to get. - /// * `sender` - used to send messages to Stellar Node - pub async fn build_proof(&self, slot: Slot, sender: &StellarMessageSender) -> Option { - let envelopes_maybe = self.get_envelopes(slot, sender).await; - // return early if we don't have enough envelopes or the tx_set - if let Some(envelopes) = envelopes_maybe { - let tx_set = self.get_txset(slot, sender).await?; - return Some(Proof { slot, envelopes, tx_set }); - } else { - tracing::warn!( - "build_proof(): Couldn't build proof for slot {slot} due to missing envelopes" - ); - return None; - } - } - /// Insert envelopes fetched from the archive to the map /// /// # Arguments @@ -224,11 +218,11 @@ impl ScpMessageCollector { // We try to get the SCPArchive from each archive URL until we succeed or run out of // URLs for archive_url in archive_urls { - let scp_archive_storage = ScpArchiveStorage(archive_url); + let scp_archive_storage = ScpArchiveStorage(archive_url.clone()); let scp_archive_result = scp_archive_storage.get_archive(slot).await; if let Err(e) = scp_archive_result { tracing::error!( - "get_envelopes_from_horizon_archive(): Could not get SCPArchive for slot {slot} from Horizon Archive: {e:?}" + "get_envelopes_from_horizon_archive(): Could not get SCPArchive for slot {slot} from Horizon Archive {archive_url}: {e:?}" ); continue; } @@ -243,61 +237,60 @@ impl ScpMessageCollector { } }); - if let Some(i) = value { - if let ScpHistoryEntry::V0(scp_entry_v0) = i { - let slot_scp_envelopes = scp_entry_v0.clone().ledger_messages.messages; - let vec_scp = slot_scp_envelopes.get_vec().clone(); - - // Filter out any envelopes that are not externalize or confirm statements - let relevant_envelopes = vec_scp - .into_iter() - .filter(|scp| match scp.statement.pledges { - ScpStatementPledges::ScpStExternalize(_) | - ScpStatementPledges::ScpStConfirm(_) => true, - _ => false, - }) - .collect::>(); - - let externalized_envelopes_count = relevant_envelopes - .iter() - .filter(|scp| match scp.statement.pledges { - ScpStatementPledges::ScpStExternalize(_) => true, - _ => false, - }) - .count(); - - // Ensure that at least one envelope is externalized - if externalized_envelopes_count == 0 { - tracing::error!( + let Some(ScpHistoryEntry::V0(scp_entry_v0)) = value else { + tracing::warn!("get_envelopes_from_horizon_archive(): Could not get ScpHistory entry from archive {archive_url} for slot {slot}"); + continue; + }; + + let slot_scp_envelopes = scp_entry_v0.clone().ledger_messages.messages; + let vec_scp = slot_scp_envelopes.get_vec().clone(); + + // Filter out any envelopes that are not externalize or confirm statements + let relevant_envelopes = vec_scp + .into_iter() + .filter(|scp| match scp.statement.pledges { + ScpStatementPledges::ScpStExternalize(_) | + ScpStatementPledges::ScpStConfirm(_) => true, + _ => false, + }) + .collect::>(); + + let externalized_envelopes_count = relevant_envelopes + .iter() + .filter(|scp| match scp.statement.pledges { + ScpStatementPledges::ScpStExternalize(_) => true, + _ => false, + }) + .count(); + + // Ensure that at least one envelope is externalized + if externalized_envelopes_count == 0 { + tracing::error!( "get_envelopes_from_horizon_archive(): The contained archive entry fetched from {} for slot {slot} is invalid because it does not contain any externalized envelopes.", scp_archive_storage.0 ); - // remove the file since it's invalid. - scp_archive_storage.remove_file(slot); - continue; - } - - let mut envelopes_map = envelopes_map_arc.write(); - let mut from_archive_map = env_from_archive_map.write(); - - if envelopes_map.get(&slot).is_none() { - tracing::info!( - "get_envelopes_from_horizon_archive(): Adding {} archived SCP envelopes for slot {slot} to envelopes map. {} are externalized", - relevant_envelopes.len(), - externalized_envelopes_count - ); - envelopes_map.insert(slot, relevant_envelopes); - // indicates that the data was taken from the archive - from_archive_map.insert(slot, ()); - - // remove the archive file after successfully retrieving envelopes - scp_archive_storage.remove_file(slot); - break; - } - } - } else { - tracing::warn!("get_envelopes_from_horizon_archive(): Could not get ScpHistory entry from archive for slot {slot}"); + // remove the file since it's invalid. + scp_archive_storage.remove_file(slot); + continue; } + + let mut envelopes_map = envelopes_map_arc.write(); + let mut from_archive_map = env_from_archive_map.write(); + + tracing::info!( + "get_envelopes_from_horizon_archive(): Adding {} archived SCP envelopes for slot {slot} to envelopes map. {} are externalized", + relevant_envelopes.len(), + externalized_envelopes_count + ); + + envelopes_map.insert(slot, relevant_envelopes); + + // indicates that the data was taken from the archive + from_archive_map.insert(slot, ()); + + // remove the archive file after successfully retrieving envelopes + scp_archive_storage.remove_file(slot); + break; } } } @@ -363,13 +356,44 @@ impl ScpMessageCollector { } } +/// Returns true if the SCP messages for a given slot are still recoverable from the overlay +/// because the slot is not too far back. +fn check_slot_still_recoverable_from_overlay(last_slot_index: Slot, slot: Slot) -> bool { + let recoverable_point = last_slot_index.saturating_sub(MAX_SLOTS_TO_REMEMBER); + log::trace!( + "check_slot_still_recoverable_from_overlay(): Proof Building for slot {slot}: Last Slot to refer to overlay: {recoverable_point}" + ); + last_slot_index != 0 && slot > recoverable_point +} + #[cfg(test)] mod test { use crate::oracle::{ - collector::proof_builder::check_slot_still_recoverable_from_overlay, + collector::{ + proof_builder::check_slot_still_recoverable_from_overlay, ScpMessageCollector, + }, types::constants::MAX_SLOTS_TO_REMEMBER, }; + use stellar_relay_lib::sdk::types::StellarMessage; + use tokio::sync::mpsc; + + fn collector(is_mainnet: bool) -> ScpMessageCollector { + let archives = if is_mainnet { + vec![ + "http://history.stellar.org/prd/core-live/core_live_001".to_string(), + "http://history.stellar.org/prd/core-live/core_live_002".to_string(), + "http://history.stellar.org/prd/core-live/core_live_003".to_string(), + ] + } else { + vec![ + "http://history.stellar.org/prd/core-testnet/core_testnet_001".to_string(), + "http://history.stellar.org/prd/core-testnet/core_testnet_002".to_string(), + "http://history.stellar.org/prd/core-testnet/core_testnet_003".to_string(), + ] + }; + ScpMessageCollector::new(is_mainnet, archives) + } #[test] fn test_check_slot_position() { let last_slot = 50_000; @@ -387,4 +411,33 @@ mod test { last_slot - MAX_SLOTS_TO_REMEMBER + 1, )); } + + #[tokio::test] + async fn test_ask_overlay_for_envelopes() { + let (sender, mut receiver) = mpsc::channel::(1024); + let collector = collector(false); + + let expected_slot = 50; + collector.ask_overlay_for_envelopes(expected_slot, &sender).await; + + match receiver.recv().await.expect("should receive message") { + StellarMessage::GetScpState(actual_slot) => { + let actual_slot = u64::from(actual_slot); + assert_eq!(actual_slot, expected_slot); + }, + msg => panic!("Expected GetScpState message, got {:?}", msg), + } + } + + #[tokio::test] + async fn test_get_envelopes_from_horizon_archive() { + let collector = collector(false); + assert_eq!(collector.envelopes_map_len(), 0); + + let expected_slot = 500; + let fut = collector.get_envelopes_from_horizon_archive(expected_slot); + fut.await; + + assert!(collector.envelopes_map_len() > 0); + } } diff --git a/clients/vault/src/oracle/errors.rs b/clients/vault/src/oracle/errors.rs index b4c0e2643..3ec1ea32b 100644 --- a/clients/vault/src/oracle/errors.rs +++ b/clients/vault/src/oracle/errors.rs @@ -4,39 +4,39 @@ use tokio::sync::{mpsc, oneshot}; use stellar_relay_lib::sdk::StellarSdkError; -#[derive(Debug, err_derive::Error)] +#[derive(Debug, thiserror::Error)] pub enum Error { - #[error(display = "{:?}", _0)] + #[error("Stellar SDK Error: {0:?}")] StellarSdkError(StellarSdkError), - #[error(display = "{:?}", _0)] - TryFromSliceError(TryFromSliceError), + #[error("TryFromSliceError: {0}")] + TryFromSliceError(#[from] TryFromSliceError), - #[error(display = "{:?}", _0)] - SerdeError(bincode::Error), + #[error("Serde Error: {0}")] + SerdeError(#[from] bincode::Error), - #[error(display = "{:?}", _0)] - StdIoError(std::io::Error), + #[error("StdIoError: {0}")] + StdIoError(#[from] std::io::Error), - #[error(display = "{:?}", _0)] + #[error("Other: {0}")] Other(String), - #[error(display = "{:?}", _0)] - ConnError(stellar_relay_lib::Error), + #[error("Stellar Relay Error: {0}")] + ConnError(#[from] stellar_relay_lib::Error), - #[error(display = "{:?}", _0)] - WalletError(wallet::error::Error), + #[error("Wallet Error: {0}")] + WalletError(#[from] wallet::error::Error), - #[error(display = "{:?}", _0)] + #[error("Proof Timeout: {0}")] ProofTimeout(String), - #[error(display = "{} is not initialized", _0)] + #[error("Unititialized: {0}")] Uninitialized(String), - #[error(display = "{}", _0)] + #[error("Archive Error: {0}")] ArchiveError(String), - #[error(display = "{}", _0)] + #[error("ArchiveResponseError: {0}")] ArchiveResponseError(String), } @@ -46,30 +46,6 @@ impl From for Error { } } -impl From for Error { - fn from(e: std::io::Error) -> Self { - Error::StdIoError(e) - } -} - -impl From for Error { - fn from(e: bincode::Error) -> Self { - Error::SerdeError(e) - } -} - -impl From for Error { - fn from(e: TryFromSliceError) -> Self { - Error::TryFromSliceError(e) - } -} - -impl From for Error { - fn from(e: stellar_relay_lib::Error) -> Self { - Error::ConnError(e) - } -} - impl From> for Error { fn from(e: mpsc::error::SendError) -> Self { Error::ConnError(stellar_relay_lib::Error::SendFailed(e.to_string())) diff --git a/clients/vault/src/redeem.rs b/clients/vault/src/redeem.rs index c248b8b6e..9359b76d2 100644 --- a/clients/vault/src/redeem.rs +++ b/clients/vault/src/redeem.rs @@ -33,8 +33,8 @@ pub async fn listen_for_redeem_requests( // we will need to capture the arguments by value rather than by reference, so clone // these: let parachain_rpc = parachain_rpc.clone(); + let oracle_agent_clone = oracle_agent.clone(); // Spawn a new task so that we handle these events concurrently - let oracle_agent = oracle_agent.clone(); spawn_cancelable(shutdown_tx.subscribe(), async move { tracing::info!( "Received new RequestRedeemEvent {:?}. Trying to execute...", @@ -46,7 +46,7 @@ pub async fn listen_for_redeem_requests( parachain_rpc.get_redeem_request(event.redeem_id).await?, payment_margin, )?; - request.pay_and_execute(parachain_rpc, vault, oracle_agent).await + request.pay_and_execute(parachain_rpc, vault, oracle_agent_clone).await } .await; diff --git a/clients/vault/src/replace.rs b/clients/vault/src/replace.rs index 5c96e8914..524a48240 100644 --- a/clients/vault/src/replace.rs +++ b/clients/vault/src/replace.rs @@ -96,6 +96,8 @@ pub async fn listen_for_replace_requests( event_channel: Sender, accept_replace_requests: bool, ) -> Result<(), ServiceError> { + tracing::debug!("listen_for_replace_requests(): started"); + let parachain_rpc = ¶chain_rpc; let vault_id_manager = &vault_id_manager; let event_channel = &event_channel; diff --git a/clients/vault/src/requests/execution.rs b/clients/vault/src/requests/execution.rs index 3273e836d..2e5c988d5 100644 --- a/clients/vault/src/requests/execution.rs +++ b/clients/vault/src/requests/execution.rs @@ -9,7 +9,7 @@ use crate::{ structs::Request, PayAndExecute, }, - VaultIdManager, YIELD_RATE, + ArcRwLock, VaultIdManager, YIELD_RATE, }; use async_trait::async_trait; use governor::{ @@ -41,7 +41,7 @@ const MAX_EXECUTION_RETRIES: u32 = 3; /// * `rate_limiter` - a rate limiter async fn spawn_tasks_to_execute_open_requests_async( requests: &mut HashMap, - wallet: Arc>, + wallet: ArcRwLock, shutdown_tx: ShutdownSender, parachain_rpc: &SpacewalkParachain, oracle_agent: Arc, @@ -280,7 +280,9 @@ where /// * `vault_id_manager` - contains all the vault ids and their data. /// * `wallet` - the vault's wallet; used to retrieve a list of stellar transactions /// * `oracle_agent` - the agent used to get the proofs -/// * `payment_margin` - minimum time to the the redeem execution deadline to make the stellar +/// * `payment_margin` - minimum time to the redeem execution deadline to make the stellar +/// * `precheck_signal` - a signal sender to notify the caller that this process is done +/// and pending tasks can be started /// payment. #[allow(clippy::too_many_arguments)] pub async fn execute_open_requests( @@ -290,7 +292,9 @@ pub async fn execute_open_requests( wallet: Arc>, oracle_agent: Arc, payment_margin: Duration, + precheck_signal: tokio::sync::broadcast::Sender<()>, ) -> Result<(), ServiceError> { + tracing::info!("execute_open_requests(): started"); let parachain_rpc_ref = ¶chain_rpc; // get all redeem and replace requests @@ -303,6 +307,8 @@ pub async fn execute_open_requests( let rate_limiter = Arc::new(RateLimiter::direct(YIELD_RATE)); + tracing::info!("execute_open_requests(): Oracle agent is ready."); + // Check if the open requests have a corresponding payment on Stellar // and are just waiting to be executed on the parachain spawn_tasks_to_execute_open_requests_async( @@ -326,5 +332,9 @@ pub async fn execute_open_requests( rate_limiter, ); + if let Err(e) = precheck_signal.send(()) { + tracing::error!("execute_open_requests(): Failed to send signal: {e:?}"); + } + Ok(()) } diff --git a/clients/vault/src/system.rs b/clients/vault/src/system.rs index c654d54ad..28f438826 100644 --- a/clients/vault/src/system.rs +++ b/clients/vault/src/system.rs @@ -9,12 +9,18 @@ use clap::Parser; use futures::{ channel::{ mpsc, - mpsc::{Receiver, Sender}, + mpsc::{Receiver as mpscReceiver, Sender as mpscSender}, }, future::{join, join_all}, SinkExt, TryFutureExt, }; -use tokio::{sync::RwLock, time::sleep}; +use tokio::{ + sync::{ + broadcast::{self, Receiver as bcReceiver, Sender as bcSender}, + RwLock, + }, + time::sleep, +}; use runtime::{ cli::parse_duration_minutes, AccountId, BlockNumber, CollateralBalancesPallet, CurrencyId, @@ -32,12 +38,12 @@ use crate::{ issue, issue::IssueFilter, metrics::{monitor_bridge_metrics, poll_metrics, publish_tokio_metrics, PerCurrencyMetrics}, - oracle::OracleAgent, + oracle::{listen_for_stellar_messages, OracleAgent}, redeem::listen_for_redeem_requests, replace::{listen_for_accept_replace, listen_for_execute_replace, listen_for_replace_requests}, requests::execution::execute_open_requests, service::{CancellationScheduler, IssueCanceller}, - ArcRwLock, Event, CHAIN_HEIGHT_POLLING_INTERVAL, + tokio_spawn, ArcRwLock, Event, CHAIN_HEIGHT_POLLING_INTERVAL, }; pub const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -45,7 +51,7 @@ pub const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); pub const NAME: &str = env!("CARGO_PKG_NAME"); pub const ABOUT: &str = env!("CARGO_PKG_DESCRIPTION"); -const RESTART_INTERVAL: Duration = Duration::from_secs(10800); // restart every 3 hours +const RESTART_INTERVAL: Duration = Duration::from_secs(7200); // restart every 2 hours #[derive(Clone, Debug)] pub struct VaultData { @@ -248,9 +254,10 @@ pub struct VaultServiceConfig { async fn active_block_listener( parachain_rpc: SpacewalkParachain, - issue_tx: Sender, - replace_tx: Sender, + issue_tx: mpscSender, + replace_tx: mpscSender, ) -> Result<(), ServiceError> { + tracing::info!("active_block_listener(): started"); let issue_tx = &issue_tx; let replace_tx = &replace_tx; parachain_rpc @@ -262,6 +269,8 @@ async fn active_block_listener( |err| tracing::error!("Error (UpdateActiveBlockEvent): {}", err.to_string()), ) .await?; + + tracing::info!("active_block_listener(): ended"); Ok(()) } @@ -293,10 +302,10 @@ impl Service for VaultService { async fn start(&mut self) -> Result<(), ServiceError> { let result = self.run_service().await; - self.try_shutdown_agent().await; self.try_shutdown_wallet().await; if let Err(error) = result { + tracing::error!("start(): Failed to run service: {error:?}"); let _ = self.shutdown.send(()); Err(error) } else { @@ -308,6 +317,8 @@ impl Service for VaultService { async fn run_and_monitor_tasks( shutdown_tx: ShutdownSender, items: Vec<(&str, ServiceTask)>, + // Sends a signal to start those tasks requiring a precheck. + mut precheck_signals: Vec>, ) -> Result<(), ServiceError> { let (metrics_iterators, tasks): (HashMap, Vec<_>) = items .into_iter() @@ -316,27 +327,39 @@ async fn run_and_monitor_tasks( let metrics_iterator = monitor.intervals(); let task = match task { ServiceTask::Optional(true, t) | ServiceTask::Essential(t) => - Some(wait_or_shutdown(shutdown_tx.clone(), t)), + Some(wait_or_shutdown(shutdown_tx.clone(), t, None)), + ServiceTask::PrecheckRequired(t) => + Some(wait_or_shutdown(shutdown_tx.clone(), t, precheck_signals.pop())), _ => None, }?; let task = monitor.instrument(task); - let task = tokio::spawn(task); + + let task = tokio_spawn(name, task); Some(((name.to_string(), metrics_iterator), task)) }) .unzip(); - let tokio_metrics = tokio::spawn(wait_or_shutdown( - shutdown_tx.clone(), - publish_tokio_metrics(metrics_iterators), - )); + let tokio_metrics = tokio_spawn( + "tokio metrics publisher", + wait_or_shutdown(shutdown_tx.clone(), publish_tokio_metrics(metrics_iterators), None), + ); + + tracing::info!("run_and_monitor_tasks(): running all tasks..."); match join(tokio_metrics, join_all(tasks)).await { (Ok(Err(err)), _) => Err(err), - (_, results) => results - .into_iter() - .find(|res| matches!(res, Ok(Err(_)))) - .and_then(|res| res.ok()) - .unwrap_or(Ok(())), + (_, results) => { + for result in results { + match result { + Ok(_) => {}, + Err(e) => { + tracing::error!("run_and_monitor_tasks(): One of the tasks failed: {e:?}"); + return Err(ServiceError::TokioError(e)); + }, + } + } + Ok(()) + }, } } @@ -345,6 +368,19 @@ type Task = Pin>> + Send enum ServiceTask { Optional(bool, Task), Essential(Task), + // Runs a task after a prequisite check has passed. + PrecheckRequired(Task), +} + +/// returns a single-producer multi-consumer channel to send a signal to start a task +fn precheck_signals(num_of_signals_required: u8) -> (bcSender<()>, Vec>) { + let (sender, receiver) = broadcast::channel(1); + let mut subscribers = vec![receiver]; + while subscribers.len() < usize::from(num_of_signals_required) { + subscribers.push(sender.subscribe()); + } + + (sender, subscribers) } fn maybe_run(should_run: bool, task: F) -> ServiceTask @@ -363,6 +399,14 @@ where ServiceTask::Essential(Box::pin(task.map_err(|x| x.into()))) } +fn run_with_precheck(task: F) -> ServiceTask +where + F: Future> + Send + 'static, + E: Into>, +{ + ServiceTask::PrecheckRequired(Box::pin(task.map_err(|x| x.into()))) +} + type RegistrationData = Vec<(CurrencyId, CurrencyId, Option)>; // dedicated for running the service impl VaultService { @@ -399,13 +443,19 @@ impl VaultService { // Subscribe to an event (any event will do) so that a period of inactivity does not close // the jsonrpsee connection let err_provider = self.spacewalk_parachain.clone(); - let err_listener = wait_or_shutdown(self.shutdown.clone(), async move { - err_provider - .on_event_error(|e| tracing::debug!("Client Service: Received error event: {}", e)) - .await?; - Ok::<_, Error>(()) - }); - tokio::task::spawn(err_listener); + let err_listener = wait_or_shutdown( + self.shutdown.clone(), + async move { + err_provider + .on_event_error(|e| { + tracing::debug!("Client Service: Received error event: {}", e) + }) + .await?; + Ok::<_, Error>(()) + }, + None, + ); + tokio_spawn("error listener", err_listener); Ok(()) } @@ -415,54 +465,28 @@ impl VaultService { is_public_network: bool, shutdown_sender: ShutdownSender, ) -> Result, ServiceError> { - let cfg_path = &self.config.stellar_overlay_config_filepath; - let stellar_overlay_cfg = - StellarOverlayConfig::try_from_path(cfg_path).map_err(Error::StellarRelayError)?; + let stellar_overlay_cfg = self.stellar_overlay_cfg()?; // check if both the config file and the wallet are the same. if is_public_network != stellar_overlay_cfg.is_public_network() { return Err(ServiceError::IncompatibleNetwork); } - let oracle_agent = crate::oracle::start_oracle_agent( - stellar_overlay_cfg, - &self.secret_key, - shutdown_sender, - ) - .await - .expect("Failed to start oracle agent"); + let oracle_agent = + OracleAgent::new(&stellar_overlay_cfg, self.secret_key(), shutdown_sender) + .await + .map_err(|e| { + tracing::error!("Failed to create OracleAgent: {e:?}"); + ServiceError::OracleError(Error::OracleError(e)) + })?; Ok(Arc::new(oracle_agent)) } - fn execute_open_requests(&self, oracle_agent: Arc) { - let open_request_executor = execute_open_requests( - self.shutdown.clone(), - self.spacewalk_parachain.clone(), - self.vault_id_manager.clone(), - self.stellar_wallet.clone(), - oracle_agent, - self.config.payment_margin_minutes, - ); - - let shutdown_clone = self.shutdown.clone(); - service::spawn_cancelable(self.shutdown.subscribe(), async move { - match open_request_executor.await { - Ok(_) => tracing::info!("Done processing open requests"), - Err(e) => { - tracing::error!("Failed to process open requests: {}", e); - if let Err(err) = shutdown_clone.send(()) { - tracing::error!("Failed to send shutdown signal: {}", err); - } - }, - } - }); - } - fn create_issue_tasks( &self, - issue_event_tx: Sender, - issue_event_rx: Receiver, + issue_event_tx: mpscSender, + issue_event_rx: mpscReceiver, startup_height: BlockNumber, account_id: AccountId, vault_public_key: PublicKey, @@ -474,7 +498,7 @@ impl VaultService { vec![ ( "Issue Request Listener", - run(issue::listen_for_issue_requests( + run_with_precheck(issue::listen_for_issue_requests( self.spacewalk_parachain.clone(), vault_public_key, issue_event_tx, @@ -484,7 +508,7 @@ impl VaultService { ), ( "Issue Cancel Listener", - run(issue::listen_for_issue_cancels( + run_with_precheck(issue::listen_for_issue_cancels( self.spacewalk_parachain.clone(), issue_map.clone(), memos_to_issue_ids.clone(), @@ -492,7 +516,7 @@ impl VaultService { ), ( "Issue Execute Listener", - run(issue::listen_for_executed_issues( + run_with_precheck(issue::listen_for_executed_issues( self.spacewalk_parachain.clone(), issue_map.clone(), memos_to_issue_ids.clone(), @@ -513,20 +537,22 @@ impl VaultService { ), ( "Issue Cancel Scheduler", - run(CancellationScheduler::new( - self.spacewalk_parachain.clone(), - startup_height, - account_id, - ) - .handle_cancellation::(issue_event_rx)), + run_with_precheck( + CancellationScheduler::new( + self.spacewalk_parachain.clone(), + startup_height, + account_id, + ) + .handle_cancellation::(issue_event_rx), + ), ), ] } fn create_replace_tasks( &self, - replace_event_tx: Sender, - replace_event_rx: Receiver, + replace_event_tx: mpscSender, + replace_event_rx: mpscReceiver, startup_height: BlockNumber, account_id: AccountId, oracle_agent: Arc, @@ -534,7 +560,7 @@ impl VaultService { vec![ ( "Request Replace Listener", - run(listen_for_replace_requests( + run_with_precheck(listen_for_replace_requests( self.spacewalk_parachain.clone(), self.vault_id_manager.clone(), replace_event_tx.clone(), @@ -543,7 +569,7 @@ impl VaultService { ), ( "Accept Replace Listener", - run(listen_for_accept_replace( + run_with_precheck(listen_for_accept_replace( self.shutdown.clone(), self.spacewalk_parachain.clone(), self.vault_id_manager.clone(), @@ -557,12 +583,14 @@ impl VaultService { ), ( "Replace Cancellation Scheduler", - run(CancellationScheduler::new( - self.spacewalk_parachain.clone(), - startup_height, - account_id, - ) - .handle_cancellation::(replace_event_rx)), + run_with_precheck( + CancellationScheduler::new( + self.spacewalk_parachain.clone(), + startup_height, + account_id, + ) + .handle_cancellation::(replace_event_rx), + ), ), ] } @@ -592,8 +620,8 @@ impl VaultService { fn create_initial_tasks( &self, is_public_network: bool, - issue_event_tx: Sender, - replace_event_tx: Sender, + issue_event_tx: mpscSender, + replace_event_tx: mpscSender, vault_public_key: PublicKey, issue_map: ArcRwLock, ledger_env_map: ArcRwLock, @@ -609,6 +637,7 @@ impl VaultService { ( "Restart Timer", run(async move { + tracing::info!("Periodic restart in {RESTART_INTERVAL:?} minutes."); tokio::time::sleep(RESTART_INTERVAL).await; tracing::info!("Initiating periodic restart..."); Err(ServiceError::ClientShutdown) @@ -627,7 +656,7 @@ impl VaultService { ), ( "Parachain Block Listener", - run(active_block_listener( + run_with_precheck(active_block_listener( self.spacewalk_parachain.clone(), issue_event_tx, replace_event_tx, @@ -701,6 +730,11 @@ impl VaultService { Ok(tasks) } + + fn stellar_overlay_cfg(&self) -> Result { + let cfg_path = &self.config.stellar_overlay_config_filepath; + StellarOverlayConfig::try_from_path(cfg_path).map_err(Error::StellarRelayError) + } } impl VaultService { @@ -735,6 +769,10 @@ impl VaultService { }) } + fn secret_key(&self) -> String { + self.secret_key.clone() + } + fn get_vault_id( &self, collateral_currency: CurrencyId, @@ -787,8 +825,6 @@ impl VaultService { self.create_oracle_agent(is_public_network, self.shutdown.clone()).await?; self.agent = Some(oracle_agent.clone()); - self.execute_open_requests(oracle_agent.clone()); - // issue handling // this vec is passed to the stellar wallet to filter out transactions that are not relevant // this has to be modified every time the issue set changes @@ -800,10 +836,30 @@ impl VaultService { issue::initialize_issue_set(&self.spacewalk_parachain, &issue_map, &memos_to_issue_ids) .await?; + let (precheck_sender, precheck_receivers) = precheck_signals(8); + tokio_spawn( + "Execute Open Requests", + execute_open_requests( + self.shutdown.clone(), + self.spacewalk_parachain.clone(), + self.vault_id_manager.clone(), + self.stellar_wallet.clone(), + oracle_agent.clone(), + self.config.payment_margin_minutes, + precheck_sender, + ), + ); + let ledger_env_map: ArcRwLock = Arc::new(RwLock::new(HashMap::new())); tracing::info!("Starting all services..."); - let tasks = self.create_tasks( + + let mut tasks = vec![( + "Stellar Messages Listener", + run(listen_for_stellar_messages(oracle_agent.clone(), self.shutdown.clone())), + )]; + + let mut _tasks = self.create_tasks( startup_height, account_id, is_public_network, @@ -813,8 +869,9 @@ impl VaultService { ledger_env_map, memos_to_issue_ids, )?; + tasks.append(&mut _tasks); - run_and_monitor_tasks(self.shutdown.clone(), tasks).await + run_and_monitor_tasks(self.shutdown.clone(), tasks, precheck_receivers).await } async fn register_public_key_if_not_present(&mut self) -> Result<(), Error> { @@ -946,16 +1003,4 @@ impl VaultService { wallet.try_stop_periodic_resubmission_of_transactions().await; drop(wallet); } - - async fn try_shutdown_agent(&mut self) { - let opt_agent = self.agent.clone(); - self.agent = None; - - if let Some(arc_agent) = opt_agent { - tracing::info!("try_shutdown_agent(): shutting down agent"); - arc_agent.shutdown().await; - } else { - tracing::debug!("try_shutdown_agent(): no agent found"); - } - } } diff --git a/clients/vault/tests/helper/mod.rs b/clients/vault/tests/helper/mod.rs index efbd1c645..0537eab03 100644 --- a/clients/vault/tests/helper/mod.rs +++ b/clients/vault/tests/helper/mod.rs @@ -143,14 +143,7 @@ where let vault_stellar_secret = get_source_secret_key_from_env(is_public_network); let shutdown_tx = ShutdownSender::new(); - let oracle_agent = - start_oracle_agent(stellar_config.clone(), &vault_stellar_secret, shutdown_tx) - .await - .expect("failed to start agent"); - let oracle_agent = Arc::new(oracle_agent); - - // continue ONLY if the oracle agent has received the first slot - while oracle_agent.last_slot_index().await == 0 {} + let oracle_agent = start_oracle_agent(stellar_config, vault_stellar_secret, shutdown_tx).await; execute(client, vault_wallet, user_wallet, oracle_agent, vault_id, vault_provider).await } diff --git a/clients/vault/tests/vault_integration_tests.rs b/clients/vault/tests/vault_integration_tests.rs index 6e4a615ca..c9b2d4052 100644 --- a/clients/vault/tests/vault_integration_tests.rs +++ b/clients/vault/tests/vault_integration_tests.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use frame_support::assert_ok; use futures::{ channel::mpsc, - future::{join, join3, join4}, + future::{join, join3, join4, join5}, FutureExt, SinkExt, }; use serial_test::serial; @@ -24,8 +24,6 @@ mod helper; use helper::*; use primitives::DecimalsLookup; use subxt::utils::AccountId32 as AccountId; -use vault::oracle::{random_stellar_relay_config, start_oracle_agent}; -use wallet::keys::get_source_secret_key_from_env; #[tokio::test(flavor = "multi_thread")] #[serial] @@ -605,7 +603,6 @@ async fn test_issue_cancel_succeeds() { } #[tokio::test(flavor = "multi_thread")] -#[ntest::timeout(1_200_000)] // timeout at 20 minutes #[serial] async fn test_issue_execution_succeeds_from_archive_on_mainnet() { let is_public_network = true; @@ -613,7 +610,6 @@ async fn test_issue_execution_succeeds_from_archive_on_mainnet() { } #[tokio::test(flavor = "multi_thread")] -#[ntest::timeout(1_200_000)] // timeout at 20 minutes #[serial] async fn test_issue_execution_succeeds_from_archive_on_testnet() { let is_public_network = false; @@ -623,7 +619,7 @@ async fn test_issue_execution_succeeds_from_archive_on_testnet() { async fn test_issue_execution_succeeds_from_archive_on_network(is_public_network: bool) { test_with_vault( is_public_network, - |client, _vault_wallet, user_wallet, _oracle_agent, vault_id, vault_provider| async move { + |client, _vault_wallet, user_wallet, oracle_agent, vault_id, vault_provider| async move { let user_provider = setup_provider(client.clone(), AccountKeyring::Dave).await; let public_key = default_vault_stellar_address_as_binary(is_public_network); @@ -675,17 +671,6 @@ async fn test_issue_execution_succeeds_from_archive_on_network(is_public_network // We sleep here in order to wait for the fallback to the archive to be necessary sleep(Duration::from_secs(5 * 60)).await; - let shutdown_tx = ShutdownSender::new(); - let stellar_config = random_stellar_relay_config(is_public_network); - - let vault_stellar_secret = get_source_secret_key_from_env(is_public_network); - // Create new oracle agent with the same configuration as the previous one - let oracle_agent = - start_oracle_agent(stellar_config.clone(), &vault_stellar_secret, shutdown_tx) - .await - .expect("failed to start agent"); - let oracle_agent = Arc::new(oracle_agent); - // Loop pending proofs until it is ready let proof = oracle_agent.get_proof(slot).await.expect("Proof should be available"); let tx_envelope_xdr_encoded = transaction_response.envelope_xdr; @@ -1163,8 +1148,10 @@ async fn test_execute_open_requests_succeeds() { // add it to the set sleep(Duration::from_secs(5)).await; + let (precheck_signal, mut receiver) = tokio::sync::broadcast::channel(1); let shutdown_tx = ShutdownSender::new(); - join4( + + join5( vault::service::execute_open_requests( shutdown_tx.clone(), vault_provider, @@ -1172,8 +1159,12 @@ async fn test_execute_open_requests_succeeds() { vault_wallet.clone(), oracle_agent.clone(), Duration::from_secs(0), + precheck_signal, ) .map(Result::unwrap), + async move { + assert_ok!(receiver.recv().await); + }, // Redeem 0 should be executed without creating an extra payment since we already // sent one just before assert_execute_redeem_event(TIMEOUT * 3, user_provider.clone(), redeem_ids[0]), diff --git a/clients/wallet/src/stellar_wallet.rs b/clients/wallet/src/stellar_wallet.rs index b0c2ab60a..92d95e361 100644 --- a/clients/wallet/src/stellar_wallet.rs +++ b/clients/wallet/src/stellar_wallet.rs @@ -65,6 +65,9 @@ impl StellarWallet { pub(crate) const DEFAULT_MAX_RETRY_ATTEMPTS_BEFORE_FALLBACK: u8 = 3; pub(crate) const DEFAULT_MAX_BACKOFF_DELAY_IN_SECS: u16 = 60; + + /// We choose a default fee that is quite high to ensure that the transaction is processed + pub(crate) const DEFAULT_STROOP_FEE_PER_OPERATION: u32 = 100_000; } impl StellarWallet { @@ -377,9 +380,16 @@ impl StellarWallet { let _ = self.transaction_submission_lock.lock().await; let stroop_fee_per_operation = - get_fee_stat_for(self.is_public_network, FeeAttribute::default()) - .await - .map_err(|e| Error::FailedToGetFee(e))?; + match get_fee_stat_for(self.is_public_network, FeeAttribute::default()).await { + Ok(fee) => fee, + Err(e) => { + tracing::error!("Failed to get fee stat for Stellar network: {e:?}"); + // Return default fee for the operation. + let fallback_fee = StellarWallet::DEFAULT_STROOP_FEE_PER_OPERATION; + tracing::info!("Using the default stroop fee for operation: {fallback_fee:?}"); + fallback_fee + }, + }; let account = self.client.get_account(self.public_key(), self.is_public_network).await?; let next_sequence_number = account.sequence + 1;