Skip to content

Commit

Permalink
Solve mandatory flow control (#550)
Browse files Browse the repository at this point in the history
* first iteration to solve mandatory flow control

* cleanup

* fix auth flag

* remove extra check_to_send_more

* auth response could work with flow control w/o bytes also

* remove unused import for test

* re-introduce interfaces, remove test

* run cargo fmt

* cargo fmt with nightly

* Revert "cargo fmt with nightly"

This reverts commit 08ab6fc.

* run cargo fmt with nightly 2024-02-09

* refactor auth flag, fix incorrect data len calculation

* run cargo fmt

* rename send_more method of flow controller

* readability changes
  • Loading branch information
gianfra-t authored Sep 3, 2024
1 parent d74009b commit d507d6b
Show file tree
Hide file tree
Showing 40 changed files with 365 additions and 362 deletions.
5 changes: 2 additions & 3 deletions clients/runtime/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ impl ProviderUserOpts {
// load parachain credentials
let (pair, user_name) = match (self.keyfile.as_ref(), self.keyname.as_ref(), &self.keyring)
{
(Some(file_path), Some(keyname), None) => {
(get_credentials_from_file(file_path, keyname)?, keyname.to_string())
},
(Some(file_path), Some(keyname), None) =>
(get_credentials_from_file(file_path, keyname)?, keyname.to_string()),
(None, None, Some(keyring)) => {
let pair = Pair::from_string(keyring.to_seed().as_str(), None)
.map_err(|_| Error::KeyringAccountParsingError)?;
Expand Down
10 changes: 4 additions & 6 deletions clients/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Error {

pub fn is_rpc_disconnect_error(&self) -> bool {
match self {
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) => {
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) =>
match e.downcast_ref::<JsonRpseeError>() {
Some(e) => matches!(e, JsonRpseeError::RestartNeeded(_)),
None => {
Expand All @@ -169,8 +169,7 @@ impl Error {
);
false
},
}
},
},
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::SubscriptionDropped)) => true,
_ => false,
}
Expand Down Expand Up @@ -204,7 +203,7 @@ impl Error {

pub fn is_timeout_error(&self) -> bool {
match self {
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) => {
Error::SubxtRuntimeError(SubxtError::Rpc(RpcError::ClientError(e))) =>
match e.downcast_ref::<JsonRpseeError>() {
Some(e) => matches!(e, JsonRpseeError::RequestTimeout),
None => {
Expand All @@ -213,8 +212,7 @@ impl Error {
);
false
},
}
},
},
_ => false,
}
}
Expand Down
19 changes: 8 additions & 11 deletions clients/runtime/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,10 @@ impl SpacewalkParachain {
match result.map_err(Into::<Error>::into) {
Ok(ok) => Ok(ok),
Err(err) => match err.is_invalid_transaction() {
Some(Recoverability::Recoverable(data)) => {
Err(RetryPolicy::Skip(Error::InvalidTransaction(data)))
},
Some(Recoverability::Unrecoverable(data)) => {
Err(RetryPolicy::Throw(Error::InvalidTransaction(data)))
},
Some(Recoverability::Recoverable(data)) =>
Err(RetryPolicy::Skip(Error::InvalidTransaction(data))),
Some(Recoverability::Unrecoverable(data)) =>
Err(RetryPolicy::Throw(Error::InvalidTransaction(data))),
None => {
// Handle other errors
if err.is_pool_too_low_priority() {
Expand Down Expand Up @@ -572,9 +570,8 @@ impl VaultRegistryPallet for SpacewalkParachain {
let query = metadata::storage().vault_registry().vaults(&vault_id.clone());

match self.query_finalized(query).await? {
Some(SpacewalkVault { status: VaultStatus::Liquidated, .. }) => {
Err(Error::VaultLiquidated)
},
Some(SpacewalkVault { status: VaultStatus::Liquidated, .. }) =>
Err(Error::VaultLiquidated),
Some(vault) if &vault.id == vault_id => Ok(vault),
_ => Err(Error::VaultNotFound),
}
Expand Down Expand Up @@ -1082,8 +1079,8 @@ impl IssuePallet for SpacewalkParachain {
while let Ok((issue_id, request)) =
iter.next().await.ok_or(Error::RequestIssueIDNotFound)?
{
if request.status == IssueRequestStatus::Pending
&& request.opentime + issue_period > current_height
if request.status == IssueRequestStatus::Pending &&
request.opentime + issue_period > current_height
{
let key_hash = issue_id.as_slice();
// last bytes are the raw key
Expand Down
20 changes: 8 additions & 12 deletions clients/runtime/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,11 @@ pub mod currency_id {
)
.replace('\"', "")),
},
CurrencyId::ZenlinkLPToken(token1_id, token1_type, token2_id, token2_type) => {
CurrencyId::ZenlinkLPToken(token1_id, token1_type, token2_id, token2_type) =>
Ok(format!(
"ZenlinkLPToken({},{},{},{})",
token1_id, token1_type, token2_id, token2_type
))
},
)),
CurrencyId::Token(token_id) => Ok(format!("Token({})", token_id)),
}
}
Expand Down Expand Up @@ -340,19 +339,16 @@ mod dispatch_error {
RichDispatchError::Other(_) => DispatchError::Other,
RichDispatchError::CannotLookup => DispatchError::CannotLookup,
RichDispatchError::BadOrigin => DispatchError::BadOrigin,
RichDispatchError::Module(RichModuleError { index, error, .. }) => {
DispatchError::Module(ModuleError { index, error })
},
RichDispatchError::Module(RichModuleError { index, error, .. }) =>
DispatchError::Module(ModuleError { index, error }),
RichDispatchError::ConsumerRemaining => DispatchError::ConsumerRemaining,
RichDispatchError::NoProviders => DispatchError::NoProviders,
RichDispatchError::TooManyConsumers => DispatchError::TooManyConsumers,
RichDispatchError::Token(token_error) => DispatchError::Token(token_error.into()),
RichDispatchError::Arithmetic(arithmetic_error) => {
DispatchError::Arithmetic(arithmetic_error.into())
},
RichDispatchError::Transactional(transactional_error) => {
DispatchError::Transactional(transactional_error.into())
},
RichDispatchError::Arithmetic(arithmetic_error) =>
DispatchError::Arithmetic(arithmetic_error.into()),
RichDispatchError::Transactional(transactional_error) =>
DispatchError::Transactional(transactional_error.into()),
RichDispatchError::Exhausted => DispatchError::Exhausted,
sp_runtime::DispatchError::Corruption => DispatchError::Corruption,
sp_runtime::DispatchError::Unavailable => DispatchError::Unavailable,
Expand Down
42 changes: 14 additions & 28 deletions clients/stellar-relay-lib/src/connection/connector/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use std::{
net::Shutdown,
};
use substrate_stellar_sdk::{
types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType},
types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac},
XdrCodec,
};

use substrate_stellar_sdk::types::StellarMessage;
use tracing::{error, trace};

use crate::{
Expand Down Expand Up @@ -34,7 +36,7 @@ pub struct Connector {
receive_scp_messages: bool,

handshake_state: HandshakeState,
flow_controller: FlowController,
pub(crate) flow_controller: FlowController,

/// for writing/reading xdr messages to/from Stellar Node.
pub(crate) tcp_stream: TcpStream,
Expand Down Expand Up @@ -227,20 +229,18 @@ impl Connector {
pub fn got_hello(&mut self) {
self.handshake_state = HandshakeState::GotHello;
}

pub fn handshake_completed(&mut self) {
self.handshake_state = HandshakeState::Completed;
}

pub fn inner_check_to_send_more(&mut self, msg_type: MessageType) -> bool {
self.flow_controller.send_more(msg_type)
}
pub fn enable_flow_controller(
pub fn maybe_start_flow_control_bytes(
&mut self,
local_overlay_version: u32,
remote_overlay_version: u32,
) {
self.flow_controller.enable(local_overlay_version, remote_overlay_version)
) -> StellarMessage {
return self
.flow_controller
.start(local_overlay_version, remote_overlay_version);
}

pub fn handshake_completed(&mut self) {
self.handshake_state = HandshakeState::Completed;
}
}

Expand All @@ -249,12 +249,6 @@ mod test {
use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig};
use serial_test::serial;

use substrate_stellar_sdk::{
compound_types::LimitedString,
types::{Hello, MessageType},
PublicKey,
};

use crate::{
connection::{
authentication::{create_auth_cert, ConnectionAuth},
Expand All @@ -264,6 +258,7 @@ mod test {
node::NodeInfo,
ConnectionInfo,
};
use substrate_stellar_sdk::{compound_types::LimitedString, types::Hello, PublicKey};

use wallet::keys::get_source_secret_key_from_env;

Expand Down Expand Up @@ -424,13 +419,4 @@ mod test {
connector.handshake_completed();
assert!(connector.is_handshake_created());
}

#[tokio::test]
#[serial]
async fn enable_flow_controller_works() {
let (node_info, _, mut connector) = create_connector().await;

assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage));
connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version);
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::connection::{
authentication::verify_remote_auth_cert,
helper::{error_to_string, time_now},
hmac::HMacKeys,
xdr_converter::parse_authenticated_message,
Connector, Error, Xdr,
use crate::{
connection::{
authentication::verify_remote_auth_cert,
helper::{error_to_string, time_now},
hmac::HMacKeys,
xdr_converter::parse_authenticated_message,
Connector, Error, Xdr,
},
node::RemoteInfo,
};
use substrate_stellar_sdk::{
types::{ErrorCode, Hello, MessageType, StellarMessage},
XdrCodec,
};
use tracing::{error, info, trace, warn};

use crate::node::RemoteInfo;

impl Connector {
/// Processes the raw bytes from the stream
pub(super) async fn process_raw_message(
Expand All @@ -24,7 +25,7 @@ impl Connector {
match msg_type {
MessageType::Transaction | MessageType::FloodAdvert if !self.receive_tx_messages() => {
self.increment_remote_sequence()?;
self.check_to_send_more(MessageType::Transaction).await?;
self.maybe_reclaim_capacity(MessageType::Transaction, data.len()).await?;
},

MessageType::ScpMessage if !self.receive_scp_messages() => {
Expand Down Expand Up @@ -52,8 +53,7 @@ impl Connector {
self.increment_remote_sequence()?;
trace!("process_raw_message(): Processing {msg_type:?} message: auth verified");
}

return self.process_stellar_message(auth_msg.message, msg_type).await;
return self.process_stellar_message(auth_msg.message, msg_type, data.len()).await;
},
}
Ok(None)
Expand All @@ -65,6 +65,7 @@ impl Connector {
&mut self,
msg: StellarMessage,
msg_type: MessageType,
data_len: usize,
) -> Result<Option<StellarMessage>, Error> {
match msg {
StellarMessage::Hello(hello) => {
Expand All @@ -76,7 +77,7 @@ impl Connector {
if self.remote_called_us() {
self.send_hello_message().await?;
} else {
self.send_auth_message().await?;
self.send_auth_message(self.local().node().overlay_version).await?;
}
info!("process_stellar_message(): Hello message processed successfully");
},
Expand All @@ -92,10 +93,11 @@ impl Connector {
}
return Ok(Some(StellarMessage::ErrorMsg(e)));
},

StellarMessage::SendMore(_) => {},
StellarMessage::SendMoreExtended(_) => {},
// we do not handle other messages. Return to caller
other => {
self.check_to_send_more(msg_type).await?;
self.maybe_reclaim_capacity(msg_type, data_len).await?;
return Ok(Some(other));
},
}
Expand All @@ -105,21 +107,21 @@ impl Connector {

async fn process_auth_message(&mut self) -> Result<(), Error> {
if self.remote_called_us() {
self.send_auth_message().await?;
self.send_auth_message(self.local().node().overlay_version).await?;
}

self.handshake_completed();

if let Some(remote) = self.remote() {
self.enable_flow_controller(
let msg = self.maybe_start_flow_control_bytes(
self.local().node().overlay_version,
remote.node().overlay_version,
);
self.send_to_node(msg).await?;
} else {
warn!("process_auth_message(): No remote overlay version after handshake.");
}

self.check_to_send_more(MessageType::Auth).await
Ok(())
}

/// Updates the config based on the hello message that was received from the Stellar Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ pub(crate) async fn poll_messages_from_stellar(

// check for messages from user.
match send_to_node_receiver.try_recv() {
Ok(msg) => {
Ok(msg) =>
if let Err(e) = connector.send_to_node(msg).await {
error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}");
}
},
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use async_std::io::WriteExt;
use std::time::Duration;
use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage};
use substrate_stellar_sdk::types::{MessageType, StellarMessage};
use tokio::time::timeout;
use tracing::debug;

use crate::connection::{
flow_controller::MAX_FLOOD_MSG_CAP,
handshake::create_auth_message,
helper::{time_now, to_base64_xdr_string},
Connector, Error,
Expand Down Expand Up @@ -34,22 +33,25 @@ impl Connector {
self.send_to_node(msg).await
}

pub(super) async fn send_auth_message(&mut self) -> Result<(), Error> {
let msg = create_auth_message();
pub(super) async fn send_auth_message(
&mut self,
local_overlay_version: u32,
) -> Result<(), Error> {
let msg = create_auth_message(local_overlay_version);
debug!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg));

self.send_to_node(create_auth_message()).await
return self.send_to_node(create_auth_message(local_overlay_version)).await;
}

pub(super) async fn check_to_send_more(
pub(super) async fn maybe_reclaim_capacity(
&mut self,
message_type: MessageType,
data_len: usize,
) -> Result<(), Error> {
if !self.inner_check_to_send_more(message_type) {
return Ok(());
}

let msg = StellarMessage::SendMore(SendMore { num_messages: MAX_FLOOD_MSG_CAP });
self.send_to_node(msg).await
let msg = self.flow_controller.create_reclaim_capacity_message(message_type, data_len);
if let Some(inner_msg) = msg {
return self.send_to_node(inner_msg).await;
};
Ok(())
}
}
Loading

0 comments on commit d507d6b

Please sign in to comment.