Send crawled transaction IDs to downloader #2801

Merged
Changes from all commits · 22 commits
9a3b68a
Rename type parameter to be more explicit
jvff Sep 7, 2021
6e08988
Remove imports for `Request` and `Response`
jvff Sep 8, 2021
0b4a3da
Attach `Mempool` service to the `Crawler`
jvff Sep 8, 2021
0aae289
Forward crawled transactions to downloader
jvff Sep 8, 2021
274f22d
Derive `Eq` and `PartialEq` for `mempool::Request`
jvff Sep 10, 2021
2bd463f
Test if crawled transactions are downloaded
jvff Sep 10, 2021
efba980
Don't send empty transaction ID list to downloader
jvff Sep 10, 2021
d527bae
Log errors when forwarding crawled transaction IDs
jvff Sep 28, 2021
d632c6c
Document existing `mempool::Crawler` test
jvff Sep 29, 2021
b6a367f
Refactor to create `setup_crawler` helper function
jvff Sep 28, 2021
62d9052
Simplify code to expect requests
jvff Sep 28, 2021
86e0af7
Refactor to create `respond_with_transaction_ids`
jvff Sep 29, 2021
cee73eb
Refactor to create `crawler_iterator` helper
jvff Sep 29, 2021
7b93439
Refactor to create `respond_to_queue_request`
jvff Sep 29, 2021
f279ffa
Add `respond_to_queue_request_with_error` helper
jvff Sep 29, 2021
123ac43
Derive `Arbitrary` for `NetworkUpgrade`
jvff Sep 29, 2021
ecf4ec1
Derive `Arbitrary` for `TransactionError`
jvff Sep 29, 2021
23bbb9d
Derive `Arbitrary` for `MempoolError`
jvff Sep 29, 2021
5431a12
Test if errors don't stop the mempool crawler
jvff Sep 29, 2021
bc26a7e
Reduce the log level for download errors
jvff Oct 1, 2021
ddad6d1
Stop crawler if service stops
jvff Oct 1, 2021
ab1b121
Merge branch 'main' into send-crawled-transaction-ids-to-downloader
conradoplg Oct 4, 2021
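
Taken together, these commits attach a mempool service handle to the mempool crawler and forward every non-empty batch of crawled transaction IDs to it for download and verification. A minimal sketch of the new wiring, based on the `zebrad/src/commands/start.rs` diff below (`mempool_service`, `peer_set`, and `sync_status` are assumed to already be in scope; error handling and the surrounding `select!` are omitted, so this fragment is not runnable on its own):

```rust
use tower::ServiceBuilder;

// Buffer the mempool service so both the inbound service and the crawler can
// hold a handle to it.
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);

// New in this PR: the crawler receives the mempool service so it can queue
// the transaction IDs it crawls from peers.
let crawler_task = mempool::Crawler::spawn(peer_set.clone(), mempool.clone(), sync_status);
```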
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions zebra-chain/src/parameters/network_upgrade.rs
@@ -10,11 +10,15 @@ use std::ops::Bound::*;

use chrono::{DateTime, Duration, Utc};

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;

/// A Zcash network upgrade.
///
/// Network upgrades can change the Zcash network protocol or consensus rules in
/// incompatible ways.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum NetworkUpgrade {
/// The Zcash protocol for a Genesis block.
///
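
With the conditional derive in place, property tests can generate `NetworkUpgrade` values directly. A minimal sketch of such a test (not part of this PR; it assumes `NetworkUpgrade` is re-exported as `zebra_chain::parameters::NetworkUpgrade` and that a test build or the `proptest-impl` feature is active):

```rust
use proptest::prelude::*;
use zebra_chain::parameters::NetworkUpgrade;

proptest! {
    #[test]
    fn network_upgrade_has_debug_output(upgrade in any::<NetworkUpgrade>()) {
        // `any::<NetworkUpgrade>()` only compiles in tests or with the
        // `proptest-impl` feature, because the derive is behind `cfg_attr`.
        prop_assert!(!format!("{:?}", upgrade).is_empty());
    }
}
```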
9 changes: 9 additions & 0 deletions zebra-consensus/Cargo.toml
@@ -5,6 +5,10 @@ authors = ["Zcash Foundation <[email protected]>"]
license = "MIT OR Apache-2.0"
edition = "2018"

[features]
default = []
proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"]

[dependencies]
blake2b_simd = "0.5.11"
bellman = "0.10.0"
@@ -33,8 +37,13 @@ zebra-state = { path = "../zebra-state" }
zebra-script = { path = "../zebra-script" }
wagyu-zcash-parameters = "0.2.0"

proptest = { version = "0.10", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

[dev-dependencies]
color-eyre = "0.5.11"
proptest = "0.10"
proptest-derive = "0.3.0"
rand07 = { package = "rand", version = "0.7" }
spandoc = "0.2"
tokio = { version = "0.3.6", features = ["full"] }
9 changes: 9 additions & 0 deletions zebra-consensus/src/error.rs
@@ -9,6 +9,9 @@ use thiserror::Error;

use crate::BoxError;

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;

#[derive(Error, Copy, Clone, Debug, PartialEq)]
pub enum SubsidyError {
#[error("no coinbase transaction in block")]
@@ -19,6 +22,7 @@
}

#[derive(Error, Clone, Debug, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum TransactionError {
#[error("first transaction must be coinbase")]
CoinbasePosition,
@@ -45,6 +49,7 @@
CoinbaseInMempool,

#[error("coinbase transaction failed subsidy validation")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
Subsidy(#[from] SubsidyError),

#[error("transaction version number MUST be >= 4")]
@@ -63,6 +68,7 @@
BadBalance,

#[error("could not verify a transparent script")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
Script(#[from] zebra_script::Error),

#[error("spend description cv and rk MUST NOT be of small order")]
@@ -76,12 +82,15 @@
#[error(
"Sprout joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned"
)]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
Ed25519(#[from] zebra_chain::primitives::ed25519::Error),

#[error("Sapling bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
RedJubjub(zebra_chain::primitives::redjubjub::Error),

#[error("Orchard bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")]
#[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))]
RedPallas(zebra_chain::primitives::redpallas::Error),

// temporary error type until #1186 is fixed
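
The `proptest(skip)` attributes keep the derive compiling even though variants such as `Subsidy`, `Script`, `Ed25519`, `RedJubjub`, and `RedPallas` wrap foreign error types that don't implement `Arbitrary`; those variants are simply never generated. A minimal sketch of using the new derive (not the PR's actual test; the `zebra_consensus::error::TransactionError` path is an assumption):

```rust
use proptest::prelude::*;
use zebra_consensus::error::TransactionError;

proptest! {
    #[test]
    fn transaction_errors_have_display_messages(error in any::<TransactionError>()) {
        // Skipped variants (e.g. `Subsidy`, `Script`) are never produced, so
        // every generated error comes from a variant whose fields, if any,
        // also implement `Arbitrary`.
        prop_assert!(!error.to_string().is_empty());
    }
}
```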
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
@@ -61,6 +61,7 @@ proptest = "0.10"
proptest-derive = "0.3"

zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-consensus = { path = "../zebra-consensus/", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test" }

4 changes: 2 additions & 2 deletions zebrad/src/commands/start.rs
@@ -102,12 +102,12 @@ impl StartCmd {
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);

setup_tx
.send((peer_set.clone(), address_book, mempool))
.send((peer_set.clone(), address_book, mempool.clone()))
.map_err(|_| eyre!("could not send setup data to inbound service"))?;

select! {
result = syncer.sync().fuse() => result,
_ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => {
_ = mempool::Crawler::spawn(peer_set, mempool, sync_status).fuse() => {
unreachable!("The mempool crawler only stops if it panics");
}
}
2 changes: 1 addition & 1 deletion zebrad/src/components/mempool.rs
@@ -50,7 +50,7 @@ type TxVerifier = Buffer<
>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;

#[derive(Debug)]
#[derive(Debug, Eq, PartialEq)]
#[allow(dead_code)]
pub enum Request {
TransactionIds,
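
Deriving `Eq` and `PartialEq` on `mempool::Request` is what lets the new crawler tests compare the request received by a mocked mempool service against the expected `Queue` request. A hypothetical assertion of that kind (placeholder names, not the PR's real test helpers):

```rust
// `received` is the request captured from a mocked mempool service, and
// `crawled_ids` are the transaction IDs the crawler was fed; both names are
// placeholders for illustration only.
let expected = mempool::Request::Queue(crawled_ids.into_iter().map(Gossip::Id).collect());
assert_eq!(received, expected);
```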
72 changes: 58 additions & 14 deletions zebrad/src/components/mempool/crawler.rs
@@ -8,9 +8,12 @@ use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::sleep};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

use zebra_network::{Request, Response};
use zebra_network as zn;

use super::super::sync::SyncStatus;
use super::{
super::{mempool, sync::SyncStatus},
downloads::Gossip,
};

#[cfg(test)]
mod tests;
@@ -31,20 +34,30 @@ const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75);
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// The mempool transaction crawler.
pub struct Crawler<S> {
peer_set: Timeout<S>,
pub struct Crawler<PeerSet, Mempool> {
peer_set: Timeout<PeerSet>,
mempool: Mempool,
status: SyncStatus,
}

impl<S> Crawler<S>
impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
PeerSet:
Service<zn::Request, Response = zn::Response, Error = BoxError> + Clone + Send + 'static,
PeerSet::Future: Send,
Mempool:
Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
Mempool::Future: Send,
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle<Result<(), BoxError>> {
pub fn spawn(
peer_set: PeerSet,
mempool: Mempool,
status: SyncStatus,
) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler {
peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
mempool,
status,
};

@@ -79,13 +92,13 @@ where
// end the task on permanent peer set errors
let peer_set = peer_set.ready_and().await?;

requests.push(peer_set.call(Request::MempoolTransactionIds));
requests.push(peer_set.call(zn::Request::MempoolTransactionIds));
}

while let Some(result) = requests.next().await {
// log individual response errors
match result {
Ok(response) => self.handle_response(response).await,
Ok(response) => self.handle_response(response).await?,
// TODO: Reduce the log level of the errors (#2655).
Err(error) => info!("Failed to crawl peer for mempool transactions: {}", error),
}
@@ -95,9 +108,9 @@
}

/// Handle a peer's response to the crawler's request for transactions.
async fn handle_response(&mut self, response: Response) {
let transaction_ids = match response {
Response::TransactionIds(ids) => ids,
async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> {
let transaction_ids: Vec<_> = match response {
zn::Response::TransactionIds(ids) => ids.into_iter().map(Gossip::Id).collect(),
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
};

@@ -106,6 +119,37 @@
transaction_ids.len()
);

// TODO: Send transaction IDs to the download and verify stream (#2650)
if !transaction_ids.is_empty() {
self.queue_transactions(transaction_ids).await?;
}

Ok(())
}

/// Forward the crawled transaction IDs to the mempool transaction downloader.
async fn queue_transactions(&mut self, transaction_ids: Vec<Gossip>) -> Result<(), BoxError> {
let call_result = self
.mempool
.ready_and()
.await?
.call(mempool::Request::Queue(transaction_ids))
.await;

let queue_errors = match call_result {
Ok(mempool::Response::Queued(queue_results)) => {
queue_results.into_iter().filter_map(Result::err)
}
Ok(_) => unreachable!("Mempool did not respond with queue results to mempool crawler"),
Err(call_error) => {
debug!("Ignoring unexpected peer behavior: {}", call_error);
return Ok(());
}
};

for error in queue_errors {
debug!("Failed to download a crawled transaction: {}", error);
}

Ok(())
}
}
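
Note on the error handling above, matching the `Reduce the log level for download errors` and `Stop crawler if service stops` commits: per-transaction queue failures and transient mempool call errors are only logged at `debug` level, so a single bad transaction cannot stall crawling, while an error from the mempool service itself (the `ready_and().await?`) propagates out of `queue_transactions` and `handle_response` and ends the crawler task.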