Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trickle validators on layer 1 #1197

Merged
Merged
694 changes: 460 additions & 234 deletions Cargo.lock

Large diffs are not rendered by default.

46 changes: 26 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,6 @@ tari_p2p = { git = "https://github.com/tari-project/tari.git", branch = "feature
tari_shutdown = { git = "https://github.com/tari-project/tari.git", branch = "feature-dan2" }
tari_storage = { git = "https://github.com/tari-project/tari.git", branch = "feature-dan2" }

#minotari_app_grpc = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_app_utilities = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_console_wallet = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_node = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_node_grpc_client = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_wallet = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_wallet_grpc_client = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_common = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_common_types = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_hashing = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#
## avoid including default features so each crate can choose which ones to import
#tari_core = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2", default-features = false }
#tari_key_manager = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_metrics = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_mmr = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_p2p = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_shutdown = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_storage = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }

tari_crypto = "0.21.0"
tari_utilities = "0.8.0"

Expand Down Expand Up @@ -297,3 +277,29 @@ overflow-checks = true
#tari_libtor = { git = "https://github.com/account/tari.git", branch = "my-branch" }
#tari_hashing = { git = "https://github.com/account/tari.git", branch = "my-branch" }


#[patch."https://github.com/tari-project/tari.git"]
#minotari_app_grpc = { path = "../tari/applications/minotari_app_grpc" }
#minotari_wallet_grpc_client = { path = "../tari/clients/rust/wallet_grpc_client" }
#minotari_node_grpc_client = { path = "../tari/clients/rust/base_node_grpc_client" }
#tari_common = { path = "../tari/common" }
#tari_common_types = { path = "../tari/base_layer/common_types" }
#tari_comms = { path = "../tari/comms/core" }
#tari_comms_rpc_macros = { path = "../tari/comms/rpc_macros" }
#tari_core = { path = "../tari/base_layer/core" }
#tari_key_manager = { path = "../tari/base_layer/key_manager" }
#tari_mmr = { path = "../tari/base_layer/mmr" }
#tari_p2p = { path = "../tari/base_layer/p2p" }
#tari_shutdown = { path = "../tari/infrastructure/shutdown" }
#tari_storage = { path = "../tari/infrastructure/storage" }
#tari_script = { path = "../tari/infrastructure/tari_script" }
#minotari_wallet = { path = "../tari/base_layer/wallet" }
#minotari_console_wallet = { path = "../tari/applications/minotari_console_wallet" }
#tari_service_framework = { path = "../tari/base_layer/service_framework" }
#tari_comms_dht = { path = "../tari/comms/dht" }
#minotari_app_utilities = { path = "../tari/applications/minotari_app_utilities" }
#minotari_node = { path = "../tari/applications/minotari_node" }
#tari_metrics = { path = "../tari/infrastructure/metrics" }
#tari_libtor = { path = "../tari/infrastructure/libtor" }
#tari_hashing = { path = "../tari/hashing" }

1 change: 1 addition & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tari_bor = { workspace = true, default-features = true }
tari_indexer_lib = { workspace = true }
tari_networking = { workspace = true }
tari_validator_node_rpc = { workspace = true }
minotari_app_grpc = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
105 changes: 82 additions & 23 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::time::Duration;

use log::*;
use minotari_app_grpc::tari_rpc::ValidatorNodeChangeState;
use tari_base_node_client::{
grpc::GrpcBaseNodeClient,
types::{BaseLayerMetadata, BlockInfo},
Expand All @@ -42,7 +43,7 @@ use tari_core::transactions::{
};
use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
tari_utilities::{hex::Hex, ByteArray, ByteArrayError},
};
use tari_dan_common_types::{optional::Optional, NodeAddressable, VersionedSubstateId};
use tari_dan_storage::{
Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct BaseLayerScanner<TAddr> {
last_scanned_height: u64,
last_scanned_tip: Option<FixedHash>,
last_scanned_hash: Option<FixedHash>,
last_scanned_validator_node_mr: Option<FixedHash>,
next_block_hash: Option<FixedHash>,
base_node_client: GrpcBaseNodeClient,
epoch_manager: EpochManagerHandle<TAddr>,
Expand Down Expand Up @@ -141,6 +143,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
last_scanned_tip: None,
last_scanned_height: 0,
last_scanned_hash: None,
last_scanned_validator_node_mr: None,
next_block_hash: None,
base_node_client,
epoch_manager,
Expand Down Expand Up @@ -223,6 +226,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
);
// TODO: we need to figure out where the fork happened, and delete data after the fork.
self.last_scanned_hash = None;
self.last_scanned_validator_node_mr = None;
self.last_scanned_height = 0;
self.sync_blockchain().await?;
},
Expand Down Expand Up @@ -278,6 +282,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Some(end_height) => end_height,
};
let mut scan = tip.tip_hash;
let mut current_last_validator_nodes_mr = self.last_scanned_validator_node_mr;
loop {
let header = self.base_node_client.get_header_by_hash(scan).await?;
if let Some(last_tip) = self.last_scanned_tip {
Expand All @@ -290,9 +295,60 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
// This will be processed down below.
break;
}
current_last_validator_nodes_mr = Some(header.validator_node_mr);
self.epoch_manager.add_block_hash(header.height, scan).await?;
scan = header.prev_hash;
}

// syncing validator node changes
if current_last_validator_nodes_mr != self.last_scanned_validator_node_mr {
info!(target: LOG_TARGET,
"⛓️ Syncing validator nodes (sidechain ID: {:?}) from base node (height range: {}-{})",
self.validator_node_sidechain_id,
start_scan_height,
end_height,
);

let node_changes = self
.base_node_client
.get_validator_node_changes(start_scan_height, end_height, self.validator_node_sidechain_id.as_ref())
.await
.map_err(BaseLayerScannerError::BaseNodeError)?;

for node_change in node_changes {
if node_change.registration.is_none() {
warn!(
target: LOG_TARGET,
"Can't register validator node \"{}\" because it has empty registration!",
node_change.public_key.to_hex(),
);
continue;
}
let registration = ValidatorNodeRegistration::try_from(node_change.registration.clone().unwrap())
.map_err(BaseLayerScannerError::GrpcConversion)?;
match node_change.state() {
ValidatorNodeChangeState::Add => {
self.add_validator_node_registration(
node_change.start_height,
registration,
node_change.minimum_value_promise.into(),
)
.await?;
},
ValidatorNodeChangeState::Remove => {
self.remove_validator_node_registration(
PublicKey::from_canonical_bytes(node_change.public_key.as_slice())
.map_err(BaseLayerScannerError::PublicKeyConversion)?,
registration.sidechain_id().cloned(),
)
.await?;
},
}
}

self.last_scanned_validator_node_mr = current_last_validator_nodes_mr;
}

for current_height in start_scan_height..=end_height {
let utxos = self
.base_node_client
Expand Down Expand Up @@ -329,27 +385,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
};
match sidechain_feature {
SideChainFeature::ValidatorNodeRegistration(reg) => {
info!(
target: LOG_TARGET,
"⛓️ Validator node registration UTXO for {} sidechain {} found at height {}",
reg.public_key(),
reg.sidechain_id().map(|v| v.to_hex()).unwrap_or("None".to_string()),
current_height,
);
if reg.sidechain_id() == self.validator_node_sidechain_id.as_ref() {
self.register_validator_node_registration(
current_height,
reg.clone(),
output.minimum_value_promise,
)
.await?;
} else {
warn!(
target: LOG_TARGET,
"Ignoring validator node registration for sidechain ID {:?}. Expected sidechain ID: {:?}",
reg.sidechain_id().map(|v| v.to_hex()),
self.validator_node_sidechain_id.as_ref().map(|v| v.to_hex()));
}
trace!(target: LOG_TARGET, "New validator node registration scanned: {reg:?}");
},
SideChainFeature::CodeTemplateRegistration(reg) => {
if reg.sidechain_id != self.template_sidechain_id {
Expand Down Expand Up @@ -496,7 +532,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Ok(())
}

async fn register_validator_node_registration(
async fn add_validator_node_registration(
&mut self,
height: u64,
registration: ValidatorNodeRegistration,
Expand All @@ -516,6 +552,25 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Ok(())
}

async fn remove_validator_node_registration(
&mut self,
public_key: PublicKey,
sidechain_id: Option<PublicKey>,
) -> Result<(), BaseLayerScannerError> {
info!(
target: LOG_TARGET,
"⛓️ Remove validator node registration for {}(side chain ID: {:?})",
public_key,
sidechain_id
);

self.epoch_manager
.remove_validator_node_registration(public_key, sidechain_id)
.await?;

Ok(())
}

async fn register_code_template_registration(
&mut self,
template_name: String,
Expand Down Expand Up @@ -574,6 +629,10 @@ pub enum BaseLayerScannerError {
commitment: Box<Commitment>,
source: StorageError,
},
#[error("Public key conversion error: {0}")]
PublicKeyConversion(ByteArrayError),
#[error("GRPC conversion error: {0}")]
GrpcConversion(String),
}

enum BlockchainProgression {
Expand Down
1 change: 0 additions & 1 deletion applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub async fn spawn_services(
.try_into()
.context("committee_size must be non-zero")?,
validator_node_sidechain_id: config.indexer.sidechain_id.clone(),
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
},
global_db.clone(),
base_node_client.clone(),
Expand Down
18 changes: 9 additions & 9 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,19 @@ impl ProcessManager {
};
let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();

// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

if !self.skip_registration {
// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}
for templates in templates_to_register {
self.register_template(templates).await?;
for templates in templates_to_register {
self.register_template(templates).await?;
}
}

if num_blocks > 0 {
Expand Down
Loading
Loading