Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node_framework): Add tree api server & client to the metadata calculator #1885

Merged
merged 9 commits into from
May 14, 2024
1 change: 1 addition & 0 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async fn run_tree(
tree_reader
.wait()
.await
.context("Cannot initialize tree reader")?
.run_api_server(address, stop_receiver)
.await
}));
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/api_server/tree/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async fn merkle_tree_api() {
let api_server = tree_reader
.wait()
.await
.unwrap()
.create_api_server(&api_addr, stop_receiver.clone())
.unwrap();
let local_addr = *api_server.local_addr();
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ async fn run_tree(
tree_reader
.wait()
.await
.context("Cannot initialize tree reader")?
.run_api_server(address, stop_receiver)
.await
}));
Expand Down
15 changes: 7 additions & 8 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::{
collections::{BTreeMap, HashSet},
future,
future::Future,
path::Path,
sync::Arc,
Expand Down Expand Up @@ -88,7 +87,7 @@ impl MerkleTreeHealthCheck {
let weak_reader_for_task = weak_reader.clone();
tokio::spawn(async move {
weak_reader_for_task
.set(reader.wait().await.downgrade())
.set(reader.wait().await.unwrap().downgrade())
.ok();
});

Expand Down Expand Up @@ -355,15 +354,15 @@ impl LazyAsyncTreeReader {
}

/// Waits until the tree is initialized and returns a reader for it.
pub async fn wait(mut self) -> AsyncTreeReader {
pub async fn wait(mut self) -> anyhow::Result<AsyncTreeReader> {
popzxc marked this conversation as resolved.
Show resolved Hide resolved
loop {
if let Some(reader) = self.0.borrow().clone() {
break reader;
}
if self.0.changed().await.is_err() {
tracing::info!("Tree dropped without getting ready; not resolving tree reader");
future::pending::<()>().await;
break Ok(reader);
}
self.0
.changed()
.await
.context("Tree dropped without getting ready; not resolving tree reader")?;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/lib/zksync_core/src/metadata_calculator/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ mod tests {
.wait_for(|health| matches!(health.status(), HealthStatus::Ready))
.await;
// Wait until the calculator is initialized.
let reader = reader.wait().await;
let reader = reader.wait().await.unwrap();
while reader.clone().info().await.next_l1_batch_number < L1BatchNumber(6) {
tokio::time::sleep(POLL_INTERVAL).await;
}
Expand Down Expand Up @@ -281,7 +281,7 @@ mod tests {
let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver));

// Wait until the calculator is initialized.
let reader = reader.wait().await;
let reader = reader.wait().await.unwrap();
let tree_info = reader.clone().info().await;
assert_eq!(
tree_info.next_l1_batch_number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) {
match case {
// Wait until the tree is fully initialized and stop the calculator.
RecoveryWorkflowCase::Stop => {
let tree_info = tree_reader.wait().await.info().await;
let tree_info = tree_reader.wait().await.unwrap().info().await;
assert_eq!(tree_info.root_hash, snapshot_recovery.l1_batch_root_hash);
assert_eq!(tree_info.leaf_count, 200);
assert_eq!(
Expand All @@ -274,7 +274,7 @@ async fn entire_recovery_workflow(case: RecoveryWorkflowCase) {

// Emulate state keeper adding a new L1 batch to Postgres.
RecoveryWorkflowCase::CreateBatch => {
tree_reader.wait().await;
tree_reader.wait().await.unwrap();

let mut storage = storage.start_transaction().await.unwrap();
let mut new_logs = gen_storage_logs(500..600, 1).pop().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl MainNodeBuilder {
&operations_manager_env_config,
);
self.node
.add_layer(MetadataCalculatorLayer(metadata_calculator_config));
.add_layer(MetadataCalculatorLayer::new(metadata_calculator_config));
Ok(self)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use std::sync::Arc;
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
};

use anyhow::Context as _;
use zksync_core::metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig};
use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode};
use zksync_core::metadata_calculator::{
LazyAsyncTreeReader, MetadataCalculator, MetadataCalculatorConfig,
};
use zksync_storage::RocksDB;

use crate::{
implementations::resources::{
healthcheck::AppHealthCheckResource,
object_store::ObjectStoreResource,
pools::{MasterPool, PoolResource, ReplicaPool},
web3_api::TreeApiClientResource,
},
service::{ServiceContext, StopReceiver},
task::Task,
Expand All @@ -25,11 +32,23 @@ use crate::{
/// - Adds `tree_health_check` to the `ResourceCollection<HealthCheckResource>`.
/// - Adds `metadata_calculator` to the node.
#[derive(Debug)]
popzxc marked this conversation as resolved.
Show resolved Hide resolved
pub struct MetadataCalculatorLayer(pub MetadataCalculatorConfig);
pub struct MetadataCalculatorLayer {
config: MetadataCalculatorConfig,
tree_api_config: Option<MerkleTreeApiConfig>,
}

#[derive(Debug)]
pub struct MetadataCalculatorTask {
metadata_calculator: MetadataCalculator,
impl MetadataCalculatorLayer {
pub fn new(config: MetadataCalculatorConfig) -> Self {
Self {
config,
tree_api_config: None,
}
}

pub fn with_tree_api_config(mut self, tree_api_config: MerkleTreeApiConfig) -> Self {
self.tree_api_config = Some(tree_api_config);
self
}
}

#[async_trait::async_trait]
Expand All @@ -49,15 +68,16 @@ impl WiringLayer for MetadataCalculatorLayer {
.get_custom(10)
.await?;

let object_store = context.get_resource::<ObjectStoreResource>().await.ok(); // OK to be None.
if object_store.is_none() {
tracing::info!(
"Object store is not provided, metadata calculator will run without it."
);
}
let object_store = match self.config.mode {
MerkleTreeMode::Lightweight => None,
popzxc marked this conversation as resolved.
Show resolved Hide resolved
MerkleTreeMode::Full => {
let store = context.get_resource::<ObjectStoreResource>().await?;
Some(store)
}
};

let metadata_calculator = MetadataCalculator::new(
self.0,
self.config,
object_store.map(|store_resource| store_resource.0),
main_pool,
)
Expand All @@ -69,14 +89,33 @@ impl WiringLayer for MetadataCalculatorLayer {
.insert_custom_component(Arc::new(metadata_calculator.tree_health_check()))
.map_err(WiringError::internal)?;

let task = Box::new(MetadataCalculatorTask {
if let Some(tree_api_config) = self.tree_api_config {
let bind_addr = (Ipv4Addr::UNSPECIFIED, tree_api_config.port).into();
let tree_reader = metadata_calculator.tree_reader();
context.add_task(Box::new(TreeApiTask {
bind_addr,
tree_reader,
}));
}

context.insert_resource(TreeApiClientResource(Arc::new(
metadata_calculator.tree_reader(),
)))?;

let metadata_calculator_task = Box::new(MetadataCalculatorTask {
metadata_calculator,
});
context.add_task(task);
context.add_task(metadata_calculator_task);

Ok(())
}
}

#[derive(Debug)]
pub struct MetadataCalculatorTask {
metadata_calculator: MetadataCalculator,
}

#[async_trait::async_trait]
impl Task for MetadataCalculatorTask {
fn name(&self) -> &'static str {
Expand All @@ -93,3 +132,25 @@ impl Task for MetadataCalculatorTask {
result
}
}

#[derive(Debug)]
pub struct TreeApiTask {
bind_addr: SocketAddr,
tree_reader: LazyAsyncTreeReader,
}

#[async_trait::async_trait]
impl Task for TreeApiTask {
fn name(&self) -> &'static str {
"tree_api"
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.tree_reader
.wait()
.await
slowli marked this conversation as resolved.
Show resolved Hide resolved
.context("Cannot initialize tree reader")?
.run_api_server(self.bind_addr, stop_receiver.0)
.await
}
}
Loading