From 9a111a1b99661b51253b8825445b7ad532c10583 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 5 Nov 2024 17:19:30 +0400 Subject: [PATCH] feat(mempool): gossip reference to tx and request if needed --- Cargo.lock | 1 + applications/minotari_miner/src/run_miner.rs | 3 +- applications/minotari_node/src/bootstrap.rs | 12 +- applications/minotari_node/src/lib.rs | 13 +- base_layer/core/Cargo.toml | 1 + base_layer/core/src/mempool/config.rs | 15 +- base_layer/core/src/mempool/mempool.rs | 8 +- .../core/src/mempool/mempool_storage.rs | 9 +- base_layer/core/src/mempool/mod.rs | 7 + .../core/src/mempool/reorg_pool/reorg_pool.rs | 61 +- .../src/mempool/service/inbound_handlers.rs | 73 ++- .../core/src/mempool/service/initializer.rs | 10 +- .../core/src/mempool/service/service.rs | 184 +++++- .../core/src/mempool/sync_protocol/error.rs | 2 + .../src/mempool/sync_protocol/initializer.rs | 21 +- .../core/src/mempool/sync_protocol/mod.rs | 534 +++++----------- .../src/mempool/sync_protocol/protocol.rs | 596 ++++++++++++++++++ base_layer/core/src/mempool/transaction_id.rs | 49 ++ .../unconfirmed_pool/unconfirmed_pool.rs | 41 +- .../p2p/proto/base_node/mempool/gossip.proto | 15 + .../base_node/mempool/sync_protocol.proto | 12 + base_layer/p2p/src/proto/mempool_impl.rs | 20 + base_layer/p2p/src/proto/mod.rs | 1 + base_layer/p2p/src/proto/sync_protocol.rs | 23 +- network/rpc_framework/src/server/mod.rs | 3 +- 25 files changed, 1219 insertions(+), 495 deletions(-) create mode 100644 base_layer/core/src/mempool/sync_protocol/protocol.rs create mode 100644 base_layer/core/src/mempool/transaction_id.rs create mode 100644 base_layer/p2p/proto/base_node/mempool/gossip.proto create mode 100644 base_layer/p2p/src/proto/mempool_impl.rs diff --git a/Cargo.lock b/Cargo.lock index a4fa22a014..e7fe6a4a47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7914,6 +7914,7 @@ dependencies = [ "env_logger 0.7.1", "fs2", "futures 0.3.30", + "futures-bounded", "hex", "integer-encoding", "libp2p-substream", diff --git a/applications/minotari_miner/src/run_miner.rs b/applications/minotari_miner/src/run_miner.rs index dfe64059e5..bf85c1d7ce 100644 --- a/applications/minotari_miner/src/run_miner.rs +++ b/applications/minotari_miner/src/run_miner.rs @@ -344,7 +344,8 @@ async fn connect_base_node(config: &MinerConfig) -> Result Result { + pub async fn has_tx_with_excess_sig( + &self, + excess_sig: RistrettoSecretKey, + ) -> Result { self.with_read_access(move |storage| Ok(storage.has_tx_with_excess_sig(&excess_sig))) .await } diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index e44c8189e1..4574ee0115 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -23,7 +23,8 @@ use std::{sync::Arc, time::Instant}; use log::*; -use tari_common_types::types::{FixedHash, PrivateKey, Signature}; +use tari_common_types::types::{FixedHash, PrivateKey}; +use tari_crypto::ristretto::RistrettoSecretKey; use tari_utilities::hex::Hex; use crate::{ @@ -328,7 +329,7 @@ impl MempoolStorage { } /// Check if the specified excess signature is found in the Mempool. - pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> TxStorageResponse { + pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> TxStorageResponse { if self.unconfirmed_pool.has_tx_with_excess_sig(excess_sig) { TxStorageResponse::UnconfirmedPool } else if self.reorg_pool.has_tx_with_excess_sig(excess_sig) { @@ -345,10 +346,10 @@ impl MempoolStorage { .iter() .fold(None, |stored, kernel| { if stored.is_none() { - return Some(self.has_tx_with_excess_sig(&kernel.excess_sig)); + return Some(self.has_tx_with_excess_sig(kernel.excess_sig.get_signature())); } let stored = stored.unwrap(); - match (self.has_tx_with_excess_sig(&kernel.excess_sig), stored) { + match (self.has_tx_with_excess_sig(kernel.excess_sig.get_signature()), stored) { // All (so far) in unconfirmed pool (TxStorageResponse::UnconfirmedPool, TxStorageResponse::UnconfirmedPool) => { Some(TxStorageResponse::UnconfirmedPool) diff --git a/base_layer/core/src/mempool/mod.rs b/base_layer/core/src/mempool/mod.rs index 255655d960..756512cd4b 100644 --- a/base_layer/core/src/mempool/mod.rs +++ b/base_layer/core/src/mempool/mod.rs @@ -68,6 +68,8 @@ pub use service::{MempoolServiceError, MempoolServiceInitializer}; #[cfg(feature = "base_node")] mod sync_protocol; +mod transaction_id; + use core::fmt::{Display, Error, Formatter}; use std::sync::Arc; @@ -119,6 +121,11 @@ impl TxStorageResponse { pub fn is_stored(&self) -> bool { matches!(self, Self::UnconfirmedPool | Self::ReorgPool) } + + /// Returns true if the transaction has not been seen before, otherwise false. + pub fn is_new(&self) -> bool { + matches!(self, Self::NotStored) + } } impl Display for TxStorageResponse { diff --git a/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs b/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs index eab697378c..eef812962b 100644 --- a/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs +++ b/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs @@ -27,7 +27,8 @@ use std::{ use log::*; use serde::{Deserialize, Serialize}; -use tari_common_types::types::{PrivateKey, Signature}; +use tari_common_types::types::PrivateKey; +use tari_crypto::ristretto::RistrettoSecretKey; use tari_utilities::hex::Hex; use crate::{ @@ -168,8 +169,8 @@ impl ReorgPool { } /// Check if a transaction is stored in the ReorgPool - pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> bool { - self.txs_by_signature.contains_key(excess_sig.get_signature()) + pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> bool { + self.txs_by_signature.contains_key(excess_sig) } /// Remove the transactions from the ReorgPool that were used in provided removed blocks. The transactions @@ -385,26 +386,26 @@ mod test { reorg_pool.insert(1, tx1.clone()); reorg_pool.insert(2, tx2.clone()); - assert!(reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); + assert!(reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); reorg_pool.insert(3, tx3.clone()); reorg_pool.insert(4, tx4.clone()); // Check that oldest utx was removed to make room for new incoming transactions - assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig)); + assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature())); reorg_pool.insert(5, tx5.clone()); reorg_pool.insert(6, tx6.clone()); assert_eq!(reorg_pool.len(), 2); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig)); + assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature())); } #[tokio::test] @@ -432,9 +433,9 @@ mod test { reorg_pool.insert(1, tx3.clone()); let txs = reorg_pool.clear_and_retrieve_all(); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); + assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); assert!(reorg_pool.txs_by_height.is_empty()); assert!(reorg_pool.tx_by_key.is_empty()); assert!(reorg_pool.txs_by_signature.is_empty()); @@ -491,12 +492,12 @@ mod test { ]); // Oldest transaction tx1 is removed to make space for new incoming transactions assert_eq!(reorg_pool.len(), 6); - assert!(reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig)); + assert!(reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature())); let reorg_blocks = &[ create_orphan_block(3000, vec![(*tx3).clone(), (*tx4).clone()], &consensus).into(), @@ -511,11 +512,11 @@ mod test { assert!(removed_txs.contains(&tx4)); assert_eq!(reorg_pool.len(), 2); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); - assert!(!reorg_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig)); - assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig)); + assert!(!reorg_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); + assert!(!reorg_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature())); + assert!(reorg_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature())); } } diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index ae43f1e9f0..7b0aad275e 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -23,9 +23,10 @@ use std::sync::Arc; use log::*; +use prost::Message; use tari_network::{identity::PeerId, GossipPublisher}; use tari_p2p::proto; -use tari_utilities::hex::Hex; +use tari_utilities::{hex::Hex, ByteArray}; #[cfg(feature = "metrics")] use crate::mempool::metrics; @@ -47,18 +48,22 @@ pub const LOG_TARGET: &str = "c::mp::service::inbound_handlers"; #[derive(Clone)] pub struct MempoolInboundHandlers { mempool: Mempool, - gossip_publisher: GossipPublisher, + gossip_publisher: GossipPublisher, } impl MempoolInboundHandlers { /// Construct the MempoolInboundHandlers. - pub fn new(mempool: Mempool, gossip_publisher: GossipPublisher) -> Self { + pub fn new(mempool: Mempool, gossip_publisher: GossipPublisher) -> Self { Self { mempool, gossip_publisher, } } + pub(super) fn mempool(&self) -> &Mempool { + &self.mempool + } + /// Handle inbound Mempool service requests from remote nodes and local services. pub async fn handle_request(&mut self, request: MempoolRequest) -> Result { trace!(target: LOG_TARGET, "Handling remote request: {}", request); @@ -67,30 +72,58 @@ impl MempoolInboundHandlers { GetStats => Ok(MempoolResponse::Stats(self.mempool.stats().await?)), GetState => Ok(MempoolResponse::State(self.mempool.state().await?)), GetTxStateByExcessSig(excess_sig) => Ok(MempoolResponse::TxStorage( - self.mempool.has_tx_with_excess_sig(excess_sig).await?, + self.mempool + .has_tx_with_excess_sig(excess_sig.get_signature().clone()) + .await?, )), SubmitTransaction(tx) => { let first_tx_kernel_excess_sig = tx .first_kernel_excess_sig() .ok_or(MempoolServiceError::TransactionNoKernels)? .get_signature() - .to_hex(); + .clone(); + debug!( target: LOG_TARGET, "Transaction ({}) submitted using request.", - first_tx_kernel_excess_sig, + first_tx_kernel_excess_sig.reveal(), ); let tx = Arc::new(tx); - let storage = self.submit_transaction(tx.clone()).await?; + let storage = self.insert_transaction(tx.clone()).await?; if storage.is_stored() { - let msg = - proto::common::Transaction::try_from(&*tx).map_err(MempoolServiceError::ConversionError)?; - // Gossip the transaction - if let Err(err) = self.gossip_publisher.publish(msg).await { - warn!( - target: LOG_TARGET, - "Error publishing transaction {}: {}.", first_tx_kernel_excess_sig, err - ); + let mut transaction_too_large_to_gossip = true; + // TODO: determine more precisely the maximum size of each transaction element + if tx.body.outputs().len() + tx.body.inputs().len() < 4 && tx.body().kernels().len() < 4 { + let msg = + proto::common::Transaction::try_from(&*tx).map_err(MempoolServiceError::ConversionError)?; + // TODO: allow configuration of full vs reference byte size + let encoded_len = msg.encoded_len(); + if encoded_len <= 64 * 1024 { + debug!(target: LOG_TARGET, "Transaction is less than 64KiB when encoded ({encoded_len}). Gossiping full transaction."); + transaction_too_large_to_gossip = false; + // Gossip the full transaction + if let Err(err) = self.gossip_publisher.publish(msg.into()).await { + warn!( + target: LOG_TARGET, + "Error publishing transaction {}: {}.", first_tx_kernel_excess_sig.reveal(), err + ); + } + } + } + + if transaction_too_large_to_gossip { + debug!(target: LOG_TARGET, "Transaction too large. Gossiping reference to the transaction."); + // Gossip a reference to the transaction + if let Err(err) = self + .gossip_publisher + .publish(first_tx_kernel_excess_sig.as_bytes().to_vec().into()) + .await + { + warn!( + target: LOG_TARGET, + "Error publishing transaction {}: {}.", first_tx_kernel_excess_sig.reveal(), err + ); + } } } Ok(MempoolResponse::TxStorage(storage)) @@ -112,7 +145,7 @@ impl MempoolInboundHandlers { .first_kernel_excess_sig() .ok_or(MempoolServiceError::TransactionNoKernels)? .get_signature() - .to_hex(); + .reveal(); debug!( target: LOG_TARGET, "Transaction ({}) received from {}.", @@ -120,12 +153,12 @@ impl MempoolInboundHandlers { source_peer ); let tx = Arc::new(tx); - self.submit_transaction(tx).await?; + self.insert_transaction(tx).await?; Ok(()) } - /// Submits a transaction to the mempool and propagate valid transactions. - async fn submit_transaction(&mut self, tx: Arc) -> Result { + /// Validates and inserts a transaction in the mempool + async fn insert_transaction(&mut self, tx: Arc) -> Result { trace!(target: LOG_TARGET, "submit_transaction: {}.", tx); let tx_storage = self.mempool.has_transaction(tx.clone()).await?; @@ -142,7 +175,7 @@ impl MempoolInboundHandlers { return Ok(tx_storage); } - match self.mempool.insert(tx.clone()).await { + match self.mempool.insert(tx).await { Ok(tx_storage) => { #[cfg(feature = "metrics")] if tx_storage.is_stored() { diff --git a/base_layer/core/src/mempool/service/initializer.rs b/base_layer/core/src/mempool/service/initializer.rs index a37a19d06c..54177591f4 100644 --- a/base_layer/core/src/mempool/service/initializer.rs +++ b/base_layer/core/src/mempool/service/initializer.rs @@ -29,6 +29,7 @@ use tari_service_framework::{ ServiceInitializer, ServiceInitializerContext, }; +use tokio::sync::mpsc; use crate::{ base_node::comms_interface::LocalNodeCommsInterface, @@ -40,6 +41,7 @@ use crate::{ service::{MempoolService, MempoolStreams}, MempoolHandle, }, + sync_protocol::NewTransactionNotification, }, topics::TRANSACTION_TOPIC, }; @@ -49,12 +51,13 @@ const LOG_TARGET: &str = "c::bn::mempool_service::initializer"; /// Initializer for the Mempool service and service future. pub struct MempoolServiceInitializer { mempool: Mempool, + want_list_tx: mpsc::UnboundedSender, } impl MempoolServiceInitializer { /// Create a new MempoolServiceInitializer from the inbound message subscriber. - pub fn new(mempool: Mempool) -> Self { - Self { mempool } + pub fn new(mempool: Mempool, want_list_tx: mpsc::UnboundedSender) -> Self { + Self { mempool, want_list_tx } } } @@ -71,6 +74,7 @@ impl ServiceInitializer for MempoolServiceInitializer { context.register_handle(local_mp_interface); let mempool = self.mempool.clone(); + let want_list_tx = self.want_list_tx.clone(); context.spawn_until_shutdown(move |handles| async move { let base_node = handles.expect_handle::(); @@ -92,7 +96,7 @@ impl ServiceInitializer for MempoolServiceInitializer { request_receiver, }; debug!(target: LOG_TARGET, "Mempool service started"); - if let Err(err) = MempoolService::new(inbound_handlers, network).start(streams).await { + if let Err(err) = MempoolService::new(inbound_handlers, network, want_list_tx).start(streams).await { error!(target: LOG_TARGET, "Mempool service error: {}", err); } }); diff --git a/base_layer/core/src/mempool/service/service.rs b/base_layer/core/src/mempool/service/service.rs index a567d975e9..07c8269e3d 100644 --- a/base_layer/core/src/mempool/service/service.rs +++ b/base_layer/core/src/mempool/service/service.rs @@ -24,19 +24,24 @@ use std::{convert::TryFrom, sync::Arc}; use futures::{pin_mut, stream::StreamExt, Stream}; use log::*; +use tari_crypto::ristretto::RistrettoSecretKey; use tari_network::{gossipsub, GossipMessage, GossipSubscription, InboundGossipError, NetworkHandle}; use tari_p2p::proto; use tari_service_framework::{reply_channel, reply_channel::RequestContext}; -use tari_utilities::hex::Hex; -use tokio::task; +use tari_utilities::{hex::Hex, ByteArray}; +use tokio::{sync::mpsc, task}; use crate::{ base_node::comms_interface::{BlockEvent, BlockEventReceiver}, - mempool::service::{ - error::MempoolServiceError, - inbound_handlers::MempoolInboundHandlers, - MempoolRequest, - MempoolResponse, + mempool::{ + service::{ + error::MempoolServiceError, + inbound_handlers::MempoolInboundHandlers, + MempoolRequest, + MempoolResponse, + }, + sync_protocol::NewTransactionNotification, + transaction_id::MempoolTransactionId, }, transactions::transaction_components::Transaction, }; @@ -45,7 +50,7 @@ const LOG_TARGET: &str = "c::mempool::service::service"; /// A convenience struct to hold all the Mempool service streams pub struct MempoolStreams { - pub transaction_subscription: GossipSubscription, + pub transaction_subscription: GossipSubscription, pub local_request_stream: SLocalReq, pub block_event_stream: BlockEventReceiver, pub request_receiver: reply_channel::TryReceiver, @@ -56,13 +61,19 @@ pub struct MempoolStreams { pub struct MempoolService { inbound_handlers: MempoolInboundHandlers, network: NetworkHandle, + new_transactions_tx: mpsc::UnboundedSender, } impl MempoolService { - pub fn new(inbound_handlers: MempoolInboundHandlers, network: NetworkHandle) -> Self { + pub fn new( + inbound_handlers: MempoolInboundHandlers, + network: NetworkHandle, + new_transactions_tx: mpsc::UnboundedSender, + ) -> Self { Self { inbound_handlers, network, + new_transactions_tx, } } @@ -141,7 +152,11 @@ impl MempoolService { }); } - async fn handle_incoming_tx(&self, result: Result, InboundGossipError>) { + #[allow(clippy::too_many_lines)] + async fn handle_incoming_tx( + &self, + result: Result, InboundGossipError>, + ) { let msg = match result { Ok(msg) => msg, Err(err) => { @@ -161,44 +176,151 @@ impl MempoolService { }, }; - let source_peer_id = msg.propagation_source; - let transaction = match Transaction::try_from(msg.message) { - Ok(tx) => tx, - Err(e) => { - warn!( - target: LOG_TARGET, - "Received transaction message from {} with invalid transaction: {:?}", source_peer_id, e - ); - if let Err(err) = self - .network - .report_gossip_message_validation_result( - msg.message_id, - msg.propagation_source, - gossipsub::MessageAcceptance::Reject, - ) - .await - { - warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + let propagation_source = msg.propagation_source; + let proto::mempool::NewTransaction { payload: Some(payload) } = msg.message else { + warn!(target: LOG_TARGET, "Rejecting gossiped message from peer {propagation_source} because it contains an empty payload."); + if let Err(err) = self + .network + .report_gossip_message_validation_result( + msg.message_id, + propagation_source, + gossipsub::MessageAcceptance::Reject, + ) + .await + { + warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + } + return; + }; + + let transaction = match payload { + proto::mempool::new_transaction::Payload::ExcessSig(excess_sig) => { + match RistrettoSecretKey::from_canonical_bytes(&excess_sig) { + Ok(excess_sig) => { + let tx_id = MempoolTransactionId::try_from(excess_sig.as_bytes()).unwrap(); + match self.inbound_handlers.mempool().has_tx_with_excess_sig(excess_sig).await { + Ok(status) if status.is_new() => { + if self + .new_transactions_tx + .send(NewTransactionNotification { + transaction_id: tx_id, + propagation_source, + message_id: msg.message_id, + }) + .is_err() + { + error!(target: LOG_TARGET, "🚨 Want list sender has closed"); + } + }, + Ok(status) if status.is_stored() => { + // Already stored, we can forward the gossip + if let Err(err) = self + .network + .report_gossip_message_validation_result( + msg.message_id, + msg.propagation_source, + gossipsub::MessageAcceptance::Accept, + ) + .await + { + warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + } + }, + Ok(status) => { + // Technically unreachable since has_tx_with_excess_sig does not return this + warn!(target: LOG_TARGET, "has_tx_with_excess_sig returned unexpected status {status}"); + // Already stored, we can forward the gossip + if let Err(err) = self + .network + .report_gossip_message_validation_result( + msg.message_id, + msg.propagation_source, + gossipsub::MessageAcceptance::Reject, + ) + .await + { + warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + } + }, + Err(err) => { + // has_tx_with_excess_sig is infallible unless the mempool panics, so we'll just log + // this and ignore. + error!(target: LOG_TARGET, "NEVER HAPPEN: has_tx_with_excess_sig errored {err}"); + if let Err(err) = self + .network + .report_gossip_message_validation_result( + msg.message_id, + msg.propagation_source, + gossipsub::MessageAcceptance::Ignore, + ) + .await + { + warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + } + }, + } + }, + Err(_) => { + debug!(target: LOG_TARGET, "NewTransaction message from gossip contained an invalid excess sig, ignoring"); + if let Err(err) = self + .network + .report_gossip_message_validation_result( + msg.message_id, + msg.propagation_source, + gossipsub::MessageAcceptance::Reject, + ) + .await + { + warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + } + }, } return; }, + proto::mempool::new_transaction::Payload::Transaction(transaction) => { + match Transaction::try_from(transaction) { + Ok(tx) => tx, + Err(e) => { + warn!( + target: LOG_TARGET, + "Received transaction message from {} with invalid transaction: {:?}", propagation_source, e + ); + if let Err(err) = self + .network + .report_gossip_message_validation_result( + msg.message_id, + msg.propagation_source, + gossipsub::MessageAcceptance::Reject, + ) + .await + { + warn!(target: LOG_TARGET, "handle_incoming_tx call network: {err}"); + } + return; + }, + } + }, }; + debug!( "New transaction received: {}, from: {}", transaction .first_kernel_excess_sig() .map(|s| s.get_signature().to_hex()) .unwrap_or_else(|| "No kernels!".to_string()), - source_peer_id, + propagation_source, ); trace!( target: LOG_TARGET, "New transaction: {}, from: {}", transaction, - source_peer_id, + propagation_source, ); let mut inbound_handlers = self.inbound_handlers.clone(); - if let Err(e) = inbound_handlers.handle_transaction(transaction, source_peer_id).await { + if let Err(e) = inbound_handlers + .handle_transaction(transaction, propagation_source) + .await + { error!( target: LOG_TARGET, "Failed to handle incoming transaction message: {:?}", e diff --git a/base_layer/core/src/mempool/sync_protocol/error.rs b/base_layer/core/src/mempool/sync_protocol/error.rs index 1e8f228d03..07b80dfde9 100644 --- a/base_layer/core/src/mempool/sync_protocol/error.rs +++ b/base_layer/core/src/mempool/sync_protocol/error.rs @@ -45,4 +45,6 @@ pub enum MempoolProtocolError { SendTimeout, #[error("Receive timeout occurred")] RecvTimeout, + #[error("Invalid request: {details}")] + InvalidRequest { details: String }, } diff --git a/base_layer/core/src/mempool/sync_protocol/initializer.rs b/base_layer/core/src/mempool/sync_protocol/initializer.rs index 90b601c7c4..e0c69a75ed 100644 --- a/base_layer/core/src/mempool/sync_protocol/initializer.rs +++ b/base_layer/core/src/mempool/sync_protocol/initializer.rs @@ -28,7 +28,7 @@ use tokio::sync::mpsc; use crate::{ base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle}, mempool::{ - sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL}, + sync_protocol::{MempoolSyncProtocol, NewTransactionNotification, MEMPOOL_SYNC_PROTOCOL}, Mempool, MempoolServiceConfig, }, @@ -39,11 +39,20 @@ const LOG_TARGET: &str = "c::mempool::sync_protocol"; pub struct MempoolSyncInitializer { config: MempoolServiceConfig, mempool: Mempool, + want_list_rx: Option>, } impl MempoolSyncInitializer { - pub fn new(config: MempoolServiceConfig, mempool: Mempool) -> Self { - Self { mempool, config } + pub fn new( + config: MempoolServiceConfig, + mempool: Mempool, + want_list_rx: mpsc::UnboundedReceiver, + ) -> Self { + Self { + mempool, + config, + want_list_rx: Some(want_list_rx), + } } } @@ -53,6 +62,10 @@ impl ServiceInitializer for MempoolSyncInitializer { debug!(target: LOG_TARGET, "Initializing Mempool Sync Service"); let config = self.config.clone(); let mempool = self.mempool.clone(); + let want_list_rx = self + .want_list_rx + .take() + .expect("MempoolSyncInitializer initialized more than once"); let mut mdc = vec![]; log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned()))); @@ -89,7 +102,7 @@ impl ServiceInitializer for MempoolSyncInitializer { } let base_node_events = base_node.get_block_event_stream(); - MempoolSyncProtocol::new(config, notif_rx, mempool, network, base_node_events) + MempoolSyncProtocol::new(config, notif_rx, mempool, network, base_node_events, want_list_rx) .run(network_events) .await; }); diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs index 8294ac146e..5c2be9bc73 100644 --- a/base_layer/core/src/mempool/sync_protocol/mod.rs +++ b/base_layer/core/src/mempool/sync_protocol/mod.rs @@ -64,9 +64,8 @@ //! ``` use std::{ - collections::HashSet, - convert::TryFrom, - iter, + collections::{HashMap, HashSet}, + future::poll_fn, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -74,36 +73,41 @@ use std::{ time::Duration, }; -use error::MempoolProtocolError; -use futures::{stream, SinkExt, Stream, StreamExt}; pub use initializer::MempoolSyncInitializer; use libp2p_substream::{ProtocolEvent, ProtocolNotification, Substream}; use log::*; -use prost::{bytes::Bytes, Message}; -use tari_network::{identity::PeerId, NetworkEvent, NetworkHandle, StreamProtocol}; -use tari_p2p::{framing, framing::CanonicalFraming, proto as shared_proto, proto::mempool as proto}; -use tari_utilities::{hex::Hex, ByteArray}; +use tari_network::{ + gossipsub::{MessageAcceptance, MessageId}, + identity::PeerId, + NetworkEvent, + NetworkHandle, + StreamProtocol, +}; +use tari_p2p::framing; use tokio::{ sync::{broadcast, mpsc, Semaphore}, task, time, + time::MissedTickBehavior, }; -#[cfg(feature = "metrics")] -use crate::mempool::metrics; use crate::{ base_node::comms_interface::{BlockEvent, BlockEventReceiver}, chain_storage::BlockAddResult, - mempool::{Mempool, MempoolServiceConfig}, - transactions::transaction_components::Transaction, + mempool::{ + sync_protocol::protocol::MempoolPeerProtocol, + transaction_id::MempoolTransactionId, + Mempool, + MempoolServiceConfig, + }, }; - // FIXME: fix these tests // #[cfg(test)] // mod test; mod error; mod initializer; +mod protocol; const MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB const LOG_TARGET: &str = "c::mempool::sync_protocol"; @@ -119,6 +123,15 @@ pub struct MempoolSyncProtocol { permits: Arc, network: NetworkHandle, block_event_stream: BlockEventReceiver, + want_list_rx: mpsc::UnboundedReceiver, + pending_request_task: Option>, + inbound_tasks: futures_bounded::FuturesSet<()>, +} + +pub struct NewTransactionNotification { + pub propagation_source: PeerId, + pub transaction_id: MempoolTransactionId, + pub message_id: MessageId, } impl MempoolSyncProtocol { @@ -128,9 +141,9 @@ impl MempoolSyncProtocol { mempool: Mempool, network: NetworkHandle, block_event_stream: BlockEventReceiver, + want_list_rx: mpsc::UnboundedReceiver, ) -> Self { Self { - config, protocol_notifier, mempool, peers_attempted: HashSet::new(), @@ -138,19 +151,41 @@ impl MempoolSyncProtocol { permits: Arc::new(Semaphore::new(1)), network, block_event_stream, + want_list_rx, + inbound_tasks: futures_bounded::FuturesSet::new( + Duration::from_secs(60), + config.max_concurrent_inbound_tasks, + ), + pending_request_task: None, + config, } } pub async fn run(mut self, mut network_events: broadcast::Receiver) { info!(target: LOG_TARGET, "Mempool protocol handler has started"); + let mut want_list_buffer = Vec::new(); + + let mut interval = time::interval(self.config.request_want_list_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + let mut inbound_tasks = poll_fn(|cx| self.inbound_tasks.poll_unpin(cx)); tokio::select! { + _ = interval.tick() => { + if self.is_done.load(Ordering::SeqCst) { + self.request_wanted_transactions(&mut want_list_buffer).await; + } + } + + // Work on inbound tasks + _ = &mut inbound_tasks => {}, + Ok(block_event) = self.block_event_stream.recv() => { self.handle_block_event(&block_event).await; }, Ok(event) = network_events.recv() => { - self.handle_network_event(event).await; + self.handle_network_event(event); }, Some(notif) = self.protocol_notifier.recv() => { @@ -160,7 +195,7 @@ impl MempoolSyncProtocol { } } - async fn handle_network_event(&mut self, event: NetworkEvent) { + fn handle_network_event(&mut self, event: NetworkEvent) { #[allow(clippy::single_match)] match event { NetworkEvent::PeerIdentified { @@ -168,17 +203,110 @@ impl MempoolSyncProtocol { supported_protocols, .. } => { - if !self.is_synched() && - !self.has_attempted_peer(peer_id) && - supported_protocols.iter().any(|p| *p == MEMPOOL_SYNC_PROTOCOL) - { - self.spawn_initiator_protocol(peer_id).await; + if self.is_synched() || self.has_attempted_peer(peer_id) { + debug!(target: LOG_TARGET, "PeerConnected: Local node already synced or already attempted peer {peer_id}"); + } else if supported_protocols.iter().any(|p| *p == MEMPOOL_SYNC_PROTOCOL) { + debug!(target: LOG_TARGET, "PeerConnected: initiating sync with peer {peer_id}"); + self.peers_attempted.insert(peer_id); + self.spawn_initiator_sync_protocol(peer_id, false); + } else { + debug!(target: LOG_TARGET, "PeerConnected: remote peer {peer_id}s is not a mempool sync peer"); } }, _ => {}, } } + async fn request_wanted_transactions(&mut self, buffer: &mut Vec) { + if self.pending_request_task.as_ref().map_or(false, |t| !t.is_finished()) { + debug!(target: LOG_TARGET, "Want list request in progress"); + return; + } + self.pending_request_task = None; + + if self.want_list_rx.is_empty() { + trace!(target: LOG_TARGET, "No transactions in want list"); + return; + } + + let remaining_buf_space = self.config.max_request_transactions.saturating_sub(buffer.len()); + if remaining_buf_space > 0 { + // Guaranteed to add at least one item and not await indefinitely because we check is_empty() above + self.want_list_rx.recv_many(buffer, remaining_buf_space).await; + } + + let config = self.config.clone(); + let mempool = self.mempool.clone(); + let network = self.network.clone(); + let mut grouped = HashMap::with_capacity(buffer.len()); + + buffer + .drain(..) + .map(|n| (n.propagation_source, n)) + .for_each(|(key, val)| { + grouped.entry(key).or_insert_with(Vec::new).push(val); + }); + + let task = task::spawn(async move { + for (peer_id, notifs) in grouped { + match network + .open_framed_substream(peer_id, &MEMPOOL_SYNC_PROTOCOL, MAX_FRAME_SIZE) + .await + { + Ok(framed) => { + let mut protocol = MempoolPeerProtocol::new(&config, framed, peer_id, &mempool); + let progress = protocol.request_transactions(notifs).await; + // Resolve each of the processed messages for this peer + let resolved = progress + .accept + .into_iter() + .map(|id| (id, MessageAcceptance::Accept)) + .chain(progress.ignore.into_iter().map(|id| (id, MessageAcceptance::Ignore))) + .chain(progress.reject.into_iter().map(|id| (id, MessageAcceptance::Reject))); + + for (id, acceptance) in resolved { + if let Err(err) = network + .report_gossip_message_validation_result(id, peer_id, acceptance) + .await + { + // This can only happen if the network is shutdown or crashes. no further calls will be + // possible so we can stop trying + error!(target: LOG_TARGET, "Failed to notify network: {}", err); + break; + } + } + }, + Err(err) => { + warn!( + target: LOG_TARGET, + "Unable to establish mempool request protocol substream to peer `{}`: {}", + peer_id, + err + ); + // Fail for all peer notifications + for notif in notifs { + if let Err(err) = network + .report_gossip_message_validation_result( + notif.message_id, + notif.propagation_source, + MessageAcceptance::Ignore, + ) + .await + { + error!( + target: LOG_TARGET, + "report_gossip_message_validation_result error: {}", + err + ) + } + } + }, + } + } + }); + self.pending_request_task = Some(task); + } + async fn handle_block_event(&mut self, block_event: &BlockEvent) { use BlockEvent::{BlockSyncComplete, ValidBlockAdded}; match block_event { @@ -200,7 +328,6 @@ impl MempoolSyncProtocol { // we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till // initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the // initial_sync_num_peers - self.peers_attempted.clear(); let connections = match self .network .select_random_connections(self.config.initial_sync_num_peers, Default::default()) @@ -222,7 +349,7 @@ impl MempoolSyncProtocol { }, }; for connection in connections { - self.spawn_initiator_protocol(connection.peer_id).await; + self.spawn_initiator_sync_protocol(connection.peer_id, true); } } @@ -237,23 +364,23 @@ impl MempoolSyncProtocol { fn handle_protocol_notification(&mut self, notification: ProtocolNotification) { match notification.event { ProtocolEvent::NewInboundSubstream { peer_id, substream } => { - self.spawn_inbound_handler(peer_id, substream); + // TODO: we need to limit the number of sessions we handle - switch to using RPC? + self.start_inbound_handler(peer_id, substream); }, } } - async fn spawn_initiator_protocol(&mut self, peer_id: PeerId) { + fn spawn_initiator_sync_protocol(&self, peer_id: PeerId, force_sync: bool) { let mempool = self.mempool.clone(); let permits = self.permits.clone(); let is_done = self.is_done.clone(); let config = self.config.clone(); let network = self.network.clone(); let num_synced = self.peers_attempted.len(); - self.peers_attempted.insert(peer_id); task::spawn(async move { // Only initiate this protocol with a single peer at a time let _permit = permits.acquire().await; - if is_done.load(Ordering::SeqCst) { + if !force_sync && is_done.load(Ordering::SeqCst) { return; } match network @@ -262,8 +389,8 @@ impl MempoolSyncProtocol { { Ok(framed) => { let initial_sync_num_peers = config.initial_sync_num_peers; - let protocol = MempoolPeerProtocol::new(config, framed, peer_id, mempool); - match protocol.start_initiator().await { + let protocol = MempoolPeerProtocol::new(&config, framed, peer_id, &mempool); + match protocol.start_initiator_sync().await { Ok(_) => { debug!( target: LOG_TARGET, @@ -294,12 +421,12 @@ impl MempoolSyncProtocol { }); } - fn spawn_inbound_handler(&self, peer_id: PeerId, substream: Substream) { + fn start_inbound_handler(&mut self, peer_id: PeerId, substream: Substream) { let mempool = self.mempool.clone(); let config = self.config.clone(); - task::spawn(async move { + let fut = async move { let framed = framing::canonical(substream, MAX_FRAME_SIZE); - let mut protocol = MempoolPeerProtocol::new(config, framed, peer_id, mempool); + let mut protocol = MempoolPeerProtocol::new(&config, framed, peer_id, &mempool); match protocol.start_responder().await { Ok(_) => { debug!( @@ -317,344 +444,9 @@ impl MempoolSyncProtocol { ); }, } - }); - } -} - -struct MempoolPeerProtocol { - config: MempoolServiceConfig, - framed: CanonicalFraming, - mempool: Mempool, - peer_id: PeerId, -} - -impl MempoolPeerProtocol { - pub fn new( - config: MempoolServiceConfig, - framed: CanonicalFraming, - peer_id: PeerId, - mempool: Mempool, - ) -> Self { - Self { - config, - framed, - mempool, - peer_id, - } - } - - pub async fn start_initiator(mut self) -> Result<(), MempoolProtocolError> { - match self.start_initiator_inner().await { - Ok(_) => { - debug!(target: LOG_TARGET, "Initiator protocol complete"); - Ok(()) - }, - Err(err) => { - if let Err(err) = self.framed.flush().await { - debug!(target: LOG_TARGET, "IO error when flushing stream: {}", err); - } - if let Err(err) = self.framed.close().await { - debug!(target: LOG_TARGET, "IO error when closing stream: {}", err); - } - Err(err) - }, - } - } - - async fn start_initiator_inner(&mut self) -> Result<(), MempoolProtocolError> { - debug!( - target: LOG_TARGET, - "Starting initiator mempool sync for peer `{}`", - self.peer_id, - ); - - let transactions = self.mempool.snapshot().await?; - let items = transactions - .iter() - .take(self.config.initial_sync_max_transactions) - .filter_map(|txn| txn.first_kernel_excess_sig()) - .map(|excess| excess.get_signature().to_vec()) - .collect(); - let inventory = proto::TransactionInventory { items }; - - // Send an inventory of items currently in this node's mempool - debug!( - target: LOG_TARGET, - "Sending transaction inventory containing {} item(s) to peer `{}`", - inventory.items.len(), - self.peer_id, - ); - - self.write_message(inventory).await?; - - self.read_and_insert_transactions_until_complete().await?; - - let missing_items: proto::InventoryIndexes = self.read_message().await?; - debug!( - target: LOG_TARGET, - "Received {} missing transaction index(es) from peer `{}`", - missing_items.indexes.len(), - self.peer_id, - ); - let missing_txns = missing_items - .indexes - .iter() - .filter_map(|idx| transactions.get(*idx as usize).cloned()) - .collect::>(); - debug!( - target: LOG_TARGET, - "Sending {} missing transaction(s) to peer `{}`", - missing_items.indexes.len(), - self.peer_id, - ); - - // If we don't have any transactions at the given indexes we still need to send back an empty if they requested - // at least one index - if !missing_items.indexes.is_empty() { - self.write_transactions(missing_txns).await?; - } - - // Close the stream after writing - self.framed.close().await?; - - Ok(()) - } - - pub async fn start_responder(&mut self) -> Result<(), MempoolProtocolError> { - match self.start_responder_inner().await { - Ok(_) => { - debug!(target: LOG_TARGET, "Responder protocol complete"); - Ok(()) - }, - Err(err) => { - if let Err(err) = self.framed.flush().await { - debug!(target: LOG_TARGET, "IO error when flushing stream: {}", err); - } - if let Err(err) = self.framed.close().await { - debug!(target: LOG_TARGET, "IO error when closing stream: {}", err); - } - Err(err) - }, - } - } - - async fn start_responder_inner(&mut self) -> Result<(), MempoolProtocolError> { - debug!( - target: LOG_TARGET, - "Starting responder mempool sync for peer `{}`", - self.peer_id, - ); - - let inventory: proto::TransactionInventory = self.read_message().await?; - - debug!( - target: LOG_TARGET, - "Received inventory from peer `{}` containing {} item(s)", - self.peer_id, - inventory.items.len() - ); - - let transactions = self.mempool.snapshot().await?; - - let mut duplicate_inventory_items = Vec::new(); - let (transactions, _) = transactions.into_iter().partition::, _>(|transaction| { - let excess_sig = transaction - .first_kernel_excess_sig() - .expect("transaction stored in mempool did not have any kernels"); - - let has_item = inventory - .items - .iter() - .position(|bytes| bytes.as_slice() == excess_sig.get_signature().as_bytes()); - - match has_item { - Some(pos) => { - duplicate_inventory_items.push(pos); - false - }, - None => true, - } - }); - - debug!( - target: LOG_TARGET, - "Streaming {} transaction(s) to peer `{}`", - transactions.len(), - self.peer_id, - ); - - self.write_transactions(transactions).await?; - - // Generate an index list of inventory indexes that this node does not have - #[allow(clippy::cast_possible_truncation)] - let missing_items = inventory - .items - .into_iter() - .enumerate() - .filter_map(|(i, _)| { - if duplicate_inventory_items.contains(&i) { - None - } else { - Some(i as u32) - } - }) - .collect::>(); - debug!( - target: LOG_TARGET, - "Requesting {} missing transaction index(es) from peer `{}`", - missing_items.len(), - self.peer_id, - ); - - let missing_items = proto::InventoryIndexes { indexes: missing_items }; - let num_missing_items = missing_items.indexes.len(); - self.write_message(missing_items).await?; - - if num_missing_items > 0 { - debug!(target: LOG_TARGET, "Waiting for missing transactions"); - self.read_and_insert_transactions_until_complete().await?; - } - - Ok(()) - } - - async fn read_and_insert_transactions_until_complete(&mut self) -> Result<(), MempoolProtocolError> { - let mut num_recv = 0; - while let Some(result) = self.framed.next().await { - let bytes = result?; - let item = proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| { - MempoolProtocolError::DecodeFailed { - source: err, - peer: self.peer_id, - } - })?; - - match item.transaction { - Some(txn) => { - self.validate_and_insert_transaction(txn).await?; - num_recv += 1; - }, - None => { - debug!( - target: LOG_TARGET, - "All transaction(s) (count={}) received from peer `{}`. ", - num_recv, - self.peer_id, - ); - break; - }, - } - } - - #[allow(clippy::cast_possible_truncation)] - #[allow(clippy::cast_possible_wrap)] - #[cfg(feature = "metrics")] - { - let stats = self.mempool.stats().await?; - metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64); - metrics::reorg_pool_size().set(stats.reorg_txs as i64); - } - - Ok(()) - } - - async fn validate_and_insert_transaction( - &mut self, - txn: shared_proto::common::Transaction, - ) -> Result<(), MempoolProtocolError> { - let txn = Transaction::try_from(txn).map_err(|err| MempoolProtocolError::MessageConversionFailed { - peer: self.peer_id, - message: err, - })?; - let excess_sig = txn - .first_kernel_excess_sig() - .ok_or_else(|| MempoolProtocolError::ExcessSignatureMissing(self.peer_id))?; - let excess_sig_hex = excess_sig.get_signature().to_hex(); - - debug!( - target: LOG_TARGET, - "Received transaction `{}` from peer `{}`", - excess_sig_hex, - self.peer_id, - ); - let txn = Arc::new(txn); - let store_state = self.mempool.has_transaction(txn.clone()).await?; - if store_state.is_stored() { - return Ok(()); - } - - let stored_result = self.mempool.insert(txn).await?; - if stored_result.is_stored() { - #[cfg(feature = "metrics")] - metrics::inbound_transactions().inc(); - debug!( - target: LOG_TARGET, - "Inserted transaction `{}` from peer `{}`", - excess_sig_hex, - self.peer_id, - ); - } else { - #[cfg(feature = "metrics")] - metrics::rejected_inbound_transactions().inc(); - debug!( - target: LOG_TARGET, - "Did not store new transaction `{}` in mempool: {}", excess_sig_hex, stored_result - ) + }; + if self.inbound_tasks.try_push(fut).is_err() { + warn!(target: LOG_TARGET, "Rejecting inbound task for peer {peer_id} because we've reached the max_concurrent_inbound_tasks ({})", self.config.max_concurrent_inbound_tasks); } - - Ok(()) - } - - async fn write_transactions(&mut self, transactions: Vec>) -> Result<(), MempoolProtocolError> { - let txns = transactions.into_iter().take(self.config.initial_sync_max_transactions) - .filter_map(|txn| { - match shared_proto::common::Transaction::try_from(&*txn) { - Ok(txn) => Some(proto::TransactionItem { - transaction: Some(txn), - }), - Err(e) => { - warn!(target: LOG_TARGET, "Could not convert transaction: {}", e); - None - } - } - }) - // Write an empty `TransactionItem` to indicate we're done - .chain(iter::once(proto::TransactionItem::empty())); - - self.write_messages(stream::iter(txns)).await?; - - Ok(()) - } - - async fn read_message(&mut self) -> Result { - let msg = time::timeout(Duration::from_secs(10), self.framed.next()) - .await - .map_err(|_| MempoolProtocolError::RecvTimeout)? - .ok_or_else(|| MempoolProtocolError::SubstreamClosed(self.peer_id))??; - - T::decode(&mut msg.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed { - source: err, - peer: self.peer_id, - }) - } - - async fn write_messages(&mut self, stream: S) -> Result<(), MempoolProtocolError> - where - S: Stream + Unpin, - T: prost::Message, - { - let mut s = stream.map(|m| Bytes::from(m.encode_to_vec())).map(Ok); - self.framed.send_all(&mut s).await?; - Ok(()) - } - - async fn write_message(&mut self, message: T) -> Result<(), MempoolProtocolError> { - time::timeout( - Duration::from_secs(10), - self.framed.send(message.encode_to_vec().into()), - ) - .await - .map_err(|_| MempoolProtocolError::SendTimeout)??; - Ok(()) } } diff --git a/base_layer/core/src/mempool/sync_protocol/protocol.rs b/base_layer/core/src/mempool/sync_protocol/protocol.rs new file mode 100644 index 0000000000..0496ecddcf --- /dev/null +++ b/base_layer/core/src/mempool/sync_protocol/protocol.rs @@ -0,0 +1,596 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::{ + collections::{HashMap, HashSet}, + iter, + sync::Arc, + time::{Duration, Instant}, +}; + +use futures::{stream, SinkExt, Stream, StreamExt}; +use libp2p_substream::Substream; +use log::{debug, warn}; +use prost::Message; +use tari_network::{gossipsub::MessageId, identity::PeerId}; +use tari_p2p::{ + framing::CanonicalFraming, + proto as shared_proto, + proto::{mempool as proto, mempool::MempoolSyncRequest}, +}; +use tari_rpc_framework::__macro_reexports::Bytes; +use tari_utilities::{hex::Hex, ByteArray}; +use tokio::time; + +#[cfg(feature = "metrics")] +use crate::mempool::metrics; +use crate::{ + mempool::{ + sync_protocol::{error::MempoolProtocolError, NewTransactionNotification}, + transaction_id::MempoolTransactionId, + Mempool, + MempoolError, + MempoolServiceConfig, + TxStorageResponse, + }, + transactions::transaction_components::Transaction, +}; + +const LOG_TARGET: &str = "c::mempool::sync_protocol"; +pub(super) struct MempoolPeerProtocol<'a> { + config: &'a MempoolServiceConfig, + framed: CanonicalFraming, + mempool: &'a Mempool, + peer_id: PeerId, +} + +impl<'a> MempoolPeerProtocol<'a> { + pub fn new( + config: &'a MempoolServiceConfig, + framed: CanonicalFraming, + peer_id: PeerId, + mempool: &'a Mempool, + ) -> Self { + Self { + config, + framed, + mempool, + peer_id, + } + } + + pub async fn request_transactions( + &mut self, + notifs: Vec, + ) -> RequestedTransactionProgress { + let timer = Instant::now(); + debug!(target: LOG_TARGET, "Request transactions protocol started. Want {} transaction(s)", notifs.len()); + let progress = self.request_transactions_inner(notifs).await; + debug!(target: LOG_TARGET, "Request transactions protocol complete in {:.2?}", timer.elapsed()); + if let Err(err) = self.framed.close().await { + debug!(target: LOG_TARGET, "IO error when closing stream: {}", err); + } + progress + } + + #[allow(clippy::too_many_lines)] + pub async fn request_transactions_inner( + &mut self, + notifs: Vec, + ) -> RequestedTransactionProgress { + let mut set = notifs + .into_iter() + .map(|n| (n.transaction_id, n.message_id)) + .collect::>(); + + if set.is_empty() { + debug!(target: LOG_TARGET, "No transactions to request"); + return RequestedTransactionProgress::default(); + } + + let num_requested = set.len(); + + let ids = set.keys().map(|id| id.as_bytes().to_vec()).collect::>(); + let request = MempoolSyncRequest::from(proto::RequestSpecificTransactions { ids }); + if let Err(err) = self.write_message(request).await { + warn!(target: LOG_TARGET, "Failed to send request to peer {}: {}. All gossips will be ignored", self.peer_id, err); + return RequestedTransactionProgress::ignore_many(set.into_values()); + } + + let mut accept = Vec::new(); + let mut reject = Vec::new(); + let mut ignore = Vec::new(); + + let mut num_recv = 0; + while let Some(result) = self.framed.next().await { + let read_and_decode_result = result.map_err(MempoolProtocolError::IoError).and_then(|bytes| { + proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed { + source: err, + peer: self.peer_id, + }) + }); + let item = match read_and_decode_result { + Ok(item) => item, + Err(err) => { + warn!(target: LOG_TARGET, "Error reading from stream: {err}. All gossip messages for peer {} will be ignored", self.peer_id); + return RequestedTransactionProgress::ignore_many(set.into_values()); + }, + }; + + match item.transaction { + Some(txn) => { + let Some(tx_id) = extract_transaction_id(&txn) else { + warn!(target: LOG_TARGET, "Peer returned an invalid transaction with no first kernel"); + continue; + }; + let Some(msg_id) = set.remove(&tx_id) else { + // Not requested + warn!(target: LOG_TARGET, "Peer sent transaction {tx_id} that was not requested."); + continue; + }; + num_recv += 1; + debug!(target: LOG_TARGET, "Requested transaction {tx_id} received"); + match self.validate_and_insert_transaction(txn).await { + Ok(stored) if stored.is_stored() => { + accept.push(msg_id); + }, + Ok(stored) => { + warn!(target: LOG_TARGET, "Transaction {tx_id} was not stored: {stored}"); + ignore.push(msg_id); + }, + Err(MempoolProtocolError::MempoolError(MempoolError::TransactionError(err))) => { + warn!(target: LOG_TARGET, "Transaction {tx_id} failed validation: {err}"); + reject.push(msg_id); + }, + Err(err) => { + warn!(target: LOG_TARGET, "Mempool protocol error: {err}"); + ignore.push(msg_id); + }, + } + }, + None => { + debug!( + target: LOG_TARGET, + "All transaction(s) (count={}) received from peer `{}`. ", + num_recv, + self.peer_id, + ); + break; + }, + } + + if num_recv > num_requested { + warn!( + target: LOG_TARGET, + "Peer sent more than the requested amount of transaction (num_recv={}) `{}`. ", + num_recv, + self.peer_id, + ); + break; + } + } + + if !set.is_empty() { + warn!(target: LOG_TARGET, "Not all transactions returned ({} received, {} remaining, {} requested). Ignoring all remaining", num_recv, set.len(), num_requested); + ignore.extend(set.into_values()); + } + + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_possible_wrap)] + #[cfg(feature = "metrics")] + { + match self.mempool.stats().await { + Ok(stats) => { + metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64); + metrics::reorg_pool_size().set(stats.reorg_txs as i64); + }, + Err(err) => { + warn!(target: LOG_TARGET, "mempool.stats() call failed when collecting metrics: {err}"); + }, + } + } + + RequestedTransactionProgress { accept, ignore, reject } + } + + pub async fn start_initiator_sync(mut self) -> Result<(), MempoolProtocolError> { + match self.start_initiator_inner().await { + Ok(_) => { + debug!(target: LOG_TARGET, "Initiator protocol complete"); + Ok(()) + }, + Err(err) => { + if let Err(err) = self.framed.flush().await { + debug!(target: LOG_TARGET, "IO error when flushing stream: {}", err); + } + if let Err(err) = self.framed.close().await { + debug!(target: LOG_TARGET, "IO error when closing stream: {}", err); + } + Err(err) + }, + } + } + + async fn start_initiator_inner(&mut self) -> Result<(), MempoolProtocolError> { + debug!( + target: LOG_TARGET, + "Starting initiator mempool sync for peer `{}`", + self.peer_id, + ); + + let transactions = self.mempool.snapshot().await?; + let items = transactions + .iter() + .take(self.config.initial_sync_max_transactions) + .filter_map(|txn| txn.first_kernel_excess_sig()) + .map(|excess| excess.get_signature().to_vec()) + .collect(); + let inventory = proto::TransactionInventory { items }; + + // Send an inventory of items currently in this node's mempool + debug!( + target: LOG_TARGET, + "Sending transaction inventory containing {} item(s) to peer `{}`", + inventory.items.len(), + self.peer_id, + ); + + self.write_message(MempoolSyncRequest::from(inventory)).await?; + + self.read_and_insert_transactions_until_complete().await?; + + let missing_items: proto::InventoryIndexes = self.read_message().await?; + debug!( + target: LOG_TARGET, + "Received {} missing transaction index(es) from peer `{}`", + missing_items.indexes.len(), + self.peer_id, + ); + let missing_txns = missing_items + .indexes + .iter() + .filter_map(|idx| transactions.get(*idx as usize).cloned()) + .collect::>(); + debug!( + target: LOG_TARGET, + "Sending {} missing transaction(s) to peer `{}`", + missing_items.indexes.len(), + self.peer_id, + ); + + // If we don't have any transactions at the given indexes we still need to send back an empty if they requested + // at least one index + if !missing_items.indexes.is_empty() { + self.write_transactions(missing_txns).await?; + } + + // Close the stream after writing + self.framed.close().await?; + + Ok(()) + } + + pub async fn start_responder(&mut self) -> Result<(), MempoolProtocolError> { + match self.start_responder_inner().await { + Ok(_) => { + debug!(target: LOG_TARGET, "Responder protocol complete"); + Ok(()) + }, + Err(err) => { + if let Err(err) = self.framed.flush().await { + debug!(target: LOG_TARGET, "IO error when flushing stream: {}", err); + } + if let Err(err) = self.framed.close().await { + debug!(target: LOG_TARGET, "IO error when closing stream: {}", err); + } + Err(err) + }, + } + } + + async fn start_responder_inner(&mut self) -> Result<(), MempoolProtocolError> { + debug!( + target: LOG_TARGET, + "Starting responder mempool sync for peer `{}`", + self.peer_id, + ); + + let proto::MempoolSyncRequest { request: Some(request) } = self.read_message().await? else { + return Err(MempoolProtocolError::InvalidRequest { + details: format!("Peer {} sent empty request", self.peer_id), + }); + }; + + match request { + proto::mempool_sync_request::Request::Inventory(inv) => self.respond_to_inventory_request(inv).await, + proto::mempool_sync_request::Request::Specific(req) => self.respond_to_specific_request(req).await, + } + } + + async fn respond_to_specific_request( + &mut self, + request: proto::RequestSpecificTransactions, + ) -> Result<(), MempoolProtocolError> { + if request.ids.len() > self.config.max_request_transactions { + return Err(MempoolProtocolError::InvalidRequest { + details: format!( + "Peer {} requested {} transactions (max: {})", + self.peer_id, + request.ids.len(), + self.config.max_request_transactions + ), + }); + } + + let requested_index = request.ids.iter().map(|s| s.as_bytes()).collect::>(); + + let snapshot = self.mempool.snapshot().await?; + for transaction in snapshot { + let Some(excess_sig) = transaction.first_kernel_excess_sig() else { + continue; + }; + + if requested_index.contains(excess_sig.get_signature().as_bytes()) { + match shared_proto::common::Transaction::try_from(&*transaction) { + Ok(txn) => { + self.write_message(proto::TransactionItem { transaction: Some(txn) }) + .await?; + }, + Err(e) => { + warn!(target: LOG_TARGET, "Could not convert transaction: {}", e); + }, + } + } + } + + // Empty message to indicate we are done + self.write_message(proto::TransactionItem { transaction: None }).await?; + if let Err(err) = self.framed.flush().await { + debug!(target: LOG_TARGET, "IO error when flushing stream: {}", err); + } + + debug!(target: LOG_TARGET, "Done responding to specific transaction request from peer {}", self.peer_id); + Ok(()) + } + + async fn respond_to_inventory_request( + &mut self, + inventory: proto::TransactionInventory, + ) -> Result<(), MempoolProtocolError> { + debug!( + target: LOG_TARGET, + "Received inventory from peer `{}` containing {} item(s)", + self.peer_id, + inventory.items.len() + ); + + let transactions = self.mempool.snapshot().await?; + let inventory_index = inventory + .items + .iter() + .enumerate() + .take(self.config.initial_sync_max_transactions) + .map(|(idx, s)| (s.as_bytes(), idx)) + .collect::>(); + + let mut duplicate_inventory_items = Vec::new(); + let (transactions, _) = transactions.into_iter().partition::, _>(|transaction| { + let Some(excess_sig) = transaction.first_kernel_excess_sig() else { + return false; + }; + + match inventory_index.get(excess_sig.get_signature().as_bytes()) { + Some(pos) => { + duplicate_inventory_items.push(*pos); + false + }, + None => true, + } + }); + + debug!( + target: LOG_TARGET, + "Streaming {} transaction(s) to peer `{}`", + transactions.len(), + self.peer_id, + ); + + self.write_transactions(transactions).await?; + + // Generate an index list of inventory indexes that this node does not have + #[allow(clippy::cast_possible_truncation)] + let missing_items = inventory + .items + .into_iter() + .enumerate() + .filter_map(|(i, _)| { + if duplicate_inventory_items.contains(&i) { + None + } else { + Some(i as u32) + } + }) + .collect::>(); + debug!( + target: LOG_TARGET, + "Requesting {} missing transaction index(es) from peer `{}`", + missing_items.len(), + self.peer_id, + ); + + let missing_items = proto::InventoryIndexes { indexes: missing_items }; + let num_missing_items = missing_items.indexes.len(); + self.write_message(missing_items).await?; + + if num_missing_items > 0 { + debug!(target: LOG_TARGET, "Waiting for missing transactions"); + self.read_and_insert_transactions_until_complete().await?; + } + + Ok(()) + } + + async fn read_and_insert_transactions_until_complete(&mut self) -> Result<(), MempoolProtocolError> { + let mut num_recv = 0; + while let Some(result) = self.framed.next().await { + let bytes = result?; + let item = proto::TransactionItem::decode(&mut bytes.freeze()).map_err(|err| { + MempoolProtocolError::DecodeFailed { + source: err, + peer: self.peer_id, + } + })?; + + match item.transaction { + Some(txn) => { + self.validate_and_insert_transaction(txn).await?; + num_recv += 1; + }, + None => { + debug!( + target: LOG_TARGET, + "All transaction(s) (count={}) received from peer `{}`. ", + num_recv, + self.peer_id, + ); + break; + }, + } + } + + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_possible_wrap)] + #[cfg(feature = "metrics")] + { + let stats = self.mempool.stats().await?; + metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64); + metrics::reorg_pool_size().set(stats.reorg_txs as i64); + } + + Ok(()) + } + + async fn validate_and_insert_transaction( + &mut self, + txn: shared_proto::common::Transaction, + ) -> Result { + let txn = Transaction::try_from(txn).map_err(|err| MempoolProtocolError::MessageConversionFailed { + peer: self.peer_id, + message: err, + })?; + let excess_sig = txn + .first_kernel_excess_sig() + .ok_or_else(|| MempoolProtocolError::ExcessSignatureMissing(self.peer_id))?; + let excess_sig_hex = excess_sig.get_signature().to_hex(); + + debug!( + target: LOG_TARGET, + "validate_and_insert_transaction: Received transaction `{}` from peer `{}`", + excess_sig_hex, + self.peer_id, + ); + let txn = Arc::new(txn); + let stored_result = self.mempool.has_transaction(txn.clone()).await?; + if stored_result.is_stored() { + return Ok(stored_result); + } + + let stored_result = self.mempool.insert(txn).await?; + if stored_result.is_stored() { + #[cfg(feature = "metrics")] + metrics::inbound_transactions().inc(); + debug!( + target: LOG_TARGET, + "Inserted transaction `{}` from peer `{}`", + excess_sig_hex, + self.peer_id, + ); + } else { + #[cfg(feature = "metrics")] + metrics::rejected_inbound_transactions().inc(); + debug!( + target: LOG_TARGET, + "Did not store new transaction `{}` in mempool: {}", excess_sig_hex, stored_result + ); + } + + Ok(stored_result) + } + + async fn write_transactions(&mut self, transactions: Vec>) -> Result<(), MempoolProtocolError> { + let txns = transactions.into_iter().take(self.config.initial_sync_max_transactions) + .filter_map(|txn| { + match shared_proto::common::Transaction::try_from(&*txn) { + Ok(txn) => Some(proto::TransactionItem { + transaction: Some(txn), + }), + Err(e) => { + warn!(target: LOG_TARGET, "Could not convert transaction: {}", e); + None + } + } + }) + // Write an empty `TransactionItem` to indicate we're done + .chain(iter::once(proto::TransactionItem::empty())); + + self.write_messages(stream::iter(txns)).await?; + + Ok(()) + } + + async fn read_message(&mut self) -> Result { + let msg = time::timeout(Duration::from_secs(10), self.framed.next()) + .await + .map_err(|_| MempoolProtocolError::RecvTimeout)? + .ok_or_else(|| MempoolProtocolError::SubstreamClosed(self.peer_id))??; + + T::decode(&mut msg.freeze()).map_err(|err| MempoolProtocolError::DecodeFailed { + source: err, + peer: self.peer_id, + }) + } + + async fn write_messages(&mut self, stream: S) -> Result<(), MempoolProtocolError> + where + S: Stream + Unpin, + T: prost::Message, + { + let mut s = stream.map(|m| Bytes::from(m.encode_to_vec())).map(Ok); + self.framed.send_all(&mut s).await?; + Ok(()) + } + + async fn write_message(&mut self, message: T) -> Result<(), MempoolProtocolError> { + time::timeout( + Duration::from_secs(10), + self.framed.send(message.encode_to_vec().into()), + ) + .await + .map_err(|_| MempoolProtocolError::SendTimeout)??; + Ok(()) + } +} + +fn extract_transaction_id(msg: &shared_proto::common::Transaction) -> Option { + msg.body + .as_ref() + .and_then(|b| b.kernels.first()) + .and_then(|k| k.excess_sig.as_ref()) + .and_then(|s| MempoolTransactionId::try_from(s.signature.as_slice()).ok()) +} + +#[derive(Default)] +pub(super) struct RequestedTransactionProgress { + pub accept: Vec, + pub reject: Vec, + pub ignore: Vec, +} + +impl RequestedTransactionProgress { + pub fn ignore_many>(ignore: I) -> Self { + Self { + ignore: ignore.into_iter().collect(), + ..Default::default() + } + } +} diff --git a/base_layer/core/src/mempool/transaction_id.rs b/base_layer/core/src/mempool/transaction_id.rs new file mode 100644 index 0000000000..1260b3fc45 --- /dev/null +++ b/base_layer/core/src/mempool/transaction_id.rs @@ -0,0 +1,49 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::fmt::Display; + +use tari_common_types::types::Signature; +use tari_utilities::ByteArray; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct MempoolTransactionId([u8; MempoolTransactionId::byte_len()]); + +impl MempoolTransactionId { + pub const fn byte_len() -> usize { + 32 + } + + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } +} + +impl TryFrom<&[u8]> for MempoolTransactionId { + type Error = (); + + fn try_from(value: &[u8]) -> Result { + if value.len() != Self::byte_len() { + return Err(()); + } + let mut id = [0u8; Self::byte_len()]; + id.copy_from_slice(value); + Ok(Self(id)) + } +} + +impl From<&Signature> for MempoolTransactionId { + fn from(sig: &Signature) -> Self { + Self::try_from(sig.get_signature().as_bytes()) + .expect("From for MempoolTransactionId: Signature bytes expected to be 32") + } +} + +impl Display for MempoolTransactionId { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + for byte in self.as_bytes() { + write!(fmt, "{:02x}", byte)?; + } + Ok(()) + } +} diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs index ed4e8fae20..5ae61a0c3b 100644 --- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs +++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs @@ -27,7 +27,8 @@ use std::{ use log::*; use serde::{Deserialize, Serialize}; -use tari_common_types::types::{FixedHash, HashOutput, PrivateKey, Signature}; +use tari_common_types::types::{FixedHash, HashOutput, PrivateKey}; +use tari_crypto::ristretto::RistrettoSecretKey; use tokio::time::Instant; use crate::{ @@ -178,8 +179,8 @@ impl UnconfirmedPool { } /// Check if a transaction is available in the UnconfirmedPool - pub fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> bool { - self.txs_by_signature.contains_key(excess_sig.get_signature()) + pub fn has_tx_with_excess_sig(&self, excess_sig: &RistrettoSecretKey) -> bool { + self.txs_by_signature.contains_key(excess_sig) } /// Returns a set of the highest priority unconfirmed transactions, that can be included in a block @@ -953,11 +954,11 @@ mod test { ) .expect("Failed to insert many"); // Check that lowest priority tx was removed to make room for new incoming transactions - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig)); - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig)); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature())); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature())); // Retrieve the set of highest priority unspent transactions let desired_weight = tx1.calculate_weight(&tx_weight).expect("Failed to get tx") + tx3.calculate_weight(&tx_weight).expect("Failed to get tx") + @@ -1123,12 +1124,12 @@ mod test { let published_block = create_orphan_block(0, vec![(*tx1).clone(), (*tx3).clone(), (*tx5).clone()], &consensus); let _result = unconfirmed_pool.remove_published_and_discard_deprecated_transactions(&published_block); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig),); - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig),); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig),); - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig),); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig),); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig),); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature()),); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature()),); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature()),); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature()),); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature()),); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature()),); assert!(unconfirmed_pool.check_data_consistency()); } @@ -1198,12 +1199,12 @@ mod test { let _result = unconfirmed_pool.remove_published_and_discard_deprecated_transactions(&published_block); // Double spends are discarded - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); - assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx4.body.kernels()[0].excess_sig)); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig)); - assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig)); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx1.body.kernels()[0].excess_sig.get_signature())); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.get_signature())); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.get_signature())); + assert!(unconfirmed_pool.has_tx_with_excess_sig(tx4.body.kernels()[0].excess_sig.get_signature())); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.get_signature())); + assert!(!unconfirmed_pool.has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.get_signature())); assert!(unconfirmed_pool.check_data_consistency()); } diff --git a/base_layer/p2p/proto/base_node/mempool/gossip.proto b/base_layer/p2p/proto/base_node/mempool/gossip.proto new file mode 100644 index 0000000000..e95f823ead --- /dev/null +++ b/base_layer/p2p/proto/base_node/mempool/gossip.proto @@ -0,0 +1,15 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +syntax = "proto3"; + +package tari.mempool; + +import "common/transaction.proto"; + +message NewTransaction { + oneof payload { + bytes excess_sig = 1; + tari.common.Transaction transaction = 2; + } +} diff --git a/base_layer/p2p/proto/base_node/mempool/sync_protocol.proto b/base_layer/p2p/proto/base_node/mempool/sync_protocol.proto index 07add294a0..f6e03d7992 100644 --- a/base_layer/p2p/proto/base_node/mempool/sync_protocol.proto +++ b/base_layer/p2p/proto/base_node/mempool/sync_protocol.proto @@ -7,11 +7,23 @@ import "common/transaction.proto"; package tari.mempool; +message MempoolSyncRequest { + oneof request { + TransactionInventory inventory = 1; + RequestSpecificTransactions specific = 2; + } +} + message TransactionInventory { // A list of kernel excess sigs used to identify transactions repeated bytes items = 1; } +message RequestSpecificTransactions { + // A list of kernel excess sigs used to identify transactions + repeated bytes ids = 1; +} + message TransactionItem { tari.common.Transaction transaction = 1; } diff --git a/base_layer/p2p/src/proto/mempool_impl.rs b/base_layer/p2p/src/proto/mempool_impl.rs new file mode 100644 index 0000000000..71afe2b5ff --- /dev/null +++ b/base_layer/p2p/src/proto/mempool_impl.rs @@ -0,0 +1,20 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use crate::proto::{common, mempool, mempool::NewTransaction}; + +impl From> for NewTransaction { + fn from(tx_bytes: Vec) -> Self { + Self { + payload: Some(mempool::new_transaction::Payload::ExcessSig(tx_bytes)), + } + } +} + +impl From for NewTransaction { + fn from(tx: common::Transaction) -> Self { + Self { + payload: Some(mempool::new_transaction::Payload::Transaction(tx)), + } + } +} diff --git a/base_layer/p2p/src/proto/mod.rs b/base_layer/p2p/src/proto/mod.rs index a2f03f459f..44d2f25096 100644 --- a/base_layer/p2p/src/proto/mod.rs +++ b/base_layer/p2p/src/proto/mod.rs @@ -54,6 +54,7 @@ pub mod mempool { } mod chain_metadata; +mod mempool_impl; mod sync_protocol; mod transaction_sender; mod types_impls; diff --git a/base_layer/p2p/src/proto/sync_protocol.rs b/base_layer/p2p/src/proto/sync_protocol.rs index 38893687a6..427d302d0f 100644 --- a/base_layer/p2p/src/proto/sync_protocol.rs +++ b/base_layer/p2p/src/proto/sync_protocol.rs @@ -20,8 +20,29 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -impl super::mempool::TransactionItem { +use crate::{ + proto, + proto::mempool::{MempoolSyncRequest, RequestSpecificTransactions, TransactionInventory}, +}; + +impl proto::mempool::TransactionItem { pub fn empty() -> Self { Self { transaction: None } } } + +impl From for MempoolSyncRequest { + fn from(value: RequestSpecificTransactions) -> Self { + Self { + request: Some(proto::mempool::mempool_sync_request::Request::Specific(value)), + } + } +} + +impl From for MempoolSyncRequest { + fn from(value: TransactionInventory) -> Self { + Self { + request: Some(proto::mempool::mempool_sync_request::Request::Inventory(value)), + } + } +} diff --git a/network/rpc_framework/src/server/mod.rs b/network/rpc_framework/src/server/mod.rs index 242d22250d..d9639f23c9 100644 --- a/network/rpc_framework/src/server/mod.rs +++ b/network/rpc_framework/src/server/mod.rs @@ -593,7 +593,7 @@ where TSvc: Service, Response = Response, Error = RpcStatus }, Err(err) => { if let Err(err) = self.framed.close().await { - error!( + debug!( target: LOG_TARGET, "({}) Failed to close substream after socket error: {}", self.logging_context_string, err ); @@ -929,6 +929,7 @@ fn err_to_log_level(err: &io::Error) -> log::Level { ErrorKind::BrokenPipe | ErrorKind::WriteZero | ErrorKind::UnexpectedEof | + ErrorKind::Other | ErrorKind::Interrupted => log::Level::Debug, _ => log::Level::Error, }