Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): Limit number of peer connections per IP address, Ignore new peer connections from the same IP and port #6980

Merged
merged 7 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub const INBOUND_PEER_LIMIT_MULTIPLIER: usize = 5;
/// See [`INBOUND_PEER_LIMIT_MULTIPLIER`] for details.
pub const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3;

/// The maximum number of peer connections Zebra will keep for a given IP address
/// before it drops any additional peer connections with that IP.
pub const MAX_CONNS_PER_IP: usize = 3;

/// The buffer size for the peer set.
///
/// This should be greater than 1 to avoid sender contention, but also reasonably
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ where
inv_receiver,
address_metrics,
MinimumPeerVersion::new(latest_chain_tip, config.network),
None,
);
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

Expand Down
51 changes: 50 additions & 1 deletion zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ where

/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,

/// The configured maximum number of peers that can be in the
/// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`]
max_conns_per_ip: usize,
}

impl<D, C> Drop for PeerSet<D, C>
Expand All @@ -270,6 +274,7 @@ where
D::Error: Into<BoxError>,
C: ChainTip,
{
#[allow(clippy::too_many_arguments)]
/// Construct a peerset which uses `discover` to manage peer connections.
///
/// Arguments:
Expand All @@ -282,6 +287,9 @@ where
/// - `inv_stream`: receives inventory changes from peers,
/// allowing the peer set to direct inventory requests;
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
/// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
/// - `max_conns_per_ip`: configured maximum number of peers that can be in the
/// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`].
pub fn new(
config: &Config,
discover: D,
Expand All @@ -290,6 +298,7 @@ where
inv_stream: broadcast::Receiver<InventoryChange>,
address_metrics: watch::Receiver<AddressMetrics>,
minimum_peer_version: MinimumPeerVersion<C>,
max_conns_per_ip: Option<usize>,
) -> Self {
Self {
// New peers
Expand Down Expand Up @@ -317,6 +326,8 @@ where
// Metrics
last_peer_log: None,
address_metrics,

max_conns_per_ip: max_conns_per_ip.unwrap_or(crate::constants::MAX_CONNS_PER_IP),
}
}

Expand Down Expand Up @@ -476,6 +487,26 @@ where
}
}

/// Returns the number of peer connections Zebra already has with
/// the provided IP address
///
/// # Performance
///
/// This method is `O(connected peers)`, so it should not be called from a loop
/// that is already iterating through the peer set.
fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
self.ready_services
.keys()
.chain(self.cancel_handles.keys())
.filter(|addr| addr.ip() == ip)
.count()
}

/// Returns `true` if Zebra is already connected to the IP and port in `addr`.
fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
}

/// Checks for newly inserted or removed services.
///
/// Puts inserted services in the unready list.
Expand All @@ -496,7 +527,25 @@ where
// - always do the same checks on every ready peer, and
// - check for any errors that happened right after the handshake
trace!(?key, "got Change::Insert from Discover");
self.remove(&key);
arya2 marked this conversation as resolved.
Show resolved Hide resolved

// # Security
//
// Drop the new peer if we are already connected to it.
// Preferring old connections avoids connection thrashing.
if self.has_peer_with_addr(key) {
std::mem::drop(svc);
continue;
}

// # Security
//
// drop the new peer if there are already `MAX_CONNS_PER_IP` peers with
arya2 marked this conversation as resolved.
Show resolved Hide resolved
// the same IP address in the peer set.
if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip {
std::mem::drop(svc);
continue;
arya2 marked this conversation as resolved.
Show resolved Hide resolved
}

self.push_unready(key, svc);
}
}
Expand Down
24 changes: 24 additions & 0 deletions zebra-network/src/peer_set/set/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct PeerSetBuilder<D, C> {
inv_stream: Option<broadcast::Receiver<InventoryChange>>,
address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
minimum_peer_version: Option<MinimumPeerVersion<C>>,
max_conns_per_ip: Option<usize>,
}

impl PeerSetBuilder<(), ()> {
Expand All @@ -137,6 +138,7 @@ impl<D, C> PeerSetBuilder<D, C> {
inv_stream: self.inv_stream,
address_book: self.address_book,
minimum_peer_version: self.minimum_peer_version,
max_conns_per_ip: self.max_conns_per_ip,
}
}

Expand All @@ -146,13 +148,33 @@ impl<D, C> PeerSetBuilder<D, C> {
minimum_peer_version: MinimumPeerVersion<NewC>,
) -> PeerSetBuilder<D, NewC> {
PeerSetBuilder {
config: self.config,
discover: self.discover,
demand_signal: self.demand_signal,
handle_rx: self.handle_rx,
inv_stream: self.inv_stream,
address_book: self.address_book,
minimum_peer_version: Some(minimum_peer_version),
max_conns_per_ip: self.max_conns_per_ip,
}
}

/// Use the provided [`MinimumPeerVersion`] instance when constructing the [`PeerSet`] instance.
pub fn max_conns_per_ip(self, max_conns_per_ip: usize) -> PeerSetBuilder<D, C> {
assert!(
max_conns_per_ip > 0,
"max_conns_per_ip must be greater than zero"
);

PeerSetBuilder {
config: self.config,
discover: self.discover,
demand_signal: self.demand_signal,
handle_rx: self.handle_rx,
inv_stream: self.inv_stream,
address_book: self.address_book,
minimum_peer_version: self.minimum_peer_version,
max_conns_per_ip: Some(max_conns_per_ip),
}
}
}
Expand All @@ -175,6 +197,7 @@ where
let minimum_peer_version = self
.minimum_peer_version
.expect("`minimum_peer_version` must be set");
let max_conns_per_ip = self.max_conns_per_ip;

let demand_signal = self
.demand_signal
Expand All @@ -196,6 +219,7 @@ where
inv_stream,
address_metrics,
minimum_peer_version,
max_conns_per_ip,
);

(peer_set, guard)
Expand Down
5 changes: 5 additions & 0 deletions zebra-network/src/peer_set/set/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version)
.max_conns_per_ip(usize::MAX)
.build();

check_if_only_up_to_date_peers_are_live(
Expand Down Expand Up @@ -72,6 +73,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

check_if_only_up_to_date_peers_are_live(
Expand Down Expand Up @@ -122,6 +124,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

// Get the total number of active peers
Expand Down Expand Up @@ -197,6 +200,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

// Remove peers, test broadcast until there is only 1 peer left in the peerset
Expand Down Expand Up @@ -267,6 +271,7 @@ proptest! {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();

// Remove peers
Expand Down
49 changes: 49 additions & 0 deletions zebra-network/src/peer_set/set/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,55 @@ fn peer_set_ready_multiple_connections() {
});
}

#[test]
fn peer_set_rejects_connections_past_per_ip_limit() {
const NUM_PEER_VERSIONS: usize = crate::constants::MAX_CONNS_PER_IP + 1;

// Use three peers with the same version
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Nu5);
let peer_versions = PeerVersions {
peer_versions: [peer_version; NUM_PEER_VERSIONS].into_iter().collect(),
};

// Start the runtime
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();

// Pause the runtime's timer so that it advances automatically.
//
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
// real network connections.
tokio::time::pause();

// Get peers and client handles of them
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);

// Make sure we have the right number of peers
assert_eq!(handles.len(), NUM_PEER_VERSIONS);

runtime.block_on(async move {
// Build a peerset
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();

// Get peerset ready
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");

// Check we have the right amount of ready services
assert_eq!(
peer_ready.ready_services.len(),
crate::constants::MAX_CONNS_PER_IP
);
});
}

/// Check that a peer set with an empty inventory registry routes requests to a random ready peer.
#[test]
fn peer_set_route_inv_empty_registry() {
Expand Down