Skip to content

Commit

Permalink
remove unused early close
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 11, 2024
1 parent c78fac2 commit ad35c63
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 154 deletions.
47 changes: 20 additions & 27 deletions base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,34 +96,27 @@ impl MempoolInboundHandlers {
let tx = Arc::new(tx);
let storage = self.insert_transaction(tx.clone()).await?;
if storage.is_stored() {
let mut transaction_too_large_to_gossip = true;
// TODO: determine more precisely the maximum size of each transaction element
if tx.body.outputs().len() + tx.body.inputs().len() < 4 && tx.body().kernels().len() < 4 {
let msg =
proto::common::Transaction::try_from(&*tx).map_err(MempoolServiceError::ConversionError)?;
let encoded_len = msg.encoded_len();
debug!(
target: LOG_TARGET,
"Transaction has {} input(s), {} output(s), and {} kernel(s). Encoded size = {}",
tx.body.inputs().len(),
tx.body.outputs().len(),
tx.body.kernels().len(),
encoded_len
);
if encoded_len <= MEMPOOL_TRANSACTION_FULL_PROPAGATION_THRESHOLD_BYTES {
debug!(target: LOG_TARGET, "Transaction is less than 64KiB when encoded ({encoded_len}). Gossiping full transaction.");
transaction_too_large_to_gossip = false;
// Gossip the full transaction
if let Err(err) = self.gossip_publisher.publish(msg.into()).await {
warn!(
target: LOG_TARGET,
"Error publishing transaction {}: {}.", first_tx_kernel_excess_sig.reveal(), err
);
}
let msg =
proto::common::Transaction::try_from(&*tx).map_err(MempoolServiceError::ConversionError)?;
let encoded_len = msg.encoded_len();
debug!(
target: LOG_TARGET,
"Transaction has {} input(s), {} output(s), and {} kernel(s). Encoded size = {}",
tx.body.inputs().len(),
tx.body.outputs().len(),
tx.body.kernels().len(),
encoded_len
);
if encoded_len <= MEMPOOL_TRANSACTION_FULL_PROPAGATION_THRESHOLD_BYTES {
debug!(target: LOG_TARGET, "Transaction is less than 64KiB when encoded ({encoded_len}). Gossiping full transaction.");
// Gossip the full transaction
if let Err(err) = self.gossip_publisher.publish(msg.into()).await {
warn!(
target: LOG_TARGET,
"Error publishing transaction {}: {}.", first_tx_kernel_excess_sig.reveal(), err
);
}
}

if transaction_too_large_to_gossip {
} else {
debug!(target: LOG_TARGET, "Transaction too large. Gossiping reference to the transaction.");
// Gossip a reference to the transaction
if let Err(err) = self
Expand Down
7 changes: 5 additions & 2 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ impl MempoolSyncProtocol {
fn handle_protocol_notification(&mut self, notification: ProtocolNotification<Substream>) {
match notification.event {
ProtocolEvent::NewInboundSubstream { peer_id, substream } => {
// TODO: we need to limit the number of sessions we handle - switch to using RPC?
self.start_inbound_handler(peer_id, substream);
},
}
Expand Down Expand Up @@ -446,7 +445,11 @@ impl MempoolSyncProtocol {
}
};
if self.inbound_tasks.try_push(fut).is_err() {
warn!(target: LOG_TARGET, "Rejecting inbound task for peer {peer_id} because we've reached the max_concurrent_inbound_tasks ({})", self.config.max_concurrent_inbound_tasks);
warn!(
target: LOG_TARGET,
"Rejecting inbound task for peer {peer_id} because we've reached the max_concurrent_inbound_tasks ({})",
self.config.max_concurrent_inbound_tasks,
);
}
}
}
2 changes: 2 additions & 0 deletions lints.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ allow = [
# `assert!(!foo(bar))` is misread the majority of the time, while `assert_eq!(foo(bar), false)` is crystal clear
'clippy::bool-assert-comparison',
'clippy::blocks_in_conditions',
# FIXME: docs expect a short first paragraph
"clippy::too_long_first_doc_paragraph",
]
118 changes: 0 additions & 118 deletions network/rpc_framework/src/server/early_close.rs

This file was deleted.

6 changes: 1 addition & 5 deletions network/rpc_framework/src/server/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@

use std::io;

use bytes::BytesMut;
use libp2p::{PeerId, StreamProtocol};
use prost::DecodeError;
use tokio::sync::oneshot;

use crate::{handshake::RpcHandshakeError, proto, server::early_close::EarlyCloseError};
use crate::{handshake::RpcHandshakeError, proto};

#[derive(Debug, thiserror::Error)]
pub enum RpcServerError {
Expand Down Expand Up @@ -57,16 +56,13 @@ pub enum RpcServerError {
ServiceCallExceededDeadline,
#[error("Stream read exceeded deadline")]
ReadStreamExceededDeadline,
#[error("Early close: {0}")]
EarlyClose(#[from] EarlyCloseError<BytesMut>),
#[error("Protocol error: {0}")]
ProtocolError(String),
}

impl RpcServerError {
pub fn io(&self) -> Option<&io::Error> {
match self {
Self::EarlyClose(e) => e.io(),
Self::Io(e) => Some(e),
_ => None,
}
Expand Down
2 changes: 0 additions & 2 deletions network/rpc_framework/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ mod metrics;
// TODO: tests
// pub mod mock;

mod early_close;
mod router;

use std::{
Expand Down Expand Up @@ -516,7 +515,6 @@ where TSvc: Service<Request<Bytes>, Response = Response<Body>, Error = RpcStatus
metrics::error_counter(&self.peer_id, &self.protocol, &err).inc();
let level = match &err {
RpcServerError::Io(e) => err_to_log_level(e),
RpcServerError::EarlyClose(e) => e.io().map(err_to_log_level).unwrap_or(log::Level::Error),
_ => log::Level::Error,
};
log!(
Expand Down

0 comments on commit ad35c63

Please sign in to comment.