Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc): Return verification errors from sendrawtransaction RPC method #8788

Merged
merged 8 commits into from
Aug 30, 2024
4 changes: 2 additions & 2 deletions zebra-node-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rpc-client = [
"serde_json",
]

shielded-scan = ["tokio"]
shielded-scan = []

[dependencies]
zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.39" }
Expand All @@ -48,7 +48,7 @@ jsonrpc-core = { version = "18.0.0", optional = true }
reqwest = { version = "0.11.26", default-features = false, features = ["rustls-tls"], optional = true }
serde = { version = "1.0.204", optional = true }
serde_json = { version = "1.0.122", optional = true }
tokio = { version = "1.39.2", features = ["time"], optional = true }
tokio = { version = "1.39.2", features = ["time", "sync"] }

[dev-dependencies]

Expand Down
9 changes: 4 additions & 5 deletions zebra-node-services/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::collections::HashSet;

use tokio::sync::oneshot;
use zebra_chain::transaction::{self, UnminedTx, UnminedTxId};

#[cfg(feature = "getblocktemplate-rpcs")]
Expand Down Expand Up @@ -114,13 +115,11 @@ pub enum Response {
/// Returns matching cached rejected [`UnminedTxId`]s from the mempool,
RejectedTransactionIds(HashSet<UnminedTxId>),

/// Returns a list of queue results.
///
/// These are the results of the initial queue checks.
/// The transaction may also fail download or verification later.
/// Returns a list of initial queue checks results and a oneshot receiver
/// for awaiting download and/or verification results.
///
/// Each result matches the request at the corresponding vector index.
Queued(Vec<Result<(), BoxError>>),
Queued(Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>>),

/// Confirms that the mempool has checked for recently verified transactions.
CheckedForVerifiedTransactions,
Expand Down
15 changes: 11 additions & 4 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ where

let response = mempool.oneshot(request).await.map_server_error()?;

let queue_results = match response {
let mut queue_results = match response {
mempool::Response::Queued(results) => results,
_ => unreachable!("incorrect response variant from mempool service"),
};
Expand All @@ -675,10 +675,17 @@ where
"mempool service returned more results than expected"
);

tracing::debug!("sent transaction to mempool: {:?}", &queue_results[0]);
let queue_result = queue_results
.pop()
.expect("there should be exactly one item in Vec")
.inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err))
.map_server_error()?
.await;

tracing::debug!("sent transaction to mempool: {:?}", &queue_result);

queue_results[0]
.as_ref()
queue_result
.map_server_error()?
.map(|_| SentTransactionHash(transaction_hash))
.map_server_error()
}
Expand Down
43 changes: 38 additions & 5 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode};
use proptest::{collection::vec, prelude::*};
use thiserror::Error;
use tokio::sync::oneshot;
use tower::buffer::Buffer;

use zebra_chain::{
Expand Down Expand Up @@ -61,7 +62,9 @@ proptest! {

let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
let response = mempool::Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down Expand Up @@ -111,10 +114,10 @@ proptest! {
.expect("Transaction serializes successfully");
let transaction_hex = hex::encode(&transaction_bytes);

let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex.clone()));

let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);

mempool
.expect_request(expected_request)
Expand All @@ -138,6 +141,32 @@ proptest! {
"Result is not a server error: {result:?}"
);

let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));

let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);

let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("any verification error".into()));
mempool
.expect_request(expected_request)
.await?
.respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)])));

let result = send_task
.await
.expect("Sending raw transactions should not panic");

prop_assert!(
matches!(
result,
Err(Error {
code: ErrorCode::ServerError(_),
..
})
),
"Result is not a server error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(rpc_tx_queue_task_result.is_none());
Expand Down Expand Up @@ -897,7 +926,9 @@ proptest! {
// now a retry will be sent to the mempool
let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]);
let response = mempool::Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down Expand Up @@ -997,7 +1028,9 @@ proptest! {
for tx in txs.clone() {
let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]);
let response = mempool::Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down
10 changes: 7 additions & 3 deletions zebra-rpc/src/queue/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashSet, env, sync::Arc};
use proptest::prelude::*;

use chrono::Duration;
use tokio::time;
use tokio::{sync::oneshot, time};
use tower::ServiceExt;

use zebra_chain::{
Expand Down Expand Up @@ -196,7 +196,9 @@ proptest! {
let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let send_task = tokio::spawn(mempool.clone().oneshot(request));
let response = Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down Expand Up @@ -337,7 +339,9 @@ proptest! {
// retry will queue the transaction to mempool
let gossip = Gossip::Tx(UnminedTx::from(transaction.clone()));
let expected_request = Request::Queue(vec![gossip]);
let response = Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down
36 changes: 22 additions & 14 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
};

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, oneshot};
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

Expand Down Expand Up @@ -560,7 +560,7 @@ impl Service<Request> for Mempool {
for tx in tx_retries {
// This is just an efficiency optimisation, so we don't care if queueing
// transaction requests fails.
let _result = tx_downloads.download_if_needed_and_verify(tx);
let _result = tx_downloads.download_if_needed_and_verify(tx, None);
}
}

Expand Down Expand Up @@ -608,8 +608,8 @@ impl Service<Request> for Mempool {
tracing::trace!("chain grew during tx verification, retrying ..",);

// We don't care if re-queueing the transaction request fails.
let _result =
tx_downloads.download_if_needed_and_verify(tx.transaction.into());
let _result = tx_downloads
.download_if_needed_and_verify(tx.transaction.into(), None);
}
}
Ok(Err((txid, error))) => {
Expand Down Expand Up @@ -758,16 +758,24 @@ impl Service<Request> for Mempool {
Request::Queue(gossiped_txs) => {
trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");

let rsp: Vec<Result<(), BoxError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| -> Result<(), MempoolError> {
storage.should_download_or_verify(gossiped_tx.id())?;
tx_downloads.download_if_needed_and_verify(gossiped_tx)?;

Ok(())
})
.map(|result| result.map_err(BoxError::from))
.collect();
let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
gossiped_txs
.into_iter()
.map(
|gossiped_tx| -> Result<
oneshot::Receiver<Result<(), BoxError>>,
MempoolError,
> {
let (rsp_tx, rsp_rx) = oneshot::channel();
storage.should_download_or_verify(gossiped_tx.id())?;
tx_downloads
.download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;

Ok(rsp_rx)
},
)
.map(|result| result.map_err(BoxError::from))
.collect();

// We've added transactions to the queue
self.update_metrics();
Expand Down
14 changes: 11 additions & 3 deletions zebrad/src/components/mempool/crawler/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use proptest::{
collection::{hash_set, vec},
prelude::*,
};
use tokio::time;
use tokio::{sync::oneshot, time};

use zebra_chain::{
chain_sync_status::ChainSyncStatus, parameters::Network, transaction::UnminedTxId,
Expand Down Expand Up @@ -317,9 +317,17 @@ async fn respond_to_queue_request(
expected_transaction_ids: HashSet<UnminedTxId>,
response: impl IntoIterator<Item = Result<(), MempoolError>>,
) -> Result<(), TestCaseError> {
let response = response
let response: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> = response
.into_iter()
.map(|result| result.map_err(BoxError::from))
.map(|result| {
result
.map(|_| {
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
rsp_rx
})
.map_err(BoxError::from)
})
.collect();

mempool
Expand Down
31 changes: 23 additions & 8 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use zebra_chain::{
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state as zs;
use zebra_state::{self as zs, CloneError};

use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

Expand Down Expand Up @@ -105,17 +105,17 @@ pub const MAX_INBOUND_CONCURRENCY: usize = 25;
struct CancelDownloadAndVerify;

/// Errors that can occur while downloading and verifying a transaction.
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
#[allow(dead_code)]
pub enum TransactionDownloadVerifyError {
#[error("transaction is already in state")]
InState,

#[error("error in state service")]
StateError(#[source] BoxError),
StateError(#[source] CloneError),

#[error("error downloading transaction")]
DownloadFailed(#[source] BoxError),
DownloadFailed(#[source] CloneError),

#[error("transaction download / verification was cancelled")]
Cancelled,
Expand Down Expand Up @@ -243,6 +243,7 @@ where
pub fn download_if_needed_and_verify(
&mut self,
gossiped_tx: Gossip,
rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> {
let txid = gossiped_tx.id();

Expand Down Expand Up @@ -295,7 +296,7 @@ where
Ok((Some(height), next_height))
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?;

trace!(?txid, ?next_height, "got next height");
Expand All @@ -307,11 +308,12 @@ where
let tx = match network
.oneshot(req)
.await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::DownloadFailed)?
{
zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
TransactionDownloadVerifyError::DownloadFailed(
"no transactions returned".into(),
BoxError::from("no transactions returned").into(),
)
})?,
_ => unreachable!("wrong response to transaction request"),
Expand Down Expand Up @@ -373,15 +375,27 @@ where

let task = tokio::spawn(async move {
// Prefer the cancel handle if both are ready.
tokio::select! {
let result = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
Err((TransactionDownloadVerifyError::Cancelled, txid))
}
verification = fut => verification,
};

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(
result
.as_ref()
.map(|_| ())
.map_err(|(err, _)| err.clone().into()),
);
}

result
});

self.pending.push(task);
Expand Down Expand Up @@ -458,14 +472,15 @@ where
match state
.ready()
.await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::StateError)?
.call(zs::Request::Transaction(txid.mined_id()))
.await
{
Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?;

Ok(())
Expand Down
Loading
Loading