Skip to content

Commit

Permalink
Handle receiver cancelling an inbound transaction that is later received
Browse files Browse the repository at this point in the history
This PR addresses the following scenario spotted by @stanimal:

 - NodeA sends to nodeB(offline) 
- NodeA goes offline 
- NodeB receives tx, and cancels it (weird I know) 
- NodeA comes online and broadcasts the transaction 
- NodeB is not aware of the transaction, transaction complete for NodeA

This is handled by adding logic that if a FinalizedTransaction is received with no active Receive Protocols that the database is checked if there is a matching cancelled inbound transaction from the same pubkey. If there is the receiver might as well restart that protocol and accept the finalized transaction.

A cucumber test is provided to test this case.
  • Loading branch information
philipr-za committed Aug 11, 2021
1 parent ebe3f44 commit 5beb8e8
Show file tree
Hide file tree
Showing 19 changed files with 556 additions and 126 deletions.
10 changes: 10 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ service Wallet {
rpc GetNetworkStatus(Empty) returns (NetworkStatusResponse);
// List currently connected peers
rpc ListConnectedPeers(Empty) returns (ListConnectedPeersResponse);
// Cancel pending transaction
rpc CancelTransaction (CancelTransactionRequest) returns (CancelTransactionResponse);
}

message GetVersionRequest { }
Expand Down Expand Up @@ -185,3 +187,11 @@ message ImportUtxosResponse {
repeated uint64 tx_ids = 1;
}

message CancelTransactionRequest {
uint64 tx_id = 1;
}

message CancelTransactionResponse {
bool is_success = 1;
string failure_message = 2;
}
27 changes: 27 additions & 0 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,33 @@ impl wallet_server::Wallet for WalletGrpcServer {

Ok(Response::new(resp))
}

async fn cancel_transaction(
&self,
request: Request<tari_rpc::CancelTransactionRequest>,
) -> Result<Response<tari_rpc::CancelTransactionResponse>, Status> {
let message = request.into_inner();
debug!(
target: LOG_TARGET,
"Incoming gRPC request to Cancel Transaction (TxId: {})", message.tx_id,
);
let mut transaction_service = self.get_transaction_service();

match transaction_service.cancel_transaction(message.tx_id).await {
Ok(_) => {
return Ok(Response::new(tari_rpc::CancelTransactionResponse {
is_success: true,
failure_message: "".to_string(),
}))
},
Err(e) => {
return Ok(Response::new(tari_rpc::CancelTransactionResponse {
is_success: false,
failure_message: e.to_string(),
}))
},
}
}
}

fn convert_wallet_transaction_into_transaction_info(
Expand Down
14 changes: 14 additions & 0 deletions base_layer/wallet/src/output_manager_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum OutputManagerRequest {
ScanForRecoverableOutputs(Vec<TransactionOutput>),
ScanOutputs(Vec<TransactionOutput>),
AddKnownOneSidedPaymentScript(KnownOneSidedPaymentScript),
ReinstateCancelledInboundTx(TxId),
}

impl fmt::Display for OutputManagerRequest {
Expand Down Expand Up @@ -115,6 +116,7 @@ impl fmt::Display for OutputManagerRequest {
ScanForRecoverableOutputs(_) => write!(f, "ScanForRecoverableOutputs"),
ScanOutputs(_) => write!(f, "ScanRewindAndImportOutputs"),
AddKnownOneSidedPaymentScript(_) => write!(f, "AddKnownOneSidedPaymentScript"),
ReinstateCancelledInboundTx(_) => write!(f, "ReinstateCancelledInboundTx"),
}
}
}
Expand Down Expand Up @@ -149,6 +151,7 @@ pub enum OutputManagerResponse {
RewoundOutputs(Vec<UnblindedOutput>),
ScanOutputs(Vec<UnblindedOutput>),
AddKnownOneSidedPaymentScript,
ReinstatedCancelledInboundTx,
}

pub type OutputManagerEventSender = broadcast::Sender<Arc<OutputManagerEvent>>;
Expand Down Expand Up @@ -545,4 +548,15 @@ impl OutputManagerHandle {
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}

pub async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> {
match self
.handle
.call(OutputManagerRequest::ReinstateCancelledInboundTx(tx_id))
.await??
{
OutputManagerResponse::ReinstatedCancelledInboundTx => Ok(()),
_ => Err(OutputManagerError::UnexpectedApiResponse),
}
}
}
26 changes: 26 additions & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::{
types::{HashDigest, ValidationRetryStrategy},
};
use blake2::Digest;
use chrono::Utc;
use diesel::result::{DatabaseErrorKind, Error as DieselError};
use futures::{pin_mut, StreamExt};
use log::*;
Expand Down Expand Up @@ -343,6 +344,10 @@ where TBackend: OutputManagerBackend + 'static
.add_known_script(known_script)
.await
.map(|_| OutputManagerResponse::AddKnownOneSidedPaymentScript),
OutputManagerRequest::ReinstateCancelledInboundTx(tx_id) => self
.reinstate_cancelled_inbound_transaction(tx_id)
.await
.map(|_| OutputManagerResponse::ReinstatedCancelledInboundTx),
}
}

Expand Down Expand Up @@ -896,6 +901,27 @@ where TBackend: OutputManagerBackend + 'static
Ok(self.resources.db.cancel_pending_transaction_outputs(tx_id).await?)
}

/// Restore the pending transaction encumberance and output for an inbound transaction that was previously
/// cancelled.
async fn reinstate_cancelled_inbound_transaction(&mut self, tx_id: TxId) -> Result<(), OutputManagerError> {
self.resources.db.reinstate_inbound_output(tx_id).await?;

self.resources
.db
.add_pending_transaction_outputs(PendingTransactionOutputs {
tx_id,
outputs_to_be_spent: Vec::new(),
outputs_to_be_received: Vec::new(),
timestamp: Utc::now().naive_utc(),
coinbase_block_height: None,
})
.await?;

self.confirm_encumberance(tx_id).await?;

Ok(())
}

/// Go through the pending transaction and if any have existed longer than the specified duration, cancel them
async fn timeout_pending_transactions(&mut self, period: Duration) -> Result<(), OutputManagerError> {
Ok(self.resources.db.timeout_pending_transaction_outputs(period).await?)
Expand Down
49 changes: 48 additions & 1 deletion base_layer/wallet/src/output_manager_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::output_manager_service::{
error::OutputManagerStorageError,
service::Balance,
storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript},
storage::models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
TxId,
};
use aes_gcm::Aes256Gcm;
Expand Down Expand Up @@ -135,6 +135,7 @@ pub enum DbKey {
KeyManagerState,
InvalidOutputs,
KnownOneSidedPaymentScripts,
OutputsByTxIdAndStatus(TxId, OutputStatus),
}

#[derive(Debug)]
Expand All @@ -149,6 +150,7 @@ pub enum DbValue {
KeyManagerState(KeyManagerState),
KnownOneSidedPaymentScripts(Vec<KnownOneSidedPaymentScript>),
AnyOutput(Box<DbUnblindedOutput>),
AnyOutputs(Vec<DbUnblindedOutput>),
}

pub enum DbKeyValuePair {
Expand All @@ -158,6 +160,7 @@ pub enum DbKeyValuePair {
PendingTransactionOutputs(TxId, Box<PendingTransactionOutputs>),
KeyManagerState(KeyManagerState),
KnownOneSidedPaymentScripts(KnownOneSidedPaymentScript),
UpdateOutputStatus(Commitment, OutputStatus),
}

pub enum WriteOperation {
Expand Down Expand Up @@ -713,6 +716,48 @@ where T: OutputManagerBackend + 'static
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(())
}

/// Check if a single cancelled inbound output exists that matches this TxID, if it does then return its status to
/// EncumberedToBeReceived
pub async fn reinstate_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError> {
let db_clone = self.db.clone();
let outputs = tokio::task::spawn_blocking(move || {
match db_clone.fetch(&DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound)) {
Ok(None) => Err(OutputManagerStorageError::ValueNotFound),
Ok(Some(DbValue::AnyOutputs(o))) => Ok(o),
Ok(Some(other)) => unexpected_result(
DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound),
other,
),
Err(e) => log_error(DbKey::OutputsByTxIdAndStatus(tx_id, OutputStatus::CancelledInbound), e),
}
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?;

if outputs.len() != 1 {
return Err(OutputManagerStorageError::UnexpectedResult(
"There should be only 1 output for a cancelled inbound transaction but more were found".to_string(),
));
}
let db_clone2 = self.db.clone();

tokio::task::spawn_blocking(move || {
db_clone2.write(WriteOperation::Insert(DbKeyValuePair::UpdateOutputStatus(
outputs
.first()
.expect("Must be only one element in outputs")
.commitment
.clone(),
OutputStatus::EncumberedToBeReceived,
)))
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

Ok(())
}
}

fn unexpected_result<T>(req: DbKey, res: DbValue) -> Result<T, OutputManagerStorageError> {
Expand All @@ -737,6 +782,7 @@ impl Display for DbKey {
DbKey::TimeLockedUnspentOutputs(_t) => f.write_str(&"Timelocked Outputs"),
DbKey::KnownOneSidedPaymentScripts => f.write_str(&"Known claiming scripts"),
DbKey::AnyOutputByCommitment(_) => f.write_str(&"AnyOutputByCommitment"),
DbKey::OutputsByTxIdAndStatus(_, _) => f.write_str(&"OutputsByTxIdAndStatus"),
}
}
}
Expand All @@ -754,6 +800,7 @@ impl Display for DbValue {
DbValue::InvalidOutputs(_) => f.write_str("Invalid Outputs"),
DbValue::KnownOneSidedPaymentScripts(_) => f.write_str(&"Known claiming scripts"),
DbValue::AnyOutput(_) => f.write_str(&"Any Output"),
DbValue::AnyOutputs(_) => f.write_str(&"Any Outputs"),
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions base_layer/wallet/src/output_manager_service/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,14 @@ impl PartialEq for KnownOneSidedPaymentScript {
self.script_hash == other.script_hash
}
}

/// The status of a given output
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum OutputStatus {
Unspent,
Spent,
EncumberedToBeReceived,
EncumberedToBeSpent,
Invalid,
CancelledInbound,
}
54 changes: 42 additions & 12 deletions base_layer/wallet/src/output_manager_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
PendingTransactionOutputs,
WriteOperation,
},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript},
models::{DbUnblindedOutput, KnownOneSidedPaymentScript, OutputStatus},
},
TxId,
},
Expand Down Expand Up @@ -105,6 +105,7 @@ impl OutputManagerSqliteDatabase {
}
}
impl OutputManagerBackend for OutputManagerSqliteDatabase {
#[allow(clippy::cognitive_complexity)]
fn fetch(&self, key: &DbKey) -> Result<Option<DbValue>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -135,6 +136,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
None
},
},

DbKey::AnyOutputByCommitment(commitment) => {
match OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn)) {
Ok(mut o) => {
Expand Down Expand Up @@ -173,6 +175,18 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
None
},
},
DbKey::OutputsByTxIdAndStatus(tx_id, status) => {
let mut outputs = OutputSql::find_by_tx_id_and_status(*tx_id, *status, &(*conn))?;
for o in outputs.iter_mut() {
self.decrypt_if_necessary(o)?;
}
Some(DbValue::AnyOutputs(
outputs
.iter()
.map(|o| DbUnblindedOutput::try_from(o.clone()))
.collect::<Result<Vec<_>, _>>()?,
))
},
DbKey::UnspentOutputs => {
let mut outputs = OutputSql::index_status(OutputStatus::Unspent, &(*conn))?;
for o in outputs.iter_mut() {
Expand Down Expand Up @@ -273,6 +287,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
Ok(result)
}

#[allow(clippy::cognitive_complexity)]
fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError> {
let conn = self.database_connection.acquire_lock();

Expand Down Expand Up @@ -337,6 +352,20 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
self.encrypt_if_necessary(&mut script_sql)?;
script_sql.commit(&(*conn))?
},
DbKeyValuePair::UpdateOutputStatus(commitment, status) => {
let output = OutputSql::find_by_commitment(&commitment.to_vec(), &(*conn))?;
output.update(
UpdateOutput {
status: Some(status),
tx_id: None,
spending_key: None,
script_private_key: None,
metadata_signature_nonce: None,
metadata_signature_u_key: None,
},
&(*conn),
)?;
},
},
WriteOperation::Remove(k) => match k {
DbKey::SpentOutput(s) => match OutputSql::find_status(&s.to_vec(), OutputStatus::Spent, &(*conn)) {
Expand Down Expand Up @@ -409,6 +438,7 @@ impl OutputManagerBackend for OutputManagerSqliteDatabase {
DbKey::InvalidOutputs => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::TimeLockedUnspentOutputs(_) => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::KnownOneSidedPaymentScripts => return Err(OutputManagerStorageError::OperationNotSupported),
DbKey::OutputsByTxIdAndStatus(_, _) => return Err(OutputManagerStorageError::OperationNotSupported),
},
}

Expand Down Expand Up @@ -840,17 +870,6 @@ fn pending_transaction_outputs_from_sql_outputs(
})
}

/// The status of a given output
#[derive(PartialEq)]
enum OutputStatus {
Unspent,
Spent,
EncumberedToBeReceived,
EncumberedToBeSpent,
Invalid,
CancelledInbound,
}

impl TryFrom<i32> for OutputStatus {
type Error = OutputManagerStorageError;

Expand Down Expand Up @@ -1011,6 +1030,17 @@ impl OutputSql {
Ok(request.first::<OutputSql>(conn)?)
}

pub fn find_by_tx_id_and_status(
tx_id: TxId,
status: OutputStatus,
conn: &SqliteConnection,
) -> Result<Vec<OutputSql>, OutputManagerStorageError> {
Ok(outputs::table
.filter(outputs::tx_id.eq(Some(tx_id as i64)))
.filter(outputs::status.eq(status as i32))
.load(conn)?)
}

/// Find outputs via tx_id that are encumbered. Any outputs that are encumbered cannot be marked as spent.
pub fn find_by_tx_id_and_encumbered(
tx_id: TxId,
Expand Down
Loading

0 comments on commit 5beb8e8

Please sign in to comment.