From 9012e2c18e4026d7005d227d34d2a8edec552f10 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 14 Jul 2020 13:58:41 +0200 Subject: [PATCH] lacks abortable processing impl details --- node/network/bitfield-distribution/src/lib.rs | 216 +++++++++--------- 1 file changed, 108 insertions(+), 108 deletions(-) diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index cc0d5b721667..0d7b3daca5c8 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -17,11 +17,10 @@ //! The bitfield distribution subsystem spreading @todo . use codec::{Codec, Decode, Encode}; -use futures::stream::StreamExt; use futures::{ channel::oneshot, future::{abortable, AbortHandle, Abortable}, - stream::FuturesUnordered, + stream::{FuturesUnordered, Stream, StreamExt}, Future, FutureExt, }; @@ -41,13 +40,13 @@ use std::{ }; const COST_SIGNATURE_INVALID: ReputationChange = - ReputationChange::new(-10000, "Bitfield signature invalid"); + ReputationChange::new(-100, "Bitfield signature invalid"); const COST_MISSING_PEER_SESSION_KEY: ReputationChange = - ReputationChange::new(-1337, "Missing peer session key"); + ReputationChange::new(-133, "Missing peer session key"); const COST_MULTIPLE_BITFIELDS_FROM_PEER: ReputationChange = - ReputationChange::new(-10000, "Received more than once bitfield from peer"); + ReputationChange::new(-22, "Received more than once bitfield from peer"); const COST_NOT_INTERESTED: ReputationChange = - ReputationChange::new(-100, "Not intersted in that parent hash"); + ReputationChange::new(-51, "Not intersted in that parent hash"); const COST_MESSAGE_NOT_DECODABLE: ReputationChange = ReputationChange::new(-100, "Not intersted in that parent hash"); @@ -78,6 +77,10 @@ struct JobData { // set of validators for a particular relay_parent validator_set: Vec, + + // set of validators for a particular relay_parent and the number of messages + // received authored by them + received_bitsets_per_validator: HashSet, } fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { @@ -101,9 +104,8 @@ impl BitfieldDistribution { // set of active heads the overseer told us to work on with the connected // tasks abort handles - // @todo do we need Box>) for anything? - // let mut active_jobs = HashMap::>>)>::new(); let mut active_jobs = HashMap::::new(); + let mut futurama = HashMap::>>>::new(); let mut tracker = Tracker::default(); loop { { @@ -113,21 +115,21 @@ impl BitfieldDistribution { }; match message { FromOverseer::Communication { msg } => { - let peerid = PeerId::random(); // @todo + // we signed this bitfield match msg { // Distribute a bitfield via gossip to other validators. BitfieldDistributionMessage::DistributeBitfield( hash, signed_availability, ) => { - let msg = BitfieldGossip { + let msg = BitfieldGossipMessage { relay_parent: hash, signed_availability, }; - process_incoming(ctx.clone(), &mut tracker, peerid, msg).await?; + distribute(ctx.clone(), &mut tracker, msg).await?; } BitfieldDistributionMessage::NetworkBridgeUpdate(event) => { - handle_network_msg(ctx.clone(), &mut tracker, event).await?; + handle_network_msg(ctx.clone(), &mut tracker, event).await; } } } @@ -138,58 +140,39 @@ impl BitfieldDistribution { let _ = tracker.per_job.insert( relay_parent, JobData { - validator_bitset_received: HashSet::new(), signing_context, validator_set: validator_set, + ..Default::default() }, ); - - let future = processor_per_relay_parent(ctx.clone(), relay_parent.clone()); - let (future, abort_handle) = abortable(future); - - let _future = ctx.spawn(Box::pin(future.map(|_| ()))); - - active_jobs.insert(relay_parent.clone(), abort_handle); + // futurama.insert(relay_parent.clone(), Box::pin(future.map(|_| ()))); + // active_jobs.insert(relay_parent.clone(), abort_handle); } FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => { - if let Some(abort_handle) = active_jobs.remove(&relay_parent) { - let _ = abort_handle.abort(); - } + // if let Some(abort_handle) = active_jobs.remove(&relay_parent) { + // let _ = abort_handle.abort(); + // } + // let _ = futurama.remove(&relay_parent); + let _ = tracker.per_job.remove(&relay_parent); } FromOverseer::Signal(OverseerSignal::Conclude) => { - // @todo cannot store the future - // let _ = futures::future::join_all( - // active_jobs - // .drain() - // .map(|(_relay_parent, (cancellation, future))| future), - // ) - // .await; - - // better: - // let future = move || { - // tracker.peer_views.clone().into_iter() - // .filter(|(_peerid, view)| view.contains(&hash)) - // .map(|(peerid, view)| { - // let mut ctx = ctx.clone(); - // let hash = hash.clone(); - // let signed_availability = signed_availability.clone(); - // async move { - // let bytes = BitfieldGossip { - // relay_parent: hash, - // signed_availability, - // }.encode(); - // ctx.send_message( - // AllMessages::NetworkBridge( - // NetworkBridgeMessage::SendMessage(vec![], BitfieldDistribution::PROTOCOL_ID, bytes), - // )).await - // } + tracker.per_job.clear(); + // let unordered = futurama + // .drain() + // .map(|(_relay_parent, future)| future) + // .zip( + // active_jobs + // .drain() + // .map(|(_relay_parent, abort_handle)| abort_handle), + // ) + // .map(|(future, abort_handle)| { + // abort_handle.abort(); + // // TODO pipe to cleanup state + // future // }) - // .collect::>().into_future() - // }; - // future().await; - for (_relay_parent, abort_handle) in active_jobs.drain() { - let _ = abort_handle.abort(); - } + // .collect::>(); + + // let _ = async move { unordered.into_future().await }.await; return Ok(()); } } @@ -200,18 +183,6 @@ impl BitfieldDistribution { } } -/// Process all requests related to one relay parent hash -async fn processor_per_relay_parent( - mut ctx: Context, - relay_parent: Hash, -) -> SubsystemResult<()> { - let mut tracker = Tracker::default(); - loop { - // todo!("consume relay parents") - } - Ok(()) -} - /// modify the reputiation, good or bad async fn modify_reputiation( mut ctx: Context, @@ -227,20 +198,59 @@ where .await } +/// Distribute a checked message, either originated by us or gossiped on from other peers. +async fn distribute( + mut ctx: Context, + tracker: &mut Tracker, + message: BitfieldGossipMessage, +) -> SubsystemResult<()> +where + Context: SubsystemContext + Clone, +{ + let BitfieldGossipMessage { + relay_parent, + signed_availability, + } = message; + // concurrently pass on the bitfield distribution to all interested peers + let interested_peers = tracker + .peer_views + .iter() + .filter(|(_peerid, view)| view.contains(&relay_parent)) + .map(|(peerid, _)| peerid.clone()) + .collect::>(); + let message = BitfieldGossipMessage { + relay_parent: relay_parent, + signed_availability, + }; + let bytes = message.encode(); + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage( + interested_peers, + BitfieldDistribution::PROTOCOL_ID, + bytes, + ), + )) + .await?; + Ok(()) +} + /// Handle an incoming message from a peer -async fn process_incoming( +async fn process_incoming_peer_message( mut ctx: Context, tracker: &mut Tracker, peerid: PeerId, - message: BitfieldGossip, + message: BitfieldGossipMessage, ) -> SubsystemResult<()> where Context: SubsystemContext + Clone, { - let peer_view = tracker.peer_views.get(&peerid).expect("TODO"); - let BitfieldGossip { relay_parent, signed_availability} = message; + let peer_view = if let Some(peer_view) = tracker.peer_views.get(&peerid) { + peer_view + } else { + return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; + }; - let job_data = if let Some(job_data) = tracker.per_job.get(&relay_parent) { + let job_data = if let Some(job_data) = tracker.per_job.get(&message.relay_parent) { job_data } else { return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; @@ -248,51 +258,34 @@ where // @todo should we only distribute availability messages to peer if they are relevant to us // or is the only discriminator if the peer cares about it? - if !peer_view.contains(&relay_parent) { + if !peer_view.contains(&message.relay_parent) { // we don't care about this, the other side should have known better return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await; } let validator_set = &job_data.validator_set; if validator_set.len() == 0 { - return modify_reputiation(ctx.clone(), peerid, COST_MISSING_PEER_SESSION_KEY).await; + return modify_reputiation(ctx, peerid, COST_MISSING_PEER_SESSION_KEY).await; } // check all validators that could have signed this message - if let Some(_) = validator_set.iter().find(|validator| { - signed_availability + if validator_set.iter().find(|validator| { + message + .signed_availability .check_signature(&job_data.signing_context, validator) .is_ok() - }) { - return modify_reputiation(ctx.clone(), peerid, COST_SIGNATURE_INVALID).await; + }).is_none() { + return modify_reputiation(ctx, peerid, COST_SIGNATURE_INVALID).await; } - // concurrently pass on the bitfield distribution to all interested peers - let interested_peers = tracker - .peer_views - .iter() - .filter(|(_peerid, view)| view.contains(&relay_parent)) - .map(|(peerid, _)| peerid.clone()) - .collect::>(); - let message = BitfieldGossip { - relay_parent: relay_parent, - signed_availability, - }; - let bytes = message.encode(); - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage( - interested_peers, - BitfieldDistribution::PROTOCOL_ID, - bytes, - ), - )) - .await?; + distribute(ctx, tracker, message).await?; + Ok(()) } -/// A Gossiped signed availability bitfield for a particular relay hash +/// A gossiped or gossipable signed availability bitfield for a particular relay hash #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] -pub struct BitfieldGossip { +pub struct BitfieldGossipMessage { relay_parent: Hash, signed_availability: SignedAvailabilityBitfield, } @@ -341,10 +334,15 @@ where } NetworkBridgeEvent::PeerMessage(remote, mut bytes) => { log::info!("Got a peer message from {:?}", &remote); - if let Ok(gossiped_bitfield) = BitfieldGossip::decode(&mut (bytes.as_slice())) { - process_incoming(ctx, tracker, remote, gossiped_bitfield).await?; + if let Ok(gossiped_bitfield) = BitfieldGossipMessage::decode(&mut (bytes.as_slice())) { + let (future, _abort_handle) = abortable(process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield)); + future.await; + // tracker.active_jobs.insert(&gossiped_bitfield.relay_parent, abort_handle); + // let _future = ctx.spawn(Box::pin(async move { + // future.map(|_| ()).await + // })); } else { - return modify_reputiation(ctx.clone(), remote, COST_MESSAGE_NOT_DECODABLE).await; + return modify_reputiation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await; } } } @@ -353,12 +351,10 @@ where impl Subsystem for BitfieldDistribution where - C: SubsystemContext + Clone + Sync, + C: SubsystemContext + Clone + Sync + Send, { fn start(self, ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - Self::run(ctx).await; - })) + SpawnedSubsystem(Box::pin(async move { Self::run(ctx) }.map(|_| ()))) } } @@ -393,8 +389,12 @@ where mod test { use super::*; + fn generate_valid_message() -> AllMessages {} + + fn generate_invalid_message() -> AllMessages {} + #[test] - fn x() { + fn game_changer() { // @todo } }