diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index ded5f7161133..cc0d5b721667 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -17,13 +17,13 @@ //! The bitfield distribution subsystem spreading @todo . use codec::{Codec, Decode, Encode}; +use futures::stream::StreamExt; use futures::{ channel::oneshot, future::{abortable, AbortHandle, Abortable}, - Future, FutureExt, stream::FuturesUnordered, + Future, FutureExt, }; -use futures::stream::StreamExt; use node_primitives::{ProtocolId, SignedFullStatement, View}; use polkadot_network::protocol::Message; @@ -31,6 +31,7 @@ use polkadot_node_subsystem::messages::*; use polkadot_node_subsystem::{ FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; +use polkadot_primitives::parachain::SignedAvailabilityBitfield; use polkadot_primitives::parachain::{SigningContext, ValidatorId}; use polkadot_primitives::Hash; use sc_network::ReputationChange; @@ -66,7 +67,6 @@ struct Tracker { per_job: HashMap, } - /// Data for each relay parent #[derive(Debug, Clone, Default)] struct JobData { @@ -80,8 +80,6 @@ struct JobData { validator_set: Vec, } - - fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { AllMessages::BitfieldDistribution(BitfieldDistributionMessage::NetworkBridgeUpdate(n)) } @@ -110,40 +108,51 @@ impl BitfieldDistribution { loop { { let message = { - let mut ctx = ctx.clone(); - ctx.recv().await? + let mut ctx = ctx.clone(); + ctx.recv().await? }; match message { FromOverseer::Communication { msg } => { let peerid = PeerId::random(); // @todo - process_incoming(ctx.clone(), &mut tracker, peerid, msg).await?; + match msg { + // Distribute a bitfield via gossip to other validators. + BitfieldDistributionMessage::DistributeBitfield( + hash, + signed_availability, + ) => { + let msg = BitfieldGossip { + relay_parent: hash, + signed_availability, + }; + process_incoming(ctx.clone(), &mut tracker, peerid, msg).await?; + } + BitfieldDistributionMessage::NetworkBridgeUpdate(event) => { + handle_network_msg(ctx.clone(), &mut tracker, event).await?; + } + } } FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => { let (validator_set, signing_context) = query_basics(ctx.clone(), relay_parent).await?; - let _ = tracker.per_job.insert(relay_parent, JobData { + let _ = tracker.per_job.insert( + relay_parent, + JobData { validator_bitset_received: HashSet::new(), signing_context, validator_set: validator_set, - }); + }, + ); let future = processor_per_relay_parent(ctx.clone(), relay_parent.clone()); let (future, abort_handle) = abortable(future); + let _future = ctx.spawn(Box::pin(future.map(|_| ()))); - let _future = - ctx.spawn(Box::pin( - future.map(|_| { () }) - )); - - active_jobs - .insert(relay_parent.clone(), abort_handle); + active_jobs.insert(relay_parent.clone(), abort_handle); } FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { - if let Some(abort_handle) = - active_jobs.remove(&relay_parent) - { + if let Some(abort_handle) = active_jobs.remove(&relay_parent) { let _ = abort_handle.abort(); } } @@ -155,10 +164,33 @@ impl BitfieldDistribution { // .map(|(_relay_parent, (cancellation, future))| future), // ) // .await; + + // better: + // let future = move || { + // tracker.peer_views.clone().into_iter() + // .filter(|(_peerid, view)| view.contains(&hash)) + // .map(|(peerid, view)| { + // let mut ctx = ctx.clone(); + // let hash = hash.clone(); + // let signed_availability = signed_availability.clone(); + // async move { + // let bytes = BitfieldGossip { + // relay_parent: hash, + // signed_availability, + // }.encode(); + // ctx.send_message( + // AllMessages::NetworkBridge( + // NetworkBridgeMessage::SendMessage(vec![], BitfieldDistribution::PROTOCOL_ID, bytes), + // )).await + // } + // }) + // .collect::>().into_future() + // }; + // future().await; for (_relay_parent, abort_handle) in active_jobs.drain() { let _ = abort_handle.abort(); } - return Ok(()) + return Ok(()); } } } @@ -168,9 +200,11 @@ impl BitfieldDistribution { } } - /// Process all requests related to one relay parent hash -async fn processor_per_relay_parent(mut ctx: Context, relay_parent: Hash) -> SubsystemResult<()> { +async fn processor_per_relay_parent( + mut ctx: Context, + relay_parent: Hash, +) -> SubsystemResult<()> { let mut tracker = Tracker::default(); loop { // todo!("consume relay parents") @@ -179,7 +213,11 @@ async fn processor_per_relay_parent(mut ctx: Context, relay_parent: Has } /// modify the reputiation, good or bad -async fn modify_reputiation(mut ctx: Context, peerid: PeerId, rep: ReputationChange) -> SubsystemResult<()> +async fn modify_reputiation( + mut ctx: Context, + peerid: PeerId, + rep: ReputationChange, +) -> SubsystemResult<()> where Context: SubsystemContext + Clone, { @@ -194,65 +232,71 @@ async fn process_incoming( mut ctx: Context, tracker: &mut Tracker, peerid: PeerId, - message: BitfieldDistributionMessage, + message: BitfieldGossip, ) -> SubsystemResult<()> where Context: SubsystemContext + Clone, { let peer_view = tracker.peer_views.get(&peerid).expect("TODO"); - match message { - // Distribute a bitfield via gossip to other validators. - BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability) => { - let job_data = if let Some(job_data) = tracker.per_job.get(&hash) { - job_data - } else { - return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; - }; - - // @todo should we only distribute availability messages to peer if they are relevant to us - // or is the only discriminator if the peer cares about it? - if !peer_view.contains(&hash) { - // we don't care about this, the other side should have known better - return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; - } - - let validator_set = &job_data.validator_set; - if validator_set.len() == 0 { - return modify_reputiation(ctx.clone(), peerid, COST_MISSING_PEER_SESSION_KEY).await - } + let BitfieldGossip { relay_parent, signed_availability} = message; + + let job_data = if let Some(job_data) = tracker.per_job.get(&relay_parent) { + job_data + } else { + return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; + }; + + // @todo should we only distribute availability messages to peer if they are relevant to us + // or is the only discriminator if the peer cares about it? + if !peer_view.contains(&relay_parent) { + // we don't care about this, the other side should have known better + return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; + } - // check all validators that could have signed this message - if let Some(_) = validator_set.iter().find(|validator| { signed_availability.check_signature(&job_data.signing_context, validator).is_ok() }) { - return modify_reputiation(ctx.clone(), peerid, COST_SIGNATURE_INVALID) - .await - } + let validator_set = &job_data.validator_set; + if validator_set.len() == 0 { + return modify_reputiation(ctx.clone(), peerid, COST_MISSING_PEER_SESSION_KEY).await; + } - // concurrently pass on the bitfield distribution to all peers - let future = move || { - tracker.peer_views.clone().into_iter() - .filter(|(_peerid, view)| view.contains(&hash)) - .map(|(peerid, view)| { - let mut ctx = ctx.clone(); - let hash = hash.clone(); - let signed_availability = signed_availability.clone(); - async move { - ctx.send_message( - AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability), - )).await - } - }) - .collect::>().into_future() - }; - future().await; - } - BitfieldDistributionMessage::NetworkBridgeUpdate(event) => { - handle_network_msg(ctx, tracker, event).await?; - } + // check all validators that could have signed this message + if let Some(_) = validator_set.iter().find(|validator| { + signed_availability + .check_signature(&job_data.signing_context, validator) + .is_ok() + }) { + return modify_reputiation(ctx.clone(), peerid, COST_SIGNATURE_INVALID).await; } + + // concurrently pass on the bitfield distribution to all interested peers + let interested_peers = tracker + .peer_views + .iter() + .filter(|(_peerid, view)| view.contains(&relay_parent)) + .map(|(peerid, _)| peerid.clone()) + .collect::>(); + let message = BitfieldGossip { + relay_parent: relay_parent, + signed_availability, + }; + let bytes = message.encode(); + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage( + interested_peers, + BitfieldDistribution::PROTOCOL_ID, + bytes, + ), + )) + .await?; Ok(()) } +/// A Gossiped signed availability bitfield for a particular relay hash +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +pub struct BitfieldGossip { + relay_parent: Hash, + signed_availability: SignedAvailabilityBitfield, +} + /// Deal with network bridge updates and track what needs to be tracked async fn handle_network_msg( mut ctx: Context, @@ -283,7 +327,9 @@ where NetworkBridgeEvent::OurViewChange(view) => { let ego = ego.clone(); let old_view = std::mem::replace(&mut (tracker.view), view); - tracker.per_job.retain(move |hash, _job_data| ego.0.contains(hash)); + tracker + .per_job + .retain(move |hash, _job_data| ego.0.contains(hash)); for new in tracker.view.difference(&old_view) { if !tracker.per_job.contains_key(&new) { @@ -293,23 +339,12 @@ where } } } - NetworkBridgeEvent::PeerMessage(remote, bytes) => { + NetworkBridgeEvent::PeerMessage(remote, mut bytes) => { log::info!("Got a peer message from {:?}", &remote); - // @todo what would we receive here? - match Message::decode(&mut bytes.as_ref()) { - Ok(message) => { - match message { - // a new session key - Message::ValidatorId(session_key) => { - // @todo update all Validator ids I guess? - // let _ = tracker - // .per_job. - // .insert(remote.clone(), session_key); - } - _ => {} - } - } - Err(_) => unimplemented!("Invalid format shall be punished I guess"), + if let Ok(gossiped_bitfield) = BitfieldGossip::decode(&mut (bytes.as_slice())) { + process_incoming(ctx, tracker, remote, gossiped_bitfield).await?; + } else { + return modify_reputiation(ctx.clone(), remote, COST_MESSAGE_NOT_DECODABLE).await; } } }