Skip to content

Commit

Permalink
#538 shutdown the OracleAgent at VaultService level (#539)
Browse files Browse the repository at this point in the history
* add agent to `VaultService`

* remove shutdown subscriber

* remove unused function

* Update clients/stellar-relay-lib/src/overlay.rs

Co-authored-by: Marcel Ebert <[email protected]>

* show error response

* update slot for archive unit tests, and update configs

* fix https://github.com/pendulum-chain/spacewalk/actions/runs/9657641728/job/26648202437?pr=539#step:12:4486

* cargo fmt

---------

Co-authored-by: Marcel Ebert <[email protected]>
  • Loading branch information
b-yap and ebma authored Jun 27, 2024
1 parent 53f81a2 commit 6f9fda6
Show file tree
Hide file tree
Showing 20 changed files with 96 additions and 80 deletions.
8 changes: 0 additions & 8 deletions clients/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,3 @@ where
{
tokio::spawn(run_cancelable(shutdown_rx, future));
}

pub async fn on_shutdown(shutdown_tx: ShutdownSender, future2: impl Future) {
let mut shutdown_rx = shutdown_tx.subscribe();
let future1 = shutdown_rx.recv().fuse();

let _ = future1.await;
future2.await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"port": 11625
},
"node_info": {
"ledger_version": 20,
"overlay_version": 32,
"ledger_version": 21,
"overlay_version": 33,
"overlay_min_version": 32,
"version_str": "stellar-core 20.4.0 (7fc7671b8bc1ccc3b1f16a6ab83bc9f671db8b70)",
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"is_pub_net": true
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
},
"node_info": {
"ledger_version": 21,
"overlay_version": 33,
"overlay_version": 34,
"overlay_min_version": 32,
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)",
"is_pub_net": false
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
},
"node_info": {
"ledger_version": 21,
"overlay_version": 33,
"overlay_version": 34,
"overlay_min_version": 32,
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)",
"is_pub_net": false
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
},
"node_info": {
"ledger_version": 21,
"overlay_version": 33,
"overlay_version": 34,
"overlay_min_version": 32,
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)",
"is_pub_net": false
},
"stellar_history_archive_urls": [
Expand Down
4 changes: 3 additions & 1 deletion clients/stellar-relay-lib/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::{
Sender,
},
};
use tracing::{error, info};
use tracing::{debug, error, info};

use crate::{
connection::{poll_messages_from_stellar, ConnectionInfo, Connector},
Expand Down Expand Up @@ -59,6 +59,7 @@ impl StellarOverlayConnection {
pub fn listen(&mut self) -> Result<Option<StellarMessage>, Error> {
loop {
if !self.is_alive() {
debug!("listen(): sender half of overlay has closed.");
return Err(Error::Disconnected)
}

Expand Down Expand Up @@ -96,6 +97,7 @@ impl StellarOverlayConnection {

impl Drop for StellarOverlayConnection {
fn drop(&mut self) {
debug!("drop(): shutting down StellarOverlayConnection");
self.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"port": 11625
},
"node_info": {
"ledger_version": 20,
"overlay_version": 32,
"ledger_version": 21,
"overlay_version": 33,
"overlay_min_version": 32,
"version_str": "stellar-core 20.4.0 (7fc7671b8bc1ccc3b1f16a6ab83bc9f671db8b70)",
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"is_pub_net": true
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"port": 11625
},
"node_info": {
"ledger_version": 20,
"overlay_version": 32,
"ledger_version": 21,
"overlay_version": 33,
"overlay_min_version": 32,
"version_str": "stellar-core 20.4.0 (7fc7671b8bc1ccc3b1f16a6ab83bc9f671db8b70)",
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"is_pub_net": true
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
"port": 11625
},
"node_info": {
"ledger_version": 20,
"overlay_version": 32,
"ledger_version": 21,
"overlay_version": 33,
"overlay_min_version": 32,
"version_str": "stellar-core 20.4.0 (7fc7671b8bc1ccc3b1f16a6ab83bc9f671db8b70)",
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"is_pub_net": true
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
},
"node_info": {
"ledger_version": 21,
"overlay_version": 33,
"overlay_version": 34,
"overlay_min_version": 32,
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)",
"is_pub_net": false
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
},
"node_info": {
"ledger_version": 21,
"overlay_version": 33,
"overlay_version": 34,
"overlay_min_version": 32,
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)",
"is_pub_net": false
},
"stellar_history_archive_urls": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
},
"node_info": {
"ledger_version": 21,
"overlay_version": 33,
"overlay_version": 34,
"overlay_min_version": 32,
"version_str": "stellar-core 21.0.0 (c6f474133738ae5f6d11b07963ca841909210273)",
"version_str": "stellar-core 21.1.0 (b3aeb14cc798f6d11deb2be913041be916f3b0cc)",
"is_pub_net": false
},
"stellar_history_archive_urls": [
Expand Down
45 changes: 21 additions & 24 deletions clients/vault/src/oracle/agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{sync::Arc, time::Duration};

use service::on_shutdown;
use tokio::{
sync::{mpsc, mpsc::error::TryRecvError, RwLock},
time::{sleep, timeout},
Expand All @@ -20,8 +19,12 @@ use wallet::Slot;
pub struct OracleAgent {
collector: Arc<RwLock<ScpMessageCollector>>,
pub is_public_network: bool,
/// sends message directly to Stellar Node
message_sender: Option<StellarMessageSender>,
/// sends an entire Vault shutdown
shutdown_sender: ShutdownSender,
/// sends a 'stop' signal to `StellarOverlayConnection` poll
overlay_conn_end_signal: mpsc::Sender<()>,
}

/// listens to data to collect the scp messages and txsets.
Expand Down Expand Up @@ -77,14 +80,11 @@ pub async fn start_oracle_agent(
let collector_clone = collector.clone();

let shutdown_sender_clone = shutdown_sender.clone();
// a clone used to forcefully call a shutdown, when StellarOverlay disconnects.
let shutdown_sender_clone2 = shutdown_sender.clone();

// disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar
// Node
let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2);

service::spawn_cancelable(shutdown_sender_clone.subscribe(), async move {
tokio::spawn(async move {
let sender_clone = overlay_conn.sender();
loop {
match disconnect_signal_receiver.try_recv() {
Expand Down Expand Up @@ -113,7 +113,7 @@ pub async fn start_oracle_agent(
Err(e) => {
tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}");

if let Err(e) = shutdown_sender_clone2.send(()) {
if let Err(e) = shutdown_sender_clone.send(()) {
tracing::error!(
"start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}"
);
Expand All @@ -123,24 +123,18 @@ pub async fn start_oracle_agent(
}
}

tracing::info!("start_oracle_agent(): shutting down overlay connection");
// shutdown the overlay connection
overlay_conn.stop();
});

tokio::spawn(on_shutdown(shutdown_sender.clone(), async move {
tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection...");
if let Err(e) = disconnect_signal_sender.send(()).await {
tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}");
}
}));

Ok(OracleAgent { collector, is_public_network, message_sender: Some(sender), shutdown_sender })
}

impl Drop for OracleAgent {
fn drop(&mut self) {
self.stop();
}
Ok(OracleAgent {
collector,
is_public_network,
message_sender: Some(sender),
shutdown_sender,
overlay_conn_end_signal: disconnect_signal_sender,
})
}

impl OracleAgent {
Expand Down Expand Up @@ -194,10 +188,13 @@ impl OracleAgent {
}

/// Stops listening for new SCP messages.
pub fn stop(&self) {
tracing::debug!("stop(): Shutting down OracleAgent...");
if let Err(e) = self.shutdown_sender.send(()) {
tracing::error!("stop(): Failed to send shutdown signal in OracleAgent: {:?}", e);
pub async fn shutdown(&self) {
tracing::debug!("shutdown(): Shutting down OracleAgent...");
if let Err(e) = self.overlay_conn_end_signal.send(()).await {
tracing::error!(
"shutdown(): Failed to send overlay conn end signal in OracleAgent: {:?}",
e
);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions clients/vault/src/oracle/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub enum Error {

#[error(display = "{}", _0)]
ArchiveError(String),

#[error(display = "{}", _0)]
ArchiveResponseError(String),
}

impl From<StellarSdkError> for Error {
Expand Down
4 changes: 2 additions & 2 deletions clients/vault/src/oracle/storage/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ mod test {
.expect("should have an archive url")
.clone();
let scp_archive_storage = ScpArchiveStorage(testnet_archive_url);
let slot_index = 235001;
let slot_index = 10;

let scp_archive = scp_archive_storage
.get_archive(slot_index)
Expand Down Expand Up @@ -487,7 +487,7 @@ mod test {
let tx_archive_storage = TransactionsArchiveStorage(testnet_archive_url);

//arrange
let slot_index = 235001;
let slot_index = 10;
let (_url, ref filename) = tx_archive_storage.get_url_and_file_name(slot_index);

//act
Expand Down
5 changes: 4 additions & 1 deletion clients/vault/src/oracle/storage/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ pub trait ArchiveStorage {

pub(crate) async fn download_file_and_save(url: &str, file_name: &str) -> Result<(), Error> {
let response = reqwest::get(url).await.map_err(|e| Error::ArchiveError(e.to_string()))?;
let content = response.bytes().await.map_err(|e| Error::ArchiveError(e.to_string()))?;
if response.status().is_server_error() | response.status().is_client_error() {
return Err(Error::ArchiveResponseError(format!("{response:?}")));
}

let content = response.bytes().await.map_err(|e| Error::ArchiveError(e.to_string()))?;
let mut file = File::create(&file_name).map_err(|e| Error::ArchiveError(e.to_string()))?;
file.write_all(content.as_bytes_ref())
.map_err(|e| Error::ArchiveError(e.to_string()))?;
Expand Down
16 changes: 16 additions & 0 deletions clients/vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ pub struct VaultService {
shutdown: ShutdownSender,
vault_id_manager: VaultIdManager,
secret_key: String,
agent: Option<Arc<OracleAgent>>,
}

#[async_trait]
Expand All @@ -292,6 +293,7 @@ impl Service<VaultServiceConfig, Error> for VaultService {
async fn start(&mut self) -> Result<(), ServiceError<Error>> {
let result = self.run_service().await;

self.try_shutdown_agent().await;
self.try_shutdown_wallet().await;

if let Err(error) = result {
Expand Down Expand Up @@ -729,6 +731,7 @@ impl VaultService {
shutdown,
vault_id_manager: VaultIdManager::new(spacewalk_parachain, stellar_wallet),
secret_key,
agent: None,
})
}

Expand Down Expand Up @@ -782,6 +785,7 @@ impl VaultService {

let oracle_agent =
self.create_oracle_agent(is_public_network, self.shutdown.clone()).await?;
self.agent = Some(oracle_agent.clone());

self.execute_open_requests(oracle_agent.clone());

Expand Down Expand Up @@ -942,4 +946,16 @@ impl VaultService {
wallet.try_stop_periodic_resubmission_of_transactions().await;
drop(wallet);
}

async fn try_shutdown_agent(&mut self) {
let opt_agent = self.agent.clone();
self.agent = None;

if let Some(arc_agent) = opt_agent {
tracing::info!("try_shutdown_agent(): shutting down agent");
arc_agent.shutdown().await;
} else {
tracing::debug!("try_shutdown_agent(): no agent found");
}
}
}
12 changes: 6 additions & 6 deletions clients/wallet/src/horizon/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ async fn fetch_transactions_iter_success() {
// the list should be empty, as the last record of this page was returned.
assert_eq!(txs_iter.records.len(), 0);

// if the next page
let next_page = txs_iter.next_page.clone();
if !next_page.is_empty() {
// continue reading for transactions
assert!(txs_iter.next().await.is_some());

// new records can be read
assert_ne!(txs_iter.records.len(), 0);
// this will access the next page
match txs_iter.next().await {
// if none, then the next pages should be the same.
None => assert_eq!(next_page, txs_iter.next_page),
// the next page should be different already
Some(_) => assert_ne!(next_page, txs_iter.next_page),
}
}

Expand Down
11 changes: 10 additions & 1 deletion clients/wallet/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,16 @@ pub fn wallet_with_secret_key_for_storage(

pub fn default_destination() -> PublicKey {
let dest_secret = get_dest_secret_key_from_env(IS_PUBLIC_NETWORK);
let dest_secret_key = SecretKey::from_encoding(dest_secret).expect("should work");
_public_key(dest_secret)
}

pub fn default_source() -> PublicKey {
let source_secret = get_source_secret_key_from_env(IS_PUBLIC_NETWORK);
_public_key(source_secret)
}

fn _public_key(secret: String) -> PublicKey {
let dest_secret_key = SecretKey::from_encoding(secret).expect("should work");
dest_secret_key.get_public().clone()
}

Expand Down
Loading

0 comments on commit 6f9fda6

Please sign in to comment.