Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: compile out the metrics #5944

Merged
merged 2 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mempool_proto = []
base_node = ["tari_mmr", "transactions", "mempool_proto", "base_node_proto", "monero", "randomx-rs"]
base_node_proto = []
benches = ["base_node"]
metrics = ["tari_metrics"]

[dependencies]
tari_common = { path = "../../common" }
Expand All @@ -24,7 +25,7 @@ tari_comms = { path = "../../comms/core" }
tari_comms_dht = { path = "../../comms/dht" }
tari_comms_rpc_macros = { path = "../../comms/rpc_macros" }
tari_crypto = { version = "0.19", features = ["borsh"] }
tari_metrics = { path = "../../infrastructure/metrics" }
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
tari_mmr = { path = "../../base_layer/mmr", optional = true, features = ["native_bitmap"] }
tari_p2p = { path = "../../base_layer/p2p" }
tari_script = { path = "../../infrastructure/tari_script" }
Expand Down
62 changes: 42 additions & 20 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(feature = "metrics")]
use std::convert::{TryFrom, TryInto};
use std::{
cmp::max,
collections::HashSet,
convert::{TryFrom, TryInto},
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -35,17 +36,16 @@ use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_utilities::hex::Hex;
use tokio::sync::RwLock;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
FetchMempoolTransactionsResponse,
NodeCommsRequest,
NodeCommsResponse,
OutboundNodeCommsInterface,
},
metrics,
base_node::comms_interface::{
error::CommsInterfaceError,
local_interface::BlockEventSender,
FetchMempoolTransactionsResponse,
NodeCommsRequest,
NodeCommsResponse,
OutboundNodeCommsInterface,
},
blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
Expand Down Expand Up @@ -619,6 +619,7 @@ where B: BlockchainBackend + 'static
.build();
return Ok(block);
}
#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
Expand All @@ -628,6 +629,7 @@ where B: BlockchainBackend + 'static
let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);

let mut builder = BlockBuilder::new(header.version)
Expand Down Expand Up @@ -673,6 +675,7 @@ where B: BlockchainBackend + 'static
not_found.len()
);

#[cfg(feature = "metrics")]
metrics::compact_block_full_misses(header.height).inc();
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
Expand Down Expand Up @@ -710,6 +713,7 @@ where B: BlockchainBackend + 'static
e,
);

#[cfg(feature = "metrics")]
metrics::compact_block_mmr_mismatch(header.height).inc();
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
Expand Down Expand Up @@ -834,8 +838,11 @@ where B: BlockchainBackend + 'static
},

Err(e @ ChainStorageError::ValidationError { .. }) => {
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
#[cfg(feature = "metrics")]
{
let block_hash = block.hash();
metrics::rejected_blocks(block.header.height, &block_hash).inc();
}
warn!(
target: LOG_TARGET,
"Peer {} sent an invalid block: {}",
Expand All @@ -856,14 +863,20 @@ where B: BlockchainBackend + 'static
}
},
// SECURITY: This indicates an issue in the transaction validator.
None => metrics::rejected_local_blocks(block.header.height, &block_hash).inc(),
None => {
#[cfg(feature = "metrics")]
metrics::rejected_local_blocks(block.header.height, &block_hash).inc();
debug!(target: LOG_TARGET, "There may have been an issue in the transaction validator");
},
}
self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
Err(e.into())
},

Err(e) => {
#[cfg(feature = "metrics")]
metrics::rejected_blocks(block.header.height, &block.hash()).inc();

self.publish_block_event(BlockEvent::AddBlockErrored { block });
Err(e.into())
},
Expand Down Expand Up @@ -936,6 +949,7 @@ where B: BlockchainBackend + 'static

async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
fn update_target_difficulty(block: &ChainBlock) {
#[cfg(feature = "metrics")]
match block.header().pow_algo() {
PowAlgorithm::Sha3x => {
metrics::target_difficulty_sha()
Expand All @@ -950,25 +964,33 @@ where B: BlockchainBackend + 'static

match block_add_result {
BlockAddResult::Ok(ref block) => {
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(block.height() as i64);
update_target_difficulty(block);
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));

#[cfg(feature = "metrics")]
{
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(block.height() as i64);
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
}
},
#[allow(unused_variables)] // `removed` variable is used if metrics are compiled
BlockAddResult::ChainReorg { added, removed } => {
#[cfg(feature = "metrics")]
if let Some(fork_height) = added.last().map(|b| b.height()) {
#[allow(clippy::cast_possible_wrap)]
metrics::tip_height().set(fork_height as i64);
metrics::reorg(fork_height, added.len(), removed.len()).inc();

let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
}
for block in added {
update_target_difficulty(block);
}
let utxo_set_size = self.blockchain_db.utxo_count().await?;
metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
},
BlockAddResult::OrphanBlock => {
#[cfg(feature = "metrics")]
metrics::orphaned_blocks().inc();
},
_ => {},
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod chain_metadata_service;
pub mod comms_interface;
#[cfg(feature = "base_node")]
pub use comms_interface::LocalNodeCommsInterface;
#[cfg(feature = "base_node")]
#[cfg(feature = "metrics")]
mod metrics;

#[cfg(feature = "base_node")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use std::time::Instant;

use log::*;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
state_machine_service::states::{BlockSyncInfo, HorizonStateSync, StateEvent, StateInfo, StatusInfo},
sync::{BlockSynchronizer, SyncPeer},
BaseNodeStateMachine,
Expand Down Expand Up @@ -63,6 +64,7 @@ impl BlockSync {
let local_nci = shared.local_node_interface.clone();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
#[cfg(feature = "metrics")]
let tip_height_metric = metrics::tip_height();
synchronizer.on_starting(move |sync_peer| {
let _result = status_event_sender.send(StatusInfo {
Expand All @@ -81,6 +83,7 @@ impl BlockSync {
BlockAddResult::Ok(block),
));

#[cfg(feature = "metrics")]
tip_height_metric.set(local_height as i64);
let _result = status_event_sender.send(StatusInfo {
bootstrapped,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::BlockEvent,
metrics,
state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
Expand Down Expand Up @@ -146,6 +147,7 @@ impl HeaderSyncState {

let local_nci = shared.local_node_interface.clone();
synchronizer.on_rewind(move |removed| {
#[cfg(feature = "metrics")]
if let Some(fork_height) = removed.last().map(|b| b.height().saturating_sub(1)) {
metrics::tip_height().set(fork_height as i64);
metrics::reorg(fork_height, 0, removed.len()).inc();
Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ use tokio::{
};
use tracing::{instrument, span, Instrument, Level};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::{
comms_interface::{BlockEvent, BlockEvent::BlockSyncRewind},
metrics,
sync::{
header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
Expand Down Expand Up @@ -99,6 +100,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {

let token = Arc::new(peer);
lock.push(Arc::downgrade(&token));
#[cfg(feature = "metrics")]
metrics::active_sync_peers().set(lock.len() as i64);
Ok(token)
}
Expand Down Expand Up @@ -256,6 +258,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -355,6 +358,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -572,6 +576,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}
}

#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
debug!(
target: LOG_TARGET,
Expand Down
4 changes: 3 additions & 1 deletion base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use tari_comms::{
use tari_utilities::hex::Hex;
use tokio::{sync::mpsc, task};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
base_node::metrics,
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto::base_node::{SyncUtxosRequest, SyncUtxosResponse},
Expand Down Expand Up @@ -106,6 +107,7 @@ where B: BlockchainBackend + 'static
target: LOG_TARGET,
"UTXO stream completed for peer '{}'", self.peer_node_id
);
#[cfg(feature = "metrics")]
metrics::active_sync_peers().dec();
});

Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mod rpc;
pub use rpc::create_mempool_rpc_service;
#[cfg(feature = "base_node")]
pub use rpc::{MempoolRpcClient, MempoolRpcServer, MempoolRpcService, MempoolService};
#[cfg(feature = "base_node")]
#[cfg(feature = "metrics")]
mod metrics;
#[cfg(feature = "base_node")]
mod shrink_hashmap;
Expand Down
5 changes: 4 additions & 1 deletion base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use log::*;
use tari_comms::peer_manager::NodeId;
use tari_utilities::hex::Hex;

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEvent::AddBlockErrored},
chain_storage::BlockAddResult,
mempool::{
metrics,
service::{MempoolRequest, MempoolResponse, MempoolServiceError, OutboundMempoolServiceInterface},
Mempool,
TxStorageResponse,
Expand Down Expand Up @@ -135,6 +136,7 @@ impl MempoolInboundHandlers {
}
match self.mempool.insert(tx.clone()).await {
Ok(tx_storage) => {
#[cfg(feature = "metrics")]
if tx_storage.is_stored() {
metrics::inbound_transactions(source_peer.as_ref()).inc();
} else {
Expand Down Expand Up @@ -164,6 +166,7 @@ impl MempoolInboundHandlers {

#[allow(clippy::cast_possible_wrap)]
async fn update_pool_size_metrics(&self) {
#[cfg(feature = "metrics")]
if let Ok(stats) = self.mempool.stats().await {
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
metrics::reorg_pool_size().set(stats.reorg_txs as i64);
Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ use tokio::{
time,
};

#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
chain_storage::BlockAddResult,
mempool::{metrics, proto, Mempool, MempoolServiceConfig},
mempool::{proto, Mempool, MempoolServiceConfig},
proto as shared_proto,
transactions::transaction_components::Transaction,
};
Expand Down Expand Up @@ -544,6 +546,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_possible_wrap)]
#[cfg(feature = "metrics")]
{
let stats = self.mempool.stats().await?;
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
Expand Down Expand Up @@ -580,6 +583,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

let stored_result = self.mempool.insert(txn).await?;
if stored_result.is_stored() {
#[cfg(feature = "metrics")]
metrics::inbound_transactions(Some(&self.peer_node_id)).inc();
debug!(
target: LOG_TARGET,
Expand All @@ -588,6 +592,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
self.peer_node_id.short_str()
);
} else {
#[cfg(feature = "metrics")]
metrics::rejected_inbound_transactions(Some(&self.peer_node_id)).inc();
debug!(
target: LOG_TARGET,
Expand Down
4 changes: 2 additions & 2 deletions comms/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ edition = "2018"

[dependencies]
tari_crypto = { version = "0.19" }
tari_metrics = { path = "../../infrastructure/metrics" }
tari_metrics = { path = "../../infrastructure/metrics", optional = true }
tari_storage = { path = "../../infrastructure/storage" }
tari_shutdown = { path = "../../infrastructure/shutdown" }
tari_utilities = { version = "0.6" }
Expand Down Expand Up @@ -63,5 +63,5 @@ tari_common = { path = "../../common", features = ["build"] }

[features]
c_integration = []
metrics = []
metrics = ["tari_metrics"]
rpc = ["tower/make", "tower/util"]
Loading
Loading