Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Change best effort queue behaviour in dispute-coordinator #6275

Merged
merged 17 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 82 additions & 92 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use std::{cmp::Ordering, collections::BTreeMap};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -50,25 +47,14 @@ const PRIORITY_QUEUE_SIZE: usize = 20_000;
#[cfg(test)]
const PRIORITY_QUEUE_SIZE: usize = 2;

/// Type for counting how often a candidate was added to the best effort queue.
type BestEffortCount = u32;

/// Queues for dispute participation.
/// In both queues we have a strict ordering of candidates and participation will
/// happen in that order. Refer to `CandidateComparator` for details on the ordering.
pub struct Queues {
/// Set of best effort participation requests.
///
/// Note that as size is limited to `BEST_EFFORT_QUEUE_SIZE` we simply do a linear search for
/// the entry with the highest `added_count` to determine what dispute to participate next in.
///
/// This mechanism leads to an amplifying effect - the more validators already participated,
/// the more likely it becomes that more validators will participate soon, which should lead to
/// a quick resolution of disputes, even in the best effort queue.
best_effort: HashMap<CandidateHash, BestEffortEntry>,
best_effort: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Priority queue.
///
/// In the priority queue, we have a strict ordering of candidates and participation will
/// happen in that order.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,
}

Expand Down Expand Up @@ -143,14 +129,13 @@ impl ParticipationRequest {
impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: HashMap::new(), priority: BTreeMap::new() }
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
}

/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it considered priority now, otherwise the `added_count` on the best effort queue will be
/// bumped.
/// if it considered priority now.
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
///
/// Returns error in case a queue was found full already.
pub async fn queue(
Expand All @@ -159,94 +144,76 @@ impl Queues {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let comparator = match priority {
ParticipationPriority::BestEffort => None,
ParticipationPriority::Priority =>
CandidateComparator::new(sender, &req.candidate_receipt).await?,
};
self.queue_with_comparator(comparator, req)?;
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req)?;
Ok(())
}

/// Get the next best request for dispute participation
///
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
self.best_effort.remove(&req.0);
return Some(req.1)
}
self.pop_best_effort()
self.pop_best_effort().map(|d| d.1)
}

fn queue_with_comparator(
&mut self,
comparator: Option<CandidateComparator>,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if let Some(comparator) = comparator {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&req.candidate_hash);
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
} else {
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
// Note: The request might have been added to priority in a previous call already, we
// take care of that case in `dequeue` (more efficient).
eskimor marked this conversation as resolved.
Show resolved Hide resolved
self.best_effort
.entry(req.candidate_hash)
.or_insert(BestEffortEntry { req, added_count: 0 })
.added_count += 1;
self.best_effort.entry(comparator).or_insert(req);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually a plain insert like in the priority case would suffice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be fixed in the course of other work, no reason not to merge this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what about that failing zombienet test? Zombienet problem or actual issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a small one. I'll fix it now.

The zombienet test fails because a metric can't be fetched:

2022-11-17T08:20:31.893Z zombie::network-node Connected to honest-validator-0
	 Error:  
		 Timeout(180), "getting desired parachain block height 2 within 180 secs".
    1) honest-validator-0: parachain 2000 block height is at least 2 within 180 seconds
2022-11-17T08:23:31.427Z zombie::network-node Connecting api for honest-validator-1 at ws://10.27.63.7:9944...

Looks like a connectivity issue to me. I don't see anything suspicious in the logs. There is a dispute opened and it concludes invalid.

Let's see if it will pass on the next run.

}
Ok(())
}

/// Get the next best from the best effort queue.
///
/// If there are multiple best - just pick one.
fn pop_best_effort(&mut self) -> Option<ParticipationRequest> {
let best = self.best_effort.iter().reduce(|(hash1, entry1), (hash2, entry2)| {
if entry1.added_count > entry2.added_count {
(hash1, entry1)
} else {
(hash2, entry2)
}
});
if let Some((best_hash, _)) = best {
let best_hash = best_hash.clone();
self.best_effort.remove(&best_hash).map(|e| e.req)
} else {
None
}
/// Get best from the best effort queue.
fn pop_best_effort(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.best_effort)
}

/// Get best priority queue entry.
fn pop_priority(&mut self) -> Option<ParticipationRequest> {
fn pop_priority(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.priority)
}

// `pop_best_effort` and `pop_priority` do the same but on different `BTreeMap`s. This function has
// the extracted implementation
fn pop_impl(
target: &mut BTreeMap<CandidateComparator, ParticipationRequest>,
) -> Option<(CandidateComparator, ParticipationRequest)> {
// Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple:
// priority.pop_first().
if let Some((comparator, _)) = self.priority.iter().next() {
// target.pop_first().
if let Some((comparator, _)) = target.iter().next() {
let comparator = comparator.clone();
self.priority.remove(&comparator)
target
.remove(&comparator)
.map(|participation_request| (comparator, participation_request))
} else {
None
}
}
}

/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
}

/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
Expand All @@ -266,9 +233,12 @@ struct BestEffortEntry {
#[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
struct CandidateComparator {
/// Block number of the relay parent.
/// Block number of the relay parent. It's wrapped in an `Option<>` because there are cases when
/// it can't be obtained. For example when the node is lagging behind and new leaves are received
/// with a slight delay. Candidates with unknown relay parent are treated with the lowest priority.
///
/// Important, so we will be participating in oldest disputes first.
/// The order enforced by `CandidateComparator` is important because we want to participate in
/// the oldest disputes first.
///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The
Expand All @@ -277,8 +247,9 @@ struct CandidateComparator {
/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber,
relay_parent_block_number: Option<BlockNumber>,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
/// Additionally if `BlockNumber` can't be obtained the `CandidateHash` is used for ordering.
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
candidate_hash: CandidateHash,
}

Expand All @@ -287,33 +258,35 @@ impl CandidateComparator {
///
/// Useful for testing.
#[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
pub fn new_dummy(block_number: Option<BlockNumber>, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash }
}

/// Create a candidate comparator for a given candidate.
///
/// Returns:
/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a
/// `FatalError` in case the chain API call fails with an unexpected error.
/// - `Ok(CandidateComparator{Some(relay_parent_block_number), candidate_hash})` when the
/// relay parent can be obtained. This is the happy case.
/// - `Ok(CandidateComparator{None, candidate_hash})` in case the candidate's relay parent
/// can't be obtained.
/// - `FatalError` in case the chain API call fails with an unexpected error.
pub async fn new(
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
candidate: &CandidateReceipt,
) -> FatalResult<Option<Self>> {
) -> FatalResult<Self> {
let candidate_hash = candidate.hash();
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
);
return Ok(None)
},
Some(n) => n,
};
let n = get_block_number(sender, candidate.descriptor().relay_parent).await?;

if n.is_none() {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator` \
with an empty relay parent block number will be provided!"
);
}

Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
Ok(CandidateComparator { relay_parent_block_number: n, candidate_hash })
}
}

Expand All @@ -333,11 +306,28 @@ impl PartialOrd for CandidateComparator {

impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
return match (self.relay_parent_block_number, other.relay_parent_block_number) {
(None, None) => {
// No relay parents for both -> compare hashes
self.candidate_hash.cmp(&other.candidate_hash)
},
(Some(_), Some(_)) => {
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
}
// if the relay parent is the same for both -> compare hashes
self.candidate_hash.cmp(&other.candidate_hash)
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
},
(Some(_), None) => {
// Candidates with known relay parents are always with priority
Ordering::Less
},
(None, Some(_)) => {
// Ditto
Ordering::Greater
},
}
self.candidate_hash.cmp(&other.candidate_hash)
}
}

Expand Down
Loading