Skip to content

Commit

Permalink
Add less aggressive txn cancelling
Browse files Browse the repository at this point in the history
Added less aggressive transaction cancelling when a wallet has difficulty
discovering the receiving party (for direct P2P comms) or base node peers
that could be used to send the transaction via store and forward.
  • Loading branch information
hansieodendaal committed Mar 19, 2022
1 parent 7145201 commit 9bf72fb
Show file tree
Hide file tree
Showing 21 changed files with 755 additions and 1,044 deletions.
2 changes: 2 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ enum TransactionStatus {
TRANSACTION_STATUS_FAUX_UNCONFIRMED = 9;
// All Imported and FauxUnconfirmed transactions will end up with this status when the outputs have been confirmed
TRANSACTION_STATUS_FAUX_CONFIRMED = 10;
// This transaction is still being queued for sending
TRANSACTION_STATUS_QUEUED = 11;
}

message GetCompletedTransactionsRequest { }
Expand Down
1 change: 1 addition & 0 deletions applications/tari_app_grpc/src/conversions/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl From<TransactionStatus> for grpc::TransactionStatus {
Rejected => grpc::TransactionStatus::Rejected,
FauxUnconfirmed => grpc::TransactionStatus::FauxUnconfirmed,
FauxConfirmed => grpc::TransactionStatus::FauxConfirmed,
Queued => grpc::TransactionStatus::Queued,
}
}
}
Expand Down
18 changes: 14 additions & 4 deletions applications/tari_console_wallet/src/notifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tokio::runtime::Handle;
pub const LOG_TARGET: &str = "wallet::notifier";
const RECEIVED: &str = "received";
const SENT: &str = "sent";
const QUEUED: &str = "queued";
const CONFIRMATION: &str = "confirmation";
const MINED: &str = "mined";
const CANCELLED: &str = "cancelled";
Expand Down Expand Up @@ -132,9 +133,18 @@ impl Notifier {
}
}

/// Trigger a notification that a pending transaction was sent.
pub fn transaction_sent(&self, tx_id: TxId) {
debug!(target: LOG_TARGET, "transaction_sent tx_id: {}", tx_id);
/// Trigger a notification that a pending transaction was sent or queued.
pub fn transaction_sent_or_queued(&self, tx_id: TxId, is_sent: bool) {
let event = if is_sent {
debug!(target: LOG_TARGET, "Transaction sent tx_id: {}", tx_id);
SENT
} else {
debug!(
target: LOG_TARGET,
"Transaction queued for further retry sending tx_id: {}", tx_id
);
QUEUED
};

if let Some(program) = self.path.clone() {
let mut transaction_service = self.wallet.transaction_service.clone();
Expand All @@ -143,7 +153,7 @@ impl Notifier {
match transaction_service.get_pending_outbound_transactions().await {
Ok(txs) => {
if let Some(tx) = txs.get(&tx_id) {
let args = args_from_outbound(tx, SENT);
let args = args_from_outbound(tx, event);
let result = Command::new(program).args(&args).output();
log(result);
} else {
Expand Down
25 changes: 25 additions & 0 deletions applications/tari_console_wallet/src/ui/components/send_tab.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use log::*;
use tari_core::transactions::tari_amount::MicroTari;
use tari_utilities::hex::Hex;
use tari_wallet::tokens::Token;
Expand All @@ -18,6 +19,8 @@ use crate::ui::{
widgets::{draw_dialog, WindowedListState},
};

const LOG_TARGET: &str = "wallet::console_wallet::send_tab ";

pub struct SendTab {
balance: Balance,
send_input_mode: SendInputMode,
Expand All @@ -28,6 +31,7 @@ pub struct SendTab {
message_field: String,
error_message: Option<String>,
success_message: Option<String>,
offline_message: Option<String>,
contacts_list_state: WindowedListState,
send_result_watch: Option<watch::Receiver<UiTransactionSendStatus>>,
confirmation_dialog: Option<ConfirmationDialogType>,
Expand All @@ -47,6 +51,7 @@ impl SendTab {
message_field: String::new(),
error_message: None,
success_message: None,
offline_message: None,
contacts_list_state: WindowedListState::new(),
send_result_watch: None,
confirmation_dialog: None,
Expand Down Expand Up @@ -448,6 +453,7 @@ impl<B: Backend> Component<B> for SendTab {

let rx_option = self.send_result_watch.take();
if let Some(rx) = rx_option {
trace!(target: LOG_TARGET, "{:?}", (*rx.borrow()).clone());
let status = match (*rx.borrow()).clone() {
UiTransactionSendStatus::Initiated => "Initiated",
UiTransactionSendStatus::DiscoveryInProgress => "Discovery In Progress",
Expand All @@ -460,6 +466,14 @@ impl<B: Backend> Component<B> for SendTab {
Some("Transaction successfully sent!\nPlease press Enter to continue".to_string());
return;
},
UiTransactionSendStatus::Queued => {
self.offline_message = Some(
"This wallet appears to be offline; transaction queued for further retry sending.\n Please \
press Enter to continue"
.to_string(),
);
return;
},
UiTransactionSendStatus::TransactionComplete => {
self.success_message =
Some("Transaction completed successfully!\nPlease press Enter to continue".to_string());
Expand All @@ -482,6 +496,10 @@ impl<B: Backend> Component<B> for SendTab {
draw_dialog(f, area, "Success!".to_string(), msg, Color::Green, 120, 9);
}

if let Some(msg) = self.offline_message.clone() {
draw_dialog(f, area, "Offline!".to_string(), msg, Color::Green, 120, 9);
}

if let Some(msg) = self.error_message.clone() {
draw_dialog(f, area, "Error!".to_string(), msg, Color::Red, 120, 9);
}
Expand Down Expand Up @@ -528,6 +546,13 @@ impl<B: Backend> Component<B> for SendTab {
return;
}

if self.offline_message.is_some() {
if '\n' == c {
self.offline_message = None;
}
return;
}

if self.send_result_watch.is_some() {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ impl<B: Backend> Component<B> for TransactionsTab {

match c {
'p' => {
if let Err(e) = Handle::current().block_on(app_state.restart_transaction_protocols()) {
error!(target: LOG_TARGET, "Error rebroadcasting transactions: {}", e);
}
self.completed_list_state.select(None);
self.selected_tx_list = SelectedTransactionList::PendingTxs;
self.pending_list_state.set_num_items(app_state.get_pending_txs().len());
Expand Down
23 changes: 22 additions & 1 deletion applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ impl AppState {
Ok(())
}

pub async fn restart_transaction_protocols(&mut self) -> Result<(), UiError> {
let inner = self.inner.write().await;
let mut tx_service = inner.wallet.transaction_service.clone();
tx_service.restart_transaction_protocols().await?;
Ok(())
}

pub fn get_identity(&self) -> &MyIdentity {
&self.cached_data.my_identity
}
Expand Down Expand Up @@ -857,6 +864,7 @@ impl AppStateInner {
)
.await?;

self.spawn_restart_transaction_protocols_task();
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
Expand All @@ -881,6 +889,7 @@ impl AppStateInner {
)
.await?;

self.spawn_restart_transaction_protocols_task();
self.spawn_transaction_revalidation_task();

self.data.base_node_previous = self.data.base_node_selected.clone();
Expand Down Expand Up @@ -922,6 +931,7 @@ impl AppStateInner {
)
.await?;

self.spawn_restart_transaction_protocols_task();
self.spawn_transaction_revalidation_task();

self.data.base_node_peer_custom = None;
Expand Down Expand Up @@ -956,6 +966,16 @@ impl AppStateInner {
});
}

pub fn spawn_restart_transaction_protocols_task(&mut self) {
let mut txn_service = self.wallet.transaction_service.clone();

task::spawn(async move {
if let Err(e) = txn_service.restart_transaction_protocols().await {
error!(target: LOG_TARGET, "Problem restarting transaction protocols: {}", e);
}
});
}

pub fn add_notification(&mut self, notification: String) {
self.data.notifications.push((Local::now(), notification));
self.data.new_notification_count += 1;
Expand Down Expand Up @@ -1173,9 +1193,10 @@ pub struct MyIdentity {
pub node_id: String,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum UiTransactionSendStatus {
Initiated,
Queued,
SentDirect,
TransactionComplete,
DiscoveryInProgress,
Expand Down
98 changes: 66 additions & 32 deletions applications/tari_console_wallet/src/ui/state/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,23 @@ pub async fn send_transaction_task(
) {
let _ = result_tx.send(UiTransactionSendStatus::Initiated);
let mut event_stream = transaction_service_handle.get_event_stream();
let mut send_direct_received_result = (false, false);
let mut send_saf_received_result = (false, false);
#[derive(Debug)]
struct SendStatus {
event_received: bool,
result: bool,
}
let mut send_direct = SendStatus {
event_received: false,
result: false,
};
let mut send_saf = SendStatus {
event_received: false,
result: false,
};
let mut queued = SendStatus {
event_received: false,
result: false,
};
match transaction_service_handle
.send_transaction_or_token(public_key, amount, unique_id, parent_public_key, fee_per_gram, message)
.await
Expand All @@ -53,36 +68,53 @@ pub async fn send_transaction_task(
},
Ok(our_tx_id) => {
loop {
match event_stream.recv().await {
Ok(event) => match &*event {
TransactionEvent::TransactionDiscoveryInProgress(tx_id) => {
if our_tx_id == *tx_id {
let _ = result_tx.send(UiTransactionSendStatus::DiscoveryInProgress);
}
},
TransactionEvent::TransactionDirectSendResult(tx_id, result) => {
if our_tx_id == *tx_id {
send_direct_received_result = (true, *result);
if send_saf_received_result.0 {
break;
let next_event = event_stream.recv().await;
match next_event {
Ok(event) => {
match &*event {
TransactionEvent::TransactionDiscoveryInProgress(tx_id) => {
if our_tx_id == *tx_id {
let _ = result_tx.send(UiTransactionSendStatus::DiscoveryInProgress);
}
}
},
TransactionEvent::TransactionStoreForwardSendResult(tx_id, result) => {
if our_tx_id == *tx_id {
send_saf_received_result = (true, *result);
if send_direct_received_result.0 {
break;
},
TransactionEvent::TransactionQueuedForRetrySending(tx_id, status) => {
if our_tx_id == *tx_id {
queued.event_received = true;
queued.result = *status;
// Only break if direct send and SAF send events are received as well
if send_direct.event_received && send_saf.event_received {
break;
}
}
}
},
TransactionEvent::TransactionCompletedImmediately(tx_id) => {
if our_tx_id == *tx_id {
let _ = result_tx.send(UiTransactionSendStatus::TransactionComplete);
return;
}
},
_ => (),
},
TransactionEvent::TransactionDirectSendResult(tx_id, result) => {
if our_tx_id == *tx_id {
send_direct.event_received = true;
send_direct.result = *result;
// Only break if SAF send and queued events are received as well
if send_saf.event_received && queued.event_received {
break;
}
}
},
TransactionEvent::TransactionStoreForwardSendResult(tx_id, result) => {
if our_tx_id == *tx_id {
send_saf.event_received = true;
send_saf.result = *result;
// Only break if direct send and queued events are received as well
if send_direct.event_received && queued.event_received {
break;
}
}
},
TransactionEvent::TransactionCompletedImmediately(tx_id) => {
if our_tx_id == *tx_id {
let _ = result_tx.send(UiTransactionSendStatus::TransactionComplete);
return;
}
},
_ => (),
}
},
Err(e @ broadcast::error::RecvError::Lagged(_)) => {
log::warn!(target: LOG_TARGET, "Error reading from event broadcast channel {:?}", e);
Expand All @@ -94,10 +126,12 @@ pub async fn send_transaction_task(
}
}

if send_direct_received_result.1 {
if send_direct.result {
let _ = result_tx.send(UiTransactionSendStatus::SentDirect);
} else if send_saf_received_result.1 {
} else if send_saf.result {
let _ = result_tx.send(UiTransactionSendStatus::SentViaSaf);
} else if queued.result {
let _ = result_tx.send(UiTransactionSendStatus::Queued);
} else {
let _ = result_tx.send(UiTransactionSendStatus::Error(
"Transaction could not be sent".to_string(),
Expand Down
Loading

0 comments on commit 9bf72fb

Please sign in to comment.