Ignore duplicate messages sent within 5 minutes of each other
birchmd committed Jan 21, 2021
1 parent 7f2bef7 commit 21f2c55
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions chain/network/src/peer.rs
@@ -12,6 +12,7 @@ use actix::{
Actor, ActorContext, ActorFuture, Addr, Arbiter, AsyncContext, Context, ContextFutureSpawner,
Handler, Recipient, Running, StreamHandler, WrapFuture,
};
use cached::{Cached, TimedCache};
use tracing::{debug, error, info, trace, warn};

use near_metrics;
@@ -60,6 +61,10 @@ const MAX_PEER_MSG_PER_MIN: u64 = std::u64::MAX;
/// dispatching transactions when we should be focusing on consensus-related messages.
const MAX_TXNS_PER_BLOCK_MESSAGE: usize = 1000;

/// Time during which we will ignore duplicate messages (i.e. we assume we already received
/// them and performed the necessary action).
const IDEMPOTENT_SECONDS: u64 = 5 * 60; // 5 minutes

/// Internal structure to keep a circular queue within a tracker with unique hashes.
struct CircularUniqueQueue {
v: Vec<CryptoHash>,
@@ -182,6 +187,8 @@ pub struct Peer {
txns_since_last_block: Arc<AtomicUsize>,
/// How many peer actors are created
peer_counter: Arc<AtomicUsize>,
/// Cache keeping track of messages we received recently
recent_messages: TimedCache<CryptoHash, ()>,
}

impl Peer {
@@ -220,6 +227,7 @@ impl Peer {
network_metrics,
txns_since_last_block,
peer_counter,
recent_messages: TimedCache::with_lifespan(IDEMPOTENT_SECONDS),
}
}

@@ -710,6 +718,15 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for Peer {
return;
}
}
// We purposely choose to detect duplicate messages at this point
// (i.e. before deserialization) to avoid as much unnecessary work
// as possible.
let bytes_hash = near_primitives::hash::hash(&msg);
if self.recent_messages.cache_get(&bytes_hash).is_some() {
// This is a duplicate message -- ignore
return;
}
self.recent_messages.cache_set(bytes_hash, ());
let mut peer_msg = match bytes_to_peer_message(&msg) {
Ok(peer_msg) => peer_msg,
Err(err) => {
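The change boils down to a small idempotency cache keyed by the hash of the raw message bytes: a repeat of the same bytes within five minutes is dropped before deserialization, while the same message arriving later is processed again. Below is a minimal standalone sketch of that pattern, assuming the same cached crate API (TimedCache::with_lifespan, cache_get, cache_set). The u64 key produced with std's DefaultHasher stands in for near_primitives::hash::hash, and is_duplicate is a hypothetical helper, not a function from this commit.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use cached::{Cached, TimedCache};

/// Lifespan of the dedup cache, mirroring IDEMPOTENT_SECONDS in the diff above.
const IDEMPOTENT_SECONDS: u64 = 5 * 60; // 5 minutes

/// Hypothetical helper: returns true if `msg` was already seen within the cache
/// lifespan. The hash is taken over the raw bytes, before any deserialization,
/// so duplicates are rejected as cheaply as possible.
fn is_duplicate(recent_messages: &mut TimedCache<u64, ()>, msg: &[u8]) -> bool {
    // Stand-in for near_primitives::hash::hash(&msg) in the real code.
    let mut hasher = DefaultHasher::new();
    msg.hash(&mut hasher);
    let key = hasher.finish();

    if recent_messages.cache_get(&key).is_some() {
        return true; // seen within the last IDEMPOTENT_SECONDS -- ignore
    }
    recent_messages.cache_set(key, ());
    false
}

fn main() {
    let mut recent_messages: TimedCache<u64, ()> =
        TimedCache::with_lifespan(IDEMPOTENT_SECONDS);
    let msg = b"PeerMessage bytes".to_vec();
    assert!(!is_duplicate(&mut recent_messages, &msg)); // first arrival: processed
    assert!(is_duplicate(&mut recent_messages, &msg)); // repeat within 5 minutes: dropped
}

Hashing before calling bytes_to_peer_message means a duplicate costs only one hash and one cache lookup, which is the point of placing the check at the top of the stream handler.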
