Rewrite Task of listening Stellar Messages #545

Merged
merged 66 commits into from
Oct 21, 2024
Changes from 55 commits
Commits
cc7c9cc
add logs; change log level
b-yap Jul 5, 2024
5ea8b71
adding started logs for each task
b-yap Jul 5, 2024
9a4b1c4
changing debug logs to info
b-yap Jul 8, 2024
8b75f56
reduce restart interval
b-yap Jul 11, 2024
519b7eb
revert to debug the other logs
b-yap Jul 11, 2024
878470d
make listen() async; use tokio::select
b-yap Jul 30, 2024
67e540c
first iteration
b-yap Aug 1, 2024
12779c7
fix the trait bound `service::Error<error::Error>: From<service::Err…
b-yap Aug 2, 2024
806568d
cleanup clones; add names to threads
b-yap Aug 9, 2024
b110ad1
add a precheck signal
b-yap Aug 12, 2024
fe68519
reduce timeouts; write log every 2 minutes
b-yap Aug 12, 2024
4d3183b
update config and remove precheck when listening for stellar messages
b-yap Aug 13, 2024
53c88de
log every minute
b-yap Aug 13, 2024
8c345e1
cleanup
b-yap Aug 13, 2024
2ad0d7f
reduce timeout by moving it out to `poll_messages_from_stellar()`
b-yap Aug 13, 2024
0cb2927
fix https://github.com/pendulum-chain/spacewalk/actions/runs/10369361…
b-yap Aug 13, 2024
7e891d4
move checking of proof building inside `fn start_oracle_agent()` for …
b-yap Aug 22, 2024
deba3da
rebase
b-yap Sep 4, 2024
970fac4
Merge branch 'main' into rewrite-agent-vault
b-yap Sep 5, 2024
0004c3a
https://github.com/pendulum-chain/spacewalk/actions/runs/10724299988/…
b-yap Sep 9, 2024
a4f2477
await before performing a compare
b-yap Sep 9, 2024
7b45cc5
update test config, and add `ntest` for long-running test cases
b-yap Sep 11, 2024
bcc5903
use ubuntu 20, instead of the latest 22
b-yap Sep 11, 2024
2d664c4
remove tokio spawns on archives; add ntest
b-yap Sep 13, 2024
d5a5adf
switch to ubuntu-latest
b-yap Sep 13, 2024
b8b4387
add ntest to all test cases
b-yap Sep 13, 2024
76fa194
set timeout to 5 minutes
b-yap Sep 13, 2024
cdb10a4
free up disk space
b-yap Sep 16, 2024
59aca69
Use random secret for connecting to overlay in integration tests
ebma Sep 17, 2024
ae50727
Run `cargo +nightly-2024-02-09 fmt --all`
ebma Sep 17, 2024
4777855
Remove ntest timeouts
ebma Sep 17, 2024
413fda1
Add fallback to default stroop fee
ebma Sep 18, 2024
6b2905e
Reduce amount in redeem test
ebma Sep 18, 2024
2cf9075
Fix compilation error
ebma Sep 18, 2024
1335c98
move `is_proof_building_ready` out of oracle agent
b-yap Sep 19, 2024
5b4307c
run release
b-yap Oct 1, 2024
01046af
use `try_recv` instead
b-yap Oct 1, 2024
be7ba9e
use `try_recv` instead
b-yap Oct 1, 2024
ee156ee
Merge remote-tracking branch 'origin/main' into rewrite-agent-vault
b-yap Oct 1, 2024
2eecdae
https://github.com/pendulum-chain/spacewalk/actions/runs/11120883420/…
b-yap Oct 1, 2024
37bc319
https://github.com/pendulum-chain/spacewalk/pull/545#discussion_r1761…
b-yap Oct 1, 2024
5f833d3
clean testing issues
b-yap Oct 2, 2024
c44bac8
fix issues
b-yap Oct 2, 2024
3f03be8
https://github.com/pendulum-chain/spacewalk/actions/runs/11139347479/…
b-yap Oct 3, 2024
6cc05d7
remove env_logger
b-yap Oct 3, 2024
b1800e8
merge from origin/master
b-yap Oct 3, 2024
9cc31b5
fix fmt and clippy issues
b-yap Oct 3, 2024
9d6fe30
fix nightly issues
b-yap Oct 3, 2024
23a1f9b
remove 2nd declaration of agent
b-yap Oct 4, 2024
524342e
https://github.com/pendulum-chain/spacewalk/actions/runs/11175397072/…
b-yap Oct 4, 2024
9b9dfd3
fix timeout issue
b-yap Oct 7, 2024
7b9ddda
remove env_logger
b-yap Oct 8, 2024
6e3f7c3
remove env_logger; move console-subscribe to dev-dependencies; remove…
b-yap Oct 8, 2024
71222e1
reduce changes
b-yap Oct 8, 2024
458ba4f
cargo fmt and clippy
b-yap Oct 8, 2024
3bd0100
https://github.com/pendulum-chain/spacewalk/pull/545#discussion_r1791…
b-yap Oct 8, 2024
e557b44
remove unnecessary dependencies
b-yap Oct 8, 2024
9cad8e4
fix "the `set-output` command is deprecated and will be disabled soon…
b-yap Oct 9, 2024
e7f3e26
https://github.com/pendulum-chain/spacewalk/pull/545#discussion_r1795…
b-yap Oct 11, 2024
0aa969e
address major comments
b-yap Oct 15, 2024
e2b3f02
run cargo fmt
b-yap Oct 15, 2024
c88e317
remove `unwrap()`s
b-yap Oct 16, 2024
8c07a3d
https://github.com/pendulum-chain/spacewalk/pull/545#discussion_r1802…
b-yap Oct 16, 2024
4d40c33
https://github.com/pendulum-chain/spacewalk/actions/runs/11364931112/…
b-yap Oct 21, 2024
636b608
use latest commit version
b-yap Oct 21, 2024
ebb96c4
use `.expect()` since this is only for test feature
b-yap Oct 21, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci-main.yml
@@ -27,7 +27,7 @@ jobs:
- name: Check Build
run: |
bash ./scripts/cmd-all build check "--release"

- name: Clippy -- Libraries and Binaries
run: |
bash ./scripts/cmd-all clippy "clippy --lib --bins" "--release -- -W clippy::all -A clippy::style -A forgetting_copy_types -A forgetting_references"
81 changes: 75 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 28 additions & 4 deletions clients/service/src/lib.rs
@@ -10,7 +10,10 @@ use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt};
use governor::{Quota, RateLimiter};
use nonzero_ext::*;
use tokio::{sync::RwLock, time::sleep};
use tokio::{
sync::{broadcast::error::TryRecvError, RwLock},
time::sleep,
};
pub use warp;

pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig};
@@ -170,17 +173,38 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
}
}

pub async fn wait_or_shutdown<F, E>(shutdown_tx: ShutdownSender, future2: F) -> Result<(), E>
pub async fn wait_or_shutdown<F, E>(
shutdown_tx: ShutdownSender,
future2: F,
// a consumer that receives a precheck signal to start a task.
precheck_signal: Option<tokio::sync::broadcast::Receiver<()>>,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
{
if let Some(mut precheck_signal) = precheck_signal {
loop {
match precheck_signal.try_recv() {
// Received a signal to start the task
Ok(_) => break,
Err(TryRecvError::Empty) =>
tracing::trace!("wait_or_shutdown precheck signal: waiting..."),
Comment on lines +190 to +191
Member

I think this is problematic because we are again creating tasks that never yield, potentially causing issues for the scheduling of other tasks.

Suggested change
Err(TryRecvError::Empty) =>
tracing::trace!("wait_or_shutdown precheck signal: waiting..."),
Err(TryRecvError::Empty) => {
tracing::trace!("wait_or_shutdown precheck signal: waiting...");
sleep(Duration::from_millis(100)).await; // Yield to the runtime
}

Contributor

Or could we use precheck_signal.recv().await? That one is async.

Contributor Author

await will not yield; it will potentially wait forever. I've yielded instead of the sleep.

Contributor

I think if the receiver precheck_signal has already consumed the message, then the next call to recv().await will yield until a new one arrives. Or do you think there will always be a message in the broadcast channel?

This implementation is equivalent though. But with try_recv we do need to yield, that's true.

Contributor Author

> do you think there will always be a message in the broadcast channel?

It will only send 1 () message. See lines 335-337 in clients/vault/src/requests/execution.rs.

The first try was to use recv().await but the possibility of it waiting forever is there.

However, we also cannot move forward with the task anyway if the precheck (or the execute_open_requests) is not finished.
I also would not like to wrap the recv() in a timeout() because open requests might really take longer than the timeout we set. Also, a timeout is another thread; that's 8 tasks requiring prechecks, meaning 8 additional threads. (See Marcel's comment for the image of these tasks: Issue Cancel Listener, Issue Request Listener, etc.)

Contributor Author

For readability, reverting to recv().await would be better. I would add that back, but we would have to accept the risk of it waiting indefinitely.
I think execute_open_requests() is reliable enough to be able to send the message fast.

// Precheck signal failed. Cannot start the task.
Err(e) => {
tracing::error!("Error receiving precheck signal: {:?}", e);
return Ok(());
},
}
}
}

match run_cancelable(shutdown_tx.subscribe(), future2).await {
TerminationStatus::Cancelled => {
tracing::trace!("Received shutdown signal");
tracing::trace!("wait_or_shutdown(): Received shutdown signal");
Ok(())
},
TerminationStatus::Completed(res) => {
tracing::trace!("Sending shutdown signal");
tracing::trace!("wait_or_shutdown(): Sending shutdown signal");
let _ = shutdown_tx.send(());
res
},
3 changes: 2 additions & 1 deletion clients/stellar-relay-lib/Cargo.toml
@@ -10,9 +10,10 @@ name = "stellar_relay_lib"
path = "src/lib.rs"

[dev-dependencies]
env_logger = "0.9.0"
ntest = "0.9.0"
serial_test = "0.9.0"
wallet = { path = "../wallet", features = ["testing-utils"] }
console-subscriber = { version = "0.3.0" }

[dependencies]
hex = "0.4.3"
65 changes: 40 additions & 25 deletions clients/stellar-relay-lib/examples/connect.rs
@@ -1,14 +1,14 @@
use stellar_relay_lib::{
connect_to_stellar_overlay_network,
sdk::types::{ScpStatementPledges, StellarMessage},
StellarOverlayConfig,
Error, StellarOverlayConfig,
};

use wallet::keys::get_source_secret_key_from_env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
console_subscriber::init();

let args: Vec<String> = std::env::args().collect();
let arg_network = if args.len() > 1 { &args[1] } else { "testnet" };
@@ -22,33 +22,48 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let secret_key = get_source_secret_key_from_env(arg_network == "mainnet");

let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?;

while let Ok(Some(msg)) = overlay_connection.listen() {
match msg {
StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
let node_id = base64::encode(&node_id);
let slot = msg.statement.slot_index;

let stmt_type = match msg.statement.pledges {
ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare",
ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm",
ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize",
ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ",
};
tracing::info!(
"{} sent StellarMessage of type {} for ledger {}",
node_id,
stmt_type,
slot
);
let mut overlay_connection = connect_to_stellar_overlay_network(cfg, secret_key).await?;

loop {
match overlay_connection.listen().await {
Ok(Some(msg)) => match msg {
StellarMessage::Hello(_) => {
tracing::info!("received Hello message");
},

StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
let node_id = base64::encode(&node_id);
let slot = msg.statement.slot_index;

let stmt_type = match msg.statement.pledges {
ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare",
ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm",
ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize",
ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ",
};
tracing::info!(
"{} sent StellarMessage of type {} for ledger {}",
node_id,
stmt_type,
slot
);
},
_ => {
let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await;
},
},
Ok(None) => {},
Err(Error::Timeout) => {
tracing::warn!("took more than a second to respond");
},
_ => {
let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await;
Err(e) => {
tracing::error!("Error: {:?}", e);
break
},
}
}

tracing::info!("ooops, connection stopped ");
Ok(())
}
13 changes: 8 additions & 5 deletions clients/stellar-relay-lib/src/config.rs
@@ -35,13 +35,16 @@ impl StellarOverlayConfig {

#[allow(dead_code)]
pub(crate) fn node_info(&self) -> NodeInfo {
self.node_info.clone().into()
NodeInfo::new(&self.node_info)
Contributor Author

reduce clones

}

#[allow(dead_code)]
pub(crate) fn connection_info(&self, secret_key: &str) -> Result<ConnectionInfo, Error> {
pub(crate) fn connection_info(
&self,
secret_key_as_string: String,
) -> Result<ConnectionInfo, Error> {
let cfg = &self.connection_info;
let secret_key = SecretKey::from_encoding(secret_key)?;
let secret_key = SecretKey::from_encoding(secret_key_as_string)?;

let public_key = secret_key.get_public().to_encoding();
let public_key = std::str::from_utf8(&public_key).unwrap();
@@ -128,9 +131,9 @@ impl ConnectionInfoCfg {
/// Returns the `StellarOverlayConnection` if connection is a success, otherwise an Error
pub async fn connect_to_stellar_overlay_network(
cfg: StellarOverlayConfig,
secret_key: &str,
secret_key_as_string: String,
) -> Result<StellarOverlayConnection, Error> {
let conn_info = cfg.connection_info(secret_key)?;
let conn_info = cfg.connection_info(secret_key_as_string)?;
let local_node = cfg.node_info;

StellarOverlayConnection::connect(local_node.into(), conn_info).await
@@ -282,7 +282,7 @@ mod test {
let cfg =
StellarOverlayConfig::try_from_path(cfg_file_path).expect("should create a config");
let node_info = cfg.node_info();
let conn_info = cfg.connection_info(&secret_key).expect("should create a connection info");
let conn_info = cfg.connection_info(secret_key).expect("should create a connection info");
// this is a channel to communicate with the connection/config (this needs renaming)

let connector = Connector::start(node_info.clone(), conn_info.clone())
@@ -333,7 +333,7 @@ mod test {
cert: new_auth_cert,
nonce: [0; 32],
};
connector.set_remote(RemoteInfo::new(&hello));
connector.set_remote(RemoteInfo::new(hello));
Contributor Author

Consume hello


assert!(connector.remote().is_some());
}
@@ -357,7 +357,7 @@ mod test {
cert: new_auth_cert,
nonce: [0; 32],
};
connector.set_remote(RemoteInfo::new(&hello));
connector.set_remote(RemoteInfo::new(hello));
assert_eq!(connector.remote().unwrap().sequence(), 0);

connector.increment_remote_sequence().unwrap();
@@ -385,7 +385,7 @@ mod test {
cert: new_auth_cert,
nonce: [0; 32],
};
let remote = RemoteInfo::new(&hello);
Contributor Author

hello is not used again, so it's best to pass it by value instead of by reference.

let remote = RemoteInfo::new(hello);
let remote_nonce = remote.nonce();
connector.set_remote(remote.clone());
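
The pass-by-value change discussed in this hunk can be illustrated with a small standalone sketch. The `Hello` and `RemoteInfo` types below are simplified, hypothetical stand-ins, not the real `substrate_stellar_sdk` or `stellar_relay_lib` definitions. Taking the argument by value lets the constructor move owned fields out of it instead of cloning them.

```rust
/// Simplified stand-in for the SDK's `Hello` message (hypothetical fields).
pub struct Hello {
    pub cert: Vec<u8>,
    pub nonce: [u8; 32],
}

/// Simplified stand-in for the library's `RemoteInfo`.
pub struct RemoteInfo {
    cert: Vec<u8>,
    nonce: [u8; 32],
    sequence: u64,
}

impl RemoteInfo {
    /// Consumes `hello`; the `cert` buffer is moved, not copied.
    pub fn new(hello: Hello) -> Self {
        RemoteInfo { cert: hello.cert, nonce: hello.nonce, sequence: 0 }
    }

    pub fn cert(&self) -> &[u8] {
        &self.cert
    }

    pub fn nonce(&self) -> [u8; 32] {
        self.nonce
    }

    pub fn sequence(&self) -> u64 {
        self.sequence
    }
}
```

With a `&Hello` parameter, the constructor would have to clone `cert` to own it. After `RemoteInfo::new(hello)` the compiler rejects any further use of `hello`, which matches the observation that it is not used again.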

@@ -3,7 +3,10 @@ use crate::connection::{
Connector, Error,
};
use substrate_stellar_sdk::{
types::{AuthenticatedMessage, AuthenticatedMessageV0, HmacSha256Mac, StellarMessage},
compound_types::LimitedString,
types::{
AuthenticatedMessage, AuthenticatedMessageV0, ErrorCode, HmacSha256Mac, StellarMessage,
},
XdrCodec,
};

@@ -75,3 +78,14 @@ impl Connector {
)
}
}

/// Create our own error to send over to the user/outsider.
pub(crate) fn crate_specific_error() -> StellarMessage {
Contributor Author

This is mostly for lib-related errors, not necessarily caused by Stellar itself. But to communicate this error back to the user/caller, we have to wrap it with StellarMessage::ErrorMsg().

let error = "Stellar Relay Error".as_bytes().to_vec();
let error = substrate_stellar_sdk::types::Error {
code: ErrorCode::ErrMisc,
msg: LimitedString::new(error).expect("should return a valid LimitedString"),
};

StellarMessage::ErrorMsg(error)
}
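
How a caller might react to such a wrapped error can be sketched with simplified types. The `Message` and `ErrorCode` enums below are hypothetical stand-ins for `StellarMessage` and the SDK's `ErrorCode`, used only to show the idea: a library-side failure travels to the caller as an ordinary message variant, so the caller handles it in the same `match` as regular traffic.

```rust
/// Hypothetical stand-in for the SDK's error code enum.
#[derive(Debug, PartialEq)]
pub enum ErrorCode {
    Misc,
}

/// Hypothetical stand-in for `StellarMessage`.
#[derive(Debug, PartialEq)]
pub enum Message {
    Scp(u64),
    ErrorMsg { code: ErrorCode, msg: String },
}

/// Mirrors the idea of `crate_specific_error()`: wrap a library-side failure
/// in the same message type the caller already receives.
pub fn lib_error() -> Message {
    Message::ErrorMsg { code: ErrorCode::Misc, msg: "Stellar Relay Error".to_string() }
}

/// Caller side: errors arrive through the same channel as regular messages.
pub fn describe(message: &Message) -> String {
    match message {
        Message::Scp(slot) => format!("scp message for slot {slot}"),
        Message::ErrorMsg { code, msg } => format!("error {code:?}: {msg}"),
    }
}
```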