Skip to content

Commit

Permalink
Blockstore: track when all primary-index data has been purged (#33668)
Browse files Browse the repository at this point in the history
* Fix typo

* Add Blockstore::highest_primary_index_slot

* Add getter

* Populate highest_primary_index_slot on boot

* Wipe highest_primary_index_slot when surpassed by oldest_slot

* Update highest_primary_index_slot in exact purge

* Return indexes early if highest_primary_index_slot has been cleared

* Limit read_transaction_status based on highest_primary_index_slot

* Limit read_transaction_memos based on highest_primary_index_slot

* Use highest_primary_index_slot to add early return to get_transaction_status_with_counter

* Fixup tests

* Use existing getter for highest_primary_index_slot

Co-authored-by: steviez <[email protected]>

---------

Co-authored-by: steviez <[email protected]>
  • Loading branch information
CriesofCarrots and steviez authored Oct 12, 2023
1 parent 0ad5199 commit d286c00
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 5 deletions.
64 changes: 61 additions & 3 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub struct Blockstore {
address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
transaction_memos_cf: LedgerColumn<cf::TransactionMemos>,
transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
highest_primary_index_slot: RwLock<Option<Slot>>,
rewards_cf: LedgerColumn<cf::Rewards>,
blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
Expand Down Expand Up @@ -343,6 +344,7 @@ impl Blockstore {
address_signatures_cf,
transaction_memos_cf,
transaction_status_index_cf,
highest_primary_index_slot: RwLock::<Option<Slot>>::default(),
rewards_cf,
blocktime_cf,
perf_samples_cf,
Expand All @@ -359,6 +361,7 @@ impl Blockstore {
slots_stats: SlotsStats::default(),
};
blockstore.cleanup_old_entries()?;
blockstore.update_highest_primary_index_slot()?;

Ok(blockstore)
}
Expand Down Expand Up @@ -2130,6 +2133,43 @@ impl Blockstore {
Ok(())
}

fn get_highest_primary_index_slot(&self) -> Option<Slot> {
*self.highest_primary_index_slot.read().unwrap()
}

fn set_highest_primary_index_slot(&self, slot: Option<Slot>) {
*self.highest_primary_index_slot.write().unwrap() = slot;
}

fn update_highest_primary_index_slot(&self) -> Result<()> {
let iterator = self.transaction_status_index_cf.iter(IteratorMode::Start)?;
let mut highest_primary_index_slot = None;
for (_, data) in iterator {
let meta: TransactionStatusIndexMeta = deserialize(&data).unwrap();
if highest_primary_index_slot.is_none()
|| highest_primary_index_slot.is_some_and(|slot| slot < meta.max_slot)
{
highest_primary_index_slot = Some(meta.max_slot);
}
}
if highest_primary_index_slot.is_some() {
self.set_highest_primary_index_slot(highest_primary_index_slot);
}
Ok(())
}

fn maybe_cleanup_highest_primary_index_slot(&self, oldest_slot: Slot) -> Result<()> {
let mut w_highest_primary_index_slot = self.highest_primary_index_slot.write().unwrap();
if let Some(highest_primary_index_slot) = *w_highest_primary_index_slot {
if oldest_slot > highest_primary_index_slot {
*w_highest_primary_index_slot = None;
self.transaction_status_index_cf.delete(0)?;
self.transaction_status_index_cf.delete(1)?;
}
}
Ok(())
}

fn read_deprecated_transaction_status(
&self,
index: (Signature, Slot),
Expand Down Expand Up @@ -2157,7 +2197,11 @@ impl Blockstore {
index: (Signature, Slot),
) -> Result<Option<TransactionStatusMeta>> {
let result = self.transaction_status_cf.get_protobuf(index)?;
if result.is_none() {
if result.is_none()
&& self
.get_highest_primary_index_slot()
.is_some_and(|highest_slot| highest_slot >= index.1)
{
self.read_deprecated_transaction_status(index)
} else {
Ok(result.and_then(|meta| meta.try_into().ok()))
Expand Down Expand Up @@ -2199,7 +2243,11 @@ impl Blockstore {
slot: Slot,
) -> Result<Option<String>> {
let memos = self.transaction_memos_cf.get((signature, slot))?;
if memos.is_none() {
if memos.is_none()
&& self
.get_highest_primary_index_slot()
.is_some_and(|highest_slot| highest_slot >= slot)
{
self.transaction_memos_cf
.get_raw(&cf::TransactionMemos::deprecated_key(signature))
} else {
Expand Down Expand Up @@ -2283,6 +2331,9 @@ impl Blockstore {
return Ok((status, counter));
}

if self.get_highest_primary_index_slot().is_none() {
return Ok((None, counter));
}
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator =
self.transaction_status_cf
Expand Down Expand Up @@ -2595,7 +2646,7 @@ impl Blockstore {
let mut iterator =
self.address_signatures_cf
.iter_current_index_filtered(IteratorMode::From(
// Ragardless of whether a `before` signature is provided, the latest relevant
// Regardless of whether a `before` signature is provided, the latest relevant
// `slot` is queried directly with the `find_address_signatures_for_slot()`
// call above. Thus, this iterator starts at the lowest entry of `address,
// slot` and iterates backwards to continue reporting the next earliest
Expand Down Expand Up @@ -7690,6 +7741,7 @@ pub mod tests {
transaction_status_cf
.put_deprecated_protobuf((0, signature2, 4), &status)
.unwrap();
blockstore.set_highest_primary_index_slot(Some(4));

transaction_status_cf
.put_protobuf((signature3, 4), &status)
Expand Down Expand Up @@ -8149,6 +8201,12 @@ pub mod tests {
&AddressSignatureMeta { writeable: false },
)?;
}
let mut w_highest_primary_index_slot = self.highest_primary_index_slot.write().unwrap();
if w_highest_primary_index_slot.is_none()
|| w_highest_primary_index_slot.is_some_and(|highest_slot| highest_slot < slot)
{
*w_highest_primary_index_slot = Some(slot);
}
Ok(())
}
}
Expand Down
20 changes: 18 additions & 2 deletions ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use {
super::*, crate::blockstore_db::ColumnIndexDeprecation, solana_sdk::message::AccountKeys,
std::time::Instant,
super::*,
crate::blockstore_db::ColumnIndexDeprecation,
solana_sdk::message::AccountKeys,
std::{cmp::max, time::Instant},
};

#[derive(Default)]
Expand Down Expand Up @@ -73,6 +75,10 @@ impl Blockstore {
// with Slot::default() for initial compaction filter behavior consistency
let to_slot = to_slot.checked_add(1).unwrap();
self.db.set_oldest_slot(to_slot);

if let Err(err) = self.maybe_cleanup_highest_primary_index_slot(to_slot) {
warn!("Could not clean up TransactionStatusIndex: {err:?}");
}
}

pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
Expand Down Expand Up @@ -364,8 +370,12 @@ impl Blockstore {

let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
let highest_primary_index_slot = self.get_highest_primary_index_slot();
let slot_indexes = |slot: Slot| -> Vec<u64> {
let mut indexes = vec![];
if highest_primary_index_slot.is_none() {
return indexes;
}
if slot <= index0.max_slot && (index0.frozen || slot >= index1.max_slot) {
indexes.push(0);
}
Expand Down Expand Up @@ -431,13 +441,19 @@ impl Blockstore {
}
}
}
let mut update_highest_primary_index_slot = false;
if index0.max_slot >= from_slot && index0.max_slot <= to_slot {
index0.max_slot = from_slot.saturating_sub(1);
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
update_highest_primary_index_slot = true;
}
if index1.max_slot >= from_slot && index1.max_slot <= to_slot {
index1.max_slot = from_slot.saturating_sub(1);
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
update_highest_primary_index_slot = true
}
if update_highest_primary_index_slot {
self.set_highest_primary_index_slot(Some(max(index0.max_slot, index1.max_slot)))
}
Ok(())
}
Expand Down

0 comments on commit d286c00

Please sign in to comment.