Skip to content

Commit

Permalink
rename variants for enum, to make it easier to understand.
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed Mar 7, 2023
1 parent e565d48 commit cf7d387
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 119 deletions.
43 changes: 23 additions & 20 deletions clients/vault/src/issue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use service::Error as ServiceError;
use stellar_relay_lib::sdk::{PublicKey, TransactionEnvelope, XdrCodec};
use wallet::{
types::{FilterWith, TransactionFilterParam},
Ledger, LedgerTask, LedgerTaskStatus, LedgerTxEnvMap,
LedgerTxEnvMap,
// todo: replace all `Ledger` names to `Slot`
Slot as Ledger,
SlotTask,
SlotTaskStatus,
};

use crate::{
Expand Down Expand Up @@ -165,20 +169,20 @@ fn get_issue_id_from_tx_env(tx_env: &TransactionEnvelope) -> Option<IssueId> {
// If not, returns None.
#[doc(hidden)]
fn create_task_status_sender(
processed_map: &mut HashMap<Ledger, LedgerTask>,
processed_map: &mut HashMap<Ledger, SlotTask>,
ledger: &Ledger,
) -> Option<tokio::sync::oneshot::Sender<LedgerTaskStatus>> {
) -> Option<tokio::sync::oneshot::Sender<SlotTaskStatus>> {
// An existing task is found.
if let Some(existing) = processed_map.get_mut(ledger) {
// Only recoverable errors can be given a new task.
return existing.recover_and_create_sender()
return existing.recover_with_new_sender()
}

// Not finding the ledger in the map means there's no existing task for this ledger.
let (sender, receiver) = tokio::sync::oneshot::channel();
tracing::trace!("Creating a task for ledger {}", ledger);

let ledger_task = LedgerTask::create(*ledger, receiver);
let ledger_task = SlotTask::create(*ledger, receiver);
processed_map.insert(*ledger, ledger_task);

Some(sender)
Expand All @@ -187,7 +191,7 @@ fn create_task_status_sender(
// Remove successful tasks and failed ones (and cannot be retried again) from the map.
#[doc(hidden)]
async fn cleanup_ledger_env_map(
processed_map: &mut HashMap<Ledger, LedgerTask>,
processed_map: &mut HashMap<Ledger, SlotTask>,
ledger_env_map: ArcRwLock<LedgerTxEnvMap>,
) {
// check the tasks if:
Expand All @@ -203,19 +207,20 @@ async fn cleanup_ledger_env_map(
processed_map.retain(|ledger, task| {
match task.status() {
// the task is not yet finished/ hasn't started; let's keep it
LedgerTaskStatus::NotStarted |
SlotTaskStatus::Ready |
// the task failed, but is possible to retry again
LedgerTaskStatus::RecoverableError => true,
SlotTaskStatus::RecoverableError => true,

// the task succeeded
LedgerTaskStatus::ProcessSuccess => {
SlotTaskStatus::Success |
// the task has reached maximum retries, and cannot be executed again.
SlotTaskStatus::ReachedMaxRetries => {
// we cannot process this again, so remove it from the list.
ledger_map.remove(ledger);
false

}
// the task failed and this transaction cannot be executed again
LedgerTaskStatus::Error(e) => {
SlotTaskStatus::Failed(e) => {
tracing::error!("{}",e);
// we cannot process this again, so remove it from the list.
ledger_map.remove(ledger);
Expand All @@ -224,8 +229,6 @@ async fn cleanup_ledger_env_map(

}
});

drop(ledger_map);
}

/// Processes all the issue requests
Expand Down Expand Up @@ -293,12 +296,12 @@ pub async fn execute_issue(
issues: ArcRwLock<IssueRequestsMap>,
oracle_agent: Arc<OracleAgent>,
slot: Slot,
sender: tokio::sync::oneshot::Sender<LedgerTaskStatus>,
sender: tokio::sync::oneshot::Sender<SlotTaskStatus>,
) {
let ledger = match Ledger::try_from(slot) {
Ok(ledger) => ledger,
Err(e) => {
if let Err(e) = sender.send(LedgerTaskStatus::Error(format!("{:?}", e))) {
if let Err(e) = sender.send(SlotTaskStatus::Failed(format!("{:?}", e))) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
Expand All @@ -310,7 +313,7 @@ pub async fn execute_issue(
Ok(proof) => proof,
Err(e) => {
tracing::error!("Failed to get proof for ledger {}: {:?}", ledger, e);
if let Err(e) = sender.send(LedgerTaskStatus::RecoverableError) {
if let Err(e) = sender.send(SlotTaskStatus::RecoverableError) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
Expand Down Expand Up @@ -339,20 +342,20 @@ pub async fn execute_issue(
tracing::debug!("Slot {:?} executed with issue_id: {:?}", ledger, issue_id);
issues.write().await.remove(&issue_id);

if let Err(e) = sender.send(LedgerTaskStatus::ProcessSuccess) {
if let Err(e) = sender.send(SlotTaskStatus::Success) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
},
Err(err) if err.is_issue_completed() => {
tracing::debug!("Issue #{} has been completed", issue_id);
if let Err(e) = sender.send(LedgerTaskStatus::ProcessSuccess) {
if let Err(e) = sender.send(SlotTaskStatus::Success) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
},
Err(e) => {
if let Err(e) = sender.send(LedgerTaskStatus::Error(format!("{:?}", e))) {
if let Err(e) = sender.send(SlotTaskStatus::Failed(format!("{:?}", e))) {
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
return
Expand All @@ -361,7 +364,7 @@ pub async fn execute_issue(
}

if let Err(e) =
sender.send(LedgerTaskStatus::Error(format!("Cannot find issue_id for ledger {}", ledger)))
sender.send(SlotTaskStatus::Failed(format!("Cannot find issue_id for ledger {}", ledger)))
{
tracing::error!("Failed to send {:?} status for slot {}", e, slot);
}
Expand Down
4 changes: 3 additions & 1 deletion clients/wallet/src/horizon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use tokio::{sync::RwLock, time::sleep};
use crate::{
error::Error,
types::{FilterWith, TransactionFilterParam},
Ledger, LedgerTxEnvMap,
LedgerTxEnvMap,
};

pub type PagingToken = u128;
// todo: change to Slot
pub type Ledger = u32;

const POLL_INTERVAL: u64 = 5000;

Expand Down
4 changes: 2 additions & 2 deletions clients/wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ mod stellar_wallet;
mod task;
pub mod types;

pub type Ledger = u32;
pub type LedgerTxEnvMap = HashMap<Ledger, TransactionEnvelope>;
pub type Slot = u32;
pub type LedgerTxEnvMap = HashMap<Slot, TransactionEnvelope>;
Loading

0 comments on commit cf7d387

Please sign in to comment.