Skip to content

Commit

Permalink
feat(metrics_exporter): expose pool iostats
Browse files Browse the repository at this point in the history
Signed-off-by: Abhilash Shetty <[email protected]>
  • Loading branch information
abhilashshetty04 committed Jan 22, 2024
1 parent ea97012 commit da89e0c
Show file tree
Hide file tree
Showing 14 changed files with 462 additions and 210 deletions.
2 changes: 0 additions & 2 deletions chart/templates/mayastor/io/io-engine-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
args:
- "--api-versions={{ .Values.io_engine.api }}"
ports:
- containerPort: 9502
protocol: TCP
Expand Down
1 change: 0 additions & 1 deletion metrics-exporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ name = "metrics-exporter"
description = "Metrics Exporters"
version = "0.1.0"
edition = "2021"
authors = ["Sahil Raja <[email protected]>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
Expand Down
38 changes: 29 additions & 9 deletions metrics-exporter/src/bin/io_engine/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod pool;
mod pool_stat;

use crate::client::{grpc_client::GrpcClient, pool::Pools};

use crate::client::{grpc_client::GrpcClient, pool::Pools, pool_stat::PoolIoStats};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;

static CACHE: OnceCell<Mutex<Cache>> = OnceCell::new();

/// Trait to be implemented by all Resource structs stored in Cache.
Expand All @@ -19,6 +20,15 @@ pub(crate) struct Cache {
data: Data,
}

/// Wrapper over all the data that has to be stored in cache.
///
/// Holds one field per resource kind; `Cache` hands out shared and mutable
/// references to each field through its accessor methods.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct Data {
/// Pool capacity and state data.
pools: Pools,
/// Pool IO statistics data.
pool_stats: PoolIoStats,
}

impl Cache {
/// Initialize the cache with default value.
pub fn initialize(data: Data) {
Expand All @@ -34,13 +44,21 @@ impl Cache {
pub fn pool_mut(&mut self) -> &mut Pools {
&mut self.data.pools
}
}

/// Wrapper over all the data that has to be stored in cache.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct Data {
/// Contains Pool Capacity and state data.
pools: Pools,
/// Get mutable reference to PoolIOStats.
pub fn pool_iostat_mut(&mut self) -> &mut PoolIoStats {
&mut self.data.pool_stats
}

/// Get a reference to Pool.
pub fn pool(&self) -> &Pools {
&self.data.pools
}

/// Get a reference to PoolIoStats.
pub fn pool_iostat(&self) -> &PoolIoStats {
&self.data.pool_stats
}
}

impl Default for Data {
Expand All @@ -54,11 +72,13 @@ impl Data {
/// Builds a `Data` whose pool and pool IO stat collections are both empty.
fn new() -> Self {
    let pools = Pools { pools: Vec::new() };
    let pool_stats = PoolIoStats {
        pool_stats: Vec::new(),
    };
    Self { pools, pool_stats }
}
}

/// Populates Resource cache struct.
pub(crate) async fn store_resource_data(client: &GrpcClient) {
let _ = pool::store_pool_info_data(client.clone()).await;
let _ = pool::store_pool_info_data(client).await;
let _ = pool_stat::store_pool_stats_data(client).await;
}
4 changes: 2 additions & 2 deletions metrics-exporter/src/bin/io_engine/cache/pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{Cache, ResourceOps};
use crate::client::{
grpc_client::GrpcClient,
pool::{PoolInfo, PoolOperations, Pools},
pool::{PoolInfo, Pools},
};
use std::ops::DerefMut;
use tracing::error;
Expand All @@ -19,7 +19,7 @@ impl ResourceOps for Pools {
}

/// To store pools state and capacity data in cache.
pub(crate) async fn store_pool_info_data(client: GrpcClient) -> Result<(), ()> {
pub(crate) async fn store_pool_info_data(client: &GrpcClient) -> Result<(), ()> {
let pools = client.list_pools().await;
let mut cache = match Cache::get_cache().lock() {
Ok(cache) => cache,
Expand Down
45 changes: 45 additions & 0 deletions metrics-exporter/src/bin/io_engine/cache/pool_stat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use super::{Cache, ResourceOps};
use crate::client::{
grpc_client::GrpcClient,
pool_stat::{PoolIoStat, PoolIoStats},
};
use std::ops::DerefMut;
use tracing::error;

/// Cache operations for the pool IO statistics collection.
impl ResourceOps for PoolIoStats {
    type ResourceVec = Vec<PoolIoStat>;

    /// Replaces the stored pool IO stats with `val`.
    fn set(&mut self, val: Self::ResourceVec) {
        self.pool_stats = val;
    }

    /// Discards every stored entry, leaving an empty collection.
    fn invalidate(&mut self) {
        self.pool_stats = Vec::new();
    }
}

/// To store pool iostat data in cache.
pub(crate) async fn store_pool_stats_data(client: &GrpcClient) -> Result<(), ()> {
let pool_stats = client.get_pool_iostat().await;
let mut cache = match Cache::get_cache().lock() {
Ok(cache) => cache,
Err(error) => {
error!(%error, "Error while getting cache resource");
return Err(());
}
};
let pools_cache = cache.deref_mut();
match pool_stats {
// set pools in the cache
Ok(pools) => {
pools_cache.pool_iostat_mut().set(pools.pool_stats);
}
// invalidate cache in case of error
Err(error) => {
error!(?error, "Error getting pools data, invalidating pools cache");
pools_cache.pool_iostat_mut().invalidate();
return Err(());
}
};
Ok(())
}
157 changes: 73 additions & 84 deletions metrics-exporter/src/bin/io_engine/client/grpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::{error::ExporterError, get_node_name, get_pod_ip, ApiVersion};
use rpc::io_engine::IoEngineClientV0;
use crate::{error::ExporterError, get_node_name, get_pod_ip};

use crate::client::{
pool::{PoolInfo, Pools},
pool_stat::{PoolIoStat, PoolIoStats},
};
use actix_web::http::Uri;
use std::time::Duration;
use tokio::time::sleep;
use tonic::transport::Channel;
use tracing::{error, info};
use tracing::error;

/// Timeout for gRPC.
/// Timeout for gRPC connection.
#[derive(Debug, Clone)]
pub struct Timeouts {
connect: Duration,
Expand All @@ -33,113 +36,62 @@ impl Timeouts {
#[derive(Debug, Clone)]
pub(crate) struct GrpcContext {
endpoint: tonic::transport::Endpoint,
timeouts: Timeouts,
api_version: ApiVersion,
}

impl GrpcContext {
/// initialize context
pub fn new(endpoint: Uri, timeouts: Timeouts, api_version: ApiVersion) -> Self {
pub fn new(endpoint: Uri, timeouts: Timeouts) -> Self {
let endpoint = tonic::transport::Endpoint::from(endpoint)
.connect_timeout(timeouts.connect())
.timeout(timeouts.request());
Self {
endpoint,
timeouts,
api_version,
}
Self { endpoint }
}
}
/// The V0 Mayastor client.
type MayaClientV0 = IoEngineClientV0<Channel>;

/// The V1 PoolClient.
type PoolClient = rpc::v1::pool::pool_rpc_client::PoolRpcClient<Channel>;
type StatsClient = rpc::v1::stats::StatsRpcClient<Channel>;

/// A wrapper for client for the V1 dataplane interface.
#[derive(Clone, Debug)]
pub(crate) struct MayaClientV1 {
pub(crate) pool: PoolClient,
pub(crate) stats: StatsClient,
}

/// Dataplane grpc client.
#[derive(Debug, Clone)]
pub(crate) struct GrpcClient {
ctx: GrpcContext,
v0_client: Option<MayaClientV0>,
v1_client: Option<MayaClientV1>,
}

/// Number of grpc connect retries without error logging.
const SILENT_RETRIES: i32 = 3;

impl GrpcClient {
/// Initialize gRPC client.
/// Initialize v1 io engine gRPC client.
pub(crate) async fn new(context: GrpcContext) -> Result<Self, ExporterError> {
let sleep_duration_sec = 10;
let mut num_retires = 0;
loop {
match context.api_version {
ApiVersion::V0 => {
match tokio::time::timeout(
context.timeouts.connect(),
MayaClientV0::connect(context.endpoint.clone()),
)
.await
{
Err(error) => {
error!(error=%error, "Grpc connection timeout, retrying after {}s",sleep_duration_sec);
}
Ok(result) => match result {
Ok(v0_client) => {
return Ok(Self {
ctx: context.clone(),
v0_client: Some(v0_client),
v1_client: None,
})
}
Err(error) => {
error!(error=%error, "Grpc client connection error, retrying after {}s",sleep_duration_sec);
}
},
}
}
ApiVersion::V1 => {
match tokio::time::timeout(
context.timeouts.connect(),
PoolClient::connect(context.endpoint.clone()),
)
.await
{
Err(error) => {
error!(error=%error, "Grpc connection timeout, retrying after {}s",sleep_duration_sec);
}
Ok(result) => match result {
Ok(pool) => {
info!("grpc connected successfully");
return Ok(Self {
ctx: context.clone(),
v0_client: None,
v1_client: Some(MayaClientV1 { pool }),
});
}
Err(error) => {
error!(error=%error, "Grpc client connection error, retrying after {}s",sleep_duration_sec);
}
},
}
if let Ok(channel) = context.endpoint.connect().await {
let pool = PoolClient::new(channel.clone());
let stats = StatsClient::new(channel.clone());
return Ok(Self {
v1_client: Some(MayaClientV1 { pool, stats }),
});
} else {
if num_retires > SILENT_RETRIES {
error!(
"Grpc connection timeout, retrying after {}s",
sleep_duration_sec
);
}
num_retires += 1;
}
sleep(Duration::from_secs(sleep_duration_sec)).await;
}
}

/// Get the v0 api client.
pub(crate) fn client_v0(&self) -> Result<MayaClientV0, ExporterError> {
match self.v0_client.clone() {
Some(client) => Ok(client),
None => Err(ExporterError::GrpcClientError(
"Could not get v0 client".to_string(),
)),
}
}

/// Get the v1 api client.
pub(crate) fn client_v1(&self) -> Result<MayaClientV1, ExporterError> {
match self.v1_client.clone() {
Expand All @@ -149,15 +101,10 @@ impl GrpcClient {
)),
}
}

/// Get the api version.
pub(crate) fn api_version(&self) -> ApiVersion {
self.ctx.api_version.clone()
}
}

/// Initialize mayastor grpc client.
pub(crate) async fn init_client(api_version: ApiVersion) -> Result<GrpcClient, ExporterError> {
pub(crate) async fn init_client() -> Result<GrpcClient, ExporterError> {
let timeout = Timeouts::new(Duration::from_secs(1), Duration::from_secs(5));
let pod_ip = get_pod_ip()?;
let _ = get_node_name()?;
Expand All @@ -167,7 +114,49 @@ pub(crate) async fn init_client(api_version: ApiVersion) -> Result<GrpcClient, E
.path_and_query("")
.build()
.map_err(|error| ExporterError::InvalidURI(error.to_string()))?;
let ctx = GrpcContext::new(endpoint, timeout, api_version);
let ctx = GrpcContext::new(endpoint, timeout);
let client = GrpcClient::new(ctx).await?;
Ok(client)
}

impl GrpcClient {
/// Gets Capacity statistics of all pool on the io engine.
/// Maps the response to PoolInfo struct.
pub(crate) async fn list_pools(&self) -> Result<Pools, ExporterError> {
let pools = match self
.client_v1()?
.pool
.list_pools(rpc::v1::pool::ListPoolOptions::default())
.await
{
Ok(response) => response
.into_inner()
.pools
.into_iter()
.map(PoolInfo::from)
.collect::<Vec<_>>(),
Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())),
};

Ok(Pools { pools })
}

/// Gets Io Statistics of all pool on the io engine. Maps the response to PoolIoStat struct.
pub(crate) async fn get_pool_iostat(&self) -> Result<PoolIoStats, ExporterError> {
let pool_stats = match self
.client_v1()?
.stats
.get_pool_io_stats(rpc::v1::stats::ListStatsOption { name: None })
.await
{
Ok(response) => Ok(response
.into_inner()
.stats
.into_iter()
.map(PoolIoStat::from)
.collect::<Vec<_>>()),
Err(error) => Err(ExporterError::GrpcResponseError(error.to_string())),
}?;
Ok(PoolIoStats { pool_stats })
}
}
11 changes: 2 additions & 9 deletions metrics-exporter/src/bin/io_engine/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,5 @@
pub mod grpc_client;
/// PoolInfo module.
pub mod pool;

#[derive(
Debug, strum_macros::EnumString, strum_macros::AsRefStr, Clone, Ord, PartialOrd, Eq, PartialEq,
)]
#[strum(serialize_all = "lowercase")]
pub enum ApiVersion {
V0,
V1,
}
/// PoolIoStats module
pub mod pool_stat;
Loading

0 comments on commit da89e0c

Please sign in to comment.