Skip to content

Commit

Permalink
[Storage] Add TransactionInfoDb and TransactionAccumulatorDb as submo…
Browse files Browse the repository at this point in the history
…dules of LedgerDb. (aptos-labs#11695)
  • Loading branch information
grao1991 authored Jan 22, 2024
1 parent 5ae2542 commit 6d29c17
Show file tree
Hide file tree
Showing 29 changed files with 533 additions and 516 deletions.
29 changes: 17 additions & 12 deletions storage/aptosdb/src/backup/backup_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use crate::{
ledger_db::LedgerDb,
ledger_store::LedgerStore,
metrics::{
BACKUP_EPOCH_ENDING_EPOCH, BACKUP_STATE_SNAPSHOT_LEAF_IDX, BACKUP_STATE_SNAPSHOT_VERSION,
BACKUP_TXN_VERSION,
Expand All @@ -28,21 +27,18 @@ use std::{fmt, sync::Arc};
/// `BackupHandler` provides functionalities for AptosDB data backup.
#[derive(Clone)]
pub struct BackupHandler {
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
ledger_db: Arc<LedgerDb>,
}

impl BackupHandler {
pub(crate) fn new(
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
ledger_db: Arc<LedgerDb>,
) -> Self {
Self {
ledger_store,
transaction_store,
state_store,
ledger_db,
Expand All @@ -62,7 +58,8 @@ impl BackupHandler {
.transaction_db()
.get_transaction_iter(start_version, num_transactions)?;
let mut txn_info_iter = self
.ledger_store
.ledger_db
.transaction_info_db()
.get_transaction_info_iter(start_version, num_transactions)?;
let mut event_vec_iter = self
.ledger_db
Expand Down Expand Up @@ -117,11 +114,14 @@ impl BackupHandler {
let ledger_metadata_db = self.ledger_db.metadata_db();
let epoch = ledger_metadata_db.get_epoch(last_version)?;
let ledger_info = ledger_metadata_db.get_latest_ledger_info_in_epoch(epoch)?;
let accumulator_proof = self.ledger_store.get_transaction_range_proof(
Some(first_version),
num_transactions,
ledger_info.ledger_info().version(),
)?;
let accumulator_proof = self
.ledger_db
.transaction_accumulator_db()
.get_transaction_range_proof(
Some(first_version),
num_transactions,
ledger_info.ledger_info().version(),
)?;
Ok((accumulator_proof, ledger_info))
}

Expand Down Expand Up @@ -174,8 +174,13 @@ impl BackupHandler {
let epoch = ledger_metadata_db.get_epoch(version)?;
let ledger_info = ledger_metadata_db.get_latest_ledger_info_in_epoch(epoch)?;
let txn_info = self
.ledger_store
.get_transaction_info_with_proof(version, ledger_info.ledger_info().version())?;
.ledger_db
.transaction_info_db()
.get_transaction_info_with_proof(
version,
ledger_info.ledger_info().version(),
self.ledger_db.transaction_accumulator_db(),
)?;

Ok((txn_info, ledger_info))
}
Expand Down
8 changes: 1 addition & 7 deletions storage/aptosdb/src/backup/restore_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use crate::{
backup::restore_utils,
ledger_db::LedgerDb,
ledger_store::LedgerStore,
schema::db_metadata::{DbMetadataKey, DbMetadataSchema},
state_restore::{StateSnapshotRestore, StateSnapshotRestoreMode},
state_store::StateStore,
Expand All @@ -28,7 +27,6 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct RestoreHandler {
pub aptosdb: Arc<AptosDB>,
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
ledger_db: Arc<LedgerDb>,
Expand All @@ -37,14 +35,12 @@ pub struct RestoreHandler {
impl RestoreHandler {
pub(crate) fn new(
aptosdb: Arc<AptosDB>,
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
) -> Self {
Self {
ledger_db: Arc::clone(&aptosdb.ledger_db),
aptosdb,
ledger_store,
transaction_store,
state_store,
}
Expand Down Expand Up @@ -80,7 +76,7 @@ impl RestoreHandler {
frozen_subtrees: &[HashValue],
) -> Result<()> {
restore_utils::confirm_or_save_frozen_subtrees(
self.aptosdb.ledger_db.transaction_accumulator_db(),
self.aptosdb.ledger_db.transaction_accumulator_db_raw(),
num_leaves,
frozen_subtrees,
None,
Expand All @@ -96,7 +92,6 @@ impl RestoreHandler {
write_sets: Vec<WriteSet>,
) -> Result<()> {
restore_utils::save_transactions(
self.ledger_store.clone(),
self.transaction_store.clone(),
self.state_store.clone(),
self.ledger_db.clone(),
Expand All @@ -119,7 +114,6 @@ impl RestoreHandler {
write_sets: Vec<WriteSet>,
) -> Result<()> {
restore_utils::save_transactions(
self.ledger_store.clone(),
self.transaction_store.clone(),
self.state_store.clone(),
self.ledger_db.clone(),
Expand Down
31 changes: 19 additions & 12 deletions storage/aptosdb/src/backup/restore_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
//! database restore operations, as required by restore and
//! state sync v2.
use crate::{
ledger_db::{ledger_metadata_db::LedgerMetadataDb, LedgerDb, LedgerDbSchemaBatches},
ledger_store::LedgerStore,
ledger_db::{
ledger_metadata_db::LedgerMetadataDb, transaction_info_db::TransactionInfoDb, LedgerDb,
LedgerDbSchemaBatches,
},
schema::{
db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue},
transaction_accumulator::TransactionAccumulatorSchema,
Expand Down Expand Up @@ -107,7 +109,6 @@ pub fn confirm_or_save_frozen_subtrees(
/// Saves the given transactions to the db. If a change set is provided, a batch
/// of db alterations will be added to the change set without writing them to the db.
pub(crate) fn save_transactions(
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
ledger_db: Arc<LedgerDb>,
Expand All @@ -125,7 +126,6 @@ pub(crate) fn save_transactions(
) -> Result<()> {
if let Some((ledger_db_batch, state_kv_batches, state_kv_metadata_batch)) = existing_batch {
save_transactions_impl(
Arc::clone(&ledger_store),
transaction_store,
state_store,
ledger_db,
Expand All @@ -144,7 +144,6 @@ pub(crate) fn save_transactions(
let mut sharded_kv_schema_batch = new_sharded_kv_schema_batch();
let state_kv_metadata_batch = SchemaBatch::new();
save_transactions_impl(
Arc::clone(&ledger_store),
transaction_store,
Arc::clone(&state_store),
Arc::clone(&ledger_db),
Expand Down Expand Up @@ -219,7 +218,6 @@ fn save_ledger_infos_impl(

/// A helper function that saves the transactions to the given change set
pub(crate) fn save_transactions_impl(
ledger_store: Arc<LedgerStore>,
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
ledger_db: Arc<LedgerDb>,
Expand All @@ -242,12 +240,21 @@ pub(crate) fn save_transactions_impl(
)?;
}

ledger_store.put_transaction_infos(
first_version,
txn_infos,
&ledger_db_batch.transaction_info_db_batches,
&ledger_db_batch.transaction_accumulator_db_batches,
)?;
for (idx, txn_info) in txn_infos.iter().enumerate() {
TransactionInfoDb::put_transaction_info(
first_version + idx as Version,
txn_info,
&ledger_db_batch.transaction_info_db_batches,
)?;
}

ledger_db
.transaction_accumulator_db()
.put_transaction_accumulator(
first_version,
txn_infos,
&ledger_db_batch.transaction_accumulator_db_batches,
)?;

ledger_db.event_db().put_events_multiple_versions(
first_version,
Expand Down
4 changes: 2 additions & 2 deletions storage/aptosdb/src/db/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
db::{
get_first_seq_num_and_limit, test_helper,
test_helper::{arb_blocks_to_commit, put_as_state_root, put_transaction_info},
test_helper::{arb_blocks_to_commit, put_as_state_root, put_transaction_infos},
AptosDB,
},
pruner::{LedgerPrunerManager, PrunerManager, StateMerklePrunerManager},
Expand Down Expand Up @@ -162,7 +162,7 @@ fn test_get_latest_executed_trees() {
0,
ExecutionStatus::MiscellaneousError(None),
);
put_transaction_info(&db, 0, &txn_info);
put_transaction_infos(&db, 0, &[txn_info.clone()]);

let bootstrapped = db.get_latest_executed_trees().unwrap();
assert!(
Expand Down
1 change: 0 additions & 1 deletion storage/aptosdb/src/db/include/aptosdb_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ impl AptosDB {
ledger_db: Arc::clone(&ledger_db),
state_kv_db: Arc::clone(&state_kv_db),
event_store: Arc::new(EventStore::new(ledger_db.event_db().db_arc())),
ledger_store: Arc::new(LedgerStore::new(Arc::clone(&ledger_db))),
state_store,
transaction_store: Arc::new(TransactionStore::new(Arc::clone(&ledger_db))),
ledger_pruner,
Expand Down
72 changes: 46 additions & 26 deletions storage/aptosdb/src/db/include/aptosdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ impl DbReader for AptosDB {
.map(|version| self.ledger_db.transaction_db().get_transaction(version))
.collect::<Result<Vec<_>>>()?;
let txn_infos = (start_version..start_version + limit)
.map(|version| self.ledger_store.get_transaction_info(version))
.map(|version| {
self.ledger_db
.transaction_info_db()
.get_transaction_info(version)
})
.collect::<Result<Vec<_>>>()?;
let events = if fetch_events {
Some(
Expand All @@ -158,11 +162,9 @@ impl DbReader for AptosDB {
None
};
let proof = TransactionInfoListWithProof::new(
self.ledger_store.get_transaction_range_proof(
Some(start_version),
limit,
ledger_version,
)?,
self.ledger_db
.transaction_accumulator_db()
.get_transaction_range_proof(Some(start_version), limit, ledger_version)?,
txn_infos,
);

Expand Down Expand Up @@ -221,7 +223,10 @@ impl DbReader for AptosDB {

let (txn_infos, txns_and_outputs) = (start_version..start_version + limit)
.map(|version| {
let txn_info = self.ledger_store.get_transaction_info(version)?;
let txn_info = self
.ledger_db
.transaction_info_db()
.get_transaction_info(version)?;
let events = self.ledger_db.event_db().get_events_by_version(version)?;
let write_set = self.transaction_store.get_write_set(version)?;
let txn = self.ledger_db.transaction_db().get_transaction(version)?;
Expand All @@ -237,11 +242,9 @@ impl DbReader for AptosDB {
.into_iter()
.unzip();
let proof = TransactionInfoListWithProof::new(
self.ledger_store.get_transaction_range_proof(
Some(start_version),
limit,
ledger_version,
)?,
self.ledger_db
.transaction_accumulator_db()
.get_transaction_range_proof(Some(start_version), limit, ledger_version)?,
txn_infos,
);

Expand Down Expand Up @@ -293,7 +296,8 @@ impl DbReader for AptosDB {
self.error_if_ledger_pruned("Transaction", start_version)?;

let iter = self
.ledger_store
.ledger_db
.transaction_info_db()
.get_transaction_info_iter(start_version, limit as usize)?;
Ok(Box::new(iter) as Box<dyn Iterator<Item = Result<TransactionInfo>> + '_>)
})
Expand Down Expand Up @@ -344,11 +348,9 @@ impl DbReader for AptosDB {
gauged_api("get_transaction_accumulator_range_proof", || {
self.error_if_ledger_pruned("Transaction", first_version)?;

self.ledger_store.get_transaction_range_proof(
Some(first_version),
limit,
ledger_version,
)
self.ledger_db
.transaction_accumulator_db()
.get_transaction_range_proof(Some(first_version), limit, ledger_version)
})
}

Expand Down Expand Up @@ -469,7 +471,10 @@ impl DbReader for AptosDB {
.current_version
.map_or(0, |v| v + 1);

let frozen_subtrees = self.ledger_store.get_frozen_subtree_hashes(num_txns)?;
let frozen_subtrees = self
.ledger_db
.transaction_accumulator_db()
.get_frozen_subtree_hashes(num_txns)?;
let transaction_accumulator =
Arc::new(InMemoryAccumulator::new(frozen_subtrees, num_txns)?);
let executed_trees = ExecutedTrees::new(
Expand All @@ -489,7 +494,10 @@ impl DbReader for AptosDB {
fn get_block_timestamp(&self, version: u64) -> Result<u64> {
gauged_api("get_block_timestamp", || {
self.error_if_ledger_pruned("NewBlockEvent", version)?;
ensure!(version <= self.get_latest_version()?, "version older than latest version");
ensure!(
version <= self.get_latest_version()?,
"version older than latest version"
);

match self.event_store.get_block_metadata(version) {
Ok((_first_version, new_block_event)) => Ok(new_block_event.proposed_time()),
Expand Down Expand Up @@ -709,7 +717,9 @@ impl DbReader for AptosDB {
fn get_accumulator_root_hash(&self, version: Version) -> Result<HashValue> {
gauged_api("get_accumulator_root_hash", || {
self.error_if_ledger_pruned("Transaction accumulator", version)?;
self.ledger_store.get_root_hash(version)
self.ledger_db
.transaction_accumulator_db()
.get_root_hash(version)
})
}

Expand All @@ -723,7 +733,8 @@ impl DbReader for AptosDB {
"Transaction accumulator",
client_known_version.unwrap_or(0),
)?;
self.ledger_store
self.ledger_db
.transaction_accumulator_db()
.get_consistency_proof(client_known_version, ledger_version)
})
}
Expand All @@ -733,8 +744,12 @@ impl DbReader for AptosDB {
ledger_version: Version,
) -> Result<TransactionAccumulatorSummary> {
let num_txns = ledger_version + 1;
let frozen_subtrees = self.ledger_store.get_frozen_subtree_hashes(num_txns)?;
TransactionAccumulatorSummary::new(InMemoryAccumulator::new(frozen_subtrees, num_txns)?).map_err(Into::into)
let frozen_subtrees = self
.ledger_db
.transaction_accumulator_db()
.get_frozen_subtree_hashes(num_txns)?;
TransactionAccumulatorSummary::new(InMemoryAccumulator::new(frozen_subtrees, num_txns)?)
.map_err(Into::into)
}

fn get_state_leaf_count(&self, version: Version) -> Result<usize> {
Expand Down Expand Up @@ -916,8 +931,13 @@ impl AptosDB {
self.error_if_ledger_pruned("Transaction", version)?;

let proof = self
.ledger_store
.get_transaction_info_with_proof(version, ledger_version)?;
.ledger_db
.transaction_info_db()
.get_transaction_info_with_proof(
version,
ledger_version,
self.ledger_db.transaction_accumulator_db(),
)?;
let transaction = self.ledger_db.transaction_db().get_transaction(version)?;

// If events were requested, also fetch those.
Expand Down
Loading

0 comments on commit 6d29c17

Please sign in to comment.