Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor BestTipHeight into a generic ChainTip sender and receiver #2676

Merged
merged 7 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions zebra-chain/src/chain_tip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//! Chain tip interfaces.

use crate::block;

/// An interface for querying the chain tip.
///
/// This trait helps avoid dependencies between:
/// * zebra-chain and tokio
/// * zebra-network and zebra-state
pub trait ChainTip {
/// Return the height of the best chain tip.
fn best_tip_height(&self) -> Option<block::Height>;
}

/// A chain tip that is always empty.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct NoChainTip;

impl ChainTip for NoChainTip {
fn best_tip_height(&self) -> Option<block::Height> {
None
}
}

// convenience implementations for optional chain tips
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

impl<T> ChainTip for Option<T>
where
T: ChainTip,
{
fn best_tip_height(&self) -> Option<block::Height> {
self.as_ref()
.and_then(|chain_tip| chain_tip.best_tip_height())
}
}

impl<T> ChainTip for &Option<T>
where
T: ChainTip,
{
fn best_tip_height(&self) -> Option<block::Height> {
self.as_ref()
.and_then(|chain_tip| chain_tip.best_tip_height())
}
}
1 change: 1 addition & 0 deletions zebra-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern crate bitflags;

pub mod amount;
pub mod block;
pub mod chain_tip;
pub mod fmt;
pub mod history_tree;
pub mod orchard;
Expand Down
10 changes: 7 additions & 3 deletions zebra-network/src/isolated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ use std::{
};

use futures::future::{FutureExt, TryFutureExt};

use tokio::net::TcpStream;
use tower::{
util::{BoxService, Oneshot},
Service,
};

use crate::{peer, BoxError, Config, Request, Response};
use peer::ConnectedAddr;
use zebra_chain::chain_tip::NoChainTip;

use crate::{
peer::{self, ConnectedAddr},
BoxError, Config, Request, Response,
};

/// Use the provided TCP connection to create a Zcash connection completely
/// isolated from all other node state.
Expand Down Expand Up @@ -55,6 +58,7 @@ pub fn connect_isolated(
Ok::<Response, Box<dyn std::error::Error + Send + Sync + 'static>>(Response::Nil)
}))
.with_user_agent(user_agent)
.with_chain_tip_receiver(NoChainTip)
.finish()
.expect("provided mandatory builder parameters");

Expand Down
15 changes: 9 additions & 6 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,38 @@ use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::chain_tip::{ChainTip, NoChainTip};

use crate::{BoxError, Request, Response};

use super::{Client, ConnectedAddr, Handshake};

/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own
/// [`tower::Service`] lets us apply unified timeout policies, etc.
pub struct Connector<S> {
handshaker: Handshake<S>,
pub struct Connector<S, C = NoChainTip> {
handshaker: Handshake<S, C>,
}

impl<S: Clone> Clone for Connector<S> {
impl<S: Clone, C: Clone> Clone for Connector<S, C> {
fn clone(&self) -> Self {
Connector {
handshaker: self.handshaker.clone(),
}
}
}

impl<S> Connector<S> {
pub fn new(handshaker: Handshake<S>) -> Self {
impl<S, C> Connector<S, C> {
pub fn new(handshaker: Handshake<S, C>) -> Self {
Connector { handshaker }
}
}

impl<S> Service<SocketAddr> for Connector<S>
impl<S, C> Service<SocketAddr> for Connector<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
type Response = Change<SocketAddr, Client>;
type Error = BoxError;
Expand Down
55 changes: 28 additions & 27 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@ use futures::{
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use tokio::{
net::TcpStream,
sync::{broadcast, watch},
task::JoinError,
time::timeout,
};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
use tracing_futures::Instrument;

use zebra_chain::{block, parameters::Network};
use zebra_chain::{
block,
chain_tip::{ChainTip, NoChainTip},
parameters::Network,
};

use crate::{
constants,
Expand All @@ -48,7 +47,7 @@ use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerEr
/// - launched in a separate task, and
/// - wrapped in a timeout.
#[derive(Clone)]
pub struct Handshake<S> {
pub struct Handshake<S, C = NoChainTip> {
config: Config,
inbound_service: S,
timestamp_collector: mpsc::Sender<MetaAddrChange>,
Expand All @@ -58,7 +57,7 @@ pub struct Handshake<S> {
our_services: PeerServices,
relay: bool,
parent_span: Span,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
chain_tip_receiver: Option<C>,
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

/// The peer address that we are handshaking with.
Expand Down Expand Up @@ -300,21 +299,22 @@ impl fmt::Debug for ConnectedAddr {
}

/// A builder for `Handshake`.
pub struct Builder<S> {
pub struct Builder<S, C = NoChainTip> {
config: Option<Config>,
inbound_service: Option<S>,
timestamp_collector: Option<mpsc::Sender<MetaAddrChange>>,
our_services: Option<PeerServices>,
user_agent: Option<String>,
relay: Option<bool>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
chain_tip_receiver: Option<C>,
}

impl<S> Builder<S>
impl<S, C> Builder<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip,
{
/// Provide a config. Mandatory.
pub fn with_config(mut self, config: Config) -> Self {
Expand Down Expand Up @@ -372,11 +372,10 @@ where
///
/// If this is unset, the minimum accepted protocol version for peer connections is kept
/// constant over network upgrade activations.
pub fn with_best_tip_height(
mut self,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> Self {
self.best_tip_height = best_tip_height;
///
/// Use [`NoChainTip`] to explicitly provide no chain tip.
pub fn with_chain_tip_receiver(mut self, chain_tip_receiver: C) -> Self {
self.chain_tip_receiver = Some(chain_tip_receiver);
self
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -391,7 +390,7 @@ where
/// Consume this builder and produce a [`Handshake`].
///
/// Returns an error only if any mandatory field was unset.
pub fn finish(self) -> Result<Handshake<S>, &'static str> {
pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
let config = self.config.ok_or("did not specify config")?;
let inbound_service = self
.inbound_service
Expand Down Expand Up @@ -421,18 +420,19 @@ where
our_services,
relay,
parent_span: Span::current(),
best_tip_height: self.best_tip_height,
chain_tip_receiver: self.chain_tip_receiver,
})
}
}

impl<S> Handshake<S>
impl<S, C> Handshake<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip,
{
/// Create a builder that configures a [`Handshake`] service.
pub fn builder() -> Builder<S> {
pub fn builder() -> Builder<S, C> {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
// We don't derive `Default` because the derive inserts a `where S:
// Default` bound even though `Option<S>` implements `Default` even if
// `S` does not.
Expand All @@ -444,7 +444,7 @@ where
our_services: None,
relay: None,
inv_collector: None,
best_tip_height: None,
chain_tip_receiver: None,
}
}
}
Expand All @@ -463,7 +463,7 @@ pub async fn negotiate_version(
user_agent: String,
our_services: PeerServices,
relay: bool,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
chain_tip_receiver: impl ChainTip,
) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> {
// Create a random nonce for this connection
let local_nonce = Nonce::default();
Expand Down Expand Up @@ -577,7 +577,7 @@ pub async fn negotiate_version(

// SECURITY: Reject connections to peers on old versions, because they might not know about all
// network upgrades and could lead to chain forks or slower block propagation.
let height = best_tip_height.and_then(|height| *height.borrow());
let height = chain_tip_receiver.best_tip_height();
let min_version = Version::min_remote_for_height(config.network, height);
if remote_version < min_version {
// Disconnect if peer is using an obsolete version.
Expand All @@ -601,10 +601,11 @@ pub async fn negotiate_version(

pub type HandshakeRequest = (TcpStream, ConnectedAddr);

impl<S> Service<HandshakeRequest> for Handshake<S>
impl<S, C> Service<HandshakeRequest> for Handshake<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
C: ChainTip + Clone + Send + 'static,
{
type Response = Client;
type Error = BoxError;
Expand Down Expand Up @@ -634,7 +635,7 @@ where
let user_agent = self.user_agent.clone();
let our_services = self.our_services;
let relay = self.relay;
let best_tip_height = self.best_tip_height.clone();
let chain_tip_receiver = self.chain_tip_receiver.clone();

let fut = async move {
debug!(
Expand Down Expand Up @@ -665,7 +666,7 @@ where
user_agent,
our_services,
relay,
best_tip_height,
chain_tip_receiver,
),
)
.await??;
Expand Down
24 changes: 12 additions & 12 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
TryFutureExt,
};
use tokio::{
net::TcpListener,
sync::{broadcast, watch},
time::Instant,
};
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
util::BoxService, Service, ServiceExt,
Expand All @@ -29,18 +25,19 @@ use crate::{
BoxError, Config, Request, Response,
};

use zebra_chain::{block, parameters::Network};
use zebra_chain::{chain_tip::ChainTip, parameters::Network};

use super::{CandidateSet, PeerSet};

use super::CandidateSet;
use super::PeerSet;
use peer::Client;

#[cfg(test)]
mod tests;

type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

/// Initialize a peer set.
/// Initialize a peer set, using a network `config`, `inbound_service`,
/// and `chain_tip_receiver`.
///
/// The peer set abstracts away peer management to provide a
/// [`tower::Service`] representing "the network" that load-balances requests
Expand All @@ -57,20 +54,23 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// cause the peer set to shrink when the inbound service is unable to keep up
/// with the volume of inbound requests.
///
/// Use [`NoChainTip`] to explicitly provide no chain tip receiver.
///
/// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
pub async fn init<S>(
pub async fn init<S, C>(
config: Config,
inbound_service: S,
best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
chain_tip_receiver: C,
) -> (
Buffer<BoxService<Request, Response, BoxError>, Request>,
Arc<std::sync::Mutex<AddressBook>>,
)
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
C: ChainTip + Clone + Send + 'static,
{
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;

Expand All @@ -92,7 +92,7 @@ where
.with_timestamp_collector(timestamp_collector)
.with_advertised_services(PeerServices::NODE_NETWORK)
.with_user_agent(crate::constants::USER_AGENT.to_string())
.with_best_tip_height(best_tip_height)
.with_chain_tip_receiver(chain_tip_receiver)
.want_transactions(true)
.finish()
.expect("configured all required parameters");
Expand Down
4 changes: 2 additions & 2 deletions zebra-network/src/peer_set/initialize/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{collections::HashSet, net::SocketAddr};

use tower::service_fn;

use zebra_chain::parameters::Network;
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
use zebra_test::net::random_known_port;

use crate::Config;
Expand Down Expand Up @@ -110,7 +110,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
let inbound_service =
service_fn(|_| async { unreachable!("inbound service should never be called") });

let (_peer_service, address_book) = init(config, inbound_service, None).await;
let (_peer_service, address_book) = init(config, inbound_service, NoChainTip).await;
let local_listener = address_book.lock().unwrap().local_listener_meta_addr();

if listen_addr.port() == 0 {
Expand Down
Loading