diff --git a/clients/vault/src/issue.rs b/clients/vault/src/issue.rs index a0a098da8..59f04747c 100644 --- a/clients/vault/src/issue.rs +++ b/clients/vault/src/issue.rs @@ -12,7 +12,11 @@ use service::Error as ServiceError; use stellar_relay_lib::sdk::{PublicKey, TransactionEnvelope, XdrCodec}; use wallet::{ types::{FilterWith, TransactionFilterParam}, - Ledger, LedgerTask, LedgerTaskStatus, LedgerTxEnvMap, + LedgerTxEnvMap, + // todo: replace all `Ledger` names to `Slot` + Slot as Ledger, + SlotTask, + SlotTaskStatus, }; use crate::{ @@ -165,20 +169,20 @@ fn get_issue_id_from_tx_env(tx_env: &TransactionEnvelope) -> Option { // If not, returns None. #[doc(hidden)] fn create_task_status_sender( - processed_map: &mut HashMap, + processed_map: &mut HashMap, ledger: &Ledger, -) -> Option> { +) -> Option> { // An existing task is found. if let Some(existing) = processed_map.get_mut(ledger) { // Only recoverable errors can be given a new task. - return existing.recover_and_create_sender() + return existing.recover_with_new_sender() } // Not finding the ledger in the map means there's no existing task for this ledger. let (sender, receiver) = tokio::sync::oneshot::channel(); tracing::trace!("Creating a task for ledger {}", ledger); - let ledger_task = LedgerTask::create(*ledger, receiver); + let ledger_task = SlotTask::create(*ledger, receiver); processed_map.insert(*ledger, ledger_task); Some(sender) @@ -187,7 +191,7 @@ fn create_task_status_sender( // Remove successful tasks and failed ones (and cannot be retried again) from the map. #[doc(hidden)] async fn cleanup_ledger_env_map( - processed_map: &mut HashMap, + processed_map: &mut HashMap, ledger_env_map: ArcRwLock, ) { // check the tasks if: @@ -203,19 +207,20 @@ async fn cleanup_ledger_env_map( processed_map.retain(|ledger, task| { match task.status() { // the task is not yet finished/ hasn't started; let's keep it - LedgerTaskStatus::NotStarted | + SlotTaskStatus::Ready | // the task failed, but is possible to retry again - LedgerTaskStatus::RecoverableError => true, + SlotTaskStatus::RecoverableError => true, // the task succeeded - LedgerTaskStatus::ProcessSuccess => { + SlotTaskStatus::Success | + // the task has reached maximum retries, and cannot be executed again. + SlotTaskStatus::ReachedMaxRetries => { // we cannot process this again, so remove it from the list. ledger_map.remove(ledger); false - } // the task failed and this transaction cannot be executed again - LedgerTaskStatus::Error(e) => { + SlotTaskStatus::Failed(e) => { tracing::error!("{}",e); // we cannot process this again, so remove it from the list. ledger_map.remove(ledger); @@ -224,8 +229,6 @@ async fn cleanup_ledger_env_map( } }); - - drop(ledger_map); } /// Processes all the issue requests @@ -293,12 +296,12 @@ pub async fn execute_issue( issues: ArcRwLock, oracle_agent: Arc, slot: Slot, - sender: tokio::sync::oneshot::Sender, + sender: tokio::sync::oneshot::Sender, ) { let ledger = match Ledger::try_from(slot) { Ok(ledger) => ledger, Err(e) => { - if let Err(e) = sender.send(LedgerTaskStatus::Error(format!("{:?}", e))) { + if let Err(e) = sender.send(SlotTaskStatus::Failed(format!("{:?}", e))) { tracing::error!("Failed to send {:?} status for slot {}", e, slot); } return @@ -310,7 +313,7 @@ pub async fn execute_issue( Ok(proof) => proof, Err(e) => { tracing::error!("Failed to get proof for ledger {}: {:?}", ledger, e); - if let Err(e) = sender.send(LedgerTaskStatus::RecoverableError) { + if let Err(e) = sender.send(SlotTaskStatus::RecoverableError) { tracing::error!("Failed to send {:?} status for slot {}", e, slot); } return @@ -339,20 +342,20 @@ pub async fn execute_issue( tracing::debug!("Slot {:?} executed with issue_id: {:?}", ledger, issue_id); issues.write().await.remove(&issue_id); - if let Err(e) = sender.send(LedgerTaskStatus::ProcessSuccess) { + if let Err(e) = sender.send(SlotTaskStatus::Success) { tracing::error!("Failed to send {:?} status for slot {}", e, slot); } return }, Err(err) if err.is_issue_completed() => { tracing::debug!("Issue #{} has been completed", issue_id); - if let Err(e) = sender.send(LedgerTaskStatus::ProcessSuccess) { + if let Err(e) = sender.send(SlotTaskStatus::Success) { tracing::error!("Failed to send {:?} status for slot {}", e, slot); } return }, Err(e) => { - if let Err(e) = sender.send(LedgerTaskStatus::Error(format!("{:?}", e))) { + if let Err(e) = sender.send(SlotTaskStatus::Failed(format!("{:?}", e))) { tracing::error!("Failed to send {:?} status for slot {}", e, slot); } return @@ -361,7 +364,7 @@ pub async fn execute_issue( } if let Err(e) = - sender.send(LedgerTaskStatus::Error(format!("Cannot find issue_id for ledger {}", ledger))) + sender.send(SlotTaskStatus::Failed(format!("Cannot find issue_id for ledger {}", ledger))) { tracing::error!("Failed to send {:?} status for slot {}", e, slot); } diff --git a/clients/wallet/src/horizon.rs b/clients/wallet/src/horizon.rs index 12b5435d4..63956ec35 100644 --- a/clients/wallet/src/horizon.rs +++ b/clients/wallet/src/horizon.rs @@ -9,10 +9,12 @@ use tokio::{sync::RwLock, time::sleep}; use crate::{ error::Error, types::{FilterWith, TransactionFilterParam}, - Ledger, LedgerTxEnvMap, + LedgerTxEnvMap, }; pub type PagingToken = u128; +// todo: change to Slot +pub type Ledger = u32; const POLL_INTERVAL: u64 = 5000; diff --git a/clients/wallet/src/lib.rs b/clients/wallet/src/lib.rs index a1dc014d2..627ff2579 100644 --- a/clients/wallet/src/lib.rs +++ b/clients/wallet/src/lib.rs @@ -11,5 +11,5 @@ mod stellar_wallet; mod task; pub mod types; -pub type Ledger = u32; -pub type LedgerTxEnvMap = HashMap; +pub type Slot = u32; +pub type LedgerTxEnvMap = HashMap; diff --git a/clients/wallet/src/task.rs b/clients/wallet/src/task.rs index 008e42559..a54ff92a9 100644 --- a/clients/wallet/src/task.rs +++ b/clients/wallet/src/task.rs @@ -1,103 +1,137 @@ -use crate::Ledger; +use crate::Slot; use tokio::sync::oneshot::{channel, error::TryRecvError, Receiver, Sender}; -pub const LEDGER_TASK_MAX_RETRIES: u8 = 3; /// Determines the status of the task of processing a transaction #[derive(Debug, Clone, Eq, PartialEq)] -pub enum LedgerTaskStatus { - NotStarted, - /// Process Done and it was a success - ProcessSuccess, +pub enum SlotTaskStatus { + Ready, + /// The task ended and it was a success + Success, /// The error is acceptable, and is best to just retry again RecoverableError, - /// Something happened when - Error(String), + /// The task failed. + Failed(String), + /// Cannot reprocess the task since the max number of retries have been reached. + ReachedMaxRetries, } -pub struct LedgerTask { - ledger: Ledger, +/// Constructs a task dedicated to a given slot. +pub struct SlotTask { + slot: Slot, /// receives the status of the task execution - task_execution_receiver: Receiver, + task_execution_receiver: Receiver, /// the number of retries allowed to process this specific task retries_remaining: u8, /// The latest status of this task - latest_status: LedgerTaskStatus, + latest_status: SlotTaskStatus, } -impl LedgerTask { - /// Creates a task structure the `TransactionEnvelope` - pub fn create(ledger: Ledger, task_execution_receiver: Receiver) -> Self { - LedgerTask { - ledger, +impl SlotTask { + const DEFAULT_MAX_RETRIES: u8 = 3; + /// Creates a task structure for the provided slot + /// + /// # Arguments + /// + /// * `slot` - the slot number for this task + /// * `task_execution_receiver` - a oneshot receiver to receive `SlotTaskStatus` message. + pub fn new(slot: Slot, task_execution_receiver: Receiver) -> Self { + SlotTask::new_with_max_retries(slot, task_execution_receiver, SlotTask::DEFAULT_MAX_RETRIES) + } + + /// Creates a task structure for the provide slot, and can specify how many retries + /// a failed (with a recoverable error) task can be executed. + pub fn new_with_max_retries( + slot: Slot, + task_execution_receiver: Receiver, + max_retries: u8, + ) -> Self { + SlotTask { + slot, task_execution_receiver, // The decrement action is called everytime a new receiver is set. - retries_remaining: LEDGER_TASK_MAX_RETRIES, - latest_status: LedgerTaskStatus::NotStarted, + retries_remaining: max_retries, + latest_status: SlotTaskStatus::Ready, } } + pub fn slot(&self) -> Slot { + self.slot + } + + /// Checks the status of the task + pub fn status(&mut self) -> SlotTaskStatus { + match self.latest_status { + // These status are considered final, and cannot be updated anymore. + SlotTaskStatus::Success | + SlotTaskStatus::Failed(_) | + SlotTaskStatus::ReachedMaxRetries => {}, + + // The status is immediately "ReachedMaxRetries", after exhausting the max number of + // retries. + _ if self.retries_remaining == 0 => { + tracing::warn!( + "For slot {}, maximum number of retries has been reached.", + self.slot + ); + self.latest_status = SlotTaskStatus::ReachedMaxRetries; + }, + + _ => { + self.latest_status = match &self.task_execution_receiver.try_recv() { + // Only when the retries ACTUALLY reached maximum, should the status be `ReachedMaxRetries`. + Ok(SlotTaskStatus::ReachedMaxRetries) | + // The status can only be "Ready" during initialization AND when calling the method + // `recover_with_new_sender` + Ok(SlotTaskStatus::Ready)=> { + tracing::warn!("For slot {}, user can only send the ff. status: Success, Failed, RecoverableError", self.slot); + self.latest_status.clone() + } + Ok(new_status) => { new_status.clone() }, + Err(TryRecvError::Empty) => SlotTaskStatus::Ready, + Err(TryRecvError::Closed) => SlotTaskStatus::RecoverableError + }; + }, + } + + self.latest_status.clone() + } + /// Sets a new one shot receiver. /// Returns true if it was successful. - pub fn change_receiver(&mut self, receiver: Receiver) -> bool { + fn set_receiver(&mut self, receiver: Receiver) -> bool { match self.latest_status { - LedgerTaskStatus::NotStarted | LedgerTaskStatus::RecoverableError + SlotTaskStatus::Ready | SlotTaskStatus::RecoverableError if self.retries_remaining > 0 => { self.task_execution_receiver = receiver; self.retries_remaining = self.retries_remaining.saturating_sub(1); true }, + // Change of status is NOT allowed for tasks with latest status of either "Success" or + // "Failed". "Success" and "Failed" status are considered final. _ => false, } } - pub fn ledger(&self) -> Ledger { - self.ledger - } - - /// Checks the status of the task - pub fn status(&mut self) -> LedgerTaskStatus { - if self.retries_remaining == 0 { - self.latest_status = LedgerTaskStatus::Error(format!( - "Transaction for ledger {} reached max limit of retries.", - self.ledger - )); - } else { - self.latest_status = match &self.task_execution_receiver.try_recv() { - Ok(new_status) => match self.latest_status { - LedgerTaskStatus::NotStarted | LedgerTaskStatus::RecoverableError => - new_status.clone(), - LedgerTaskStatus::ProcessSuccess | LedgerTaskStatus::Error(_) => - self.latest_status.clone(), - }, - Err(TryRecvError::Empty) => LedgerTaskStatus::NotStarted, - Err(TryRecvError::Closed) => LedgerTaskStatus::RecoverableError, - }; - } - - self.latest_status.clone() - } - /// Returns oneshot sender for sending status of a ledger /// if and only if the status is `RecoverableError` - pub fn recover_and_create_sender(&mut self) -> Option> { + pub fn recover_with_new_sender(&mut self) -> Option> { // Only recoverable errors can be given a new task. - if self.status() == LedgerTaskStatus::RecoverableError { + if self.status() == SlotTaskStatus::RecoverableError { let (sender, receiver) = channel(); - if self.change_receiver(receiver) { - self.latest_status = LedgerTaskStatus::NotStarted; + if self.set_receiver(receiver) { + self.latest_status = SlotTaskStatus::Ready; return Some(sender) } } - // For tasks flagged as `NotStarted` , wait for it. - // For tasks flagged as `ProcessSuccess`, they will be removed in the for loop + // For tasks flagged as `Ready` , wait for it. + // For tasks flagged as `Success`, they will be removed in the for loop // For tasks flagged as `Error`, these will be removed. - // continue with the loop None } } @@ -106,101 +140,163 @@ impl LedgerTask { mod test { use super::*; - #[test] - fn test_create() { - let ledger = 10; - let (sender, receiver) = channel(); + fn dummy_error() -> String { + "this is a test".to_string() + } - let task = LedgerTask::create(ledger, receiver); + #[test] + fn test_task_creation() { + { + let slot = 10; + let (_, receiver) = channel(); - assert_eq!(task.ledger, ledger); - assert_eq!(task.retries_remaining, LEDGER_TASK_MAX_RETRIES); - assert_eq!(task.latest_status, LedgerTaskStatus::NotStarted); + let task = SlotTask::new(slot, receiver); - drop(sender); + assert_eq!(task.slot, slot); + assert_eq!(task.retries_remaining, SlotTask::DEFAULT_MAX_RETRIES); + assert_eq!(task.latest_status, SlotTaskStatus::Ready); + } + { + let slot = 50; + let max_retries = 10; + let (_, receiver) = channel(); + let task = SlotTask::new_with_max_retries(slot, receiver, max_retries); + assert_eq!(task.slot, slot); + assert_eq!(task.retries_remaining, max_retries); + assert_eq!(task.latest_status, SlotTaskStatus::Ready); + } } #[test] - fn test_status() { + fn status_change_success() { { let (sender, receiver) = channel(); - let mut task = LedgerTask::create(15, receiver); + let mut task = SlotTask::new(15, receiver); sender - .send(LedgerTaskStatus::ProcessSuccess) - .expect("should be able to send status"); - assert_eq!(task.status(), LedgerTaskStatus::ProcessSuccess); + .send(SlotTaskStatus::Success) + .expect("should be able to send Success status"); + assert_eq!(task.status(), SlotTaskStatus::Success); } { let (sender, receiver) = channel(); - let mut task = LedgerTask::create(20, receiver); + let mut task = SlotTask::new(20, receiver); + + sender + .send(SlotTaskStatus::Failed(dummy_error())) + .expect("should be able to send Failed status"); + assert_eq!(task.status(), SlotTaskStatus::Failed(dummy_error())); + } + } + + #[test] + fn status_change_failed() { + { + let (sender, receiver) = channel(); + let mut task = SlotTask::new(40, receiver); sender - .send(LedgerTaskStatus::Error("this is a test".to_string())) + .send(SlotTaskStatus::ReachedMaxRetries) .expect("should be able to send status"); - assert_eq!(task.status(), LedgerTaskStatus::Error("this is a test".to_string())); + assert_ne!(task.status(), SlotTaskStatus::ReachedMaxRetries); + } + { + let (sender, receiver) = channel(); + let mut task = SlotTask::new(40, receiver); + + sender.send(SlotTaskStatus::Ready).expect("should be able to send status"); + // actually the status remains the same. It's just that the "change" was never made. + assert_eq!(task.status(), SlotTaskStatus::Ready); } } #[test] - fn recover_and_create_sender_success() { + fn recover_with_new_sender_success() { { - let ledger = 10; let (sender, receiver) = channel(); - let mut task = LedgerTask::create(ledger, receiver); + let mut task = SlotTask::new(10, receiver); sender - .send(LedgerTaskStatus::RecoverableError) - .expect("should be able to send a status"); - assert_eq!(task.status(), LedgerTaskStatus::RecoverableError); + .send(SlotTaskStatus::RecoverableError) + .expect("should be able to send a RecoverableError status"); + assert_eq!(task.status(), SlotTaskStatus::RecoverableError); // let's try to recover: - let new_sender = task.recover_and_create_sender().expect("should return a sender"); - assert_eq!(task.status(), LedgerTaskStatus::NotStarted); + let new_sender = task.recover_with_new_sender().expect("should return a sender"); + assert_eq!(task.status(), SlotTaskStatus::Ready); new_sender - .send(LedgerTaskStatus::ProcessSuccess) + .send(SlotTaskStatus::Success) .expect("should be able to send a status"); - assert_eq!(task.status(), LedgerTaskStatus::ProcessSuccess); + assert_eq!(task.status(), SlotTaskStatus::Success); } { let (sender, receiver) = channel(); - let mut task = LedgerTask::create(12, receiver); + let mut task = SlotTask::new(12, receiver); sender - .send(LedgerTaskStatus::RecoverableError) + .send(SlotTaskStatus::RecoverableError) .expect("should be able to send a status"); - assert_eq!(task.status(), LedgerTaskStatus::RecoverableError); + assert_eq!(task.status(), SlotTaskStatus::RecoverableError); // let's try to recover: - let new_sender = task.recover_and_create_sender().expect("should return a sender"); - assert_eq!(task.status(), LedgerTaskStatus::NotStarted); + let new_sender = task.recover_with_new_sender().expect("should return a sender"); + assert_eq!(task.status(), SlotTaskStatus::Ready); new_sender - .send(LedgerTaskStatus::Error("this is a test".to_string())) + .send(SlotTaskStatus::Failed(dummy_error())) .expect("should be able to send a status"); - assert_ne!(task.status(), LedgerTaskStatus::NotStarted); + assert_ne!(task.status(), SlotTaskStatus::Ready); } } #[test] - fn recover_and_create_sender_failed() { + fn recover_with_new_sender_failed() { { let (sender, receiver) = channel(); - let mut task = LedgerTask::create(5, receiver); + let mut task = SlotTask::new(5, receiver); - sender.send(LedgerTaskStatus::ProcessSuccess).expect("should be able to send"); - assert!(task.recover_and_create_sender().is_none()); + sender.send(SlotTaskStatus::Success).expect("should be able to send"); + assert!(task.recover_with_new_sender().is_none()); } { let (sender, receiver) = channel(); - let mut task = LedgerTask::create(5, receiver); + let mut task = SlotTask::new(5, receiver); sender - .send(LedgerTaskStatus::Error("this is a test".to_string())) + .send(SlotTaskStatus::Failed(dummy_error())) .expect("should be able to send"); - assert!(task.recover_and_create_sender().is_none()); + assert!(task.recover_with_new_sender().is_none()); + } + } + + #[test] + fn max_retries_exhausted() { + fn exhaustion_test(sender: Sender, mut task: SlotTask, max_retries: u8) { + let mut sender = sender; + // let's exhaust the retries + for _ in 0..max_retries { + assert_eq!(task.status(), SlotTaskStatus::Ready); + sender.send(SlotTaskStatus::RecoverableError).expect("should send status"); + sender = task.recover_with_new_sender().expect("should return a sender"); + } + assert_eq!(task.retries_remaining, 0); + assert_eq!(task.status(), SlotTaskStatus::ReachedMaxRetries); + } + + { + let (sender, receiver) = channel(); + let task = SlotTask::new(123, receiver); + + exhaustion_test(sender, task, SlotTask::DEFAULT_MAX_RETRIES); + } + { + let max_retries = 8; + let (sender, receiver) = channel(); + let task = SlotTask::new_with_max_retries(321, receiver, max_retries); + + exhaustion_test(sender, task, max_retries) } } }