diff --git a/chart/templates/mayastor/io/io-engine-daemonset.yaml b/chart/templates/mayastor/io/io-engine-daemonset.yaml index 64372d8d1..f185d4698 100644 --- a/chart/templates/mayastor/io/io-engine-daemonset.yaml +++ b/chart/templates/mayastor/io/io-engine-daemonset.yaml @@ -50,9 +50,6 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - args: - - "-p{{ .Values.base.metrics.pollingInterval }}" - - "--api-versions={{ .Values.io_engine.api }}" ports: - containerPort: 9502 protocol: TCP diff --git a/metrics-exporter/Cargo.toml b/metrics-exporter/Cargo.toml index 204938db4..d69d922b8 100644 --- a/metrics-exporter/Cargo.toml +++ b/metrics-exporter/Cargo.toml @@ -3,7 +3,6 @@ name = "metrics-exporter" description = "Metrics Exporters" version = "0.1.0" edition = "2021" -authors = ["Sahil Raja "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [[bin]] diff --git a/metrics-exporter/src/bin/io_engine/cache/mod.rs b/metrics-exporter/src/bin/io_engine/cache/mod.rs index d689dd6e5..045d5681f 100644 --- a/metrics-exporter/src/bin/io_engine/cache/mod.rs +++ b/metrics-exporter/src/bin/io_engine/cache/mod.rs @@ -1,14 +1,11 @@ mod pool; +mod pool_stat; -use crate::{ - client::{grpc_client::GrpcClient, pool::Pools}, - ExporterConfig, -}; - +use crate::client::{grpc_client::GrpcClient, pool::Pools, pool_stat::PoolIoStats}; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::sync::Mutex; -use tokio::time::sleep; + static CACHE: OnceCell> = OnceCell::new(); /// Trait to be implemented by all Resource structs stored in Cache. @@ -23,6 +20,15 @@ pub(crate) struct Cache { data: Data, } +/// Wrapper over all the data that has to be stored in cache. +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct Data { + /// Contains Pool Capacity and state data. + pools: Pools, + /// Contains Pool IOStats data. + pool_stats: PoolIoStats, +} + impl Cache { /// Initialize the cache with default value. 
pub fn initialize(data: Data) { @@ -38,13 +44,21 @@ impl Cache { pub fn pool_mut(&mut self) -> &mut Pools { &mut self.data.pools } -} -/// Wrapper over all the data that has to be stored in cache. -#[derive(Serialize, Deserialize, Debug)] -pub(crate) struct Data { - /// Contains Pool Capacity and state data. - pools: Pools, + /// Get mutable reference to PoolIOStats. + pub fn pool_iostat_mut(&mut self) -> &mut PoolIoStats { + &mut self.data.pool_stats + } + + /// Get a reference to Pool. + pub fn pool(&self) -> &Pools { + &self.data.pools + } + + /// Get a reference to PoolIoStats. + pub fn pool_iostat(&self) -> &PoolIoStats { + &self.data.pool_stats + } } impl Default for Data { @@ -58,21 +72,13 @@ impl Data { fn new() -> Self { Self { pools: Pools { pools: vec![] }, + pool_stats: PoolIoStats { pool_stats: vec![] }, } } } -/// To store data in shared variable i.e cache. -pub(crate) async fn store_data(client: GrpcClient) { - tokio::spawn(async move { - store_resource_data(client).await; - }); -} - -/// To store pools related data in cache. -async fn store_resource_data(client: GrpcClient) { - loop { - let _ = pool::store_pool_info_data(client.clone()).await; - sleep(ExporterConfig::get_config().polling_time()).await; - } +/// Populates Resource cache struct. 
+pub(crate) async fn store_resource_data(client: &GrpcClient) { + let _ = pool::store_pool_info_data(client).await; + let _ = pool_stat::store_pool_stats_data(client).await; } diff --git a/metrics-exporter/src/bin/io_engine/cache/pool.rs b/metrics-exporter/src/bin/io_engine/cache/pool.rs index a2e325731..9d4e22f82 100644 --- a/metrics-exporter/src/bin/io_engine/cache/pool.rs +++ b/metrics-exporter/src/bin/io_engine/cache/pool.rs @@ -1,10 +1,10 @@ use super::{Cache, ResourceOps}; use crate::client::{ grpc_client::GrpcClient, - pool::{PoolInfo, PoolOperations, Pools}, + pool::{PoolInfo, Pools}, }; use std::ops::DerefMut; -use tracing::{debug, error}; +use tracing::error; impl ResourceOps for Pools { type ResourceVec = Vec; @@ -19,7 +19,7 @@ impl ResourceOps for Pools { } /// To store pools state and capacity data in cache. -pub(crate) async fn store_pool_info_data(client: GrpcClient) -> Result<(), ()> { +pub(crate) async fn store_pool_info_data(client: &GrpcClient) -> Result<(), ()> { let pools = client.list_pools().await; let mut cache = match Cache::get_cache().lock() { Ok(cache) => cache, @@ -32,8 +32,7 @@ pub(crate) async fn store_pool_info_data(client: GrpcClient) -> Result<(), ()> { match pools { // set pools in the cache Ok(pools) => { - debug!("Updated pool cache with latest metrics"); - pools_cache.pool_mut().set(pools.pools) + pools_cache.pool_mut().set(pools.pools); } // invalidate cache in case of error Err(error) => { diff --git a/metrics-exporter/src/bin/io_engine/cache/pool_stat.rs b/metrics-exporter/src/bin/io_engine/cache/pool_stat.rs new file mode 100644 index 000000000..094f80207 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/cache/pool_stat.rs @@ -0,0 +1,45 @@ +use super::{Cache, ResourceOps}; +use crate::client::{ + grpc_client::GrpcClient, + pool_stat::{PoolIoStat, PoolIoStats}, +}; +use std::ops::DerefMut; +use tracing::error; + +impl ResourceOps for PoolIoStats { + type ResourceVec = Vec; + + fn set(&mut self, val: Self::ResourceVec) { 
+ self.pool_stats = val + } + + fn invalidate(&mut self) { + self.pool_stats = vec![] + } +} + +/// To store pool iostat data in cache. +pub(crate) async fn store_pool_stats_data(client: &GrpcClient) -> Result<(), ()> { + let pool_stats = client.get_pool_iostat().await; + let mut cache = match Cache::get_cache().lock() { + Ok(cache) => cache, + Err(error) => { + error!(%error, "Error while getting cache resource"); + return Err(()); + } + }; + let pools_cache = cache.deref_mut(); + match pool_stats { + // set pools in the cache + Ok(pools) => { + pools_cache.pool_iostat_mut().set(pools.pool_stats); + } + // invalidate cache in case of error + Err(error) => { + error!(?error, "Error getting pools data, invalidating pools cache"); + pools_cache.pool_iostat_mut().invalidate(); + return Err(()); + } + }; + Ok(()) +} diff --git a/metrics-exporter/src/bin/io_engine/client/grpc_client.rs b/metrics-exporter/src/bin/io_engine/client/grpc_client.rs index 3537de0a7..a97b23e7c 100644 --- a/metrics-exporter/src/bin/io_engine/client/grpc_client.rs +++ b/metrics-exporter/src/bin/io_engine/client/grpc_client.rs @@ -1,13 +1,16 @@ -use crate::{error::ExporterError, get_node_name, get_pod_ip, ApiVersion}; -use rpc::io_engine::IoEngineClientV0; +use crate::{error::ExporterError, get_node_name, get_pod_ip}; +use crate::client::{ + pool::{PoolInfo, Pools}, + pool_stat::{PoolIoStat, PoolIoStats}, +}; use actix_web::http::Uri; use std::time::Duration; use tokio::time::sleep; use tonic::transport::Channel; -use tracing::{error, info}; +use tracing::error; -/// Timeout for gRPC. +/// Timeout for gRPC connection. 
#[derive(Debug, Clone)] pub struct Timeouts { connect: Duration, @@ -33,113 +36,62 @@ impl Timeouts { #[derive(Debug, Clone)] pub(crate) struct GrpcContext { endpoint: tonic::transport::Endpoint, - timeouts: Timeouts, - api_version: ApiVersion, } impl GrpcContext { - /// initialize context - pub fn new(endpoint: Uri, timeouts: Timeouts, api_version: ApiVersion) -> Self { + pub fn new(endpoint: Uri, timeouts: Timeouts) -> Self { let endpoint = tonic::transport::Endpoint::from(endpoint) .connect_timeout(timeouts.connect()) .timeout(timeouts.request()); - Self { - endpoint, - timeouts, - api_version, - } + Self { endpoint } } } -/// The V0 Mayastor client. -type MayaClientV0 = IoEngineClientV0; /// The V1 PoolClient. type PoolClient = rpc::v1::pool::pool_rpc_client::PoolRpcClient; +type StatsClient = rpc::v1::stats::StatsRpcClient; /// A wrapper for client for the V1 dataplane interface. #[derive(Clone, Debug)] pub(crate) struct MayaClientV1 { pub(crate) pool: PoolClient, + pub(crate) stats: StatsClient, } /// Dataplane grpc client. #[derive(Debug, Clone)] pub(crate) struct GrpcClient { - ctx: GrpcContext, - v0_client: Option, v1_client: Option, } +/// Number of grpc connect retries without error logging. +const SILENT_RETRIES: i32 = 3; + impl GrpcClient { - /// Initialize gRPC client. + /// Initialize v1 io engine gRPC client. 
pub(crate) async fn new(context: GrpcContext) -> Result { let sleep_duration_sec = 10; + let mut num_retires = 0; loop { - match context.api_version { - ApiVersion::V0 => { - match tokio::time::timeout( - context.timeouts.connect(), - MayaClientV0::connect(context.endpoint.clone()), - ) - .await - { - Err(error) => { - error!(error=%error, "Grpc connection timeout, retrying after {}s",sleep_duration_sec); - } - Ok(result) => match result { - Ok(v0_client) => { - return Ok(Self { - ctx: context.clone(), - v0_client: Some(v0_client), - v1_client: None, - }) - } - Err(error) => { - error!(error=%error, "Grpc client connection error, retrying after {}s",sleep_duration_sec); - } - }, - } - } - ApiVersion::V1 => { - match tokio::time::timeout( - context.timeouts.connect(), - PoolClient::connect(context.endpoint.clone()), - ) - .await - { - Err(error) => { - error!(error=%error, "Grpc connection timeout, retrying after {}s",sleep_duration_sec); - } - Ok(result) => match result { - Ok(pool) => { - info!("grpc connected successfully"); - return Ok(Self { - ctx: context.clone(), - v0_client: None, - v1_client: Some(MayaClientV1 { pool }), - }); - } - Err(error) => { - error!(error=%error, "Grpc client connection error, retrying after {}s",sleep_duration_sec); - } - }, - } + if let Ok(channel) = context.endpoint.connect().await { + let pool = PoolClient::new(channel.clone()); + let stats = StatsClient::new(channel.clone()); + return Ok(Self { + v1_client: Some(MayaClientV1 { pool, stats }), + }); + } else { + if num_retires > SILENT_RETRIES { + error!( + "Grpc connection timeout, retrying after {}s", + sleep_duration_sec + ); } + num_retires += 1; } sleep(Duration::from_secs(sleep_duration_sec)).await; } } - /// Get the v0 api client. - pub(crate) fn client_v0(&self) -> Result { - match self.v0_client.clone() { - Some(client) => Ok(client), - None => Err(ExporterError::GrpcClientError( - "Could not get v0 client".to_string(), - )), - } - } - /// Get the v1 api client. 
pub(crate) fn client_v1(&self) -> Result { match self.v1_client.clone() { @@ -149,15 +101,10 @@ impl GrpcClient { )), } } - - /// Get the api version. - pub(crate) fn api_version(&self) -> ApiVersion { - self.ctx.api_version.clone() - } } /// Initialize mayastor grpc client. -pub(crate) async fn init_client(api_version: ApiVersion) -> Result { +pub(crate) async fn init_client() -> Result { let timeout = Timeouts::new(Duration::from_secs(1), Duration::from_secs(5)); let pod_ip = get_pod_ip()?; let _ = get_node_name()?; @@ -167,7 +114,49 @@ pub(crate) async fn init_client(api_version: ApiVersion) -> Result Result { + let pools = match self + .client_v1()? + .pool + .list_pools(rpc::v1::pool::ListPoolOptions::default()) + .await + { + Ok(response) => response + .into_inner() + .pools + .into_iter() + .map(PoolInfo::from) + .collect::>(), + Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())), + }; + + Ok(Pools { pools }) + } + + /// Gets Io Statistics of all pool on the io engine. Maps the response to PoolIoStat struct. + pub(crate) async fn get_pool_iostat(&self) -> Result { + let pool_stats = match self + .client_v1()? + .stats + .get_pool_io_stats(rpc::v1::stats::ListStatsOption { name: None }) + .await + { + Ok(response) => Ok(response + .into_inner() + .stats + .into_iter() + .map(PoolIoStat::from) + .collect::>()), + Err(error) => Err(ExporterError::GrpcResponseError(error.to_string())), + }?; + Ok(PoolIoStats { pool_stats }) + } +} diff --git a/metrics-exporter/src/bin/io_engine/client/mod.rs b/metrics-exporter/src/bin/io_engine/client/mod.rs index 67c5a56c9..650846223 100644 --- a/metrics-exporter/src/bin/io_engine/client/mod.rs +++ b/metrics-exporter/src/bin/io_engine/client/mod.rs @@ -2,12 +2,5 @@ pub mod grpc_client; /// PoolInfo module. 
pub mod pool; - -#[derive( - Debug, strum_macros::EnumString, strum_macros::AsRefStr, Clone, Ord, PartialOrd, Eq, PartialEq, -)] -#[strum(serialize_all = "lowercase")] -pub enum ApiVersion { - V0, - V1, -} +/// PoolIoStats module +pub mod pool_stat; diff --git a/metrics-exporter/src/bin/io_engine/client/pool.rs b/metrics-exporter/src/bin/io_engine/client/pool.rs index be5864944..7c89261bf 100644 --- a/metrics-exporter/src/bin/io_engine/client/pool.rs +++ b/metrics-exporter/src/bin/io_engine/client/pool.rs @@ -1,5 +1,3 @@ -use crate::{client::grpc_client::GrpcClient, error::ExporterError, ApiVersion}; - use serde::{Deserialize, Serialize}; /// This stores Capacity and state information of a pool. @@ -45,23 +43,6 @@ pub(crate) struct Pools { pub(crate) pools: Vec, } -/// Trait to be implemented by grpc client to call pool rpc. -#[tonic::async_trait] -pub(crate) trait PoolOperations: Send + Sync + Sized { - async fn list_pools(&self) -> Result; -} - -impl From for PoolInfo { - fn from(value: rpc::io_engine::Pool) -> Self { - Self { - name: value.name, - used: value.used, - capacity: value.capacity, - state: value.state as u64, - committed: value.used, - } - } -} impl From for PoolInfo { fn from(value: rpc::v1::pool::Pool) -> Self { Self { @@ -73,36 +54,3 @@ impl From for PoolInfo { } } } - -#[tonic::async_trait] -impl PoolOperations for GrpcClient { - async fn list_pools(&self) -> Result { - let pools = match self.api_version() { - ApiVersion::V0 => match self.client_v0()?.list_pools(rpc::io_engine::Null {}).await { - Ok(response) => response - .into_inner() - .pools - .into_iter() - .map(PoolInfo::from) - .collect::>(), - Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())), - }, - ApiVersion::V1 => match self - .client_v1()? 
- .pool - .list_pools(rpc::v1::pool::ListPoolOptions::default()) - .await - { - Ok(response) => response - .into_inner() - .pools - .into_iter() - .map(PoolInfo::from) - .collect::>(), - Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())), - }, - }; - - Ok(Pools { pools }) - } -} diff --git a/metrics-exporter/src/bin/io_engine/client/pool_stat.rs b/metrics-exporter/src/bin/io_engine/client/pool_stat.rs new file mode 100644 index 000000000..2c8e493f9 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/client/pool_stat.rs @@ -0,0 +1,74 @@ +use serde::{Deserialize, Serialize}; + +/// This stores IoStat information of a pool. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct PoolIoStat { + name: String, + bytes_read: u64, + num_read_ops: u64, + bytes_written: u64, + num_write_ops: u64, + read_latency_us: u64, + write_latency_us: u64, +} + +impl PoolIoStat { + /// Get name of the pool. + pub(crate) fn name(&self) -> &String { + &self.name + } + + /// Get used bytes read of the pool. + pub(crate) fn bytes_read(&self) -> u64 { + self.bytes_read + } + + /// Get total number of read ops of the pool. + pub(crate) fn num_read_ops(&self) -> u64 { + self.num_read_ops + } + + /// Get the total bytes written in bytes. + pub(crate) fn bytes_written(&self) -> u64 { + self.bytes_written + } + + /// Get total number of write ops of the pool. + pub(crate) fn num_write_ops(&self) -> u64 { + self.num_write_ops + } + + /// Get total read latency in usec of the pool. + pub(crate) fn read_latency(&self) -> u64 { + self.read_latency_us + } + + /// Get total write latency in usec of the pool. + pub(crate) fn write_latency(&self) -> u64 { + self.write_latency_us + } +} + +/// Array of PoolIoStat objects. 
+#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct PoolIoStats { + pub(crate) pool_stats: Vec, +} + +impl From for PoolIoStat { + fn from(value: rpc::v1::stats::IoStats) -> Self { + Self { + name: value.name, + bytes_read: value.bytes_read, + num_read_ops: value.num_read_ops, + bytes_written: value.bytes_written, + num_write_ops: value.num_write_ops, + read_latency_us: ticks_to_time(value.read_latency_ticks, value.tick_rate), + write_latency_us: ticks_to_time(value.write_latency_ticks, value.tick_rate), + } + } +} + +fn ticks_to_time(tick: u64, tick_rate: u64) -> u64 { + ((tick as u128 * 1000000) / tick_rate as u128) as u64 +} diff --git a/metrics-exporter/src/bin/io_engine/collector/mod.rs b/metrics-exporter/src/bin/io_engine/collector/mod.rs index c686087c5..5a3aac8d5 100644 --- a/metrics-exporter/src/bin/io_engine/collector/mod.rs +++ b/metrics-exporter/src/bin/io_engine/collector/mod.rs @@ -1,2 +1,19 @@ +use prometheus::{ + core::{Collector, Desc}, + GaugeVec, Opts, +}; + /// Module for pools collector. pub mod pool; +pub mod pool_stat; + +/// Initializes a GaugeVec metric with the provided metric name, description and descriptors. 
+fn init_gauge_vec(metric_name: &str, metric_desc: &str, descs: &mut Vec) -> GaugeVec { + let opts = Opts::new(metric_name, metric_desc) + .subsystem("diskpool") + .variable_labels(vec!["node".to_string(), "name".to_string()]); + let gauge_vec = GaugeVec::new(opts, &["node", "name"]) + .unwrap_or_else(|_| panic!("Unable to create gauge metric type for {}", metric_name)); + descs.extend(gauge_vec.desc().into_iter().cloned()); + gauge_vec +} diff --git a/metrics-exporter/src/bin/io_engine/collector/pool.rs b/metrics-exporter/src/bin/io_engine/collector/pool.rs index a24816fbf..8b279700f 100644 --- a/metrics-exporter/src/bin/io_engine/collector/pool.rs +++ b/metrics-exporter/src/bin/io_engine/collector/pool.rs @@ -1,9 +1,12 @@ -use crate::{cache::Cache, client::pool::PoolInfo, get_node_name}; +use crate::{cache::Cache, client::pool::PoolInfo, collector::init_gauge_vec, get_node_name}; use prometheus::{ core::{Collector, Desc}, - GaugeVec, Opts, + GaugeVec, +}; +use std::{ + fmt::Debug, + ops::{Deref, DerefMut}, }; -use std::{fmt::Debug, ops::DerefMut}; use tracing::error; /// Collects Pool capacity metrics from cache. @@ -24,30 +27,22 @@ impl Default for PoolCapacityCollector { impl PoolCapacityCollector { /// Initialize all the metrics to be defined for pools capacity collector. 
pub fn new() -> Self { - let pool_total_size_opts = Opts::new("total_size_bytes", "Total size of the pool in bytes") - .subsystem("disk_pool") - .variable_labels(vec!["node".to_string(), "name".to_string()]); - let pool_used_size_opts = Opts::new("used_size_bytes", "Used size of the pool in bytes") - .subsystem("disk_pool") - .variable_labels(vec!["node".to_string(), "name".to_string()]); - let pool_committed_size_opts = Opts::new( + let mut descs = Vec::new(); + let pool_total_size = init_gauge_vec( + "total_size_bytes", + "Total size of the pool in bytes", + &mut descs, + ); + let pool_used_size = init_gauge_vec( + "total_used_bytes", + "Used size of the pool in bytes", + &mut descs, + ); + let pool_committed_size = init_gauge_vec( "committed_size_bytes", "Committed size of the pool in bytes", - ) - .subsystem("disk_pool") - .variable_labels(vec!["node".to_string(), "name".to_string()]); - let mut descs = Vec::new(); - - let pool_total_size = GaugeVec::new(pool_total_size_opts, &["node", "name"]) - .expect("Unable to create gauge metric type for pool_total_size"); - let pool_used_size = GaugeVec::new(pool_used_size_opts, &["node", "name"]) - .expect("Unable to create gauge metric type for pool_used_size"); - let pool_committed_size = GaugeVec::new(pool_committed_size_opts, &["node", "name"]) - .expect("Unable to create gauge metric type for pool_committed_size"); - // Descriptors for the custom metrics - descs.extend(pool_total_size.desc().into_iter().cloned()); - descs.extend(pool_used_size.desc().into_iter().cloned()); - descs.extend(pool_committed_size.desc().into_iter().cloned()); + &mut descs, + ); Self { pool_total_size, @@ -64,15 +59,15 @@ impl Collector for PoolCapacityCollector { } fn collect(&self) -> Vec { - let mut c = match Cache::get_cache().lock() { + let c = match Cache::get_cache().lock() { Ok(c) => c, Err(error) => { error!(%error,"Error while getting cache resource"); return Vec::new(); } }; - let cp = c.deref_mut(); - let mut metric_family = 
Vec::with_capacity(3 * cp.pool_mut().pools.capacity()); + let cp = c.deref(); + let mut metric_family = Vec::with_capacity(3 * cp.pool().pools.capacity()); let node_name = match get_node_name() { Ok(name) => name, Err(error) => { @@ -81,7 +76,7 @@ impl Collector for PoolCapacityCollector { } }; - for i in &cp.pool_mut().pools { + for i in &cp.pool().pools { let p: &PoolInfo = i; let pool_total_size = match self @@ -146,13 +141,8 @@ impl Default for PoolStatusCollector { impl PoolStatusCollector { /// Initialize all the metrics to be defined for pools status collector. pub fn new() -> Self { - let pool_status_opts = Opts::new("status", "Status of the pool") - .subsystem("disk_pool") - .variable_labels(vec!["node".to_string(), "name".to_string()]); let mut descs = Vec::new(); - let pool_status = GaugeVec::new(pool_status_opts, &["node", "name"]) - .expect("Unable to create gauge metric type for pool_status"); - descs.extend(pool_status.desc().into_iter().cloned()); + let pool_status = init_gauge_vec("status", "Status of the pool", &mut descs); Self { pool_status, descs } } } diff --git a/metrics-exporter/src/bin/io_engine/collector/pool_stat.rs b/metrics-exporter/src/bin/io_engine/collector/pool_stat.rs new file mode 100644 index 000000000..7b4a0b173 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/collector/pool_stat.rs @@ -0,0 +1,185 @@ +use super::init_gauge_vec; +use crate::{cache::Cache, client::pool_stat::PoolIoStat, get_node_name}; +use prometheus::{ + core::{Collector, Desc}, + GaugeVec, +}; +use std::{fmt::Debug, ops::Deref}; +use tracing::error; + +/// Collects Pool IoStat metrics from cache. 
+#[derive(Clone, Debug)] +pub(crate) struct PoolIoStatsCollector { + pool_bytes_read: GaugeVec, + pool_num_read_ops: GaugeVec, + pool_bytes_written: GaugeVec, + pool_num_write_ops: GaugeVec, + pool_read_latency_us: GaugeVec, + pool_write_latency_us: GaugeVec, + descs: Vec, +} + +impl Default for PoolIoStatsCollector { + fn default() -> Self { + Self::new() + } +} +/// Initialize all the metrics to be defined for pools iostat collector. +impl PoolIoStatsCollector { + /// Initialize all the metrics to be defined for pools iostat collector. + pub fn new() -> Self { + let mut descs = Vec::new(); + + let pool_bytes_read = + init_gauge_vec("bytes_read", "Total bytes read on the pool", &mut descs); + let pool_num_read_ops = init_gauge_vec( + "num_read_ops", + "Number of read operations on the pool", + &mut descs, + ); + let pool_bytes_written = init_gauge_vec( + "bytes_written", + "Total bytes written on the pool", + &mut descs, + ); + let pool_num_write_ops = init_gauge_vec( + "num_write_ops", + "Number of write operations on the pool", + &mut descs, + ); + let pool_read_latency_us = init_gauge_vec( + "read_latency_us", + "Total read latency on the pool in usec", + &mut descs, + ); + let pool_write_latency_us = init_gauge_vec( + "write_latency_us", + "Total write latency on the pool in usec", + &mut descs, + ); + + Self { + pool_bytes_read, + pool_num_read_ops, + pool_bytes_written, + pool_num_write_ops, + pool_read_latency_us, + pool_write_latency_us, + descs, + } + } +} + +impl Collector for PoolIoStatsCollector { + fn desc(&self) -> Vec<&Desc> { + self.descs.iter().collect() + } + + fn collect(&self) -> Vec { + let c = match Cache::get_cache().lock() { + Ok(c) => c, + Err(error) => { + error!(%error,"Error while getting cache resource"); + return Vec::new(); + } + }; + let cp = c.deref(); + let mut metric_family = Vec::with_capacity(6 * cp.pool_iostat().pool_stats.capacity()); + let node_name = match get_node_name() { + Ok(name) => name, + Err(error) => { + 
error!(?error, "Unable to get node name"); + return metric_family; + } + }; + + for i in &cp.pool_iostat().pool_stats { + let p: &PoolIoStat = i; + + let pool_bytes_read = match self + .pool_bytes_read + .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) + { + Ok(pool_bytes_read) => pool_bytes_read, + Err(error) => { + error!(%error, "Error while creating metrics(pool_bytes_read) with label values"); + return metric_family; + } + }; + pool_bytes_read.set(p.bytes_read() as f64); + let mut x = pool_bytes_read.collect(); + metric_family.extend(x.pop()); + + let pool_num_read_ops = match self + .pool_num_read_ops + .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) + { + Ok(pool_num_read_ops) => pool_num_read_ops, + Err(error) => { + error!(%error, "Error while creating metrics(pool_num_read_ops) with label values"); + return metric_family; + } + }; + pool_num_read_ops.set(p.num_read_ops() as f64); + let mut x = pool_num_read_ops.collect(); + metric_family.extend(x.pop()); + + let pool_bytes_written = match self + .pool_bytes_written + .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) + { + Ok(pool_bytes_written) => pool_bytes_written, + Err(error) => { + error!(%error, "Error while creating metrics(pool_bytes_written) with label values"); + return metric_family; + } + }; + pool_bytes_written.set(p.bytes_written() as f64); + let mut x = pool_bytes_written.collect(); + metric_family.extend(x.pop()); + + let pool_num_write_ops = match self + .pool_num_write_ops + .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) + { + Ok(pool_num_write_ops) => pool_num_write_ops, + Err(error) => { + error!(%error, "Error while creating metrics(pool_num_write_ops) with label values"); + return metric_family; + } + }; + pool_num_write_ops.set(p.num_write_ops() as f64); + let mut x = pool_num_write_ops.collect(); + metric_family.extend(x.pop()); + + let pool_read_latency_us 
= match self + .pool_read_latency_us + .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) + { + Ok(pool_read_latency_us) => pool_read_latency_us, + Err(error) => { + error!(%error, "Error while creating metrics(pool_read_latency) with label values"); + return metric_family; + } + }; + pool_read_latency_us.set(p.read_latency() as f64); + let mut x = pool_read_latency_us.collect(); + metric_family.extend(x.pop()); + + let pool_write_latency_us = match self + .pool_write_latency_us + .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) + { + Ok(pool_write_latency_us) => pool_write_latency_us, + Err(error) => { + error!(%error, "Error while creating metrics(pool_write_latency) with label values"); + return metric_family; + } + }; + pool_write_latency_us.set(p.write_latency() as f64); + let mut x = pool_write_latency_us.collect(); + metric_family.extend(x.pop()); + } + metric_family + } +} diff --git a/metrics-exporter/src/bin/io_engine/config.rs b/metrics-exporter/src/bin/io_engine/config.rs deleted file mode 100644 index 0673bc09b..000000000 --- a/metrics-exporter/src/bin/io_engine/config.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::{net::SocketAddr, time::Duration}; - -use once_cell::sync::OnceCell; - -static CONFIG: OnceCell = OnceCell::new(); - -/// Exporter config that can be passed through arguments. -pub struct ExporterConfig { - /// Network address where the prometheus metrics endpoint will listen (example: 9502). - metrics_endpoint: SocketAddr, - - /// polling time to do grpc calls to get data from the server.(Default: 30s). - polling_time: Duration, -} - -impl ExporterConfig { - /// Initialize metrics-exporter configs. - pub fn initialize(addr: SocketAddr, polling_time: Duration) { - CONFIG.get_or_init(|| Self { - metrics_endpoint: addr, - polling_time, - }); - } - - /// Get metrics-exporter config. 
- pub fn get_config() -> &'static ExporterConfig { - CONFIG.get().expect("Exporter config is not initialized") - } - - /// Get metrics endpoint. - pub fn metrics_endpoint(&self) -> &SocketAddr { - &self.metrics_endpoint - } - - /// Get polling time. - pub fn polling_time(&self) -> Duration { - self.polling_time - } -} diff --git a/metrics-exporter/src/bin/io_engine/main.rs b/metrics-exporter/src/bin/io_engine/main.rs index 765ad74f2..e14e9c46f 100644 --- a/metrics-exporter/src/bin/io_engine/main.rs +++ b/metrics-exporter/src/bin/io_engine/main.rs @@ -1,12 +1,11 @@ use crate::{ - cache::store_data, - client::{grpc_client::init_client, ApiVersion}, - config::ExporterConfig, + client::grpc_client::{init_client, GrpcClient}, error::ExporterError, serve::metric_route, }; use actix_web::{middleware, HttpServer}; use clap::Parser; +use once_cell::sync::OnceCell; use std::{env, net::SocketAddr}; /// Cache module for exporter. @@ -15,18 +14,11 @@ pub(crate) mod cache; pub(crate) mod client; /// Collector module. pub(crate) mod collector; -/// Config module for metrics-exporter. -pub(crate) mod config; /// Error module. pub(crate) mod error; /// Prometheus metrics handler module. pub(crate) mod serve; -/// Initialize metrics-exporter config that are passed through arguments. -fn initialize_exporter(args: &Cli) { - ExporterConfig::initialize(args.metrics_endpoint, args.polling_time.into()); -} - /// Initialize cache. 
async fn initialize_cache() { cache::Cache::initialize(cache::Data::default()); @@ -45,18 +37,10 @@ fn get_node_name() -> Result { #[derive(Parser, Debug)] #[clap(name = utils::package_description!(), version = utils::version_info_str!())] -struct Cli { +pub(crate) struct Cli { /// TCP address where prometheus endpoint will listen to #[clap(long, short, default_value = "0.0.0.0:9502")] metrics_endpoint: SocketAddr, - - /// Polling time in seconds to get pools data through gRPC calls - #[clap(short, long, default_value = "300s")] - polling_time: humantime::Duration, - - /// Io engine api versions - #[clap(short, long, value_delimiter = ',', required = true)] - api_versions: Vec, } impl Cli { @@ -65,32 +49,35 @@ impl Cli { } } +static GRPC_CLIENT: OnceCell = OnceCell::new(); + +/// Get IO engine gRPC Client. +pub(crate) fn grpc_client<'a>() -> &'a GrpcClient { + GRPC_CLIENT + .get() + .expect("gRPC Client should have been initialised") +} + #[tokio::main] async fn main() -> Result<(), ExporterError> { let args = Cli::args(); - utils::print_package_info!(); utils::tracing_telemetry::init_tracing("metrics-exporter-io_engine", vec![], None); - initialize_exporter(&args); - initialize_cache().await; - - // sort to get the latest api version - let mut api_versions = args.api_versions; - api_versions.sort_by(|a, b| b.cmp(a)); - - let client = init_client(api_versions.get(0).unwrap_or(&ApiVersion::V0).clone()).await?; - - store_data(client).await; + let client = init_client().await?; + // Initialize io engine gRPC client. + GRPC_CLIENT + .set(client) + .expect("Expect to be initialised only once"); let app = move || { actix_web::App::new() .wrap(middleware::Logger::default()) .configure(metric_route) }; HttpServer::new(app) - .bind(ExporterConfig::get_config().metrics_endpoint()) + .bind(args.metrics_endpoint) .map_err(|_| { ExporterError::HttpBindError("Failed to bind endpoint to http server".to_string()) })? 
diff --git a/metrics-exporter/src/bin/io_engine/serve/handler.rs b/metrics-exporter/src/bin/io_engine/serve/handler.rs index 2f2976beb..77ba36cdc 100644 --- a/metrics-exporter/src/bin/io_engine/serve/handler.rs +++ b/metrics-exporter/src/bin/io_engine/serve/handler.rs @@ -1,12 +1,21 @@ -use crate::collector::pool::{PoolCapacityCollector, PoolStatusCollector}; +use crate::{ + cache::store_resource_data, + collector::{ + pool::{PoolCapacityCollector, PoolStatusCollector}, + pool_stat::PoolIoStatsCollector, + }, + grpc_client, +}; use actix_web::{http::header, HttpResponse, Responder}; use prometheus::{Encoder, Registry}; use tracing::{error, warn}; /// Handler for metrics. Initializes all collector and serves data over Http. pub(crate) async fn metrics_handler() -> impl Responder { + store_resource_data(grpc_client()).await; let pools_collector = PoolCapacityCollector::default(); let pool_status_collector = PoolStatusCollector::default(); + let pool_iostat_collector = PoolIoStatsCollector::default(); // Create a new registry for prometheus let registry = Registry::default(); // Register pools collector in the registry @@ -16,6 +25,9 @@ pub(crate) async fn metrics_handler() -> impl Responder { if let Err(error) = Registry::register(&registry, Box::new(pool_status_collector)) { warn!(%error, "Pools status collector already registered"); } + if let Err(error) = Registry::register(&registry, Box::new(pool_iostat_collector)) { + warn!(%error, "Pools status collector already registered"); + } let mut buffer = Vec::new();