add db snapshot restore logic
jillxuu committed Jan 26, 2024
1 parent 5908568 commit 59bdfd8
Showing 10 changed files with 318 additions and 12 deletions.
10 changes: 8 additions & 2 deletions config/src/config/indexer_table_info_config.rs
@@ -4,9 +4,10 @@
use serde::{Deserialize, Serialize};

// Useful defaults
pub const DEFAULT_PARSER_TASK_COUNT: u16 = 10;
pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 100;
pub const DEFAULT_PARSER_TASK_COUNT: u16 = 20;
pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 1000;
pub const DEFAULT_BUCKET_NAME: &str = "table-info";
pub const DEFAULT_VERSION_DIFF: u64 = 100_000;

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
@@ -27,6 +28,10 @@ pub struct IndexerTableInfoConfig {

/// Backup and restore service config
pub gcs_bucket_name: String,

/// If the version difference between this fullnode and the latest ledger version is less
/// than `version_diff`, skip the restore and start from the fullnode's current version,
/// to avoid a time-consuming db restore from GCS.
pub version_diff: u64,
}

// Reminder, #[serde(default)] on IndexerTableInfoConfig means that the default values for
@@ -41,6 +46,7 @@ impl Default for IndexerTableInfoConfig {
enable_expensive_logging: false,
db_backup_enabled: false,
gcs_bucket_name: DEFAULT_BUCKET_NAME.to_owned(),
version_diff: DEFAULT_VERSION_DIFF,
}
}
}
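
Since the struct is `#[serde(default, deny_unknown_fields)]`, any field omitted from a config file falls back to the defaults above. A minimal sketch of that behavior (assuming `serde_yaml` as the deserializer, which this diff doesn't show):

```rust
// Hedged sketch: a partial YAML config parses, with defaults filling omitted fields.
let cfg: IndexerTableInfoConfig =
    serde_yaml::from_str("db_backup_enabled: false").expect("valid config");
assert_eq!(cfg.version_diff, DEFAULT_VERSION_DIFF); // 100_000
assert_eq!(cfg.gcs_bucket_name, DEFAULT_BUCKET_NAME); // "table-info"
```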
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/README.md
@@ -17,5 +17,6 @@ Follow instructions on how to run a fullnode against an existing network.
parser_batch_size: 1000
db_backup_enabled: false
gcs_bucket_name: "table-info"
version_diff: 100000
* Run fullnode `cargo run -p aptos-node --release -- -f ./fullnode.yaml`
171 changes: 166 additions & 5 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
@@ -1,18 +1,35 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{backup_restore::gcs::GcsBackupRestoreOperator, table_info_service::TableInfoService};
use crate::{
backup_restore::{fs_ops::rename_db_folders_and_cleanup, gcs::GcsBackupRestoreOperator},
table_info_service::TableInfoService,
};
use anyhow::Error;
use aptos_api::context::Context;
use aptos_config::config::NodeConfig;
use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2};
use aptos_config::config::{NodeConfig, RocksdbConfig};
use aptos_db_indexer::{
db_ops::{close_db, open_db, read_db, write_db},
db_v2::IndexerAsyncV2,
metadata_v2::{MetadataKey, MetadataValue},
schema::indexer_metadata_v2::IndexerMetadataSchema,
};
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_schemadb::DB;
use aptos_storage_interface::DbReaderWriter;
use aptos_types::chain_id::{ChainId, NamedChain};
use std::sync::Arc;
use std::{
sync::Arc,
thread::sleep,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::runtime::Runtime;

const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
/// If the time elapsed since the last restore is less than RESTORE_TIME_DIFF_SECS, do not restore, to avoid spamming GCS with downloads
const RESTORE_TIME_DIFF_SECS: u64 = 600;
const DB_OPERATION_INTERVAL_MS: u64 = 500;

/// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service
/// Returns corresponding Tokio runtime
@@ -49,6 +66,16 @@ pub fn bootstrap(
},
};

// Set up the gcs bucket
let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone();
let named_chain = match NamedChain::from_chain_id(&chain_id) {
Ok(named_chain) => format!("{}", named_chain).to_lowercase(),
Err(_err) => {
info!("Getting chain name from not named chains");
chain_id.id().to_string()
},
};

let indexer_async_v2 =
Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2"));
let indexer_async_v2_clone = Arc::clone(&indexer_async_v2);
@@ -75,7 +102,7 @@

let mut parser = TableInfoService::new(
context,
indexer_async_v2_clone.next_version(),
next_version,
node_config.indexer_table_info.parser_task_count,
node_config.indexer_table_info.parser_batch_size,
node_config.indexer_table_info.enable_expensive_logging,
@@ -88,3 +115,137 @@

Some((runtime, indexer_async_v2))
}
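
The new `handle_db_restore` function below is presumably invoked from `bootstrap` before `IndexerAsyncV2::new`; the actual call site is hidden by the collapsed hunk above, so the wiring here is a hypothetical sketch:

```rust
// Hypothetical call site (names and scope are assumptions, not part of this diff):
// run the conditional restore before handing the db to the indexer.
let db = runtime
    .block_on(handle_db_restore(
        &node_config,
        chain_id,
        db,
        db_rw,
        node_config.indexer_table_info.version_diff,
        rocksdb_config,
    ))
    .expect("Failed to restore db from gcs snapshot");
```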

/// This function handles the conditional restoration of the database from a GCS snapshot.
/// It decides whether to restore based on the existence of the metadata file, the metadata
/// epoch, the time since the last restore, and the version difference.
/// If a restore is needed, it:
/// 1. closes the db
/// 2. performs the restore into a separate folder
/// 3. renames the folders to atomically move the restored snapshot to the db path
/// 4. re-opens the db
/// 5. updates the last restore timestamp in the restored db
/// If a restore is not needed, it returns the original db unchanged.
async fn handle_db_restore(
node_config: &NodeConfig,
chain_id: ChainId,
db: DB,
db_rw: DbReaderWriter,
version_diff: u64,
rocksdb_config: RocksdbConfig,
) -> Result<DB, Error> {
let binding = node_config.storage.get_dir_paths();
let db_root_path = binding.default_root_path();
let db_path = db_root_path.join(INDEX_ASYNC_V2_DB_NAME);
// Set up backup and restore config
let gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone();
let named_chain = match NamedChain::from_chain_id(&chain_id) {
Ok(named_chain) => format!("{}", named_chain).to_lowercase(),
Err(_err) => {
info!("Getting chain name from not named chains");
chain_id.id().to_string()
},
};

let backup_restore_operator: Arc<GcsBackupRestoreOperator> = Arc::new(
GcsBackupRestoreOperator::new(format!("{}-{}", gcs_bucket_name.clone(), &named_chain))
.await,
);

// If there's no metadata JSON file in GCS, create a default one with epoch 0 and return early, since there's no snapshot to restore from.
// If the metadata epoch is 0, also return early.
let metadata = backup_restore_operator.get_metadata().await;
if metadata.is_none() {
backup_restore_operator
.create_default_metadata_if_absent(chain_id.id() as u64)
.await
.expect("Failed to create default metadata");
return Ok(db);
} else if metadata.unwrap().epoch == 0 {
return Ok(db);
}

// Check the time duration since the last restore
let last_restored_timestamp = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
&db,
&MetadataKey::RestoreTimestamp,
)
.unwrap()
.map_or(0, |v| v.last_restored_timestamp());
// The current timestamp is compared against the last restored timestamp, and is written to the db if a restore is performed
let current_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(
current_timestamp >= last_restored_timestamp,
"Last restored timestamp from db should be less or equal to the current timestamp"
);
let should_restore_based_on_time_duration =
current_timestamp - last_restored_timestamp > RESTORE_TIME_DIFF_SECS;

// Check the version difference
let latest_committed_version = db_rw.reader.get_latest_version().unwrap();
let next_version = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
&db,
&MetadataKey::LatestVersion,
)
.unwrap()
.map_or(0, |v| v.expect_version());
let should_restore_based_on_version = latest_committed_version > next_version
&& latest_committed_version - next_version > version_diff;

if should_restore_based_on_time_duration && should_restore_based_on_version {
// After reading the db metadata and deciding to restore, close the db so that it can be re-opened later
close_db(db);

sleep(Duration::from_millis(DB_OPERATION_INTERVAL_MS));

backup_restore_operator
.verify_storage_bucket_existence()
.await;

// Restore the backup snapshot into a separate path, to avoid corrupting the live db
let restore_db_path = node_config
.storage
.get_dir_paths()
.default_root_path()
.join("restore");
backup_restore_operator
.restore_db_snapshot(
chain_id.id() as u64,
metadata.unwrap(),
restore_db_path.clone(),
node_config.get_data_dir().to_path_buf(),
)
.await
.expect("Failed to restore snapshot");

// Swap folders to atomically replace the target db path with the restored snapshot
let tmp_db_path = db_root_path.join("tmp");
rename_db_folders_and_cleanup(&db_path, &tmp_db_path, &restore_db_path)
.expect("Failed to operate atomic restore in file system.");

sleep(Duration::from_millis(DB_OPERATION_INTERVAL_MS));

let db = open_db(&db_path, &rocksdb_config).expect("Failed to reopen db after restore");
write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
&db,
MetadataKey::RestoreTimestamp,
MetadataValue::Timestamp(current_timestamp),
)
.expect("Failed to write restore timestamp to indexer async v2");

info!(
should_restore_based_on_time_duration = should_restore_based_on_time_duration,
should_restore_based_on_version = should_restore_based_on_version,
latest_committed_version = latest_committed_version,
db_next_version = next_version,
last_restored_timestamp = last_restored_timestamp,
"[Table Info] Table info restored successfully"
);
return Ok(db);
}
Ok(db)
}
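
`rename_db_folders_and_cleanup` itself is not part of this diff; a hedged sketch of the swap it is described as performing (each `fs::rename` is atomic on a single filesystem, though the three-step sequence as a whole is not):

```rust
use std::{fs, io, path::Path};

// Sketch only: what backup_restore::fs_ops::rename_db_folders_and_cleanup is
// described as doing. The real implementation is not shown in this commit.
fn rename_db_folders_and_cleanup_sketch(
    db_path: &Path,
    tmp_db_path: &Path,
    restore_db_path: &Path,
) -> io::Result<()> {
    fs::rename(db_path, tmp_db_path)?; // move the live db out of the way
    fs::rename(restore_db_path, db_path)?; // move the restored snapshot into place
    fs::remove_dir_all(tmp_db_path) // delete the old db folder
}
```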
4 changes: 2 additions & 2 deletions storage/indexer/src/db_ops.rs
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::schema::column_families;
use crate::schema::column_families_v2;
use anyhow::Result;
use aptos_config::config::RocksdbConfig;
use aptos_rocksdb_options::gen_rocksdb_options;
@@ -15,7 +15,7 @@ pub fn open_db<P: AsRef<Path>>(db_path: P, rocksdb_config: &RocksdbConfig) -> Re
Ok(DB::open(
db_path,
"index_asnync_v2_db",
column_families(),
column_families_v2(),
&gen_rocksdb_options(rocksdb_config, false),
)?)
}
33 changes: 31 additions & 2 deletions storage/indexer/src/db_v2.rs
@@ -7,8 +7,8 @@
/// and this file will be moved to /ecosystem/indexer-grpc/indexer-grpc-table-info.
use crate::{
db_ops::{read_db, write_db},
metadata::{MetadataKey, MetadataValue},
schema::{indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema},
metadata_v2::{MetadataKey, MetadataValue},
schema::{indexer_metadata_v2::IndexerMetadataSchema, table_info::TableInfoSchema},
};
use aptos_logger::info;
use aptos_schemadb::{SchemaBatch, DB};
@@ -52,6 +52,8 @@ pub struct IndexerAsyncV2 {
pub db: DB,
// Next version to be processed
next_version: AtomicU64,
// Timestamp of the db snapshot most recently restored from GCS
restore_timestamp: AtomicU64,
// It is used in the context of processing write ops and extracting table information.
// As the code iterates through the write ops, it checks if the state key corresponds to a table item.
// If it does, the associated bytes are added to the pending_on map under the corresponding table handle.
Expand All @@ -70,10 +72,17 @@ impl IndexerAsyncV2 {
)
.unwrap()
.map_or(0, |v| v.expect_version());
let last_restored_timestamp = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
&db,
&MetadataKey::RestoreTimestamp,
)
.unwrap()
.map_or(0, |v| v.last_restored_timestamp());

Ok(Self {
db,
next_version: AtomicU64::new(next_version),
restore_timestamp: AtomicU64::new(last_restored_timestamp),
pending_on: DashMap::new(),
})
}
Expand Down Expand Up @@ -147,6 +156,17 @@ impl IndexerAsyncV2 {
Ok(())
}

pub fn update_last_restored_timestamp(&self, restore_timestamp: u64) -> Result<()> {
write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
&self.db,
MetadataKey::RestoreTimestamp,
MetadataValue::Timestamp(restore_timestamp),
)?;
self.restore_timestamp
.store(restore_timestamp, Ordering::Relaxed);
Ok(())
}

/// Finishes the parsing process and writes the parsed table information to a SchemaBatch.
pub fn finish_table_info_parsing(
&self,
@@ -195,6 +215,15 @@ impl IndexerAsyncV2 {
.map_or(0, |v| v.expect_version())
}

pub fn restore_timestamp(&self) -> u64 {
read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
&self.db,
&MetadataKey::RestoreTimestamp,
)
.unwrap()
.map_or(0, |v| v.last_restored_timestamp())
}

pub fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> {
self.db.get::<TableInfoSchema>(&handle).map_err(Into::into)
}
3 changes: 2 additions & 1 deletion storage/indexer/src/lib.rs
@@ -5,7 +5,8 @@
mod db;
pub mod db_ops;
pub mod db_v2;
pub mod metadata;
mod metadata;
pub mod metadata_v2;
pub mod schema;
pub mod table_info_reader;

35 changes: 35 additions & 0 deletions storage/indexer/src/metadata_v2.rs
@@ -0,0 +1,35 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_types::transaction::Version;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
pub enum MetadataValue {
Version(Version),
Timestamp(u64),
}

impl MetadataValue {
pub fn expect_version(self) -> Version {
match self {
Self::Version(v) => v,
_ => panic!("Expected MetadataValue::Version"),
}
}

pub fn last_restored_timestamp(self) -> u64 {
match self {
Self::Timestamp(t) => t,
_ => panic!("Expected MetadataValue::Timestamp"),
}
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
pub enum MetadataKey {
LatestVersion,
RestoreTimestamp,
}
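
These types are exercised by the `read_db`/`write_db` helpers seen earlier in this commit; a short sketch mirroring the calls in runtime.rs and db_v2.rs (error handling elided, timestamp value arbitrary):

```rust
// Persist a restore timestamp, then read it back via the v2 metadata schema.
write_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
    &db,
    MetadataKey::RestoreTimestamp,
    MetadataValue::Timestamp(1_706_000_000),
)?;
let ts = read_db::<MetadataKey, MetadataValue, IndexerMetadataSchema>(
    &db,
    &MetadataKey::RestoreTimestamp,
)?
.map_or(0, |v| v.last_restored_timestamp());
assert_eq!(ts, 1_706_000_000);
```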