274 Improve handling of issue request execution #276

Merged
merged 14 commits on Mar 13, 2023
205 changes: 155 additions & 50 deletions clients/vault/src/issue.rs
@@ -1,4 +1,4 @@
use std::{convert::TryFrom, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::{channel::mpsc::Sender, future, SinkExt};
use sp_runtime::traits::StaticLookup;
@@ -12,11 +12,11 @@ use service::Error as ServiceError;
use stellar_relay_lib::sdk::{PublicKey, TransactionEnvelope, XdrCodec};
use wallet::{
types::{FilterWith, TransactionFilterParam},
Ledger, LedgerTxEnvMap,
LedgerTxEnvMap, Slot, SlotTask, SlotTaskStatus,
};

use crate::{
oracle::{types::Slot, *},
oracle::{types::Slot as OracleSlot, OracleAgent},
ArcRwLock, Error, Event,
};

@@ -159,6 +159,74 @@ fn get_issue_id_from_tx_env(tx_env: &TransactionEnvelope) -> Option<IssueId> {
}
}

// Returns a oneshot sender for reporting the status of a slot.
//
// Checks the map to determine whether the slot should be executed/processed.
// If not, returns None.
#[doc(hidden)]
fn create_task_status_sender(
processed_map: &mut HashMap<Slot, SlotTask>,
slot: &Slot,
) -> Option<tokio::sync::oneshot::Sender<SlotTaskStatus>> {
// An existing task is found.
if let Some(existing) = processed_map.get_mut(slot) {
// Only a task with a recoverable error can be given a new sender.
return existing.recover_with_new_sender()
}

// Not finding the slot in the map means there's no existing task for it.
let (sender, receiver) = tokio::sync::oneshot::channel();
tracing::trace!("Creating a task for slot {}", slot);

let slot_task = SlotTask::new(*slot, receiver);
processed_map.insert(*slot, slot_task);

Some(sender)
}

// Removes successful tasks, and failed tasks that cannot be retried, from the map.
#[doc(hidden)]
async fn cleanup_ledger_env_map(
processed_map: &mut HashMap<Slot, SlotTask>,
ledger_env_map: ArcRwLock<LedgerTxEnvMap>,
) {
// Check each task:
// * if processing has finished, remove the slot from the ledger_env_map;
// * if processing failed, keep the task when it can be retried,
//   otherwise remove it because the transaction can no longer be processed.
let mut ledger_map = ledger_env_map.write().await;

// retain only the tasks that have not yet finished or can still be retried
processed_map.retain(|slot, task| {
match task.update_status() {
// the task has not finished yet or has not started; keep it
SlotTaskStatus::Ready |
// the task failed, but it can be retried
SlotTaskStatus::RecoverableError => true,

// the task succeeded
SlotTaskStatus::Success |
// the task has reached the maximum number of retries and cannot be executed again.
SlotTaskStatus::ReachedMaxRetries => {
// we cannot process this again, so remove it from the list.
ledger_map.remove(slot);
false
}
// the task failed and this transaction cannot be executed again
SlotTaskStatus::Failed(e) => {
tracing::error!("{}",e);
// we cannot process this again, so remove it from the list.
ledger_map.remove(slot);
false
}

}
});
}
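
// Illustration only (not part of this patch): `HashMap::retain` keeps the entries for
// which the closure returns true, which is exactly what the cleanup above relies on.
// The string statuses below are hypothetical stand-ins for `SlotTaskStatus` variants.
use std::collections::HashMap;

fn main() {
    let mut tasks: HashMap<u32, &str> =
        HashMap::from([(1, "ready"), (2, "success"), (3, "max_retries")]);

    // Keep only the entries that still need work; finished entries are dropped here,
    // and their slots could be removed from a shared map inside the same closure.
    tasks.retain(|_slot, status| *status == "ready");

    assert_eq!(tasks.len(), 1);
    assert!(tasks.contains_key(&1));
}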

/// Processes all the issue requests
///
/// # Arguments
@@ -173,18 +241,39 @@ pub async fn process_issues_requests(
ledger_env_map: ArcRwLock<LedgerTxEnvMap>,
issues: ArcRwLock<IssueRequestsMap>,
) -> Result<(), ServiceError<Error>> {
// collects all the tasks that have been executed or are about to be executed.
let mut processed_map = HashMap::new();

loop {
for (ledger, _tx_env) in ledger_env_map.read().await.iter() {
let ledger_clone = ledger_env_map.clone();

// iterate over the list of transactions to process.
for (slot, tx_env) in ledger_env_map.read().await.iter() {
// create a oneshot sender
let sender = match create_task_status_sender(&mut processed_map, slot) {
None => continue,
Some(sender) => sender,
};

let parachain_rpc_clone = parachain_rpc.clone();
let issues_clone = issues.clone();
let oracle_agent_clone = oracle_agent.clone();

tokio::spawn(execute_issue(
parachain_rpc.clone(),
ledger_env_map.clone(),
issues.clone(),
oracle_agent.clone(),
Slot::from(*ledger),
parachain_rpc_clone,
tx_env.clone(),
issues_clone,
oracle_agent_clone,
*slot,
sender,
));
}

// Wait 5 seconds before starting the next iteration.
tokio::time::sleep(Duration::from_secs(5)).await;

// before looping again, clean up the map.
cleanup_ledger_env_map(&mut processed_map, ledger_clone).await;
}
}

@@ -193,63 +282,79 @@ pub async fn process_issues_requests(
/// # Arguments
///
/// * `parachain_rpc` - the parachain RPC handle
/// * `ledger_env_map` - a list of TransactionEnvelopes and its corresponding ledger it belongs to
/// * `tx_env` - the transaction envelope containing the issue request
/// * `issues` - a map of all issue requests
/// * `oracle_agent` - the agent used to get the proofs
/// * `slot` - the slot the transaction envelope belongs to
/// * `sender` - the oneshot sender used to report the status of this task
pub async fn execute_issue(
parachain_rpc: SpacewalkParachain,
ledger_env_map: ArcRwLock<LedgerTxEnvMap>,
tx_env: TransactionEnvelope,
issues: ArcRwLock<IssueRequestsMap>,
oracle_agent: Arc<OracleAgent>,
slot: Slot,
) -> Result<(), ServiceError<Error>> {
let ledger =
Ledger::try_from(slot).map_err(|e| ServiceError::VaultError(Error::TryIntoIntError(e)))?;

sender: tokio::sync::oneshot::Sender<SlotTaskStatus>,
) {
let slot = OracleSlot::from(slot);
// Get the proof of the given slot
let proof = oracle_agent
.get_proof(slot)
.await
.map_err(|e| ServiceError::OracleError(Error::OracleError(e)))?;
let proof = match oracle_agent.get_proof(slot).await {
Ok(proof) => proof,
Err(e) => {
tracing::error!("Failed to get proof for slot {}: {:?}", slot, e);
if let Err(e) = sender.send(SlotTaskStatus::RecoverableError) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
},
};

let (envelopes, tx_set) = proof.encode();

let mut ledger_env_map = ledger_env_map.write().await;

// Get the transaction envelope where the ledger belongs to
if let Some(tx_env) = ledger_env_map.get(&ledger) {
let tx_env_encoded = {
let tx_env_xdr = tx_env.to_xdr();
base64::encode(tx_env_xdr)
};
let tx_env_encoded = {
let tx_env_xdr = tx_env.to_xdr();
base64::encode(tx_env_xdr)
};

if let Some(issue_id) = get_issue_id_from_tx_env(tx_env) {
// calls the execute_issue of the `Issue` Pallet
match parachain_rpc
.execute_issue(
issue_id,
tx_env_encoded.as_bytes(),
envelopes.as_bytes(),
tx_set.as_bytes(),
)
.await
{
Ok(_) => {
tracing::trace!("Slot {:?} EXECUTED with issue_id: {:?}", ledger, issue_id);
tracing::info!("Issue request {:?} was executed", issue_id);
ledger_env_map.remove(&ledger);
issues.write().await.remove(&issue_id);
},
Err(err) if err.is_issue_completed() => {
tracing::info!("Issue #{} has been completed", issue_id);
},
Err(err) => return Err(err.into()),
}
if let Some(issue_id) = get_issue_id_from_tx_env(&tx_env) {
// calls the execute_issue of the `Issue` Pallet
match parachain_rpc
.execute_issue(
issue_id,
tx_env_encoded.as_bytes(),
envelopes.as_bytes(),
tx_set.as_bytes(),
)
.await
{
Ok(_) => {
tracing::debug!("Slot {:?} executed with issue_id: {:?}", slot, issue_id);
issues.write().await.remove(&issue_id);

if let Err(e) = sender.send(SlotTaskStatus::Success) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
},
Err(err) if err.is_issue_completed() => {
tracing::debug!("Issue #{} has been completed", issue_id);
if let Err(e) = sender.send(SlotTaskStatus::Success) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
},
Err(e) => {
if let Err(e) = sender.send(SlotTaskStatus::Failed(format!("{:?}", e))) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
},
}
}

Ok(())
if let Err(e) =
sender.send(SlotTaskStatus::Failed(format!("Cannot find issue_id for slot {}", slot)))
{
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
}
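
// Illustration only (not part of this patch): the submission path above base64-encodes
// the XDR bytes of the transaction envelope before handing them to the parachain call.
// This sketch assumes the `base64` crate is available; the byte vector stands in for
// the result of `tx_env.to_xdr()`.
fn main() {
    let tx_env_xdr: Vec<u8> = vec![0u8, 1, 2, 3];

    // Encode the raw XDR bytes as a base64 string.
    let tx_env_encoded = base64::encode(&tx_env_xdr);

    // `execute_issue` is then given the bytes of that base64 string.
    let _payload: &[u8] = tx_env_encoded.as_bytes();
    println!("encoded envelope: {}", tx_env_encoded);
}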

/// The IssueFilter used for
3 changes: 1 addition & 2 deletions clients/vault/tests/vault_integration_tests.rs
@@ -16,8 +16,7 @@ use tokio::{sync::RwLock, time::sleep};
use primitives::H256;
use runtime::{
integration::*, types::*, FixedPointNumber, FixedU128, IssuePallet, RedeemPallet,
ReplacePallet, ShutdownSender, SpacewalkParachain, StellarRelayPallet, SudoPallet, UtilFuncs,
VaultRegistryPallet,
ReplacePallet, ShutdownSender, SpacewalkParachain, SudoPallet, UtilFuncs, VaultRegistryPallet,
};
use stellar_relay_lib::sdk::{PublicKey, XdrCodec};
use vault::{
36 changes: 15 additions & 21 deletions clients/wallet/src/horizon.rs
@@ -9,10 +9,12 @@ use tokio::{sync::RwLock, time::sleep};
use crate::{
error::Error,
types::{FilterWith, TransactionFilterParam},
Ledger, LedgerTxEnvMap,
LedgerTxEnvMap,
};

pub type PagingToken = u128;
// todo: change to Slot
pub type Ledger = u32;

const POLL_INTERVAL: u64 = 5000;

@@ -353,7 +355,7 @@ impl<C: HorizonClient> HorizonFetcher<C> {
targets: Arc<RwLock<T>>,
filter: impl FilterWith<TransactionFilterParam<T>>,
last_paging_token: PagingToken,
) -> Result<PagingToken, Error> {
) -> PagingToken {
let res = self.fetch_latest_txs(last_paging_token).await;
let transactions = match res {
Ok(txs) => txs._embedded.records,
@@ -371,20 +373,20 @@ let targets = targets.read().await;
let targets = targets.read().await;
for transaction in transactions {
let tx = transaction.clone();
let id = tx.id.clone();

if filter.is_relevant((tx.clone(), targets.clone())) {
tracing::info!(
"Adding transaction {:?} with slot {} to the ledger_env_map",
String::from_utf8(id.clone()),
String::from_utf8(tx.id.clone()),
tx.ledger
);
let tx_env = tx.to_envelope()?;
ledger_env_map.write().await.insert(tx.ledger, tx_env);
if let Ok(tx_env) = tx.to_envelope() {
ledger_env_map.write().await.insert(tx.ledger, tx_env);
}
}
}

Ok(latest_paging_token)
latest_paging_token
}
}

@@ -415,17 +417,14 @@ where
let mut latest_paging_token: PagingToken = 0;

loop {
if let Ok(new_paging_token) = fetcher
latest_paging_token = fetcher
.fetch_horizon_and_process_new_transactions(
ledger_env_map.clone(),
targets.clone(),
filter.clone(),
latest_paging_token,
)
.await
{
latest_paging_token = new_paging_token;
}
.await;

sleep(Duration::from_millis(POLL_INTERVAL)).await;
}
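
// Illustration only (not part of this patch): the polling loop above, reduced to its
// shape. Since the fetch no longer returns a Result, the cursor is simply replaced
// with whatever paging token comes back. `fetch_page` is a hypothetical stand-in for
// `fetch_horizon_and_process_new_transactions`.
use std::time::Duration;

async fn fetch_page(cursor: u128) -> u128 {
    cursor + 1
}

#[tokio::main]
async fn main() {
    let mut latest_paging_token: u128 = 0;

    // Poll a few pages, always advancing the cursor with the returned paging token.
    for _ in 0..3 {
        latest_paging_token = fetch_page(latest_paging_token).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    println!("last paging token: {}", latest_paging_token);
}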
@@ -602,18 +601,14 @@ mod tests {

assert!(slot_env_map.read().await.is_empty());

let mut cursor = 0;
if let Ok(next_page) = fetcher
let cursor = fetcher
.fetch_horizon_and_process_new_transactions(
slot_env_map.clone(),
issue_hashes.clone(),
MockFilter,
cursor,
0u128,
)
.await
{
cursor = next_page;
}
.await;

fetcher
.fetch_horizon_and_process_new_transactions(
Expand All @@ -622,8 +617,7 @@ mod tests {
MockFilter,
cursor,
)
.await
.unwrap();
.await;

assert!(!slot_env_map.read().await.is_empty());
}
6 changes: 4 additions & 2 deletions clients/wallet/src/lib.rs
@@ -3,11 +3,13 @@ use substrate_stellar_sdk::TransactionEnvelope;

pub use horizon::{listen_for_new_transactions, TransactionResponse};
pub use stellar_wallet::StellarWallet;
pub use task::*;

pub mod error;
mod horizon;
mod stellar_wallet;
mod task;
pub mod types;

pub type Ledger = u32;
pub type LedgerTxEnvMap = HashMap<u32, TransactionEnvelope>;
pub type Slot = u32;
pub type LedgerTxEnvMap = HashMap<Slot, TransactionEnvelope>;