From 8c4684da4541217368c46d5c8b62100a9be338ec Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Tue, 20 Feb 2024 20:55:05 -0800 Subject: [PATCH] [dag] streamline payload prefetch --- consensus/src/consensusdb/mod.rs | 2 +- consensus/src/dag/adapter.rs | 7 +++---- consensus/src/dag/bootstrap.rs | 3 ++- consensus/src/dag/dag_store.rs | 10 +++++----- consensus/src/dag/mod.rs | 3 ++- consensus/src/dag/payload/manager.rs | 8 +++++++- consensus/src/dag/rb_handler.rs | 6 +----- consensus/src/dag/tests/helpers.rs | 2 +- consensus/src/dag/types.rs | 21 ++++++--------------- consensus/src/payload_manager.rs | 12 +++++++++--- 10 files changed, 37 insertions(+), 37 deletions(-) diff --git a/consensus/src/consensusdb/mod.rs b/consensus/src/consensusdb/mod.rs index da19dab1155fd..d3a64e6af1221 100644 --- a/consensus/src/consensusdb/mod.rs +++ b/consensus/src/consensusdb/mod.rs @@ -17,7 +17,7 @@ use aptos_schemadb::{ use aptos_storage_interface::AptosDbError; pub use schema::{ block::BlockSchema, - dag::{CertifiedNodeSchema, DagVoteSchema, DecoupledPayloadSchema, NodeMsgSchema, NodeSchema}, + dag::{CertifiedNodeSchema, DagVoteSchema, DecoupledPayloadSchema, NodeMsgSchema}, quorum_certificate::QCSchema, }; use schema::{ diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 823bec72ce493..9612840d1c8a2 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -149,10 +149,9 @@ impl OrderedNotifier for OrderedNotifierAdapter { let timestamp = anchor.metadata().timestamp(); let author = *anchor.author(); let mut validator_txns = vec![]; - let mut payload = if matches!(anchor.payload(), DagPayload::Decoupled(_)) { - Payload::empty_dag_payload() - } else { - Payload::empty(anchor.payload().is_in_quorum_store()) + let mut payload = match anchor.payload() { + DagPayload::Inline(payload) => Payload::empty(payload.is_in_quorum_store()), + DagPayload::Decoupled(_) => Payload::empty_dag_payload(), }; let mut node_digests = vec![]; for node in &ordered_nodes { diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index adc07fd0b4901..21ce1a1e90ade 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -641,6 +641,7 @@ impl DagBootstrapper { dag_store.clone(), payload_store.clone(), payload_requester, + self.payload_manager.clone(), )); let chain_health: Arc = ChainHealthBackoff::new( @@ -813,7 +814,7 @@ pub(super) fn bootstrap_dag_for_test( let (_base_state, handler, fetch_service, payload_fetch_service) = bootstraper.full_bootstrap(); let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); - let (payload_link_tx, mut payload_link_rx) = mpsc::unbounded_channel(); + let (_payload_link_tx, mut payload_link_rx) = mpsc::unbounded_channel(); let dh_handle = tokio::spawn(async move { let mut dag_rpc_rx = dag_rpc_rx; diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index b67ed434fea0c..2d494d1653824 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -486,7 +486,7 @@ impl DagStore { pub fn new_empty( epoch_state: Arc, storage: Arc, - payload_manager: Arc, + external_payload_manager: Arc, start_round: Round, window_size: u64, ) -> Self { @@ -494,19 +494,19 @@ impl DagStore { Self { dag: RwLock::new(dag), storage, - external_payload_manager: payload_manager, + external_payload_manager, } } pub fn new_for_test( dag: InMemDag, storage: Arc, - payload_manager: Arc, + external_payload_manager: Arc, ) -> Self { Self { dag: RwLock::new(dag), storage, - external_payload_manager: payload_manager, + external_payload_manager, } } @@ -522,7 +522,7 @@ impl DagStore { debug!("Added node {}", node.id()); self.external_payload_manager - .prefetch_payload_data(node.payload(), node.metadata().timestamp()); + .prefetch_dag_payload_data(node.payload(), node.metadata().timestamp()); self.dag.write().add_validated_node(node) } diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index 13c5f34b54c28..4234b7d5371f6 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -31,5 +31,6 @@ pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender}; #[cfg(test)] pub use types::Extensions; pub use types::{ - CertifiedNode, DAGMessage, DAGNetworkMessage, DAGRpcResult, Node, NodeId, NodeMessage, Vote, + CertifiedNode, DAGMessage, DAGNetworkMessage, DAGRpcResult, DagPayload, Node, NodeId, + NodeMessage, Vote, }; diff --git a/consensus/src/dag/payload/manager.rs b/consensus/src/dag/payload/manager.rs index 542f4e6842815..828dcf1fa3906 100644 --- a/consensus/src/dag/payload/manager.rs +++ b/consensus/src/dag/payload/manager.rs @@ -2,7 +2,10 @@ use super::{ payload_fetcher::PayloadRequester, store::{DagPayloadStore, DagPayloadStoreError}, }; -use crate::dag::{dag_store::DagStore, types::DagPayload, CertifiedNode}; +use crate::{ + dag::{dag_store::DagStore, types::DagPayload, CertifiedNode}, + payload_manager::TPayloadManager, +}; use anyhow::bail; use aptos_collections::BoundedVecDeque; use aptos_consensus_types::{ @@ -26,6 +29,7 @@ pub struct DagPayloadManager { payload_store: Arc, requester: PayloadRequester, waiters: DashMap>>>, + external_payload_manager: Arc, } impl DagPayloadManager { @@ -33,12 +37,14 @@ impl DagPayloadManager { dag_store: Arc, payload_store: Arc, requester: PayloadRequester, + external_payload_manager: Arc, ) -> Self { Self { dag_store, payload_store, requester, waiters: DashMap::new(), + external_payload_manager, } } diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index 74c8d31bc73ef..40efb71467994 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -2,10 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ - dag_store::DagStore, - health::HealthBackoff, - payload::{DagPayloadManager, TDagPayloadResolver}, - NodeMessage, + dag_store::DagStore, health::HealthBackoff, payload::TDagPayloadResolver, NodeMessage, }; use crate::{ dag::{ @@ -20,7 +17,6 @@ use crate::{ types::{NodeCertificate, Vote}, NodeId, }, - payload_manager::PayloadManager, util::is_vtxn_expected, }; use anyhow::{bail, ensure}; diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs index 573c57b0786af..366fc01cfb487 100644 --- a/consensus/src/dag/tests/helpers.rs +++ b/consensus/src/dag/tests/helpers.rs @@ -15,7 +15,7 @@ pub(super) const TEST_DAG_WINDOW: u64 = 5; pub(super) struct MockPayloadManager {} impl TPayloadManager for MockPayloadManager { - fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {} + fn prefetch_dag_payload_data(&self, _payload: &DagPayload, _timestamp: u64) {} } pub(crate) fn new_certified_node( diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index dc0b44709a92f..86da45b2aa1ad 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -159,7 +159,7 @@ pub enum DagPayload { impl DagPayload { pub fn digest(&self) -> Option<&PayloadDigest> { match self { - DagPayload::Inline(payload) => None, + DagPayload::Inline(_) => None, DagPayload::Decoupled(info) => Some(info.digest()), } } @@ -179,17 +179,6 @@ impl DagPayload { } } -impl Deref for DagPayload { - type Target = Payload; - - fn deref(&self) -> &Self::Target { - match self { - DagPayload::Inline(payload) => payload, - _ => unimplemented!(), - } - } -} - impl From for DagPayload { fn from(payload: Payload) -> Self { Self::Inline(payload) @@ -464,9 +453,11 @@ impl NodeMessage { } pub fn payload(&self) -> &Payload { - match &self.decoupled_payload { - Some(payload) => payload.payload(), - None => self.node.payload(), + match self.node.payload() { + DagPayload::Inline(payload) => payload, + DagPayload::Decoupled(_) => self + .decoupled_payload() + .expect("must exist in decoupled mode"), } } diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index a9fc801805134..f9dbfaf1c9ca3 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -3,6 +3,7 @@ use crate::{ counters, + dag::DagPayload, quorum_store::{batch_store::BatchReader, quorum_store_coordinator::CoordinatorCommand}, }; use aptos_consensus_types::{ @@ -20,7 +21,7 @@ use std::sync::Arc; use tokio::sync::{mpsc::UnboundedSender, oneshot}; pub trait TPayloadManager: Send + Sync { - fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64); + fn prefetch_dag_payload_data(&self, payload: &DagPayload, timestamp: u64); } pub struct DAGLink { @@ -74,8 +75,13 @@ pub enum PayloadManager { } impl TPayloadManager for PayloadManager { - fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) { - self.prefetch_payload_data(payload, timestamp); + fn prefetch_dag_payload_data(&self, payload: &DagPayload, timestamp: u64) { + match payload { + DagPayload::Inline(payload) => self.prefetch_payload_data(payload, timestamp), + DagPayload::Decoupled(info) => { + self.prefetch_payload_data(&Payload::DAG(info.clone().into()), timestamp) + }, + } } }