Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

regenesis: migrate FuelBlockIdsToHeights #1860

Merged
merged 16 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 151 additions & 83 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use fuel_core_storage::{
Changes,
ConflictPolicy,
Modifiable,
ModifyHeightPolicy,
StorageTransaction,
},
Error as StorageError,
Expand Down Expand Up @@ -299,42 +300,126 @@ impl AtomicView for Database<Relayer> {
}

impl Modifiable for Database<OnChain> {
fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> {
commit_changes_with_height_update(self, changes, |iter| {
iter.iter_all::<FuelBlocks>(Some(IterDirection::Reverse))
.map(|result| result.map(|(height, _)| height))
.try_collect()
})
fn commit_changes(
&mut self,
changes: Changes,
height_policy: ModifyHeightPolicy,
) -> StorageResult<()> {
let new_height = match dbg!(height_policy) {
ModifyHeightPolicy::Ignore => UpdateHeight::Ignore,
ModifyHeightPolicy::Update => {
let mut new_heights: Vec<_> = ChangesIterator::<OnChain>::new(&changes)
.iter_all::<FuelBlocks>(Some(IterDirection::Reverse))
.map(|result| result.map(|(height, _)| height))
.try_collect()?;
new_heights.dedup();

// Changes for each block should be committed separately.
// If we have more than one height, it means we are mixing commits
// for several heights in one batch - return error in this case.
if new_heights.len() > 1 {
return Err(DatabaseError::MultipleHeightsInCommit {
heights: new_heights
.iter()
.map(|h| u32::from(*h).into())
.collect(),
}
.into());
}
new_heights
.iter()
.next()
.copied()
.map(UpdateHeight::SetIncremented)
.unwrap_or(UpdateHeight::NoChange)
}
};
commit_changes_with_height_update(self, changes, new_height)
}
}

impl Modifiable for Database<OffChain> {
fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> {
commit_changes_with_height_update(self, changes, |iter| {
iter.iter_all::<FuelBlockIdsToHeights>(Some(IterDirection::Reverse))
.map(|result| result.map(|(_, height)| height))
.try_collect()
})
fn commit_changes(
&mut self,
changes: Changes,
height_policy: ModifyHeightPolicy,
) -> StorageResult<()> {
let new_height = match dbg!(height_policy) {
ModifyHeightPolicy::Ignore => UpdateHeight::Ignore,
ModifyHeightPolicy::Update => {
let mut new_heights: Vec<_> = ChangesIterator::<OffChain>::new(&changes)
.iter_all::<FuelBlockIdsToHeights>(Some(IterDirection::Reverse))
.map(|result| result.map(|(_, height)| height))
.try_collect()?;
new_heights.dedup();

// Changes for each block should be committed separately.
// If we have more than one height, it means we are mixing commits
// for several heights in one batch - return error in this case.
if new_heights.len() > 1 {
return Err(DatabaseError::MultipleHeightsInCommit {
heights: new_heights
.iter()
.map(|h| u32::from(*h).into())
.collect(),
}
.into());
}
new_heights
.iter()
.next()
.copied()
.map(UpdateHeight::SetIncremented)
.unwrap_or(UpdateHeight::NoChange)
}
};
commit_changes_with_height_update(self, changes, new_height)
}
}

#[cfg(feature = "relayer")]
impl Modifiable for Database<Relayer> {
fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> {
commit_changes_with_height_update(self, changes, |iter| {
iter.iter_all::<fuel_core_relayer::storage::EventsHistory>(Some(
IterDirection::Reverse,
))
.map(|result| result.map(|(height, _)| height))
.try_collect()
})
fn commit_changes(
&mut self,
changes: Changes,
height_policy: ModifyHeightPolicy,
) -> StorageResult<()> {
let new_height = match dbg!(height_policy) {
ModifyHeightPolicy::Ignore => UpdateHeight::Ignore,
ModifyHeightPolicy::Update => {
let mut new_heights: Vec<_> = ChangesIterator::<Relayer>::new(&changes)
.iter_all::<fuel_core_relayer::storage::EventsHistory>(Some(
IterDirection::Reverse,
))
.map(|result| result.map(|(height, _)| height))
.try_collect()?;
new_heights.dedup();

// Changes for each block should be committed separately.
// If we have more than one height, it means we are mixing commits
// for several heights in one batch - return error in this case.
if new_heights.len() > 1 {
return Err(DatabaseError::MultipleHeightsInCommit {
heights: new_heights.iter().map(|h| u64::from(*h)).collect(),
}
.into());
}
new_heights
.iter()
.next()
.copied()
.map(UpdateHeight::SetIncremented)
.unwrap_or(UpdateHeight::NoChange)
}
};
commit_changes_with_height_update(self, changes, new_height)
}
}

#[cfg(not(feature = "relayer"))]
impl Modifiable for Database<Relayer> {
fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> {
commit_changes_with_height_update(self, changes, |_| Ok(vec![]))
commit_changes_with_height_update(self, changes, vec![])
}
}

Expand Down Expand Up @@ -365,77 +450,64 @@ impl DatabaseHeight for DaBlockHeight {
}
}

enum UpdateHeight<Height> {
/// Set new height, that must be one more than the previous one.
SetIncremented(Height),
/// This commit doesn't contain any height.
/// This causes an error if the previous height is set.
NoChange,
/// Do not perform any height update.
Ignore,
}

fn commit_changes_with_height_update<Description>(
database: &mut Database<Description>,
changes: Changes,
heights_lookup: impl Fn(
&ChangesIterator<Description>,
) -> StorageResult<Vec<Description::Height>>,
mut changes: Changes,
update_height: UpdateHeight<Description::Height>,
) -> StorageResult<()>
where
Description: DatabaseDescription,
Description::Height: Debug + PartialOrd + DatabaseHeight,
for<'a> StorageTransaction<&'a &'a mut Database<Description>>:
StorageMutate<MetadataTable<Description>, Error = StorageError>,
{
// Gets the all new heights from the `changes`
let iterator = ChangesIterator::<Description>::new(&changes);
let new_heights = heights_lookup(&iterator)?;

// Changes for each block should be committed separately.
// If we have more than one height, it means we are mixing commits
// for several heights in one batch - return error in this case.
if new_heights.len() > 1 {
return Err(DatabaseError::MultipleHeightsInCommit {
heights: new_heights.iter().map(DatabaseHeight::as_u64).collect(),
}
.into());
}

let new_height = new_heights.into_iter().last();
let prev_height = *database.height.lock();

match (prev_height, new_height) {
(None, None) => {
// We are inside the regenesis process if the old and new heights are not set.
// In this case, we continue to commit until we discover a new height.
// This height will be the start of the database.
let new_height = match update_height {
UpdateHeight::SetIncremented(new_height) => {
// If height is already set, check that the new height is valid
if let Some(prev_height) = prev_height {
// Each new commit should be linked to the previous commit to create a monotonically growing database.
let next_expected_height = prev_height
.advance_height()
.ok_or(DatabaseError::FailedToAdvanceHeight)?;

// TODO: After https://github.com/FuelLabs/fuel-core/issues/451
// we can replace `next_expected_height > new_height` with `next_expected_height != new_height`.
if next_expected_height > new_height {
return Err(DatabaseError::HeightsAreNotLinked {
prev_height: prev_height.as_u64(),
new_height: new_height.as_u64(),
}
.into());
}
}
Some(new_height)
}
(Some(prev_height), Some(new_height)) => {
// Each new commit should be linked to the previous commit to create a monotonically growing database.

let next_expected_height = prev_height
.advance_height()
.ok_or(DatabaseError::FailedToAdvanceHeight)?;

// TODO: After https://github.com/FuelLabs/fuel-core/issues/451
// we can replace `next_expected_height > new_height` with `next_expected_height != new_height`.
if next_expected_height > new_height {
return Err(DatabaseError::HeightsAreNotLinked {
UpdateHeight::NoChange => {
if let Some(prev_height) = prev_height {
return Err(DatabaseError::NewHeightIsNotSet {
prev_height: prev_height.as_u64(),
new_height: new_height.as_u64(),
}
.into());
} else {
None
}
}
(None, Some(_)) => {
// The new height is finally found; starting at this point,
// all next commits should be linked(the height should increase each time by one).
}
(Some(prev_height), None) => {
// In production, we shouldn't have cases where we call `commit_chagnes` with intermediate changes.
// The commit always should contain all data for the corresponding height.
return Err(DatabaseError::NewHeightIsNotSet {
prev_height: prev_height.as_u64(),
}
.into());
}
UpdateHeight::Ignore => None,
};

let updated_changes = if let Some(new_height) = new_height {
// We want to update the metadata table to include a new height.
// For that, we are building a new storage transaction around `changes`.
// Modifying this transaction will include all required updates into the `changes`.
if let Some(new_height) = new_height {
// We want to also update the metadata table to include a new height.
let mut transaction = StorageTransaction::transaction(
&database,
ConflictPolicy::Overwrite,
Expand All @@ -450,20 +522,16 @@ where
height: new_height,
},
)?;

transaction.into_changes()
} else {
changes
changes = transaction.into_changes();
};

let mut guard = database.height.lock();
database
.data
.as_ref()
.commit_changes(new_height, updated_changes)?;
database.data.as_ref().commit_changes(new_height, changes)?;

// Update the block height
*guard = new_height;
if new_height.is_some() {
*guard = new_height;
}

Ok(())
}
Expand Down
16 changes: 8 additions & 8 deletions crates/fuel-core/src/database/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
Default::default(),
);
let prev = transaction.storage_as_mut::<M>().insert(key, value)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(prev)
}

Expand All @@ -87,7 +87,7 @@ where
Default::default(),
);
let prev = transaction.storage_as_mut::<M>().remove(key)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(prev)
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ where
Default::default(),
);
let prev = <_ as StorageWrite<M>>::write(&mut transaction, key, buf)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(prev)
}

Expand All @@ -160,7 +160,7 @@ where
Default::default(),
);
let prev = <_ as StorageWrite<M>>::replace(&mut transaction, key, buf)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(prev)
}

Expand All @@ -171,7 +171,7 @@ where
Default::default(),
);
let prev = <_ as StorageWrite<M>>::take(&mut transaction, key)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(prev)
}
}
Expand All @@ -197,7 +197,7 @@ where
Default::default(),
);
StorageBatchMutate::init_storage(&mut transaction, set)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(())
}

Expand All @@ -213,7 +213,7 @@ where
Default::default(),
);
StorageBatchMutate::insert_batch(&mut transaction, set)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(())
}

Expand All @@ -228,7 +228,7 @@ where
Default::default(),
);
StorageBatchMutate::remove_batch(&mut transaction, set)?;
self.commit_changes(transaction.into_changes())?;
self.commit_changes(transaction.into_changes(), Default::default())?;
Ok(())
}
}
6 changes: 3 additions & 3 deletions crates/fuel-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ mod tests {
.into();
producer
.storage_view_provider
.commit_changes(changes)
.commit_changes(changes, Default::default())
.unwrap();

assert_eq!(skipped_transactions.len(), 1);
Expand Down Expand Up @@ -563,7 +563,7 @@ mod tests {
.into();
producer
.storage_view_provider
.commit_changes(changes)
.commit_changes(changes, Default::default())
.unwrap();

assert_eq!(skipped_transactions.len(), 0);
Expand Down Expand Up @@ -786,7 +786,7 @@ mod tests {
.into();
producer
.storage_view_provider
.commit_changes(changes)
.commit_changes(changes, Default::default())
.unwrap();
let receipts = tx_status[0].result.receipts();

Expand Down
Loading
Loading