Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
lacks abortable processing impl details
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr committed Jul 14, 2020
1 parent a2e8bc2 commit 9012e2c
Showing 1 changed file with 108 additions and 108 deletions.
216 changes: 108 additions & 108 deletions node/network/bitfield-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
//! The bitfield distribution subsystem spreading @todo .
use codec::{Codec, Decode, Encode};
use futures::stream::StreamExt;
use futures::{
channel::oneshot,
future::{abortable, AbortHandle, Abortable},
stream::FuturesUnordered,
stream::{FuturesUnordered, Stream, StreamExt},
Future, FutureExt,
};

Expand All @@ -41,13 +40,13 @@ use std::{
};

const COST_SIGNATURE_INVALID: ReputationChange =
ReputationChange::new(-10000, "Bitfield signature invalid");
ReputationChange::new(-100, "Bitfield signature invalid");
const COST_MISSING_PEER_SESSION_KEY: ReputationChange =
ReputationChange::new(-1337, "Missing peer session key");
ReputationChange::new(-133, "Missing peer session key");
const COST_MULTIPLE_BITFIELDS_FROM_PEER: ReputationChange =
ReputationChange::new(-10000, "Received more than once bitfield from peer");
ReputationChange::new(-22, "Received more than once bitfield from peer");
const COST_NOT_INTERESTED: ReputationChange =
ReputationChange::new(-100, "Not intersted in that parent hash");
ReputationChange::new(-51, "Not intersted in that parent hash");
const COST_MESSAGE_NOT_DECODABLE: ReputationChange =
ReputationChange::new(-100, "Not intersted in that parent hash");

Expand Down Expand Up @@ -78,6 +77,10 @@ struct JobData {

// set of validators for a particular relay_parent
validator_set: Vec<ValidatorId>,

// set of validators for a particular relay_parent and the number of messages
// received authored by them
received_bitsets_per_validator: HashSet<ValidatorId, usize>,
}

fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
Expand All @@ -101,9 +104,8 @@ impl BitfieldDistribution {

// set of active heads the overseer told us to work on with the connected
// tasks abort handles
// @todo do we need Box<dyn Future<Output = ()>>) for anything?
// let mut active_jobs = HashMap::<Hash, (AbortHandle, Pin<Box<dyn Future<Output = ()>>>)>::new();
let mut active_jobs = HashMap::<Hash, AbortHandle>::new();
let mut futurama = HashMap::<Hash, Pin<Box<dyn Future<Output = ()>>>>::new();
let mut tracker = Tracker::default();
loop {
{
Expand All @@ -113,21 +115,21 @@ impl BitfieldDistribution {
};
match message {
FromOverseer::Communication { msg } => {
let peerid = PeerId::random(); // @todo
// we signed this bitfield
match msg {
// Distribute a bitfield via gossip to other validators.
BitfieldDistributionMessage::DistributeBitfield(
hash,
signed_availability,
) => {
let msg = BitfieldGossip {
let msg = BitfieldGossipMessage {
relay_parent: hash,
signed_availability,
};
process_incoming(ctx.clone(), &mut tracker, peerid, msg).await?;
distribute(ctx.clone(), &mut tracker, msg).await?;
}
BitfieldDistributionMessage::NetworkBridgeUpdate(event) => {
handle_network_msg(ctx.clone(), &mut tracker, event).await?;
handle_network_msg(ctx.clone(), &mut tracker, event).await;
}
}
}
Expand All @@ -138,58 +140,39 @@ impl BitfieldDistribution {
let _ = tracker.per_job.insert(
relay_parent,
JobData {
validator_bitset_received: HashSet::new(),
signing_context,
validator_set: validator_set,
..Default::default()
},
);

let future = processor_per_relay_parent(ctx.clone(), relay_parent.clone());
let (future, abort_handle) = abortable(future);

let _future = ctx.spawn(Box::pin(future.map(|_| ())));

active_jobs.insert(relay_parent.clone(), abort_handle);
// futurama.insert(relay_parent.clone(), Box::pin(future.map(|_| ())));
// active_jobs.insert(relay_parent.clone(), abort_handle);
}
FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)) => {
if let Some(abort_handle) = active_jobs.remove(&relay_parent) {
let _ = abort_handle.abort();
}
// if let Some(abort_handle) = active_jobs.remove(&relay_parent) {
// let _ = abort_handle.abort();
// }
// let _ = futurama.remove(&relay_parent);
let _ = tracker.per_job.remove(&relay_parent);
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
// @todo cannot store the future
// let _ = futures::future::join_all(
// active_jobs
// .drain()
// .map(|(_relay_parent, (cancellation, future))| future),
// )
// .await;

// better:
// let future = move || {
// tracker.peer_views.clone().into_iter()
// .filter(|(_peerid, view)| view.contains(&hash))
// .map(|(peerid, view)| {
// let mut ctx = ctx.clone();
// let hash = hash.clone();
// let signed_availability = signed_availability.clone();
// async move {
// let bytes = BitfieldGossip {
// relay_parent: hash,
// signed_availability,
// }.encode();
// ctx.send_message(
// AllMessages::NetworkBridge(
// NetworkBridgeMessage::SendMessage(vec![], BitfieldDistribution::PROTOCOL_ID, bytes),
// )).await
// }
tracker.per_job.clear();
// let unordered = futurama
// .drain()
// .map(|(_relay_parent, future)| future)
// .zip(
// active_jobs
// .drain()
// .map(|(_relay_parent, abort_handle)| abort_handle),
// )
// .map(|(future, abort_handle)| {
// abort_handle.abort();
// // TODO pipe to cleanup state
// future
// })
// .collect::<FuturesUnordered<_>>().into_future()
// };
// future().await;
for (_relay_parent, abort_handle) in active_jobs.drain() {
let _ = abort_handle.abort();
}
// .collect::<FuturesUnordered<_>>();

// let _ = async move { unordered.into_future().await }.await;
return Ok(());
}
}
Expand All @@ -200,18 +183,6 @@ impl BitfieldDistribution {
}
}

/// Process all requests related to one relay parent hash
async fn processor_per_relay_parent<Context>(
mut ctx: Context,
relay_parent: Hash,
) -> SubsystemResult<()> {
let mut tracker = Tracker::default();
loop {
// todo!("consume relay parents")
}
Ok(())
}

/// modify the reputiation, good or bad
async fn modify_reputiation<Context>(
mut ctx: Context,
Expand All @@ -227,72 +198,94 @@ where
.await
}

/// Distribute a checked message, either originated by us or gossiped on from other peers.
async fn distribute<Context>(
mut ctx: Context,
tracker: &mut Tracker,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let BitfieldGossipMessage {
relay_parent,
signed_availability,
} = message;
// concurrently pass on the bitfield distribution to all interested peers
let interested_peers = tracker
.peer_views
.iter()
.filter(|(_peerid, view)| view.contains(&relay_parent))
.map(|(peerid, _)| peerid.clone())
.collect::<Vec<PeerId>>();
let message = BitfieldGossipMessage {
relay_parent: relay_parent,
signed_availability,
};
let bytes = message.encode();
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendMessage(
interested_peers,
BitfieldDistribution::PROTOCOL_ID,
bytes,
),
))
.await?;
Ok(())
}

/// Handle an incoming message from a peer
async fn process_incoming<Context>(
async fn process_incoming_peer_message<Context>(
mut ctx: Context,
tracker: &mut Tracker,
peerid: PeerId,
message: BitfieldGossip,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage> + Clone,
{
let peer_view = tracker.peer_views.get(&peerid).expect("TODO");
let BitfieldGossip { relay_parent, signed_availability} = message;
let peer_view = if let Some(peer_view) = tracker.peer_views.get(&peerid) {
peer_view
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
};

let job_data = if let Some(job_data) = tracker.per_job.get(&relay_parent) {
let job_data = if let Some(job_data) = tracker.per_job.get(&message.relay_parent) {
job_data
} else {
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
};

// @todo should we only distribute availability messages to peer if they are relevant to us
// or is the only discriminator if the peer cares about it?
if !peer_view.contains(&relay_parent) {
if !peer_view.contains(&message.relay_parent) {
// we don't care about this, the other side should have known better
return modify_reputiation(ctx, peerid, COST_NOT_INTERESTED).await;
}

let validator_set = &job_data.validator_set;
if validator_set.len() == 0 {
return modify_reputiation(ctx.clone(), peerid, COST_MISSING_PEER_SESSION_KEY).await;
return modify_reputiation(ctx, peerid, COST_MISSING_PEER_SESSION_KEY).await;
}

// check all validators that could have signed this message
if let Some(_) = validator_set.iter().find(|validator| {
signed_availability
if validator_set.iter().find(|validator| {
message
.signed_availability
.check_signature(&job_data.signing_context, validator)
.is_ok()
}) {
return modify_reputiation(ctx.clone(), peerid, COST_SIGNATURE_INVALID).await;
}).is_none() {
return modify_reputiation(ctx, peerid, COST_SIGNATURE_INVALID).await;
}

// concurrently pass on the bitfield distribution to all interested peers
let interested_peers = tracker
.peer_views
.iter()
.filter(|(_peerid, view)| view.contains(&relay_parent))
.map(|(peerid, _)| peerid.clone())
.collect::<Vec<PeerId>>();
let message = BitfieldGossip {
relay_parent: relay_parent,
signed_availability,
};
let bytes = message.encode();
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendMessage(
interested_peers,
BitfieldDistribution::PROTOCOL_ID,
bytes,
),
))
.await?;
distribute(ctx, tracker, message).await?;

Ok(())
}

/// A Gossiped signed availability bitfield for a particular relay hash
/// A gossiped or gossipable signed availability bitfield for a particular relay hash
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub struct BitfieldGossip {
pub struct BitfieldGossipMessage {
relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield,
}
Expand Down Expand Up @@ -341,10 +334,15 @@ where
}
NetworkBridgeEvent::PeerMessage(remote, mut bytes) => {
log::info!("Got a peer message from {:?}", &remote);
if let Ok(gossiped_bitfield) = BitfieldGossip::decode(&mut (bytes.as_slice())) {
process_incoming(ctx, tracker, remote, gossiped_bitfield).await?;
if let Ok(gossiped_bitfield) = BitfieldGossipMessage::decode(&mut (bytes.as_slice())) {
let (future, _abort_handle) = abortable(process_incoming_peer_message(ctx, tracker, remote, gossiped_bitfield));
future.await;
// tracker.active_jobs.insert(&gossiped_bitfield.relay_parent, abort_handle);
// let _future = ctx.spawn(Box::pin(async move {
// future.map(|_| ()).await
// }));
} else {
return modify_reputiation(ctx.clone(), remote, COST_MESSAGE_NOT_DECODABLE).await;
return modify_reputiation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await;
}
}
}
Expand All @@ -353,12 +351,10 @@ where

impl<C> Subsystem<C> for BitfieldDistribution
where
C: SubsystemContext<Message = BitfieldDistributionMessage> + Clone + Sync,
C: SubsystemContext<Message = BitfieldDistributionMessage> + Clone + Sync + Send,
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
SpawnedSubsystem(Box::pin(async move { Self::run(ctx) }.map(|_| ())))
}
}

Expand Down Expand Up @@ -393,8 +389,12 @@ where
mod test {
use super::*;

fn generate_valid_message() -> AllMessages {}

fn generate_invalid_message() -> AllMessages {}

#[test]
fn x() {
fn game_changer() {
// @todo
}
}

0 comments on commit 9012e2c

Please sign in to comment.