Skip to content

Commit

Permalink
🚀perf: pre-check sector len > 0
Browse files Browse the repository at this point in the history
  • Loading branch information
swift-mx committed Sep 13, 2022
1 parent 508b6d2 commit 793b986
Showing 1 changed file with 74 additions and 47 deletions.
121 changes: 74 additions & 47 deletions actors/miner/src/expiration_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,58 +362,85 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sectors: Vec<SectorOnChainInfo>,
sector_size: SectorSize,
) -> anyhow::Result<PowerPair> {
let mut remaining: BTreeSet<SectorNumber> =
sectors.iter().map(|sector| sector.sector_number).collect();
let mut sectors_group = BTreeMap::<ChainEpoch, BitField>::new();

// Traverse the expiration queue once to find each recovering sector and remove it from early/faulty there.
// We expect this to find all recovering sectors within the first FaultMaxAge/WPoStProvingPeriod entries
// (i.e. 14 for 14-day faults), but if something has gone wrong it's safer not to fail if that's not met.
let mut sectors_rescheduled = Vec::<&SectorOnChainInfo>::new();
let mut recovered_power = PowerPair::zero();
let mut sectors_by_number = BTreeMap::<u64, &SectorOnChainInfo>::new();

for sector in sectors.iter() {
let epoch = self.quant.quantize_up(sector.expiration);
sectors_group.entry(epoch).or_insert(BitField::new()).set(sector.sector_number);
sectors_by_number.insert(sector.sector_number, sector);
}
for (epoch, sectors_field) in sectors_group {
let mut expiration_set = self.may_get(epoch)?;
let on_time_sectors = expiration_set.on_time_sectors.bitand(&sectors_field);
for sector_number in on_time_sectors.iter() {
let power =
power_for_sector(sector_size, sectors_by_number.get(&sector_number).unwrap());
// If the sector expires on-time at this epoch, leave it here but change faulty power to active.
// The pledge is already part of the on-time pledge at this entry.
expiration_set.faulty_power -= &power;
expiration_set.active_power += &power;

recovered_power += &power;
remaining.remove(&sector_number);
}
let mut early_sectors = expiration_set.early_sectors.bitand(&sectors_field);
early_sectors -= &on_time_sectors;
for sector_number in early_sectors.iter() {
let sector_info = sectors_by_number.get(&sector_number).unwrap();
let power = power_for_sector(sector_size, sector_info);
// If the sector expires early at this epoch, remove it for re-scheduling.
// It's not part of the on-time pledge number here.
expiration_set.early_sectors.unset(sector_number);
expiration_set.faulty_power -= &power;
sectors_rescheduled.push(sector_info);

recovered_power += &power;
remaining.remove(&sector_number);
if !sectors.is_empty() {
let mut remaining: BTreeSet<SectorNumber> =
sectors.iter().map(|sector| sector.sector_number).collect();

// Traverse the expiration queue once to find each recovering sector and remove it from early/faulty there.
// We expect this to find all recovering sectors within the first FaultMaxAge/WPoStProvingPeriod entries
// (i.e. 14 for 14-day faults), but if something has gone wrong it's safer not to fail if that's not met.
let mut sectors_rescheduled = Vec::<&SectorOnChainInfo>::new();

self.iter_while_mut(|_epoch, expiration_set| {

let on_time_sectors: BTreeSet<SectorNumber> = expiration_set
.on_time_sectors
.bounded_iter(ENTRY_SECTORS_MAX)
.context("too many sectors to reschedule")?
.map(|i| i as SectorNumber)
.collect();

let early_sectors: BTreeSet<SectorNumber> = expiration_set
.early_sectors
.bounded_iter(ENTRY_SECTORS_MAX)
.context("too many sectors to reschedule")?
.map(|i| i as SectorNumber)
.collect();

// This loop could alternatively be done by constructing bitfields and intersecting them, but it's not
// clear that would be much faster (O(max(N, M)) vs O(N+M)).
// If faults are correlated, the first queue entry likely has them all anyway.
// The length of sectors has a maximum of one partition size.
let bon_time = on_time_sectors.len() > 0;
let bearly = early_sectors.len() > 0;
let mut sectors_total = Vec::new();
for sector in sectors.iter() {
let sector_number = sector.sector_number;
let power = power_for_sector(sector_size, sector);
let mut found = false;

if bon_time && on_time_sectors.contains(&sector_number) {
found = true;
// If the sector expires on-time at this epoch, leave it here but change faulty power to active.
// The pledge is already part of the on-time pledge at this entry.
expiration_set.active_power += &power;
} else if bearly && early_sectors.contains(&sector_number) {
found = true;
// If the sector expires early at this epoch, remove it for re-scheduling.
// It's not part of the on-time pledge number here.
//expiration_set.early_sectors.unset(sector_number);
sectors_total.push(sector_number);
sectors_rescheduled.push(sector);
}

if found {
expiration_set.faulty_power -= &power;
recovered_power += &power;
remaining.remove(&sector_number);
}
}

if !sectors_total.is_empty() {
let early_sectors = BitField::try_from_bits(sectors_total)?;
expiration_set.early_sectors -= &early_sectors;
}

expiration_set.validate_state()?;

let keep_going = !remaining.is_empty();

Ok(keep_going)
})?;

if !remaining.is_empty() {
return Err(anyhow!("sectors not found in expiration queue: {:?}", remaining));
}
expiration_set.validate_state()?;
// Re-schedule the removed sectors to their target expiration.
self.add_active_sectors(sectors_rescheduled, sector_size)?;
}
if !remaining.is_empty() {
return Err(anyhow!("sectors not found in expiration queue: {:?}", remaining));
}

// Re-schedule the removed sectors to their target expiration.
self.add_active_sectors(sectors_rescheduled, sector_size)?;

Ok(recovered_power)
}
Expand Down

0 comments on commit 793b986

Please sign in to comment.