From e2f354fd58d1f1be6cad870ef6c8d26f06f761a8 Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Mon, 16 Oct 2023 21:23:14 +0200 Subject: [PATCH] Mark reserved peers as explicit for gossipsub to avoid reputation decreasing (#1423) Closes https://github.com/FuelLabs/fuel-core/issues/1384 After debugging the issue with transaction broadcasting in the Beta 4 network, I can confirm that it is related to the gossipsub reputation. At some point, no one sends transactions to the authority node. When I manually restart one sentry node, this node starts to send transactions to the authority node. It sends transactions via gossiping and publishing: - Publishing: I send a transaction to sentry 1, which sends it to authority. - Gossiping: I send the transaction to sentry 0, it sends the transaction to sentry 1, and it sends the transaction to authority. But at some point, gossiping doesn't work while publishing still works(Sentry 0 gossips transaction to Sentry 1, and Sentry 1 doesn't gossip it to authority). After some time, the publishing doesn't work either. The gossipsub has two thresholds, one for gossiping and one for publishing. We use these values: image So, the described behavior aligns with authority reputation decreasing. I don't know the reason why it happens(maybe decay), but adding all reserved peers to explicit peers should solve the problem. --- crates/fuel-core/src/service/sub_services.rs | 5 ++-- crates/services/p2p/src/config.rs | 2 +- crates/services/p2p/src/gossipsub/config.rs | 7 ++++- crates/services/p2p/src/service.rs | 29 ++++++++++++++++---- 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 56238449416..70ddf06cc2d 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -96,12 +96,13 @@ pub fn init_sub_services( #[cfg(feature = "p2p")] let mut network = { - if let Some(config) = config.p2p.clone() { + if let Some(p2p_config) = config.p2p.clone() { let p2p_db = database.clone(); let genesis = p2p_db.get_genesis()?; - let p2p_config = config.init(genesis)?; + let p2p_config = p2p_config.init(genesis)?; Some(fuel_core_p2p::service::new_service( + config.chain_conf.consensus_parameters.chain_id, p2p_config, p2p_db, importer_adapter.clone(), diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 1ce3f25294c..54f7752d05b 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -320,7 +320,7 @@ pub(crate) fn build_transport( (transport, connection_state) } -fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet { +pub fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet { multiaddr .iter() // Safety: as is the case with `bootstrap_nodes` it is assumed that `reserved_nodes` [`Multiadr`] diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index 6901a8277b1..c703ac23064 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -175,7 +175,7 @@ fn initialize_peer_score_thresholds() -> PeerScoreThresholds { /// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { - if p2p_config.metrics { + let mut gossipsub = if p2p_config.metrics { // Move to Metrics related feature flag let mut p2p_registry = Registry::default(); @@ -208,7 +208,12 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { initialize_gossipsub(&mut gossipsub, p2p_config); gossipsub + }; + for peer_id in crate::config::peer_ids_set_from(&p2p_config.reserved_nodes) { + gossipsub.add_explicit_peer(&peer_id); } + + gossipsub } fn initialize_gossipsub(gossipsub: &mut Gossipsub, p2p_config: &Config) { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 29e9ca155b1..3a4a25940dd 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -38,8 +38,14 @@ use fuel_core_types::{ SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, - fuel_types::BlockHeight, + fuel_tx::{ + Transaction, + UniqueIdentifier, + }, + fuel_types::{ + BlockHeight, + ChainId, + }, services::p2p::{ peer_reputation::{ AppScore, @@ -275,6 +281,7 @@ impl Broadcast for SharedState { /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. pub struct Task { + chain_id: ChainId, p2p_service: P, db: Arc, next_block_height: BoxStream, @@ -298,6 +305,7 @@ pub struct HeartbeatPeerReputationConfig { impl Task, D, SharedState> { pub fn new( + chain_id: ChainId, config: Config, db: Arc, block_importer: Arc, @@ -330,6 +338,7 @@ impl Task, D, SharedState> { let next_check_time = Instant::now() + heartbeat_check_interval; Self { + chain_id, p2p_service, db, request_receiver, @@ -445,10 +454,11 @@ where should_continue = true; match next_service_request { Some(TaskRequest::BroadcastTransaction(transaction)) => { + let tx_id = transaction.id(&self.chain_id); let broadcast = GossipsubBroadcastRequest::NewTx(transaction); let result = self.p2p_service.publish_message(broadcast); if let Err(e) = result { - tracing::error!("Got an error during transaction broadcasting {}", e); + tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e); } } Some(TaskRequest::BroadcastBlock(block)) => { @@ -786,12 +796,18 @@ impl SharedState { } } -pub fn new_service(p2p_config: Config, db: D, block_importer: B) -> Service +pub fn new_service( + chain_id: ChainId, + p2p_config: Config, + db: D, + block_importer: B, +) -> Service where D: P2pDb + 'static, B: BlockHeightImporter, { Service::new(Task::new( + chain_id, p2p_config, Arc::new(db), Arc::new(block_importer), @@ -891,7 +907,8 @@ pub mod tests { #[tokio::test] async fn start_and_stop_awaits_works() { let p2p_config = Config::default_initialized("start_stop_works"); - let service = new_service(p2p_config, FakeDb, FakeBlockImporter); + let service = + new_service(ChainId::default(), p2p_config, FakeDb, FakeBlockImporter); // Node with p2p service started assert!(service.start_and_await().await.unwrap().started()); @@ -1077,6 +1094,7 @@ pub mod tests { }; let mut task = Task { + chain_id: Default::default(), p2p_service, db: Arc::new(FakeDB), next_block_height: FakeBlockImporter.next_block_height(), @@ -1155,6 +1173,7 @@ pub mod tests { }; let mut task = Task { + chain_id: Default::default(), p2p_service, db: Arc::new(FakeDB), next_block_height: FakeBlockImporter.next_block_height(),