Skip to content

Commit

Permalink
Mark reserved peers as explicit for gossipsub to avoid reputation dec…
Browse files Browse the repository at this point in the history
…reasing (#1423)

Closes #1384

After debugging the issue with transaction broadcasting in the Beta 4
network, I can confirm that it is related to the gossipsub reputation.

At some point, no one sends transactions to the authority node. When I
manually restart one sentry node, this node starts to send transactions
to the authority node. It sends transactions via gossiping and
publishing:
- Publishing: I send a transaction to sentry 1, which sends it to
authority.
- Gossiping: I send the transaction to sentry 0, it sends the
transaction to sentry 1, and it sends the transaction to authority.

But at some point, gossiping doesn't work while publishing still
works(Sentry 0 gossips transaction to Sentry 1, and Sentry 1 doesn't
gossip it to authority).

After some time, the publishing doesn't work either.

The gossipsub has two thresholds, one for gossiping and one for
publishing. We use these values:

<img width="652" alt="image"
src="https://github.com/FuelLabs/fuel-core/assets/18346821/bcb1585c-239b-4cf9-9fae-c9376b958201">

So, the described behavior aligns with authority reputation decreasing.
I don't know the reason why it happens(maybe decay), but adding all
reserved peers to explicit peers should solve the problem.
  • Loading branch information
xgreenx committed Oct 20, 2023
1 parent 9402068 commit e2f354f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 9 deletions.
5 changes: 3 additions & 2 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ pub fn init_sub_services(

#[cfg(feature = "p2p")]
let mut network = {
if let Some(config) = config.p2p.clone() {
if let Some(p2p_config) = config.p2p.clone() {
let p2p_db = database.clone();
let genesis = p2p_db.get_genesis()?;
let p2p_config = config.init(genesis)?;
let p2p_config = p2p_config.init(genesis)?;

Some(fuel_core_p2p::service::new_service(
config.chain_conf.consensus_parameters.chain_id,
p2p_config,
p2p_db,
importer_adapter.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub(crate) fn build_transport(
(transport, connection_state)
}

fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
pub fn peer_ids_set_from(multiaddr: &[Multiaddr]) -> HashSet<PeerId> {
multiaddr
.iter()
// Safety: as is the case with `bootstrap_nodes` it is assumed that `reserved_nodes` [`Multiadr`]
Expand Down
7 changes: 6 additions & 1 deletion crates/services/p2p/src/gossipsub/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ fn initialize_peer_score_thresholds() -> PeerScoreThresholds {

/// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour
pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub {
if p2p_config.metrics {
let mut gossipsub = if p2p_config.metrics {
// Move to Metrics related feature flag
let mut p2p_registry = Registry::default();

Expand Down Expand Up @@ -208,7 +208,12 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub {
initialize_gossipsub(&mut gossipsub, p2p_config);

gossipsub
};
for peer_id in crate::config::peer_ids_set_from(&p2p_config.reserved_nodes) {
gossipsub.add_explicit_peer(&peer_id);
}

gossipsub
}

fn initialize_gossipsub(gossipsub: &mut Gossipsub, p2p_config: &Config) {
Expand Down
29 changes: 24 additions & 5 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ use fuel_core_types::{
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
fuel_tx::{
Transaction,
UniqueIdentifier,
},
fuel_types::{
BlockHeight,
ChainId,
},
services::p2p::{
peer_reputation::{
AppScore,
Expand Down Expand Up @@ -275,6 +281,7 @@ impl Broadcast for SharedState {
/// Orchestrates various p2p-related events between the inner `P2pService`
/// and the top level `NetworkService`.
pub struct Task<P, D, B> {
chain_id: ChainId,
p2p_service: P,
db: Arc<D>,
next_block_height: BoxStream<BlockHeight>,
Expand All @@ -298,6 +305,7 @@ pub struct HeartbeatPeerReputationConfig {

impl<D> Task<FuelP2PService<PostcardCodec>, D, SharedState> {
pub fn new<B: BlockHeightImporter>(
chain_id: ChainId,
config: Config,
db: Arc<D>,
block_importer: Arc<B>,
Expand Down Expand Up @@ -330,6 +338,7 @@ impl<D> Task<FuelP2PService<PostcardCodec>, D, SharedState> {
let next_check_time = Instant::now() + heartbeat_check_interval;

Self {
chain_id,
p2p_service,
db,
request_receiver,
Expand Down Expand Up @@ -445,10 +454,11 @@ where
should_continue = true;
match next_service_request {
Some(TaskRequest::BroadcastTransaction(transaction)) => {
let tx_id = transaction.id(&self.chain_id);
let broadcast = GossipsubBroadcastRequest::NewTx(transaction);
let result = self.p2p_service.publish_message(broadcast);
if let Err(e) = result {
tracing::error!("Got an error during transaction broadcasting {}", e);
tracing::error!("Got an error during transaction {} broadcasting {}", tx_id, e);
}
}
Some(TaskRequest::BroadcastBlock(block)) => {
Expand Down Expand Up @@ -786,12 +796,18 @@ impl SharedState {
}
}

pub fn new_service<D, B>(p2p_config: Config, db: D, block_importer: B) -> Service<D>
pub fn new_service<D, B>(
chain_id: ChainId,
p2p_config: Config,
db: D,
block_importer: B,
) -> Service<D>
where
D: P2pDb + 'static,
B: BlockHeightImporter,
{
Service::new(Task::new(
chain_id,
p2p_config,
Arc::new(db),
Arc::new(block_importer),
Expand Down Expand Up @@ -891,7 +907,8 @@ pub mod tests {
#[tokio::test]
async fn start_and_stop_awaits_works() {
let p2p_config = Config::default_initialized("start_stop_works");
let service = new_service(p2p_config, FakeDb, FakeBlockImporter);
let service =
new_service(ChainId::default(), p2p_config, FakeDb, FakeBlockImporter);

// Node with p2p service started
assert!(service.start_and_await().await.unwrap().started());
Expand Down Expand Up @@ -1077,6 +1094,7 @@ pub mod tests {
};

let mut task = Task {
chain_id: Default::default(),
p2p_service,
db: Arc::new(FakeDB),
next_block_height: FakeBlockImporter.next_block_height(),
Expand Down Expand Up @@ -1155,6 +1173,7 @@ pub mod tests {
};

let mut task = Task {
chain_id: Default::default(),
p2p_service,
db: Arc::new(FakeDB),
next_block_height: FakeBlockImporter.next_block_height(),
Expand Down

0 comments on commit e2f354f

Please sign in to comment.