Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dag] decouple payload 3/n #12126

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
06d25ba
[consensus] unify different locks in state_computer
ibalajiarun Feb 16, 2024
b5e0d2e
[dag][bugfix] only fetch nodes in window
ibalajiarun Feb 12, 2024
61ebd96
[dag] try order all nodes after a fetch
ibalajiarun Feb 12, 2024
4fe66e4
[dag] smoke tests
ibalajiarun Feb 12, 2024
eba7043
fix units
ibalajiarun Feb 16, 2024
2d3a5f1
[forge] dag land blocking-style test
ibalajiarun Feb 13, 2024
32edab5
test
ibalajiarun Feb 13, 2024
26e30e6
[dag][racefix] dont enter round with strong links
ibalajiarun Feb 14, 2024
519df20
tune test
ibalajiarun Feb 14, 2024
1824abd
[dag][forge] changing working quorum test
ibalajiarun Feb 14, 2024
ecd67c1
[dag][forge] onchain config enable test
ibalajiarun Feb 14, 2024
5d82fef
test reconfig
ibalajiarun Feb 14, 2024
25f5f77
fix reconfig
ibalajiarun Feb 14, 2024
393be41
tune reconfig
ibalajiarun Feb 18, 2024
6555d99
[dag] introduce internal payload enum
ibalajiarun Feb 20, 2024
596bd3c
[dag] NodeMessage type and tests
ibalajiarun Feb 20, 2024
297e672
[dag] NodeMessage db schema and tests
ibalajiarun Feb 20, 2024
e65e0e2
[dag][db] DecoupledPayload schema
ibalajiarun Feb 20, 2024
ee1b243
[dag][storage] APIs for NodeMessage and DecoupledPayload
ibalajiarun Feb 20, 2024
087b963
[bounded-vec] add pop methods
ibalajiarun Feb 20, 2024
df90355
[dag] payload store
ibalajiarun Feb 20, 2024
eaa0988
[dag] payload fetcher
ibalajiarun Feb 20, 2024
be23bc4
[dag] payload manager basic impl
ibalajiarun Feb 20, 2024
cb5bb85
[dag] new Payload type for decoupled payload
ibalajiarun Feb 20, 2024
a0d57ae
[dag] broadcast NodeMessage instead of Node
ibalajiarun Feb 20, 2024
dd7bbc7
[dag] e2e decoupled payload integration
ibalajiarun Feb 21, 2024
3c3b250
[dag] streamline payload prefetch
ibalajiarun Feb 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion consensus/consensus-types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ impl Block {
None => 0,
Some(payload) => match payload {
Payload::InQuorumStore(pos) => pos.proofs.len(),
Payload::DirectMempool(_txns) => 0,
Payload::DirectMempool(txns) => txns.len(),
Payload::InQuorumStoreWithLimit(pos) => pos.proof_with_data.proofs.len(),
Payload::DAG(bundle) => bundle.len(),
},
}
}
Expand Down
60 changes: 44 additions & 16 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::proof_of_store::{BatchInfo, ProofOfStore};
use crate::{
dag_payload::DAGPayloadBundle,
proof_of_store::{BatchInfo, ProofOfStore},
};
use anyhow::bail;
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
Expand All @@ -12,7 +16,7 @@ use aptos_types::{
};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::{cmp::min, collections::HashSet, fmt, fmt::Write, sync::Arc};
use std::{collections::HashSet, fmt, fmt::Write, sync::Arc};
use tokio::sync::oneshot;

/// The round of a block is a consensus-internal counter, which starts with 0 and increases
Expand Down Expand Up @@ -177,19 +181,26 @@ pub enum Payload {
DirectMempool(Vec<SignedTransaction>),
InQuorumStore(ProofWithData),
InQuorumStoreWithLimit(ProofWithDataWithTxnLimit),
DAG(DAGPayloadBundle),
}

impl Payload {
pub fn transform_to_quorum_store_v2(self, max_txns_to_execute: Option<usize>) -> Self {
pub fn transform_to_quorum_store_v2(
self,
max_txns_to_execute: Option<usize>,
) -> anyhow::Result<Self> {
match self {
Payload::InQuorumStore(proof_with_status) => Payload::InQuorumStoreWithLimit(
Payload::InQuorumStore(proof_with_status) => Ok(Payload::InQuorumStoreWithLimit(
ProofWithDataWithTxnLimit::new(proof_with_status, max_txns_to_execute),
),
)),
Payload::InQuorumStoreWithLimit(_) => {
panic!("Payload is already in quorumStoreV2 format");
bail!("Payload is already in quorumStoreV2 format");
},
Payload::DirectMempool(_) => {
panic!("Payload is in direct mempool format");
bail!("Payload is in direct mempool format");
},
Payload::DAG(_) => {
bail!("Payload is in DAG format");
},
}
}
Expand All @@ -202,6 +213,10 @@ impl Payload {
}
}

pub fn empty_dag_payload() -> Self {
Payload::DAG(DAGPayloadBundle::new_empty())
}

pub fn len(&self) -> usize {
match self {
Payload::DirectMempool(txns) => txns.len(),
Expand All @@ -217,12 +232,11 @@ impl Payload {
.iter()
.map(|proof| proof.num_txns() as usize)
.sum();
if proof_with_status.max_txns_to_execute.is_some() {
min(proof_with_status.max_txns_to_execute.unwrap(), num_txns)
} else {
num_txns
}
proof_with_status
.max_txns_to_execute
.map_or(num_txns, |max_txns| max_txns.min(num_txns))
},
Payload::DAG(payload) => payload.len(),
}
}

Expand All @@ -235,6 +249,7 @@ impl Payload {
|| (proof_with_status.max_txns_to_execute.is_some()
&& proof_with_status.max_txns_to_execute.unwrap() == 0)
},
Payload::DAG(payload) => payload.is_empty(),
}
}

Expand All @@ -245,12 +260,16 @@ impl Payload {
(Payload::InQuorumStoreWithLimit(p1), Payload::InQuorumStoreWithLimit(p2)) => {
p1.extend(p2)
},
(Payload::DAG(p1), Payload::DAG(p2)) => p1.extend(p2),
(_, _) => unreachable!(),
}
}

pub fn is_direct(&self) -> bool {
matches!(self, Payload::DirectMempool(_))
pub fn is_in_quorum_store(&self) -> bool {
matches!(
self,
Payload::InQuorumStore(_) | Payload::InQuorumStoreWithLimit(_)
)
}

/// This is computationally expensive on the first call
Expand All @@ -274,6 +293,7 @@ impl Payload {
.iter()
.map(|proof| proof.num_bytes() as usize)
.sum(),
Payload::DAG(payload) => payload.size(),
}
}

Expand All @@ -296,6 +316,9 @@ impl Payload {
}
Ok(())
},
(false, Payload::DAG(_)) => {
unreachable!("verify() is not expected in DAG mode");
},
(_, _) => Err(anyhow::anyhow!(
"Wrong payload type. Expected Payload::InQuorumStore {} got {} ",
quorum_store_enabled,
Expand All @@ -321,6 +344,9 @@ impl fmt::Display for Payload {
proof_with_status.proof_with_data.proofs.len()
)
},
Payload::DAG(payload) => {
write!(f, "DAG Payload: {}", payload)
},
}
}
}
Expand All @@ -338,9 +364,11 @@ impl From<&Vec<&Payload>> for PayloadFilter {
if exclude_payloads.is_empty() {
return PayloadFilter::Empty;
}
let direct_mode = exclude_payloads.iter().any(|payload| payload.is_direct());
let quorum_store_mode = exclude_payloads
.iter()
.any(|payload| payload.is_in_quorum_store());

if direct_mode {
if !quorum_store_mode {
let mut exclude_txns = Vec::new();
for payload in exclude_payloads {
if let Payload::DirectMempool(txns) = payload {
Expand Down
Loading
Loading