Skip to content

Commit

Permalink
fix(core)!: remove unused get_committees call from base node (#4880)
Browse files Browse the repository at this point in the history
Description
---
- Remove unused `get_committee` function from base layer
- add test to KeyPrefixCursor to confirm it works as expected

Motivation and Context
---
`get_committee` would panic because it made simultaneous accesses to the same transaction by creating multiple KeyPrefixCursors. Since it is unused and implemented on the DAN layer, the decision was taken to remove it.

How Has This Been Tested?
---
Added new test to KeyPrefixCursor

BREAKING CHANGE: grpc interface no longer has get_committee call
  • Loading branch information
sdbondi authored Nov 7, 2022
1 parent bd49bf2 commit 392d541
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 142 deletions.
11 changes: 0 additions & 11 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ service BaseNode {
rpc GetMempoolStats(Empty) returns (MempoolStatsResponse);
// Get VNs
rpc GetActiveValidatorNodes(GetActiveValidatorNodesRequest) returns (stream GetActiveValidatorNodesResponse);
rpc GetCommittee(GetCommitteeRequest) returns (GetCommitteeResponse);
rpc GetShardKey(GetShardKeyRequest) returns (GetShardKeyResponse);
// Get templates
rpc GetTemplateRegistrations(GetTemplateRegistrationsRequest) returns (stream GetTemplateRegistrationResponse);
Expand Down Expand Up @@ -451,16 +450,6 @@ message GetActiveValidatorNodesResponse {
bytes public_key = 2;
}


message GetCommitteeRequest {
uint64 height = 1;
bytes shard_key = 2;
}

message GetCommitteeResponse {
repeated bytes public_key = 1;
}

message GetShardKeyRequest {
uint64 height = 1;
bytes public_key = 2;
Expand Down
21 changes: 0 additions & 21 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,27 +1437,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
Ok(Response::new(response))
}

async fn get_committee(
&self,
request: Request<tari_rpc::GetCommitteeRequest>,
) -> Result<Response<tari_rpc::GetCommitteeResponse>, Status> {
let request = request.into_inner();
let report_error_flag = self.report_error_flag();
debug!(target: LOG_TARGET, "Incoming GRPC request for GetCommittee");
let mut handler = self.node_service.clone();
let response = handler
.get_committee(request.height, request.shard_key.try_into().unwrap())
.await
.map_err(|e| {
error!(target: LOG_TARGET, "Error {}", e);
obscure_error_if_true(report_error_flag, Status::internal(e.to_string()))
})?
.iter()
.map(|a| a.shard_key.to_vec())
.collect();
Ok(Response::new(tari_rpc::GetCommitteeResponse { public_key: response }))
}

async fn get_shard_key(
&self,
request: Request<tari_rpc::GetShardKeyRequest>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ pub enum NodeCommsRequest {
FetchValidatorNodesKeys {
height: u64,
},
FetchCommittee {
height: u64,
shard: [u8; 32],
},
GetShardKey {
height: u64,
public_key: PublicKey,
Expand Down Expand Up @@ -125,9 +121,6 @@ impl Display for NodeCommsRequest {
FetchValidatorNodesKeys { height } => {
write!(f, "FetchValidatorNodesKeys ({})", height)
},
FetchCommittee { height, shard } => {
write!(f, "FetchCommittee height ({}), shard({:?})", height, shard)
},
GetShardKey { height, public_key } => {
write!(f, "GetShardKey height ({}), public key ({:?})", height, public_key)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_common_types::{

use crate::{
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::{ActiveValidatorNode, TemplateRegistrationEntry},
chain_storage::TemplateRegistrationEntry,
proof_of_work::Difficulty,
transactions::transaction_components::{Transaction, TransactionKernel, TransactionOutput},
};
Expand All @@ -57,7 +57,6 @@ pub enum NodeCommsResponse {
MmrNodes(Vec<HashOutput>, Vec<u8>),
FetchMempoolTransactionsByExcessSigsResponse(FetchMempoolTransactionsResponse),
FetchValidatorNodesKeysResponse(Vec<(PublicKey, [u8; 32])>),
FetchCommitteeResponse(Vec<ActiveValidatorNode>),
GetShardKeyResponse(Option<[u8; 32]>),
FetchTemplateRegistrationsResponse(Vec<TemplateRegistrationEntry>),
}
Expand Down Expand Up @@ -94,7 +93,6 @@ impl Display for NodeCommsResponse {
resp.not_found.len()
),
FetchValidatorNodesKeysResponse(_) => write!(f, "FetchValidatorNodesKeysResponse"),
FetchCommitteeResponse(_) => write!(f, "FetchCommitteeResponse"),
GetShardKeyResponse(_) => write!(f, "GetShardKeyResponse"),
FetchTemplateRegistrationsResponse(_) => write!(f, "FetchTemplateRegistrationsResponse"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,6 @@ where B: BlockchainBackend + 'static
active_validator_nodes,
))
},
NodeCommsRequest::FetchCommittee { height, shard } => {
let validator_nodes = self.blockchain_db.fetch_committee(height, shard).await?;
Ok(NodeCommsResponse::FetchCommitteeResponse(validator_nodes))
},
NodeCommsRequest::GetShardKey { height, public_key } => {
let shard_key = self.blockchain_db.get_shard_key(height, public_key).await?;
Ok(NodeCommsResponse::GetShardKeyResponse(shard_key))
Expand Down
17 changes: 1 addition & 16 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
NodeCommsResponse,
},
blocks::{Block, ChainHeader, HistoricalBlock, NewBlockTemplate},
chain_storage::{ActiveValidatorNode, TemplateRegistrationEntry},
chain_storage::TemplateRegistrationEntry,
proof_of_work::PowAlgorithm,
transactions::transaction_components::{TransactionKernel, TransactionOutput},
};
Expand Down Expand Up @@ -295,21 +295,6 @@ impl LocalNodeCommsInterface {
}
}

pub async fn get_committee(
&mut self,
height: u64,
shard: [u8; 32],
) -> Result<Vec<ActiveValidatorNode>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchCommittee { height, shard })
.await??
{
NodeCommsResponse::FetchCommitteeResponse(validator_node) => Ok(validator_node),
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}

pub async fn get_shard_key(
&mut self,
height: u64,
Expand Down
4 changes: 1 addition & 3 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_common_types::{
};
use tari_utilities::epoch_time::EpochTime;

use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use super::TemplateRegistrationEntry;
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -269,8 +269,6 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_active_validator_nodes(height: u64) -> Vec<(PublicKey, [u8;32])>, "fetch_active_validator_nodes");

make_async_fn!(fetch_committee(height: u64, shard: [u8;32]) -> Vec<ActiveValidatorNode>, "fetch_committee");

make_async_fn!(get_shard_key(height:u64, public_key: PublicKey) -> Option<[u8;32]>, "get_shard_key");

make_async_fn!(fetch_template_registrations<T: RangeBounds<u64>>(range: T) -> Vec<TemplateRegistrationEntry>, "fetch_template_registrations");
Expand Down
3 changes: 1 addition & 2 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tari_common_types::{
types::{Commitment, HashOutput, PublicKey, Signature},
};

use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use super::TemplateRegistrationEntry;
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -194,7 +194,6 @@ pub trait BlockchainBackend: Send + Sync {
fn fetch_all_reorgs(&self) -> Result<Vec<Reorg>, ChainStorageError>;

fn fetch_active_validator_nodes(&self, height: u64) -> Result<Vec<(PublicKey, [u8; 32])>, ChainStorageError>;
fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError>;
fn get_shard_key(&self, height: u64, public_key: PublicKey) -> Result<Option<[u8; 32]>, ChainStorageError>;
fn fetch_template_registrations(
&self,
Expand Down
7 changes: 1 addition & 6 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tari_common_types::{
use tari_mmr::pruned_hashset::PrunedHashSet;
use tari_utilities::{epoch_time::EpochTime, hex::Hex, ByteArray};

use super::{ActiveValidatorNode, TemplateRegistrationEntry};
use super::TemplateRegistrationEntry;
use crate::{
blocks::{
Block,
Expand Down Expand Up @@ -1188,11 +1188,6 @@ where B: BlockchainBackend
db.fetch_active_validator_nodes(height)
}

pub fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_committee(height, shard)
}

pub fn fetch_template_registrations<T: RangeBounds<u64>>(
&self,
range: T,
Expand Down
72 changes: 71 additions & 1 deletion base_layer/core/src/chain_storage/lmdb_db/key_prefix_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,14 @@ where V: DeserializeOwned
}
}

pub fn seek_gte(&mut self, key: &[u8]) -> Result<Option<(Vec<u8>, V)>, ChainStorageError> {
// This function could be used later in cases where multiple seeks are required.
#[cfg(test)]
pub fn reset_to(&mut self, prefix_key: &'a [u8]) {
self.has_seeked = false;
self.prefix_key = prefix_key;
}

fn seek_gte(&mut self, key: &[u8]) -> Result<Option<(Vec<u8>, V)>, ChainStorageError> {
self.has_seeked = true;
let seek_result = self.cursor.seek_range_k(&self.access, key).to_opt()?;
let (k, v) = match seek_result {
Expand All @@ -105,3 +112,66 @@ where V: DeserializeOwned
Ok(Some((k.to_vec(), val)))
}
}

#[cfg(test)]
mod tests {
use std::fs;

use lmdb_zero::{db, ReadTransaction, WriteTransaction};
use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig};
use tari_test_utils::paths::create_temporary_data_path;

use crate::chain_storage::lmdb_db::lmdb::{lmdb_get_prefix_cursor, lmdb_insert};

#[test]
fn test_lmdb_get_prefix_cursor() {
let temp_path = create_temporary_data_path();

let lmdb_store = LMDBBuilder::new()
.set_path(&temp_path)
.set_env_config(LMDBConfig::default())
.set_max_number_of_databases(1)
.add_database("test", db::CREATE)
.build()
.unwrap();

let db = lmdb_store.get_handle("test").unwrap();
{
let txn = WriteTransaction::new(lmdb_store.env()).unwrap();
lmdb_insert(&txn, &db.db(), &[0xffu8, 0, 0, 0], &1u64, "test").unwrap();
lmdb_insert(&txn, &db.db(), &[0x2bu8, 0, 0, 1], &2u64, "test").unwrap();
lmdb_insert(&txn, &db.db(), &[0x2bu8, 0, 1, 1], &3u64, "test").unwrap();
lmdb_insert(&txn, &db.db(), &[0x2bu8, 1, 1, 0], &4u64, "test").unwrap();
lmdb_insert(&txn, &db.db(), &[0x2bu8, 1, 1, 1], &5u64, "test").unwrap();
lmdb_insert(&txn, &db.db(), &[0x00u8, 1, 1, 1], &5u64, "test").unwrap();
txn.commit().unwrap();
}

{
let txn = ReadTransaction::new(lmdb_store.env()).unwrap();
let db = db.db();
let mut cursor = lmdb_get_prefix_cursor::<u64>(&txn, &db, &[0x2b]).unwrap();
let kv = cursor.next().unwrap().unwrap();
assert_eq!(kv, (vec![0x2b, 0, 0, 1], 2));
let kv = cursor.next().unwrap().unwrap();
assert_eq!(kv, (vec![0x2b, 0, 1, 1], 3));
let kv = cursor.next().unwrap().unwrap();
assert_eq!(kv, (vec![0x2b, 1, 1, 0], 4));
let kv = cursor.next().unwrap().unwrap();
assert_eq!(kv, (vec![0x2b, 1, 1, 1], 5));
assert_eq!(cursor.next().unwrap(), None);

cursor.reset_to(&[0x2b, 1, 1]);
let kv = cursor.next().unwrap().unwrap();
assert_eq!(kv, (vec![0x2b, 1, 1, 0], 4));
let kv = cursor.next().unwrap().unwrap();
assert_eq!(kv, (vec![0x2b, 1, 1, 1], 5));
assert_eq!(cursor.next().unwrap(), None);

cursor.reset_to(&[0x11]);
assert_eq!(cursor.next().unwrap(), None);
}

fs::remove_dir_all(&temp_path).expect("Could not delete temporary file");
}
}
63 changes: 0 additions & 63 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2520,69 +2520,6 @@ impl BlockchainBackend for LMDBDatabase {
.collect())
}

fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError> {
// TODO: I'm not sure how effective this is compared to getting all and selecting by yourself. Also if there is
// less validator nodes than committee size this gets weird.
let txn = self.read_transaction()?;
let mut cursor: KeyPrefixCursor<ActiveValidatorNode> =
lmdb_get_prefix_cursor(&txn, &self.validator_nodes, &shard)?;
let mut result = vec![];
let committee_half_size = 5u64;
let mut size = 0u64;
// Right side of the committee
while let Some((_, val)) = cursor.next()? {
if val.from_height <= height && height <= val.to_height {
result.push(val);
size += 1;
if size == committee_half_size {
break;
}
}
}
// Check if it wraps around
if size < committee_half_size {
let mut cursor: KeyPrefixCursor<ActiveValidatorNode> =
lmdb_get_prefix_cursor(&txn, &self.validator_nodes, &[0; 32])?;
while let Some((_, val)) = cursor.next()? {
if val.from_height <= height && height <= val.to_height {
result.push(val);
size += 1;
if size == committee_half_size {
break;
}
}
}
}
let mut cursor: KeyPrefixCursor<ActiveValidatorNode> =
lmdb_get_prefix_cursor(&txn, &self.validator_nodes, &shard)?;
let mut size = 0u64;
// Left side of the committee
while let Some((_, val)) = cursor.prev()? {
if val.from_height <= height && height <= val.to_height {
result.push(val);
size += 1;
if size == committee_half_size {
break;
}
}
}
// Check if it wraps around
if size < committee_half_size {
let mut cursor: KeyPrefixCursor<ActiveValidatorNode> =
lmdb_get_prefix_cursor(&txn, &self.validator_nodes, &[255; 32])?;
while let Some((_, val)) = cursor.prev()? {
if val.from_height <= height && height <= val.to_height {
result.push(val);
size += 1;
if size == committee_half_size {
break;
}
}
}
}
Ok(result)
}

fn get_shard_key(&self, height: u64, public_key: PublicKey) -> Result<Option<[u8; 32]>, ChainStorageError> {
let txn = self.read_transaction()?;
let mut validator_nodes: Vec<ActiveValidatorNode> =
Expand Down
5 changes: 0 additions & 5 deletions base_layer/core/src/test_helpers/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use crate::{
},
chain_storage::{
create_lmdb_database,
ActiveValidatorNode,
BlockAddResult,
BlockchainBackend,
BlockchainDatabase,
Expand Down Expand Up @@ -419,10 +418,6 @@ impl BlockchainBackend for TempDatabase {
self.db.as_ref().unwrap().fetch_active_validator_nodes(height)
}

fn fetch_committee(&self, height: u64, shard: [u8; 32]) -> Result<Vec<ActiveValidatorNode>, ChainStorageError> {
self.db.as_ref().unwrap().fetch_committee(height, shard)
}

fn get_shard_key(&self, height: u64, public_key: PublicKey) -> Result<Option<[u8; 32]>, ChainStorageError> {
self.db.as_ref().unwrap().get_shard_key(height, public_key)
}
Expand Down

0 comments on commit 392d541

Please sign in to comment.