diff --git a/Cargo.lock b/Cargo.lock
index 5db02b8..cd972a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4900,6 +4900,7 @@ dependencies = [
  "env_logger",
  "fastcrypto",
  "futures",
+ "hex",
  "log",
  "prost",
  "protobuf-src",
diff --git a/Makefile b/Makefile
index 7b5d6da..b68d7bb 100644
--- a/Makefile
+++ b/Makefile
@@ -100,4 +100,4 @@ run-listener:
 
 run-spammer:
 	cargo build --bin simple-spammer
-	RUST_LOG=info ./target/debug/simple-spammer --endpoint $(ENDPOINT) --sleep $(SLEEP)
\ No newline at end of file
+	RUST_LOG=info ./target/debug/simple-spammer --endpoint $(ENDPOINT) --sleep $(SLEEP)
diff --git a/README.md b/README.md
index b33553e..aaa4688 100644
--- a/README.md
+++ b/README.md
@@ -37,19 +37,124 @@ sudo apt install protobuf-compiler clang
 
 ## How to run
 
-### Local consensus benchmark
+### Rollup operator
 
-Check out the [instructions](./benchmark/README.md)
+Start with:
 
-### Remote consensus benchmark
+```
+make run-operator
+```
+
+You will end up inside the docker container shell.
+Every time you call this target, the kernel and docker image will be rebuilt.
+Any existing docker volume and running container will also be removed.
+
+#### Generate new keys
+
+For convenience, your local .tezos-client folder is mapped into the container to preserve the keys. On first launch you need to create a new keypair; to do that, run the following inside the operator shell:
+
+```
+$ operator generate_key
+```
+
+#### Check account info
+
+If you already have a key, check its balance: it should be at least 10k tez to operate a rollup; otherwise top up the balance from the faucet. To get your account address:
 
-### Local DSN setup
+```
+$ operator account_info
+```
+
+#### Originate rollup
+
+```
+$ operator deploy_rollup
+```
+
+Rollup data is persisted, meaning that you can restart the container without data loss. If you call this command again, it will tell you that there is an existing rollup configuration. Use the `--force` flag to remove all data and originate a new rollup.
+
+#### Run rollup node
+
+```
+$ operator run_node
+```
+
+Runs the rollup node in synchronous mode, with logs printed to stdout.
+The RPC is also available at `127.0.0.1:8932` on your host machine.
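+
+To sanity-check the node from the host, you can query its RPC (assuming the standard Octez rollup node API; adjust the path if your build differs):
+
+```
+curl http://127.0.0.1:8932/global/smart_rollup_address
+```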
+
+### Local DSN cluster
+
+To run 7 consensus nodes on a local machine:
+
+```
+make run-dsn
+```
+
+Note that the output will be captured.
+To stop them all, type in another terminal:
+
+```
+make kill-dsn
+```
+
+The pre-block streaming API will be available at:
+- `http://127.0.0.1:64011`
+- `http://127.0.0.1:64021`
+- `http://127.0.0.1:64031`
+- `http://127.0.0.1:64041`
+- `http://127.0.0.1:64051`
+- `http://127.0.0.1:64061`
+- `http://127.0.0.1:64071`
 
-#### Operator
+The transaction server API will be available at:
+- `http://127.0.0.1:64012`
+- `http://127.0.0.1:64022`
+- `http://127.0.0.1:64032`
+- `http://127.0.0.1:64042`
+- `http://127.0.0.1:64052`
+- `http://127.0.0.1:64062`
+- `http://127.0.0.1:64072`
 
-#### DSN
+### Consensus node
+
+
+### Sequencer
+
+Once you have both consensus and rollup nodes running, you can launch the sequencer node to test the entire setup:
+
+```
+make run-sequencer
+```
+
+#### Mocked rollup
+
+It is possible to mock the rollup node and do local pre-block verification instead:
+
+```
+make build-sequencer
+./target/debug/sequencer --mock-rollup
+```
+
+#### Mocked consensus
+
+Similarly, you can make the sequencer generate pre-blocks instead of connecting to a DSN:
+
+```
+make build-sequencer
+./target/debug/sequencer --mock-consensus
+```
+
+### DSN listener
+
+You can also subscribe to a remote DSN and listen for incoming pre-blocks:
+
+```
+make run-listener ENDPOINT=http://127.0.0.1:64001 FROM_ID=0
+```
+
+### DSN spammer
+
+To generate a transaction stream for latency testing, run:
+
+```
+make run-spammer ENDPOINT=http://127.0.0.1:64003 SLEEP=10
+```
 
-#### Sequencer
\ No newline at end of file
+Every `SLEEP` milliseconds the spammer connects to the remote DSN node and sends a transaction of random size with a timestamp prepended. If you also run a listener, you will see stats messages for incoming pre-blocks.
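+
+That wire format is what the simple-listener decodes: the first 16 bytes are a big-endian `u128` UNIX timestamp in milliseconds, followed by random filler. A minimal sketch of building such a transaction (illustrative only, not the spammer's actual code):
+
+```rust
+use std::time::{SystemTime, UNIX_EPOCH};
+
+fn make_tx(filler_len: usize) -> Vec<u8> {
+    // First 16 bytes: big-endian millisecond timestamp (what the listener parses)
+    let now_ms = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("clock before epoch")
+        .as_millis();
+    let mut tx = now_ms.to_be_bytes().to_vec();
+    // Followed by arbitrary payload bytes (random in the spammer)
+    tx.extend(std::iter::repeat(0u8).take(filler_len));
+    tx
+}
+```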
diff --git a/benchmark/benchmark/config.py b/benchmark/benchmark/config.py
index 823cbb6..dee11f2 100644
--- a/benchmark/benchmark/config.py
+++ b/benchmark/benchmark/config.py
@@ -167,6 +167,8 @@ def __init__(self, addresses, base_port):
             host = hosts.pop(0)
             primary_addr = f'/ip4/{host}/udp/{port}'
             port += 1
+            grpc_address = f'/ip4/127.0.0.1/tcp/{port}'
+            port += 1
 
             self.json['authorities'][name] = {
                 'stake': 1,
@@ -174,7 +176,8 @@ def __init__(self, addresses, base_port):
                 'protocol_key_bytes': name,
                 'primary_address': primary_addr,
                 'network_key': network_name,
-                'hostname': host
+                'hostname': host,
+                'grpc_address': grpc_address,
             }
 
     def primary_addresses(self, faults=0):
diff --git a/crates/pre-block/src/fixture.rs b/crates/pre-block/src/fixture.rs
index a246e6f..1f9a873 100644
--- a/crates/pre-block/src/fixture.rs
+++ b/crates/pre-block/src/fixture.rs
@@ -103,15 +103,16 @@ impl NarwhalFixture {
         let committee = self.fixture.committee();
         let mut signatures = Vec::new();
 
-        // 3 Signers satisfies the 2F + 1 signed stake requirement
-        for authority in self.fixture.authorities().take(3) {
+        // Minimal number of signers satisfying the 2f + 1 stake quorum,
+        // assuming a committee of 3f + 1 authorities with unit stake
+        let num_signers = 2 * (committee.size() - 1) / 3 + 1;
+        for authority in self.fixture.authorities().take(num_signers) {
             let vote = authority.vote(&header);
             signatures.push((vote.author(), vote.signature().clone()));
         }
 
         match CertificateV2::new_unverified(&committee, header, signatures) {
             Ok(narwhal_types::Certificate::V2(cert)) => cert.into(),
-            _ => unreachable!(),
+            Ok(_) => unreachable!(),
+            Err(err) => panic!("Failed to create cert: {}", err),
         }
     }
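A quick check of that quorum arithmetic (a standalone sketch; note the integer division, which makes the formula exact for committees of size n = 3f + 1, such as the sizes below):

```rust
fn quorum_signers(committee_size: usize) -> usize {
    // 2f + 1 signers, where f = (n - 1) / 3 faults are tolerated
    2 * (committee_size - 1) / 3 + 1
}

fn main() {
    // Prints: 4 -> 3, 7 -> 5, 10 -> 7
    for n in [4usize, 7, 10] {
        println!("{} authorities -> {} signers", n, quorum_signers(n));
    }
}
```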
diff --git a/sequencer/src/fixture.rs b/sequencer/src/fixture.rs
index ee25f00..cdf4df2 100644
--- a/sequencer/src/fixture.rs
+++ b/sequencer/src/fixture.rs
@@ -8,14 +8,18 @@ use pre_block::{PreBlock, PublicKey, DsnConfig};
 use std::path::PathBuf;
 use std::sync::mpsc;
 use std::time::Duration;
-use log::info;
+use log::{info, error};
+
+use crate::consensus_client::PrimaryClient;
+use crate::da_batcher::publish_pre_blocks;
+use crate::rollup_client::RollupClient;
 
 pub async fn generate_pre_blocks(
     prev_index: u64,
     pre_blocks_tx: mpsc::Sender<PreBlock>,
 ) -> anyhow::Result<()> {
     let mut index = prev_index;
-    let mut fixture = NarwhalFixture::default();
+    let mut fixture = NarwhalFixture::new(7);
 
     loop {
         let pre_block = fixture.next_pre_block(1);
@@ -68,3 +72,63 @@ pub async fn verify_pre_blocks(
         tokio::time::sleep(Duration::from_secs(1)).await;
     }
 }
+
+pub async fn run_da_task_with_mocked_consensus(
+    node_id: u8,
+    rollup_node_url: String,
+) -> anyhow::Result<()> {
+    info!("[DA task] Starting...");
+
+    let rollup_client = RollupClient::new(rollup_node_url.clone());
+    let smart_rollup_address = rollup_client.connect().await?;
+
+    loop {
+        let from_id = rollup_client.get_next_index().await?;
+        let (tx, rx) = mpsc::channel();
+        info!("[DA task] Starting from index #{}", from_id);
+
+        tokio::select! {
+            res = generate_pre_blocks(from_id - 1, tx) => {
+                if let Err(err) = res {
+                    error!("[DA generate] Failed with: {}", err);
+                }
+            },
+            res = publish_pre_blocks(&rollup_client, &smart_rollup_address, node_id, rx) => {
+                if let Err(err) = res {
+                    error!("[DA publish] Failed with: {}", err);
+                }
+            },
+        };
+
+        tokio::time::sleep(Duration::from_secs(5)).await;
+    }
+}
+
+pub async fn run_da_task_with_mocked_rollup(
+    primary_node_url: String,
+) -> anyhow::Result<()> {
+    info!("[DA task] Starting...");
+
+    let mut primary_client = PrimaryClient::new(primary_node_url);
+
+    loop {
+        let from_id = 1;
+        let (tx, rx) = mpsc::channel();
+        info!("[DA task] Starting from index #{}", from_id);
+
+        tokio::select! {
+            res = primary_client.subscribe_pre_blocks(from_id - 1, tx) => {
+                if let Err(err) = res {
+                    error!("[DA fetch] Failed with: {}", err);
+                }
+            },
+            res = verify_pre_blocks(rx) => {
+                if let Err(err) = res {
+                    error!("[DA verify] Failed with: {}", err);
+                }
+            },
+        };
+
+        tokio::time::sleep(Duration::from_secs(5)).await;
+    }
+}
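Both new helpers follow the same supervision shape: the two halves of the pipeline race under `tokio::select!` over a standard `mpsc` channel, and after either side fails the loop pauses and restarts with a fresh channel. A self-contained sketch of that pattern (names are illustrative, not the sequencer's API):

```rust
use std::sync::mpsc;
use std::time::Duration;

// Producer half: pushes items until the receiver is dropped.
async fn produce(tx: mpsc::Sender<u64>) -> anyhow::Result<()> {
    let mut i = 0u64;
    loop {
        tx.send(i)?; // fails once the consumer side is gone
        i += 1;
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}

// Consumer half: drains the channel without blocking the async executor.
async fn consume(rx: mpsc::Receiver<u64>) -> anyhow::Result<()> {
    loop {
        match rx.try_recv() {
            Ok(item) => println!("got {}", item),
            Err(mpsc::TryRecvError::Empty) => {
                tokio::time::sleep(Duration::from_millis(100)).await
            }
            Err(err) => anyhow::bail!("channel closed: {}", err),
        }
    }
}

#[tokio::main]
async fn main() {
    loop {
        let (tx, rx) = mpsc::channel();
        tokio::select! {
            res = produce(tx) => eprintln!("producer stopped: {:?}", res),
            res = consume(rx) => eprintln!("consumer stopped: {:?}", res),
        }
        // Back off before restarting the round with a fresh channel
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}
```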
diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs
index db77ad3..ec5fff5 100644
--- a/sequencer/src/main.rs
+++ b/sequencer/src/main.rs
@@ -10,7 +10,8 @@ use axum::{
 };
 use clap::Parser;
 use consensus_client::WorkerClient;
-use log::{error, info, warn};
+use fixture::{run_da_task_with_mocked_consensus, run_da_task_with_mocked_rollup};
+use log::{error, info};
 use rollup_client::RollupClient;
 use serde::{Deserialize, Serialize};
 use std::sync::mpsc;
@@ -19,7 +20,7 @@ use std::time::Duration;
 use tokio::signal;
 use tokio::task::JoinHandle;
 
-use crate::{consensus_client::PrimaryClient, fixture::verify_pre_blocks};
+use crate::consensus_client::PrimaryClient;
 use crate::da_batcher::publish_pre_blocks;
 
 mod consensus_client;
@@ -128,7 +129,7 @@ async fn run_api_server(
     Ok(())
 }
 
-async fn run_da_task(
+async fn run_da_task_real(
     node_id: u8,
     rollup_node_url: String,
     primary_node_url: String,
@@ -136,32 +137,12 @@ async fn run_da_task(
 ) -> anyhow::Result<()> {
     info!("[DA task] Starting...");
 
     let rollup_client = RollupClient::new(rollup_node_url.clone());
-    let mut connection_attempts = 0;
-
-    let smart_rollup_address = loop {
-        match rollup_client.get_rollup_address().await {
-            Ok(res) => break res,
-            Err(err) => {
-                connection_attempts += 1;
-                if connection_attempts == 10 {
-                    error!("[DA task] Max attempts to connect to SR node: {}", err);
-                    return Err(err);
-                } else {
-                    warn!("[DA task] Attempt #{} {}", connection_attempts, err);
-                    tokio::time::sleep(Duration::from_secs(connection_attempts)).await;
-                }
-            }
-        }
-    };
-    info!(
-        "Connected to SR node: {} at {}",
-        smart_rollup_address, rollup_node_url
-    );
+    let smart_rollup_address = rollup_client.connect().await?;
 
     let mut primary_client = PrimaryClient::new(primary_node_url);
 
     loop {
-        let from_id = 1; // rollup_client.get_next_index().await?;
+        let from_id = rollup_client.get_next_index().await?;
         let (tx, rx) = mpsc::channel();
         info!("[DA task] Starting from index #{}", from_id);
 
@@ -171,11 +152,6 @@ async fn run_da_task(
                     error!("[DA fetch] Failed with: {}", err);
                 }
             },
-            // res = verify_pre_blocks(rx) => {
-            //     if let Err(err) = res {
-            //         error!("[DA verify] Failed with: {}", err);
-            //     }
-            // },
             res = publish_pre_blocks(&rollup_client, &smart_rollup_address, node_id, rx) => {
                 if let Err(err) = res {
                     error!("[DA publish] Failed with: {}", err);
@@ -187,6 +163,21 @@ async fn run_da_task(
     }
 }
 
+async fn run_da_task(
+    node_id: u8,
+    rollup_node_url: String,
+    primary_node_url: String,
+    mock_consensus: bool,
+    mock_rollup: bool,
+) -> anyhow::Result<()> {
+    match (mock_consensus, mock_rollup) {
+        (false, false) => run_da_task_real(node_id, rollup_node_url, primary_node_url).await,
+        (true, false) => run_da_task_with_mocked_consensus(node_id, rollup_node_url).await,
+        (false, true) => run_da_task_with_mocked_rollup(primary_node_url).await,
+        (true, true) => unimplemented!(),
+    }
+}
+
 #[derive(Parser, Debug)]
 struct Args {
     #[arg(long, default_value_t = String::from("http://localhost:8932"))]
@@ -206,6 +197,12 @@ struct Args {
 
     #[arg(long, default_value_t = 1)]
     node_id: u8,
+
+    #[arg(long, default_value_t = false)]
+    mock_consensus: bool,
+
+    #[arg(long, default_value_t = false)]
+    mock_rollup: bool,
 }
 
 async fn flatten(handle: JoinHandle<anyhow::Result<()>>) -> anyhow::Result<()> {
@@ -231,7 +228,7 @@
                 args.rpc_addr,
                 args.rpc_port,
                 rollup_node_url,
-                args.worker_node_url
+                args.worker_node_url,
             ) => res,
         }
     });
@@ -239,7 +236,13 @@
     let da_task = tokio::spawn(async move {
         tokio::select! {
             _ = signal::ctrl_c() => Ok(()),
-            res = run_da_task(args.node_id, args.rollup_node_url, args.primary_node_url) => res,
+            res = run_da_task(
+                args.node_id,
+                args.rollup_node_url,
+                args.primary_node_url,
+                args.mock_consensus,
+                args.mock_rollup,
+            ) => res,
         }
     });
diff --git a/sequencer/src/rollup_client.rs b/sequencer/src/rollup_client.rs
index dda3ae2..ecccbbd 100644
--- a/sequencer/src/rollup_client.rs
+++ b/sequencer/src/rollup_client.rs
@@ -2,6 +2,9 @@
 //
 // SPDX-License-Identifier: MIT
 
+use std::time::Duration;
+
+use log::{warn, info, error};
 use pre_block::PublicKey;
 use serde::Deserialize;
 use tezos_smart_rollup_encoding::smart_rollup::SmartRollupAddress;
@@ -48,6 +51,33 @@ impl RollupClient {
         }
     }
 
+    pub async fn connect(&self) -> anyhow::Result<SmartRollupAddress> {
+        let mut connection_attempts = 0;
+
+        let smart_rollup_address = loop {
+            match self.get_rollup_address().await {
+                Ok(res) => break res,
+                Err(err) => {
+                    connection_attempts += 1;
+                    if connection_attempts == 10 {
+                        error!("[DA task] Max attempts to connect to SR node: {}", err);
+                        return Err(err);
+                    } else {
+                        warn!("[DA task] Attempt #{} {}", connection_attempts, err);
+                        // Linear backoff: wait `connection_attempts` seconds before retrying
+                        tokio::time::sleep(Duration::from_secs(connection_attempts)).await;
+                    }
+                }
+            }
+        };
+
+        info!(
+            "Connected to SR node: {} at {}",
+            smart_rollup_address, self.base_url
+        );
+
+        Ok(smart_rollup_address)
+    }
+
     pub async fn get_rollup_address(&self) -> anyhow::Result<SmartRollupAddress> {
         let res = self
             .client
diff --git a/simple-listener/Cargo.toml b/simple-listener/Cargo.toml
index ff9ace7..1c4afee 100644
--- a/simple-listener/Cargo.toml
+++ b/simple-listener/Cargo.toml
@@ -17,6 +17,7 @@ roaring.workspace = true
 
 env_logger = "0.10.0"
 log = "0.4"
+hex = "0.4"
 
 [target.'cfg(not(target_env = "msvc"))'.build-dependencies]
 protobuf-src.workspace = true
diff --git a/simple-listener/src/main.rs b/simple-listener/src/main.rs
index dd3a03b..9deba6b 100644
--- a/simple-listener/src/main.rs
+++ b/simple-listener/src/main.rs
@@ -2,7 +2,7 @@ use clap::Parser;
 use std::time::Duration;
 use tokio::time::sleep;
 use tonic::transport::Channel;
-use log::{error, info};
+use log::{error, info, warn};
 use std::time::{SystemTime, UNIX_EPOCH};
 
 mod exporter {
     tonic::include_proto!("exporter");
@@ -64,12 +64,13 @@ async fn export(
         let stats = stats(&subdag);
         if stats.num_txs > 0 {
             info!(
-                "Received subdag #{} (num txs {}, payload size {}, avg latency {} ms, cert time delta {} ms)",
+                "Received subdag #{} (num txs {}, payload size {}, avg latency {} ms, cert delta {} ms / {} rounds)",
                 subdag.id,
                 stats.num_txs,
                 stats.payload_size,
                 stats.avg_latency,
                 stats.cert_time_delta,
+                stats.cert_round_delta,
             );
         } else {
             info!(
@@ -90,6 +91,7 @@ pub struct Stats {
     pub payload_size: usize,
     pub avg_latency: u128,
     pub cert_time_delta: u128,
+    pub cert_round_delta: u64,
 }
 
 fn stats(subdag: &SubDag) -> Stats {
@@ -101,13 +103,21 @@ fn stats(subdag: &SubDag) -> Stats {
     let first_cert_ts = subdag.certificates[0].clone().header.unwrap().created_at as u128;
     let last_cert_ts = subdag.leader.clone().unwrap().header.unwrap().created_at as u128;
 
+    let first_cert_round = subdag.certificates[0].clone().header.unwrap().round;
+    let last_cert_round = subdag.leader.clone().unwrap().header.unwrap().round;
+
     for payload in subdag.payloads.iter() {
         for batch in payload.batches.iter() {
             num_txs += batch.transactions.len();
             for tx in batch.transactions.iter() {
                 payload_size += tx.len();
-                // FIXME: handle parsing errors
-                let tx_time_bytes: [u8; 16] = tx[..16].try_into().unwrap();
+                let tx_time_bytes: [u8; 16] = match tx.get(0..16) {
+                    Some(value) => value.try_into().unwrap(),
+                    None => {
+                        // Too short to carry a timestamp prefix: skip it
+                        warn!("Foreign transaction {}", hex::encode(tx));
+                        continue;
+                    }
+                };
                 let tx_time = u128::from_be_bytes(tx_time_bytes);
                 sum_latency += subdag_time - tx_time;
             }
@@ -120,5 +130,6 @@
         payload_size,
         avg_latency: if num_txs > 0 { sum_latency / (num_txs as u128) } else { 0 },
         cert_time_delta: last_cert_ts - first_cert_ts,
+        cert_round_delta: last_cert_round - first_cert_round,
     }
}
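To see these stats in action against the local cluster from the README, one node's pair of endpoints is enough (ports taken from the `make run-dsn` listing; adjust for a remote DSN):

```
make run-listener ENDPOINT=http://127.0.0.1:64011 FROM_ID=0
# in another terminal
make run-spammer ENDPOINT=http://127.0.0.1:64012 SLEEP=100
```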