Skip to content

Commit

Permalink
flush buffered state in executor-benchmark
Browse files Browse the repository at this point in the history
expect more reasonable and predicatable numbers.
  • Loading branch information
msmouse committed Jan 7, 2025
1 parent 9ba3c2a commit 7507439
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
13 changes: 12 additions & 1 deletion execution/executor-benchmark/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
use crate::{
block_preparation::BlockPreparationStage,
ledger_update_stage::{CommitProcessing, LedgerUpdateStage},
metrics::NUM_TXNS,
metrics::{NUM_TXNS, TIMER},
OverallMeasuring, TransactionCommitter, TransactionExecutor,
};
use aptos_block_partitioner::v2::config::PartitionerV2Config;
use aptos_crypto::HashValue;
use aptos_executor::block_executor::BlockExecutor;
use aptos_executor_types::{state_compute_result::StateComputeResult, BlockExecutorTrait};
use aptos_logger::info;
use aptos_metrics_core::TimerHelper;
use aptos_storage_interface::DbWriter;
use aptos_types::{
block_executor::partitioner::ExecutableBlock,
transaction::{Transaction, Version},
Expand Down Expand Up @@ -49,6 +51,7 @@ pub struct Pipeline<V> {
join_handles: Vec<JoinHandle<u64>>,
phantom: PhantomData<V>,
start_pipeline_tx: Option<SyncSender<()>>,
db: Arc<dyn DbWriter>,
}

impl<V> Pipeline<V>
Expand All @@ -63,6 +66,7 @@ where
num_blocks: Option<usize>,
) -> (Self, SyncSender<Vec<Transaction>>) {
let parent_block_id = executor.committed_block_id();
let db = executor.db.writer.clone();
let executor_1 = Arc::new(executor);
let executor_2 = executor_1.clone();
let executor_3 = executor_1.clone();
Expand Down Expand Up @@ -249,6 +253,7 @@ where
join_handles,
phantom: PhantomData,
start_pipeline_tx,
db,
},
raw_block_sender,
)
Expand All @@ -266,6 +271,12 @@ where
counts.push(count);
}
}

{
let _timer = TIMER.timer_with(&["final_db_flush"]);
self.db.try_flush_buffers();
}

counts.into_iter().min()
}
}
Expand Down
4 changes: 4 additions & 0 deletions storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ impl DbWriter for AptosDB {
Ok(())
})
}

fn try_flush_buffers(&self) {
self.state_store.buffered_state().lock().sync_commit()
}
}

impl AptosDB {
Expand Down
3 changes: 3 additions & 0 deletions storage/storage-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ pub trait DbWriter: Send + Sync {
) -> Result<()> {
unimplemented!()
}

/// Flush buffered data to disk, if possible.
fn try_flush_buffers(&self) {}
}

#[derive(Clone)]
Expand Down

0 comments on commit 7507439

Please sign in to comment.