feat(consensus)!: implement preshards and shard groups #1092

Merged: 2 commits, Aug 5, 2024
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
@@ -44,7 +44,7 @@ use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeAddressable, NodeHeight};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeAddressable, NodeHeight, ShardGroup};
use tari_dan_storage::{
consensus_models::{Block, SubstateRecord},
global::{GlobalDb, MetadataKey},
@@ -459,7 +459,11 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
});
self.state_store
.with_write_tx(|tx| {
let genesis = Block::genesis(self.network, Epoch(0), Shard::zero());
let genesis = Block::genesis(
self.network,
Epoch(0),
ShardGroup::all_shards(self.consensus_constants.num_preshards),
);

// TODO: This should be proposed in a block...
SubstateRecord {
20 changes: 19 additions & 1 deletion applications/tari_dan_app_utilities/src/consensus_constants.rs
@@ -20,12 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use tari_common::configuration::Network;
use tari_dan_common_types::NumPreshards;

#[derive(Clone, Debug)]
pub struct ConsensusConstants {
pub base_layer_confirmations: u64,
pub committee_size: u32,
pub max_base_layer_blocks_ahead: u64,
pub max_base_layer_blocks_behind: u64,
pub num_preshards: NumPreshards,
pub pacemaker_max_base_time: std::time::Duration,
}

@@ -36,7 +42,19 @@ impl ConsensusConstants {
committee_size: 7,
max_base_layer_blocks_ahead: 5,
max_base_layer_blocks_behind: 5,
pacemaker_max_base_time: std::time::Duration::from_secs(10),
num_preshards: NumPreshards::SixtyFour,
pacemaker_max_base_time: Duration::from_secs(10),
}
}
}

impl From<Network> for ConsensusConstants {
fn from(network: Network) -> Self {
match network {
Network::MainNet => unimplemented!("Mainnet consensus constants not implemented"),
Network::StageNet | Network::NextNet | Network::LocalNet | Network::Igor | Network::Esmeralda => {
Self::devnet()
},
}
}
}
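
A minimal usage sketch of the From<Network> conversion added above, assuming only items visible in this diff; the helper function itself is hypothetical:

use std::time::Duration;

use tari_common::configuration::Network;
use tari_dan_app_utilities::consensus_constants::ConsensusConstants;

// Hypothetical helper: resolve the pacemaker deadline for a network via the
// From<Network> impl above. Every non-mainnet network currently maps to the
// devnet constants (64 preshards, 10s pacemaker base time).
fn pacemaker_deadline(network: Network) -> Duration {
    ConsensusConstants::from(network).pacemaker_max_base_time
}

fn main() {
    assert_eq!(pacemaker_deadline(Network::LocalNet), Duration::from_secs(10));
}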
1 change: 1 addition & 0 deletions applications/tari_indexer/src/bootstrap.rs
@@ -117,6 +117,7 @@ pub async fn spawn_services(
let validator_node_client_factory = TariValidatorNodeRpcClientFactory::new(networking.clone());
let (epoch_manager, _) = tari_epoch_manager::base_layer::spawn_service(
EpochManagerConfig {
num_preshards: consensus_constants.num_preshards,
base_layer_confirmations: consensus_constants.base_layer_confirmations,
committee_size: consensus_constants
.committee_size
63 changes: 35 additions & 28 deletions applications/tari_indexer/src/event_scanner.rs
@@ -27,7 +27,8 @@ use log::*;
use tari_bor::decode;
use tari_common::configuration::Network;
use tari_crypto::tari_utilities::message_format::MessageFormat;
use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, PeerAddress};
use tari_dan_app_utilities::consensus_constants::ConsensusConstants;
use tari_dan_common_types::{committee::Committee, Epoch, NumPreshards, PeerAddress, ShardGroup};
use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest};
use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRecord};
use tari_engine_types::{
@@ -128,15 +129,15 @@ impl EventScanner {

let current_epoch = self.epoch_manager.current_epoch().await?;
let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
for (shard, mut committee) in current_committees {
for (shard_group, mut committee) in current_committees {
info!(
target: LOG_TARGET,
"Scanning committee epoch={}, shard={}",
"Scanning committee epoch={}, sg={}",
current_epoch,
shard
shard_group
);
let new_blocks = self
.get_new_blocks_from_committee(shard, &mut committee, current_epoch)
.get_new_blocks_from_committee(shard_group, &mut committee, current_epoch)
.await?;
info!(
target: LOG_TARGET,
@@ -409,24 +410,27 @@
.collect()
}

fn build_genesis_block_id(&self) -> BlockId {
let start_block = Block::zero_block(self.network);
fn build_genesis_block_id(&self, num_preshards: NumPreshards) -> BlockId {
// TODO: this should return the actual genesis for the shard group and epoch
let start_block = Block::zero_block(self.network, num_preshards);
*start_block.id()
}

#[allow(unused_assignments)]
async fn get_new_blocks_from_committee(
&self,
shard: Shard,
shard_group: ShardGroup,
committee: &mut Committee<PeerAddress>,
epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
// We start scanning from the last scanned block for this committee
let start_block_id = {
let mut tx = self.substate_store.create_read_tx()?;
tx.get_last_scanned_block_id(epoch, shard)?
};
let start_block_id = start_block_id.unwrap_or(self.build_genesis_block_id());
let start_block_id = self
.substate_store
.with_read_tx(|tx| tx.get_last_scanned_block_id(epoch, shard_group))?;
let start_block_id = start_block_id.unwrap_or_else(|| {
let consensus_constants = ConsensusConstants::from(self.network);
self.build_genesis_block_id(consensus_constants.num_preshards)
});

committee.shuffle();
let mut last_block_id = start_block_id;
@@ -436,16 +440,16 @@
"Scanning new blocks since {} from (epoch={}, shard={})",
last_block_id,
epoch,
shard
shard_group
);

for member in committee.members() {
debug!(
target: LOG_TARGET,
"Trying to get blocks from VN {} (epoch={}, shard={})",
"Trying to get blocks from VN {} (epoch={}, shard_group={})",
member,
epoch,
shard
shard_group
);
let resp = self.get_blocks_from_vn(member, start_block_id).await;

@@ -454,27 +458,27 @@
// TODO: try more than 1 VN per committee
info!(
target: LOG_TARGET,
"Got {} blocks from VN {} (epoch={}, shard={})",
"Got {} blocks from VN {} (epoch={}, shard_group={})",
blocks.len(),
member,
epoch,
shard,
shard_group,
);
if let Some(block) = blocks.last() {
last_block_id = *block.id();
}
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard, last_block_id)?;
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
return Ok(blocks);
},
Err(e) => {
// We do nothing on a single VN failure, we only log it
warn!(
target: LOG_TARGET,
"Could not get blocks from VN {} (epoch={}, shard={}): {}",
"Could not get blocks from VN {} (epoch={}, shard_group={}): {}",
member,
epoch,
shard,
shard_group,
e
);
},
@@ -484,22 +488,25 @@
// We don't raise an error if none of the VNs have blocks, the scanning will retry eventually
warn!(
target: LOG_TARGET,
"Could not get blocks from any of the VNs of the committee (epoch={}, shard={})",
"Could not get blocks from any of the VNs of the committee (epoch={}, shard_group={})",
epoch,
shard
shard_group
);
Ok(vec![])
}

fn save_scanned_block_id(&self, epoch: Epoch, shard: Shard, last_block_id: BlockId) -> Result<(), anyhow::Error> {
fn save_scanned_block_id(
&self,
epoch: Epoch,
shard_group: ShardGroup,
last_block_id: BlockId,
) -> Result<(), anyhow::Error> {
let row = NewScannedBlockId {
epoch: epoch.0 as i64,
shard: i64::from(shard.as_u32()),
shard_group: shard_group.encode_as_u32() as i32,
last_block_id: last_block_id.as_bytes().to_vec(),
};
let mut tx = self.substate_store.create_write_tx()?;
tx.save_scanned_block_id(row)?;
tx.commit()?;
self.substate_store.with_write_tx(|tx| tx.save_scanned_block_id(row))?;
Ok(())
}

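The indexer now keys its scan checkpoints by ShardGroup rather than a single Shard, persisting shard_group.encode_as_u32() in an integer column. A self-contained sketch of that idea, assuming a shard group is a contiguous preshard range packed into 16-bit halves; the actual layout and encoding live in tari_dan_common_types and may differ:

// Illustrative stand-in for tari_dan_common_types::ShardGroup: a contiguous
// range of preshards. The real type may be implemented differently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardGroup {
    start: u16, // first preshard in the group (inclusive)
    end: u16,   // last preshard in the group (inclusive)
}

impl ShardGroup {
    /// Mirrors ShardGroup::all_shards(num_preshards) as used for genesis blocks.
    fn all_shards(num_preshards: u16) -> Self {
        Self { start: 0, end: num_preshards - 1 }
    }

    /// Analogue of encode_as_u32: pack both bounds into one u32 so the group
    /// fits a single SQLite integer column.
    fn encode_as_u32(self) -> u32 {
        (u32::from(self.start) << 16) | u32::from(self.end)
    }

    fn decode_from_u32(v: u32) -> Self {
        Self {
            start: (v >> 16) as u16,
            end: (v & 0xFFFF) as u16,
        }
    }
}

fn main() {
    let sg = ShardGroup::all_shards(64);
    let encoded = sg.encode_as_u32();
    assert_eq!(ShardGroup::decode_from_u32(encoded), sg);
    println!("all-shards group encodes as {encoded:#010x}"); // 0x0000003f
}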
@@ -2,7 +2,7 @@ alter table events
add column version integer not null;

alter table events
add column component_address string null;
add column component_address text null;

-- drop previous index
drop index unique_events_indexer;
@@ -1,16 +1,16 @@
-- Latests scanned blocks, separatedly by committee (epoch + shard)
-- Used mostly for effient scanning of events in the whole network
-- Latest scanned blocks, separately by committee (epoch + shard)
-- Used mostly for efficient scanning of events in the whole network
create table scanned_block_ids
(
id integer not NULL primary key AUTOINCREMENT,
epoch bigint not NULL,
shard bigint not null,
shard_group integer not null,
last_block_id blob not null
);


-- There should only be one last scanned block by committee (epoch + shard)
create unique index scanned_block_ids_unique_commitee on scanned_block_ids (epoch, shard);
create unique index scanned_block_ids_unique_committee on scanned_block_ids (epoch, shard_group);

-- DB index for faster retrieval of the latest block by committee
create index scanned_block_ids_commitee on scanned_block_ids (epoch, shard);
create index scanned_block_ids_committee on scanned_block_ids (epoch, shard_group);
@@ -133,14 +133,14 @@ impl TryFrom<EventData> for tari_engine_types::events::Event {
pub struct ScannedBlockId {
pub id: i32,
pub epoch: i64,
pub shard: i64,
pub shard_group: i32,
pub last_block_id: Vec<u8>,
}

#[derive(Debug, Clone, Insertable, AsChangeset)]
#[diesel(table_name = scanned_block_ids)]
pub struct NewScannedBlockId {
pub epoch: i64,
pub shard: i64,
pub shard_group: i32,
pub last_block_id: Vec<u8>,
}
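
Because the migration puts a unique index on (epoch, shard_group), repeated scans of the same committee need upsert rather than plain-insert semantics. A hedged sketch of how that could look with diesel, using the models above; the module paths and the on-conflict behaviour are assumptions for illustration, not necessarily how save_scanned_block_id is implemented:

use diesel::prelude::*;

// Assumed paths for the sketch; adjust to the indexer's actual module layout.
use crate::substate_storage_sqlite::{
    models::events::NewScannedBlockId,
    schema::scanned_block_ids,
};

/// Hypothetical helper: insert the latest scanned block for a committee, or
/// replace the existing row when (epoch, shard_group) already exists. The
/// AsChangeset derive on NewScannedBlockId makes the row reusable in set().
fn upsert_scanned_block(conn: &mut SqliteConnection, row: &NewScannedBlockId) -> QueryResult<usize> {
    diesel::insert_into(scanned_block_ids::table)
        .values(row)
        .on_conflict((scanned_block_ids::epoch, scanned_block_ids::shard_group))
        .do_update()
        .set(row)
        .execute(conn)
}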
52 changes: 29 additions & 23 deletions applications/tari_indexer/src/substate_storage_sqlite/schema.rs
@@ -1,24 +1,11 @@
// @generated automatically by Diesel CLI.

diesel::table! {
non_fungible_indexes (id) {
id -> Integer,
resource_address -> Text,
idx -> Integer,
non_fungible_address -> Text,
}
}

diesel::table! {
substates (id) {
event_payloads (id) {
id -> Integer,
address -> Text,
version -> BigInt,
data -> Text,
tx_hash -> Text,
template_address -> Nullable<Text>,
module_name -> Nullable<Text>,
timestamp -> BigInt,
payload_key -> Text,
payload_value -> Text,
event_id -> Integer,
}
}

@@ -36,23 +23,42 @@ diesel::table! {
}

diesel::table! {
event_payloads (id) {
non_fungible_indexes (id) {
id -> Integer,
payload_key -> Text,
payload_value -> Text,
event_id -> Integer,
resource_address -> Text,
idx -> Integer,
non_fungible_address -> Text,
}
}

diesel::table! {
scanned_block_ids (id) {
id -> Integer,
epoch -> BigInt,
shard -> BigInt,
shard_group -> Integer,
last_block_id -> Binary,
}
}

diesel::table! {
substates (id) {
id -> Integer,
address -> Text,
version -> BigInt,
data -> Text,
tx_hash -> Text,
template_address -> Nullable<Text>,
module_name -> Nullable<Text>,
timestamp -> BigInt,
}
}

diesel::joinable!(event_payloads -> events (event_id));

diesel::allow_tables_to_appear_in_same_query!(substates, non_fungible_indexes, events, event_payloads);
diesel::allow_tables_to_appear_in_same_query!(
event_payloads,
events,
non_fungible_indexes,
scanned_block_ids,
substates,
);