Skip to content

Commit

Permalink
🚀perf: pre-grouping sector by expiration time
Browse files Browse the repository at this point in the history
  • Loading branch information
swift-mx committed Sep 6, 2022
1 parent d5e46c0 commit 508b6d2
Showing 1 changed file with 13 additions and 22 deletions.
35 changes: 13 additions & 22 deletions actors/miner/src/expiration_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryInto;
use std::ops::BitAnd;

use anyhow::{anyhow, Context};
use cid::Cid;
Expand Down Expand Up @@ -363,26 +364,23 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
) -> anyhow::Result<PowerPair> {
let mut remaining: BTreeSet<SectorNumber> =
sectors.iter().map(|sector| sector.sector_number).collect();
let mut sectors_group = BTreeMap::<ChainEpoch, BitField>::new();

// Traverse the expiration queue once to find each recovering sector and remove it from early/faulty there.
// We expect this to find all recovering sectors within the first FaultMaxAge/WPoStProvingPeriod entries
// (i.e. 14 for 14-day faults), but if something has gone wrong it's safer not to fail if that's not met.
let mut sectors_rescheduled = Vec::<&SectorOnChainInfo>::new();
let mut recovered_power = PowerPair::zero();

let mut sectors_field = BitField::new();
let mut sectors_by_number = BTreeMap::<u64, &SectorOnChainInfo>::new();

for sector in &sectors {
sectors_field.set(sector.sector_number);
for sector in sectors.iter() {
let epoch = self.quant.quantize_up(sector.expiration);
sectors_group.entry(epoch).or_insert(BitField::new()).set(sector.sector_number);
sectors_by_number.insert(sector.sector_number, sector);
}

self.iter_while_mut(|_epoch, expiration_set| {
let on_time_sectors = &sectors_field & &expiration_set.on_time_sectors;

// If faults are correlated, the first queue entry likely has them all anyway.
// The length of sectors has a maximum of one partition size.
for (epoch, sectors_field) in sectors_group {
let mut expiration_set = self.may_get(epoch)?;
let on_time_sectors = expiration_set.on_time_sectors.bitand(&sectors_field);
for sector_number in on_time_sectors.iter() {
let power =
power_for_sector(sector_size, sectors_by_number.get(&sector_number).unwrap());
Expand All @@ -394,29 +392,22 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
recovered_power += &power;
remaining.remove(&sector_number);
}

let mut early_sectors = &sectors_field & (&expiration_set.early_sectors);
let mut early_sectors = expiration_set.early_sectors.bitand(&sectors_field);
early_sectors -= &on_time_sectors;

for sector_number in early_sectors.iter() {
let power =
power_for_sector(sector_size, sectors_by_number.get(&sector_number).unwrap());
let sector_info = sectors_by_number.get(&sector_number).unwrap();
let power = power_for_sector(sector_size, sector_info);
// If the sector expires early at this epoch, remove it for re-scheduling.
// It's not part of the on-time pledge number here.
expiration_set.early_sectors.unset(sector_number);
expiration_set.faulty_power -= &power;
sectors_rescheduled.push(sectors_by_number.get(&sector_number).unwrap());
sectors_rescheduled.push(sector_info);

recovered_power += &power;
remaining.remove(&sector_number);
}

expiration_set.validate_state()?;

let keep_going = !remaining.is_empty();
Ok(keep_going)
})?;

}
if !remaining.is_empty() {
return Err(anyhow!("sectors not found in expiration queue: {:?}", remaining));
}
Expand Down

0 comments on commit 508b6d2

Please sign in to comment.