Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: da_dispatcher refactoring #3409

Merged
merged 13 commits into from
Jan 8, 2025
39 changes: 25 additions & 14 deletions core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ impl MainNodeBuilder {
self.node.runtime_handle()
}

pub fn get_pubdata_type(&self) -> PubdataType {
pub fn get_pubdata_type(&self) -> anyhow::Result<PubdataType> {
if self.genesis_config.l1_batch_commit_data_generator_mode == L1BatchCommitmentMode::Rollup
{
return PubdataType::Rollup;
return Ok(PubdataType::Rollup);
}

let Some(da_client_config) = self.configs.da_client_config.clone() else {
return PubdataType::NoDA;
};

match da_client_config {
DAClientConfig::Avail(_) => PubdataType::Avail,
DAClientConfig::Celestia(_) => PubdataType::Celestia,
DAClientConfig::Eigen(_) => PubdataType::Eigen,
DAClientConfig::ObjectStore(_) => PubdataType::ObjectStore,
match self.configs.da_client_config.clone() {
None => Err(anyhow::anyhow!("No config for DA client")),
Some(da_client_config) => Ok(match da_client_config {
DAClientConfig::Avail(_) => PubdataType::Avail,
DAClientConfig::Celestia(_) => PubdataType::Celestia,
DAClientConfig::Eigen(_) => PubdataType::Eigen,
DAClientConfig::ObjectStore(_) => PubdataType::ObjectStore,
DAClientConfig::NoDA => PubdataType::NoDA,
}),
}
}

Expand Down Expand Up @@ -273,7 +273,7 @@ impl MainNodeBuilder {
try_load_config!(self.configs.mempool_config),
try_load_config!(wallets.state_keeper),
self.contracts_config.l2_da_validator_addr,
self.get_pubdata_type(),
self.get_pubdata_type()?,
);
let db_config = try_load_config!(self.configs.db_config);
let experimental_vm_config = self
Expand Down Expand Up @@ -551,11 +551,22 @@ impl MainNodeBuilder {
}

fn add_da_client_layer(mut self) -> anyhow::Result<Self> {
let eth_sender_config = try_load_config!(self.configs.eth);
if let Some(sender_config) = eth_sender_config.sender {
if sender_config.pubdata_sending_mode != PubdataSendingMode::Custom {
tracing::warn!("DA dispatcher is enabled, but the pubdata sending mode is not `Custom`. DA client will not be started.");
return Ok(self);
dimazhornyk marked this conversation as resolved.
Show resolved Hide resolved
}
}

let Some(da_client_config) = self.configs.da_client_config.clone() else {
tracing::warn!("No config for DA client, using the NoDA client");
bail!("No config for DA client");
};

if let DAClientConfig::NoDA = da_client_config {
self.node.add_layer(NoDAClientWiringLayer);
return Ok(self);
};
}

let secrets = try_load_config!(self.secrets.data_availability);
match (da_client_config, secrets) {
Expand Down
21 changes: 21 additions & 0 deletions core/lib/config/src/configs/da_client/avail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use zksync_basic_types::secrets::{APIKey, SeedPhrase};
pub const AVAIL_GAS_RELAY_CLIENT_NAME: &str = "GasRelay";
pub const AVAIL_FULL_CLIENT_NAME: &str = "FullClient";

pub const IN_BLOCK_FINALITY_STATE: &str = "inBlock";
pub const FINALIZED_FINALITY_STATE: &str = "finalized";

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "avail_client")]
pub enum AvailClientConfig {
Expand All @@ -23,6 +26,7 @@ pub struct AvailConfig {
pub struct AvailDefaultConfig {
pub api_node_url: String,
pub app_id: u32,
pub finality_state: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -36,3 +40,20 @@ pub struct AvailSecrets {
pub seed_phrase: Option<SeedPhrase>,
pub gas_relay_api_key: Option<APIKey>,
}

impl AvailDefaultConfig {
pub fn finality_state(&self) -> anyhow::Result<String> {
match self.finality_state.clone() {
Some(finality_state) => match finality_state.as_str() {
IN_BLOCK_FINALITY_STATE | FINALIZED_FINALITY_STATE => Ok(finality_state),
_ => Err(anyhow::anyhow!(
"Invalid finality state: {}. Supported values are: {}, {}",
finality_state,
IN_BLOCK_FINALITY_STATE,
FINALIZED_FINALITY_STATE
)),
},
None => Ok(IN_BLOCK_FINALITY_STATE.to_string()),
}
}
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/da_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ pub const AVAIL_CLIENT_CONFIG_NAME: &str = "Avail";
pub const CELESTIA_CLIENT_CONFIG_NAME: &str = "Celestia";
pub const EIGEN_CLIENT_CONFIG_NAME: &str = "Eigen";
pub const OBJECT_STORE_CLIENT_CONFIG_NAME: &str = "ObjectStore";
pub const NO_DA_CLIENT_CONFIG_NAME: &str = "NoDA";

#[derive(Debug, Clone, PartialEq)]
pub enum DAClientConfig {
Avail(AvailConfig),
Celestia(CelestiaConfig),
Eigen(EigenConfig),
ObjectStore(ObjectStoreConfig),
NoDA,
}

impl From<AvailConfig> for DAClientConfig {
Expand Down
7 changes: 6 additions & 1 deletion core/lib/config/src/configs/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ use std::time::Duration;

use serde::Deserialize;

/// The default interval between the `da_dispatcher's` iterations.
pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
/// The maximum number of rows to fetch from the database in a single query. The value has to be
/// not too high to avoid the dispatcher iteration taking too much time.
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 3;
dimazhornyk marked this conversation as resolved.
Show resolved Hide resolved
/// The maximum number of retries for the dispatch of a blob.
pub const DEFAULT_MAX_RETRIES: u16 = 5;
/// Use dummy value as inclusion proof instead of getting it from the client.
pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;

#[derive(Debug, Clone, PartialEq, Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ impl Distribution<configs::da_client::DAClientConfig> for EncodeDist {
config: AvailClientConfig::FullClient(AvailDefaultConfig {
api_node_url: self.sample(rng),
app_id: self.sample(rng),
finality_state: None,
}),
})
}
Expand Down
1 change: 1 addition & 0 deletions core/lib/env_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ mod tests {
config: AvailClientConfig::FullClient(AvailDefaultConfig {
api_node_url: api_node_url.to_string(),
app_id,
finality_state: None,
}),
})
}
Expand Down
9 changes: 6 additions & 3 deletions core/lib/protobuf_config/src/da_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use zksync_config::configs::{
avail::{AvailClientConfig, AvailConfig, AvailDefaultConfig, AvailGasRelayConfig},
celestia::CelestiaConfig,
eigen::EigenConfig,
DAClientConfig::{Avail, Celestia, Eigen, ObjectStore},
DAClientConfig::{Avail, Celestia, Eigen, NoDA, ObjectStore},
},
};
use zksync_protobuf::{required, ProtoRepr};
Expand All @@ -16,8 +16,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
type Type = configs::DAClientConfig;

fn read(&self) -> anyhow::Result<Self::Type> {
let config = required(&self.config).context("config")?;

let config = required(&self.config).context("da_client config")?;
let client = match config {
proto::data_availability_client::Config::Avail(conf) => Avail(AvailConfig {
bridge_api_url: required(&conf.bridge_api_url)
Expand All @@ -31,6 +30,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
.context("api_node_url")?
.clone(),
app_id: *required(&full_client_conf.app_id).context("app_id")?,
finality_state: full_client_conf.finality_state.clone(),
})
}
Some(proto::avail_config::Config::GasRelay(gas_relay_conf)) => {
Expand Down Expand Up @@ -62,6 +62,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
proto::data_availability_client::Config::ObjectStore(conf) => {
ObjectStore(object_store_proto::ObjectStore::read(conf)?)
}
proto::data_availability_client::Config::NoDa(_) => NoDA,
dimazhornyk marked this conversation as resolved.
Show resolved Hide resolved
};

Ok(client)
Expand All @@ -77,6 +78,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
proto::avail_config::Config::FullClient(proto::AvailClientConfig {
api_node_url: Some(conf.api_node_url.clone()),
app_id: Some(conf.app_id),
finality_state: conf.finality_state.clone(),
}),
),
AvailClientConfig::GasRelay(conf) => Some(
Expand All @@ -102,6 +104,7 @@ impl ProtoRepr for proto::DataAvailabilityClient {
ObjectStore(config) => proto::data_availability_client::Config::ObjectStore(
object_store_proto::ObjectStore::build(config),
),
NoDA => proto::data_availability_client::Config::NoDa(proto::NoDaConfig {}),
};

Self {
Expand Down
4 changes: 4 additions & 0 deletions core/lib/protobuf_config/src/proto/config/da_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message AvailConfig {
message AvailClientConfig {
optional string api_node_url = 1;
optional uint32 app_id = 2;
optional string finality_state = 3;
}

message AvailGasRelayConfig {
Expand All @@ -41,12 +42,15 @@ message EigenConfig {
optional uint64 inclusion_polling_interval_ms = 2;
}

message NoDAConfig {}

message DataAvailabilityClient {
// oneof in protobuf allows for None
oneof config {
AvailConfig avail = 1;
object_store.ObjectStore object_store = 2;
CelestiaConfig celestia = 3;
EigenConfig eigen = 4;
NoDAConfig no_da = 5;
}
}
8 changes: 6 additions & 2 deletions core/node/da_clients/src/avail/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ impl AvailClient {
.seed_phrase
.ok_or_else(|| anyhow::anyhow!("Seed phrase is missing"))?;
// these unwraps are safe because we validate in protobuf config
let sdk_client =
RawAvailClient::new(conf.app_id, seed_phrase.0.expose_secret()).await?;
let sdk_client = RawAvailClient::new(
conf.app_id,
seed_phrase.0.expose_secret(),
conf.finality_state()?,
)
.await?;

Ok(Self {
config,
Expand Down
15 changes: 12 additions & 3 deletions core/node/da_clients/src/avail/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const PROTOCOL_VERSION: u8 = 4;
pub(crate) struct RawAvailClient {
app_id: u32,
keypair: Keypair,
finality_state: String,
}

/// Utility type needed for encoding the call data
Expand All @@ -44,11 +45,19 @@ struct BoundedVec<_0>(pub Vec<_0>);
impl RawAvailClient {
pub(crate) const MAX_BLOB_SIZE: usize = 512 * 1024; // 512kb

pub(crate) async fn new(app_id: u32, seed: &str) -> anyhow::Result<Self> {
pub(crate) async fn new(
app_id: u32,
seed: &str,
finality_state: String,
) -> anyhow::Result<Self> {
let mnemonic = Mnemonic::parse(seed)?;
let keypair = Keypair::from_phrase(&mnemonic, None)?;

Ok(Self { app_id, keypair })
Ok(Self {
app_id,
keypair,
finality_state,
})
}

/// Returns a hex-encoded extrinsic
Expand Down Expand Up @@ -291,7 +300,7 @@ impl RawAvailClient {
let status = sub.next().await.transpose()?;

if status.is_some() && status.as_ref().unwrap().is_object() {
if let Some(block_hash) = status.unwrap().get("finalized") {
if let Some(block_hash) = status.unwrap().get(self.finality_state.as_str()) {
break block_hash
.as_str()
.ok_or_else(|| anyhow::anyhow!("Invalid block hash"))?
Expand Down
70 changes: 46 additions & 24 deletions core/node/da_dispatcher/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, time::Duration};
use std::{future::Future, sync::Arc, time::Duration};

use anyhow::Context;
use chrono::Utc;
Expand All @@ -14,7 +14,7 @@ use zksync_types::L1BatchNumber;

use crate::metrics::METRICS;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataAvailabilityDispatcher {
client: Box<dyn DataAvailabilityClient>,
pool: ConnectionPool<Core>,
Expand All @@ -35,37 +35,59 @@ impl DataAvailabilityDispatcher {
}

pub async fn run(self, mut stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
loop {
if *stop_receiver.borrow() {
break;
}
let self_arc = Arc::new(self.clone());

let subtasks = futures::future::join(
async {
if let Err(err) = self.dispatch().await {
tracing::error!("dispatch error {err:?}");
}
},
async {
if let Err(err) = self.poll_for_inclusion().await {
tracing::error!("poll_for_inclusion error {err:?}");
}
},
);
let mut stop_receiver_dispatch = stop_receiver.clone();
let mut stop_receiver_poll_for_inclusion = stop_receiver.clone();

let dispatch_task = tokio::spawn(async move {
loop {
if *stop_receiver_dispatch.borrow() {
break;
}

tokio::select! {
_ = subtasks => {},
_ = stop_receiver.changed() => {
if let Err(err) = self_arc.dispatch().await {
tracing::error!("dispatch error {err:?}");
}

if tokio::time::timeout(
self_arc.config.polling_interval(),
stop_receiver_dispatch.changed(),
)
.await
.is_ok()
{
break;
}
}
});

if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed())
let inclusion_task = tokio::spawn(async move {
loop {
if *stop_receiver_poll_for_inclusion.borrow() {
break;
}

if let Err(err) = self.poll_for_inclusion().await {
tracing::error!("poll_for_inclusion error {err:?}");
}

if tokio::time::timeout(
self.config.polling_interval(),
stop_receiver_poll_for_inclusion.changed(),
)
.await
.is_ok()
{
break;
{
break;
}
}
});

tokio::select! {
_ = dispatch_task => {},
_ = inclusion_task => {},
_ = stop_receiver.changed() => {},
}

tracing::info!("Stop signal received, da_dispatcher is shutting down");
Expand Down
2 changes: 2 additions & 0 deletions etc/env/file_based/overrides/validium.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ eth:
state_keeper:
pubdata_overhead_part: 0
compute_overhead_part: 1
da_client:
no_da: {}
Loading