-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dag] decouple payload 3/n #12126
[dag] decouple payload 3/n #12126
Conversation
⏱️ 10h 55m total CI duration on this PR
🚨 3 jobs on the last run were significantly faster/slower than expected
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @ibalajiarun and the rest of your teammates on Graphite |
5ec01ca
to
8c4684d
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
59d1466
to
cb5bb85
Compare
8c4684d
to
3c3b250
Compare
This comment has been minimized.
This comment has been minimized.
❌ Forge suite
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do you call prefetch_payload()?
Also, I do not see all the logic for caching the responses for execution.
// TODO: decide if payload should be fetched here or wait until later | ||
let Some(payload) = | ||
self.payload_manager.get_payload_if_exists(node.as_ref()) | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some trade-off here.
task::{block_in_place, JoinHandle}, | ||
}; | ||
use tokio_retry::strategy::ExponentialBackoff; | ||
|
||
#[derive(Clone)] | ||
struct BootstrapBaseState { | ||
dag_store: Arc<DagStore>, | ||
payload_store: Arc<DagPayloadStore>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see this being used
@@ -199,6 +225,7 @@ impl SyncMode { | |||
async fn run_internal( | |||
self, | |||
dag_rpc_rx: &mut Receiver<Author, IncomingDAGRequest>, | |||
_payload_link_rx: &mut mpsc::UnboundedReceiver<PayloadLinkMsg>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this is not run in sync mode? won't it have potential issues that buffer manager keeps sending messages? we should also probably decouple it from the dag tasks?
@@ -57,18 +53,18 @@ impl<T> Stream for FetchWaiter<T> { | |||
} | |||
|
|||
pub trait TFetchRequester: Send + Sync { | |||
fn request_for_node(&self, node: Node) -> anyhow::Result<()>; | |||
fn request_for_node(&self, node: NodeMessage) -> anyhow::Result<()>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need to change this?
@@ -439,7 +442,7 @@ impl InMemDag { | |||
pub struct DagStore { | |||
dag: RwLock<InMemDag>, | |||
storage: Arc<dyn DAGStorage>, | |||
payload_manager: Arc<dyn TPayloadManager>, | |||
external_payload_manager: Arc<dyn TPayloadManager>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why it's called external
?
error!("Error deleting expired nodes: {:?}", e); | ||
} | ||
payload_digests.into_iter().filter_map(|d| d).collect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just call flatten?
self.config.mempool_txn_pull_timeout_ms, | ||
)) | ||
let mut quorum_store_builder = match (is_dag_enabled, is_quorum_store_enabled) { | ||
(true, true) => unreachable!("not yet supported"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is supported, no?
fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) { | ||
self.prefetch_payload_data(payload, timestamp); | ||
fn prefetch_dag_payload_data(&self, payload: &DagPayload, timestamp: u64) { | ||
match payload { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks weird, we can convert DagPayload to Payload first then do a single line prefetch?
cb5bb85
to
af74f3b
Compare
This issue is stale because it has been open 45 days with no activity. Remove the |
This issue is stale because it has been open 45 days with no activity. Remove the |
Description
Test Plan