diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 56238449416..70ddf06cc2d 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -96,12 +96,13 @@ pub fn init_sub_services( #[cfg(feature = "p2p")] let mut network = { - if let Some(config) = config.p2p.clone() { + if let Some(p2p_config) = config.p2p.clone() { let p2p_db = database.clone(); let genesis = p2p_db.get_genesis()?; - let p2p_config = config.init(genesis)?; + let p2p_config = p2p_config.init(genesis)?; Some(fuel_core_p2p::service::new_service( + config.chain_conf.consensus_parameters.chain_id, p2p_config, p2p_db, importer_adapter.clone(), diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 1ce3f25294c..54f7752d05b 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -320,7 +320,7 @@ pub(crate) fn build_transport( (transport, connection_state) } -fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet { +pub fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet { multiaddr .iter() // Safety: as is the case with `bootstrap_nodes` it is assumed that `reserved_nodes` [`Multiadr`] diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index 6901a8277b1..c703ac23064 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -175,7 +175,7 @@ fn initialize_peer_score_thresholds() -> PeerScoreThresholds { /// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { - if p2p_config.metrics { + let mut gossipsub = if p2p_config.metrics { // Move to Metrics related feature flag let mut p2p_registry = Registry::default(); @@ -208,7 +208,12 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { initialize_gossipsub(&mut gossipsub, p2p_config); gossipsub + }; + for peer_id in crate::config::peer_ids_set_from(&p2p_config.reserved_nodes) { + gossipsub.add_explicit_peer(&peer_id); } + + gossipsub } fn initialize_gossipsub(gossipsub: &mut Gossipsub, p2p_config: &Config) { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 29e9ca155b1..3a4a25940dd 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -38,8 +38,14 @@ use fuel_core_types::{ SealedBlock, SealedBlockHeader, }, - fuel_tx::Transaction, - fuel_types::BlockHeight, + fuel_tx::{ + Transaction, + UniqueIdentifier, + }, + fuel_types::{ + BlockHeight, + ChainId, + }, services::p2p::{ peer_reputation::{ AppScore, @@ -275,6 +281,7 @@ impl Broadcast for SharedState { /// Orchestrates various p2p-related events between the inner `P2pService` /// and the top level `NetworkService`. pub struct Task { + chain_id: ChainId, p2p_service: P, db: Arc, next_block_height: BoxStream, @@ -298,6 +305,7 @@ pub struct HeartbeatPeerReputationConfig { impl Task, D, SharedState> { pub fn new( + chain_id: ChainId, config: Config, db: Arc, block_importer: Arc, @@ -330,6 +338,7 @@ impl Task, D, SharedState> { let next_check_time = Instant::now() + heartbeat_check_interval; Self { + chain_id, p2p_service, db, request_receiver, @@ -445,10 +454,11 @@ where should_continue = true; match next_service_request { Some(TaskRequest::BroadcastTransaction(transaction)) => { + let tx_id = transaction.id(&self.chain_id); let broadcast = GossipsubBroadcastRequest::NewTx(transaction); let result = self.p2p_service.publish_message(broadcast); if let Err(e) = result { - tracing::error!("Got an error during transaction broadcasting {}", e); + tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e); } } Some(TaskRequest::BroadcastBlock(block)) => { @@ -786,12 +796,18 @@ impl SharedState { } } -pub fn new_service(p2p_config: Config, db: D, block_importer: B) -> Service +pub fn new_service( + chain_id: ChainId, + p2p_config: Config, + db: D, + block_importer: B, +) -> Service where D: P2pDb + 'static, B: BlockHeightImporter, { Service::new(Task::new( + chain_id, p2p_config, Arc::new(db), Arc::new(block_importer), @@ -891,7 +907,8 @@ pub mod tests { #[tokio::test] async fn start_and_stop_awaits_works() { let p2p_config = Config::default_initialized("start_stop_works"); - let service = new_service(p2p_config, FakeDb, FakeBlockImporter); + let service = + new_service(ChainId::default(), p2p_config, FakeDb, FakeBlockImporter); // Node with p2p service started assert!(service.start_and_await().await.unwrap().started()); @@ -1077,6 +1094,7 @@ pub mod tests { }; let mut task = Task { + chain_id: Default::default(), p2p_service, db: Arc::new(FakeDB), next_block_height: FakeBlockImporter.next_block_height(), @@ -1155,6 +1173,7 @@ pub mod tests { }; let mut task = Task { + chain_id: Default::default(), p2p_service, db: Arc::new(FakeDB), next_block_height: FakeBlockImporter.next_block_height(),