Skip to content

Commit

Permalink
Merge branch 'development' into sw_fix_unwrap
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden authored Oct 11, 2023
2 parents 0785f4c + f4f3845 commit 0eaf5b3
Show file tree
Hide file tree
Showing 15 changed files with 13 additions and 77 deletions.
2 changes: 0 additions & 2 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use num_format::{Locale, ToFormattedString};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
use tari_utilities::hex::Hex;
use tokio::task;
use tracing;

use super::error::BlockSyncError;
use crate::{
Expand Down Expand Up @@ -97,7 +96,6 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
self.hooks.add_on_complete_hook(hook);
}

#[tracing::instrument(skip(self), err)]
pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
let mut max_latency = self.config.initial_max_sync_latency;
let mut sync_round = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use tari_comms::{
PeerConnection,
};
use tari_utilities::hex::Hex;
use tracing;

use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
use crate::{
Expand Down Expand Up @@ -232,7 +231,6 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
Ok(conn)
}

#[tracing::instrument(skip(self, client), err)]
async fn attempt_sync(
&mut self,
sync_peer: &SyncPeer,
Expand Down
1 change: 0 additions & 1 deletion comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ where
let _result = self.connection_manager_events_tx.send(Arc::new(event));
}

#[tracing::instrument(level = "trace", skip(self, reply))]
async fn dial_peer(
&mut self,
node_id: NodeId,
Expand Down
6 changes: 0 additions & 6 deletions comms/core/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ impl PeerConnection {
Arc::strong_count(&self.handle_counter)
}

#[tracing::instrument(level = "trace", "peer_connection::open_substream", skip(self))]
pub async fn open_substream(
&mut self,
protocol_id: &ProtocolId,
Expand All @@ -228,7 +227,6 @@ impl PeerConnection {
.map_err(|_| PeerConnectionError::InternalReplyCancelled)?
}

#[tracing::instrument(level = "trace", "peer_connection::open_framed_substream", skip(self))]
pub async fn open_framed_substream(
&mut self,
protocol_id: &ProtocolId,
Expand All @@ -239,14 +237,12 @@ impl PeerConnection {
}

#[cfg(feature = "rpc")]
#[tracing::instrument("peer_connection::connect_rpc", level="trace", skip(self), fields(peer_node_id = self.peer_node_id.to_string().as_str()))]
pub async fn connect_rpc<T>(&mut self) -> Result<T, RpcError>
where T: From<RpcClient> + NamedProtocolService {
self.connect_rpc_using_builder(Default::default()).await
}

#[cfg(feature = "rpc")]
#[tracing::instrument("peer_connection::connect_rpc_with_builder", level = "trace", skip(self, builder))]
pub async fn connect_rpc_using_builder<T>(&mut self, builder: RpcClientBuilder<T>) -> Result<T, RpcError>
where T: From<RpcClient> + NamedProtocolService {
let protocol = ProtocolId::from_static(T::PROTOCOL_NAME);
Expand Down Expand Up @@ -431,7 +427,6 @@ impl PeerConnectionActor {
}
}

#[tracing::instrument(level="trace", skip(self, stream),fields(comms.direction="inbound"))]
async fn handle_incoming_substream(&mut self, mut stream: Substream) {
let our_supported_protocols = self.our_supported_protocols.clone();
self.inbound_protocol_negotiations.push(Box::pin(async move {
Expand Down Expand Up @@ -487,7 +482,6 @@ impl PeerConnectionActor {
}
}

#[tracing::instrument(skip(self))]
async fn open_negotiated_protocol_stream(
&mut self,
protocol: ProtocolId,
Expand Down
2 changes: 0 additions & 2 deletions comms/core/src/connection_manager/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ impl ConnectionManagerRequester {
}

/// Attempt to connect to a remote peer
#[tracing::instrument(level = "trace", skip(self))]
pub async fn dial_peer(&mut self, node_id: NodeId) -> Result<PeerConnection, ConnectionManagerError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send_dial_peer(node_id, Some(reply_tx)).await?;
Expand All @@ -94,7 +93,6 @@ impl ConnectionManagerRequester {
}

/// Send instruction to ConnectionManager to dial a peer and return the result on the given oneshot
#[tracing::instrument(level = "trace", skip(self, reply_tx))]
pub(crate) async fn send_dial_peer(
&mut self,
node_id: NodeId,
Expand Down
1 change: 0 additions & 1 deletion comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ impl ConnectivityManagerActor {
})
}

#[tracing::instrument(level = "trace", name = "connectivity_manager_actor::run", skip(self))]
pub async fn run(mut self) {
debug!(target: LOG_TARGET, "ConnectivityManager started");

Expand Down
4 changes: 0 additions & 4 deletions comms/core/src/connectivity/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use tokio::{
sync::{broadcast, broadcast::error::RecvError, mpsc, oneshot},
time,
};
use tracing;

use super::{
connection_pool::PeerConnectionState,
Expand Down Expand Up @@ -128,7 +127,6 @@ impl ConnectivityRequester {
}

/// Dial a single peer
#[tracing::instrument(level = "trace", skip(self))]
pub async fn dial_peer(&self, peer: NodeId) -> Result<PeerConnection, ConnectivityError> {
let mut num_cancels = 0;
loop {
Expand Down Expand Up @@ -159,7 +157,6 @@ impl ConnectivityRequester {

/// Dial many peers, returning a Stream that emits the dial Result as each dial completes.
#[allow(clippy::let_with_type_underscore)]
#[tracing::instrument(level = "trace", skip(self, peers))]
pub fn dial_many_peers<I: IntoIterator<Item = NodeId>>(
&self,
peers: I,
Expand All @@ -171,7 +168,6 @@ impl ConnectivityRequester {
}

/// Send a request to dial many peers without waiting for the response.
#[tracing::instrument(level = "trace", skip(self, peers))]
pub async fn request_many_dials<I: IntoIterator<Item = NodeId>>(&self, peers: I) -> Result<(), ConnectivityError> {
future::join_all(peers.into_iter().map(|peer| {
self.sender.send(ConnectivityRequest::DialPeer {
Expand Down
10 changes: 1 addition & 9 deletions comms/core/src/multiplexing/yamux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::{
sync::mpsc,
};
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use tracing::{self, debug, error, event, Level};
use tracing::{self, debug, error};
// Reexport
pub use yamux::ConnectionError;
use yamux::Mode;
Expand Down Expand Up @@ -252,7 +252,6 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
Self { connection, sender }
}

#[tracing::instrument(name = "yamux::incoming_worker::run", skip(self), fields(connection = %self.connection))]
pub async fn run(mut self) {
loop {
tokio::select! {
Expand All @@ -264,7 +263,6 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
result = self.connection.next_stream() => {
match result {
Ok(Some(stream)) => {
event!(Level::TRACE, "yamux::incoming_worker::new_stream {}", stream);
if self.sender.send(stream).await.is_err() {
debug!(
target: LOG_TARGET,
Expand All @@ -284,12 +282,6 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
break;
}
Err(err) => {
event!(
Level::ERROR,
"{} Incoming peer substream task received an error because '{}'",
self.connection,
err
);
error!(
target: LOG_TARGET,
"{} Incoming peer substream task received an error because '{}'",
Expand Down
1 change: 0 additions & 1 deletion comms/core/src/noise/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl NoiseConfig {

/// Upgrades the given socket to using the noise protocol. The upgraded socket and the peer's static key
/// is returned.
#[tracing::instrument(name = "noise::upgrade_socket", skip(self, socket))]
pub async fn upgrade_socket<TSocket>(
&self,
socket: TSocket,
Expand Down
16 changes: 1 addition & 15 deletions comms/core/src/protocol/messaging/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Instant;

use futures::{future, SinkExt, StreamExt};
use tokio::{pin, sync::mpsc};
use tracing::{debug, error, event, span, Instrument, Level};
use tracing::{debug, error, span, Instrument, Level};

use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason};
use crate::{
Expand Down Expand Up @@ -88,12 +88,6 @@ impl OutboundMessaging {
let messaging_events_tx = self.messaging_events_tx.clone();
match self.run_inner().await {
Ok(_) => {
event!(
Level::DEBUG,
"Outbound messaging for peer '{}' has stopped because the stream was closed",
peer_node_id
);

debug!(
target: LOG_TARGET,
"Outbound messaging for peer '{}' has stopped because the stream was closed", peer_node_id
Expand Down Expand Up @@ -140,7 +134,6 @@ impl OutboundMessaging {
let (conn, substream) = loop {
match self.try_establish().await {
Ok(conn_and_substream) => {
event!(Level::DEBUG, "Substream established");
break conn_and_substream;
},
Err(err) => {
Expand Down Expand Up @@ -271,13 +264,6 @@ impl OutboundMessaging {
let outbound_count = metrics::outbound_message_count(&peer_node_id);
let stream = outbound_stream.map(|mut out_msg| {
outbound_count.inc();
event!(
Level::DEBUG,
"Message for peer '{}' sending {} on stream {}",
peer_node_id,
out_msg,
stream_id
);
debug!(
target: LOG_TARGET,
"Message for peer '{}' sending {} on stream {}", peer_node_id, out_msg, stream_id
Expand Down
1 change: 0 additions & 1 deletion comms/core/src/protocol/messaging/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ impl MessagingProtocol {
Ok(())
}

// #[tracing::instrument(skip(self, out_msg), err)]
fn send_message(&mut self, out_msg: OutboundMessage) -> Result<(), MessagingProtocolError> {
trace!(target: LOG_TARGET, "Received request to send message ({})", out_msg);
let peer_node_id = out_msg.peer_node_id.clone();
Expand Down
17 changes: 3 additions & 14 deletions comms/core/src/protocol/rpc/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tokio::{
time,
};
use tower::{Service, ServiceExt};
use tracing::{event, span, Instrument, Level};
use tracing::{span, Instrument, Level};

use super::message::RpcMethod;
use crate::{
Expand Down Expand Up @@ -440,7 +440,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
self.framed.stream_id()
}

#[tracing::instrument(level="trace", name = "rpc_client_worker run", skip(self), fields(next_request_id = self.next_request_id))]
async fn run(mut self) {
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -479,6 +478,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
metrics::num_sessions(&self.node_id, &self.protocol_id).inc();
loop {
tokio::select! {
// Check the futures in the order they are listed
biased;
_ = &mut self.shutdown_signal => {
break;
Expand Down Expand Up @@ -600,7 +600,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
Ok(())
}

#[tracing::instrument(level="trace", name = "rpc_do_request_response", skip(self, reply, request), fields(request_method = ?request.method, request_body_size = request.message.len()))]
#[allow(clippy::too_many_lines)]
async fn do_request_response(
&mut self,
request: BaseRequest<Bytes>,
Expand All @@ -621,7 +621,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
trace!(target: LOG_TARGET, "Sending request: {}", req);

if reply.is_closed() {
event!(Level::WARN, "Client request was cancelled before request was sent");
warn!(
target: LOG_TARGET,
"Client request was cancelled before request was sent"
Expand All @@ -630,7 +629,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId

let (response_tx, response_rx) = mpsc::channel(5);
if let Err(mut rx) = reply.send(response_rx) {
event!(Level::WARN, "Client request was cancelled after request was sent");
warn!(
target: LOG_TARGET,
"Client request was cancelled after request was sent. This means that you are making an RPC request \
Expand Down Expand Up @@ -690,7 +688,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
if let Some(t) = time_to_first_msg {
let _ = self.last_request_latency_tx.send(Some(partial_latency + t));
}
event!(Level::TRACE, "Message received");
trace!(
target: LOG_TARGET,
"Received response ({} byte(s)) from request #{} (protocol = {}, method={})",
Expand All @@ -710,7 +707,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
target: LOG_TARGET,
"Request {} (method={}) timed out", request_id, method,
);
event!(Level::ERROR, "Response timed out");
metrics::client_timeouts(&self.node_id, &self.protocol_id).inc();
if response_tx.is_closed() {
self.premature_close(request_id, method).await?;
Expand All @@ -728,13 +724,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
break;
},
Err(err) => {
event!(
Level::WARN,
"Request {} (method={}) returned an error: {}",
request_id,
method,
err
);
return Err(err);
},
};
Expand Down
Loading

0 comments on commit 0eaf5b3

Please sign in to comment.