Skip to content

Commit

Permalink
pass instant from aptosdb for calculating latency metric
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Jan 10, 2025
1 parent f97ba5e commit cc5d099
Show file tree
Hide file tree
Showing 14 changed files with 93 additions and 55 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ use bytes::Bytes;
use hyper::{HeaderMap, Response};
use rand::SeedableRng;
use serde_json::{json, Value};
use std::{boxed::Box, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{
boxed::Box,
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::watch::channel;
use warp::{http::header::CONTENT_TYPE, Filter, Rejection, Reply};
use warp_reverse_proxy::reverse_proxy_filter;
Expand Down Expand Up @@ -131,7 +137,7 @@ pub fn new_test_context_inner(
let (root_key, genesis, genesis_waypoint, validators) = builder.build(&mut rng).unwrap();
let (validator_identity, _, _, _) = validators[0].get_key_objects(None).unwrap();
let validator_owner = validator_identity.account_address.unwrap();
let (sender, recver) = channel::<Version>(0);
let (sender, recver) = channel::<(Instant, Version)>((Instant::now(), 0 as Version));
let (db, db_rw) = if use_db_with_indexer {
let mut aptos_db = AptosDB::new_for_test_with_indexer(
&tmp_dir,
Expand Down
6 changes: 4 additions & 2 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use aptos_peer_monitoring_service_server::{
use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage;
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_time_service::TimeService;
use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use aptos_types::{
chain_id::ChainId, indexer::indexer_db_reader::IndexerReader, transaction::Version,
};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender, oneshot};
use std::{sync::Arc, time::Instant};
Expand All @@ -51,7 +53,7 @@ pub fn bootstrap_api_and_indexer(
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<u64>>,
update_receiver: Option<WatchReceiver<(Instant, Version)>>,
api_port_tx: Option<oneshot::Sender<u16>>,
indexer_grpc_port_tx: Option<oneshot::Sender<u16>>,
) -> anyhow::Result<(
Expand Down
7 changes: 3 additions & 4 deletions aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver as WatchReceiver},
};

pub(crate) fn maybe_apply_genesis(
db_rw: &DbReaderWriter,
node_config: &NodeConfig,
Expand Down Expand Up @@ -51,11 +50,11 @@ pub(crate) fn bootstrap_db(
DbReaderWriter,
Option<Runtime>,
Option<InternalIndexerDB>,
Option<WatchReceiver<u64>>,
Option<WatchReceiver<(Instant, Version)>>,
)> {
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
let (sender, receiver) = channel::<u64>(0);
let (sender, receiver) = channel::<(Instant, Version)>((Instant::now(), 0 as Version));
(Some(sender), Some(receiver))
} else {
(None, None)
Expand Down Expand Up @@ -177,7 +176,7 @@ pub fn initialize_database_and_checkpoints(
Option<Runtime>,
Waypoint,
Option<InternalIndexerDB>,
Option<WatchReceiver<Version>>,
Option<WatchReceiver<(Instant, Version)>>,
)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
Expand Down
2 changes: 2 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ aptos-indexer-grpc-fullnode = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-logger = { workspace = true }
aptos-mempool = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-types = { workspace = true }
Expand All @@ -29,6 +30,7 @@ futures = { workspace = true }
google-cloud-storage = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tar = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::INDEXER_DB_LATENCY;
use anyhow::Result;
use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig};
use aptos_db_indexer::{
Expand All @@ -9,13 +10,12 @@ use aptos_db_indexer::{
indexer_reader::IndexerReaders,
};
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_logger::info;
use aptos_storage_interface::DbReader;
use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version};
use std::{
path::{Path, PathBuf},
sync::Arc,
time::Duration,
time::Instant,
};
use tokio::{runtime::Handle, sync::watch::Receiver as WatchReceiver};

Expand All @@ -24,14 +24,14 @@ const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";

pub struct InternalIndexerDBService {
pub db_indexer: Arc<DBIndexer>,
pub update_receiver: WatchReceiver<Version>,
pub update_receiver: WatchReceiver<(Instant, Version)>,
}

impl InternalIndexerDBService {
pub fn new(
db_reader: Arc<dyn DbReader>,
internal_indexer_db: InternalIndexerDB,
update_receiver: WatchReceiver<Version>,
update_receiver: WatchReceiver<(Instant, Version)>,
) -> Self {
let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader));
Self {
Expand Down Expand Up @@ -166,31 +166,30 @@ impl InternalIndexerDBService {

pub async fn run(&mut self, node_config: &NodeConfig) -> Result<()> {
let mut start_version = self.get_start_version(node_config).await?;
let mut target_version = self.db_indexer.main_db_reader.ensure_synced_version()?;
let mut step_timer = std::time::Instant::now();

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(start_version)?;

if next_version == start_version {
if let Ok(recv_res) =
tokio::time::timeout(Duration::from_millis(100), self.update_receiver.changed())
.await
{
if recv_res.is_err() {
info!("update sender is dropped");
return Ok(());
}
if target_version == start_version {
match self.update_receiver.changed().await {
Ok(_) => {
(step_timer, target_version) = *self.update_receiver.borrow();
},
Err(e) => {
panic!("Failed to get update from update_receiver: {}", e);
},
}
continue;
};
}
let next_version = self.db_indexer.process(start_version, target_version)?;
INDEXER_DB_LATENCY.set(step_timer.elapsed().as_millis() as i64);
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::InternalIndexerDBProcessed,
Some(start_version as i64),
Some(next_version as i64),
None,
None,
Some(start_time.elapsed().as_secs_f64()),
Some(step_timer.elapsed().as_secs_f64()),
None,
Some((next_version - start_version) as i64),
None,
Expand All @@ -205,17 +204,14 @@ impl InternalIndexerDBService {
node_config: &NodeConfig,
end_version: Option<Version>,
) -> Result<()> {
let mut start_version = self.get_start_version(node_config).await?;
while start_version <= end_version.unwrap_or(std::u64::MAX) {
let next_version = self.db_indexer.process_a_batch(start_version)?;
if next_version == start_version {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
start_version = next_version;
let start_version = self.get_start_version(node_config).await?;
let end_version = end_version.unwrap_or(std::u64::MAX);
let mut next_version = start_version;
while next_version < end_version {
next_version = self.db_indexer.process(start_version, end_version)?;
// We should stop the internal indexer so that internal indexer can catch up with the main DB
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
// We should never stop the internal indexer
tokio::time::sleep(std::time::Duration::from_secs(100)).await;

Ok(())
}
Expand All @@ -230,7 +226,7 @@ impl MockInternalIndexerDBService {
pub fn new_for_test(
db_reader: Arc<dyn DbReader>,
node_config: &NodeConfig,
update_receiver: WatchReceiver<Version>,
update_receiver: WatchReceiver<(Instant, Version)>,
end_version: Option<Version>,
) -> Self {
if !node_config
Expand Down
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod backup_restore;
pub mod internal_indexer_db_service;
pub mod metrics;
pub mod runtime;
pub mod table_info_service;

Expand Down
13 changes: 13 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_metrics_core::{register_int_gauge, IntGauge};
use once_cell::sync::Lazy;

/// Gauge recording how far (in milliseconds) the internal indexer DB lags
/// behind the main AptosDB: the elapsed time between a main-DB commit and the
/// corresponding data being written to the indexer DB. Registered lazily on
/// first access.
pub static INDEXER_DB_LATENCY: Lazy<IntGauge> = Lazy::new(|| {
    let gauge = register_int_gauge!(
        "aptos_internal_indexer_latency",
        "The latency between main db update and data written to indexer db"
    );
    // Registration only fails on a duplicate metric name, which is a bug.
    gauge.unwrap()
});
4 changes: 2 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aptos_db_indexer::{
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReaderWriter;
use aptos_types::{chain_id::ChainId, transaction::Version};
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tokio::{runtime::Runtime, sync::watch::Receiver as WatchReceiver};

const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
Expand All @@ -24,7 +24,7 @@ pub fn bootstrap_internal_indexer_db(
config: &NodeConfig,
db_rw: DbReaderWriter,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<Version>>,
update_receiver: Option<WatchReceiver<(Instant, Version)>>,
) -> Option<(Runtime, Arc<DBIndexer>)> {
if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() {
return None;
Expand Down
8 changes: 4 additions & 4 deletions execution/executor/tests/internal_indexer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ fn test_db_indexer_data() {
// assert the data matches the expected data
let version = internal_indexer_db.get_persisted_version().unwrap();
assert_eq!(version, None);
let mut start_version = version.map_or(0, |v| v + 1);
while start_version < total_version {
start_version = db_indexer.process_a_batch(start_version).unwrap();
}
let start_version = version.map_or(0, |v| v + 1);
db_indexer
.process_a_batch(start_version, total_version)
.unwrap();
// wait for the commit to finish
thread::sleep(Duration::from_millis(100));
// indexer has process all the transactions
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ impl AptosDB {
LATEST_TXN_VERSION.set(version as i64);
if let Some(update_sender) = &self.update_subscriber {
update_sender.send(
version
(Instant::now(), version)
).map_err(| err | {
AptosDbError::Other(format!("Failed to send update to subscriber: {}", err))
})?;
Expand Down
7 changes: 5 additions & 2 deletions storage/aptosdb/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub struct AptosDB {
commit_lock: std::sync::Mutex<()>,
indexer: Option<Indexer>,
skip_index_and_usage: bool,
update_subscriber: Option<Sender<Version>>,
update_subscriber: Option<Sender<(Instant, Version)>>,
}

// DbReader implementations and private functions used by them.
Expand Down Expand Up @@ -189,7 +189,10 @@ impl AptosDB {
Ok((ledger_db, state_merkle_db, state_kv_db))
}

pub fn add_version_update_subscriber(&mut self, sender: Sender<Version>) -> Result<()> {
/// Registers the watch-channel sender that the DB notifies on each commit
/// with the commit `Instant` and the committed `Version`. Any previously
/// registered subscriber is replaced.
pub fn add_version_update_subscriber(
    &mut self,
    sender: Sender<(Instant, Version)>,
) -> Result<()> {
    // `replace` drops any prior sender; equivalent to assigning `Some(sender)`.
    let _ = self.update_subscriber.replace(sender);
    Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions storage/aptosdb/src/fast_sync_storage_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use aptos_types::{
transaction::{TransactionOutputListWithProof, Version},
};
use either::Either;
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tokio::sync::watch::Sender;

pub const SECONDARY_DB_DIR: &str = "fast_sync_secondary";

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand All @@ -44,7 +43,7 @@ impl FastSyncStorageWrapper {
pub fn initialize_dbs(
config: &NodeConfig,
internal_indexer_db: Option<InternalIndexerDB>,
update_sender: Option<Sender<Version>>,
update_sender: Option<Sender<(Instant, Version)>>,
) -> Result<Either<AptosDB, Self>> {
let mut db_main = AptosDB::open(
config.storage.get_dir_paths(),
Expand Down
25 changes: 20 additions & 5 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ impl DBIndexer {
Ok(zipped)
}

fn get_num_of_transactions(&self, version: Version) -> Result<u64> {
let highest_version = self.main_db_reader.ensure_synced_version()?;
fn get_num_of_transactions(&self, version: Version, end_version: Version) -> Result<u64> {
let highest_version = min(self.main_db_reader.ensure_synced_version()?, end_version);
if version > highest_version {
// In case main db is not synced yet or recreated
return Ok(0);
Expand All @@ -392,10 +392,25 @@ impl DBIndexer {
Ok(num_of_transaction)
}

pub fn process_a_batch(&self, start_version: Version) -> Result<Version> {
let _timer = TIMER.with_label_values(&["process_a_batch"]).start_timer();
/// Index all transactions from `start_version` up to `end_version`, batch by
/// batch, and return the first version that was not processed. Stops early if
/// a batch makes no progress (e.g. the main DB has nothing new to read yet).
pub fn process(&self, start_version: Version, end_version: Version) -> Result<Version> {
    let mut current = start_version;
    loop {
        if current >= end_version {
            break;
        }
        let advanced_to = self.process_a_batch(current, end_version)?;
        if advanced_to == current {
            // No forward progress; return what we have rather than spin.
            break;
        }
        current = advanced_to;
    }
    Ok(current)
}

/// Process a batch of transactions that is within the range of `start_version` to `end_version`
pub fn process_a_batch(&self, start_version: Version, end_version: Version) -> Result<Version> {
let _timer: aptos_metrics_core::HistogramTimer =
TIMER.with_label_values(&["process_a_batch"]).start_timer();
let mut version = start_version;
let num_transactions = self.get_num_of_transactions(version, end_version)?;
// This promises num_transactions should be readable from main db
let mut db_iter = self.get_main_db_iter(version, num_transactions)?;
let batch = SchemaBatch::new();
Expand Down

0 comments on commit cc5d099

Please sign in to comment.