From 01114b0eae7abd0bd389e292858a24a4530fa6c1 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 22 Nov 2024 07:11:45 +0200 Subject: [PATCH] feat(peersync)!: push local peer record to connected peer on change (#1202) Description --- feat(peersync)!: push local peer record to connected peer on change Add libp2p logs to log4rs Motivation and Context --- Connected peers should have an up-to-date and shareable address record for peers they are connected to. This PR adds this to the peer sync protocol. How Has This Been Tested? --- Manually - looked at logs to observe it working What process can a PR reviewer use to test or verify this change? --- Given that node A was previously connected to node B, a node C can request node B's peer record from node A and always get a response. Breaking Changes --- - [ ] None - [ ] Requires data directory to be deleted - [x] Other - Please specify BREAKING CHANGE: peer sync protocol is not compatible with previous versions --- applications/tari_indexer/log4rs_sample.yml | 106 +++++++++++++- .../src/process_manager/manager.rs | 13 +- .../tari_validator_node/log4rs_sample.yml | 98 +++++++++++++ networking/core/src/relay_state.rs | 42 ++++-- networking/core/src/worker.rs | 46 ++++-- networking/libp2p-messaging/src/behaviour.rs | 21 ++- .../libp2p-peersync/proto/messages.proto | 35 +++-- networking/libp2p-peersync/src/behaviour.rs | 96 ++++++------ networking/libp2p-peersync/src/event.rs | 8 +- networking/libp2p-peersync/src/handler.rs | 95 +++++++----- .../libp2p-peersync/src/inbound_task.rs | 57 +++++++- networking/libp2p-peersync/src/lib.rs | 2 +- .../libp2p-peersync/src/outbound_task.rs | 138 +++++++++++------- networking/libp2p-peersync/src/peer_record.rs | 70 +++++---- networking/libp2p-peersync/src/proto.rs | 16 ++ 15 files changed, 602 insertions(+), 241 deletions(-) diff --git a/applications/tari_indexer/log4rs_sample.yml b/applications/tari_indexer/log4rs_sample.yml index fed295cc4..53f813bb3 100644 --- a/applications/tari_indexer/log4rs_sample.yml +++ 
b/applications/tari_indexer/log4rs_sample.yml @@ -72,6 +72,23 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [{X(node-public-key)},{X(node-id)}] {l:5} {m} // {f}:{L}{n}" + # An appender named "libp2p" that writes to a file with a custom pattern encoder + libp2p: + kind: rolling_file + path: "{{log_dir}}/log/indexer/libp2p.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "{{log_dir}}/log/indexer/libp2p.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n} // {f}:{L} " + # An appender named "other" that writes to a file with a custom pattern encoder other: kind: rolling_file @@ -124,13 +141,7 @@ loggers: - stdout additive: false - # Route log events sent to the "comms" logger to the "network" appender - comms: - level: debug - appenders: - - network - - # Route log events sent to the "yamux" logger to the "network" appender + # Route log events sent to the "yamux" logger to the "network" appender yamux: level: info appenders: @@ -152,3 +163,84 @@ loggers: appenders: - other additive: false + # libp2p + libp2p_identify: + level: debug + appenders: + - libp2p + additive: false + libp2p_noise: + level: debug + appenders: + - libp2p + additive: false + libp2p_peersync: + level: debug + appenders: + - libp2p + additive: false + libp2p_gossipsub: + level: info + appenders: + - libp2p + additive: false + quinn: + level: info + appenders: + - libp2p + additive: false + quinn_proto: + level: info + appenders: + - libp2p + additive: false + libp2p_core: + level: info + appenders: + - libp2p + additive: false + libp2p_mdns: + level: debug + appenders: + - libp2p + additive: false + libp2p_swarm: + level: debug + appenders: + - libp2p + additive: false + hickory_proto: + level: info + appenders: + - libp2p + additive: false + multistream_select: + level: info + appenders: + - libp2p + additive: false + libp2p_tcp: + level: debug + appenders: + 
- libp2p + additive: false + libp2p_quic: + level: debug + appenders: + - libp2p + additive: false + libp2p_kad: + level: debug + appenders: + - libp2p + additive: false + libp2p_ping: + level: debug + appenders: + - libp2p + additive: false + libp2p_messaging: + level: debug + appenders: + - libp2p + additive: false diff --git a/applications/tari_swarm_daemon/src/process_manager/manager.rs b/applications/tari_swarm_daemon/src/process_manager/manager.rs index f642e5029..cd7218ae9 100644 --- a/applications/tari_swarm_daemon/src/process_manager/manager.rs +++ b/applications/tari_swarm_daemon/src/process_manager/manager.rs @@ -101,15 +101,15 @@ impl ProcessManager { } } - let num_vns = if self.skip_registration { + let num_blocks = if self.skip_registration { 0 } else { - self.instance_manager.num_validator_nodes() + self.instance_manager.num_validator_nodes() + + u64::try_from(templates_to_register.len()).expect("impossibly many templates") }; - let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap(); - if !self.skip_registration { - // Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity + if num_blocks > 0 { + // Mine some initial funds, guessing 10 blocks extra is sufficient for coinbase maturity self.mine(num_blocks + 10).await.context("initial mining failed")?; self.wait_for_wallet_funds(num_blocks) .await @@ -121,9 +121,8 @@ impl ProcessManager { for templates in templates_to_register { self.register_template(templates).await?; } - } - if num_blocks > 0 { + // "Mine in" the validators and templates self.mine(20).await?; } diff --git a/applications/tari_validator_node/log4rs_sample.yml b/applications/tari_validator_node/log4rs_sample.yml index 12c64e456..e17d54083 100644 --- a/applications/tari_validator_node/log4rs_sample.yml +++ b/applications/tari_validator_node/log4rs_sample.yml @@ -89,6 +89,23 @@ appenders: encoder: pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [{X(node-public-key)},{X(node-id)}] {l:5} {m} // 
{f}:{L}{n}" + # An appender named "libp2p" that writes to a file with a custom pattern encoder + libp2p: + kind: rolling_file + path: "{{log_dir}}/log/validator-node/libp2p.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "{{log_dir}}/log/validator-node/libp2p.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n} // {f}:{L} " + # An appender named "other" that writes to a file with a custom pattern encoder other: @@ -196,3 +213,84 @@ loggers: appenders: - other additive: false + # libp2p + libp2p_identify: + level: debug + appenders: + - libp2p + additive: false + libp2p_noise: + level: debug + appenders: + - libp2p + additive: false + libp2p_peersync: + level: debug + appenders: + - libp2p + additive: false + libp2p_gossipsub: + level: info + appenders: + - libp2p + additive: false + quinn: + level: info + appenders: + - libp2p + additive: false + quinn_proto: + level: info + appenders: + - libp2p + additive: false + libp2p_core: + level: info + appenders: + - libp2p + additive: false + libp2p_mdns: + level: debug + appenders: + - libp2p + additive: false + libp2p_swarm: + level: debug + appenders: + - libp2p + additive: false + hickory_proto: + level: info + appenders: + - libp2p + additive: false + multistream_select: + level: info + appenders: + - libp2p + additive: false + libp2p_tcp: + level: debug + appenders: + - libp2p + additive: false + libp2p_quic: + level: debug + appenders: + - libp2p + additive: false + libp2p_kad: + level: debug + appenders: + - libp2p + additive: false + libp2p_ping: + level: debug + appenders: + - libp2p + additive: false + libp2p_messaging: + level: debug + appenders: + - libp2p + additive: false diff --git a/networking/core/src/relay_state.rs b/networking/core/src/relay_state.rs index 3d6a4d3f4..2f7680a0f 100644 --- a/networking/core/src/relay_state.rs +++ b/networking/core/src/relay_state.rs @@ -3,11 +3,11 @@ 
use std::collections::{HashMap, HashSet}; -use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; +use libp2p::{multiaddr::Protocol, swarm::ConnectionId, Multiaddr, PeerId}; use rand::seq::IteratorRandom; #[derive(Debug, Clone, Default)] -pub struct RelayState { +pub(crate) struct RelayState { selected_relay: Option, possible_relays: HashMap>, } @@ -40,6 +40,23 @@ impl RelayState { self.selected_relay.as_mut() } + pub fn set_relay_peer(&mut self, peer_id: PeerId, dialled_address: Option) -> bool { + if self.selected_relay.as_ref().map_or(false, |p| p.peer_id == peer_id) { + return true; + } + + if let Some(addrs) = self.possible_relays.get(&peer_id) { + self.selected_relay = Some(RelayPeer { + peer_id, + addresses: addrs.iter().cloned().collect(), + circuit_connection_id: None, + remote_address: dialled_address, + }); + return true; + } + false + } + pub fn possible_relays(&self) -> impl Iterator)> { self.possible_relays.iter() } @@ -49,10 +66,7 @@ impl RelayState { } pub fn has_active_relay(&self) -> bool { - self.selected_relay - .as_ref() - .map(|r| r.is_circuit_established) - .unwrap_or(false) + self.selected_relay.as_ref().map(|r| r.has_circuit()).unwrap_or(false) } pub fn add_possible_relay(&mut self, peer: PeerId, address: Multiaddr) { @@ -70,16 +84,22 @@ impl RelayState { self.selected_relay = Some(RelayPeer { peer_id: *peer, addresses: addrs.iter().cloned().collect(), - is_circuit_established: false, - dialled_address: None, + circuit_connection_id: None, + remote_address: None, }); } } #[derive(Debug, Clone)] -pub struct RelayPeer { +pub(crate) struct RelayPeer { pub peer_id: PeerId, pub addresses: Vec, - pub is_circuit_established: bool, - pub dialled_address: Option, + pub circuit_connection_id: Option, + pub remote_address: Option, +} + +impl RelayPeer { + pub fn has_circuit(&self) -> bool { + self.circuit_connection_id.is_some() + } } diff --git a/networking/core/src/worker.rs b/networking/core/src/worker.rs index af0fc5b52..b14ea2035 100644 --- 
a/networking/core/src/worker.rs +++ b/networking/core/src/worker.rs @@ -404,6 +404,7 @@ where peer_id, endpoint, cause, + connection_id, .. } => { info!(target: LOG_TARGET, "🔌 Connection closed: peer_id={}, endpoint={:?}, cause={:?}", peer_id, endpoint, cause); @@ -419,6 +420,12 @@ where }, } shrink_hashmap_if_required(&mut self.active_connections); + if let Some(selected) = self.relays.selected_relay() { + if selected.circuit_connection_id == Some(connection_id) { + // Our selected relay has disconnected, attempt to reserve another + self.attempt_relay_reservation(); + } + } }, SwarmEvent::OutgoingConnectionError { peer_id: Some(peer_id), @@ -502,7 +509,7 @@ where connection_id, }) => { info!(target: LOG_TARGET, "👋 Received identify from {} with {} addresses (id={connection_id})", peer_id, info.listen_addrs.len()); - self.on_peer_identified(peer_id, info)?; + self.on_peer_identified(connection_id, peer_id, info)?; }, Identify(event) => { debug!(target: LOG_TARGET, "ℹī¸ Identify event: {:?}", event); @@ -565,14 +572,15 @@ where Autonat(event) => { self.on_autonat_event(event)?; }, - PeerSync(peersync::Event::LocalPeerRecordUpdated { record }) => { - info!(target: LOG_TARGET, "🧑‍🧑‍🧒‍🧒 Local peer record updated: {:?} announce enabled = {}, has_sent_announce = {}",record, self.config.announce, self.has_sent_announce); - if self.config.announce && !self.has_sent_announce && record.is_signed() { + PeerSync(peersync::Event::LocalPeerRecordUpdated) => { + if self.config.announce && !self.has_sent_announce { + let record = self.swarm.behaviour().peer_sync.local_peer_record(); info!(target: LOG_TARGET, "đŸ“Ŗ Sending local peer announce with {} address(es)", record.addresses().len()); + let proto_rec = record.encode_to_proto()?; self.swarm .behaviour_mut() .gossipsub - .publish(IdentTopic::new(PEER_ANNOUNCE_TOPIC), record.encode_to_proto()?)?; + .publish(IdentTopic::new(PEER_ANNOUNCE_TOPIC), proto_rec)?; self.has_sent_announce = true; } }, @@ -678,8 +686,12 @@ where use 
autonat::Event::*; match event { StatusChanged { old, new } => { - if let Some(public_address) = self.swarm.behaviour().autonat.public_address() { + if let Some(public_address) = self.swarm.behaviour().autonat.public_address().cloned() { info!(target: LOG_TARGET, "🌍ī¸ Autonat: Our public address is {public_address}"); + self.swarm + .behaviour_mut() + .peer_sync + .add_known_local_public_addresses(vec![public_address]); } // If we are/were "Private", let's establish a relay reservation with a known relay @@ -742,7 +754,7 @@ where if let Some(relay) = self.relays.selected_relay_mut() { if endpoint.is_dialer() && relay.peer_id == peer_id { - relay.dialled_address = Some(endpoint.get_remote_address().clone()); + relay.remote_address = Some(endpoint.get_remote_address().clone()); } } @@ -770,7 +782,12 @@ where Ok(()) } - fn on_peer_identified(&mut self, peer_id: PeerId, info: identify::Info) -> Result<(), NetworkingError> { + fn on_peer_identified( + &mut self, + connection_id: ConnectionId, + peer_id: PeerId, + info: identify::Info, + ) -> Result<(), NetworkingError> { if !self.config.swarm.protocol_version.is_compatible(&info.protocol_version) { info!(target: LOG_TARGET, "🚨 Peer {} is using an incompatible protocol version: {}. Our version {}", peer_id, info.protocol_version, self.config.swarm.protocol_version); // Error can be ignored as the docs indicate that an error only occurs if there was no connection to the @@ -814,6 +831,9 @@ where // Otherwise, if the peer advertises as a relay we'll add them info!(target: LOG_TARGET, "📡 Adding peer {peer_id} {address} as a relay"); self.relays.add_possible_relay(peer_id, address.clone()); + if !self.relays.has_active_relay() { + self.relays.set_relay_peer(peer_id, Some(address.clone())); + } } else { // Nothing to do } @@ -823,7 +843,7 @@ where // If this peer is the selected relay that was dialled previously, listen on the circuit address // Note we only select a relay if autonat says we are not publicly accessible. 
if is_relay { - self.establish_relay_circuit_on_connect(&peer_id); + self.establish_relay_circuit_on_connect(&peer_id, connection_id); } self.publish_event(NetworkingEvent::NewIdentifiedPeer { @@ -847,7 +867,7 @@ where /// Establishes a relay circuit for the given peer if it is the selected relay peer. Returns true if the circuit /// was established from this call. - fn establish_relay_circuit_on_connect(&mut self, peer_id: &PeerId) -> bool { + fn establish_relay_circuit_on_connect(&mut self, peer_id: &PeerId, connection_id: ConnectionId) -> bool { let Some(relay) = self.relays.selected_relay() else { return false; }; @@ -858,12 +878,12 @@ where } // If we've already established a circuit with the relay, there's nothing to do here - if relay.is_circuit_established { + if relay.has_circuit() { return false; } // Check if we've got a confirmed address for the relay - let Some(dialled_address) = relay.dialled_address.as_ref() else { + let Some(dialled_address) = relay.remote_address.as_ref() else { return false; }; @@ -880,7 +900,7 @@ where // unreachable return false; }; - relay_mut.is_circuit_established = true; + relay_mut.circuit_connection_id = Some(connection_id); true }, Err(e) => { diff --git a/networking/libp2p-messaging/src/behaviour.rs b/networking/libp2p-messaging/src/behaviour.rs index a2266ea2c..89f8a36f8 100644 --- a/networking/libp2p-messaging/src/behaviour.rs +++ b/networking/libp2p-messaging/src/behaviour.rs @@ -16,6 +16,7 @@ use libp2p::{ ConnectionDenied, ConnectionHandler, ConnectionId, + DialError, DialFailure, FromSwarm, NetworkBehaviour, @@ -129,12 +130,12 @@ where TCodec: Codec + Send + Clone + 'static }, None => { let stream_id = self.next_outbound_stream_id(); - tracing::debug!("create a new outbound dial {stream_id}"); let (sink, stream) = stream::channel(stream_id, peer_id); - self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id).build(), - }); + let opts = DialOpts::peer_id(peer_id).build(); + let connection_id 
= opts.connection_id(); + self.pending_events.push_back(ToSwarm::Dial { opts }); + tracing::debug!("create a new outbound dial (conn_id={connection_id}, stream {stream_id})"); self.pending_outbound_dials.insert(peer_id, (sink.clone(), stream)); sink @@ -214,14 +215,12 @@ where TCodec: Codec + Send + Clone + 'static } } - fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) { + fn on_dial_failure(&mut self, DialFailure { peer_id, error, .. }: DialFailure) { + if matches!(error, DialError::DialPeerConditionFalse(_)) { + return; + } + if let Some(peer) = peer_id { - // If there are pending outgoing messages when a dial failure occurs, - // it is implied that we are not connected to the peer, since pending - // outgoing messages are drained when a connection is established and - // only created when a peer is not connected when a request is made. - // Thus these requests must be considered failed, even if there is - // another, concurrent dialing attempt ongoing. if let Some((_sink, stream)) = self.pending_outbound_dials.remove(&peer) { self.pending_events .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { diff --git a/networking/libp2p-peersync/proto/messages.proto b/networking/libp2p-peersync/proto/messages.proto index 75d1366f7..58ce4b036 100644 --- a/networking/libp2p-peersync/proto/messages.proto +++ b/networking/libp2p-peersync/proto/messages.proto @@ -3,30 +3,37 @@ syntax = "proto3"; +message Message { + oneof payload { + SignedPeerRecord LocalRecord = 1; + WantPeers WantPeers = 2; + } +} + // A want request for peers message WantPeers { - // Requested peers - repeated bytes want_peer_ids = 1; + // Requested peers + repeated bytes want_peer_ids = 1; } // Response to a want request. This response is streamed back to the requester. message WantPeerResponse { - // A peer that was requested. - SignedPeerRecord peer = 1; + // A peer that was requested. 
+ SignedPeerRecord peer = 1; } message SignedPeerRecord { - // The addresses of the peer - repeated bytes addresses = 1; - // The Unix epoch based timestamp when this peer record was signed - uint64 ts_updated_at = 2; - // The signature that signs the peer record (addresses | ts_updated_at) - PeerSignature signature = 3; + // The addresses of the peer + repeated bytes addresses = 1; + // The Unix epoch based timestamp when this peer record was signed + uint64 ts_updated_at = 2; + // The signature that signs the peer record (addresses | ts_updated_at) + PeerSignature signature = 3; } message PeerSignature { - // The public key of the peer - bytes public_key = 1; - // The signature that signs the peer record (addresses | ts_updated_at) - bytes signature = 2; + // The public key of the peer + bytes public_key = 1; + // The signature that signs the peer record (addresses | ts_updated_at) + bytes signature = 2; } \ No newline at end of file diff --git a/networking/libp2p-peersync/src/behaviour.rs b/networking/libp2p-peersync/src/behaviour.rs index 08efef0c7..eeeb880ad 100644 --- a/networking/libp2p-peersync/src/behaviour.rs +++ b/networking/libp2p-peersync/src/behaviour.rs @@ -13,8 +13,6 @@ use libp2p::{ futures::executor::block_on, identity::Keypair, swarm::{ - behaviour::ExternalAddrConfirmed, - AddressChange, ConnectionClosed, ConnectionDenied, ConnectionId, @@ -34,7 +32,7 @@ use libp2p::{ use crate::{ error::Error, event::Event, - handler::Handler, + handler::{Handler, HandlerAction}, store::PeerStore, Config, LocalPeerRecord, @@ -72,7 +70,7 @@ where TPeerStore: PeerStore Self::with_custom_protocol(keypair, DEFAULT_PROTOCOL_NAME, store, config) } - pub fn with_custom_protocol( + fn with_custom_protocol( keypair: Keypair, protocol: StreamProtocol, peer_store: TPeerStore, @@ -88,7 +86,7 @@ where TPeerStore: PeerStore active_outbound_connections: HashMap::new(), remaining_want_peers: HashSet::new(), pending_syncs: VecDeque::new(), - pending_tasks: 
futures_bounded::FuturesSet::new(Duration::from_secs(1000), 1024), + pending_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(30), 1024), sync_semaphore: Arc::new(async_semaphore::Semaphore::new(1)), } } @@ -106,16 +104,23 @@ where TPeerStore: PeerStore .map_err(|e| Error::StoreError(e.to_string())) } + pub fn local_peer_record(&self) -> &LocalPeerRecord { + &self.local_peer_record + } + pub fn add_known_local_public_addresses(&mut self, addrs: Vec) { if addrs.is_empty() { return; } + let mut is_any_new = false; for addr in addrs { - self.local_peer_record.add_address(addr.clone()); + is_any_new |= self.local_peer_record.add_address(addr.clone()); } - self.handle_update_local_record(); + if is_any_new { + self.handle_update_local_record(); + } } pub async fn want_peers>(&mut self, peers: I) -> Result<(), Error> { @@ -151,7 +156,7 @@ where TPeerStore: PeerStore self.pending_events.push_back(ToSwarm::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::One(*conn_id), - event: list.clone(), + event: HandlerAction::WantPeers(list.clone()), }); } } @@ -174,31 +179,14 @@ where TPeerStore: PeerStore } } - fn on_address_change(&mut self, _address_change: AddressChange) {} - - fn on_external_addr_confirmed(&mut self, addr_confirmed: ExternalAddrConfirmed) { - self.local_peer_record.add_address(addr_confirmed.addr.clone()); - self.handle_update_local_record() - } - fn handle_update_local_record(&mut self) { let store = self.peer_store.clone(); let local_peer_record = self.local_peer_record.clone(); - if !local_peer_record.is_signed() { - return; - } - let peer_rec = match local_peer_record.clone().try_into() { - Ok(peer_rec) => peer_rec, - Err(err) => { - tracing::error!("Failed to convert local peer record to signed peer record: {}", err); - return; - }, - }; + let local_peer_rec = SignedPeerRecord::from(local_peer_record); + let local_rec = Arc::new(local_peer_rec.clone()); let task = async move { - match store.put(peer_rec).await { - Ok(_) => 
Event::LocalPeerRecordUpdated { - record: local_peer_record, - }, + match store.put(local_peer_rec).await { + Ok(_) => Event::LocalPeerRecordUpdated, Err(err) => { tracing::error!("Failed to add local peer record to store: {}", err); Event::Error(Error::StoreError(err.to_string())) @@ -206,7 +194,16 @@ where TPeerStore: PeerStore } }; match self.pending_tasks.try_push(task) { - Ok(()) => {}, + Ok(()) => { + self.pending_events.reserve(self.active_outbound_connections.len()); + for (peer_id, conn_id) in &self.active_outbound_connections { + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::One(*conn_id), + event: HandlerAction::PushLocalRecord(local_rec.clone()), + }); + } + }, Err(_) => { self.pending_events.push_back(ToSwarm::GenerateEvent(Event::Error( Error::ExceededMaxNumberOfPendingTasks, @@ -225,17 +222,18 @@ where TPeerStore: PeerStore fn handle_established_inbound_connection( &mut self, _connection_id: ConnectionId, - peer: PeerId, + peer_id: PeerId, _local_addr: &Multiaddr, _remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { let handler = Handler::new( - peer, + peer_id, self.peer_store.clone(), self.protocol.clone(), &self.config, self.remaining_want_peers.clone(), self.sync_semaphore.clone(), + None, ); Ok(handler) } @@ -248,6 +246,7 @@ where TPeerStore: PeerStore _role_override: Endpoint, _port_use: PortUse, ) -> Result, ConnectionDenied> { + tracing::debug!("outbound connection to peer {}", peer); let handler = Handler::new( peer, self.peer_store.clone(), @@ -255,6 +254,7 @@ where TPeerStore: PeerStore &self.config, self.remaining_want_peers.clone(), self.sync_semaphore.clone(), + Some(Arc::new(self.local_peer_record.clone().into())), ); self.active_outbound_connections.insert(peer, connection_id); Ok(handler) @@ -264,16 +264,20 @@ where TPeerStore: PeerStore match event { FromSwarm::ConnectionEstablished(_) => {}, FromSwarm::ConnectionClosed(connection_closed) => 
self.on_connection_closed(connection_closed), - FromSwarm::AddressChange(address_change) => self.on_address_change(address_change), + FromSwarm::AddressChange(_) => {}, FromSwarm::ExternalAddrConfirmed(addr_confirmed) => { - self.on_external_addr_confirmed(addr_confirmed); + if self.local_peer_record.add_address(addr_confirmed.addr.clone()) { + self.handle_update_local_record(); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated)); + } }, FromSwarm::ExternalAddrExpired(addr_expired) => { - self.local_peer_record.remove_address(addr_expired.addr); - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated { - record: self.local_peer_record.clone(), - })); + if self.local_peer_record.remove_address(addr_expired.addr) { + self.handle_update_local_record(); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::LocalPeerRecordUpdated)); + } }, _ => {}, } @@ -302,7 +306,7 @@ where TPeerStore: PeerStore }, Event::InboundStreamInterrupted { .. } => {}, Event::OutboundStreamInterrupted { .. } => {}, - Event::ResponseStreamComplete { .. } => {}, + Event::InboundRequestCompleted { .. } => {}, Event::LocalPeerRecordUpdated { .. } => {}, Event::Error(_) => {}, } @@ -312,18 +316,6 @@ where TPeerStore: PeerStore fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { if let Some(event) = self.pending_events.pop_front() { - // if let - // ToSwarm::GenerateEvent(event) = - // &event { - // match event { - // Event::InboundFailure { peer_id, .. } => {} - // Event::OutboundFailure { peer_id, .. } => {} - // Event::InboundStreamInterrupted { peer_id, .. } => {} - // Event::OutboundStreamInterrupted { peer_id, .. } => {} - // Event::ResponseStreamComplete { peer_id, .. 
} => {} - // Event::Error(_) => {} - // } - // } return Poll::Ready(event); } if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/networking/libp2p-peersync/src/event.rs b/networking/libp2p-peersync/src/event.rs index 7c439b4be..2a96b43c8 100644 --- a/networking/libp2p-peersync/src/event.rs +++ b/networking/libp2p-peersync/src/event.rs @@ -3,7 +3,7 @@ use libp2p::PeerId; -use crate::{error::Error, LocalPeerRecord}; +use crate::error::Error; #[derive(Debug)] pub enum Event { @@ -25,13 +25,11 @@ pub enum Event { OutboundStreamInterrupted { peer_id: PeerId, }, - ResponseStreamComplete { + InboundRequestCompleted { peer_id: PeerId, peers_sent: usize, requested: usize, }, - LocalPeerRecordUpdated { - record: LocalPeerRecord, - }, + LocalPeerRecordUpdated, Error(Error), } diff --git a/networking/libp2p-peersync/src/handler.rs b/networking/libp2p-peersync/src/handler.rs index a1b76b08c..16d908c42 100644 --- a/networking/libp2p-peersync/src/handler.rs +++ b/networking/libp2p-peersync/src/handler.rs @@ -12,7 +12,6 @@ use std::{ use async_semaphore::{Semaphore, SemaphoreGuardArc}; use libp2p::{ core::UpgradeInfo, - futures::FutureExt, swarm::{ handler::{ ConnectionEvent, @@ -38,23 +37,31 @@ use crate::{ error::Error, event::Event, inbound_task::inbound_sync_task, - outbound_task::outbound_sync_task, + outbound_task::outbound_request_want_list_task, proto, store::PeerStore, Config, + SignedPeerRecord, EMPTY_QUEUE_SHRINK_THRESHOLD, MAX_MESSAGE_SIZE, }; pub(crate) type Framed = asynchronous_codec::Framed>; -pub(crate) type FramedOutbound = Framed; -pub(crate) type FramedInbound = Framed; +pub(crate) type FramedOutbound = Framed; +pub(crate) type FramedInbound = Framed; + +#[derive(Debug)] +pub enum HandlerAction { + PushLocalRecord(Arc), + WantPeers(Arc), +} pub struct Handler { peer_id: PeerId, protocol: StreamProtocol, must_request_substream: bool, is_complete: bool, + pending_local_peer_record: Option>, failed_attempts: usize, current_want_list: 
Arc, pending_events: VecDeque, @@ -73,12 +80,14 @@ impl Handler { config: &Config, want_list: WantList, semaphore: Arc, + pending_local_peer_record: Option>, ) -> Self { Self { store, peer_id, protocol, is_complete: false, + pending_local_peer_record, failed_attempts: 0, current_want_list: Arc::new(want_list), pending_events: VecDeque::new(), @@ -126,19 +135,26 @@ where TStore: PeerStore } fn on_fully_negotiated_outbound(&mut self, outbound: FullyNegotiatedOutbound, ()>) { - if self.current_want_list.is_empty() { - tracing::debug!("No peers wanted, ignoring outbound stream"); + if self.current_want_list.is_empty() && self.pending_local_peer_record.is_none() { + tracing::trace!("No peers wanted or no local peer record update, ignoring outbound stream"); return; } + tracing::debug!( + "peer-sync[{}]: Starting outbound protocol on outbound stream", + self.peer_id + ); let (stream, _protocol) = outbound.protocol; let framed = new_framed_codec(stream, MAX_MESSAGE_SIZE); let store = self.store.clone(); - if self.current_want_list.is_empty() { - tracing::debug!("No peers wanted, ignoring outbound stream"); - return; - } - let fut = outbound_sync_task(self.peer_id, framed, store, self.current_want_list.clone()).boxed(); + let pending_local_peer_record = self.pending_local_peer_record.take(); + let fut = outbound_request_want_list_task( + self.peer_id, + framed, + store, + self.current_want_list.clone(), + pending_local_peer_record, + ); if self.tasks.try_push(fut).is_err() { tracing::warn!("Dropping outbound peer sync because we are at capacity") @@ -151,7 +167,7 @@ where TStore: PeerStore let framed = new_framed_codec(stream, MAX_MESSAGE_SIZE); let store = self.store.clone(); - let fut = inbound_sync_task(self.peer_id, framed, store, config).boxed(); + let fut = inbound_sync_task(self.peer_id, framed, store, config); if self.tasks.try_push(fut).is_err() { tracing::warn!("Dropping inbound peer sync because we are at capacity") @@ -162,7 +178,7 @@ where TStore: PeerStore 
impl ConnectionHandler for Handler where TStore: PeerStore { - type FromBehaviour = Arc; + type FromBehaviour = HandlerAction; type InboundOpenInfo = (); type InboundProtocol = Protocol; type OutboundOpenInfo = (); @@ -233,35 +249,32 @@ where TStore: PeerStore } // If we do not want any peers, there's nothing further to do - if self.current_want_list.is_empty() { - tracing::debug!( - "peer-sync[{}]: No peers wanted, waiting until peers are wanted", - self.peer_id - ); + if self.current_want_list.is_empty() && self.pending_local_peer_record.is_none() { return Poll::Pending; } tracing::debug!( - "peer-sync[{}]: Want {} peers", + "peer-sync[{}]: Want {} peers, has pending local peer update = {}", self.peer_id, - self.current_want_list.len() + self.current_want_list.len(), + self.pending_local_peer_record.is_some() ); - // Otherwise, wait until another sync is complete - if self.aquired.is_none() { - match self.semaphore.try_acquire_arc() { - Some(guard) => { - self.aquired = Some(guard); - }, - None => { - return Poll::Pending; - }, + // Our turn + if self.must_request_substream { + // Wait until another sync is complete + if self.aquired.is_none() { + match self.semaphore.try_acquire_arc() { + Some(guard) => { + self.aquired = Some(guard); + }, + None => { + return Poll::Pending; + }, + } } - } - tracing::debug!("peer-sync[{}]: Acquired semaphore", self.peer_id); + tracing::debug!("peer-sync[{}]: Acquired semaphore", self.peer_id); - // Our turn, open the substream - if self.must_request_substream { let protocol = self.protocol.clone(); self.must_request_substream = false; @@ -274,10 +287,18 @@ where TStore: PeerStore Poll::Pending } - fn on_behaviour_event(&mut self, want_list: Self::FromBehaviour) { - // Sync from existing connections if there are more want-peers - self.is_complete = !want_list.is_empty(); - self.current_want_list = want_list; + fn on_behaviour_event(&mut self, action: Self::FromBehaviour) { + match action { + 
HandlerAction::PushLocalRecord(local_record) => { + self.is_complete = false; + self.pending_local_peer_record = Some(local_record); + }, + HandlerAction::WantPeers(want_list) => { + // Sync from existing connections if there are more want-peers + self.is_complete = !want_list.is_empty(); + self.current_want_list = want_list; + }, + } } fn on_connection_event( diff --git a/networking/libp2p-peersync/src/inbound_task.rs b/networking/libp2p-peersync/src/inbound_task.rs index c95a67f7f..27af4bfe6 100644 --- a/networking/libp2p-peersync/src/inbound_task.rs +++ b/networking/libp2p-peersync/src/inbound_task.rs @@ -28,8 +28,59 @@ async fn inbound_sync_task_inner( store: TPeerStore, config: Config, ) -> Result { - let msg = framed.next().await.ok_or(Error::InboundStreamEnded)??; + let mut received_remote_peer_record = false; + loop { + let msg = framed.next().await.ok_or(Error::InboundStreamEnded)??; + match msg.payload { + proto::mod_Message::OneOfpayload::LocalRecord(msg) => { + // Only permitted once per session + if received_remote_peer_record { + return Err(Error::InvalidMessage { + peer_id, + details: format!("peer {peer_id} sent more than one local peer record"), + }); + } + + let msg = SignedPeerRecord::try_from(msg)?; + tracing::debug!( + "Received local peer record from peer {peer_id} containing {} address(es)", + msg.addresses.len() + ); + if !msg.is_valid() { + return Err(Error::InvalidSignedPeer { + peer_id, + details: format!("peer {peer_id} sent an invalid local peer record"), + }); + } + store.put(msg).await.map_err(|e| Error::StoreError(e.to_string()))?; + received_remote_peer_record = true; + }, + proto::mod_Message::OneOfpayload::WantPeers(msg) => { + tracing::debug!( + "Want peer request (size={}) from peer {peer_id}", + msg.want_peer_ids.len() + ); + return handle_want_peers(peer_id, &mut framed, &store, &config, msg).await; + }, + proto::mod_Message::OneOfpayload::None => { + return Ok(Event::InboundRequestCompleted { + peer_id, + peers_sent: 0, + 
requested: 0, + }); + }, + } + } +} + +async fn handle_want_peers( + peer_id: PeerId, + framed: &mut FramedInbound, + store: &TPeerStore, + config: &Config, + msg: proto::WantPeers, +) -> Result { let mut store_stream = store.stream(); let orig_want_list_len = msg.want_peer_ids.len(); @@ -58,7 +109,7 @@ async fn inbound_sync_task_inner( let event = loop { if remaining_want_list.is_empty() { - break Event::ResponseStreamComplete { + break Event::InboundRequestCompleted { peer_id, peers_sent: orig_want_list_len - remaining_want_list.len(), requested: orig_want_list_len, @@ -66,7 +117,7 @@ async fn inbound_sync_task_inner( } let Some(result) = store_stream.next().await else { - break Event::ResponseStreamComplete { + break Event::InboundRequestCompleted { peer_id, peers_sent: orig_want_list_len - remaining_want_list.len(), requested: orig_want_list_len, diff --git a/networking/libp2p-peersync/src/lib.rs b/networking/libp2p-peersync/src/lib.rs index 0f7c90349..4793735cc 100644 --- a/networking/libp2p-peersync/src/lib.rs +++ b/networking/libp2p-peersync/src/lib.rs @@ -35,4 +35,4 @@ pub use event::*; pub use peer_record::*; /// The maximum message size permitted for peer messages -pub(crate) const MAX_MESSAGE_SIZE: usize = 1024; +pub(crate) const MAX_MESSAGE_SIZE: usize = 3 * 1024; diff --git a/networking/libp2p-peersync/src/outbound_task.rs b/networking/libp2p-peersync/src/outbound_task.rs index d7fefe51c..5740d6f41 100644 --- a/networking/libp2p-peersync/src/outbound_task.rs +++ b/networking/libp2p-peersync/src/outbound_task.rs @@ -10,86 +10,116 @@ use libp2p::{ use crate::{behaviour::WantList, handler::FramedOutbound, proto, store::PeerStore, Error, Event, SignedPeerRecord}; -pub async fn outbound_sync_task( +pub async fn outbound_request_want_list_task( peer_id: PeerId, mut framed: FramedOutbound, store: TPeerStore, want_list: Arc, + local_peer_record: Option>, ) -> Event { - tracing::debug!("Starting outbound protocol sync with peer {}", peer_id); - 
outbound_sync_task_inner(peer_id, &mut framed, store, want_list) + tracing::debug!("Starting outbound protocol with peer {}", peer_id); + outbound_request_want_list_task_inner(peer_id, &mut framed, store, want_list, local_peer_record.as_deref()) .await .unwrap_or_else(Event::Error) } -async fn outbound_sync_task_inner( - from_peer: PeerId, +async fn outbound_request_want_list_task_inner( + peer_id: PeerId, framed: &mut FramedOutbound, store: TPeerStore, want_list: Arc, + local_peer_record: Option<&SignedPeerRecord>, ) -> Result { - { + if let Some(local_peer_record) = local_peer_record { + tracing::debug!("Sending updated local peer record to peer {}", peer_id); + let msg = proto::SignedPeerRecord::from(local_peer_record); + framed.send(msg.into()).await.map_err(|e| Error::CodecError(e.into()))?; + } + + if want_list.is_empty() { + tracing::debug!("[peer_id={peer_id}] Empty want list. Protocol complete"); + // Nothing further to do, let the peer know framed - .send(proto::WantPeers { - want_peer_ids: want_list.iter().map(|p| p.to_bytes()).collect(), + .send(proto::Message { + payload: proto::mod_Message::OneOfpayload::None, }) .await .map_err(|e| Error::CodecError(e.into()))?; - tracing::debug!("Sent want list to peer {}", from_peer); + return Ok(Event::InboundRequestCompleted { + peer_id, + peers_sent: 1, + requested: 0, + }); + } - let mut new_peers = 0; - while let Some(msg) = framed.next().await { - if new_peers + 1 > want_list.len() { - return Err(Error::InvalidMessage { - peer_id: from_peer, - details: format!("Peer {from_peer} sent us more peers than we requested"), - }); + framed + .send( + proto::WantPeers { + want_peer_ids: want_list.iter().map(|p| p.to_bytes()).collect(), } + .into(), + ) + .await + .map_err(|e| Error::CodecError(e.into()))?; + tracing::debug!("Sent want list (size={}) to peer {}", want_list.len(), peer_id); - match msg { - Ok(msg) => { - let Some(peer) = msg.peer else { - return Err(Error::InvalidMessage { - peer_id: from_peer, - 
details: "empty message".to_string(), - }); - }; + let mut new_peers = 0; + while let Some(msg) = framed.next().await { + if new_peers + 1 > want_list.len() { + return Err(Error::InvalidMessage { + peer_id, + details: format!("Peer {peer_id} sent us more peers than we requested"), + }); + } - let rec = match SignedPeerRecord::try_from(peer) { - Ok(rec) => rec, - Err(e) => { - return Err(Error::InvalidMessage { - peer_id: from_peer, - details: e.to_string(), - }); - }, - }; + match msg { + Ok(msg) => { + let Some(peer) = msg.peer else { + return Err(Error::InvalidMessage { + peer_id, + details: "empty message".to_string(), + }); + }; - if !want_list.contains(&rec.to_peer_id()) { + let rec = match SignedPeerRecord::try_from(peer) { + Ok(rec) => rec, + Err(e) => { return Err(Error::InvalidMessage { - peer_id: from_peer, - details: format!("Peer {from_peer} sent us a peer we didnt request"), + peer_id, + details: e.to_string(), }); - } + }, + }; - new_peers += 1; + if !want_list.contains(&rec.to_peer_id()) { + return Err(Error::InvalidMessage { + peer_id, + details: format!("Peer {peer_id} sent us a peer we didnt request"), + }); + } - store - .put_if_newer(rec) - .await - .map_err(|err| Error::StoreError(err.to_string()))?; - }, - Err(e) => { - let e = io::Error::from(e); - if e.kind() == io::ErrorKind::UnexpectedEof { - return Ok(Event::OutboundStreamInterrupted { peer_id: from_peer }); - } else { - return Err(Error::CodecError(e)); - } - }, - } - } + new_peers += 1; - Ok(Event::PeerBatchReceived { from_peer, new_peers }) + store + .put_if_newer(rec) + .await + .map_err(|err| Error::StoreError(err.to_string()))?; + }, + Err(e) => { + let e = io::Error::from(e); + if e.kind() == io::ErrorKind::UnexpectedEof { + return Ok(Event::OutboundStreamInterrupted { peer_id }); + } else { + return Err(Error::CodecError(e)); + } + }, + } } + + tracing::debug!("Received {} new peers from {}", new_peers, peer_id); + + Ok(Event::PeerBatchReceived { + from_peer: peer_id, + new_peers, 
+ }) } diff --git a/networking/libp2p-peersync/src/peer_record.rs b/networking/libp2p-peersync/src/peer_record.rs index df895681d..77c9ae227 100644 --- a/networking/libp2p-peersync/src/peer_record.rs +++ b/networking/libp2p-peersync/src/peer_record.rs @@ -69,25 +69,33 @@ impl TryFrom for SignedPeerRecord { } } +impl From<&SignedPeerRecord> for proto::SignedPeerRecord { + fn from(value: &SignedPeerRecord) -> Self { + Self { + addresses: value.addresses.iter().map(|a| a.to_vec()).collect(), + ts_updated_at: value.updated_at.as_secs(), + signature: Some(proto::PeerSignature::from(value.signature.clone())), + } + } +} + impl From for proto::SignedPeerRecord { fn from(value: SignedPeerRecord) -> Self { Self { - addresses: value.addresses.into_iter().map(|a| a.to_vec()).collect(), + addresses: value.addresses.iter().map(|a| a.to_vec()).collect(), ts_updated_at: value.updated_at.as_secs(), - signature: Some(value.signature.into()), + signature: Some(proto::PeerSignature::from(value.signature)), } } } -impl TryFrom for SignedPeerRecord { - type Error = Error; - - fn try_from(value: LocalPeerRecord) -> Result { - Ok(Self { +impl From for SignedPeerRecord { + fn from(value: LocalPeerRecord) -> Self { + Self { addresses: value.addresses.into_iter().collect(), updated_at: value.updated_at, - signature: value.signature.ok_or_else(|| Error::LocalPeerNotSigned)?, - }) + signature: value.signature, + } } } @@ -96,16 +104,19 @@ pub struct LocalPeerRecord { keypair: Arc, addresses: HashSet, updated_at: Duration, - signature: Option, + signature: PeerSignature, } impl LocalPeerRecord { pub fn new(keypair: Arc) -> Self { + let updated_at = epoch_time_now(); + let addresses = HashSet::new(); + let signature = sign_peer_rec(&keypair, &addresses, &updated_at); Self { keypair, - addresses: HashSet::new(), - updated_at: epoch_time_now(), - signature: None, + addresses, + updated_at, + signature, } } @@ -113,32 +124,34 @@ impl LocalPeerRecord { self.keypair.public().to_peer_id() } - pub fn 
add_address(&mut self, address: Multiaddr) { - self.addresses.insert(address); - self.sign(); + pub fn add_address(&mut self, address: Multiaddr) -> bool { + if self.addresses.insert(address) { + // Sign only if the address was not already there + self.sign(); + return true; + } + false } - pub fn remove_address(&mut self, address: &Multiaddr) { - self.addresses.remove(address); - self.sign(); + pub fn remove_address(&mut self, address: &Multiaddr) -> bool { + if self.addresses.remove(address) { + self.sign(); + return true; + } + false } pub fn addresses(&self) -> &HashSet { &self.addresses } - pub fn is_signed(&self) -> bool { - self.signature.is_some() - } - pub fn encode_to_proto(&self) -> Result { - SignedPeerRecord::try_from(self.clone())?.encode_to_proto() + SignedPeerRecord::from(self.clone()).encode_to_proto() } fn sign(&mut self) { self.updated_at = epoch_time_now(); - let msg = peer_signature_challenge(&self.addresses, &self.updated_at); - self.signature = Some(PeerSignature::sign(&self.keypair, &msg)); + self.signature = sign_peer_rec(&self.keypair, &self.addresses, &self.updated_at); } } @@ -189,6 +202,11 @@ impl TryFrom for PeerSignature { } } +fn sign_peer_rec(keypair: &identity::Keypair, addresses: &HashSet, updated_at: &Duration) -> PeerSignature { + let msg = peer_signature_challenge(addresses, updated_at); + PeerSignature::sign(keypair, &msg) +} + fn peer_signature_challenge<'a, I: IntoIterator>( addresses: I, updated_at: &Duration, diff --git a/networking/libp2p-peersync/src/proto.rs b/networking/libp2p-peersync/src/proto.rs index cf8df6ad7..aeb335b67 100644 --- a/networking/libp2p-peersync/src/proto.rs +++ b/networking/libp2p-peersync/src/proto.rs @@ -4,3 +4,19 @@ include!(concat!(env!("OUT_DIR"), "/proto/mod.rs")); pub use messages::*; + +impl From for Message { + fn from(value: WantPeers) -> Self { + Self { + payload: mod_Message::OneOfpayload::WantPeers(value), + } + } +} + +impl From for Message { + fn from(value: SignedPeerRecord) -> Self 
{ + Self { + payload: mod_Message::OneOfpayload::LocalRecord(value), + } + } +}