Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

regenesis: migrate FuelBlockIdsToHeights #1860

Merged
merged 16 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- [#1856](https://github.com/FuelLabs/fuel-core/pull/1856): Replaced instances of `Union` with `Enum` for GraphQL definitions of `ConsensusParametersVersion` and related types. This is needed because `Union` does not support multiple `Version`s inside discriminants or empty variants.

### Added

- [#1860](https://github.com/FuelLabs/fuel-core/pull/1860): Regenesis now preserves `FuelBlockIdsToHeights` off-chain table.

### Changed

- [#1832](https://github.com/FuelLabs/fuel-core/pull/1832): Snapshot generation can be cancelled. Progress is also reported.
Expand Down
7 changes: 7 additions & 0 deletions crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ impl CombinedDatabase {
Ok(())
}

/// Set the genesis flag that controls some consistency checks.
pub fn set_genesis_active(&self, active: bool) {
self.on_chain.set_genesis_active(active);
self.off_chain.set_genesis_active(active);
self.relayer.set_genesis_active(active);
}

pub fn on_chain(&self) -> &Database<OnChain> {
&self.on_chain
}
Expand Down
126 changes: 71 additions & 55 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ pub struct Database<Description = OnChain>
where
Description: DatabaseDescription,
{
/// Cached value from Metadata table, used to speed up lookups.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See 7b1141a#r1504911777 for the explanation behind this comment

height: SharedMutex<Option<Description::Height>>,
/// If set, some consistency checks are not performed.
genesis_active: SharedMutex<bool>,
data: DataSource<Description>,
}

Expand Down Expand Up @@ -128,15 +131,15 @@ where
Self: StorageInspect<MetadataTable<Description>, Error = StorageError>,
{
pub fn new(data_source: DataSource<Description>) -> Self {
let mut database = Self {
let database = Self {
height: SharedMutex::new(None),
genesis_active: SharedMutex::new(false),
data: data_source,
};
let height = database
.latest_height()
.expect("Failed to get latest height during creation of the database");

database.height = SharedMutex::new(height);
*database.height.lock() = height;

database
}
Expand All @@ -148,6 +151,11 @@ where

Ok(Database::new(Arc::new(db)))
}

/// Set the genesis flag that controls some consistency checks.
pub fn set_genesis_active(&self, active: bool) {
*self.genesis_active.lock() = active;
}
}

impl<Description> Database<Description>
Expand All @@ -157,8 +165,9 @@ where
pub fn in_memory() -> Self {
let data = Arc::<MemoryStore<Description>>::new(MemoryStore::default());
Self {
height: SharedMutex::new(None),
data,
height: SharedMutex::new(None),
genesis_active: SharedMutex::new(false),
}
}

Expand All @@ -167,8 +176,9 @@ where
let data =
Arc::<RocksDb<Description>>::new(RocksDb::default_open_temp(None).unwrap());
Self {
height: SharedMutex::new(None),
data,
height: SharedMutex::new(None),
genesis_active: SharedMutex::new(false),
}
}
}
Expand Down Expand Up @@ -378,58 +388,64 @@ where
for<'a> StorageTransaction<&'a &'a mut Database<Description>>:
StorageMutate<MetadataTable<Description>, Error = StorageError>,
{
// Gets the all new heights from the `changes`
let iterator = ChangesIterator::<Description>::new(&changes);
let new_heights = heights_lookup(&iterator)?;

// Changes for each block should be committed separately.
// If we have more than one height, it means we are mixing commits
// for several heights in one batch - return error in this case.
if new_heights.len() > 1 {
return Err(DatabaseError::MultipleHeightsInCommit {
heights: new_heights.iter().map(DatabaseHeight::as_u64).collect(),
}
.into());
}

let new_height = new_heights.into_iter().last();
let prev_height = *database.height.lock();

match (prev_height, new_height) {
(None, None) => {
// We are inside the regenesis process if the old and new heights are not set.
// In this case, we continue to commit until we discover a new height.
// This height will be the start of the database.
// DB consistency checks are only enforced when genesis flag is not set.
let new_height = if *database.genesis_active.lock() {
None
} else {
// Gets the all new heights from the `changes`
let iterator = ChangesIterator::<Description>::new(&changes);
let new_heights = heights_lookup(&iterator)?;

// Changes for each block should be committed separately.
// If we have more than one height, it means we are mixing commits
// for several heights in one batch - return error in this case.
if new_heights.len() > 1 {
return Err(DatabaseError::MultipleHeightsInCommit {
heights: new_heights.iter().map(DatabaseHeight::as_u64).collect(),
}
.into());
}
(Some(prev_height), Some(new_height)) => {
// Each new commit should be linked to the previous commit to create a monotonically growing database.

let next_expected_height = prev_height
.advance_height()
.ok_or(DatabaseError::FailedToAdvanceHeight)?;
let new_height = new_heights.into_iter().last();
let prev_height = *database.height.lock();

// TODO: After https://github.com/FuelLabs/fuel-core/issues/451
// we can replace `next_expected_height > new_height` with `next_expected_height != new_height`.
if next_expected_height > new_height {
return Err(DatabaseError::HeightsAreNotLinked {
match (prev_height, new_height) {
(None, None) => {
// We are inside the regenesis process if the old and new heights are not set.
// In this case, we continue to commit until we discover a new height.
// This height will be the start of the database.
}
(Some(prev_height), Some(new_height)) => {
// Each new commit should be linked to the previous commit to create a monotonically growing database.

let next_expected_height = prev_height
.advance_height()
.ok_or(DatabaseError::FailedToAdvanceHeight)?;

// TODO: After https://github.com/FuelLabs/fuel-core/issues/451
// we can replace `next_expected_height > new_height` with `next_expected_height != new_height`.
if next_expected_height > new_height {
return Err(DatabaseError::HeightsAreNotLinked {
prev_height: prev_height.as_u64(),
new_height: new_height.as_u64(),
}
.into());
}
}
(None, Some(_)) => {
// The new height is finally found; starting at this point,
// all next commits should be linked(the height should increase each time by one).
}
(Some(prev_height), None) => {
// In production, we shouldn't have cases where we call `commit_chagnes` with intermediate changes.
// The commit always should contain all data for the corresponding height.
return Err(DatabaseError::NewHeightIsNotSet {
prev_height: prev_height.as_u64(),
new_height: new_height.as_u64(),
}
.into());
}
}
(None, Some(_)) => {
// The new height is finally found; starting at this point,
// all next commits should be linked(the height should increase each time by one).
}
(Some(prev_height), None) => {
// In production, we shouldn't have cases where we call `commit_chagnes` with intermediate changes.
// The commit always should contain all data for the corresponding height.
return Err(DatabaseError::NewHeightIsNotSet {
prev_height: prev_height.as_u64(),
}
.into());
}
};
new_height
};

let updated_changes = if let Some(new_height) = new_height {
Expand All @@ -456,14 +472,14 @@ where
changes
};

// Atomically commit the changes to the database, and to the mutex-protected field.
let mut guard = database.height.lock();
database
.data
.as_ref()
.commit_changes(new_height, updated_changes)?;
database.data.as_ref().commit_changes(updated_changes)?;

// Update the block height
*guard = new_height;
if let Some(new_height) = new_height {
*guard = Some(new_height);
}

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions crates/fuel-core/src/service/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub async fn execute_genesis_block(
let genesis_block = create_genesis_block(config);
tracing::info!("Genesis block created: {:?}", genesis_block.header());

db.set_genesis_active(true);

SnapshotImporter::import(
db.clone(),
genesis_block.clone(),
Expand All @@ -80,6 +82,8 @@ pub async fn execute_genesis_block(
)
.await?;

db.set_genesis_active(false);

let genesis_progress_on_chain: Vec<String> = db
.on_chain()
.iter_all::<GenesisMetadata<OnChain>>(None)
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/genesis/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ where
TableEntry<T>: serde::Serialize,
StateConfigBuilder: AddTable<T>,
DbDesc: DatabaseDescription<Column = T::Column>,
DbDesc::Height: Send,
DbDesc::Height: Send + Sync,
{
let mut writer = self.create_writer()?;
let group_size = self.group_size;
Expand Down
3 changes: 3 additions & 0 deletions crates/fuel-core/src/service/genesis/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
fuel_core_graphql_api::storage::messages::SpentMessages,
graphql_api::storage::{
blocks::FuelBlockIdsToHeights,
coins::OwnedCoins,
contracts::ContractsInfo,
messages::OwnedMessageIds,
Expand Down Expand Up @@ -126,6 +127,8 @@ impl SnapshotImporter {
self.spawn_worker_off_chain::<OldFuelBlocks, OldFuelBlocks>()?;
self.spawn_worker_off_chain::<OldFuelBlockConsensus, OldFuelBlockConsensus>()?;
self.spawn_worker_off_chain::<OldTransactions, OldTransactions>()?;
self.spawn_worker_off_chain::<FuelBlocks, FuelBlockIdsToHeights>()?;
self.spawn_worker_off_chain::<OldFuelBlocks, FuelBlockIdsToHeights>()?;

self.task_manager.wait().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,7 @@ mod tests {
}

impl TransactableStorage<BlockHeight> for BrokenTransactions {
fn commit_changes(
&self,
_: Option<BlockHeight>,
_: Changes,
) -> StorageResult<()> {
fn commit_changes(&self, _: Changes) -> StorageResult<()> {
Err(anyhow::anyhow!("I refuse to work!").into())
}
}
Expand Down
36 changes: 36 additions & 0 deletions crates/fuel-core/src/service/genesis/importer/off_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,39 @@ impl ImportTable for Handler<SpentMessages, SpentMessages> {
Ok(())
}
}

impl ImportTable for Handler<FuelBlockIdsToHeights, FuelBlocks> {
type TableInSnapshot = FuelBlocks;
type TableBeingWritten = FuelBlockIdsToHeights;
type DbDesc = OffChain;

fn process(
&mut self,
group: Vec<TableEntry<Self::TableInSnapshot>>,
tx: &mut StorageTransaction<&mut Database<Self::DbDesc>>,
) -> anyhow::Result<()> {
for entry in group {
tx.storage_as_mut::<FuelBlockIdsToHeights>()
.insert(&entry.value.id(), &entry.key)?;
}
Ok(())
}
}

impl ImportTable for Handler<FuelBlockIdsToHeights, OldFuelBlocks> {
type TableInSnapshot = OldFuelBlocks;
type TableBeingWritten = FuelBlockIdsToHeights;
type DbDesc = OffChain;

fn process(
&mut self,
group: Vec<TableEntry<Self::TableInSnapshot>>,
tx: &mut StorageTransaction<&mut Database<Self::DbDesc>>,
) -> anyhow::Result<()> {
for entry in group {
tx.storage_as_mut::<FuelBlockIdsToHeights>()
.insert(&entry.value.id(), &entry.key)?;
}
Ok(())
}
}
8 changes: 2 additions & 6 deletions crates/fuel-core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ where

pub trait TransactableStorage<Height>: IterableStore + Debug + Send + Sync {
/// Commits the changes into the storage.
fn commit_changes(
&self,
height: Option<Height>,
changes: Changes,
) -> StorageResult<()>;
fn commit_changes(&self, changes: Changes) -> StorageResult<()>;
Dentosal marked this conversation as resolved.
Show resolved Hide resolved
}

// It is used only to allow conversion of the `StorageTransaction` into the `DataSource`.
Expand All @@ -47,7 +43,7 @@ impl<Height, S> TransactableStorage<Height>
where
S: IterableStore + Debug + Send + Sync,
{
fn commit_changes(&self, _: Option<Height>, _: Changes) -> StorageResult<()> {
fn commit_changes(&self, _: Changes) -> StorageResult<()> {
unimplemented!()
}
}
Expand Down
10 changes: 3 additions & 7 deletions crates/fuel-core/src/state/in_memory/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ impl<Description> TransactableStorage<Description::Height> for MemoryStore<Descr
where
Description: DatabaseDescription,
{
fn commit_changes(
&self,
_: Option<Description::Height>,
changes: Changes,
) -> StorageResult<()> {
fn commit_changes(&self, changes: Changes) -> StorageResult<()> {
for (column, btree) in changes.into_iter() {
let mut lock = self.inner[column as usize]
.lock()
Expand Down Expand Up @@ -162,15 +158,15 @@ mod tests {
let mut transaction = self.read_transaction();
let len = transaction.write(key, column, buf)?;
let changes = transaction.into_changes();
self.commit_changes(Default::default(), changes)?;
self.commit_changes(changes)?;
Ok(len)
}

fn delete(&mut self, key: &[u8], column: Self::Column) -> StorageResult<()> {
let mut transaction = self.read_transaction();
transaction.delete(key, column)?;
let changes = transaction.into_changes();
self.commit_changes(Default::default(), changes)?;
self.commit_changes(changes)?;
Ok(())
}
}
Expand Down
Loading
Loading