Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duties override bug in VC #5305

Merged
merged 5 commits into from
Mar 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 127 additions & 68 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,43 +122,37 @@ pub struct SubscriptionSlots {
slots: Vec<(Slot, AtomicBool)>,
}

impl DutyAndProof {
/// Instantiate `Self`, computing the selection proof as well.
pub async fn new_with_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: AttesterData,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<Self, Error> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?;

let selection_proof = selection_proof
.is_aggregator(duty.committee_length as usize, spec)
.map_err(Error::InvalidModulo)
.map(|is_aggregator| {
if is_aggregator {
Some(selection_proof)
} else {
// Don't bother storing the selection proof if the validator isn't an
// aggregator, we won't need it.
None
}
})?;

let subscription_slots = SubscriptionSlots::new(duty.slot);

Ok(Self {
duty,
selection_proof,
subscription_slots,
/// Create a selection proof for `duty`.
///
/// Return `Ok(None)` if the attesting validator is not an aggregator.
async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: &AttesterData,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
) -> Result<Option<SelectionProof>, Error> {
let selection_proof = validator_store
.produce_selection_proof(duty.pubkey, duty.slot)
.await
.map_err(Error::FailedToProduceSelectionProof)?;

selection_proof
.is_aggregator(duty.committee_length as usize, spec)
.map_err(Error::InvalidModulo)
.map(|is_aggregator| {
if is_aggregator {
Some(selection_proof)
} else {
// Don't bother storing the selection proof if the validator isn't an
// aggregator, we won't need it.
None
}
})
}
}

impl DutyAndProof {
/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot);
pub fn new_without_selection_proof(duty: AttesterData, current_slot: Slot) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
Self {
duty,
selection_proof: None,
Expand All @@ -168,10 +162,13 @@ impl DutyAndProof {
}

impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> {
fn new(duty_slot: Slot, current_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok())
// Keep only scheduled slots that haven't happened yet. This avoids sending expired
// subscriptions.
.filter(|scheduled_slot| *scheduled_slot > current_slot)
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
Expand Down Expand Up @@ -787,14 +784,14 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let indices_to_request = if !uninitialized_validators.is_empty() {
let initial_indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice()
} else {
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
};

let response =
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
post_validator_duties_attester(duties_service, epoch, initial_indices_to_request).await?;
let dependent_root = response.dependent_root;

// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
Expand All @@ -818,24 +815,29 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
return Ok(());
}

// Filter out validators which have already been requested.
let initial_duties = &response.data;
// Make a request for all indices that require updating which we have not already made a request
// for.
let indices_to_request = validators_to_update
.iter()
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
.collect::<Vec<_>>();

let new_duties = if !indices_to_request.is_empty() {
// Filter the initial duties by their relevance so that we don't hit the warning below about
// overwriting duties. There was previously a bug here.
let new_initial_duties = response
.data
.into_iter()
.filter(|duty| validators_to_update.contains(&&duty.pubkey));

let mut new_duties = if !indices_to_request.is_empty() {
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
.await?
.data
.into_iter()
.chain(response.data)
.collect::<Vec<_>>()
} else {
response.data
vec![]
};
new_duties.extend(new_initial_duties);

drop(fetch_timer);

Expand All @@ -854,26 +856,53 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Update the duties service with the new `DutyAndProof` messages.
let mut attesters = duties_service.attesters.write();
let mut already_warned = Some(());
let current_slot = duties_service
.slot_clock
.now_or_genesis()
.unwrap_or_default();
for duty in &new_duties {
let attester_map = attesters.entry(duty.pubkey).or_default();

// Create initial entries in the map without selection proofs. We'll compute them in the
// background later to avoid creating a thundering herd of signing threads whenever new
// duties are computed.
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone());
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone(), current_slot);

match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut occupied) => {
let mut_value = occupied.get_mut();
let (prior_dependent_root, prior_duty_and_proof) = &mut_value;

// Guard against overwriting an existing value for the same duty. If we did
// overwrite we could lose a selection proof or information from
// `subscription_slots`. Hitting this branch should be prevented by our logic for
// fetching duties only for unknown indices.
if dependent_root == *prior_dependent_root
&& prior_duty_and_proof.duty == duty_and_proof.duty
{
warn!(
log,
"Redundant attester duty update";
"dependent_root" => %dependent_root,
"validator_index" => duty.validator_index,
);
continue;
}

if let Some((prior_dependent_root, _)) =
attester_map.insert(epoch, (dependent_root, duty_and_proof))
{
// Using `already_warned` avoids excessive logs.
if dependent_root != prior_dependent_root && already_warned.take().is_some() {
warn!(
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
)
// Using `already_warned` avoids excessive logs.
if dependent_root != *prior_dependent_root && already_warned.take().is_some() {
warn!(
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
)
}
*mut_value = (dependent_root, duty_and_proof);
}
hash_map::Entry::Vacant(vacant) => {
vacant.insert((dependent_root, duty_and_proof));
}
}
}
Expand Down Expand Up @@ -1030,20 +1059,21 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// Sign selection proofs (serially).
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async {
DutyAndProof::new_with_selection_proof(
duty,
let opt_selection_proof = make_selection_proof(
&duty,
&duties_service.validator_store,
&duties_service.spec,
)
.await
.await?;
Ok((duty, opt_selection_proof))
})
.collect::<Vec<_>>()
.await;

// Add to attesters store.
let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results {
let duty_and_proof = match result {
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(
ValidatorStoreError::UnknownPubkey(pubkey),
Expand Down Expand Up @@ -1071,12 +1101,12 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
}
};

let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch());
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = duty_and_proof.selection_proof else {
let Some(selection_proof) = selection_proof else {
continue;
};

Expand All @@ -1097,6 +1127,14 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the
// entry so we may as well.
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
}
}
Expand Down Expand Up @@ -1320,13 +1358,15 @@ mod test {

#[test]
fn subscription_slots_exact() {
// Set current slot in the past so no duties are considered expired.
let current_slot = Slot::new(0);
for duty_slot in [
Slot::new(32),
Slot::new(33),
Slot::new(47),
Slot::new(99),
Slot::new(1002003),
] {
let subscription_slots = SubscriptionSlots::new(duty_slot);
let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);

// Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually).
Expand Down Expand Up @@ -1360,8 +1400,9 @@ mod test {
#[test]
fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let current_slot = Slot::new(0);
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);
let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);

subscription_slots.record_successful_subscription_at(duty_slot - offset);

Expand All @@ -1376,4 +1417,22 @@ mod test {
}
}
}

/// Boundary case: when the duty slot is exactly `ATTESTATION_SUBSCRIPTION_OFFSETS[0]` slots
/// ahead of the current slot, no subscription slot survives the expiry filter.
#[test]
fn subscription_slots_expired() {
    let now = Slot::new(100);
    let duty_slot = now + ATTESTATION_SUBSCRIPTION_OFFSETS[0];

    // Every scheduled slot is at or before `now`, so nothing should be sent and nothing stored.
    let expired = SubscriptionSlots::new(duty_slot, now);
    for offset in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter() {
        assert!(!expired.should_send_subscription_at(duty_slot - offset));
    }
    assert!(expired.slots.is_empty());

    // Moving the duty one slot later leaves exactly one scheduled slot, at `now + 1`.
    let live = SubscriptionSlots::new(duty_slot + 1, now);
    assert_eq!(live.slots.len(), 1);
    assert!(live.should_send_subscription_at(now + 1));
}
}
Loading