From 59bdfd8cf12b479e140756314101edcc1471339d Mon Sep 17 00:00:00 2001
From: jillxuu
Date: Fri, 26 Jan 2024 11:59:54 -0800
Subject: [PATCH] add db snapshot restore logic

---
 .../src/config/indexer_table_info_config.rs   |  10 +-
 .../indexer-grpc-table-info/README.md         |   1 +
 .../indexer-grpc-table-info/src/runtime.rs    | 171 +++++++++++++++++-
 storage/indexer/src/db_ops.rs                 |   4 +-
 storage/indexer/src/db_v2.rs                  |  33 +++-
 storage/indexer/src/lib.rs                    |   3 +-
 storage/indexer/src/metadata_v2.rs            |  35 ++++
 .../src/schema/indexer_metadata_v2/mod.rs     |  44 +++++
 .../src/schema/indexer_metadata_v2/test.rs    |  18 ++
 storage/indexer/src/schema/mod.rs             |  11 ++
 10 files changed, 318 insertions(+), 12 deletions(-)
 create mode 100644 storage/indexer/src/metadata_v2.rs
 create mode 100644 storage/indexer/src/schema/indexer_metadata_v2/mod.rs
 create mode 100644 storage/indexer/src/schema/indexer_metadata_v2/test.rs

diff --git a/config/src/config/indexer_table_info_config.rs b/config/src/config/indexer_table_info_config.rs
index c7aeb20b01d69..bf18d09548f31 100644
--- a/config/src/config/indexer_table_info_config.rs
+++ b/config/src/config/indexer_table_info_config.rs
@@ -4,9 +4,10 @@
 use serde::{Deserialize, Serialize};
 
 // Useful defaults
-pub const DEFAULT_PARSER_TASK_COUNT: u16 = 10;
-pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 100;
+pub const DEFAULT_PARSER_TASK_COUNT: u16 = 20;
+pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 1000;
 pub const DEFAULT_BUCKET_NAME: &str = "table-info";
+pub const DEFAULT_VERSION_DIFF: u64 = 100_000;
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 #[serde(default, deny_unknown_fields)]
@@ -27,6 +28,10 @@ pub struct IndexerTableInfoConfig {
 
     /// Backup and restore service config
     pub gcs_bucket_name: String,
+
+    /// If the version difference between this fullnode and the latest ledger version is smaller than `version_diff`,
+    /// do not restore; start from the fullnode's own version to avoid a time-consuming db restore from gcs.
+    pub version_diff: u64,
 }
 
 // Reminder, #[serde(default)] on IndexerTableInfoConfig means that the default values for
@@ -41,6 +46,7 @@ impl Default for IndexerTableInfoConfig {
             enable_expensive_logging: false,
             db_backup_enabled: false,
             gcs_bucket_name: DEFAULT_BUCKET_NAME.to_owned(),
+            version_diff: DEFAULT_VERSION_DIFF,
         }
     }
 }
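The new `version_diff` knob gates the restore decision on how far the local table-info db has fallen behind the ledger. A minimal sketch of that comparison, written as a standalone function rather than the node's actual code path (names are illustrative):

```rust
/// Restore only when the table-info db is genuinely behind the ledger and the
/// gap exceeds the configured threshold (default 100_000 versions).
fn should_restore_for_version_gap(
    latest_committed_version: u64,
    next_version: u64,
    version_diff: u64,
) -> bool {
    latest_committed_version > next_version
        && latest_committed_version - next_version > version_diff
}

fn main() {
    // 800_000 versions behind the ledger: downloading the GCS snapshot is worth it.
    assert!(should_restore_for_version_gap(1_000_000, 200_000, 100_000));
    // Only 50_000 behind: cheaper to let the parser catch up from the fullnode's own version.
    assert!(!should_restore_for_version_gap(1_000_000, 950_000, 100_000));
}
```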
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md b/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md
index b736e60ec7d4f..12a1911f61f94 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/README.md
@@ -17,5 +17,6 @@ Follow instructions on how to run a fullnode against an existing network.
     parser_task_count: 20
     parser_batch_size: 1000
     db_backup_enabled: false
     gcs_bucket_name: "table-info"
+    version_diff: 100000
 * Run fullnode `cargo run -p aptos-node --release -- -f ./fullnode.yaml`
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
index 2d0998e6d4961..a28d7bfa07347 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
@@ -1,18 +1,35 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::{backup_restore::gcs::GcsBackupRestoreOperator, table_info_service::TableInfoService};
+use crate::{
+    backup_restore::{fs_ops::rename_db_folders_and_cleanup, gcs::GcsBackupRestoreOperator},
+    table_info_service::TableInfoService,
+};
+use anyhow::Error;
 use aptos_api::context::Context;
-use aptos_config::config::NodeConfig;
-use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2};
+use aptos_config::config::{NodeConfig, RocksdbConfig};
+use aptos_db_indexer::{
+    db_ops::{close_db, open_db, read_db, write_db},
+    db_v2::IndexerAsyncV2,
+    metadata_v2::{MetadataKey, MetadataValue},
+    schema::indexer_metadata_v2::IndexerMetadataSchema,
+};
 use aptos_logger::info;
 use aptos_mempool::MempoolClientSender;
+use aptos_schemadb::DB;
 use aptos_storage_interface::DbReaderWriter;
 use aptos_types::chain_id::{ChainId, NamedChain};
-use std::sync::Arc;
+use std::{
+    sync::Arc,
+    thread::sleep,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
 use tokio::runtime::Runtime;
 
 const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
+/// If the last restore happened less than RESTORE_TIME_DIFF_SECS ago, do not restore again, to avoid spamming gcs downloads
+const RESTORE_TIME_DIFF_SECS: u64 = 600;
+const DB_OPERATION_INTERVAL_MS: u64 = 500;
 
 /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service
 /// Returns corresponding Tokio runtime
@@ -49,6 +66,16 @@ pub fn bootstrap(
         },
     };
 
+    // Set up the gcs bucket
+    let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone();
+    let named_chain = match NamedChain::from_chain_id(&chain_id) {
+        Ok(named_chain) => format!("{}", named_chain).to_lowercase(),
+        Err(_err) => {
+            info!("Getting chain name from not named chains");
+            chain_id.id().to_string()
+        },
+    };
+
     let indexer_async_v2 =
         Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2"));
     let indexer_async_v2_clone = Arc::clone(&indexer_async_v2);
@@ -75,7 +102,7 @@
 
         let mut parser = TableInfoService::new(
             context,
-            indexer_async_v2_clone.next_version(),
+            next_version,
            node_config.indexer_table_info.parser_task_count,
            node_config.indexer_table_info.parser_batch_size,
            node_config.indexer_table_info.enable_expensive_logging,
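`RESTORE_TIME_DIFF_SECS` adds a second, time-based gate so a node cannot re-download the snapshot in a tight loop. A rough sketch of that check, assuming the same unix-second timestamps the patch persists in the db (this is not the node's code path, just the shape of the condition):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

const RESTORE_TIME_DIFF_SECS: u64 = 600;

/// A db that has never been restored carries timestamp 0 and always passes this gate.
fn should_restore_for_elapsed_time(last_restored_timestamp: u64) -> bool {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the unix epoch")
        .as_secs();
    // The patch asserts now >= last_restored_timestamp; the sketch uses saturating_sub instead.
    now.saturating_sub(last_restored_timestamp) > RESTORE_TIME_DIFF_SECS
}

fn main() {
    assert!(should_restore_for_elapsed_time(0));
}
```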
@@ -88,3 +115,137 @@
 
     Some((runtime, indexer_async_v2))
 }
+
+/// This function handles the conditional restoration of the database from a GCS snapshot.
+/// It decides whether a restore is needed based on the existence of the metadata file,
+/// the metadata epoch, the time since the last restore, and the version difference.
+/// If a restore is needed, it:
+/// 1. closes the db
+/// 2. performs the restore into a separate folder
+/// 3. renames the folder to atomically move the restored db snapshot to the right db path
+/// 4. re-opens the db
+/// 5. updates the last restore timestamp in the restored db
+/// If a restore is not needed, it:
+/// 1. returns the original db
+async fn handle_db_restore(
+    node_config: &NodeConfig,
+    chain_id: ChainId,
+    db: DB,
+    db_rw: DbReaderWriter,
+    version_diff: u64,
+    rocksdb_config: RocksdbConfig,
+) -> Result<DB, Error> {
+    let binding = node_config.storage.get_dir_paths();
+    let db_root_path = binding.default_root_path();
+    let db_path = db_root_path.join(INDEX_ASYNC_V2_DB_NAME);
+    // Set up backup and restore config
+    let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone();
+    let named_chain = match NamedChain::from_chain_id(&chain_id) {
+        Ok(named_chain) => format!("{}", named_chain).to_lowercase(),
+        Err(_err) => {
+            info!("Getting chain name from not named chains");
+            chain_id.id().to_string()
+        },
+    };
+
+    let backup_restore_operator: Arc<GcsBackupRestoreOperator> = Arc::new(
+        GcsBackupRestoreOperator::new(format!("{}-{}", gcs_bucket_name.clone(), &named_chain))
+            .await,
+    );
+
+    // If there's no metadata json file in gcs, create a default one with epoch 0 and return early since there's no snapshot to restore from.
+    // If the metadata epoch is 0, also return early.
+    let metadata = backup_restore_operator.get_metadata().await;
+    if metadata.is_none() {
+        backup_restore_operator
+            .create_default_metadata_if_absent(chain_id.id() as u64)
+            .await
+            .expect("Failed to create default metadata");
+        return Ok(db);
+    } else if metadata.unwrap().epoch == 0 {
+        return Ok(db);
+    }
+
+    // Check the time duration since the last restore
+    let last_restored_timestamp = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
+        &db,
+        &MetadataKey::RestoreTimestamp,
+    )
+    .unwrap()
+    .map_or(0, |v| v.last_restored_timestamp());
+    // The current timestamp is compared against the last restored timestamp, and is written to the db if a restore is performed
+    let current_timestamp = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_secs();
+    assert!(
+        current_timestamp >= last_restored_timestamp,
+        "Last restored timestamp from db should be less than or equal to the current timestamp"
+    );
+    let should_restore_based_on_time_duration =
+        current_timestamp - last_restored_timestamp > RESTORE_TIME_DIFF_SECS;
+
+    // Check the version difference
+    let latest_committed_version = db_rw.reader.get_latest_version().unwrap();
+    let next_version = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
+        &db,
+        &MetadataKey::LatestVersion,
+    )
+    .unwrap()
+    .map_or(0, |v| v.expect_version());
+    let should_restore_based_on_version = latest_committed_version > next_version
+        && latest_committed_version - next_version > version_diff;
+
+    if should_restore_based_on_time_duration && should_restore_based_on_version {
+        // After reading db metadata and deciding to restore, close the db so that it can be re-opened later
+        close_db(db);
+
+        sleep(Duration::from_millis(DB_OPERATION_INTERVAL_MS));
+
+        backup_restore_operator
+            .verify_storage_bucket_existence()
+            .await;
+
+        // Restore the backup db snapshot into a different path, to avoid db corruption
+        let restore_db_path = node_config
+            .storage
+            .get_dir_paths()
+            .default_root_path()
+            .join("restore");
+        backup_restore_operator
+            .restore_db_snapshot(
+                chain_id.id() as u64,
+                metadata.unwrap(),
+                restore_db_path.clone(),
+                node_config.get_data_dir().to_path_buf(),
+            )
+            .await
+            .expect("Failed to restore snapshot");
+
+        // Restore to a different folder and replace the target folder atomically
+        let tmp_db_path = db_root_path.join("tmp");
+        rename_db_folders_and_cleanup(&db_path, &tmp_db_path, &restore_db_path)
+            .expect("Failed to operate atomic restore in file system.");
+
+        sleep(Duration::from_millis(DB_OPERATION_INTERVAL_MS));
+
+        let db = open_db(&db_path, &rocksdb_config).expect("Failed to reopen db after restore");
+        write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
+            &db,
+            MetadataKey::RestoreTimestamp,
+            MetadataValue::Timestamp(current_timestamp),
+        )
+        .expect("Failed to write restore timestamp to indexer async v2");
+
+        info!(
+            should_restore_based_on_time_duration = should_restore_based_on_time_duration,
+            should_restore_based_on_version = should_restore_based_on_version,
+            latest_committed_version = latest_committed_version,
+            db_next_version = next_version,
+            last_restored_timestamp = last_restored_timestamp,
+            "[Table Info] Table info restored successfully"
+        );
+        return Ok(db);
+    }
+    Ok(db)
+}
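The snapshot is downloaded into a separate `restore` directory and then swapped into the live path; `rename_db_folders_and_cleanup` (in `backup_restore::fs_ops`, not shown in this patch) performs that swap. A rough illustration of the pattern using only `std::fs` — not the helper's actual implementation:

```rust
use std::{fs, io, path::Path};

/// Swap a freshly restored db directory into the live db path.
/// Each rename is atomic on a single filesystem, but the sequence as a whole is not,
/// which is why the caller keeps the db closed (and sleeps) around this step.
fn swap_in_restored_db(db_path: &Path, tmp_db_path: &Path, restore_db_path: &Path) -> io::Result<()> {
    // 1. Move the live (stale) db out of the way.
    fs::rename(db_path, tmp_db_path)?;
    // 2. Move the restored snapshot into the live path.
    fs::rename(restore_db_path, db_path)?;
    // 3. Remove the stale copy.
    fs::remove_dir_all(tmp_db_path)
}
```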
diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs
index 65d1ae2b4ef73..b7318b1ad3d19 100644
--- a/storage/indexer/src/db_ops.rs
+++ b/storage/indexer/src/db_ops.rs
@@ -1,7 +1,7 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::schema::column_families;
+use crate::schema::column_families_v2;
 use anyhow::Result;
 use aptos_config::config::RocksdbConfig;
 use aptos_rocksdb_options::gen_rocksdb_options;
@@ -15,7 +15,7 @@ pub fn open_db<P: AsRef<Path>>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result<DB> {
     Ok(DB::open(
         db_path,
         "index_asnync_v2_db",
-        column_families(),
+        column_families_v2(),
         &gen_rocksdb_options(rocksdb_config, false),
     )?)
 }
diff --git a/storage/indexer/src/db_v2.rs b/storage/indexer/src/db_v2.rs
index f7d7880e7db99..773f27dd24730 100644
--- a/storage/indexer/src/db_v2.rs
+++ b/storage/indexer/src/db_v2.rs
@@ -7,8 +7,8 @@
 /// and this file will be moved to /ecosystem/indexer-grpc/indexer-grpc-table-info.
 use crate::{
     db_ops::{read_db, write_db},
-    metadata::{MetadataKey, MetadataValue},
-    schema::{indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema},
+    metadata_v2::{MetadataKey, MetadataValue},
+    schema::{indexer_metadata_v2::IndexerMetadataSchema, table_info::TableInfoSchema},
 };
 use aptos_logger::info;
 use aptos_schemadb::{SchemaBatch, DB};
@@ -52,6 +52,8 @@ pub struct IndexerAsyncV2 {
     pub db: DB,
     // Next version to be processed
     next_version: AtomicU64,
+    // Timestamp of the most recent db snapshot restore from gcs
+    restore_timestamp: AtomicU64,
     // It is used in the context of processing write ops and extracting table information.
     // As the code iterates through the write ops, it checks if the state key corresponds to a table item.
     // If it does, the associated bytes are added to the pending_on map under the corresponding table handle.
@@ -70,10 +72,17 @@ impl IndexerAsyncV2 {
         )
         .unwrap()
         .map_or(0, |v| v.expect_version());
+        let last_restored_timestamp = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
+            &db,
+            &MetadataKey::RestoreTimestamp,
+        )
+        .unwrap()
+        .map_or(0, |v| v.last_restored_timestamp());
 
         Ok(Self {
             db,
             next_version: AtomicU64::new(next_version),
+            restore_timestamp: AtomicU64::new(last_restored_timestamp),
             pending_on: DashMap::new(),
         })
     }
@@ -147,6 +156,17 @@
         Ok(())
     }
 
+    pub fn update_last_restored_timestamp(&self, restore_timestamp: u64) -> Result<()> {
+        write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
+            &self.db,
+            MetadataKey::RestoreTimestamp,
+            MetadataValue::Timestamp(restore_timestamp),
+        )?;
+        self.restore_timestamp
+            .store(restore_timestamp, Ordering::Relaxed);
+        Ok(())
+    }
+
     /// Finishes the parsing process and writes the parsed table information to a SchemaBatch.
     pub fn finish_table_info_parsing(
         &self,
@@ -195,6 +215,15 @@ impl IndexerAsyncV2 {
             .map_or(0, |v| v.expect_version())
     }
 
+    pub fn restore_timestamp(&self) -> u64 {
+        read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
+            &self.db,
+            &MetadataKey::RestoreTimestamp,
+        )
+        .unwrap()
+        .map_or(0, |v| v.last_restored_timestamp())
+    }
+
     pub fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> {
         self.db.get::<TableInfoSchema>(&handle).map_err(Into::into)
     }
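A small usage sketch of the new restore-timestamp accessors on `IndexerAsyncV2`. The path is a placeholder and `RocksdbConfig::default()` is assumed to be acceptable for a local experiment; the node itself wires these values up from `NodeConfig`:

```rust
use anyhow::Result;
use aptos_config::config::RocksdbConfig;
use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2};
use std::path::Path;

fn record_restore(db_dir: &Path, restored_at_secs: u64) -> Result<()> {
    let db = open_db(db_dir, &RocksdbConfig::default())?;
    let indexer = IndexerAsyncV2::new(db)?;
    // Persists the timestamp under MetadataKey::RestoreTimestamp and updates the in-memory AtomicU64.
    indexer.update_last_restored_timestamp(restored_at_secs)?;
    // Reads it back through the db-backed getter.
    assert_eq!(indexer.restore_timestamp(), restored_at_secs);
    Ok(())
}
```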
diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs
index 425112933e32d..a7f87dced54ac 100644
--- a/storage/indexer/src/lib.rs
+++ b/storage/indexer/src/lib.rs
@@ -5,7 +5,8 @@
 mod db;
 pub mod db_ops;
 pub mod db_v2;
-pub mod metadata;
+mod metadata;
+pub mod metadata_v2;
 pub mod schema;
 pub mod table_info_reader;
 
diff --git a/storage/indexer/src/metadata_v2.rs b/storage/indexer/src/metadata_v2.rs
new file mode 100644
index 0000000000000..257fa02b7ad8f
--- /dev/null
+++ b/storage/indexer/src/metadata_v2.rs
@@ -0,0 +1,35 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use aptos_types::transaction::Version;
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
+pub enum MetadataValue {
+    Version(Version),
+    Timestamp(u64),
+}
+
+impl MetadataValue {
+    pub fn expect_version(self) -> Version {
+        match self {
+            Self::Version(v) => v,
+            _ => panic!("Expected MetadataValue::Version"),
+        }
+    }
+
+    pub fn last_restored_timestamp(self) -> u64 {
+        match self {
+            Self::Timestamp(t) => t,
+            _ => panic!("Expected MetadataValue::Timestamp"),
+        }
+    }
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
+pub enum MetadataKey {
+    LatestVersion,
+    RestoreTimestamp,
+}
diff --git a/storage/indexer/src/schema/indexer_metadata_v2/mod.rs b/storage/indexer/src/schema/indexer_metadata_v2/mod.rs
new file mode 100644
index 0000000000000..dc8890feefee5
--- /dev/null
+++ b/storage/indexer/src/schema/indexer_metadata_v2/mod.rs
@@ -0,0 +1,44 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+//! This module defines physical storage schema storing metadata for the internal indexer
+//!
+use crate::{
+    metadata_v2::{MetadataKey, MetadataValue},
+    schema::INDEXER_METADATA_V2_CF_NAME,
+};
+use anyhow::Result;
+use aptos_schemadb::{
+    define_schema,
+    schema::{KeyCodec, ValueCodec},
+};
+
+define_schema!(
+    IndexerMetadataSchema,
+    MetadataKey,
+    MetadataValue,
+    INDEXER_METADATA_V2_CF_NAME
+);
+
+impl KeyCodec<IndexerMetadataSchema> for MetadataKey {
+    fn encode_key(&self) -> Result<Vec<u8>> {
+        Ok(bcs::to_bytes(self)?)
+    }
+
+    fn decode_key(data: &[u8]) -> Result<Self> {
+        Ok(bcs::from_bytes(data)?)
+    }
+}
+
+impl ValueCodec<IndexerMetadataSchema> for MetadataValue {
+    fn encode_value(&self) -> Result<Vec<u8>> {
+        Ok(bcs::to_bytes(self)?)
+    }
+
+    fn decode_value(data: &[u8]) -> Result<Self> {
+        Ok(bcs::from_bytes(data)?)
+    }
+}
+
+#[cfg(test)]
+mod test;
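The codec impls above are thin BCS wrappers; the proptest that follows checks the round-trip property in general, and this sketch shows it for one concrete key/value pair (the timestamp value is arbitrary):

```rust
use aptos_db_indexer::metadata_v2::{MetadataKey, MetadataValue};

fn main() -> anyhow::Result<()> {
    // Key round-trip, as exercised by KeyCodec::encode_key / decode_key.
    let key_bytes = bcs::to_bytes(&MetadataKey::RestoreTimestamp)?;
    assert_eq!(
        bcs::from_bytes::<MetadataKey>(&key_bytes)?,
        MetadataKey::RestoreTimestamp
    );

    // Value round-trip, as exercised by ValueCodec::encode_value / decode_value.
    let value = MetadataValue::Timestamp(1_706_000_000);
    let value_bytes = bcs::to_bytes(&value)?;
    assert_eq!(bcs::from_bytes::<MetadataValue>(&value_bytes)?, value);
    Ok(())
}
```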
diff --git a/storage/indexer/src/schema/indexer_metadata_v2/test.rs b/storage/indexer/src/schema/indexer_metadata_v2/test.rs
new file mode 100644
index 0000000000000..a1117d2b17ea3
--- /dev/null
+++ b/storage/indexer/src/schema/indexer_metadata_v2/test.rs
@@ -0,0 +1,18 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use super::*;
+use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding};
+use proptest::prelude::*;
+
+proptest! {
+    #[test]
+    fn test_encode_decode(
+        tag in any::<MetadataKey>(),
+        metadata in any::<MetadataValue>(),
+    ) {
+        assert_encode_decode::<IndexerMetadataSchema>(&tag, &metadata);
+    }
+}
+
+test_no_panic_decoding!(IndexerMetadataSchema);
diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs
index 0b52a72474d46..c9bbc648fbe8c 100644
--- a/storage/indexer/src/schema/mod.rs
+++ b/storage/indexer/src/schema/mod.rs
@@ -7,12 +7,15 @@
 //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details.
 
 pub mod indexer_metadata;
+pub mod indexer_metadata_v2;
 pub mod table_info;
 
 use aptos_schemadb::ColumnFamilyName;
 
 pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default";
+/// TODO(jill): to be deleted once INDEXER_METADATA_V2_CF_NAME is deployed
 pub const INDEXER_METADATA_CF_NAME: ColumnFamilyName = "indexer_metadata";
+pub const INDEXER_METADATA_V2_CF_NAME: ColumnFamilyName = "indexer_metadata_v2";
 pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info";
 
 pub fn column_families() -> Vec<ColumnFamilyName> {
@@ -22,3 +25,11 @@
         TABLE_INFO_CF_NAME,
     ]
 }
+
+pub fn column_families_v2() -> Vec<ColumnFamilyName> {
+    vec![
+        /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME,
+        INDEXER_METADATA_V2_CF_NAME,
+        TABLE_INFO_CF_NAME,
+    ]
+}
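For reference, a quick sketch of the column-family set that `open_db` now passes to `DB::open` under this patch — the old `indexer_metadata` column family is replaced by `indexer_metadata_v2`:

```rust
use aptos_db_indexer::schema::{column_families_v2, INDEXER_METADATA_V2_CF_NAME};

fn main() {
    let cfs = column_families_v2();
    // The v2 metadata column family is included alongside the default and table_info CFs.
    assert!(cfs.contains(&INDEXER_METADATA_V2_CF_NAME));
    assert_eq!(cfs, vec!["default", "indexer_metadata_v2", "table_info"]);
}
```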