diff --git a/CHANGELOG.md b/CHANGELOG.md index 9078cf492a7..43b154421a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [#1856](https://github.com/FuelLabs/fuel-core/pull/1856): Replaced instances of `Union` with `Enum` for GraphQL definitions of `ConsensusParametersVersion` and related types. This is needed because `Union` does not support multiple `Version`s inside discriminants or empty variants. +### Added + +- [#1860](https://github.com/FuelLabs/fuel-core/pull/1860): Regenesis now preserves `FuelBlockIdsToHeights` off-chain table. + ### Changed - [#1832](https://github.com/FuelLabs/fuel-core/pull/1832): Snapshot generation can be cancelled. Progress is also reported. diff --git a/benches/benches/block_target_gas.rs b/benches/benches/block_target_gas.rs index aac9ef2a128..341848f4c01 100644 --- a/benches/benches/block_target_gas.rs +++ b/benches/benches/block_target_gas.rs @@ -20,6 +20,7 @@ use fuel_core::{ database::{ balances::BalancesInitializer, state::StateInitializer, + Database, }, service::{ config::Trigger, diff --git a/benches/benches/vm.rs b/benches/benches/vm.rs index 16e7cf2461e..799d222a0eb 100644 --- a/benches/benches/vm.rs +++ b/benches/benches/vm.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use crate::vm_initialization::vm_initialization; use contract::*; +use fuel_core::database::GenesisDatabase; use fuel_core_benches::*; use fuel_core_storage::transactional::IntoTransaction; use fuel_core_types::fuel_asm::Instruction; @@ -47,7 +48,7 @@ where let relayer_database_tx = block_database_tx.into_transaction(); let thread_database_tx = relayer_database_tx.into_transaction(); let tx_database_tx = thread_database_tx.into_transaction(); - let database = Database::new(Arc::new(tx_database_tx)); + let database = GenesisDatabase::new(Arc::new(tx_database_tx)); *vm.as_mut().database_mut() = database.into_transaction(); let mut total = core::time::Duration::ZERO; diff --git a/benches/benches/vm_set/blockchain.rs b/benches/benches/vm_set/blockchain.rs index ada8d1116f2..59581834499 100644 --- a/benches/benches/vm_set/blockchain.rs +++ b/benches/benches/vm_set/blockchain.rs @@ -16,6 +16,7 @@ use fuel_core::{ balances::BalancesInitializer, database_description::on_chain::OnChain, state::StateInitializer, + GenesisDatabase, }, service::Config, state::rocks_db::{ @@ -63,7 +64,7 @@ use rand::{ }; pub struct BenchDb { - db: Database, + db: GenesisDatabase, /// Used for RAII cleanup. Contents of this directory are deleted on drop. _tmp_dir: ShallowTempDir, } @@ -79,7 +80,7 @@ impl BenchDb { let state_size = crate::utils::get_state_size(); - let mut database = Database::new(db); + let mut database = GenesisDatabase::new(db); database.init_contract_state( contract_id, (0..state_size).map(|_| { @@ -125,7 +126,7 @@ impl BenchDb { } /// Creates a `VmDatabase` instance. - fn to_vm_database(&self) -> VmStorage> { + fn to_vm_database(&self) -> VmStorage> { let consensus = ConsensusHeader { prev_root: Default::default(), height: 1.into(), diff --git a/benches/src/lib.rs b/benches/src/lib.rs index 2807ddf0fc5..af672011f08 100644 --- a/benches/src/lib.rs +++ b/benches/src/lib.rs @@ -1,7 +1,6 @@ pub mod default_gas_costs; pub mod import; -pub use fuel_core::database::Database; pub use fuel_core_storage::vm_storage::VmStorage; use fuel_core_types::{ fuel_asm::{ @@ -31,13 +30,14 @@ use fuel_core_types::{ }, }; +use fuel_core::database::GenesisDatabase; use fuel_core_storage::transactional::StorageTransaction; pub use rand::Rng; use std::iter; const LARGE_GAS_LIMIT: u64 = u64::MAX - 1001; -fn new_db() -> VmStorage> { +fn new_db() -> VmStorage> { // when rocksdb is enabled, this creates a new db instance with a temporary path VmStorage::default() } @@ -90,7 +90,7 @@ pub struct VmBench { pub inputs: Vec, pub outputs: Vec, pub witnesses: Vec, - pub db: Option>>, + pub db: Option>>, pub instruction: Instruction, pub prepare_call: Option, pub dummy_contract: Option, @@ -101,7 +101,7 @@ pub struct VmBench { #[derive(Debug, Clone)] pub struct VmBenchPrepared { - pub vm: Interpreter>, Script>, + pub vm: Interpreter>, Script>, pub instruction: Instruction, pub diff: diff::Diff, } @@ -149,7 +149,7 @@ impl VmBench { pub fn contract_using_db( rng: &mut R, - mut db: VmStorage>, + mut db: VmStorage>, instruction: Instruction, ) -> anyhow::Result where @@ -208,7 +208,7 @@ impl VmBench { .with_prepare_call(prepare_call)) } - pub fn with_db(mut self, db: VmStorage>) -> Self { + pub fn with_db(mut self, db: VmStorage>) -> Self { self.db.replace(db); self } diff --git a/crates/fuel-core/src/combined_database.rs b/crates/fuel-core/src/combined_database.rs index 9679045f0da..6023c580a47 100644 --- a/crates/fuel-core/src/combined_database.rs +++ b/crates/fuel-core/src/combined_database.rs @@ -6,6 +6,7 @@ use crate::{ relayer::Relayer, }, Database, + GenesisDatabase, Result as DatabaseResult, }, service::DbType, @@ -178,4 +179,36 @@ impl CombinedDatabase { Ok(state_config) } + + /// Converts the combined database into a genesis combined database. + pub fn into_genesis(self) -> CombinedGenesisDatabase { + CombinedGenesisDatabase { + on_chain: self.on_chain.into_genesis(), + off_chain: self.off_chain.into_genesis(), + relayer: self.relayer.into_genesis(), + } + } +} + +/// A genesis database that combines the on-chain, off-chain and relayer +/// genesis databases into one entity. +#[derive(Default, Clone)] +pub struct CombinedGenesisDatabase { + on_chain: GenesisDatabase, + off_chain: GenesisDatabase, + relayer: GenesisDatabase, +} + +impl CombinedGenesisDatabase { + pub fn on_chain(&self) -> &GenesisDatabase { + &self.on_chain + } + + pub fn off_chain(&self) -> &GenesisDatabase { + &self.off_chain + } + + pub fn relayer(&self) -> &GenesisDatabase { + &self.relayer + } } diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 707fd00222b..de14aac4f2e 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -85,13 +85,38 @@ pub mod state; pub mod storage; pub mod transactions; -#[derive(Clone, Debug)] -pub struct Database +#[derive(Default, Debug, Copy, Clone)] +pub struct GenesisStage; + +#[derive(Debug, Clone)] +pub struct RegularStage where Description: DatabaseDescription, { + /// Cached value from Metadata table, used to speed up lookups. height: SharedMutex>, +} + +impl Default for RegularStage +where + Description: DatabaseDescription, +{ + fn default() -> Self { + Self { + height: SharedMutex::new(None), + } + } +} + +pub type GenesisDatabase = Database; + +#[derive(Clone, Debug)] +pub struct Database> +where + Description: DatabaseDescription, +{ data: DataSource, + stage: Stage, } impl Database { @@ -122,21 +147,36 @@ where } } +impl GenesisDatabase +where + Description: DatabaseDescription, +{ + pub fn new(data_source: DataSource) -> Self { + Self { + stage: GenesisStage, + data: data_source, + } + } +} + impl Database where Description: DatabaseDescription, - Self: StorageInspect, Error = StorageError>, + Database: + StorageInspect, Error = StorageError>, { pub fn new(data_source: DataSource) -> Self { let mut database = Self { - height: SharedMutex::new(None), + stage: RegularStage { + height: SharedMutex::new(None), + }, data: data_source, }; let height = database .latest_height() .expect("Failed to get latest height during creation of the database"); - database.height = SharedMutex::new(height); + database.stage.height = SharedMutex::new(height); database } @@ -146,19 +186,31 @@ where use anyhow::Context; let db = RocksDb::::default_open(path, capacity.into()).map_err(Into::::into).with_context(|| format!("Failed to open rocksdb, you may need to wipe a pre-existing incompatible db e.g. `rm -rf {path:?}`"))?; - Ok(Database::new(Arc::new(db))) + Ok(Self::new(Arc::new(db))) + } + + /// Converts to an unchecked database. + /// Panics if the height is already set. + pub fn into_genesis(self) -> GenesisDatabase { + assert!( + !self.stage.height.lock().is_some(), + "Height is already set for `{}`", + Description::name() + ); + GenesisDatabase::new(self.data) } } -impl Database +impl Database where Description: DatabaseDescription, + Stage: Default, { pub fn in_memory() -> Self { let data = Arc::>::new(MemoryStore::default()); Self { - height: SharedMutex::new(None), data, + stage: Stage::default(), } } @@ -167,13 +219,13 @@ where let data = Arc::>::new(RocksDb::default_open_temp(None).unwrap()); Self { - height: SharedMutex::new(None), data, + stage: Stage::default(), } } } -impl KeyValueInspect for Database +impl KeyValueInspect for Database where Description: DatabaseDescription, { @@ -205,7 +257,7 @@ where } } -impl IterableStore for Database +impl IterableStore for Database where Description: DatabaseDescription, { @@ -225,9 +277,10 @@ where /// Construct an ephemeral database /// uses rocksdb when rocksdb features are enabled /// uses in-memory when rocksdb features are disabled -impl Default for Database +impl Default for Database where Description: DatabaseDescription, + Stage: Default, { fn default() -> Self { #[cfg(not(feature = "rocksdb"))] @@ -247,7 +300,7 @@ impl AtomicView for Database { type Height = BlockHeight; fn latest_height(&self) -> Option { - *self.height.lock() + *self.stage.height.lock() } fn view_at(&self, _: &BlockHeight) -> StorageResult { @@ -267,7 +320,7 @@ impl AtomicView for Database { type Height = BlockHeight; fn latest_height(&self) -> Option { - *self.height.lock() + *self.stage.height.lock() } fn view_at(&self, _: &BlockHeight) -> StorageResult { @@ -286,7 +339,7 @@ impl AtomicView for Database { type Height = DaBlockHeight; fn latest_height(&self) -> Option { - *self.height.lock() + *self.stage.height.lock() } fn view_at(&self, _: &Self::Height) -> StorageResult { @@ -338,6 +391,24 @@ impl Modifiable for Database { } } +impl Modifiable for GenesisDatabase { + fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { + self.data.as_ref().commit_changes(None, changes) + } +} + +impl Modifiable for GenesisDatabase { + fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { + self.data.as_ref().commit_changes(None, changes) + } +} + +impl Modifiable for GenesisDatabase { + fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { + self.data.as_ref().commit_changes(None, changes) + } +} + trait DatabaseHeight: Sized { fn as_u64(&self) -> u64; @@ -393,7 +464,7 @@ where } let new_height = new_heights.into_iter().last(); - let prev_height = *database.height.lock(); + let prev_height = *database.stage.height.lock(); match (prev_height, new_height) { (None, None) => { @@ -456,14 +527,14 @@ where changes }; - let mut guard = database.height.lock(); - database - .data - .as_ref() - .commit_changes(new_height, updated_changes)?; + // Atomically commit the changes to the database, and to the mutex-protected field. + let mut guard = database.stage.height.lock(); + database.data.commit_changes(new_height, updated_changes)?; // Update the block height - *guard = new_height; + if let Some(new_height) = new_height { + *guard = Some(new_height); + } Ok(()) } diff --git a/crates/fuel-core/src/database/genesis_progress.rs b/crates/fuel-core/src/database/genesis_progress.rs index 81823d571ba..9e847f38dba 100644 --- a/crates/fuel-core/src/database/genesis_progress.rs +++ b/crates/fuel-core/src/database/genesis_progress.rs @@ -6,7 +6,7 @@ use super::{ on_chain::OnChain, DatabaseDescription, }, - Database, + GenesisDatabase, }; use fuel_core_chain_config::GenesisCommitment; use fuel_core_executor::refs::ContractRef; @@ -102,7 +102,7 @@ where } } -impl Database { +impl GenesisDatabase { pub fn genesis_coins_root(&self) -> Result { let coins = self.iter_all::(None); diff --git a/crates/fuel-core/src/database/metadata.rs b/crates/fuel-core/src/database/metadata.rs index d9f8672b073..83f78d0992f 100644 --- a/crates/fuel-core/src/database/metadata.rs +++ b/crates/fuel-core/src/database/metadata.rs @@ -42,7 +42,7 @@ where } } -impl Database +impl Database where Description: DatabaseDescription, Self: StorageInspect, Error = StorageError>, diff --git a/crates/fuel-core/src/database/storage.rs b/crates/fuel-core/src/database/storage.rs index acabb92b5fc..cf6b55823e9 100644 --- a/crates/fuel-core/src/database/storage.rs +++ b/crates/fuel-core/src/database/storage.rs @@ -30,7 +30,7 @@ use fuel_core_storage::{ StorageWrite, }; -impl StorageInspect for Database +impl StorageInspect for Database where Description: DatabaseDescription, M: Mappable, @@ -57,7 +57,7 @@ where } #[cfg(feature = "test-helpers")] -impl StorageMutate for Database +impl StorageMutate for Database where Description: DatabaseDescription, M: Mappable, @@ -92,7 +92,7 @@ where } } -impl StorageSize for Database +impl StorageSize for Database where Description: DatabaseDescription, M: Mappable, @@ -103,7 +103,8 @@ where } } -impl MerkleRootStorage for Database +impl MerkleRootStorage + for Database where Description: DatabaseDescription, M: Mappable, @@ -114,7 +115,7 @@ where } } -impl StorageRead for Database +impl StorageRead for Database where Description: DatabaseDescription, M: Mappable, @@ -130,7 +131,7 @@ where } #[cfg(feature = "test-helpers")] -impl StorageWrite for Database +impl StorageWrite for Database where Description: DatabaseDescription, M: Mappable, @@ -177,7 +178,7 @@ where } #[cfg(feature = "test-helpers")] -impl StorageBatchMutate for Database +impl StorageBatchMutate for Database where Description: DatabaseDescription, M: Mappable, diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs index e4ec83e8422..c0f94383706 100644 --- a/crates/fuel-core/src/service/genesis.rs +++ b/crates/fuel-core/src/service/genesis.rs @@ -71,6 +71,7 @@ pub async fn execute_genesis_block( ) -> anyhow::Result> { let genesis_block = create_genesis_block(config); tracing::info!("Genesis block created: {:?}", genesis_block.header()); + let db = db.clone().into_genesis(); SnapshotImporter::import( db.clone(), diff --git a/crates/fuel-core/src/service/genesis/exporter.rs b/crates/fuel-core/src/service/genesis/exporter.rs index c382ace5e18..940ad068db7 100644 --- a/crates/fuel-core/src/service/genesis/exporter.rs +++ b/crates/fuel-core/src/service/genesis/exporter.rs @@ -172,7 +172,7 @@ where TableEntry: serde::Serialize, StateConfigBuilder: AddTable, DbDesc: DatabaseDescription, - DbDesc::Height: Send, + DbDesc::Height: Send + Sync, { let mut writer = self.create_writer()?; let group_size = self.group_size; diff --git a/crates/fuel-core/src/service/genesis/importer.rs b/crates/fuel-core/src/service/genesis/importer.rs index ecf479bcc17..178f3b360f8 100644 --- a/crates/fuel-core/src/service/genesis/importer.rs +++ b/crates/fuel-core/src/service/genesis/importer.rs @@ -3,13 +3,14 @@ use super::{ task_manager::TaskManager, }; use crate::{ - combined_database::CombinedDatabase, + combined_database::CombinedGenesisDatabase, database::database_description::{ off_chain::OffChain, on_chain::OnChain, }, fuel_core_graphql_api::storage::messages::SpentMessages, graphql_api::storage::{ + blocks::FuelBlockIdsToHeights, coins::OwnedCoins, contracts::ContractsInfo, messages::OwnedMessageIds, @@ -67,7 +68,7 @@ mod on_chain; const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10; pub struct SnapshotImporter { - db: CombinedDatabase, + db: CombinedGenesisDatabase, task_manager: TaskManager<()>, genesis_block: Block, snapshot_reader: SnapshotReader, @@ -76,7 +77,7 @@ pub struct SnapshotImporter { impl SnapshotImporter { fn new( - db: CombinedDatabase, + db: CombinedGenesisDatabase, genesis_block: Block, snapshot_reader: SnapshotReader, watcher: StateWatcher, @@ -93,7 +94,7 @@ impl SnapshotImporter { } pub async fn import( - db: CombinedDatabase, + db: CombinedGenesisDatabase, genesis_block: Block, snapshot_reader: SnapshotReader, watcher: StateWatcher, @@ -126,6 +127,8 @@ impl SnapshotImporter { self.spawn_worker_off_chain::()?; self.spawn_worker_off_chain::()?; self.spawn_worker_off_chain::()?; + self.spawn_worker_off_chain::()?; + self.spawn_worker_off_chain::()?; self.task_manager.wait().await?; diff --git a/crates/fuel-core/src/service/genesis/importer/import_task.rs b/crates/fuel-core/src/service/genesis/importer/import_task.rs index 94b89c8f718..4325d6165e0 100644 --- a/crates/fuel-core/src/service/genesis/importer/import_task.rs +++ b/crates/fuel-core/src/service/genesis/importer/import_task.rs @@ -19,7 +19,7 @@ use crate::{ GenesisMetadata, GenesisProgressMutate, }, - Database, + GenesisDatabase, }, service::genesis::{ progress::ProgressReporter, @@ -36,7 +36,7 @@ where handler: Handler, skip: usize, groups: Groups, - db: Database, + db: GenesisDatabase, reporter: ProgressReporter, } @@ -48,7 +48,7 @@ pub trait ImportTable { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()>; } @@ -56,12 +56,12 @@ impl ImportTask where DbDesc: DatabaseDescription, Logic: ImportTable, - Database: StorageInspect>, + GenesisDatabase: StorageInspect>, { pub fn new( handler: Logic, groups: GroupGenerator, - db: Database, + db: GenesisDatabase, reporter: ProgressReporter, ) -> Self { let progress_name = @@ -95,9 +95,9 @@ where Value = usize, OwnedValue = usize, >, - Database: + GenesisDatabase: StorageInspect> + WriteTransaction + Modifiable, - for<'a> StorageTransaction<&'a mut Database>: + for<'a> StorageTransaction<&'a mut GenesisDatabase>: StorageMutate, Error = fuel_core_storage::Error>, { pub fn run(mut self, cancel_token: CancellationToken) -> anyhow::Result<()> { @@ -137,7 +137,10 @@ where #[cfg(test)] mod tests { use crate::{ - database::genesis_progress::GenesisProgressInspect, + database::{ + genesis_progress::GenesisProgressInspect, + GenesisDatabase, + }, service::genesis::{ importer::{ import_task::ImportTask, @@ -196,11 +199,9 @@ mod tests { }; use crate::{ - combined_database::CombinedDatabase, database::{ database_description::on_chain::OnChain, genesis_progress::GenesisProgressMutate, - Database, }, state::{ in_memory::memory_store::MemoryStore, @@ -227,7 +228,7 @@ mod tests { where L: FnMut( TableEntry, - &mut StorageTransaction<&mut Database>, + &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()>, { type TableInSnapshot = Coins; @@ -236,7 +237,7 @@ mod tests { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { group .into_iter() @@ -287,7 +288,7 @@ mod tests { Ok(()) }), data.as_ok_groups(), - Database::default(), + GenesisDatabase::default(), ProgressReporter::default(), ); @@ -304,9 +305,9 @@ mod tests { let data = TestData::new(2); let mut called_with = vec![]; - let mut db = CombinedDatabase::default(); + let mut db = GenesisDatabase::::default(); GenesisProgressMutate::::update_genesis_progress( - db.on_chain_mut(), + &mut db, &migration_name::(), 0, ) @@ -317,7 +318,7 @@ mod tests { Ok(()) }), data.as_ok_groups(), - db.on_chain().clone(), + db, ProgressReporter::default(), ); @@ -332,7 +333,7 @@ mod tests { fn changes_to_db_by_handler_are_behind_a_transaction() { // given let groups = TestData::new(1); - let outer_db = Database::default(); + let outer_db = GenesisDatabase::default(); let utxo_id = UtxoId::new(Default::default(), 0); let runner = ImportTask::new( @@ -369,7 +370,10 @@ mod tests { .unwrap()); } - fn insert_a_coin(tx: &mut StorageTransaction<&mut Database>, utxo_id: &UtxoId) { + fn insert_a_coin( + tx: &mut StorageTransaction<&mut GenesisDatabase>, + utxo_id: &UtxoId, + ) { let coin: CompressedCoin = CompressedCoinV1::default().into(); tx.storage_as_mut::().insert(utxo_id, &coin).unwrap(); @@ -379,7 +383,7 @@ mod tests { fn tx_reverted_if_handler_fails() { // given let groups = TestData::new(1); - let db = Database::default(); + let db = GenesisDatabase::default(); let utxo_id = UtxoId::new(Default::default(), 0); let runner = ImportTask::new( @@ -406,7 +410,7 @@ mod tests { let runner = ImportTask::new( TestHandler::new(|_, _| bail!("Some error")), groups.as_ok_groups(), - Database::default(), + Default::default(), ProgressReporter::default(), ); @@ -424,7 +428,7 @@ mod tests { let runner = ImportTask::new( TestHandler::new(|_, _| Ok(())), groups, - Database::default(), + Default::default(), ProgressReporter::default(), ); @@ -439,7 +443,7 @@ mod tests { fn succesfully_processed_batch_updates_the_genesis_progress() { // given let data = TestData::new(2); - let db = Database::default(); + let db = GenesisDatabase::default(); let runner = ImportTask::new( TestHandler::new(|_, _| Ok(())), data.as_ok_groups(), @@ -475,7 +479,7 @@ mod tests { Ok(()) }), rx, - Database::default(), + Default::default(), ProgressReporter::default(), ) }; @@ -566,7 +570,7 @@ mod tests { let runner = ImportTask::new( TestHandler::new(|_, _| Ok(())), groups.as_ok_groups(), - Database::new(Arc::new(BrokenTransactions::new())), + GenesisDatabase::new(Arc::new(BrokenTransactions::new())), ProgressReporter::default(), ); diff --git a/crates/fuel-core/src/service/genesis/importer/off_chain.rs b/crates/fuel-core/src/service/genesis/importer/off_chain.rs index f73d7b95b23..58cce0dd211 100644 --- a/crates/fuel-core/src/service/genesis/importer/off_chain.rs +++ b/crates/fuel-core/src/service/genesis/importer/off_chain.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use crate::{ database::{ database_description::off_chain::OffChain, - Database, + GenesisDatabase, }, fuel_core_graphql_api::storage::messages::SpentMessages, graphql_api::{ @@ -52,7 +52,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { for tx_status in group { tx.storage::() @@ -70,7 +70,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { for entry in group { tx.storage::() @@ -88,7 +88,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { for entry in group { tx.storage::() @@ -106,7 +106,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let events = group .into_iter() @@ -124,7 +124,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let events = group.into_iter().map(|TableEntry { value, key }| { Cow::Owned(Event::CoinCreated(value.uncompress(key))) @@ -142,7 +142,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let transactions = group.iter().map(|TableEntry { value, .. }| value); worker_service::process_transactions(transactions, tx)?; @@ -158,7 +158,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let transactions = group.iter().map(|TableEntry { value, .. }| value); worker_service::process_transactions(transactions, tx)?; @@ -174,7 +174,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let blocks = group .iter() @@ -192,7 +192,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let blocks = group .iter() @@ -210,7 +210,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let blocks = group .iter() @@ -228,7 +228,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let blocks = group .iter() @@ -246,7 +246,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let transactions = group .iter() @@ -264,7 +264,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { let transactions = group .iter() @@ -282,7 +282,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { for entry in group { tx.storage_as_mut::() @@ -291,3 +291,39 @@ impl ImportTable for Handler { Ok(()) } } + +impl ImportTable for Handler { + type TableInSnapshot = FuelBlocks; + type TableBeingWritten = FuelBlockIdsToHeights; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, + ) -> anyhow::Result<()> { + for entry in group { + tx.storage_as_mut::() + .insert(&entry.value.id(), &entry.key)?; + } + Ok(()) + } +} + +impl ImportTable for Handler { + type TableInSnapshot = OldFuelBlocks; + type TableBeingWritten = FuelBlockIdsToHeights; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, + ) -> anyhow::Result<()> { + for entry in group { + tx.storage_as_mut::() + .insert(&entry.value.id(), &entry.key)?; + } + Ok(()) + } +} diff --git a/crates/fuel-core/src/service/genesis/importer/on_chain.rs b/crates/fuel-core/src/service/genesis/importer/on_chain.rs index eac8ee71699..e75fd097995 100644 --- a/crates/fuel-core/src/service/genesis/importer/on_chain.rs +++ b/crates/fuel-core/src/service/genesis/importer/on_chain.rs @@ -6,7 +6,7 @@ use crate::database::{ balances::BalancesInitializer, database_description::on_chain::OnChain, state::StateInitializer, - Database, + GenesisDatabase, }; use anyhow::anyhow; use fuel_core_chain_config::TableEntry; @@ -41,7 +41,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { group.into_iter().try_for_each(|coin| { init_coin(tx, &coin, self.block_height)?; @@ -58,7 +58,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { group .into_iter() @@ -74,7 +74,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { group.into_iter().try_for_each(|transaction| { tx.storage_as_mut::() @@ -93,7 +93,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { group.into_iter().try_for_each(|contract| { init_contract_raw_code(tx, &contract)?; @@ -110,7 +110,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { group.into_iter().try_for_each(|contract| { init_contract_latest_utxo(tx, &contract, self.block_height)?; @@ -127,7 +127,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { tx.update_contract_states(group)?; Ok(()) @@ -142,7 +142,7 @@ impl ImportTable for Handler { fn process( &mut self, group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + tx: &mut StorageTransaction<&mut GenesisDatabase>, ) -> anyhow::Result<()> { tx.update_contract_balances(group)?; Ok(()) @@ -150,7 +150,7 @@ impl ImportTable for Handler { } fn init_coin( - transaction: &mut StorageTransaction<&mut Database>, + transaction: &mut StorageTransaction<&mut GenesisDatabase>, coin: &TableEntry, height: BlockHeight, ) -> anyhow::Result<()> { @@ -185,7 +185,7 @@ fn init_coin( } fn init_contract_latest_utxo( - transaction: &mut StorageTransaction<&mut Database>, + transaction: &mut StorageTransaction<&mut GenesisDatabase>, entry: &TableEntry, height: BlockHeight, ) -> anyhow::Result<()> { @@ -209,7 +209,7 @@ fn init_contract_latest_utxo( } fn init_contract_raw_code( - transaction: &mut StorageTransaction<&mut Database>, + transaction: &mut StorageTransaction<&mut GenesisDatabase>, entry: &TableEntry, ) -> anyhow::Result<()> { let contract = entry.value.as_ref(); @@ -228,7 +228,7 @@ fn init_contract_raw_code( } fn init_da_message( - transaction: &mut StorageTransaction<&mut Database>, + transaction: &mut StorageTransaction<&mut GenesisDatabase>, msg: TableEntry, da_height: DaBlockHeight, ) -> anyhow::Result<()> { diff --git a/crates/fuel-core/src/state/in_memory/memory_store.rs b/crates/fuel-core/src/state/in_memory/memory_store.rs index 28f1fca30ce..bd8b0564f38 100644 --- a/crates/fuel-core/src/state/in_memory/memory_store.rs +++ b/crates/fuel-core/src/state/in_memory/memory_store.rs @@ -162,7 +162,7 @@ mod tests { let mut transaction = self.read_transaction(); let len = transaction.write(key, column, buf)?; let changes = transaction.into_changes(); - self.commit_changes(Default::default(), changes)?; + self.commit_changes(None, changes)?; Ok(len) } @@ -170,7 +170,7 @@ mod tests { let mut transaction = self.read_transaction(); transaction.delete(key, column)?; let changes = transaction.into_changes(); - self.commit_changes(Default::default(), changes)?; + self.commit_changes(None, changes)?; Ok(()) } } diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index a51732c7171..d44b5e484f1 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -596,7 +596,7 @@ mod tests { let mut transaction = self.read_transaction(); let len = transaction.write(key, column, buf)?; let changes = transaction.into_changes(); - self.commit_changes(Default::default(), changes)?; + self.commit_changes(None, changes)?; Ok(len) } @@ -605,7 +605,7 @@ mod tests { let mut transaction = self.read_transaction(); transaction.delete(key, column)?; let changes = transaction.into_changes(); - self.commit_changes(Default::default(), changes)?; + self.commit_changes(None, changes)?; Ok(()) } } @@ -680,8 +680,7 @@ mod tests { )]), )]; - db.commit_changes(Default::default(), HashMap::from_iter(ops)) - .unwrap(); + db.commit_changes(None, HashMap::from_iter(ops)).unwrap(); assert_eq!(db.get(&key, Column::Metadata).unwrap().unwrap(), value) } @@ -697,8 +696,7 @@ mod tests { Column::Metadata.id(), BTreeMap::from_iter(vec![(key.clone(), WriteOperation::Remove)]), )]; - db.commit_changes(Default::default(), HashMap::from_iter(ops)) - .unwrap(); + db.commit_changes(None, HashMap::from_iter(ops)).unwrap(); assert_eq!(db.get(&key, Column::Metadata).unwrap(), None); } diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 12333c5b50d..be22f30180e 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -7,6 +7,7 @@ mod service; mod state; +mod sync; /// Re-exports for streaming utilities pub mod stream { @@ -45,10 +46,12 @@ pub use service::{ RunnableTask, Service, ServiceRunner, - Shared, - SharedMutex, }; pub use state::{ State, StateWatcher, }; +pub use sync::{ + Shared, + SharedMutex, +}; diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index d2ba048cad5..108c5cdbbe5 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -1,9 +1,11 @@ -use crate::state::{ - State, - StateWatcher, +use crate::{ + state::{ + State, + StateWatcher, + }, + Shared, }; use anyhow::anyhow; -use core::ops::Deref; use fuel_core_metrics::{ future_tracker::FutureTracker, services::{ @@ -16,27 +18,6 @@ use std::any::Any; use tokio::sync::watch; use tracing::Instrument; -/// Alias for `Arc` -pub type Shared = std::sync::Arc; - -/// A mutex that can safely be in async contexts and avoids deadlocks. -#[derive(Default, Debug)] -pub struct SharedMutex(Shared>); - -impl Clone for SharedMutex { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Deref for SharedMutex { - type Target = Shared>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - /// Used if services have no asynchronously shared data #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct EmptyShared; @@ -440,25 +421,6 @@ where got_panic } -impl SharedMutex { - /// Creates a new `SharedMutex` with the given value. - pub fn new(t: T) -> Self { - Self(Shared::new(parking_lot::Mutex::new(t))) - } - - /// Apply a function to the inner value and return a value. - pub fn apply(&self, f: impl FnOnce(&mut T) -> R) -> R { - let mut t = self.0.lock(); - f(&mut t) - } -} - -impl From for SharedMutex { - fn from(t: T) -> Self { - Self::new(t) - } -} - fn panic_to_string(e: Box) -> String { match e.downcast::() { Ok(v) => *v, diff --git a/crates/services/src/sync.rs b/crates/services/src/sync.rs new file mode 100644 index 00000000000..7d869b67203 --- /dev/null +++ b/crates/services/src/sync.rs @@ -0,0 +1,43 @@ +//! Wrappers for synchronization containers. + +use core::ops::Deref; + +/// Alias for `Arc` +pub type Shared = std::sync::Arc; + +/// A mutex that can safely be in async contexts and avoids deadlocks. +#[derive(Default, Debug)] +pub struct SharedMutex(Shared>); + +impl Clone for SharedMutex { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Deref for SharedMutex { + type Target = Shared>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl SharedMutex { + /// Creates a new `SharedMutex` with the given value. + pub fn new(t: T) -> Self { + Self(Shared::new(parking_lot::Mutex::new(t))) + } + + /// Apply a function to the inner value and return a value. + pub fn apply(&self, f: impl FnOnce(&mut T) -> R) -> R { + let mut t = self.0.lock(); + f(&mut t) + } +} + +impl From for SharedMutex { + fn from(t: T) -> Self { + Self::new(t) + } +}