From 65faaddbd1e227a4dffe96b097098afa6effd472 Mon Sep 17 00:00:00 2001 From: jillxuu Date: Fri, 26 Jan 2024 11:49:48 -0800 Subject: [PATCH 1/4] separate indexer async v2 db from aptosdb --- Cargo.lock | 27 ++----- Cargo.toml | 1 + api/Cargo.toml | 1 + .../src/fake_context.rs | 1 + api/src/accounts.rs | 10 ++- api/src/context.rs | 8 +- api/src/events.rs | 5 +- api/src/runtime.rs | 5 +- api/src/state.rs | 10 ++- api/src/tests/converter_test.rs | 2 +- api/src/transactions.rs | 24 ++++-- api/src/view_function.rs | 6 +- api/test-context/src/test_context.rs | 2 +- api/types/Cargo.toml | 2 + api/types/src/convert.rs | 80 ++++++++++++++----- .../src/aptos_test_harness.rs | 2 +- .../src/storage_interface.rs | 1 - aptos-node/Cargo.toml | 1 + aptos-node/src/services.rs | 16 +++- crates/aptos-genesis/src/lib.rs | 1 - crates/aptos-genesis/src/mainnet.rs | 1 - crates/indexer/src/indexer/fetcher.rs | 2 +- crates/indexer/src/runtime.rs | 8 +- .../indexer-grpc-fullnode/Cargo.toml | 1 + .../indexer-grpc-fullnode/src/runtime.rs | 10 ++- .../src/stream_coordinator.rs | 3 +- .../indexer-grpc-table-info/Cargo.toml | 31 +------ .../indexer-grpc-table-info/src/runtime.rs | 45 +++++++---- .../src/table_info_service.rs | 42 ++++------ .../executor-benchmark/src/db_generator.rs | 1 - execution/executor-benchmark/src/lib.rs | 1 - .../src/tests/driver_factory.rs | 1 - storage/aptosdb/src/db/aptosdb_test.rs | 1 - .../src/db/include/aptosdb_internal.rs | 21 ----- .../aptosdb/src/db/include/aptosdb_reader.rs | 47 +---------- .../src/db/include/aptosdb_testonly.rs | 7 +- .../aptosdb/src/db/include/aptosdb_writer.rs | 41 ---------- storage/aptosdb/src/db/mod.rs | 8 +- .../aptosdb/src/fast_sync_storage_wrapper.rs | 2 - storage/aptosdb/src/schema/db_metadata/mod.rs | 2 +- storage/backup/backup-cli/src/utils/mod.rs | 1 - storage/db-tool/src/bootstrap.rs | 1 - storage/db-tool/src/replay_verify.rs | 1 - storage/indexer/src/db_ops.rs | 45 +++++++++++ storage/indexer/src/db_v2.rs | 60 +++++++------- storage/indexer/src/lib.rs | 6 +- storage/indexer/src/metadata.rs | 4 +- storage/indexer/src/schema/mod.rs | 4 +- storage/indexer/src/table_info_reader.rs | 19 +++++ storage/schemadb/src/schema.rs | 2 +- storage/schemadb/tests/db.rs | 2 +- storage/schemadb/tests/iterator.rs | 4 +- storage/storage-interface/src/lib.rs | 32 -------- 53 files changed, 319 insertions(+), 342 deletions(-) create mode 100644 storage/indexer/src/db_ops.rs create mode 100644 storage/indexer/src/table_info_reader.rs diff --git a/Cargo.lock b/Cargo.lock index 3444021c62be3..74038a2031e81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -389,6 +389,7 @@ dependencies = [ "aptos-cached-packages", "aptos-config", "aptos-crypto", + "aptos-db-indexer", "aptos-framework", "aptos-gas-meter", "aptos-gas-schedule", @@ -506,6 +507,7 @@ dependencies = [ "anyhow", "aptos-config", "aptos-crypto", + "aptos-db-indexer", "aptos-framework", "aptos-logger", "aptos-openapi", @@ -2214,6 +2216,7 @@ dependencies = [ "aptos-config", "aptos-crypto", "aptos-db", + "aptos-db-indexer", "aptos-executor", "aptos-executor-types", "aptos-framework", @@ -2330,50 +2333,31 @@ version = "1.0.0" dependencies = [ "anyhow", "aptos-api", - "aptos-api-test-context", "aptos-api-types", "aptos-bitvec", "aptos-config", - "aptos-crypto", "aptos-db", - "aptos-executor", - "aptos-executor-types", - "aptos-framework", - "aptos-genesis", - "aptos-global-constants", + "aptos-db-indexer", "aptos-indexer-grpc-fullnode", "aptos-indexer-grpc-utils", "aptos-logger", "aptos-mempool", - "aptos-mempool-notifications", 
"aptos-metrics-core", - "aptos-moving-average", - "aptos-proptest-helpers", "aptos-protos 1.1.2", "aptos-rocksdb-options", "aptos-runtimes", - "aptos-sdk", - "aptos-secure-storage", + "aptos-schemadb", "aptos-storage-interface", - "aptos-temppath", "aptos-types", - "aptos-vm", - "aptos-vm-validator", "base64 0.13.1", "bytes", "chrono", "fail 0.5.1", "futures", - "goldenfile", "hex", "hyper", - "move-binary-format", - "move-core-types", - "move-package", "move-resource-viewer", "once_cell", - "rand 0.7.3", - "regex", "serde", "serde_json", "tokio", @@ -3025,6 +3009,7 @@ dependencies = [ "aptos-data-client", "aptos-data-streaming-service", "aptos-db", + "aptos-db-indexer", "aptos-dkg-runtime", "aptos-event-notifications", "aptos-executor", diff --git a/Cargo.toml b/Cargo.toml index 72de4d6e24ba2..0b09c4fcaea0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -661,6 +661,7 @@ strum = "0.24.1" strum_macros = "0.24.2" syn = { version = "1.0.92", features = ["derive", "extra-traits"] } sysinfo = "0.28.4" +tar = "0.4.40" tempfile = "3.3.0" termcolor = "1.1.2" test-case = "3.1.0" diff --git a/api/Cargo.toml b/api/Cargo.toml index 0071d869b0365..7733bb0620d0e 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -19,6 +19,7 @@ aptos-bcs-utils = { workspace = true } aptos-build-info = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-gas-schedule = { workspace = true } aptos-global-constants = { workspace = true } diff --git a/api/openapi-spec-generator/src/fake_context.rs b/api/openapi-spec-generator/src/fake_context.rs index 5a87200f855ac..98dee1782b641 100644 --- a/api/openapi-spec-generator/src/fake_context.rs +++ b/api/openapi-spec-generator/src/fake_context.rs @@ -16,5 +16,6 @@ pub fn get_fake_context() -> Context { Arc::new(MockDbReaderWriter), mempool.ac_client, NodeConfig::default(), + None, /* table info reader */ ) } diff --git a/api/src/accounts.rs b/api/src/accounts.rs index 0ae87d4f634b9..4cfa4bc398de8 100644 --- a/api/src/accounts.rs +++ b/api/src/accounts.rs @@ -351,7 +351,10 @@ impl Account { .latest_state_view_poem(&self.latest_ledger_info)?; let converted_resources = state_view .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_resources(resources.iter().map(|(k, v)| (k.clone(), v.as_slice()))) .context("Failed to build move resource response from data in DB") .map_err(|err| { @@ -545,7 +548,10 @@ impl Account { })?; resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .move_struct_fields(resource_type, &bytes) .context("Failed to convert move structs from storage") .map_err(|err| { diff --git a/api/src/context.rs b/api/src/context.rs index 4213f39e877dd..8f4068ca640b4 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -17,6 +17,7 @@ use aptos_api_types::{ }; use aptos_config::config::{NodeConfig, RoleType}; use aptos_crypto::HashValue; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule}; use aptos_logger::{error, warn}; use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; @@ -66,6 +67,7 @@ pub struct Context { gas_schedule_cache: Arc>, gas_estimation_cache: Arc>, gas_limit_cache: Arc>, + pub table_info_reader: Option>, } impl std::fmt::Debug for Context { 
@@ -80,6 +82,7 @@ impl Context { db: Arc, mp_sender: MempoolClientSender, node_config: NodeConfig, + table_info_reader: Option>, ) -> Self { Self { chain_id, @@ -101,6 +104,7 @@ impl Context { block_executor_onchain_config: OnChainExecutionConfig::default_if_missing() .block_executor_onchain_config(), })), + table_info_reader, } } @@ -636,7 +640,7 @@ impl Context { let state_view = self.latest_state_view_poem(ledger_info)?; let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(self.db.clone()); + let converter = resolver.as_converter(self.db.clone(), self.table_info_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { @@ -667,7 +671,7 @@ impl Context { let state_view = self.latest_state_view_poem(ledger_info)?; let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(self.db.clone()); + let converter = resolver.as_converter(self.db.clone(), self.table_info_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { diff --git a/api/src/events.rs b/api/src/events.rs index f5232bbca725c..53a2f32db04ae 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -186,7 +186,10 @@ impl EventsApi { .context .latest_state_view_poem(&latest_ledger_info)? .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_versioned_events(&events) .context("Failed to convert events from storage into response") .map_err(|err| { diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 3e32a2e8a8d08..d18575f3a4aba 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -10,6 +10,7 @@ use crate::{ }; use anyhow::Context as AnyhowContext; use aptos_config::config::{ApiConfig, NodeConfig}; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; @@ -34,11 +35,12 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, + table_info_reader: Option>, ) -> anyhow::Result { let max_runtime_workers = get_max_runtime_workers(&config.api); let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers)); - let context = Context::new(chain_id, db, mp_sender, config.clone()); + let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader); attach_poem_to_runtime(runtime.handle(), context.clone(), config, false) .context("Failed to attach poem to runtime")?; @@ -321,6 +323,7 @@ mod tests { ChainId::test(), context.db.clone(), context.mempool.ac_client.clone(), + None, ); assert!(ret.is_ok()); diff --git a/api/src/state.rs b/api/src/state.rs index 75621f679baa0..34c74b44d4a58 100644 --- a/api/src/state.rs +++ b/api/src/state.rs @@ -315,7 +315,10 @@ impl StateApi { AcceptType::Json => { let resource = state_view .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_resource(&resource_type, &bytes) .context("Failed to deserialize resource data retrieved from DB") .map_err(|err| { @@ -421,7 +424,10 @@ impl StateApi { .state_view(ledger_version.map(|inner| inner.0))?; let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(self.context.db.clone()); + let converter = resolver.as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ); // Convert key to lookup version for DB let vm_key = converter diff --git 
a/api/src/tests/converter_test.rs b/api/src/tests/converter_test.rs index b72703ca21f2f..05be4aafbb11e 100644 --- a/api/src/tests/converter_test.rs +++ b/api/src/tests/converter_test.rs @@ -21,7 +21,7 @@ async fn test_value_conversion() { let state_view = context.latest_state_view(); let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(context.db); + let converter = resolver.as_converter(context.db, None); assert_value_conversion(&converter, "u8", 1i32, VmMoveValue::U8(1)); assert_value_conversion(&converter, "u64", "1", VmMoveValue::U64(1)); diff --git a/api/src/transactions.rs b/api/src/transactions.rs index 965a58eda55fd..6b757545811ba 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -762,7 +762,10 @@ impl TransactionsApi { let timestamp = self.context.get_block_timestamp(ledger_info, txn.version)?; resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_onchain_transaction(timestamp, txn) .context("Failed to convert on chain transaction to Transaction") .map_err(|err| { @@ -774,7 +777,10 @@ impl TransactionsApi { })? }, TransactionData::Pending(txn) => resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_pending_transaction(*txn) .context("Failed to convert on pending transaction to Transaction") .map_err(|err| { @@ -946,7 +952,10 @@ impl TransactionsApi { .context .latest_state_view_poem(ledger_info)? .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_signed_transaction_poem(data.0, self.context.chain_id()) .context("Failed to create SignedTransaction from SubmitTransactionRequest") .map_err(|err| { @@ -1025,7 +1034,7 @@ impl TransactionsApi { .map(|(index, txn)| { self.context .latest_state_view_poem(ledger_info)?.as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) .try_into_signed_transaction_poem(txn, self.context.chain_id()) .context(format!("Failed to create SignedTransaction from SubmitTransactionRequest at position {}", index)) .map_err(|err| { @@ -1117,7 +1126,7 @@ impl TransactionsApi { // We provide the pending transaction so that users have the hash associated let pending_txn = resolver - .as_converter(self.context.db.clone()) + .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) .try_into_pending_transaction_poem(txn) .context("Failed to build PendingTransaction from mempool response, even though it said the request was accepted") .map_err(|err| SubmitTransactionError::internal_with_code( @@ -1322,7 +1331,10 @@ impl TransactionsApi { let state_view = self.context.latest_state_view_poem(&ledger_info)?; let resolver = state_view.as_move_resolver(); let raw_txn: RawTransaction = resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_raw_transaction_poem(request.transaction, self.context.chain_id()) .context("The given transaction is invalid") .map_err(|err| { diff --git a/api/src/view_function.rs b/api/src/view_function.rs index ade7915215be9..6f6e30118f526 100644 --- a/api/src/view_function.rs +++ b/api/src/view_function.rs @@ -97,7 +97,7 @@ fn view_request( ViewFunctionRequest::Json(data) => { let resolver = 
state_view.as_move_resolver(); resolver - .as_converter(context.db.clone()) + .as_converter(context.db.clone(), context.table_info_reader.clone()) .convert_view_function(data.0) .map_err(|err| { BasicErrorWith404::bad_request_with_code( @@ -171,7 +171,7 @@ fn view_request( AcceptType::Json => { let resolver = state_view.as_move_resolver(); let return_types = resolver - .as_converter(context.db.clone()) + .as_converter(context.db.clone(), context.table_info_reader.clone()) .function_return_types(&view_function) .and_then(|tys| { tys.into_iter() @@ -191,7 +191,7 @@ fn view_request( .zip(return_types.into_iter()) .map(|(v, ty)| { resolver - .as_converter(context.db.clone()) + .as_converter(context.db.clone(), context.table_info_reader.clone()) .try_into_move_value(&ty, &v) }) .collect::>>() diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index f93400202f75b..800d8bacee042 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -130,7 +130,6 @@ pub fn new_test_context( false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .unwrap(), ) @@ -146,6 +145,7 @@ pub fn new_test_context( db.clone(), mempool.ac_client.clone(), node_config.clone(), + None, /* table info reader */ ); // Configure the testing depending on which API version we're testing. diff --git a/api/types/Cargo.toml b/api/types/Cargo.toml index 7154f0cee7f58..b36f5698cda21 100644 --- a/api/types/Cargo.toml +++ b/api/types/Cargo.toml @@ -16,6 +16,7 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-logger = { workspace = true } aptos-openapi = { workspace = true } @@ -35,3 +36,4 @@ poem-openapi = { workspace = true } poem-openapi-derive = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } + diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index d69c5f7a869de..8644112da6c6a 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -18,6 +18,7 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_storage_interface::DbReader; use aptos_types::{ access_path::{AccessPath, Path}, @@ -25,7 +26,7 @@ use aptos_types::{ contract_event::{ContractEvent, EventWithVersion}, state_store::{ state_key::{StateKey, StateKeyInner}, - table::TableHandle, + table::{TableHandle, TableInfo}, }, transaction::{ EntryFunction, ExecutionStatus, ModuleBundle, Multisig, RawTransaction, Script, @@ -62,13 +63,19 @@ const OBJECT_STRUCT: &IdentStr = ident_str!("Object"); pub struct MoveConverter<'a, R: ?Sized> { inner: MoveValueAnnotator<'a, R>, db: Arc, + table_info_reader: Option>, } impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { - pub fn new(inner: &'a R, db: Arc) -> Self { + pub fn new( + inner: &'a R, + db: Arc, + table_info_reader: Option>, + ) -> Self { Self { inner: MoveValueAnnotator::new(inner), db, + table_info_reader, } } @@ -422,19 +429,32 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { key: &[u8], value: &[u8], ) -> Result> { - if !self.db.indexer_enabled() && !self.db.indexer_async_v2_enabled() { - return Ok(None); - } - let table_info = match self.db.get_table_info(handle) { - Ok(ti) => 
ti, - Err(_) => { + // Define a closure to encapsulate the logic + let get_table_info = |handle| -> Result> { + if let Some(table_info_reader) = self.table_info_reader.as_ref() { + // If table_info_reader exists, use it to get table_info + Ok(table_info_reader.get_table_info(handle)?) + } else if self.db.indexer_enabled() { + // If indexer is enabled in db, get table_info from the db + Ok(Some(self.db.get_table_info(handle)?)) + } else { + // If neither condition is met, return None + Ok(None) + } + }; + + // Use the above closure to get table_info + let table_info = match get_table_info(handle)? { + Some(ti) => ti, + None => { aptos_logger::warn!( "Table info not found for handle {:?}, can't decode table item. OK for simulation", handle ); - return Ok(None); // if table item not found return None anyway to avoid crash + return Ok(None); }, }; + let key = self.try_into_move_value(&table_info.key_type, key)?; let value = self.try_into_move_value(&table_info.value_type, value)?; @@ -451,19 +471,33 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { handle: TableHandle, key: &[u8], ) -> Result> { - if !self.db.indexer_enabled() && !self.db.indexer_async_v2_enabled() { - return Ok(None); - } - let table_info = match self.db.get_table_info(handle) { - Ok(ti) => ti, - Err(_) => { + // Define a closure to encapsulate the logic + let get_table_info = |handle| -> Result> { + if let Some(table_info_reader) = self.table_info_reader.as_ref() { + // If table_info_reader exists, use it to get table_info + Ok(table_info_reader.get_table_info(handle)?) + } else if self.db.indexer_enabled() { + // If indexer is enabled in db, get table_info from the db + Ok(Some(self.db.get_table_info(handle)?)) + } else { + // If neither condition is met, return None + Ok(None) + } + }; + + // Use the closure to get table_info + let table_info = match get_table_info(handle)? { + Some(ti) => ti, + None => { aptos_logger::warn!( "Table info not found for handle {:?}, can't decode table item. 
OK for simulation", handle ); - return Ok(None); // if table item not found return None anyway to avoid crash + return Ok(None); }, }; + + // Continue processing with the obtained table_info let key = self.try_into_move_value(&table_info.key_type, key)?; Ok(Some(DeletedTableData { @@ -934,12 +968,20 @@ impl<'a, R: ModuleResolver + ?Sized> ExplainVMStatus for MoveConverter<'a, R> { } } pub trait AsConverter { - fn as_converter(&self, db: Arc) -> MoveConverter; + fn as_converter( + &self, + db: Arc, + table_info_reader: Option>, + ) -> MoveConverter; } impl AsConverter for R { - fn as_converter(&self, db: Arc) -> MoveConverter { - MoveConverter::new(self, db) + fn as_converter( + &self, + db: Arc, + table_info_reader: Option>, + ) -> MoveConverter { + MoveConverter::new(self, db, table_info_reader) } } diff --git a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs index effee015f0f4e..9a25a4cabf246 100644 --- a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs +++ b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs @@ -919,7 +919,7 @@ impl<'a> MoveTestAdapter<'a> for AptosTestAdapter<'a> { }, AptosSubCommand::ViewTableCommand(view_table_cmd) => { let resolver = self.storage.as_move_resolver(); - let converter = resolver.as_converter(Arc::new(FakeDbReader {})); + let converter = resolver.as_converter(Arc::new(FakeDbReader {}), None); let vm_key = converter .try_into_vm_value(&view_table_cmd.key_type, view_table_cmd.key_value) diff --git a/aptos-move/aptos-validator-interface/src/storage_interface.rs b/aptos-move/aptos-validator-interface/src/storage_interface.rs index ca914522f8328..f9ed2da2a90a4 100644 --- a/aptos-move/aptos-validator-interface/src/storage_interface.rs +++ b/aptos-move/aptos-validator-interface/src/storage_interface.rs @@ -32,7 +32,6 @@ impl DBDebuggerInterface { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .map_err(anyhow::Error::from)?, ))) diff --git a/aptos-node/Cargo.toml b/aptos-node/Cargo.toml index 2b5ca01938778..675902c2d26a4 100644 --- a/aptos-node/Cargo.toml +++ b/aptos-node/Cargo.toml @@ -28,6 +28,7 @@ aptos-crypto = { workspace = true } aptos-data-client = { workspace = true } aptos-data-streaming-service = { workspace = true } aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-dkg-runtime = { workspace = true } aptos-event-notifications = { workspace = true } aptos-executor = { workspace = true } diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index 2562a6d848086..ad2c37a914c0f 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,6 +11,7 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info; @@ -52,20 +53,28 @@ pub fn bootstrap_api_and_indexer( let (mempool_client_sender, mempool_client_receiver) = mpsc::channel(AC_SMP_CHANNEL_BUFFER_SIZE); - let indexer_table_info = bootstrap_indexer_table_info( + let (indexer_table_info_runtime, indexer_async_v2) = match bootstrap_indexer_table_info( 
node_config, chain_id, db_rw.clone(), mempool_client_sender.clone(), - ); + ) { + Some((runtime, indexer_v2)) => (Some(runtime), Some(indexer_v2)), + None => (None, None), + }; // Create the API runtime + let table_info_reader: Option> = indexer_async_v2.map(|arc| { + let trait_object: Arc = arc; + trait_object + }); let api_runtime = if node_config.api.enabled { Some(bootstrap_api( node_config, chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), + table_info_reader.clone(), )?) } else { None @@ -77,6 +86,7 @@ pub fn bootstrap_api_and_indexer( chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), + table_info_reader, ); // Create the indexer runtime @@ -90,7 +100,7 @@ pub fn bootstrap_api_and_indexer( Ok(( mempool_client_receiver, api_runtime, - indexer_table_info, + indexer_table_info_runtime, indexer_runtime, indexer_grpc, )) diff --git a/crates/aptos-genesis/src/lib.rs b/crates/aptos-genesis/src/lib.rs index 1eeeea0f8e178..73633e2d2c73c 100644 --- a/crates/aptos-genesis/src/lib.rs +++ b/crates/aptos-genesis/src/lib.rs @@ -156,7 +156,6 @@ impl GenesisInfo { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?; let db_rw = DbReaderWriter::new(aptosdb); aptos_executor::db_bootstrapper::generate_waypoint::(&db_rw, genesis) diff --git a/crates/aptos-genesis/src/mainnet.rs b/crates/aptos-genesis/src/mainnet.rs index d3cbb9c15cce7..7e1aa674dd72d 100644 --- a/crates/aptos-genesis/src/mainnet.rs +++ b/crates/aptos-genesis/src/mainnet.rs @@ -141,7 +141,6 @@ impl MainnetGenesisInfo { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?; let db_rw = DbReaderWriter::new(aptosdb); aptos_executor::db_bootstrapper::generate_waypoint::(&db_rw, genesis) diff --git a/crates/indexer/src/indexer/fetcher.rs b/crates/indexer/src/indexer/fetcher.rs index 42f775256b236..c565ddf516eab 100644 --- a/crates/indexer/src/indexer/fetcher.rs +++ b/crates/indexer/src/indexer/fetcher.rs @@ -244,7 +244,7 @@ async fn fetch_nexts( let state_view = context.latest_state_view().unwrap(); let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(context.db.clone()); + let converter = resolver.as_converter(context.db.clone(), context.table_info_reader.clone()); let mut transactions = vec![]; for (ind, raw_txn) in raw_txns.into_iter().enumerate() { diff --git a/crates/indexer/src/runtime.rs b/crates/indexer/src/runtime.rs index 14067021107f7..c3bf5a0516c0d 100644 --- a/crates/indexer/src/runtime.rs +++ b/crates/indexer/src/runtime.rs @@ -90,7 +90,13 @@ pub fn bootstrap( let node_config = config.clone(); runtime.spawn(async move { - let context = Arc::new(Context::new(chain_id, db, mp_sender, node_config)); + let context = Arc::new(Context::new( + chain_id, + db, + mp_sender, + node_config, + None, /* table info reader */ + )); run_forever(indexer_config, context).await; }); diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml index e687cf779d6c1..03a78588be33e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml @@ -35,6 +35,7 @@ aptos-api = { workspace = true } aptos-api-types = { workspace = true } aptos-bitvec = { workspace = true } aptos-config = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-indexer-grpc-utils = { workspace = true } aptos-logger = { workspace = 
true } aptos-mempool = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index 79a92569572cb..3b687b5c8ed2b 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -7,6 +7,7 @@ use crate::{ }; use aptos_api::context::Context; use aptos_config::config::NodeConfig; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_protos::{ @@ -34,6 +35,7 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, + table_info_reader: Option>, ) -> Option { if !config.indexer_grpc.enabled { return None; @@ -50,7 +52,13 @@ pub fn bootstrap( let output_batch_size = node_config.indexer_grpc.output_batch_size; runtime.spawn(async move { - let context = Arc::new(Context::new(chain_id, db, mp_sender, node_config)); + let context = Arc::new(Context::new( + chain_id, + db, + mp_sender, + node_config, + table_info_reader, + )); let service_context = ServiceContext { context: context.clone(), processor_task_count, diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs index 3e0793b4271f9..e35a0c00af230 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs @@ -340,7 +340,8 @@ impl IndexerStreamCoordinator { let first_version = raw_txns.first().map(|txn| txn.version).unwrap(); let state_view = context.latest_state_view().unwrap(); let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(context.db.clone()); + let converter = + resolver.as_converter(context.db.clone(), context.table_info_reader.clone()); // Enrich data with block metadata let (_, _, block_event) = context diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml index b7f4496ec9f64..d739b3b042c48 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml @@ -34,43 +34,18 @@ aptos-api = { workspace = true } aptos-api-types = { workspace = true } aptos-bitvec = { workspace = true } aptos-config = { workspace = true } +aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-indexer-grpc-fullnode = { workspace = true } aptos-indexer-grpc-utils = { workspace = true } aptos-logger = { workspace = true } aptos-mempool = { workspace = true } aptos-metrics-core = { workspace = true } -aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } aptos-runtimes = { workspace = true } -aptos-sdk = { workspace = true } +aptos-schemadb = { workspace = true } aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } -aptos-vm = { workspace = true } - -move-binary-format = { workspace = true } -move-core-types = { workspace = true } -move-package = { workspace = true } - -[dev-dependencies] -goldenfile = { workspace = true } -rand = { workspace = true } -regex = { workspace = true } - -aptos-api-test-context = { workspace = true } -aptos-crypto = { workspace = true } -aptos-db = { workspace = true } -aptos-executor = { workspace = true } -aptos-executor-types = { workspace = true } -aptos-framework = { workspace = true } -aptos-genesis 
= { workspace = true } -aptos-global-constants = { workspace = true } -aptos-mempool = { workspace = true } -aptos-mempool-notifications = { workspace = true } -aptos-proptest-helpers = { workspace = true } -aptos-secure-storage = { workspace = true } -aptos-temppath = { workspace = true } -aptos-vm = { workspace = true } -aptos-vm-validator = { workspace = true } [features] failpoints = ["fail/failpoints"] diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index 969f9757f563d..cb5aa9531e880 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -4,48 +4,65 @@ use crate::table_info_service::TableInfoService; use aptos_api::context::Context; use aptos_config::config::NodeConfig; +use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2}; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; use aptos_types::chain_id::ChainId; use std::sync::Arc; use tokio::runtime::Runtime; +const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; + /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service /// Returns corresponding Tokio runtime pub fn bootstrap( config: &NodeConfig, chain_id: ChainId, - db: DbReaderWriter, + db_rw: DbReaderWriter, mp_sender: MempoolClientSender, -) -> Option { +) -> Option<(Runtime, Arc)> { if !config.indexer_table_info.enabled { return None; } let runtime = aptos_runtimes::spawn_named_runtime("table-info".to_string(), None); + // Set up db config and open up the db initially to read metadata let node_config = config.clone(); - let parser_task_count = node_config.indexer_table_info.parser_task_count; - let parser_batch_size = node_config.indexer_table_info.parser_batch_size; - let enable_expensive_logging = node_config.indexer_table_info.enable_expensive_logging; - let next_version = db.reader.get_indexer_async_v2_next_version().unwrap(); + let db_path = node_config + .storage + .get_dir_paths() + .default_root_path() + .join(INDEX_ASYNC_V2_DB_NAME); + let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config; + let db = + open_db(db_path, &rocksdb_config).expect("Failed to open up indexer async v2 db initially"); + + let indexer_async_v2 = + Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2")); + let indexer_async_v2_clone = Arc::clone(&indexer_async_v2); // Spawn the runtime for table info parsing runtime.spawn(async move { let context = Arc::new(Context::new( chain_id, - db.reader.clone(), + db_rw.reader.clone(), mp_sender, - node_config, + node_config.clone(), + None, )); + let mut parser = TableInfoService::new( context, - next_version, - parser_task_count, - parser_batch_size, - enable_expensive_logging, + indexer_async_v2_clone.next_version(), + node_config.indexer_table_info.parser_task_count, + node_config.indexer_table_info.parser_batch_size, + node_config.indexer_table_info.enable_expensive_logging, + indexer_async_v2_clone, ); - parser.run(db.clone()).await + + parser.run().await; }); - Some(runtime) + + Some((runtime, indexer_async_v2)) } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs index 4d68ba98a9342..af965fac7b993 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs +++ 
b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs @@ -4,12 +4,12 @@ use anyhow::Error; use aptos_api::context::Context; use aptos_api_types::TransactionOnChainData; +use aptos_db_indexer::db_v2::IndexerAsyncV2; use aptos_indexer_grpc_fullnode::stream_coordinator::{ IndexerStreamCoordinator, TransactionBatchInfo, }; use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; use aptos_logger::{debug, error, info, sample, sample::SampleRate}; -use aptos_storage_interface::{DbReaderWriter, DbWriter}; use aptos_types::write_set::WriteSet; use std::{sync::Arc, time::Duration}; use tonic::Status; @@ -24,6 +24,7 @@ pub struct TableInfoService { pub parser_batch_size: u16, pub context: Arc, pub enable_expensive_logging: bool, + pub indexer_async_v2: Arc, } impl TableInfoService { @@ -33,6 +34,7 @@ impl TableInfoService { parser_task_count: u16, parser_batch_size: u16, enable_expensive_logging: bool, + indexer_async_v2: Arc, ) -> Self { Self { current_version: request_start_version, @@ -40,6 +42,7 @@ impl TableInfoService { parser_batch_size, context, enable_expensive_logging, + indexer_async_v2, } } @@ -49,13 +52,13 @@ impl TableInfoService { /// 4. write parsed table info to rocksdb /// 5. after all batches from the loop complete, if pending on items not empty, move on to 6, otherwise, start from 1 again /// 6. retry all the txns in the loop sequentially to clean up the pending on items - pub async fn run(&mut self, db: DbReaderWriter) { + pub async fn run(&mut self) { loop { let start_time = std::time::Instant::now(); let ledger_version = self.get_highest_known_version().await.unwrap_or_default(); let batches = self.get_batches(ledger_version).await; let results = self - .process_multiple_batches(db.clone(), batches, ledger_version) + .process_multiple_batches(self.indexer_async_v2.clone(), batches, ledger_version) .await; let max_version = self.get_max_batch_version(results).unwrap_or_default(); let versions_processed = max_version - self.current_version + 1; @@ -85,18 +88,17 @@ impl TableInfoService { /// 2. 
Get write sets from transactions and parse write sets to get handle -> key,value type mapping, write the mapping to the rocksdb async fn process_multiple_batches( &self, - db: DbReaderWriter, + indexer_async_v2: Arc, batches: Vec, ledger_version: u64, ) -> Vec> { let mut tasks = vec![]; - let db_writer = db.writer.clone(); let context = self.context.clone(); for batch in batches.iter().cloned() { let task = tokio::spawn(Self::process_single_batch( context.clone(), - db_writer.clone(), + indexer_async_v2.clone(), ledger_version, batch, false, /* end_early_if_pending_on_empty */ @@ -115,8 +117,7 @@ impl TableInfoService { last_batch.start_version + last_batch.num_transactions_to_fetch as u64; // Clean up pending on items across threads - db.writer - .clone() + self.indexer_async_v2 .cleanup_pending_on_items() .expect("[Table Info] Failed to clean up the pending on items"); @@ -126,12 +127,7 @@ impl TableInfoService { // // Risk of this sequential approach is that it could be slow when the txns to process contain extremely // nested table items, but the risk is bounded by the the configuration of the number of txns to process and number of threads - if !db - .reader - .clone() - .is_indexer_async_v2_pending_on_empty() - .unwrap_or(false) - { + if !self.indexer_async_v2.is_indexer_async_v2_pending_on_empty() { let retry_batch = TransactionBatchInfo { start_version: self.current_version, num_transactions_to_fetch: total_txns_to_process as u16, @@ -140,7 +136,7 @@ impl TableInfoService { Self::process_single_batch( context.clone(), - db_writer, + indexer_async_v2.clone(), ledger_version, retry_batch, true, /* end_early_if_pending_on_empty */ @@ -151,16 +147,12 @@ impl TableInfoService { } assert!( - db.reader - .clone() - .is_indexer_async_v2_pending_on_empty() - .unwrap_or(false), + self.indexer_async_v2.is_indexer_async_v2_pending_on_empty(), "Missing data in table info parsing after sequential retry" ); // Update rocksdb's to be processed next version after verifying all txns are successfully parsed - db.writer - .clone() + self.indexer_async_v2 .update_next_version(end_version + 1) .unwrap(); @@ -179,7 +171,7 @@ impl TableInfoService { /// if pending on items are not empty async fn process_single_batch( context: Arc, - db_writer: Arc, + indexer_async_v2: Arc, ledger_version: u64, batch: TransactionBatchInfo, end_early_if_pending_on_empty: bool, @@ -197,7 +189,7 @@ impl TableInfoService { Self::parse_table_info( context.clone(), raw_txns.clone(), - db_writer.clone(), + indexer_async_v2, end_early_if_pending_on_empty, ) .expect("[Table Info] Failed to parse table info"); @@ -270,7 +262,7 @@ impl TableInfoService { fn parse_table_info( context: Arc, raw_txns: Vec, - db_writer: Arc, + indexer_async_v2: Arc, end_early_if_pending_on_empty: bool, ) -> Result<(), Error> { if raw_txns.is_empty() { @@ -281,7 +273,7 @@ impl TableInfoService { let first_version = raw_txns.first().map(|txn| txn.version).unwrap(); let write_sets: Vec = raw_txns.iter().map(|txn| txn.changes.clone()).collect(); let write_sets_slice: Vec<&WriteSet> = write_sets.iter().collect(); - db_writer + indexer_async_v2 .index_table_info( context.db.clone(), first_version, diff --git a/execution/executor-benchmark/src/db_generator.rs b/execution/executor-benchmark/src/db_generator.rs index 2930e0fe875d9..a9fe87b29fae8 100644 --- a/execution/executor-benchmark/src/db_generator.rs +++ b/execution/executor-benchmark/src/db_generator.rs @@ -74,7 +74,6 @@ fn bootstrap_with_genesis(db_dir: impl AsRef, enable_storage_sharding: boo false, /* 
indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .expect("DB should open."), ); diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index cee8e602c7bb1..b6fbeaf87054b 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -66,7 +66,6 @@ where false, config.storage.buffered_state_target_items, config.storage.max_num_nodes_per_lru_cache_shard, - false, ) .expect("DB should open."), ); diff --git a/state-sync/state-sync-driver/src/tests/driver_factory.rs b/state-sync/state-sync-driver/src/tests/driver_factory.rs index 7a7df57591ce6..13d88dd66ba9c 100644 --- a/state-sync/state-sync-driver/src/tests/driver_factory.rs +++ b/state-sync/state-sync-driver/src/tests/driver_factory.rs @@ -40,7 +40,6 @@ fn test_new_initialized_configs() { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .unwrap(); let (_, db_rw) = DbReaderWriter::wrap(db); diff --git a/storage/aptosdb/src/db/aptosdb_test.rs b/storage/aptosdb/src/db/aptosdb_test.rs index 0ad8e5b52b95b..f57be62579a64 100644 --- a/storage/aptosdb/src/db/aptosdb_test.rs +++ b/storage/aptosdb/src/db/aptosdb_test.rs @@ -205,7 +205,6 @@ pub fn test_state_merkle_pruning_impl( false, /* enable_indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* enable_indexer_async_v2 */ ) .unwrap(); diff --git a/storage/aptosdb/src/db/include/aptosdb_internal.rs b/storage/aptosdb/src/db/include/aptosdb_internal.rs index b44cb4a6c7de2..5337cdf99223c 100644 --- a/storage/aptosdb/src/db/include/aptosdb_internal.rs +++ b/storage/aptosdb/src/db/include/aptosdb_internal.rs @@ -56,7 +56,6 @@ impl AptosDB { ledger_commit_lock: std::sync::Mutex::new(()), indexer: None, skip_index_and_usage, - indexer_async_v2: None, } } @@ -69,7 +68,6 @@ impl AptosDB { buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, empty_buffered_state_for_restore: bool, - enable_indexer_async_v2: bool, ) -> Result { ensure!( pruner_config.eq(&NO_OP_STORAGE_PRUNER_CONFIG) || !readonly, @@ -101,13 +99,6 @@ impl AptosDB { )?; } - if enable_indexer_async_v2 { - myself.open_indexer_async_v2( - db_paths.default_root_path(), - rocksdb_configs.index_db_config, - )?; - } - Ok(myself) } @@ -153,16 +144,6 @@ impl AptosDB { Ok(()) } - fn open_indexer_async_v2( - &mut self, - db_root_path: impl AsRef, - rocksdb_config: RocksdbConfig, - ) -> Result<()> { - let indexer_async_v2 = IndexerAsyncV2::open(db_root_path, rocksdb_config, DashMap::new())?; - self.indexer_async_v2 = Some(indexer_async_v2); - Ok(()) - } - #[cfg(any(test, feature = "fuzzing"))] fn new_without_pruner + Clone>( db_root_path: P, @@ -170,7 +151,6 @@ impl AptosDB { buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, enable_indexer: bool, - enable_indexer_async_v2: bool, ) -> Self { Self::open( StorageDirPaths::from_path(db_root_path), @@ -180,7 +160,6 @@ impl AptosDB { enable_indexer, buffered_state_target_items, max_num_nodes_per_lru_cache_shard, - enable_indexer_async_v2, ) .expect("Unable to open AptosDB") } diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index 78b4174604f08..95ce8b0a4f113 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -817,11 +817,6 @@ impl DbReader for AptosDB { 
self.indexer.is_some() } - /// Returns whether the indexer async v2 DB has been enabled or not - fn indexer_async_v2_enabled(&self) -> bool { - self.indexer_async_v2.is_some() - } - fn get_state_storage_usage(&self, version: Option) -> Result { gauged_api("get_state_storage_usage", || { if let Some(v) = version { @@ -830,28 +825,6 @@ impl DbReader for AptosDB { self.state_store.get_usage(version) }) } - - /// Returns the next version for indexer async v2 to be processed - /// It is mainly used by table info service to decide the start version - fn get_indexer_async_v2_next_version(&self) -> Result { - gauged_api("get_indexer_async_v2_next_version", || { - Ok(self - .indexer_async_v2 - .as_ref() - .map(|indexer| indexer.next_version()) - .unwrap_or(0)) - }) - } - - fn is_indexer_async_v2_pending_on_empty(&self) -> Result { - gauged_api("is_indexer_async_v2_pending_on_empty", || { - Ok(self - .indexer_async_v2 - .as_ref() - .map(|indexer| indexer.is_indexer_async_v2_pending_on_empty()) - .unwrap_or(false)) - }) - } } impl AptosDB { @@ -1026,26 +999,8 @@ impl AptosDB { Ok(events_with_version) } - fn get_table_info_option(&self, handle: TableHandle) -> Result> { - if self.indexer_async_v2_enabled() { - return self.get_table_info_from_indexer_async_v2(handle); - } - - self.get_table_info_from_indexer(handle) - } - - fn get_table_info_from_indexer_async_v2( - &self, - handle: TableHandle, - ) -> Result> { - match &self.indexer_async_v2 { - Some(indexer_async_v2) => indexer_async_v2.get_table_info_with_retry(handle), - None => bail!("Indexer Async V2 not enabled."), - } - } - /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready - fn get_table_info_from_indexer(&self, handle: TableHandle) -> Result> { + fn get_table_info_option(&self, handle: TableHandle) -> Result> { match &self.indexer { Some(indexer) => indexer.get_table_info(handle), None => bail!("Indexer not enabled."), diff --git a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index 4d0c402132b03..3dea085136ca7 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -17,7 +17,6 @@ impl AptosDB { BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, false, /* indexer */ - false, /* indexer async v2 */ ) } @@ -38,14 +37,13 @@ impl AptosDB { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, max_node_cache, - false, /* indexer async v2 */ ) .expect("Unable to open AptosDB") } /// This opens db in non-readonly mode, without the pruner and cache. 
pub fn new_for_test_no_cache + Clone>(db_root_path: P) -> Self { - Self::new_without_pruner(db_root_path, false, BUFFERED_STATE_TARGET_ITEMS, 0, false, false) + Self::new_without_pruner(db_root_path, false, BUFFERED_STATE_TARGET_ITEMS, 0, false) } /// This opens db in non-readonly mode, without the pruner, and with the indexer @@ -56,7 +54,6 @@ impl AptosDB { BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, true, /* indexer */ - true, /* indexer async v2 */ ) } @@ -71,7 +68,6 @@ impl AptosDB { buffered_state_target_items, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, false, /* indexer */ - false, /* indexer async v2 */ ) } @@ -83,7 +79,6 @@ impl AptosDB { BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, false, /* indexer */ - false, /* indexer async v2 */ ) } diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 6c6f8ec42af57..3f2e273d4fda1 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -198,47 +198,6 @@ impl DbWriter for AptosDB { Ok(()) }) } - - /// Open up dbwriter for table info indexing on indexer async v2 rocksdb - fn index_table_info( - &self, - db_reader: Arc, - first_version: Version, - write_sets: &[&WriteSet], - end_early_if_pending_on_empty: bool, - ) -> Result<()> { - gauged_api("index_table_info", || { - self.indexer_async_v2 - .as_ref() - .map(|indexer| { - indexer.index_table_info( - db_reader, - first_version, - write_sets, - end_early_if_pending_on_empty, - ) - }) - .unwrap_or(Ok(())) - }) - } - - fn cleanup_pending_on_items(&self) -> Result<()> { - gauged_api("cleanup_pending_on_items", || { - self.indexer_async_v2 - .as_ref() - .map(|indexer| indexer.cleanup_pending_on_items()) - .unwrap_or(Ok(())) - }) - } - - fn update_next_version(&self, end_version: u64) -> Result<()> { - gauged_api("update_next_version", || { - self.indexer_async_v2 - .as_ref() - .map(|indexer| indexer.update_next_version(end_version)) - .unwrap_or(Ok(())) - }) - } } impl AptosDB { diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs index e209899cf27ec..3008f9025b88d 100644 --- a/storage/aptosdb/src/db/mod.rs +++ b/storage/aptosdb/src/db/mod.rs @@ -30,7 +30,7 @@ use aptos_config::config::{ PrunerConfig, RocksdbConfig, RocksdbConfigs, StorageDirPaths, NO_OP_STORAGE_PRUNER_CONFIG, }; use aptos_crypto::HashValue; -use aptos_db_indexer::{db_v2::IndexerAsyncV2, Indexer}; +use aptos_db_indexer::Indexer; use aptos_experimental_runtimes::thread_manager::{optimal_min_len, THREAD_MANAGER}; use aptos_logger::prelude::*; use aptos_metrics_core::TimerHelper; @@ -72,7 +72,6 @@ use aptos_types::{ write_set::WriteSet, }; use aptos_vm::data_cache::AsMoveResolver; -use dashmap::DashMap; use move_resource_viewer::MoveValueAnnotator; use rayon::prelude::*; use std::{ @@ -101,7 +100,6 @@ pub struct AptosDB { ledger_commit_lock: std::sync::Mutex<()>, indexer: Option, skip_index_and_usage: bool, - indexer_async_v2: Option, } // DbReader implementations and private functions used by them. 
@@ -123,7 +121,6 @@ impl AptosDB { enable_indexer: bool, buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, - enable_indexer_async_v2: bool, ) -> Result { Self::open_internal( &db_paths, @@ -134,7 +131,6 @@ impl AptosDB { buffered_state_target_items, max_num_nodes_per_lru_cache_shard, false, - enable_indexer_async_v2, ) } @@ -146,7 +142,6 @@ impl AptosDB { enable_indexer: bool, buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, - enable_indexer_async_v2: bool, ) -> Result { Self::open_internal( &db_paths, @@ -157,7 +152,6 @@ impl AptosDB { buffered_state_target_items, max_num_nodes_per_lru_cache_shard, true, - enable_indexer_async_v2, ) } diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index d191810c6b24d..d29bf307ef25e 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs @@ -49,7 +49,6 @@ impl FastSyncStorageWrapper { config.storage.enable_indexer, config.storage.buffered_state_target_items, config.storage.max_num_nodes_per_lru_cache_shard, - config.indexer_table_info.enabled, ) .map_err(|err| anyhow!("fast sync DB failed to open {}", err))?; @@ -76,7 +75,6 @@ impl FastSyncStorageWrapper { config.storage.enable_indexer, config.storage.buffered_state_target_items, config.storage.max_num_nodes_per_lru_cache_shard, - config.indexer_table_info.enabled, ) .map_err(|err| anyhow!("Secondary DB failed to open {}", err))?; diff --git a/storage/aptosdb/src/schema/db_metadata/mod.rs b/storage/aptosdb/src/schema/db_metadata/mod.rs index 4a8ebf2fbb49d..10782d5085b4c 100644 --- a/storage/aptosdb/src/schema/db_metadata/mod.rs +++ b/storage/aptosdb/src/schema/db_metadata/mod.rs @@ -22,7 +22,7 @@ type ShardId = usize; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))] -pub(crate) enum DbMetadataValue { +pub enum DbMetadataValue { Version(Version), StateSnapshotProgress(StateSnapshotProgress), } diff --git a/storage/backup/backup-cli/src/utils/mod.rs b/storage/backup/backup-cli/src/utils/mod.rs index a3b9bf33ae150..f737d3d868b36 100644 --- a/storage/backup/backup-cli/src/utils/mod.rs +++ b/storage/backup/backup-cli/src/utils/mod.rs @@ -291,7 +291,6 @@ impl TryFrom for GlobalRestoreOptions { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?) .get_restore_handler(); diff --git a/storage/db-tool/src/bootstrap.rs b/storage/db-tool/src/bootstrap.rs index 74ce3cb605b3d..46aed10777957 100644 --- a/storage/db-tool/src/bootstrap.rs +++ b/storage/db-tool/src/bootstrap.rs @@ -57,7 +57,6 @@ impl Command { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .expect("Failed to open DB."); let db = DbReaderWriter::new(db); diff --git a/storage/db-tool/src/replay_verify.rs b/storage/db-tool/src/replay_verify.rs index deafc78a6445e..21c1bd817e400 100644 --- a/storage/db-tool/src/replay_verify.rs +++ b/storage/db-tool/src/replay_verify.rs @@ -69,7 +69,6 @@ impl Opt { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?) 
.get_restore_handler(); let ret = ReplayVerifyCoordinator::new( diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs new file mode 100644 index 0000000000000..65d1ae2b4ef73 --- /dev/null +++ b/storage/indexer/src/db_ops.rs @@ -0,0 +1,45 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::column_families; +use anyhow::Result; +use aptos_config::config::RocksdbConfig; +use aptos_rocksdb_options::gen_rocksdb_options; +use aptos_schemadb::{ + schema::{KeyCodec, Schema, ValueCodec}, + SchemaBatch, DB, +}; +use std::{mem, path::Path}; + +pub fn open_db<P: AsRef<Path>>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result<DB> { + Ok(DB::open( + db_path, + "index_asnync_v2_db", + column_families(), + &gen_rocksdb_options(rocksdb_config, false), + )?) +} + +pub fn close_db(db: DB) { + mem::drop(db) +} + +pub fn read_db<K, V, S>(db: &DB, key: &K) -> Result<Option<V>> +where + K: KeyCodec<S>, + V: ValueCodec<S>, + S: Schema<Key = K, Value = V>, +{ + Ok(db.get::<S>(key)?) +} + +pub fn write_db<K, V, S>(db: &DB, key: K, value: V) -> Result<()> +where + K: KeyCodec<S>, + V: ValueCodec<S>, + S: Schema<Key = K, Value = V>, +{ + let batch = SchemaBatch::new(); + batch.put::<S>(&key, &value)?; + Ok(db.write_schemas(batch)?) +} diff --git a/storage/indexer/src/db_v2.rs b/storage/indexer/src/db_v2.rs index 7c60a758ff671..f7d7880e7db99 100644 --- a/storage/indexer/src/db_v2.rs +++ b/storage/indexer/src/db_v2.rs @@ -6,14 +6,11 @@ /// from storage critical path to indexer, the other file will be removed /// and this file will be moved to /ecosystem/indexer-grpc/indexer-grpc-table-info. use crate::{ + db_ops::{read_db, write_db}, metadata::{MetadataKey, MetadataValue}, - schema::{ - column_families, indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema, - }, + schema::{indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema}, }; -use aptos_config::config::RocksdbConfig; use aptos_logger::info; -use aptos_rocksdb_options::gen_rocksdb_options; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{ db_other_bail as bail, state_view::DbStateView, AptosDbError, DbReader, Result, @@ -39,6 +36,8 @@ use move_core_types::{ use move_resource_viewer::{AnnotatedMoveValue, MoveValueAnnotator}; use std::{ collections::{BTreeMap, HashMap}, + fs, + path::PathBuf, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -46,12 +45,11 @@ use std::{ time::Duration, }; -pub const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; const TABLE_INFO_RETRY_TIME_MILLIS: u64 = 10; #[derive(Debug)] pub struct IndexerAsyncV2 { - db: DB, + pub db: DB, // Next version to be processed next_version: AtomicU64, // It is used in the context of processing write ops and extracting table information. @@ -65,29 +63,18 @@ pub struct IndexerAsyncV2 { } impl IndexerAsyncV2 { - /// Opens up this rocksdb to get ready for read and write when bootstraping the aptosdb - pub fn open( - db_root_path: impl AsRef<Path>, - rocksdb_config: RocksdbConfig, - pending_on: DashMap>, - ) -> Result<Self> { - let db_path = db_root_path.as_ref().join(INDEX_ASYNC_V2_DB_NAME); - - let db = DB::open( - db_path, - "index_asnync_v2_db", - column_families(), - &gen_rocksdb_options(&rocksdb_config, false), - )?; - - let next_version = db - .get::<IndexerMetadataSchema>(&MetadataKey::LatestVersion)?
- .map_or(0, |v| v.expect_version()); + pub fn new(db: DB) -> Result<Self> { + let next_version = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>( + &db, + &MetadataKey::LatestVersion, + ) + .unwrap() + .map_or(0, |v| v.expect_version()); Ok(Self { db, next_version: AtomicU64::new(next_version), - pending_on, + pending_on: DashMap::new(), }) } @@ -151,12 +138,11 @@ impl IndexerAsyncV2 { } pub fn update_next_version(&self, end_version: u64) -> Result<()> { - let batch = SchemaBatch::new(); - batch.put::<IndexerMetadataSchema>( - &MetadataKey::LatestVersion, - &MetadataValue::Version(end_version - 1), + write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>( + &self.db, + MetadataKey::LatestVersion, + MetadataValue::Version(end_version), )?; - self.db.write_schemas(batch)?; self.next_version.store(end_version, Ordering::Relaxed); Ok(()) } @@ -201,7 +187,12 @@ impl IndexerAsyncV2 { } pub fn next_version(&self) -> Version { - self.next_version.load(Ordering::Relaxed) + read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>( + &self.db, + &MetadataKey::LatestVersion, + ) + .unwrap() + .map_or(0, |v| v.expect_version()) } pub fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> { @@ -227,6 +218,11 @@ impl IndexerAsyncV2 { pub fn is_indexer_async_v2_pending_on_empty(&self) -> bool { self.pending_on.is_empty() } + + pub fn create_checkpoint(&self, path: &PathBuf) -> Result<()> { + fs::remove_dir_all(path).unwrap_or(()); + self.db.create_checkpoint(path) + } } struct TableInfoParser<'a, R> { diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 5eebb36331880..425112933e32d 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -3,9 +3,11 @@ /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready mod db; +pub mod db_ops; pub mod db_v2; -mod metadata; -mod schema; +pub mod metadata; +pub mod schema; +pub mod table_info_reader; use crate::{ db::INDEX_DB_NAME, diff --git a/storage/indexer/src/metadata.rs b/storage/indexer/src/metadata.rs index 116709252727a..f0af9f42b51b6 100644 --- a/storage/indexer/src/metadata.rs +++ b/storage/indexer/src/metadata.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] #[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))] -pub(crate) enum MetadataValue { +pub enum MetadataValue { Version(Version), } @@ -20,6 +20,6 @@ impl MetadataValue { #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] #[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))] -pub(crate) enum MetadataKey { +pub enum MetadataKey { LatestVersion, } diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs index 90ba38374788d..0b52a72474d46 100644 --- a/storage/indexer/src/schema/mod.rs +++ b/storage/indexer/src/schema/mod.rs @@ -6,8 +6,8 @@ //! //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details.
-pub(crate) mod indexer_metadata; -pub(crate) mod table_info; +pub mod indexer_metadata; +pub mod table_info; use aptos_schemadb::ColumnFamilyName; diff --git a/storage/indexer/src/table_info_reader.rs b/storage/indexer/src/table_info_reader.rs new file mode 100644 index 0000000000000..98a2acca03e1f --- /dev/null +++ b/storage/indexer/src/table_info_reader.rs @@ -0,0 +1,19 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::db_v2::IndexerAsyncV2; +use aptos_storage_interface::Result; +use aptos_types::state_store::table::{TableHandle, TableInfo}; + +/// Table info reader is to create a thin interface for other services to read the db data, +/// this standalone db is officially not part of the AptosDB anymore. +/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. +pub trait TableInfoReader: Send + Sync { + fn get_table_info(&self, handle: TableHandle) -> Result>; +} + +impl TableInfoReader for IndexerAsyncV2 { + fn get_table_info(&self, handle: TableHandle) -> Result> { + Self::get_table_info_with_retry(self, handle) + } +} diff --git a/storage/schemadb/src/schema.rs b/storage/schemadb/src/schema.rs index 34c26607da22a..3bb96f2076b64 100644 --- a/storage/schemadb/src/schema.rs +++ b/storage/schemadb/src/schema.rs @@ -69,7 +69,7 @@ use std::fmt::Debug; macro_rules! define_schema { ($schema_type:ident, $key_type:ty, $value_type:ty, $cf_name:expr) => { #[derive(Debug)] - pub(crate) struct $schema_type; + pub struct $schema_type; impl $crate::schema::Schema for $schema_type { type Key = $key_type; diff --git a/storage/schemadb/tests/db.rs b/storage/schemadb/tests/db.rs index 201033ef95cdc..1516cd12b4564 100644 --- a/storage/schemadb/tests/db.rs +++ b/storage/schemadb/tests/db.rs @@ -20,7 +20,7 @@ define_schema!(TestSchema1, TestField, TestField, "TestCF1"); define_schema!(TestSchema2, TestField, TestField, "TestCF2"); #[derive(Debug, Eq, PartialEq)] -struct TestField(u32); +pub struct TestField(u32); impl TestField { fn to_bytes(&self) -> Vec { diff --git a/storage/schemadb/tests/iterator.rs b/storage/schemadb/tests/iterator.rs index e500276c147d9..fdbc2d32af6dd 100644 --- a/storage/schemadb/tests/iterator.rs +++ b/storage/schemadb/tests/iterator.rs @@ -15,10 +15,10 @@ use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; define_schema!(TestSchema, TestKey, TestValue, "TestCF"); #[derive(Debug, Eq, PartialEq)] -struct TestKey(u32, u32, u32); +pub struct TestKey(u32, u32, u32); #[derive(Debug, Eq, PartialEq)] -struct TestValue(u32); +pub struct TestValue(u32); impl KeyCodec for TestKey { fn encode_key(&self) -> Result> { diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index c6f4f567f2085..cebc7e7c276be 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -450,16 +450,6 @@ pub trait DbReader: Send + Sync { /// Returns whether the internal indexer DB has been enabled or not fn indexer_enabled(&self) -> bool; - /// Returns whether the internal indexer async v2 DB has been enabled or not - fn indexer_async_v2_enabled(&self) -> bool; - - /// Returns the next version which internal indexer async v2 DB should parse - fn get_indexer_async_v2_next_version(&self) -> Result; - - /// Returns boolean whether indexer async v2 pending on items are empty - /// if so, the whole batches are processed completely, if not, need to retry - fn is_indexer_async_v2_pending_on_empty(&self) -> Result; - /// Returns state storage usage at the end of an epoch. 
fn get_state_storage_usage(&self, version: Option) -> Result; ); // end delegated @@ -577,28 +567,6 @@ pub trait DbWriter: Send + Sync { ) -> Result<()> { unimplemented!() } - - /// Index table info mapping for the indexer async v2 rocksdb. - /// Called by the table info service when its constantly parsing the table info. - fn index_table_info( - &self, - db_reader: Arc, - first_version: Version, - write_sets: &[&WriteSet], - end_early_if_pending_on_empty: bool, - ) -> Result<()> { - unimplemented!() - } - - /// Clean up pending on items in the indexer async v2 rocksdb. - /// Called by the table info service when all threads finish processing. - fn cleanup_pending_on_items(&self) -> Result<()> { - unimplemented!() - } - - fn update_next_version(&self, end_version: u64) -> Result<()> { - unimplemented!() - } } #[derive(Clone)] From 7a778c7fa1ec63d0e2c7d28dc0fe5550bab854c3 Mon Sep 17 00:00:00 2001 From: jillxuu Date: Thu, 25 Jan 2024 16:57:57 -0800 Subject: [PATCH 2/4] add utils for table info backup and restore and redesign the db read --- Cargo.lock | 39 +++ .../indexer-grpc-table-info/Cargo.toml | 7 + .../src/backup_restore/fs_ops.rs | 257 +++++++++++++++++ .../src/backup_restore/gcs.rs | 270 ++++++++++++++++++ .../src/backup_restore/mod.rs | 36 +++ .../indexer-grpc-table-info/src/lib.rs | 1 + 6 files changed, 610 insertions(+) create mode 100644 ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/fs_ops.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/gcs.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 74038a2031e81..15d14f550096d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2353,13 +2353,18 @@ dependencies = [ "bytes", "chrono", "fail 0.5.1", + "flate2", "futures", + "google-cloud-storage", "hex", "hyper", "move-resource-viewer", "once_cell", + "rocksdb", "serde", "serde_json", + "tar", + "tempfile", "tokio", "tokio-stream", "tonic 0.10.2", @@ -7742,6 +7747,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31a7a908b8f32538a2143e59a6e4e2508988832d5d4d6f7c156b3cbc762643a5" +[[package]] +name = "filetime" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.4.1", + "windows-sys 0.52.0", +] + [[package]] name = "findshlibs" version = "0.10.2" @@ -15140,6 +15157,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "target-lexicon" version = "0.12.13" @@ -17005,6 +17033,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "xattr" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914566e6413e7fa959cc394fb30e563ba80f3541fbd40816d4c05a0fc3f2a0f1" +dependencies = [ + "libc", + "linux-raw-sys 0.4.12", + "rustix 0.38.28", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml 
b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml index d739b3b042c48..8683ad9d788f3 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml @@ -18,13 +18,16 @@ base64 = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } fail = { workspace = true } +flate2 = { workspace = true } futures = { workspace = true } +google-cloud-storage = { workspace = true } hex = { workspace = true } hyper = { workspace = true } move-resource-viewer = { workspace = true } once_cell = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tar = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } @@ -47,5 +50,9 @@ aptos-schemadb = { workspace = true } aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } +[dev-dependencies] +rocksdb = { workspace = true } +tempfile = { workspace = true } + [features] failpoints = ["fail/failpoints"] diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/fs_ops.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/fs_ops.rs new file mode 100644 index 0000000000000..d725de8c9d8d1 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/fs_ops.rs @@ -0,0 +1,257 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; +use std::{ + fs, + fs::File, + io::{BufWriter, Error, Write}, + path::PathBuf, +}; +use tar::{Archive, Builder}; + +pub fn rename_db_folders_and_cleanup( + original_db_path: &PathBuf, + temp_old_db_path: &PathBuf, + restored_db_path: &PathBuf, +) -> Result<(), Error> { + // Rename the original DB path to a temporary old DB path + fs::rename(original_db_path, temp_old_db_path).map_err(|e| { + Error::new( + e.kind(), + format!( + "Failed to rename original DB folder from {:?} to {:?}: {}", + original_db_path, temp_old_db_path, e + ), + ) + })?; + + // Rename the restored DB path to the original DB path + fs::rename(restored_db_path, original_db_path).map_err(|e| { + Error::new( + e.kind(), + format!( + "Failed to rename restored DB folder from {:?} to {:?}: {}", + restored_db_path, original_db_path, e + ), + ) + })?; + + // Remove the temporary old DB folder + fs::remove_dir_all(temp_old_db_path).map_err(|e| { + Error::new( + e.kind(), + format!( + "Failed to remove old DB folder {:?}: {}", + temp_old_db_path, e + ), + ) + })?; + + Ok(()) +} + +/// Creates a tar.gz archive from the db snapshot directory +pub fn create_tar_gz( + dir_path: PathBuf, + backup_file_name: &str, +) -> Result<(PathBuf, String), anyhow::Error> { + let tar_file_name = format!("{}.tar.gz", backup_file_name); + let tar_file_path = dir_path.join(&tar_file_name); + let temp_tar_file_path = dir_path.join(format!("{}.tmp", tar_file_name)); + + let tar_file = File::create(&temp_tar_file_path)?; + let gz_encoder = GzEncoder::new(tar_file, Compression::default()); + let tar_data = BufWriter::new(gz_encoder); + let mut tar_builder = Builder::new(tar_data); + + tar_builder.append_dir_all(".", &dir_path)?; + tar_builder.into_inner()?; + + std::fs::rename(&temp_tar_file_path, &tar_file_path)?; + + Ok((tar_file_path, tar_file_name)) +} + +pub fn write_snapshot_to_file(snapshot: &[u8], target_path: &PathBuf) -> anyhow::Result<()> { + let temp_file_path = target_path.with_extension("tmp"); + let mut temp_file = 
File::create(&temp_file_path)?; + temp_file.write_all(snapshot)?; + temp_file.sync_all()?; // Ensure all data is written to disk + fs::rename(&temp_file_path, target_path)?; // Atomically move the temp file to the target path + Ok(()) +} + +/// Unpack a tar.gz archive to a specified directory +pub fn unpack_tar_gz(temp_file_path: &PathBuf, target_db_path: &PathBuf) -> anyhow::Result<()> { + let temp_dir_path = target_db_path.with_extension("tmp"); + fs::create_dir(&temp_dir_path)?; + + let file = File::open(temp_file_path)?; + let gz_decoder = GzDecoder::new(file); + let mut archive = Archive::new(gz_decoder); + archive.unpack(&temp_dir_path)?; + + fs::remove_dir_all(target_db_path).unwrap_or(()); + fs::rename(&temp_dir_path, target_db_path)?; // Atomically replace the directory + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rocksdb::{ + ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, Options, SingleThreaded, DB, + }; + use std::{ + fs::File, + io::{Read, Write}, + }; + use tempfile::tempdir; + + #[test] + fn test_rename_db_folders_and_cleanup() { + // Create temporary directories to simulate the original, temp old, and restored DB paths + let original_db_dir = tempdir().unwrap(); + let temp_old_db_dir = tempdir().unwrap(); + let restored_db_dir = tempdir().unwrap(); + + // Create a mock file in each directory to simulate DB contents + File::create(original_db_dir.path().join("original_db_file")).unwrap(); + File::create(restored_db_dir.path().join("restored_db_file")).unwrap(); + + // Call the function with the paths + let result = rename_db_folders_and_cleanup( + &original_db_dir.path().to_path_buf(), + &temp_old_db_dir.path().to_path_buf(), + &restored_db_dir.path().to_path_buf(), + ); + + // Check if the function executed successfully + assert!(result.is_ok()); + + // Check if the original DB directory now contains the restored DB file + assert!(original_db_dir.path().join("restored_db_file").exists()); + + // Check if the temp old DB directory has been removed + assert!(!temp_old_db_dir.path().exists()); + } + + #[test] + fn test_create_unpack_tar_gz_and_preserves_content() -> anyhow::Result<()> { + // Create a temporary directory and a file within it + let dir_to_compress = tempdir()?; + let file_path = dir_to_compress.path().join("testfile.txt"); + let test_content = "Sample content"; + let mut file = File::create(file_path)?; + writeln!(file, "{}", test_content)?; + + // Create a tar.gz file from the directory + let (tar_gz_path, _) = create_tar_gz(dir_to_compress.path().to_path_buf(), "testbackup")?; + assert!(tar_gz_path.exists()); + + // Create a new temporary directory to unpack the tar.gz file + let unpack_dir = tempdir()?; + unpack_tar_gz(&tar_gz_path, &unpack_dir.path().to_path_buf())?; + + // Verify the file is correctly unpacked + let unpacked_file_path = unpack_dir.path().join("testfile.txt"); + assert!(unpacked_file_path.exists()); + + // Read content from the unpacked file + let mut unpacked_file = File::open(unpacked_file_path)?; + let mut unpacked_content = String::new(); + unpacked_file.read_to_string(&mut unpacked_content)?; + + // Assert that the original content is equal to the unpacked content + assert_eq!(unpacked_content.trim_end(), test_content); + + Ok(()) + } + + #[tokio::test] + async fn test_pack_unpack_compare_rocksdb() -> anyhow::Result<()> { + // Create a temporary directory for the original RocksDB + let original_db_dir = tempdir()?; + let original_db_path = original_db_dir.path(); + + // Initialize RocksDB with some data + { + 
let db = DB::open_default(original_db_path)?; + db.put(b"key1", b"value1")?; + db.put(b"key2", b"value2")?; + db.flush()?; + } + + // Pack the original RocksDB into a tar.gz file + let (tar_gz_path, _) = create_tar_gz(original_db_path.to_path_buf(), "testbackup")?; + assert!(tar_gz_path.exists(), "Tar.gz file was not created."); + + // Create a temporary directory for the unpacked RocksDB + let unpacked_db_dir = tempdir()?; + let unpacked_db_path = unpacked_db_dir.path(); + + // Unpack the tar.gz file to the new directory + unpack_tar_gz(&tar_gz_path, &unpacked_db_path.to_path_buf())?; + + // Compare the original and unpacked databases + let comparison_result = compare_rocksdb( + original_db_path.to_str().unwrap(), + unpacked_db_path.to_str().unwrap(), + )?; + assert!( + comparison_result, + "Databases are not the same after packing and unpacking." + ); + + Ok(()) + } + + fn compare_rocksdb(db1_path: &str, db2_path: &str) -> Result { + let db1 = open_db_with_column_families(db1_path)?; + let db2 = open_db_with_column_families(db2_path)?; + + let iter1 = db1.iterator(IteratorMode::Start); // Iterate from the start of db1 + let mut iter2 = db2.iterator(IteratorMode::Start); // Iterate from the start of db2 + + for result1 in iter1 { + let (key1, value1) = result1?; + + match iter2.next() { + Some(result2) => { + let (key2, value2) = result2?; + if key1 != key2 || value1 != value2 { + // If keys or values differ, the databases are not identical + return Ok(false); + } + }, + None => { + // db2 has fewer elements than db1 + return Ok(false); + }, + } + } + + // Check if db2 has more elements than db1 + if iter2.next().is_some() { + return Ok(false); + } + + Ok(true) // Databases are identical + } + + fn open_db_with_column_families( + db_path: &str, + ) -> anyhow::Result> { + let mut db_opts = Options::default(); + db_opts.create_if_missing(false); + + let cfs = DB::list_cf(&db_opts, db_path).map_err(anyhow::Error::new)?; // Convert rocksdb::Error to anyhow::Error + let cf_descriptors = cfs + .into_iter() + .map(|cf| ColumnFamilyDescriptor::new(cf, Options::default())) + .collect::>(); + + DB::open_cf_descriptors(&db_opts, db_path, cf_descriptors).map_err(anyhow::Error::new) + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/gcs.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/gcs.rs new file mode 100644 index 0000000000000..89239edee8d33 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/gcs.rs @@ -0,0 +1,270 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{ + fs_ops::create_tar_gz, generate_blob_name, BackupRestoreMetadata, JSON_FILE_TYPE, + METADATA_FILE_NAME, TAR_FILE_TYPE, +}; +use crate::backup_restore::fs_ops::{unpack_tar_gz, write_snapshot_to_file}; +use anyhow::Context; +use aptos_db_indexer::db_v2::IndexerAsyncV2; +use aptos_logger::{error, info}; +use google_cloud_storage::{ + client::{Client, ClientConfig}, + http::{ + buckets::get::GetBucketRequest, + objects::{ + download::Range, + get::GetObjectRequest, + upload::{Media, UploadObjectRequest, UploadType}, + }, + Error, + }, +}; +use hyper::StatusCode; +use std::{ + borrow::Cow::Borrowed, + env, fs, + path::PathBuf, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +pub struct GcsBackupRestoreOperator { + bucket_name: String, + metadata_epoch: AtomicU64, + gcs_client: Client, +} + +impl GcsBackupRestoreOperator { + pub async fn new(bucket_name: String) -> Self { + let 
gcs_config = ClientConfig::default() + .with_auth() + .await + .expect("Failed to create GCS client."); + let gcs_client = Client::new(gcs_config); + Self { + bucket_name, + metadata_epoch: AtomicU64::new(0), + gcs_client, + } + } +} + +impl GcsBackupRestoreOperator { + pub async fn verify_storage_bucket_existence(&self) { + info!( + bucket_name = self.bucket_name, + "Before gcs backup restore operator starts, verify the bucket exists." + ); + + self.gcs_client + .get_bucket(&GetBucketRequest { + bucket: self.bucket_name.to_string(), + ..Default::default() + }) + .await + .unwrap_or_else(|_| panic!("Failed to get the bucket with name: {}", self.bucket_name)); + } + + pub async fn get_metadata(&self) -> Option { + match self.download_metadata_object().await { + Ok(metadata) => Some(metadata), + Err(Error::HttpClient(err)) => { + if err.status() == Some(StatusCode::NOT_FOUND) { + None + } else { + panic!("Error happens when accessing metadata file. {}", err); + } + }, + Err(e) => { + panic!("Error happens when accessing metadata file. {}", e); + }, + } + } + + pub async fn create_default_metadata_if_absent( + &self, + expected_chain_id: u64, + ) -> anyhow::Result { + match self.download_metadata_object().await { + Ok(metadata) => { + assert!(metadata.chain_id == expected_chain_id, "Chain ID mismatch."); + self.set_metadata_epoch(metadata.epoch); + Ok(metadata) + }, + Err(Error::HttpClient(err)) => { + let is_file_missing = err.status() == Some(StatusCode::NOT_FOUND); + if is_file_missing { + self.update_metadata(expected_chain_id, 0) + .await + .expect("Update metadata failed."); + self.set_metadata_epoch(0); + Ok(BackupRestoreMetadata::new(expected_chain_id, 0)) + } else { + Err(anyhow::Error::msg(format!( + "Metadata not found or gcs operator is not in write mode. 
{}", + err + ))) + } + }, + Err(err) => Err(anyhow::Error::from(err)), + } + } + + async fn download_metadata_object(&self) -> Result { + self.gcs_client + .download_object( + &GetObjectRequest { + bucket: self.bucket_name.clone(), + object: METADATA_FILE_NAME.to_string(), + ..Default::default() + }, + &Range::default(), + ) + .await + .map(BackupRestoreMetadata::from) + } + + pub async fn update_metadata(&self, chain_id: u64, epoch: u64) -> anyhow::Result<()> { + let metadata = BackupRestoreMetadata::new(chain_id, epoch); + loop { + match self + .gcs_client + .upload_object( + &UploadObjectRequest { + bucket: self.bucket_name.clone(), + ..Default::default() + }, + serde_json::to_vec(&metadata).unwrap(), + &UploadType::Simple(Media { + name: Borrowed(METADATA_FILE_NAME), + content_type: Borrowed(JSON_FILE_TYPE), + content_length: None, + }), + ) + .await + { + Ok(_) => { + return Ok(()); + }, + // https://cloud.google.com/storage/quotas + // add retry logic due to: "Maximum rate of writes to the same object name: One write per second" + Err(Error::Response(err)) if (err.is_retriable() && err.code == 429) => { + info!("Retried with rateLimitExceeded on gcs single object at epoch {} when updating the metadata", epoch); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + }, + Err(err) => { + anyhow::bail!("Failed to update metadata: {}", err); + }, + } + } + } + + pub async fn backup_db_snapshot( + &self, + chain_id: u64, + epoch: u64, + indexer_async_v2: Arc, + snapshot_path: PathBuf, + ) -> anyhow::Result<()> { + // reading epoch from gcs metadata is too slow, so updating the local var first so that every previous + // new epoch based backup will be observed correctly by the next backup + self.set_metadata_epoch(epoch); + + // rocksdb will create a checkpoint to take a snapshot of full db and then save it to snapshot_path + indexer_async_v2 + .create_checkpoint(&snapshot_path) + .context(format!("DB checkpoint failed at epoch {}", epoch))?; + + // create a gzipped tar file by compressing a folder into a single file + let (tar_file, _tar_file_name) = create_tar_gz(snapshot_path.clone(), &epoch.to_string())?; + let buffer = std::fs::read(&tar_file).context("Failed to read gzipped tar file")?; + + let filename = generate_blob_name(epoch); + + match self + .gcs_client + .upload_object( + &UploadObjectRequest { + bucket: self.bucket_name.clone(), + ..Default::default() + }, + buffer.clone(), + &UploadType::Simple(Media { + name: filename.clone().into(), + content_type: Borrowed(TAR_FILE_TYPE), + content_length: None, + }), + ) + .await + { + Ok(_) => { + self.update_metadata(chain_id, epoch).await?; + + std::fs::remove_file(&tar_file) + .and_then(|_| fs::remove_dir_all(&snapshot_path)) + .expect("Failed to clean up after db snapshot upload"); + }, + Err(err) => { + error!("Failed to upload snapshot: {}", err); + }, + }; + + Ok(()) + } + + /// When fullnode is getting started, it will first restore its table info db by restoring most recent snapshot from gcs buckets. + /// Download the right snapshot based on epoch to a local file and then unzip it and write to the indexer async v2 db. 
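For orientation, the backup half above is meant to be driven by a caller that compares the epoch it just processed against the epoch recorded in the bucket; after a successful upload the bucket holds `metadata.json` plus one `files/<epoch>.tar.gz` per backed-up epoch. A rough sketch of such a caller (the function name, arguments and snapshot directory are illustrative and not part of this patch; the real driver is added to the table info service in a later commit):

    use crate::backup_restore::gcs::GcsBackupRestoreOperator;
    use aptos_db_indexer::db_v2::IndexerAsyncV2;
    use std::{path::PathBuf, sync::Arc};

    // Hypothetical driver: back up only when the chain has moved past the epoch
    // already recorded in the GCS metadata object.
    async fn backup_if_new_epoch(
        operator: Arc<GcsBackupRestoreOperator>,
        indexer: Arc<IndexerAsyncV2>,
        chain_id: u64,
        epoch: u64,
        snapshot_dir: PathBuf,
    ) -> anyhow::Result<()> {
        if epoch > operator.get_metadata_epoch() {
            // Checkpoints the rocksdb into snapshot_dir, tars and uploads files/<epoch>.tar.gz,
            // then bumps metadata.json to the new epoch.
            operator
                .backup_db_snapshot(chain_id, epoch, indexer, snapshot_dir)
                .await?;
        }
        Ok(())
    }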
+ pub async fn restore_db_snapshot( + &self, + chain_id: u64, + metadata: BackupRestoreMetadata, + db_path: PathBuf, + base_path: PathBuf, + ) -> anyhow::Result<()> { + assert!(metadata.chain_id == chain_id, "Chain ID mismatch."); + + let epoch = metadata.epoch; + let epoch_based_filename = generate_blob_name(epoch); + + match self + .gcs_client + .download_object( + &GetObjectRequest { + bucket: self.bucket_name.clone(), + object: epoch_based_filename.clone(), + ..Default::default() + }, + &Range::default(), + ) + .await + { + Ok(snapshot) => { + let temp_file_name = "snapshot.tar.gz"; + let temp_file_path = base_path.join(temp_file_name); + write_snapshot_to_file(&snapshot, &temp_file_path)?; + + unpack_tar_gz(&temp_file_path, &db_path)?; + fs::remove_file(&temp_file_path).context("Failed to remove temporary file")?; + + self.set_metadata_epoch(epoch); + + Ok(()) + }, + Err(e) => Err(anyhow::Error::new(e)), + } + } + + pub fn set_metadata_epoch(&self, epoch: u64) { + self.metadata_epoch.store(epoch, Ordering::Relaxed) + } + + pub fn get_metadata_epoch(&self) -> u64 { + self.metadata_epoch.load(Ordering::Relaxed) + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/mod.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/mod.rs new file mode 100644 index 0000000000000..e9aedd83313b0 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/backup_restore/mod.rs @@ -0,0 +1,36 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +pub mod fs_ops; +pub mod gcs; + +pub const FILE_FOLDER_NAME: &str = "files"; +pub const METADATA_FILE_NAME: &str = "metadata.json"; +pub const JSON_FILE_TYPE: &str = "application/json"; +pub const TAR_FILE_TYPE: &str = "application/gzip"; + +#[inline] +pub fn generate_blob_name(epoch: u64) -> String { + format!("{}/{}.tar.gz", FILE_FOLDER_NAME, epoch) +} + +#[derive(Serialize, Deserialize, Copy, Clone, Debug)] +pub struct BackupRestoreMetadata { + pub chain_id: u64, + pub epoch: u64, +} + +impl BackupRestoreMetadata { + pub fn new(chain_id: u64, epoch: u64) -> Self { + Self { chain_id, epoch } + } +} + +impl From> for BackupRestoreMetadata { + fn from(bytes: Vec) -> Self { + serde_json::from_slice(bytes.as_slice()) + .expect("Failed to deserialize BackupRestoreMetadata file.") + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs index 8baedd375790e..2d311d0902e48 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs @@ -1,5 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +pub mod backup_restore; pub mod runtime; pub mod table_info_service; From 59085689eb1f615ef2f80fb47be20efab20e0c13 Mon Sep 17 00:00:00 2001 From: jillxuu Date: Thu, 25 Jan 2024 16:57:57 -0800 Subject: [PATCH 3/4] add epoch based db snapshot backup --- .../src/config/indexer_table_info_config.rs | 13 +++- .../indexer-grpc-table-info/README.md | 2 + .../indexer-grpc-table-info/src/runtime.rs | 26 ++++++- .../src/table_info_service.rs | 78 ++++++++++++++++++- 4 files changed, 113 insertions(+), 6 deletions(-) diff --git a/config/src/config/indexer_table_info_config.rs b/config/src/config/indexer_table_info_config.rs index b5dca102d02ab..c7aeb20b01d69 100644 --- a/config/src/config/indexer_table_info_config.rs +++ b/config/src/config/indexer_table_info_config.rs @@ -4,8 +4,9 @@ use 
serde::{Deserialize, Serialize}; // Useful defaults -pub const DEFAULT_PARSER_TASK_COUNT: u16 = 20; -pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 1000; +pub const DEFAULT_PARSER_TASK_COUNT: u16 = 10; +pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 100; +pub const DEFAULT_BUCKET_NAME: &str = "table-info"; #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -20,6 +21,12 @@ pub struct IndexerTableInfoConfig { pub parser_batch_size: u16, pub enable_expensive_logging: bool, + + /// Enable backup service + pub db_backup_enabled: bool, + + /// Backup and restore service config + pub gcs_bucket_name: String, } // Reminder, #[serde(default)] on IndexerTableInfoConfig means that the default values for @@ -32,6 +39,8 @@ impl Default for IndexerTableInfoConfig { parser_task_count: DEFAULT_PARSER_TASK_COUNT, parser_batch_size: DEFAULT_PARSER_BATCH_SIZE, enable_expensive_logging: false, + db_backup_enabled: false, + gcs_bucket_name: DEFAULT_BUCKET_NAME.to_owned(), } } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md b/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md index a29b9b7db0ef6..b736e60ec7d4f 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md @@ -15,5 +15,7 @@ Follow instructions on how to run a fullnode against an existing network. enabled: true parser_task_count: 10 parser_batch_size: 1000 + db_backup_enabled: false + gcs_bucket_name: "table-info" * Run fullnode `cargo run -p aptos-node --release -- -f ./fullnode.yaml` diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index cb5aa9531e880..2d0998e6d4961 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -1,13 +1,14 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::table_info_service::TableInfoService; +use crate::{backup_restore::gcs::GcsBackupRestoreOperator, table_info_service::TableInfoService}; use aptos_api::context::Context; use aptos_config::config::NodeConfig; use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2}; +use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; -use aptos_types::chain_id::ChainId; +use aptos_types::chain_id::{ChainId, NamedChain}; use std::sync::Arc; use tokio::runtime::Runtime; @@ -38,12 +39,26 @@ pub fn bootstrap( let db = open_db(db_path, &rocksdb_config).expect("Failed to open up indexer async v2 db initially"); + // Set up the gcs bucket + let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone(); + let named_chain = match NamedChain::from_chain_id(&chain_id) { + Ok(named_chain) => format!("{}", named_chain).to_lowercase(), + Err(_err) => { + info!("Getting chain name from not named chains"); + chain_id.id().to_string() + }, + }; + let indexer_async_v2 = Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2")); let indexer_async_v2_clone = Arc::clone(&indexer_async_v2); // Spawn the runtime for table info parsing runtime.spawn(async move { + let backup_restore_operator: Arc = Arc::new( + GcsBackupRestoreOperator::new(format!("{}-{}", gcs_bucket_name.clone(), named_chain)) + .await, + ); let context = Arc::new(Context::new( chain_id, db_rw.reader.clone(), @@ -51,6 +66,12 @@ pub fn bootstrap( node_config.clone(), None, )); + // DB 
backup is optional + let backup_restore_operator = if node_config.indexer_table_info.db_backup_enabled { + Some(backup_restore_operator) + } else { + None + }; let mut parser = TableInfoService::new( context, @@ -58,6 +79,7 @@ pub fn bootstrap( node_config.indexer_table_info.parser_task_count, node_config.indexer_table_info.parser_batch_size, node_config.indexer_table_info.enable_expensive_logging, + backup_restore_operator, indexer_async_v2_clone, ); diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs index af965fac7b993..a71b45e528c2e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use crate::backup_restore::gcs::GcsBackupRestoreOperator; use anyhow::Error; use aptos_api::context::Context; use aptos_api_types::TransactionOnChainData; @@ -24,6 +25,7 @@ pub struct TableInfoService { pub parser_batch_size: u16, pub context: Arc, pub enable_expensive_logging: bool, + pub backup_restore_operator: Option>, pub indexer_async_v2: Arc, } @@ -34,6 +36,7 @@ impl TableInfoService { parser_task_count: u16, parser_batch_size: u16, enable_expensive_logging: bool, + backup_restore_operator: Option>, indexer_async_v2: Arc, ) -> Self { Self { @@ -42,6 +45,7 @@ impl TableInfoService { parser_batch_size, context, enable_expensive_logging, + backup_restore_operator, indexer_async_v2, } } @@ -52,6 +56,7 @@ impl TableInfoService { /// 4. write parsed table info to rocksdb /// 5. after all batches from the loop complete, if pending on items not empty, move on to 6, otherwise, start from 1 again /// 6. retry all the txns in the loop sequentially to clean up the pending on items + /// 7. 
try to backup rocksdb snapshot if new epoch have been found pub async fn run(&mut self) { loop { let start_time = std::time::Instant::now(); @@ -62,11 +67,28 @@ impl TableInfoService { .await; let max_version = self.get_max_batch_version(results).unwrap_or_default(); let versions_processed = max_version - self.current_version + 1; + let context = self.context.clone(); + let backup_restore_operator = self.backup_restore_operator.clone(); + let start_version = self.current_version; + let indexer_async_v2 = self.indexer_async_v2.clone(); + + // Try uploading the rocksdb snapshot by taking a full db checkpoint and save it to gcs if found new epoch + // running backup logic in a separate thread to not let it block the main thread to parse table info, since + // gcs operation could be slow + tokio::spawn(async move { + Self::try_backup_db_snapshot( + context.clone(), + max_version, + indexer_async_v2.clone(), + backup_restore_operator.clone(), + ) + .await; + }); log_grpc_step( SERVICE_TYPE, IndexerGrpcStep::TableInfoProcessed, - Some(self.current_version as i64), + Some(start_version as i64), Some(max_version as i64), None, None, @@ -192,6 +214,7 @@ impl TableInfoService { indexer_async_v2, end_early_if_pending_on_empty, ) + .await .expect("[Table Info] Failed to parse table info"); log_grpc_step( @@ -259,7 +282,7 @@ impl TableInfoService { /// Parse table info from write sets, /// end_early_if_pending_on_empty flag will be true if we couldn't parse all table infos in the first try with multithread, /// in the second try with sequential looping, to make parsing efficient, we end early if all table infos are parsed - fn parse_table_info( + async fn parse_table_info( context: Arc, raw_txns: Vec, indexer_async_v2: Arc, @@ -294,6 +317,57 @@ impl TableInfoService { Ok(()) } + /// Tries to upload a snapshot of the database if the backup service is enabled. + /// This function is called to periodically back up the database state to Google Cloud Storage (GCS). + /// It checks the latest epoch of data already backed up in GCS and compares it with the current epoch. 
+ async fn try_backup_db_snapshot( + context: Arc, + last_version: u64, + indexer_async_v2: Arc, + backup_restore_operator: Option>, + ) { + // only try backup db if backup service is enabled + if backup_restore_operator.is_some() { + let metadata_epoch = backup_restore_operator + .clone() + .unwrap() + .get_metadata_epoch(); + let (_, _, block_event) = context + .db + .get_block_info_by_version(last_version) + .unwrap_or_else(|_| { + panic!("Could not get block_info for last version {}", last_version,) + }); + let block_event_epoch = block_event.epoch(); + // If gcs most recent transaction version in metadata is behind, take a snapshot of rocksdb and upload + if metadata_epoch < block_event_epoch { + let start_time = std::time::Instant::now(); + // temporary path to store the snapshot + let snapshot_dir = context + .node_config + .get_data_dir() + .join(block_event_epoch.to_string()); + let ledger_chain_id = context.chain_id().id(); + backup_restore_operator + .unwrap() + .backup_db_snapshot( + ledger_chain_id as u64, + block_event_epoch, + indexer_async_v2, + snapshot_dir.clone(), + ) + .await + .expect("Failed to upload snapshot in table info service"); + + info!( + backup_epoch = block_event_epoch, + backup_millis = start_time.elapsed().as_millis(), + "[Table Info] Table info db backed up successfully" + ); + } + } + } + /// TODO(jill): consolidate it with `ensure_highest_known_version` /// Will keep looping and checking the latest ledger info to see if there are new transactions /// If there are, it will update the ledger version version From 59bdfd8cf12b479e140756314101edcc1471339d Mon Sep 17 00:00:00 2001 From: jillxuu Date: Fri, 26 Jan 2024 11:59:54 -0800 Subject: [PATCH 4/4] add db snapshot restore logic --- .../src/config/indexer_table_info_config.rs | 10 +- .../indexer-grpc-table-info/README.md | 1 + .../indexer-grpc-table-info/src/runtime.rs | 171 +++++++++++++++++- storage/indexer/src/db_ops.rs | 4 +- storage/indexer/src/db_v2.rs | 33 +++- storage/indexer/src/lib.rs | 3 +- storage/indexer/src/metadata_v2.rs | 35 ++++ .../src/schema/indexer_metadata_v2/mod.rs | 44 +++++ .../src/schema/indexer_metadata_v2/test.rs | 18 ++ storage/indexer/src/schema/mod.rs | 11 ++ 10 files changed, 318 insertions(+), 12 deletions(-) create mode 100644 storage/indexer/src/metadata_v2.rs create mode 100644 storage/indexer/src/schema/indexer_metadata_v2/mod.rs create mode 100644 storage/indexer/src/schema/indexer_metadata_v2/test.rs diff --git a/config/src/config/indexer_table_info_config.rs b/config/src/config/indexer_table_info_config.rs index c7aeb20b01d69..bf18d09548f31 100644 --- a/config/src/config/indexer_table_info_config.rs +++ b/config/src/config/indexer_table_info_config.rs @@ -4,9 +4,10 @@ use serde::{Deserialize, Serialize}; // Useful defaults -pub const DEFAULT_PARSER_TASK_COUNT: u16 = 10; -pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 100; +pub const DEFAULT_PARSER_TASK_COUNT: u16 = 20; +pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 1000; pub const DEFAULT_BUCKET_NAME: &str = "table-info"; +pub const DEFAULT_VERSION_DIFF: u64 = 100_000; #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -27,6 +28,10 @@ pub struct IndexerTableInfoConfig { /// Backup and restore service config pub gcs_bucket_name: String, + + /// if version difference btw this FN and latest ledger version is less than VERSION_DIFF + /// do not restore, start from the FN version, to avoid time consuming db restore from gcs + pub version_diff: u64, } // Reminder, 
#[serde(default)] on IndexerTableInfoConfig means that the default values for @@ -41,6 +46,7 @@ impl Default for IndexerTableInfoConfig { enable_expensive_logging: false, db_backup_enabled: false, gcs_bucket_name: DEFAULT_BUCKET_NAME.to_owned(), + version_diff: DEFAULT_VERSION_DIFF, } } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md b/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md index b736e60ec7d4f..12a1911f61f94 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md @@ -17,5 +17,6 @@ Follow instructions on how to run a fullnode against an existing network. parser_batch_size: 1000 db_backup_enabled: false gcs_bucket_name: "table-info" + version_diff: 100000 * Run fullnode `cargo run -p aptos-node --release -- -f ./fullnode.yaml` diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index 2d0998e6d4961..a28d7bfa07347 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -1,18 +1,35 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{backup_restore::gcs::GcsBackupRestoreOperator, table_info_service::TableInfoService}; +use crate::{ + backup_restore::{fs_ops::rename_db_folders_and_cleanup, gcs::GcsBackupRestoreOperator}, + table_info_service::TableInfoService, +}; +use anyhow::Error; use aptos_api::context::Context; -use aptos_config::config::NodeConfig; -use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2}; +use aptos_config::config::{NodeConfig, RocksdbConfig}; +use aptos_db_indexer::{ + db_ops::{close_db, open_db, read_db, write_db}, + db_v2::IndexerAsyncV2, + metadata_v2::{MetadataKey, MetadataValue}, + schema::indexer_metadata_v2::IndexerMetadataSchema, +}; use aptos_logger::info; use aptos_mempool::MempoolClientSender; +use aptos_schemadb::DB; use aptos_storage_interface::DbReaderWriter; use aptos_types::chain_id::{ChainId, NamedChain}; -use std::sync::Arc; +use std::{ + sync::Arc, + thread::sleep, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use tokio::runtime::Runtime; const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; +/// if last restore timestamp is less than RESTORE_TIME_DIFF_SECS, do not restore to avoid gcs download spam +const RESTORE_TIME_DIFF_SECS: u64 = 600; +const DB_OPERATION_INTERVAL_MS: u64 = 500; /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service /// Returns corresponding Tokio runtime @@ -49,6 +66,16 @@ pub fn bootstrap( }, }; + // Set up the gcs bucket + let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone(); + let named_chain = match NamedChain::from_chain_id(&chain_id) { + Ok(named_chain) => format!("{}", named_chain).to_lowercase(), + Err(_err) => { + info!("Getting chain name from not named chains"); + chain_id.id().to_string() + }, + }; + let indexer_async_v2 = Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2")); let indexer_async_v2_clone = Arc::clone(&indexer_async_v2); @@ -75,7 +102,7 @@ pub fn bootstrap( let mut parser = TableInfoService::new( context, - indexer_async_v2_clone.next_version(), + next_version, node_config.indexer_table_info.parser_task_count, node_config.indexer_table_info.parser_batch_size, node_config.indexer_table_info.enable_expensive_logging, @@ -88,3 +115,137 @@ pub fn bootstrap( 
Some((runtime, indexer_async_v2)) } + +/// This function handles the conditional restoration of the database from a GCS snapshot. +/// It checks if the database needs to be restored based on metadata file existence +/// and metadata epoch and the time since the last restore and the version differences. +/// If a restore is needed, it: +/// 1. close the db +/// 2. performs the restore to a different folder +/// 3. rename the folder to atomically move restored db snapshot to the right db path +/// 4. re-open the db +/// 5. update the last restore timestamp in the restored db +/// If a restore is not needed, it: +/// 1. returns the original db +async fn handle_db_restore( + node_config: &NodeConfig, + chain_id: ChainId, + db: DB, + db_rw: DbReaderWriter, + version_diff: u64, + rocksdb_config: RocksdbConfig, +) -> Result { + let binding = node_config.storage.get_dir_paths(); + let db_root_path = binding.default_root_path(); + let db_path = db_root_path.join(INDEX_ASYNC_V2_DB_NAME); + // Set up backup and restore config + let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone(); + let named_chain = match NamedChain::from_chain_id(&chain_id) { + Ok(named_chain) => format!("{}", named_chain).to_lowercase(), + Err(_err) => { + info!("Getting chain name from not named chains"); + chain_id.id().to_string() + }, + }; + + let backup_restore_operator: Arc = Arc::new( + GcsBackupRestoreOperator::new(format!("{}-{}", gcs_bucket_name.clone(), &named_chain)) + .await, + ); + + // If there's no metadata json file in gcs, we will create a default one with epoch 0 and return early since there's no snapshot to restore from, and early return. + // If metadata epoch is 0, early return. + let metadata = backup_restore_operator.get_metadata().await; + if metadata.is_none() { + backup_restore_operator + .create_default_metadata_if_absent(chain_id.id() as u64) + .await + .expect("Failed to create default metadata"); + return Ok(db); + } else if metadata.unwrap().epoch == 0 { + return Ok(db); + } + + // Check the time duration since the last restore + let last_restored_timestamp = read_db::( + &db, + &MetadataKey::RestoreTimestamp, + ) + .unwrap() + .map_or(0, |v| v.last_restored_timestamp()); + // Current timestamp will be used to compare duration from last restored timestamp, and to save db if restore is performed + let current_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + assert!( + current_timestamp >= last_restored_timestamp, + "Last restored timestamp from db should be less or equal to the current timestamp" + ); + let should_restore_based_on_time_duration = + current_timestamp - last_restored_timestamp > RESTORE_TIME_DIFF_SECS; + + // Check the version difference + let latest_committed_version = db_rw.reader.get_latest_version().unwrap(); + let next_version = read_db::( + &db, + &MetadataKey::LatestVersion, + ) + .unwrap() + .map_or(0, |v| v.expect_version()); + let should_restore_based_on_version = latest_committed_version > next_version + && latest_committed_version - next_version > version_diff; + + if should_restore_based_on_time_duration && should_restore_based_on_version { + // after reading db metadata info and deciding to restore, drop the db so that we could re-open it later + close_db(db); + + sleep(Duration::from_millis(DB_OPERATION_INTERVAL_MS)); + + backup_restore_operator + .verify_storage_bucket_existence() + .await; + + // a different path to restore backup db snapshot to, to avoid db corruption + let restore_db_path = node_config + 
.storage + .get_dir_paths() + .default_root_path() + .join("restore"); + backup_restore_operator + .restore_db_snapshot( + chain_id.id() as u64, + metadata.unwrap(), + restore_db_path.clone(), + node_config.get_data_dir().to_path_buf(), + ) + .await + .expect("Failed to restore snapshot"); + + // Restore to a different folder and replace the target folder atomically + let tmp_db_path = db_root_path.join("tmp"); + rename_db_folders_and_cleanup(&db_path, &tmp_db_path, &restore_db_path) + .expect("Failed to operate atomic restore in file system."); + + sleep(Duration::from_millis(DB_OPERATION_INTERVAL_MS)); + + let db = open_db(&db_path, &rocksdb_config).expect("Failed to reopen db after restore"); + write_db::( + &db, + MetadataKey::RestoreTimestamp, + MetadataValue::Timestamp(current_timestamp), + ) + .expect("Failed to write restore timestamp to indexer async v2"); + + info!( + should_restore_based_on_time_duration = should_restore_based_on_time_duration, + should_restore_based_on_version = should_restore_based_on_version, + latest_committed_version = latest_committed_version, + db_next_version = next_version, + last_restored_timestamp = last_restored_timestamp, + "[Table Info] Table info restored successfully" + ); + return Ok(db); + } + Ok(db) +} diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs index 65d1ae2b4ef73..b7318b1ad3d19 100644 --- a/storage/indexer/src/db_ops.rs +++ b/storage/indexer/src/db_ops.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::schema::column_families; +use crate::schema::column_families_v2; use anyhow::Result; use aptos_config::config::RocksdbConfig; use aptos_rocksdb_options::gen_rocksdb_options; @@ -15,7 +15,7 @@ pub fn open_db>(db_path: P, rocksdb_config: &RocksdbConfig) -> Re Ok(DB::open( db_path, "index_asnync_v2_db", - column_families(), + column_families_v2(), &gen_rocksdb_options(rocksdb_config, false), )?) } diff --git a/storage/indexer/src/db_v2.rs b/storage/indexer/src/db_v2.rs index f7d7880e7db99..773f27dd24730 100644 --- a/storage/indexer/src/db_v2.rs +++ b/storage/indexer/src/db_v2.rs @@ -7,8 +7,8 @@ /// and this file will be moved to /ecosystem/indexer-grpc/indexer-grpc-table-info. use crate::{ db_ops::{read_db, write_db}, - metadata::{MetadataKey, MetadataValue}, - schema::{indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema}, + metadata_v2::{MetadataKey, MetadataValue}, + schema::{indexer_metadata_v2::IndexerMetadataSchema, table_info::TableInfoSchema}, }; use aptos_logger::info; use aptos_schemadb::{SchemaBatch, DB}; @@ -52,6 +52,8 @@ pub struct IndexerAsyncV2 { pub db: DB, // Next version to be processed next_version: AtomicU64, + // DB snapshot most recently restored from gcs timestamp + restore_timestamp: AtomicU64, // It is used in the context of processing write ops and extracting table information. // As the code iterates through the write ops, it checks if the state key corresponds to a table item. // If it does, the associated bytes are added to the pending_on map under the corresponding table handle. 
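Written out with explicit type parameters, the `RestoreTimestamp` bookkeeping used by the restore path above is a round-trip through the new v2 metadata column family (defined just below) via the generic `read_db`/`write_db` helpers. A small sketch, with the helper function name being illustrative:

    use aptos_db_indexer::{
        db_ops::{read_db, write_db},
        metadata_v2::{MetadataKey, MetadataValue},
        schema::indexer_metadata_v2::IndexerMetadataSchema,
    };
    use aptos_schemadb::DB;

    // Record when a restore happened, then read it back; 0 means "never restored".
    fn record_restore_timestamp(db: &DB, now_secs: u64) -> anyhow::Result<u64> {
        write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
            db,
            MetadataKey::RestoreTimestamp,
            MetadataValue::Timestamp(now_secs),
        )?;
        let last = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
            db,
            &MetadataKey::RestoreTimestamp,
        )?
        .map_or(0, |v| v.last_restored_timestamp());
        Ok(last)
    }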
@@ -70,10 +72,17 @@ impl IndexerAsyncV2 { ) .unwrap() .map_or(0, |v| v.expect_version()); + let last_restored_timestamp = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>( + &db, + &MetadataKey::RestoreTimestamp, + ) + .unwrap() + .map_or(0, |v| v.last_restored_timestamp()); Ok(Self { db, next_version: AtomicU64::new(next_version), + restore_timestamp: AtomicU64::new(last_restored_timestamp), pending_on: DashMap::new(), }) } @@ -147,6 +156,17 @@ impl IndexerAsyncV2 { Ok(()) } + pub fn update_last_restored_timestamp(&self, restore_timestamp: u64) -> Result<()> { + write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>( + &self.db, + MetadataKey::RestoreTimestamp, + MetadataValue::Timestamp(restore_timestamp), + )?; + self.restore_timestamp + .store(restore_timestamp, Ordering::Relaxed); + Ok(()) + } + /// Finishes the parsing process and writes the parsed table information to a SchemaBatch. pub fn finish_table_info_parsing( &self, @@ -195,6 +215,15 @@ impl IndexerAsyncV2 { .map_or(0, |v| v.expect_version()) } + pub fn restore_timestamp(&self) -> u64 { + read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>( + &self.db, + &MetadataKey::RestoreTimestamp, + ) + .unwrap() + .map_or(0, |v| v.last_restored_timestamp()) + } + pub fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> { self.db.get::<TableInfoSchema>(&handle).map_err(Into::into) }
diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 425112933e32d..a7f87dced54ac 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -5,7 +5,8 @@ mod db; pub mod db_ops; pub mod db_v2; -pub mod metadata; +mod metadata; +pub mod metadata_v2; pub mod schema; pub mod table_info_reader;
diff --git a/storage/indexer/src/metadata_v2.rs b/storage/indexer/src/metadata_v2.rs new file mode 100644 index 0000000000000..257fa02b7ad8f --- /dev/null +++ b/storage/indexer/src/metadata_v2.rs @@ -0,0 +1,35 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::transaction::Version; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))] +pub enum MetadataValue { + Version(Version), + Timestamp(u64), +} + +impl MetadataValue { + pub fn expect_version(self) -> Version { + match self { + Self::Version(v) => v, + _ => panic!("Expected MetadataValue::Version"), + } + } + + pub fn last_restored_timestamp(self) -> u64 { + match self { + Self::Timestamp(t) => t, + _ => panic!("Expected MetadataValue::Timestamp"), + } + } +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))] +pub enum MetadataKey { + LatestVersion, + RestoreTimestamp, +}
diff --git a/storage/indexer/src/schema/indexer_metadata_v2/mod.rs b/storage/indexer/src/schema/indexer_metadata_v2/mod.rs new file mode 100644 index 0000000000000..dc8890feefee5 --- /dev/null +++ b/storage/indexer/src/schema/indexer_metadata_v2/mod.rs @@ -0,0 +1,44 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! This module defines physical storage schema storing metadata for the internal indexer +//! +use crate::{ + metadata_v2::{MetadataKey, MetadataValue}, + schema::INDEXER_METADATA_V2_CF_NAME, +}; +use anyhow::Result; +use aptos_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, +}; + +define_schema!( + IndexerMetadataSchema, + MetadataKey, + MetadataValue, + INDEXER_METADATA_V2_CF_NAME +); + +impl KeyCodec<IndexerMetadataSchema> for MetadataKey { + fn encode_key(&self) -> Result<Vec<u8>> { + Ok(bcs::to_bytes(self)?) + } + + fn decode_key(data: &[u8]) -> Result<Self> { + Ok(bcs::from_bytes(data)?) + } +} + +impl ValueCodec<IndexerMetadataSchema> for MetadataValue { + fn encode_value(&self) -> Result<Vec<u8>> { + Ok(bcs::to_bytes(self)?) + } + + fn decode_value(data: &[u8]) -> Result<Self> { + Ok(bcs::from_bytes(data)?) + } +} + +#[cfg(test)] +mod test;
diff --git a/storage/indexer/src/schema/indexer_metadata_v2/test.rs b/storage/indexer/src/schema/indexer_metadata_v2/test.rs new file mode 100644 index 0000000000000..a1117d2b17ea3 --- /dev/null +++ b/storage/indexer/src/schema/indexer_metadata_v2/test.rs @@ -0,0 +1,18 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! { + #[test] + fn test_encode_decode( + tag in any::<MetadataKey>(), + metadata in any::<MetadataValue>(), + ) { + assert_encode_decode::<IndexerMetadataSchema>(&tag, &metadata); + } +} + +test_no_panic_decoding!(IndexerMetadataSchema);
diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs index 0b52a72474d46..c9bbc648fbe8c 100644 --- a/storage/indexer/src/schema/mod.rs +++ b/storage/indexer/src/schema/mod.rs @@ -7,12 +7,15 @@ //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details. pub mod indexer_metadata; +pub mod indexer_metadata_v2; pub mod table_info; use aptos_schemadb::ColumnFamilyName; pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; +/// TODO(jill): to be deleted once INDEXER_METADATA_V2_CF_NAME is deployed pub const INDEXER_METADATA_CF_NAME: ColumnFamilyName = "indexer_metadata"; +pub const INDEXER_METADATA_V2_CF_NAME: ColumnFamilyName = "indexer_metadata_v2"; pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info"; pub fn column_families() -> Vec<ColumnFamilyName> { @@ -22,3 +25,11 @@ pub fn column_families() -> Vec<ColumnFamilyName> { TABLE_INFO_CF_NAME, ] } + +pub fn column_families_v2() -> Vec<ColumnFamilyName> { + vec![ + /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME, + INDEXER_METADATA_V2_CF_NAME, + TABLE_INFO_CF_NAME, + ] +}
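Downstream, services that need the table handle-to-type mapping are expected to reach this data only through the `TableInfoReader` trait from the first commit in this series rather than through AptosDB. A minimal sketch of exposing the runtime's `IndexerAsyncV2` handle that way (the function names are illustrative; the actual fullnode wiring is not shown in these hunks):

    use aptos_db_indexer::{db_v2::IndexerAsyncV2, table_info_reader::TableInfoReader};
    use aptos_types::state_store::table::{TableHandle, TableInfo};
    use std::sync::Arc;

    // IndexerAsyncV2 implements TableInfoReader (delegating to get_table_info_with_retry),
    // so the Arc can simply be handed out as a trait object.
    fn as_table_info_reader(indexer: Arc<IndexerAsyncV2>) -> Arc<dyn TableInfoReader> {
        indexer
    }

    // A consumer only ever sees the thin trait, never the underlying rocksdb.
    fn lookup(
        reader: &dyn TableInfoReader,
        handle: TableHandle,
    ) -> anyhow::Result<Option<TableInfo>> {
        Ok(reader.get_table_info(handle)?)
    }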