-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dag] network sender implementation #9456
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
use crate::{ | ||
block_storage::tracing::{observe_block, BlockStage}, | ||
counters, | ||
dag::DAGNetworkMessage, | ||
dag::{DAGMessage, DAGNetworkMessage, RpcWithFallback, TDAGNetworkSender}, | ||
logging::LogEvent, | ||
monitor, | ||
network_interface::{ConsensusMsg, ConsensusNetworkClient}, | ||
|
@@ -29,10 +29,12 @@ use aptos_network::{ | |
protocols::{network::Event, rpc::error::RpcError}, | ||
ProtocolId, | ||
}; | ||
use aptos_reliable_broadcast::{RBMessage, RBNetworkSender}; | ||
use aptos_types::{ | ||
account_address::AccountAddress, epoch_change::EpochChangeProof, | ||
ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier, | ||
}; | ||
use async_trait::async_trait; | ||
use bytes::Bytes; | ||
use fail::fail_point; | ||
use futures::{ | ||
|
@@ -43,6 +45,7 @@ use futures::{ | |
use serde::{de::DeserializeOwned, Serialize}; | ||
use std::{ | ||
mem::{discriminant, Discriminant}, | ||
sync::Arc, | ||
time::Duration, | ||
}; | ||
|
||
|
@@ -404,6 +407,79 @@ impl QuorumStoreSender for NetworkSender { | |
} | ||
} | ||
|
||
// TODO: this can be improved | ||
#[derive(Clone)] | ||
pub struct DAGNetworkSenderImpl { | ||
sender: Arc<NetworkSender>, | ||
time_service: aptos_time_service::TimeService, | ||
} | ||
|
||
impl DAGNetworkSenderImpl { | ||
pub fn new(sender: Arc<NetworkSender>) -> Self { | ||
Self { | ||
sender, | ||
time_service: aptos_time_service::TimeService::real(), | ||
} | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl TDAGNetworkSender for DAGNetworkSenderImpl { | ||
async fn send_rpc( | ||
&self, | ||
receiver: Author, | ||
message: DAGMessage, | ||
timeout: Duration, | ||
) -> anyhow::Result<DAGMessage> { | ||
self.sender | ||
.consensus_network_client | ||
.send_rpc(receiver, message.into_network_message(), timeout) | ||
.await | ||
.map_err(|e| anyhow!("invalid rpc response: {}", e)) | ||
.and_then(TConsensusMsg::from_network_message) | ||
} | ||
|
||
/// Given a list of potential responders, sending rpc to get response from any of them and could | ||
/// fallback to more in case of failures. | ||
async fn send_rpc_with_fallbacks( | ||
&self, | ||
responders: Vec<Author>, | ||
message: DAGMessage, | ||
retry_interval: Duration, | ||
rpc_timeout: Duration, | ||
) -> RpcWithFallback { | ||
let sender = Arc::new(self.clone()); | ||
RpcWithFallback::new( | ||
responders, | ||
message, | ||
retry_interval, | ||
rpc_timeout, | ||
sender, | ||
self.time_service.clone(), | ||
) | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl<M> RBNetworkSender<M> for DAGNetworkSenderImpl | ||
where | ||
M: RBMessage + TConsensusMsg + 'static, | ||
{ | ||
async fn send_rb_rpc( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the difference from the DAGNetworkSender send_rpc implementation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No difference at all. Because we separated out the reliable broadcast to its own crate, we need a separate trait for it. I cannot use the same fn name in two different traits and have NetworkSender implement both of the traits, so this is the way I found. Not sure if there is something better. |
||
&self, | ||
receiver: Author, | ||
message: M, | ||
timeout: Duration, | ||
) -> anyhow::Result<M> { | ||
self.sender | ||
.consensus_network_client | ||
.send_rpc(receiver, message.into_network_message(), timeout) | ||
.await | ||
.map_err(|e| anyhow!("invalid rpc response: {}", e)) | ||
.and_then(|msg| TConsensusMsg::from_network_message(msg)) | ||
} | ||
} | ||
|
||
pub struct NetworkTask { | ||
consensus_messages_tx: aptos_channel::Sender< | ||
(AccountAddress, Discriminant<ConsensusMsg>), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd probably call this DAGNetworkSender and rename the trait to T*