Skip to content

Commit

Permalink
feat(mempool): gossip reference to tx and request if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 8, 2024
1 parent e96337b commit 9a111a1
Show file tree
Hide file tree
Showing 25 changed files with 1,219 additions and 495 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion applications/minotari_miner/src/run_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ async fn connect_base_node(config: &MinerConfig) -> Result<BaseNodeGrpcClient, M
let node_conn = BaseNodeClient::with_interceptor(
channel,
ClientAuthenticationInterceptor::create(&config.base_node_grpc_authentication)?,
);
)
.max_decoding_message_size(10 * 1024 * 1024);

Ok(node_conn)
}
Expand Down
12 changes: 10 additions & 2 deletions applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ where B: BlockchainBackend + 'static

let dispatcher = Dispatcher::new();
let user_agent = format!("tari/basenode/{}", consts::APP_VERSION_NUMBER);
let (mempool_want_list_tx, mempool_want_list_rx) = mpsc::unbounded_channel();
let handles = StackBuilder::new(self.interrupt_signal)
.add_initializer(P2pInitializer::new(
p2p_config.clone(),
Expand All @@ -124,8 +125,15 @@ where B: BlockchainBackend + 'static
base_node_config.state_machine.clone(),
dispatcher.clone(),
))
.add_initializer(MempoolServiceInitializer::new(self.mempool.clone()))
.add_initializer(MempoolSyncInitializer::new(mempool_config, self.mempool.clone()))
.add_initializer(MempoolServiceInitializer::new(
self.mempool.clone(),
mempool_want_list_tx,
))
.add_initializer(MempoolSyncInitializer::new(
mempool_config,
self.mempool.clone(),
mempool_want_list_rx,
))
.add_initializer(LivenessInitializer::new(
LivenessConfig {
auto_ping_interval: Some(base_node_config.metadata_auto_ping_interval),
Expand Down
13 changes: 10 additions & 3 deletions applications/minotari_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ use tari_common_types::grpc_authentication::GrpcAuthentication;
use tari_network::{identity, multiaddr::Multiaddr};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::task;
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic::{
codegen::InterceptedService,
transport::{Identity, Server, ServerTlsConfig},
};

use crate::cli::Cli;
pub use crate::config::{ApplicationConfig, BaseNodeConfig, DatabaseType};
Expand Down Expand Up @@ -195,15 +198,19 @@ async fn run_grpc(
let grpc_address = multiaddr_to_socketaddr(&grpc_address)?;
let auth = ServerAuthenticationInterceptor::new(auth_config)
.ok_or(anyhow::anyhow!("Unable to prepare server gRPC authentication"))?;
let service = minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer::with_interceptor(grpc, auth);
let service = minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer::new(grpc)
.max_encoding_message_size(10 * 1024 * 1024)
.max_decoding_message_size(10 * 1024 * 1024);
let service = InterceptedService::new(service, auth);

let mut server_builder = if let Some(identity) = tls_identity {
let server_builder = if let Some(identity) = tls_identity {
Server::builder().tls_config(ServerTlsConfig::new().identity(identity))?
} else {
Server::builder()
};

server_builder
.max_frame_size(Some(10 * 1024 * 1024))
.add_service(service)
.serve_with_shutdown(grpc_address, interrupt_signal.map(|_| ()))
.await
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ derivative = "2.2.0"
digest = "0.10"
fs2 = "0.4.0"
futures = { version = "^0.3.16", features = ["async-await"] }
futures-bounded = { workspace = true }
hex = "0.4.2"
integer-encoding = "3.0"
lmdb-zero = "0.4.4"
Expand Down
15 changes: 13 additions & 2 deletions base_layer/core/src/mempool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use serde::{Deserialize, Serialize};
use tari_common::SubConfigPath;

Expand All @@ -46,12 +48,18 @@ impl SubConfigPath for MempoolConfig {
#[serde(deny_unknown_fields)]
pub struct MempoolServiceConfig {
/// Number of peers from which to initiate a sync. Once this many peers have successfully synced, this node will
/// not initiate any more mempool syncs. Default: 2
/// not initiate any more mempool syncs.
pub initial_sync_num_peers: usize,
/// The maximum number of transactions to sync in a single sync session Default: 10_000
/// The maximum number of transactions to sync in a single sync session
pub initial_sync_max_transactions: usize,
/// The maximum number of blocks added via sync or re-org to triggering a sync
pub block_sync_trigger: usize,
/// The maximum number of transactions a peer can specifically request.
pub max_request_transactions: usize,
/// The length of time between checking and, if required, requesting the transactions from the want list.
pub request_want_list_interval: Duration,
/// Maximum concurrent inbound handler tasks.
pub max_concurrent_inbound_tasks: usize,
}

impl Default for MempoolServiceConfig {
Expand All @@ -60,6 +68,9 @@ impl Default for MempoolServiceConfig {
initial_sync_num_peers: 2,
initial_sync_max_transactions: 10_000,
block_sync_trigger: 5,
max_request_transactions: 10_000,
request_want_list_interval: Duration::from_secs(5),
max_concurrent_inbound_tasks: 50,
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
use std::sync::{Arc, RwLock};

use log::debug;
use tari_common_types::types::{FixedHash, PrivateKey, Signature};
use tari_common_types::types::{FixedHash, PrivateKey};
use tari_crypto::ristretto::RistrettoSecretKey;
use tokio::task;

use crate::{
Expand Down Expand Up @@ -158,7 +159,10 @@ impl Mempool {
}

/// Check if the specified excess signature is found in the Mempool.
pub async fn has_tx_with_excess_sig(&self, excess_sig: Signature) -> Result<TxStorageResponse, MempoolError> {
pub async fn has_tx_with_excess_sig(
&self,
excess_sig: RistrettoSecretKey,
) -> Result<TxStorageResponse, MempoolError> {
self.with_read_access(move |storage| Ok(storage.has_tx_with_excess_sig(&excess_sig)))
.await
}
Expand Down
9 changes: 5 additions & 4 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
use std::{sync::Arc, time::Instant};

use log::*;
use tari_common_types::types::{FixedHash, PrivateKey, Signature};
use tari_common_types::types::{FixedHash, PrivateKey};
use tari_crypto::ristretto::RistrettoSecretKey;
use tari_utilities::hex::Hex;

use crate::{
Expand Down Expand Up @@ -328,7 +329,7 @@ impl MempoolStorage {
}

/// Check if the specified excess signature is found in the Mempool.
pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> TxStorageResponse {
pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> TxStorageResponse {
if self.unconfirmed_pool.has_tx_with_excess_sig(excess_sig) {
TxStorageResponse::UnconfirmedPool
} else if self.reorg_pool.has_tx_with_excess_sig(excess_sig) {
Expand All @@ -345,10 +346,10 @@ impl MempoolStorage {
.iter()
.fold(None, |stored, kernel| {
if stored.is_none() {
return Some(self.has_tx_with_excess_sig(&kernel.excess_sig));
return Some(self.has_tx_with_excess_sig(kernel.excess_sig.get_signature()));
}
let stored = stored.unwrap();
match (self.has_tx_with_excess_sig(&kernel.excess_sig), stored) {
match (self.has_tx_with_excess_sig(kernel.excess_sig.get_signature()), stored) {
// All (so far) in unconfirmed pool
(TxStorageResponse::UnconfirmedPool, TxStorageResponse::UnconfirmedPool) => {
Some(TxStorageResponse::UnconfirmedPool)
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub use service::{MempoolServiceError, MempoolServiceInitializer};

#[cfg(feature = "base_node")]
mod sync_protocol;
mod transaction_id;

use core::fmt::{Display, Error, Formatter};
use std::sync::Arc;

Expand Down Expand Up @@ -119,6 +121,11 @@ impl TxStorageResponse {
pub fn is_stored(&self) -> bool {
matches!(self, Self::UnconfirmedPool | Self::ReorgPool)
}

/// Returns true if the transaction has not been seen before, otherwise false.
pub fn is_new(&self) -> bool {
matches!(self, Self::NotStored)
}
}

impl Display for TxStorageResponse {
Expand Down
61 changes: 31 additions & 30 deletions base_layer/core/src/mempool/reorg_pool/reorg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use std::{

use log::*;
use serde::{Deserialize, Serialize};
use tari_common_types::types::{PrivateKey, Signature};
use tari_common_types::types::PrivateKey;
use tari_crypto::ristretto::RistrettoSecretKey;
use tari_utilities::hex::Hex;

use crate::{
Expand Down Expand Up @@ -168,8 +169,8 @@ impl ReorgPool {
}

/// Check if a transaction is stored in the ReorgPool
pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> bool {
self.txs_by_signature.contains_key(excess_sig.get_signature())
pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> bool {
self.txs_by_signature.contains_key(excess_sig)
}

/// Remove the transactions from the ReorgPool that were used in provided removed blocks. The transactions
Expand Down Expand Up @@ -385,26 +386,26 @@ mod test {
reorg_pool.insert(1, tx1.clone());
reorg_pool.insert(2, tx2.clone());

assert!(reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));

reorg_pool.insert(3, tx3.clone());
reorg_pool.insert(4, tx4.clone());
// Check that oldest utx was removed to make room for new incoming transactions
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));

reorg_pool.insert(5, tx5.clone());
reorg_pool.insert(6, tx6.clone());
assert_eq!(reorg_pool.len(), 2);
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()));
}

#[tokio::test]
Expand Down Expand Up @@ -432,9 +433,9 @@ mod test {
reorg_pool.insert(1, tx3.clone());

let txs = reorg_pool.clear_and_retrieve_all();
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.txs_by_height.is_empty());
assert!(reorg_pool.tx_by_key.is_empty());
assert!(reorg_pool.txs_by_signature.is_empty());
Expand Down Expand Up @@ -491,12 +492,12 @@ mod test {
]);
// Oldest transaction tx1 is removed to make space for new incoming transactions
assert_eq!(reorg_pool.len(), 6);
assert!(reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()));

let reorg_blocks = &[
create_orphan_block(3000, vec![(*tx3).clone(), (*tx4).clone()], &consensus).into(),
Expand All @@ -511,11 +512,11 @@ mod test {
assert!(removed_txs.contains(&tx4));

assert_eq!(reorg_pool.len(), 2);
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()));
}
}
Loading

0 comments on commit 9a111a1

Please sign in to comment.