[client]: Fix for incorrectly dropped consensus messages (#11082) (#11086)

Fixes a race condition that caused the currently_queued counter to underflow and, as a consequence, consensus messages to be dropped incorrectly.
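The race in a nutshell: channel.send hands the closure to the IO thread, which may execute it (and its fetch_sub) before the sending thread reaches its fetch_add. With an unsigned counter, the early decrement wraps around to a huge value, so the queue_size >= limit check rejects every later message as "queue full". A minimal, self-contained sketch of that interleaving (not the client code itself; the limit value is made up):

use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

fn main() {
    let limit: usize = 8;

    // Old behaviour: the IO thread runs the queued closure (and its decrement)
    // before the enqueueing thread reaches its increment.
    let unsigned = AtomicUsize::new(0);
    unsigned.fetch_sub(1, Ordering::SeqCst);   // 0 - 1 wraps to usize::MAX
    let seen = unsigned.load(Ordering::SeqCst);
    assert!(seen >= limit);                    // spurious "queue is full"
    unsigned.fetch_add(1, Ordering::SeqCst);   // catch-up increment arrives too late

    // New behaviour: the same interleaving with a signed counter.
    let signed = AtomicI64::new(0);
    signed.fetch_sub(1, Ordering::SeqCst);     // transiently -1, no wrap-around
    let seen = signed.load(Ordering::SeqCst);
    assert!(seen < limit as i64);              // messages are still accepted
    signed.fetch_add(1, Ordering::SeqCst);     // counter settles back at 0

    println!("signed counter settled at {}", signed.load(Ordering::SeqCst));
}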
dforsten authored and dvdplm committed Nov 6, 2019
1 parent 9bcb911 commit 7401732
Showing 1 changed file with 43 additions and 36 deletions.
79 changes: 43 additions & 36 deletions ethcore/src/client/client.rs
@@ -17,7 +17,8 @@
 use std::cmp;
 use std::collections::{HashSet, BTreeMap, VecDeque};
 use std::str::FromStr;
-use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
+use std::convert::TryFrom;
+use std::sync::atomic::{AtomicI64, AtomicBool, Ordering as AtomicOrdering};
 use std::sync::{Arc, Weak};
 use std::time::{Instant, Duration};

@@ -2605,6 +2606,47 @@ fn transaction_receipt(
 }
 }

+/// Queue some items to be processed by IO client.
+struct IoChannelQueue {
+    /// Using a *signed* integer for counting currently queued messages since the
+    /// order in which the counter is incremented and decremented is not defined.
+    /// Using an unsigned integer can (and will) result in integer underflow,
+    /// incorrectly rejecting messages and returning a FullQueue error.
+    currently_queued: Arc<AtomicI64>,
+    limit: i64,
+}
+
+impl IoChannelQueue {
+    pub fn new(limit: usize) -> Self {
+        let limit = i64::try_from(limit).unwrap_or(i64::max_value());
+        IoChannelQueue {
+            currently_queued: Default::default(),
+            limit,
+        }
+    }
+
+    pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> EthcoreResult<()> where
+        F: Fn(&Client) + Send + Sync + 'static,
+    {
+        let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
+        if queue_size >= self.limit {
+            let err_limit = usize::try_from(self.limit).unwrap_or(usize::max_value());
+            bail!("The queue is full ({})", err_limit);
+        };
+
+        let count = i64::try_from(count).unwrap_or(i64::max_value());
+
+        let currently_queued = self.currently_queued.clone();
+        let _ok = channel.send(ClientIoMessage::execute(move |client| {
+            currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
+            fun(client);
+        }))?;
+
+        self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {

@@ -2762,38 +2804,3 @@ mod tests {
 }
 }

-/// Queue some items to be processed by IO client.
-struct IoChannelQueue {
-    currently_queued: Arc<AtomicUsize>,
-    limit: usize,
-}
-
-impl IoChannelQueue {
-    pub fn new(limit: usize) -> Self {
-        IoChannelQueue {
-            currently_queued: Default::default(),
-            limit,
-        }
-    }
-
-    pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage>, count: usize, fun: F) -> Result<(), QueueError> where
-        F: Fn(&Client) + Send + Sync + 'static,
-    {
-        let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
-        ensure!(queue_size < self.limit, QueueErrorKind::Full(self.limit));
-
-        let currently_queued = self.currently_queued.clone();
-        let result = channel.send(ClientIoMessage::execute(move |client| {
-            currently_queued.fetch_sub(count, AtomicOrdering::SeqCst);
-            fun(client);
-        }));
-
-        match result {
-            Ok(_) => {
-                self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
-                Ok(())
-            },
-            Err(e) => bail!(QueueErrorKind::Channel(e)),
-        }
-    }
-}
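To see the counter bookkeeping outside the project's types, here is a stripped-down, runnable stand-in for the fixed queue, using std::sync::mpsc and boxed closures in place of the crate's IoChannel/ClientIoMessage; the ToyQueue and Job names and the main-function wiring are illustrative only, not the actual API:

use std::convert::TryFrom;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{channel, Sender};

// A boxed job standing in for the closure carried by ClientIoMessage::execute.
type Job = Box<dyn FnOnce() + Send>;

struct ToyQueue {
    currently_queued: Arc<AtomicI64>,
    limit: i64,
}

impl ToyQueue {
    fn new(limit: usize) -> Self {
        ToyQueue {
            currently_queued: Arc::new(AtomicI64::new(0)),
            // Clamp instead of panicking if the configured limit cannot fit in an i64.
            limit: i64::try_from(limit).unwrap_or(i64::max_value()),
        }
    }

    fn queue(&self, tx: &Sender<Job>, count: usize, fun: impl FnOnce() + Send + 'static) -> Result<(), String> {
        // A transiently negative counter (decrement raced ahead of the increment)
        // passes this check instead of wrapping around to a huge unsigned value.
        if self.currently_queued.load(Ordering::Relaxed) >= self.limit {
            return Err(format!("The queue is full ({})", self.limit));
        }
        let count = i64::try_from(count).unwrap_or(i64::max_value());
        let queued = Arc::clone(&self.currently_queued);
        tx.send(Box::new(move || {
            queued.fetch_sub(count, Ordering::SeqCst); // consumer side: message processed
            fun();
        })).map_err(|e| e.to_string())?;
        self.currently_queued.fetch_add(count, Ordering::SeqCst); // producer side: message queued
        Ok(())
    }
}

fn main() {
    let (tx, rx) = channel::<Job>();
    let queue = ToyQueue::new(4);
    queue.queue(&tx, 1, || println!("processed a consensus message")).unwrap();
    // Drain the channel as the IO thread would.
    while let Ok(job) = rx.try_recv() {
        job();
    }
}

As in the committed code, the producer increments only after a successful send, so a failed send leaves the counter untouched, and the signed type tolerates the consumer's decrement landing before that increment.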
