Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mempool)!: gossip reference to tx and request if needed #6675

Merged
merged 9 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion applications/minotari_miner/src/run_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ async fn connect_base_node(config: &MinerConfig) -> Result<BaseNodeGrpcClient, M
let node_conn = BaseNodeClient::with_interceptor(
channel,
ClientAuthenticationInterceptor::create(&config.base_node_grpc_authentication)?,
);
)
.max_decoding_message_size(10 * 1024 * 1024);

Ok(node_conn)
}
Expand Down
12 changes: 10 additions & 2 deletions applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ where B: BlockchainBackend + 'static

let dispatcher = Dispatcher::new();
let user_agent = format!("tari/basenode/{}", consts::APP_VERSION_NUMBER);
let (mempool_want_list_tx, mempool_want_list_rx) = mpsc::unbounded_channel();
let handles = StackBuilder::new(self.interrupt_signal)
.add_initializer(P2pInitializer::new(
p2p_config.clone(),
Expand All @@ -124,8 +125,15 @@ where B: BlockchainBackend + 'static
base_node_config.state_machine.clone(),
dispatcher.clone(),
))
.add_initializer(MempoolServiceInitializer::new(self.mempool.clone()))
.add_initializer(MempoolSyncInitializer::new(mempool_config, self.mempool.clone()))
.add_initializer(MempoolServiceInitializer::new(
self.mempool.clone(),
mempool_want_list_tx,
))
.add_initializer(MempoolSyncInitializer::new(
mempool_config,
self.mempool.clone(),
mempool_want_list_rx,
))
.add_initializer(LivenessInitializer::new(
LivenessConfig {
auto_ping_interval: Some(base_node_config.metadata_auto_ping_interval),
Expand Down
13 changes: 10 additions & 3 deletions applications/minotari_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ use tari_common_types::grpc_authentication::GrpcAuthentication;
use tari_network::{identity, multiaddr::Multiaddr};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::task;
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic::{
codegen::InterceptedService,
transport::{Identity, Server, ServerTlsConfig},
};

use crate::cli::Cli;
pub use crate::config::{ApplicationConfig, BaseNodeConfig, DatabaseType};
Expand Down Expand Up @@ -195,15 +198,19 @@ async fn run_grpc(
let grpc_address = multiaddr_to_socketaddr(&grpc_address)?;
let auth = ServerAuthenticationInterceptor::new(auth_config)
.ok_or(anyhow::anyhow!("Unable to prepare server gRPC authentication"))?;
let service = minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer::with_interceptor(grpc, auth);
let service = minotari_app_grpc::tari_rpc::base_node_server::BaseNodeServer::new(grpc)
.max_encoding_message_size(10 * 1024 * 1024)
.max_decoding_message_size(10 * 1024 * 1024);
let service = InterceptedService::new(service, auth);

let mut server_builder = if let Some(identity) = tls_identity {
let server_builder = if let Some(identity) = tls_identity {
Server::builder().tls_config(ServerTlsConfig::new().identity(identity))?
} else {
Server::builder()
};

server_builder
.max_frame_size(Some(10 * 1024 * 1024))
.add_service(service)
.serve_with_shutdown(grpc_address, interrupt_signal.map(|_| ()))
.await
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ derivative = "2.2.0"
digest = "0.10"
fs2 = "0.4.0"
futures = { version = "^0.3.16", features = ["async-await"] }
futures-bounded = { workspace = true }
hex = "0.4.2"
integer-encoding = "3.0"
lmdb-zero = "0.4.4"
Expand Down
15 changes: 13 additions & 2 deletions base_layer/core/src/mempool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use serde::{Deserialize, Serialize};
use tari_common::SubConfigPath;

Expand All @@ -46,12 +48,18 @@ impl SubConfigPath for MempoolConfig {
#[serde(deny_unknown_fields)]
pub struct MempoolServiceConfig {
/// Number of peers from which to initiate a sync. Once this many peers have successfully synced, this node will
/// not initiate any more mempool syncs. Default: 2
/// not initiate any more mempool syncs.
pub initial_sync_num_peers: usize,
/// The maximum number of transactions to sync in a single sync session Default: 10_000
/// The maximum number of transactions to sync in a single sync session
pub initial_sync_max_transactions: usize,
/// The maximum number of blocks added via sync or re-org to triggering a sync
pub block_sync_trigger: usize,
/// The maximum number of transactions a peer can specifically request.
pub max_request_transactions: usize,
/// The length of time between checking and, if required, requesting the transactions from the want list.
pub request_want_list_interval: Duration,
/// Maximum concurrent inbound handler tasks.
pub max_concurrent_inbound_tasks: usize,
hansieodendaal marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for MempoolServiceConfig {
Expand All @@ -60,6 +68,9 @@ impl Default for MempoolServiceConfig {
initial_sync_num_peers: 2,
initial_sync_max_transactions: 10_000,
block_sync_trigger: 5,
max_request_transactions: 10_000,
request_want_list_interval: Duration::from_secs(5),
max_concurrent_inbound_tasks: 50,
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
use std::sync::{Arc, RwLock};

use log::debug;
use tari_common_types::types::{FixedHash, PrivateKey, Signature};
use tari_common_types::types::{FixedHash, PrivateKey};
use tari_crypto::ristretto::RistrettoSecretKey;
use tokio::task;

use crate::{
Expand Down Expand Up @@ -158,7 +159,10 @@ impl Mempool {
}

/// Check if the specified excess signature is found in the Mempool.
pub async fn has_tx_with_excess_sig(&self, excess_sig: Signature) -> Result<TxStorageResponse, MempoolError> {
pub async fn has_tx_with_excess_sig(
&self,
excess_sig: RistrettoSecretKey,
) -> Result<TxStorageResponse, MempoolError> {
self.with_read_access(move |storage| Ok(storage.has_tx_with_excess_sig(&excess_sig)))
.await
}
Expand Down
9 changes: 5 additions & 4 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
use std::{sync::Arc, time::Instant};

use log::*;
use tari_common_types::types::{FixedHash, PrivateKey, Signature};
use tari_common_types::types::{FixedHash, PrivateKey};
use tari_crypto::ristretto::RistrettoSecretKey;
use tari_utilities::hex::Hex;

use crate::{
Expand Down Expand Up @@ -328,7 +329,7 @@ impl MempoolStorage {
}

/// Check if the specified excess signature is found in the Mempool.
pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> TxStorageResponse {
pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> TxStorageResponse {
if self.unconfirmed_pool.has_tx_with_excess_sig(excess_sig) {
TxStorageResponse::UnconfirmedPool
} else if self.reorg_pool.has_tx_with_excess_sig(excess_sig) {
Expand All @@ -345,10 +346,10 @@ impl MempoolStorage {
.iter()
.fold(None, |stored, kernel| {
if stored.is_none() {
return Some(self.has_tx_with_excess_sig(&kernel.excess_sig));
return Some(self.has_tx_with_excess_sig(kernel.excess_sig.get_signature()));
}
let stored = stored.unwrap();
match (self.has_tx_with_excess_sig(&kernel.excess_sig), stored) {
match (self.has_tx_with_excess_sig(kernel.excess_sig.get_signature()), stored) {
// All (so far) in unconfirmed pool
(TxStorageResponse::UnconfirmedPool, TxStorageResponse::UnconfirmedPool) => {
Some(TxStorageResponse::UnconfirmedPool)
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub use service::{MempoolServiceError, MempoolServiceInitializer};

#[cfg(feature = "base_node")]
mod sync_protocol;
mod transaction_id;

use core::fmt::{Display, Error, Formatter};
use std::sync::Arc;

Expand Down Expand Up @@ -119,6 +121,11 @@ impl TxStorageResponse {
pub fn is_stored(&self) -> bool {
matches!(self, Self::UnconfirmedPool | Self::ReorgPool)
}

/// Returns true if the transaction has not been seen before, otherwise false.
pub fn is_new(&self) -> bool {
matches!(self, Self::NotStored)
}
}

impl Display for TxStorageResponse {
Expand Down
61 changes: 31 additions & 30 deletions base_layer/core/src/mempool/reorg_pool/reorg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use std::{

use log::*;
use serde::{Deserialize, Serialize};
use tari_common_types::types::{PrivateKey, Signature};
use tari_common_types::types::PrivateKey;
use tari_crypto::ristretto::RistrettoSecretKey;
use tari_utilities::hex::Hex;

use crate::{
Expand Down Expand Up @@ -168,8 +169,8 @@ impl ReorgPool {
}

/// Check if a transaction is stored in the ReorgPool
pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> bool {
self.txs_by_signature.contains_key(excess_sig.get_signature())
pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> bool {
self.txs_by_signature.contains_key(excess_sig)
}

/// Remove the transactions from the ReorgPool that were used in provided removed blocks. The transactions
Expand Down Expand Up @@ -385,26 +386,26 @@ mod test {
reorg_pool.insert(1, tx1.clone());
reorg_pool.insert(2, tx2.clone());

assert!(reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));

reorg_pool.insert(3, tx3.clone());
reorg_pool.insert(4, tx4.clone());
// Check that oldest utx was removed to make room for new incoming transactions
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));

reorg_pool.insert(5, tx5.clone());
reorg_pool.insert(6, tx6.clone());
assert_eq!(reorg_pool.len(), 2);
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()));
}

#[tokio::test]
Expand Down Expand Up @@ -432,9 +433,9 @@ mod test {
reorg_pool.insert(1, tx3.clone());

let txs = reorg_pool.clear_and_retrieve_all();
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.txs_by_height.is_empty());
assert!(reorg_pool.tx_by_key.is_empty());
assert!(reorg_pool.txs_by_signature.is_empty());
Expand Down Expand Up @@ -491,12 +492,12 @@ mod test {
]);
// Oldest transaction tx1 is removed to make space for new incoming transactions
assert_eq!(reorg_pool.len(), 6);
assert!(reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()));

let reorg_blocks = &[
create_orphan_block(3000, vec![(*tx3).clone(), (*tx4).clone()], &consensus).into(),
Expand All @@ -511,11 +512,11 @@ mod test {
assert!(removed_txs.contains(&tx4));

assert_eq!(reorg_pool.len(), 2);
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig));
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()));
assert!(!reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()));
assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()));
}
}
Loading
Loading