Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add command for pruning states #4835

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN};
use store::{
chunked_vector::{chunk_key, Field},
get_key_for_col,
Expand Down Expand Up @@ -3306,6 +3306,77 @@ fn check_blob_existence(
}
}

#[tokio::test]
async fn prune_historic_states() {
    let blocks_to_produce = E::slots_per_epoch() * 5;
    let db_path = tempdir().unwrap();
    let store = get_store(&db_path);
    let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);

    // Grab the genesis state up front: pruning needs it so it can be re-stored.
    let genesis_state_root = harness.chain.genesis_state_root;
    let genesis_state = harness
        .chain
        .get_state(&genesis_state_root, None)
        .unwrap()
        .unwrap();

    harness
        .extend_chain(
            blocks_to_produce as usize,
            BlockStrategy::OnCanonicalHead,
            AttestationStrategy::AllValidators,
        )
        .await;

    // Sanity check: historic states for the first epoch exist before pruning.
    let pre_prune_roots = harness
        .chain
        .forwards_iter_state_roots(Slot::new(0))
        .unwrap()
        .take(E::slots_per_epoch() as usize)
        .map(Result::unwrap);
    for (state_root, slot) in pre_prune_roots {
        assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some());
    }

    store
        .prune_historic_states(genesis_state_root, &genesis_state)
        .unwrap();

    // The anchor should now record that no historic states are retained.
    let anchor_info = store.get_anchor_info().unwrap();
    assert_eq!(anchor_info.state_lower_limit, 0);
    assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);

    // Every historic state from slot 1 onwards should have been deleted.
    let post_prune_roots = harness
        .chain
        .forwards_iter_state_roots(Slot::new(1))
        .unwrap()
        .take(E::slots_per_epoch() as usize)
        .map(Result::unwrap);
    for (state_root, slot) in post_prune_roots {
        assert!(store.get_state(&state_root, Some(slot)).unwrap().is_none());
    }

    // The genesis state must survive pruning.
    let genesis_state_root = harness.chain.genesis_state_root;
    assert!(store
        .get_state(&genesis_state_root, Some(Slot::new(0)))
        .unwrap()
        .is_some());

    // The chain should keep operating after pruning: run for another two epochs.
    let additional_blocks_produced = 2 * E::slots_per_epoch();
    harness
        .extend_slots(additional_blocks_produced as usize)
        .await;

    check_finalization(&harness, blocks_to_produce + additional_blocks_produced);
    check_split_slot(&harness, store);
}

/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).
Expand Down
88 changes: 88 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2226,6 +2226,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}

/// This function fills in missing block roots between last restore point slot and split
/// slot, if any.
pub fn heal_freezer_block_roots(&self) -> Result<(), Error> {
let split = self.get_split_info();
let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
Expand Down Expand Up @@ -2254,6 +2256,92 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

Ok(())
}

/// Delete *all* states from the freezer database and update the anchor accordingly.
///
/// WARNING: this method deletes the genesis state and replaces it with the provided
/// `genesis_state`. This is to support its use in schema migrations where the storage scheme of
/// the genesis state may be modified. It is the responsibility of the caller to ensure that the
/// genesis state is correct, else a corrupt database will be created.
pub fn prune_historic_states(
    &self,
    genesis_state_root: Hash256,
    genesis_state: &BeaconState<E>,
) -> Result<(), Error> {
    // Make sure there are no missing block roots before pruning.
    self.heal_freezer_block_roots()?;

    // Update the anchor to use the dummy state upper limit and disable historic state storage.
    let old_anchor = self.get_anchor_info();
    let new_anchor = if let Some(old_anchor) = old_anchor.clone() {
        AnchorInfo {
            state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
            // `old_anchor` here is already an owned clone; a second `.clone()` would be
            // redundant (clippy: redundant_clone).
            ..old_anchor
        }
    } else {
        AnchorInfo {
            anchor_slot: Slot::new(0),
            oldest_block_slot: Slot::new(0),
            oldest_block_parent: Hash256::zero(),
            state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
            state_lower_limit: Slot::new(0),
        }
    };

    // Commit the anchor change immediately: if the cold database ops fail they can always be
    // retried, and we can't do them atomically with this change anyway.
    self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?;

    // Stage freezer data for deletion. Do not bother loading and deserializing values as this
    // wastes time and is less schema-agnostic. My hope is that this method will be useful for
    // migrating to the tree-states schema (delete everything in the freezer then start afresh).
    let mut cold_ops = vec![];

    // All freezer columns that hold (or index) historic state data.
    let columns = [
        DBColumn::BeaconState,
        DBColumn::BeaconStateSummary,
        DBColumn::BeaconRestorePoint,
        DBColumn::BeaconStateRoots,
        DBColumn::BeaconHistoricalRoots,
        DBColumn::BeaconRandaoMixes,
        DBColumn::BeaconHistoricalSummaries,
    ];

    for column in columns {
        for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
            let key = res?;
            cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
                column.as_str(),
                &key,
            )));
        }
    }

    // XXX: We need to commit the mass deletion here *before* re-storing the genesis state, as
    // the current schema performs reads as part of `store_cold_state`. This can be deleted
    // once the target schema is tree-states. If the process is killed before the genesis state
    // is written this can be fixed by re-running.
    info!(
        self.log,
        "Deleting historic states";
        "num_kv" => cold_ops.len(),
    );
    self.cold_db.do_atomically(std::mem::take(&mut cold_ops))?;

    // If we just deleted the genesis state, re-store it using the *current* schema, which
    // may be different from the schema of the genesis state we just deleted.
    if self.get_split_slot() > 0 {
        info!(
            self.log,
            "Re-storing genesis state";
            "state_root" => ?genesis_state_root,
        );
        self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?;
        self.cold_db.do_atomically(cold_ops)?;
    }

    Ok(())
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down
18 changes: 9 additions & 9 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
for (start_key, end_key) in [
endpoints(DBColumn::BeaconStateTemporary),
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateSummary),
] {
self.db.compact(&start_key, &end_key);
}
Expand Down Expand Up @@ -225,23 +226,22 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
}

/// Iterate through all keys and values in a particular column.
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()]));

let iter = self.db.keys_iter(self.read_options());
iter.seek(&start_key);

Box::new(
iter.take_while(move |key| key.matches_column(column))
.map(move |bytes_key| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok(key)
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
K::from_bytes(key)
}),
)
}
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use strum::{EnumString, IntoStaticStr};
pub use types::*;

pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;

pub type RawEntryIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a>;
pub type RawKeyIter<'a> = Box<dyn Iterator<Item = Result<Vec<u8>, Error>> + 'a>;
Expand Down Expand Up @@ -88,6 +88,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
self.iter_column_from(column, &vec![0; column.key_size()])
}

/// Iterate through all keys and values in a column from a given starting point.
fn iter_column_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnIter<K>;

fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter {
Expand All @@ -99,7 +100,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
}

/// Iterate through all keys in a particular column.
fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
}

pub trait Key: Sized + 'static {
Expand Down Expand Up @@ -274,7 +275,7 @@ impl DBColumn {
/// This function returns the number of bytes used by keys in a given column.
pub fn key_size(self) -> usize {
match self {
Self::OverflowLRUCache => 40,
Self::OverflowLRUCache => 33, // See `OverflowKey` encode impl.
Self::BeaconMeta
| Self::BeaconBlock
| Self::BeaconState
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
}))
}

fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
    // Keys-only iteration: delegate to the key-value iterator and drop each value.
    let keys = self
        .iter_column(column)
        .map(|result| result.map(|(key, _value)| key));
    Box::new(keys)
}

Expand Down
35 changes: 35 additions & 0 deletions book/src/database-migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,38 @@ lighthouse db version --network mainnet
```

[run-correctly]: #how-to-run-lighthouse-db-correctly

## How to prune historic states

Pruning historic states helps in managing the disk space used by the Lighthouse beacon node by removing old beacon
states from the freezer database. This can be especially useful when the database has accumulated a significant amount
of historic data. This command is intended for nodes synced before 4.4.1, as newly synced nodes no longer store
historic states by default.

Here are the steps to prune historic states:

1. Before running the prune command, make sure that the Lighthouse beacon node is not running. If you are using systemd, you might stop the Lighthouse beacon node with a command like:

```bash
sudo systemctl stop lighthousebeacon
```

2. Use the `prune-states` command to prune the historic states. You can do a test run without the `--confirm` flag to check that the database can be pruned:

```bash
sudo -u "$LH_USER" lighthouse db prune-states --datadir "$LH_DATADIR" --network "$NET"
```

3. If you are ready to prune the states irreversibly, add the `--confirm` flag to commit the changes:

```bash
sudo -u "$LH_USER" lighthouse db prune-states --confirm --datadir "$LH_DATADIR" --network "$NET"
```

The `--confirm` flag ensures that you are aware that the action is irreversible, and historic states will be permanently removed.

4. After successfully pruning the historic states, you can restart the Lighthouse beacon node:

```bash
sudo systemctl start lighthousebeacon
```
14 changes: 6 additions & 8 deletions consensus/fork_choice/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ impl ForkChoiceTest {
T: Fn(&BeaconForkChoiceStore<E, MemoryStore<E>, MemoryStore<E>>) -> U,
{
func(
&self
.harness
self.harness
.chain
.canonical_head
.fork_choice_read_lock()
Expand Down Expand Up @@ -386,8 +385,7 @@ impl ForkChoiceTest {
&self.harness.chain.spec,
self.harness.logger(),
)
.err()
.expect("on_block did not return an error");
.expect_err("on_block did not return an error");
comparison_func(err);
self
}
Expand Down Expand Up @@ -841,7 +839,7 @@ async fn valid_attestation() {
.apply_attestation_to_chain(
MutationDelay::NoDelay,
|_, _| {},
|result| assert_eq!(result.unwrap(), ()),
|result| assert!(result.is_ok()),
)
.await;
}
Expand Down Expand Up @@ -1074,7 +1072,7 @@ async fn invalid_attestation_delayed_slot() {
.apply_attestation_to_chain(
MutationDelay::NoDelay,
|_, _| {},
|result| assert_eq!(result.unwrap(), ()),
|result| assert!(result.is_ok()),
)
.await
.inspect_queued_attestations(|queue| assert_eq!(queue.len(), 1))
Expand Down Expand Up @@ -1183,7 +1181,7 @@ async fn weak_subjectivity_check_fails_early_epoch() {

let mut checkpoint = setup_harness.harness.finalized_checkpoint();

checkpoint.epoch = checkpoint.epoch - 1;
checkpoint.epoch -= 1;

let chain_config = ChainConfig {
weak_subjectivity_checkpoint: Some(checkpoint),
Expand All @@ -1210,7 +1208,7 @@ async fn weak_subjectivity_check_fails_late_epoch() {

let mut checkpoint = setup_harness.harness.finalized_checkpoint();

checkpoint.epoch = checkpoint.epoch + 1;
checkpoint.epoch += 1;

let chain_config = ChainConfig {
weak_subjectivity_checkpoint: Some(checkpoint),
Expand Down
Loading