Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
clarify
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr committed Jul 10, 2020
1 parent 21e8292 commit a2e8bc2
Showing 1 changed file with 124 additions and 89 deletions.
213 changes: 124 additions & 89 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@
//! The bitfield distribution subsystem spreading @todo .
use codec::{Codec, Decode, Encode};
use futures::stream::StreamExt;
use futures::{
channel::oneshot,
future::{abortable, AbortHandle, Abortable},
Future, FutureExt,
stream::FuturesUnordered,
Future, FutureExt,
};
use futures::stream::StreamExt;

use node_primitives::{ProtocolId, SignedFullStatement, View};
use polkadot_network::protocol::Message;
use polkadot_node_subsystem::messages::*;
use polkadot_node_subsystem::{
FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_primitives::parachain::SignedAvailabilityBitfield;
use polkadot_primitives::parachain::{SigningContext, ValidatorId};
use polkadot_primitives::Hash;
use sc_network::ReputationChange;
Expand Down Expand Up @@ -66,7 +67,6 @@ struct Tracker {
per_job: HashMap<Hash, JobData>,
}


/// Data for each relay parent
#[derive(Debug, Clone, Default)]
struct JobData {
Expand All @@ -80,8 +80,6 @@ struct JobData {
validator_set: Vec<ValidatorId>,
}



fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
AllMessages::BitfieldDistribution(BitfieldDistributionMessage::NetworkBridgeUpdate(n))
}
Expand Down Expand Up @@ -110,40 +108,51 @@ impl BitfieldDistribution {
loop {
{
let message = {
let mut ctx = ctx.clone();
ctx.recv().await?
let mut ctx = ctx.clone();
ctx.recv().await?
};
match message {
FromOverseer::Communication { msg } => {
let peerid = PeerId::random(); // @todo
process_incoming(ctx.clone(), &mut tracker, peerid, msg).await?;
match msg {
// Distribute a bitfield via gossip to other validators.
BitfieldDistributionMessage::DistributeBitfield(
hash,
signed_availability,
) => {
let msg = BitfieldGossip {
relay_parent: hash,
signed_availability,
};
process_incoming(ctx.clone(), &mut tracker, peerid, msg).await?;
}
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
handle_network_msg(ctx.clone(), &mut tracker, event).await?;
}
}
}
FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)) => {
let (validator_set, signing_context) =
query_basics(ctx.clone(), relay_parent).await?;

let _ = tracker.per_job.insert(relay_parent, JobData {
let _ = tracker.per_job.insert(
relay_parent,
JobData {
validator_bitset_received: HashSet::new(),
signing_context,
validator_set: validator_set,
});
},
);

let future = processor_per_relay_parent(ctx.clone(), relay_parent.clone());
let (future, abort_handle) = abortable(future);

let _future = ctx.spawn(Box::pin(future.map(|_| ())));

let _future =
ctx.spawn(Box::pin(
future.map(|_| { () })
));

active_jobs
.insert(relay_parent.clone(), abort_handle);
active_jobs.insert(relay_parent.clone(), abort_handle);
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
if let Some(abort_handle) =
active_jobs.remove(&relay_parent)
{
if let Some(abort_handle) = active_jobs.remove(&relay_parent) {
let _ = abort_handle.abort();
}
}
Expand All @@ -155,10 +164,33 @@ impl BitfieldDistribution {
// .map(|(_relay_parent, (cancellation, future))| future),
// )
// .await;

// better:
// let future = move || {
// tracker.peer_views.clone().into_iter()
// .filter(|(_peerid, view)| view.contains(&hash))
// .map(|(peerid, view)| {
// let mut ctx = ctx.clone();
// let hash = hash.clone();
// let signed_availability = signed_availability.clone();
// async move {
// let bytes = BitfieldGossip {
// relay_parent: hash,
// signed_availability,
// }.encode();
// ctx.send_message(
// AllMessages::NetworkBridge(
// NetworkBridgeMessage::SendMessage(vec![], BitfieldDistribution::PROTOCOL_ID, bytes),
// )).await
// }
// })
// .collect::<FuturesUnordered<_>>().into_future()
// };
// future().await;
for (_relay_parent, abort_handle) in active_jobs.drain() {
let _ = abort_handle.abort();
}
return Ok(())
return Ok(());
}
}
}
Expand All @@ -168,9 +200,11 @@ impl BitfieldDistribution {
}
}


/// Process all requests related to one relay parent hash
async fn processor_per_relay_parent<Context>(mut ctx: Context, relay_parent: Hash) -> SubsystemResult<()> {
async fn processor_per_relay_parent<Context>(
mut ctx: Context,
relay_parent: Hash,
) -> SubsystemResult<()> {
let mut tracker = Tracker::default();
loop {
// todo!("consume relay parents")
Expand All @@ -179,7 +213,11 @@ async fn processor_per_relay_parent<Context>(mut ctx: Context, relay_parent: Has
}

/// modify the reputiation, good or bad
async fn modify_reputiation<Context>(mut ctx: Context, peerid: PeerId, rep: ReputationChange) -> SubsystemResult<()>
async fn modify_reputiation<Context>(
mut ctx: Context,
peerid: PeerId,
rep: ReputationChange,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
Expand All @@ -194,65 +232,71 @@ async fn process_incoming<Context>(
mut ctx: Context,
tracker: &mut Tracker,
peerid: PeerId,
message: BitfieldDistributionMessage,
message: BitfieldGossip,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let peer_view = tracker.peer_views.get(&peerid).expect("TODO");
match message {
// Distribute a bitfield via gossip to other validators.
BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability) => {
let job_data = if let Some(job_data) = tracker.per_job.get(&hash) {
job_data
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
};

// @todo should we only distribute availability messages to peer if they are relevant to us
// or is the only discriminator if the peer cares about it?
if !peer_view.contains(&hash) {
// we don't care about this, the other side should have known better
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
}

let validator_set = &job_data.validator_set;
if validator_set.len() == 0 {
return modify_reputiation(ctx.clone(), peerid, COST_MISSING_PEER_SESSION_KEY).await
}
let BitfieldGossip { relay_parent, signed_availability} = message;

let job_data = if let Some(job_data) = tracker.per_job.get(&relay_parent) {
job_data
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
};

// @todo should we only distribute availability messages to peer if they are relevant to us
// or is the only discriminator if the peer cares about it?
if !peer_view.contains(&relay_parent) {
// we don't care about this, the other side should have known better
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
}

// check all validators that could have signed this message
if let Some(_) = validator_set.iter().find(|validator| { signed_availability.check_signature(&job_data.signing_context, validator).is_ok() }) {
return modify_reputiation(ctx.clone(), peerid, COST_SIGNATURE_INVALID)
.await
}
let validator_set = &job_data.validator_set;
if validator_set.len() == 0 {
return modify_reputiation(ctx.clone(), peerid, COST_MISSING_PEER_SESSION_KEY).await;
}

// concurrently pass on the bitfield distribution to all peers
let future = move || {
tracker.peer_views.clone().into_iter()
.filter(|(_peerid, view)| view.contains(&hash))
.map(|(peerid, view)| {
let mut ctx = ctx.clone();
let hash = hash.clone();
let signed_availability = signed_availability.clone();
async move {
ctx.send_message(
AllMessages::BitfieldDistribution(
BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
)).await
}
})
.collect::<FuturesUnordered<_>>().into_future()
};
future().await;
}
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
handle_network_msg(ctx, tracker, event).await?;
}
// check all validators that could have signed this message
if let Some(_) = validator_set.iter().find(|validator| {
signed_availability
.check_signature(&job_data.signing_context, validator)
.is_ok()
}) {
return modify_reputiation(ctx.clone(), peerid, COST_SIGNATURE_INVALID).await;
}

// concurrently pass on the bitfield distribution to all interested peers
let interested_peers = tracker
.peer_views
.iter()
.filter(|(_peerid, view)| view.contains(&relay_parent))
.map(|(peerid, _)| peerid.clone())
.collect::<Vec<PeerId>>();
let message = BitfieldGossip {
relay_parent: relay_parent,
signed_availability,
};
let bytes = message.encode();
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendMessage(
interested_peers,
BitfieldDistribution::PROTOCOL_ID,
bytes,
),
))
.await?;
Ok(())
}

/// A Gossiped signed availability bitfield for a particular relay hash
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct BitfieldGossip {
relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield,
}

/// Deal with network bridge updates and track what needs to be tracked
async fn handle_network_msg<Context>(
mut ctx: Context,
Expand Down Expand Up @@ -283,7 +327,9 @@ where
NetworkBridgeEvent::OurViewChange(view) => {
let ego = ego.clone();
let old_view = std::mem::replace(&mut (tracker.view), view);
tracker.per_job.retain(move |hash, _job_data| ego.0.contains(hash));
tracker
.per_job
.retain(move |hash, _job_data| ego.0.contains(hash));

for new in tracker.view.difference(&old_view) {
if !tracker.per_job.contains_key(&new) {
Expand All @@ -293,23 +339,12 @@ where
}
}
}
NetworkBridgeEvent::PeerMessage(remote, bytes) => {
NetworkBridgeEvent::PeerMessage(remote, mut bytes) => {
log::info!("Got a peer message from {:?}", &remote);
// @todo what would we receive here?
match Message::decode(&mut bytes.as_ref()) {
Ok(message) => {
match message {
// a new session key
Message::ValidatorId(session_key) => {
// @todo update all Validator ids I guess?
// let _ = tracker
// .per_job.
// .insert(remote.clone(), session_key);
}
_ => {}
}
}
Err(_) => unimplemented!("Invalid format shall be punished I guess"),
if let Ok(gossiped_bitfield) = BitfieldGossip::decode(&mut (bytes.as_slice())) {
process_incoming(ctx, tracker, remote, gossiped_bitfield).await?;
} else {
return modify_reputiation(ctx.clone(), remote, COST_MESSAGE_NOT_DECODABLE).await;
}
}
}
Expand Down

0 comments on commit a2e8bc2

Please sign in to comment.