Skip to content

Commit

Permalink
feat!(consensus): implement preshards and shard groups
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 30, 2024
1 parent 5b0820e commit f0696f0
Show file tree
Hide file tree
Showing 113 changed files with 2,365 additions and 1,423 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeAddressable, NodeHeight};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeAddressable, NodeHeight, ShardGroup};
use tari_dan_storage::{
consensus_models::{Block, SubstateRecord},
global::{GlobalDb, MetadataKey},
Expand Down Expand Up @@ -459,7 +459,11 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
});
self.state_store
.with_write_tx(|tx| {
let genesis = Block::genesis(self.network, Epoch(0), Shard::zero());
let genesis = Block::genesis(
self.network,
Epoch(0),
ShardGroup::all_shards(self.consensus_constants.num_preshards),
);

// TODO: This should be proposed in a block...
SubstateRecord {
Expand Down
20 changes: 19 additions & 1 deletion applications/tari_dan_app_utilities/src/consensus_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use tari_common::configuration::Network;
use tari_dan_common_types::NumPreshards;

#[derive(Clone, Debug)]
pub struct ConsensusConstants {
pub base_layer_confirmations: u64,
pub committee_size: u32,
pub max_base_layer_blocks_ahead: u64,
pub max_base_layer_blocks_behind: u64,
pub num_preshards: NumPreshards,
pub pacemaker_max_base_time: std::time::Duration,
}

Expand All @@ -36,7 +42,19 @@ impl ConsensusConstants {
committee_size: 7,
max_base_layer_blocks_ahead: 5,
max_base_layer_blocks_behind: 5,
pacemaker_max_base_time: std::time::Duration::from_secs(10),
num_preshards: NumPreshards::SixtyFour,
pacemaker_max_base_time: Duration::from_secs(10),
}
}
}

impl From<Network> for ConsensusConstants {
fn from(network: Network) -> Self {
match network {
Network::MainNet => unimplemented!("Mainnet consensus constants not implemented"),
Network::StageNet | Network::NextNet | Network::LocalNet | Network::Igor | Network::Esmeralda => {
Self::devnet()
},
}
}
}
1 change: 1 addition & 0 deletions applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub async fn spawn_services(
let validator_node_client_factory = TariValidatorNodeRpcClientFactory::new(networking.clone());
let (epoch_manager, _) = tari_epoch_manager::base_layer::spawn_service(
EpochManagerConfig {
num_preshards: consensus_constants.num_preshards,
base_layer_confirmations: consensus_constants.base_layer_confirmations,
committee_size: consensus_constants
.committee_size
Expand Down
63 changes: 35 additions & 28 deletions applications/tari_indexer/src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use log::*;
use tari_bor::decode;
use tari_common::configuration::Network;
use tari_crypto::tari_utilities::message_format::MessageFormat;
use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, PeerAddress};
use tari_dan_app_utilities::consensus_constants::ConsensusConstants;
use tari_dan_common_types::{committee::Committee, Epoch, NumPreshards, PeerAddress, ShardGroup};
use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest};
use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRecord};
use tari_engine_types::{
Expand Down Expand Up @@ -128,15 +129,15 @@ impl EventScanner {

let current_epoch = self.epoch_manager.current_epoch().await?;
let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
for (shard, mut committee) in current_committees {
for (shard_group, mut committee) in current_committees {
info!(
target: LOG_TARGET,
"Scanning committee epoch={}, shard={}",
"Scanning committee epoch={}, sg={}",
current_epoch,
shard
shard_group
);
let new_blocks = self
.get_new_blocks_from_committee(shard, &mut committee, current_epoch)
.get_new_blocks_from_committee(shard_group, &mut committee, current_epoch)
.await?;
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -409,24 +410,27 @@ impl EventScanner {
.collect()
}

fn build_genesis_block_id(&self) -> BlockId {
let start_block = Block::zero_block(self.network);
fn build_genesis_block_id(&self, num_preshards: NumPreshards) -> BlockId {
// TODO: this should return the actual genesis for the shard group and epoch
let start_block = Block::zero_block(self.network, num_preshards);
*start_block.id()
}

#[allow(unused_assignments)]
async fn get_new_blocks_from_committee(
&self,
shard: Shard,
shard_group: ShardGroup,
committee: &mut Committee<PeerAddress>,
epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
// We start scanning from the last scanned block for this commitee
let start_block_id = {
let mut tx = self.substate_store.create_read_tx()?;
tx.get_last_scanned_block_id(epoch, shard)?
};
let start_block_id = start_block_id.unwrap_or(self.build_genesis_block_id());
let start_block_id = self
.substate_store
.with_read_tx(|tx| tx.get_last_scanned_block_id(epoch, shard_group))?;
let start_block_id = start_block_id.unwrap_or_else(|| {
let consensus_constants = ConsensusConstants::from(self.network);
self.build_genesis_block_id(consensus_constants.num_preshards)
});

committee.shuffle();
let mut last_block_id = start_block_id;
Expand All @@ -436,16 +440,16 @@ impl EventScanner {
"Scanning new blocks since {} from (epoch={}, shard={})",
last_block_id,
epoch,
shard
shard_group
);

for member in committee.members() {
debug!(
target: LOG_TARGET,
"Trying to get blocks from VN {} (epoch={}, shard={})",
"Trying to get blocks from VN {} (epoch={}, shard_group={})",
member,
epoch,
shard
shard_group
);
let resp = self.get_blocks_from_vn(member, start_block_id).await;

Expand All @@ -454,27 +458,27 @@ impl EventScanner {
// TODO: try more than 1 VN per commitee
info!(
target: LOG_TARGET,
"Got {} blocks from VN {} (epoch={}, shard={})",
"Got {} blocks from VN {} (epoch={}, shard_group={})",
blocks.len(),
member,
epoch,
shard,
shard_group,
);
if let Some(block) = blocks.last() {
last_block_id = *block.id();
}
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard, last_block_id)?;
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
return Ok(blocks);
},
Err(e) => {
// We do nothing on a single VN failure, we only log it
warn!(
target: LOG_TARGET,
"Could not get blocks from VN {} (epoch={}, shard={}): {}",
"Could not get blocks from VN {} (epoch={}, shard_group={}): {}",
member,
epoch,
shard,
shard_group,
e
);
},
Expand All @@ -484,22 +488,25 @@ impl EventScanner {
// We don't raise an error if none of the VNs have blocks, the scanning will retry eventually
warn!(
target: LOG_TARGET,
"Could not get blocks from any of the VNs of the committee (epoch={}, shard={})",
"Could not get blocks from any of the VNs of the committee (epoch={}, shard_group={})",
epoch,
shard
shard_group
);
Ok(vec![])
}

fn save_scanned_block_id(&self, epoch: Epoch, shard: Shard, last_block_id: BlockId) -> Result<(), anyhow::Error> {
fn save_scanned_block_id(
&self,
epoch: Epoch,
shard_group: ShardGroup,
last_block_id: BlockId,
) -> Result<(), anyhow::Error> {
let row = NewScannedBlockId {
epoch: epoch.0 as i64,
shard: i64::from(shard.as_u32()),
shard_group: shard_group.encode_as_u32() as i32,
last_block_id: last_block_id.as_bytes().to_vec(),
};
let mut tx = self.substate_store.create_write_tx()?;
tx.save_scanned_block_id(row)?;
tx.commit()?;
self.substate_store.with_write_tx(|tx| tx.save_scanned_block_id(row))?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ alter table events
add column version integer not null;

alter table events
add column component_address string null;
add column component_address text null;

-- drop previous index
drop index unique_events_indexer;
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
-- Latests scanned blocks, separatedly by committee (epoch + shard)
-- Used mostly for effient scanning of events in the whole network
-- Latest scanned blocks, separately by committee (epoch + shard)
-- Used mostly for efficient scanning of events in the whole network
create table scanned_block_ids
(
id integer not NULL primary key AUTOINCREMENT,
epoch bigint not NULL,
shard bigint not null,
shard_group integer not null,
last_block_id blob not null
);


-- There should only be one last scanned block by committee (epoch + shard)
create unique index scanned_block_ids_unique_commitee on scanned_block_ids (epoch, shard);
create unique index scanned_block_ids_unique_committee on scanned_block_ids (epoch, shard_group);

-- DB index for faster retrieval of the latest block by committee
create index scanned_block_ids_commitee on scanned_block_ids (epoch, shard);
create index scanned_block_ids_committee on scanned_block_ids (epoch, shard_group);
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ impl TryFrom<EventData> for tari_engine_types::events::Event {
pub struct ScannedBlockId {
pub id: i32,
pub epoch: i64,
pub shard: i64,
pub shard_group: i32,
pub last_block_id: Vec<u8>,
}

#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = scanned_block_ids)]
pub struct NewScannedBlockId {
pub epoch: i64,
pub shard: i64,
pub shard_group: i32,
pub last_block_id: Vec<u8>,
}
52 changes: 29 additions & 23 deletions applications/tari_indexer/src/substate_storage_sqlite/schema.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
// @generated automatically by Diesel CLI.

diesel::table! {
non_fungible_indexes (id) {
id -> Integer,
resource_address -> Text,
idx -> Integer,
non_fungible_address -> Text,
}
}

diesel::table! {
substates (id) {
event_payloads (id) {
id -> Integer,
address -> Text,
version -> BigInt,
data -> Text,
tx_hash -> Text,
template_address -> Nullable<Text>,
module_name -> Nullable<Text>,
timestamp -> BigInt,
payload_key -> Text,
payload_value -> Text,
event_id -> Integer,
}
}

Expand All @@ -36,23 +23,42 @@ diesel::table! {
}

diesel::table! {
event_payloads (id) {
non_fungible_indexes (id) {
id -> Integer,
payload_key -> Text,
payload_value -> Text,
event_id -> Integer,
resource_address -> Text,
idx -> Integer,
non_fungible_address -> Text,
}
}

diesel::table! {
scanned_block_ids (id) {
id -> Integer,
epoch -> BigInt,
shard -> BigInt,
shard_group -> Integer,
last_block_id -> Binary,
}
}

diesel::table! {
substates (id) {
id -> Integer,
address -> Text,
version -> BigInt,
data -> Text,
tx_hash -> Text,
template_address -> Nullable<Text>,
module_name -> Nullable<Text>,
timestamp -> BigInt,
}
}

diesel::joinable!(event_payloads -> events (event_id));

diesel::allow_tables_to_appear_in_same_query!(substates, non_fungible_indexes, events, event_payloads);
diesel::allow_tables_to_appear_in_same_query!(
event_payloads,
events,
non_fungible_indexes,
scanned_block_ids,
substates,
);
Loading

0 comments on commit f0696f0

Please sign in to comment.