Skip to content

Commit

Permalink
[dag] streamline payload prefetch
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Feb 21, 2024
1 parent 82d3dbd commit 8c4684d
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion consensus/src/consensusdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use aptos_schemadb::{
use aptos_storage_interface::AptosDbError;
pub use schema::{
block::BlockSchema,
dag::{CertifiedNodeSchema, DagVoteSchema, DecoupledPayloadSchema, NodeMsgSchema, NodeSchema},
dag::{CertifiedNodeSchema, DagVoteSchema, DecoupledPayloadSchema, NodeMsgSchema},
quorum_certificate::QCSchema,
};
use schema::{
Expand Down
7 changes: 3 additions & 4 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,9 @@ impl OrderedNotifier for OrderedNotifierAdapter {
let timestamp = anchor.metadata().timestamp();
let author = *anchor.author();
let mut validator_txns = vec![];
let mut payload = if matches!(anchor.payload(), DagPayload::Decoupled(_)) {
Payload::empty_dag_payload()
} else {
Payload::empty(anchor.payload().is_in_quorum_store())
let mut payload = match anchor.payload() {
DagPayload::Inline(payload) => Payload::empty(payload.is_in_quorum_store()),
DagPayload::Decoupled(_) => Payload::empty_dag_payload(),
};
let mut node_digests = vec![];
for node in &ordered_nodes {
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ impl DagBootstrapper {
dag_store.clone(),
payload_store.clone(),
payload_requester,
self.payload_manager.clone(),
));

let chain_health: Arc<dyn TChainHealth> = ChainHealthBackoff::new(
Expand Down Expand Up @@ -813,7 +814,7 @@ pub(super) fn bootstrap_dag_for_test(
let (_base_state, handler, fetch_service, payload_fetch_service) = bootstraper.full_bootstrap();

let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
let (payload_link_tx, mut payload_link_rx) = mpsc::unbounded_channel();
let (_payload_link_tx, mut payload_link_rx) = mpsc::unbounded_channel();

let dh_handle = tokio::spawn(async move {
let mut dag_rpc_rx = dag_rpc_rx;
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,27 +486,27 @@ impl DagStore {
pub fn new_empty(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_manager: Arc<dyn TPayloadManager>,
external_payload_manager: Arc<dyn TPayloadManager>,
start_round: Round,
window_size: u64,
) -> Self {
let dag = InMemDag::new_empty(epoch_state, start_round, window_size);
Self {
dag: RwLock::new(dag),
storage,
external_payload_manager: payload_manager,
external_payload_manager,
}
}

pub fn new_for_test(
dag: InMemDag,
storage: Arc<dyn DAGStorage>,
payload_manager: Arc<dyn TPayloadManager>,
external_payload_manager: Arc<dyn TPayloadManager>,
) -> Self {
Self {
dag: RwLock::new(dag),
storage,
external_payload_manager: payload_manager,
external_payload_manager,
}
}

Expand All @@ -522,7 +522,7 @@ impl DagStore {

debug!("Added node {}", node.id());
self.external_payload_manager
.prefetch_payload_data(node.payload(), node.metadata().timestamp());
.prefetch_dag_payload_data(node.payload(), node.metadata().timestamp());

self.dag.write().add_validated_node(node)
}
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender};
#[cfg(test)]
pub use types::Extensions;
pub use types::{
CertifiedNode, DAGMessage, DAGNetworkMessage, DAGRpcResult, Node, NodeId, NodeMessage, Vote,
CertifiedNode, DAGMessage, DAGNetworkMessage, DAGRpcResult, DagPayload, Node, NodeId,
NodeMessage, Vote,
};
8 changes: 7 additions & 1 deletion consensus/src/dag/payload/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use super::{
payload_fetcher::PayloadRequester,
store::{DagPayloadStore, DagPayloadStoreError},
};
use crate::dag::{dag_store::DagStore, types::DagPayload, CertifiedNode};
use crate::{
dag::{dag_store::DagStore, types::DagPayload, CertifiedNode},
payload_manager::TPayloadManager,
};
use anyhow::bail;
use aptos_collections::BoundedVecDeque;
use aptos_consensus_types::{
Expand All @@ -26,19 +29,22 @@ pub struct DagPayloadManager {
payload_store: Arc<DagPayloadStore>,
requester: PayloadRequester,
waiters: DashMap<PayloadDigest, BoundedVecDeque<oneshot::Sender<Vec<SignedTransaction>>>>,
external_payload_manager: Arc<dyn TPayloadManager>,
}

impl DagPayloadManager {
pub fn new(
dag_store: Arc<DagStore>,
payload_store: Arc<DagPayloadStore>,
requester: PayloadRequester,
external_payload_manager: Arc<dyn TPayloadManager>,
) -> Self {
Self {
dag_store,
payload_store,
requester,
waiters: DashMap::new(),
external_payload_manager,
}
}

Expand Down
6 changes: 1 addition & 5 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
dag_store::DagStore,
health::HealthBackoff,
payload::{DagPayloadManager, TDagPayloadResolver},
NodeMessage,
dag_store::DagStore, health::HealthBackoff, payload::TDagPayloadResolver, NodeMessage,
};
use crate::{
dag::{
Expand All @@ -20,7 +17,6 @@ use crate::{
types::{NodeCertificate, Vote},
NodeId,
},
payload_manager::PayloadManager,
util::is_vtxn_expected,
};
use anyhow::{bail, ensure};
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(super) const TEST_DAG_WINDOW: u64 = 5;
pub(super) struct MockPayloadManager {}

impl TPayloadManager for MockPayloadManager {
fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
fn prefetch_dag_payload_data(&self, _payload: &DagPayload, _timestamp: u64) {}
}

pub(crate) fn new_certified_node(
Expand Down
21 changes: 6 additions & 15 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub enum DagPayload {
impl DagPayload {
pub fn digest(&self) -> Option<&PayloadDigest> {
match self {
DagPayload::Inline(payload) => None,
DagPayload::Inline(_) => None,
DagPayload::Decoupled(info) => Some(info.digest()),
}
}
Expand All @@ -179,17 +179,6 @@ impl DagPayload {
}
}

impl Deref for DagPayload {
type Target = Payload;

fn deref(&self) -> &Self::Target {
match self {
DagPayload::Inline(payload) => payload,
_ => unimplemented!(),
}
}
}

impl From<Payload> for DagPayload {
fn from(payload: Payload) -> Self {
Self::Inline(payload)
Expand Down Expand Up @@ -464,9 +453,11 @@ impl NodeMessage {
}

pub fn payload(&self) -> &Payload {
match &self.decoupled_payload {
Some(payload) => payload.payload(),
None => self.node.payload(),
match self.node.payload() {
DagPayload::Inline(payload) => payload,
DagPayload::Decoupled(_) => self
.decoupled_payload()
.expect("must exist in decoupled mode"),
}
}

Expand Down
12 changes: 9 additions & 3 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{
counters,
dag::DagPayload,
quorum_store::{batch_store::BatchReader, quorum_store_coordinator::CoordinatorCommand},
};
use aptos_consensus_types::{
Expand All @@ -20,7 +21,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc::UnboundedSender, oneshot};

pub trait TPayloadManager: Send + Sync {
fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64);
fn prefetch_dag_payload_data(&self, payload: &DagPayload, timestamp: u64);
}

pub struct DAGLink {
Expand Down Expand Up @@ -74,8 +75,13 @@ pub enum PayloadManager {
}

impl TPayloadManager for PayloadManager {
fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) {
self.prefetch_payload_data(payload, timestamp);
fn prefetch_dag_payload_data(&self, payload: &DagPayload, timestamp: u64) {
match payload {
DagPayload::Inline(payload) => self.prefetch_payload_data(payload, timestamp),
DagPayload::Decoupled(info) => {
self.prefetch_payload_data(&Payload::DAG(info.clone().into()), timestamp)
},
}
}
}

Expand Down

0 comments on commit 8c4684d

Please sign in to comment.