From edc6d0ed0d762311c1dafc3cb8980d6a551a2aac Mon Sep 17 00:00:00 2001 From: Abhilash Shetty Date: Mon, 9 Oct 2023 18:46:29 +0000 Subject: [PATCH] chore(metrics-exporter): refactoring current implementation Signed-off-by: Abhilash Shetty --- Cargo.lock | 44 +++--- Cargo.toml | 2 +- .../mayastor/io/io-engine-daemonset.yaml | 6 +- ...> metrics-exporter-io-engine-service.yaml} | 4 +- exporter/src/lib.rs | 2 - exporter/src/pool/bin/main.rs | 147 ------------------ exporter/src/pool/cache.rs | 111 ------------- exporter/src/pool/mod.rs | 10 -- {exporter => metrics-exporter}/Cargo.toml | 7 +- {exporter => metrics-exporter}/README.md | 0 .../src/bin/io_engine/cache/mod.rs | 78 ++++++++++ .../src/bin/io_engine/cache/pool.rs | 46 ++++++ .../src/bin/io_engine}/client/grpc_client.rs | 51 ++++-- .../src/bin/io_engine}/client/mod.rs | 2 +- .../src/bin/io_engine}/client/pool.rs | 47 +++--- .../src/bin/io_engine}/collector/mod.rs | 2 +- .../src/bin/io_engine/collector/pool.rs | 93 +++++++---- .../src/bin/io_engine}/config.rs | 4 +- .../src/bin/io_engine}/error.rs | 3 +- metrics-exporter/src/bin/io_engine/main.rs | 102 ++++++++++++ .../src/bin/io_engine/serve/handler.rs | 38 +++++ .../src/bin/io_engine/serve/mod.rs | 7 + nix/pkgs/extensions/cargo-project.nix | 4 +- nix/pkgs/extensions/default.nix | 14 +- nix/pkgs/images/default.nix | 8 +- scripts/release.sh | 2 +- 26 files changed, 441 insertions(+), 393 deletions(-) rename chart/templates/mayastor/metrics/{metrics-exporter-pool-service.yaml => metrics-exporter-io-engine-service.yaml} (82%) delete mode 100644 exporter/src/lib.rs delete mode 100644 exporter/src/pool/bin/main.rs delete mode 100644 exporter/src/pool/cache.rs delete mode 100644 exporter/src/pool/mod.rs rename {exporter => metrics-exporter}/Cargo.toml (88%) rename {exporter => metrics-exporter}/README.md (100%) create mode 100644 metrics-exporter/src/bin/io_engine/cache/mod.rs create mode 100644 metrics-exporter/src/bin/io_engine/cache/pool.rs rename {exporter/src/pool 
=> metrics-exporter/src/bin/io_engine}/client/grpc_client.rs (76%) rename {exporter/src/pool => metrics-exporter/src/bin/io_engine}/client/mod.rs (92%) rename {exporter/src/pool => metrics-exporter/src/bin/io_engine}/client/pool.rs (73%) rename {exporter/src/pool => metrics-exporter/src/bin/io_engine}/collector/mod.rs (56%) rename exporter/src/pool/collector/pools_collector.rs => metrics-exporter/src/bin/io_engine/collector/pool.rs (74%) rename {exporter/src/pool => metrics-exporter/src/bin/io_engine}/config.rs (92%) rename {exporter/src/pool => metrics-exporter/src/bin/io_engine}/error.rs (78%) create mode 100644 metrics-exporter/src/bin/io_engine/main.rs create mode 100644 metrics-exporter/src/bin/io_engine/serve/handler.rs create mode 100644 metrics-exporter/src/bin/io_engine/serve/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 4a0fafcc4..cf831e35a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1108,28 +1108,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "exporter" -version = "0.1.0" -dependencies = [ - "actix-service", - "actix-web", - "clap", - "humantime", - "mime", - "once_cell", - "prometheus", - "rpc", - "serde", - "serde_json", - "strum", - "strum_macros", - "tokio", - "tonic 0.9.2", - "tracing", - "utils", -] - [[package]] name = "fastrand" version = "2.0.0" @@ -1972,6 +1950,28 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "metrics-exporter" +version = "0.1.0" +dependencies = [ + "actix-service", + "actix-web", + "clap", + "humantime", + "mime", + "once_cell", + "prometheus", + "rpc", + "serde", + "serde_json", + "strum", + "strum_macros", + "tokio", + "tonic 0.9.2", + "tracing", + "utils", +] + [[package]] name = "mime" version = "0.3.17" diff --git a/Cargo.toml b/Cargo.toml index ed555db7f..e7351fc32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = [ "call-home", "console-logger", - 
"exporter", + "metrics-exporter", "k8s/plugin", "k8s/proxy", "k8s/supportability", diff --git a/chart/templates/mayastor/io/io-engine-daemonset.yaml b/chart/templates/mayastor/io/io-engine-daemonset.yaml index 589eb83fb..940067574 100644 --- a/chart/templates/mayastor/io/io-engine-daemonset.yaml +++ b/chart/templates/mayastor/io/io-engine-daemonset.yaml @@ -38,8 +38,8 @@ spec: {{- include "base_init_containers" . }} containers: {{- if .Values.base.metrics.enabled }} - - name: metrics-exporter-pool - image: "{{ .Values.image.registry }}/{{ .Values.image.repo }}/{{ .Chart.Name }}-metrics-exporter-pool:{{ default .Values.image.tag .Values.image.repoTags.extensions }}" + - name: metrics-exporter-io-engine + image: "{{ .Values.image.registry }}/{{ .Values.image.repo }}/{{ .Chart.Name }}-metrics-exporter-io-engine:{{ default .Values.image.tag .Values.image.repoTags.extensions }}" imagePullPolicy: {{ .Values.image.pullPolicy }} env: - name: MY_NODE_NAME @@ -53,8 +53,6 @@ spec: args: - "-p{{ .Values.base.metrics.pollingInterval }}" - "--api-versions={{ .Values.io_engine.api }}" - command: - - metrics-exporter-pool ports: - containerPort: 9502 protocol: TCP diff --git a/chart/templates/mayastor/metrics/metrics-exporter-pool-service.yaml b/chart/templates/mayastor/metrics/metrics-exporter-io-engine-service.yaml similarity index 82% rename from chart/templates/mayastor/metrics/metrics-exporter-pool-service.yaml rename to chart/templates/mayastor/metrics/metrics-exporter-io-engine-service.yaml index cc0024830..286a7089c 100644 --- a/chart/templates/mayastor/metrics/metrics-exporter-pool-service.yaml +++ b/chart/templates/mayastor/metrics/metrics-exporter-io-engine-service.yaml @@ -2,9 +2,9 @@ apiVersion: v1 kind: Service metadata: - name: {{ .Release.Name }}-metrics-exporter-pool + name: {{ .Release.Name }}-metrics-exporter-io-engine labels: - app: metrics-exporter-pool + app: metrics-exporter-io-engine {{ include "label_prefix" . 
}}/release: {{ .Release.Name }} {{ include "label_prefix" . }}/version: {{ .Chart.Version }} spec: diff --git a/exporter/src/lib.rs b/exporter/src/lib.rs deleted file mode 100644 index 2263f18a5..000000000 --- a/exporter/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -/// Pool Metrics Exporter. -pub mod pool; diff --git a/exporter/src/pool/bin/main.rs b/exporter/src/pool/bin/main.rs deleted file mode 100644 index 1bb358491..000000000 --- a/exporter/src/pool/bin/main.rs +++ /dev/null @@ -1,147 +0,0 @@ -use actix_web::{http::header, middleware, web, HttpResponse, HttpServer, Responder}; - -use actix_web::http::Uri; -use clap::Parser; -use exporter::pool::{ - cache, - cache::{Cache, Data}, - client::{ - grpc_client::{GrpcClient, GrpcContext, Timeouts}, - ApiVersion, - }, - collector::pools_collector::{get_node_name, PoolsCollector}, - config::ExporterConfig, - error::ExporterError, -}; -use prometheus::{Encoder, Registry}; -use std::{env, net::SocketAddr, time::Duration}; -use tracing::{error, warn}; - -/// Initialize exporter config that are passed through arguments -fn initialize_exporter(args: &Cli) { - ExporterConfig::initialize(args.metrics_endpoint, args.polling_time.into()); -} - -/// Initialize mayastor grpc client -async fn initialize_client(api_version: ApiVersion) -> Result { - let timeout = Timeouts::new(Duration::from_secs(1), Duration::from_secs(5)); - let pod_ip = get_pod_ip()?; - let _ = get_node_name()?; - let endpoint = Uri::builder() - .scheme("https") - .authority(format!("{pod_ip}:10124")) - .path_and_query("") - .build() - .map_err(|error| ExporterError::InvalidURI(error.to_string()))?; - let ctx = GrpcContext::new(endpoint, timeout, api_version); - let client = GrpcClient::new(ctx).await?; - Ok(client) -} - -/// Initialize cache -async fn initialize_cache() { - Cache::initialize(Data::default()); -} - -#[derive(Parser, Debug)] -#[clap(name = utils::package_description!(), version = utils::version_info_str!())] -struct Cli { - /// TCP address where 
prometheus endpoint will listen to - #[clap(long, short, default_value = "0.0.0.0:9502")] - metrics_endpoint: SocketAddr, - - /// Polling time in seconds to get pools data through gRPC calls - #[clap(short, long, default_value = "300s")] - polling_time: humantime::Duration, - - /// Io engine api versions - #[clap(short, long, value_delimiter = ',', required = true)] - api_versions: Vec, -} - -impl Cli { - fn args() -> Self { - Cli::parse() - } -} - -#[tokio::main] -async fn main() -> Result<(), String> { - let args = Cli::args(); - - utils::print_package_info!(); - - utils::tracing_telemetry::init_tracing("metrics-exporter-pool", vec![], None); - - initialize_exporter(&args); - - initialize_cache().await; - - // sort to get the latest api version - let mut api_versions = args.api_versions; - api_versions.sort_by(|a, b| b.cmp(a)); - - let client = initialize_client(api_versions.get(0).unwrap_or(&ApiVersion::V0).clone()) - .await - .expect("gRPC client not initialized"); - - tokio::spawn(async move { - cache::store_data(client) - .await - .expect("Unable to store data in cache"); - }); - - let app = move || { - actix_web::App::new() - .wrap(middleware::Logger::default()) - .configure(metric_route) - }; - - HttpServer::new(app) - .bind(ExporterConfig::get_config().metrics_endpoint()) - .unwrap() - .workers(1) - .run() - .await - .expect("Port should be free to expose the metrics"); - Ok(()) -} - -fn metric_route(cfg: &mut web::ServiceConfig) { - cfg.route("/metrics", web::get().to(metrics_handlers)); -} - -/// Handler for prometheus -async fn metrics_handlers() -> impl Responder { - // Initialize pools collector - let pools_collector = PoolsCollector::default(); - // Create a new registry for prometheus - let registry = Registry::default(); - // Register pools collector in the registry - if let Err(error) = Registry::register(®istry, Box::new(pools_collector)) { - warn!(%error, "Pools collector already registered"); - } - let mut buffer = Vec::new(); - - let encoder = 
prometheus::TextEncoder::new(); - // Starts collecting metrics via calling gatherers - if let Err(error) = encoder.encode(®istry.gather(), &mut buffer) { - error!(%error, "Could not encode custom metrics"); - }; - - let res_custom = match String::from_utf8(buffer.clone()) { - Ok(v) => v, - Err(error) => { - error!(%error, "Prometheus metrics could not be parsed from_utf8'd"); - String::default() - } - }; - HttpResponse::Ok() - .insert_header(header::ContentType(mime::TEXT_PLAIN)) - .body(res_custom) -} - -// get pod ip -fn get_pod_ip() -> Result { - env::var("MY_POD_IP").map_err(|_| ExporterError::PodIPError("Unable to get pod ip".to_string())) -} diff --git a/exporter/src/pool/cache.rs b/exporter/src/pool/cache.rs deleted file mode 100644 index 314560682..000000000 --- a/exporter/src/pool/cache.rs +++ /dev/null @@ -1,111 +0,0 @@ -use std::{ops::DerefMut, sync::Mutex}; - -use once_cell::sync::OnceCell; -use serde::{Deserialize, Serialize}; -use tokio::time::sleep; -use tracing::{debug, error}; - -use crate::pool::{ - client::{ - grpc_client::GrpcClient, - pool::{PoolOperations, Pools}, - }, - config::ExporterConfig, -}; - -static CACHE: OnceCell> = OnceCell::new(); - -/// Wrapper over all the data that has to be stored in cache. -#[derive(Serialize, Deserialize, Debug)] -pub struct Data { - pools: Pools, -} - -impl Default for Data { - fn default() -> Self { - Self::new() - } -} - -impl Data { - // initialize Data - fn new() -> Self { - Self { - pools: Pools { pools: vec![] }, - } - } - - /// Get pools data. - pub fn pools(&self) -> &Pools { - &self.pools - } - - // Set pools data. - fn set_pools(&mut self, pools: Pools) { - self.pools = pools; - } - - // Invalidate pools for cache. - fn invalidate_pools(&mut self) { - self.pools = Pools { pools: vec![] }; - } -} - -/// Cache to store data that has to be exposed though exporter. -pub struct Cache { - data: Data, -} - -impl Cache { - /// Initialize the cache with default value. 
- pub fn initialize(data: Data) { - CACHE.get_or_init(|| Mutex::new(Self { data })); - } - - /// Returns cache. - pub fn get_cache() -> &'static Mutex { - CACHE.get().expect("Cache is not initialized") - } - - /// Get data field in cache. - pub fn data_mut(&mut self) -> &mut Data { - &mut self.data - } -} - -/// To store pools related data in cache. -pub async fn store_pool_data(client: GrpcClient) { - loop { - let pools = client.list_pools().await; - { - let mut cache = match Cache::get_cache().lock() { - Ok(cache) => cache, - Err(error) => { - error!(%error, "Error while getting cache resource"); - continue; - } - }; - let pools_cache = cache.deref_mut(); - match pools { - // set pools in the cache - Ok(pools) => { - debug!("Updated pool cache with latest metrics"); - pools_cache.data_mut().set_pools(pools); - } - // invalidate cache in case of error - Err(error) => { - error!(?error, "Error getting pools data, invalidating pools cache"); - pools_cache.data_mut().invalidate_pools(); - } - }; - } - sleep(ExporterConfig::get_config().polling_time()).await; - } -} - -/// To store data in shared variable i.e cache. -pub async fn store_data(client: GrpcClient) -> Result<(), String> { - // Store pools data - store_pool_data(client).await; - Ok(()) -} diff --git a/exporter/src/pool/mod.rs b/exporter/src/pool/mod.rs deleted file mode 100644 index 77715dc9f..000000000 --- a/exporter/src/pool/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// Cache module. -pub mod cache; -/// Grpc client module. -pub mod client; -/// Collector module. -pub mod collector; -/// Config module for exporter. -pub mod config; -/// Error module. 
-pub mod error; diff --git a/exporter/Cargo.toml b/metrics-exporter/Cargo.toml similarity index 88% rename from exporter/Cargo.toml rename to metrics-exporter/Cargo.toml index 8aa3c71de..bfa3f6002 100644 --- a/exporter/Cargo.toml +++ b/metrics-exporter/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "exporter" +name = "metrics-exporter" description = "Metrics Exporters" version = "0.1.0" edition = "2021" @@ -7,8 +7,9 @@ authors = ["Sahil Raja "] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [[bin]] -name = "metrics-exporter-pool" -path = "src/pool/bin/main.rs" +name = "metrics-exporter-io-engine" +path = "src/bin/io_engine/main.rs" + [dependencies] actix-web = { version = "4.4.0", features = ["rustls"] } diff --git a/exporter/README.md b/metrics-exporter/README.md similarity index 100% rename from exporter/README.md rename to metrics-exporter/README.md diff --git a/metrics-exporter/src/bin/io_engine/cache/mod.rs b/metrics-exporter/src/bin/io_engine/cache/mod.rs new file mode 100644 index 000000000..d689dd6e5 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/cache/mod.rs @@ -0,0 +1,78 @@ +mod pool; + +use crate::{ + client::{grpc_client::GrpcClient, pool::Pools}, + ExporterConfig, +}; + +use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use std::sync::Mutex; +use tokio::time::sleep; +static CACHE: OnceCell> = OnceCell::new(); + +/// Trait to be implemented by all Resource structs stored in Cache. +trait ResourceOps { + type ResourceVec; + fn set(&mut self, val: Self::ResourceVec); + fn invalidate(&mut self); +} + +/// Cache to store data that has to be exposed though metrics-exporter. +pub(crate) struct Cache { + data: Data, +} + +impl Cache { + /// Initialize the cache with default value. + pub fn initialize(data: Data) { + CACHE.get_or_init(|| Mutex::new(Self { data })); + } + + /// Returns cache. 
+ pub fn get_cache() -> &'static Mutex { + CACHE.get().expect("Cache is not initialized") + } + + /// Get pool mutably stored in struct. + pub fn pool_mut(&mut self) -> &mut Pools { + &mut self.data.pools + } +} + +/// Wrapper over all the data that has to be stored in cache. +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct Data { + /// Contains Pool Capacity and state data. + pools: Pools, +} + +impl Default for Data { + fn default() -> Self { + Self::new() + } +} + +impl Data { + /// Constructor for Cache data. + fn new() -> Self { + Self { + pools: Pools { pools: vec![] }, + } + } +} + +/// To store data in shared variable i.e cache. +pub(crate) async fn store_data(client: GrpcClient) { + tokio::spawn(async move { + store_resource_data(client).await; + }); +} + +/// To store pools related data in cache. +async fn store_resource_data(client: GrpcClient) { + loop { + let _ = pool::store_pool_info_data(client.clone()).await; + sleep(ExporterConfig::get_config().polling_time()).await; + } +} diff --git a/metrics-exporter/src/bin/io_engine/cache/pool.rs b/metrics-exporter/src/bin/io_engine/cache/pool.rs new file mode 100644 index 000000000..a2e325731 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/cache/pool.rs @@ -0,0 +1,46 @@ +use super::{Cache, ResourceOps}; +use crate::client::{ + grpc_client::GrpcClient, + pool::{PoolInfo, PoolOperations, Pools}, +}; +use std::ops::DerefMut; +use tracing::{debug, error}; + +impl ResourceOps for Pools { + type ResourceVec = Vec; + + fn set(&mut self, val: Self::ResourceVec) { + self.pools = val + } + + fn invalidate(&mut self) { + self.pools = vec![] + } +} + +/// To store pools state and capacity data in cache. 
+pub(crate) async fn store_pool_info_data(client: GrpcClient) -> Result<(), ()> { + let pools = client.list_pools().await; + let mut cache = match Cache::get_cache().lock() { + Ok(cache) => cache, + Err(error) => { + error!(%error, "Error while getting cache resource"); + return Err(()); + } + }; + let pools_cache = cache.deref_mut(); + match pools { + // set pools in the cache + Ok(pools) => { + debug!("Updated pool cache with latest metrics"); + pools_cache.pool_mut().set(pools.pools) + } + // invalidate cache in case of error + Err(error) => { + error!(?error, "Error getting pools data, invalidating pools cache"); + pools_cache.pool_mut().invalidate(); + return Err(()); + } + }; + Ok(()) +} diff --git a/exporter/src/pool/client/grpc_client.rs b/metrics-exporter/src/bin/io_engine/client/grpc_client.rs similarity index 76% rename from exporter/src/pool/client/grpc_client.rs rename to metrics-exporter/src/bin/io_engine/client/grpc_client.rs index 9ecc9c10b..3537de0a7 100644 --- a/exporter/src/pool/client/grpc_client.rs +++ b/metrics-exporter/src/bin/io_engine/client/grpc_client.rs @@ -1,11 +1,11 @@ -use std::time::Duration; +use crate::{error::ExporterError, get_node_name, get_pod_ip, ApiVersion}; +use rpc::io_engine::IoEngineClientV0; -use crate::pool::{client::ApiVersion, error::ExporterError}; use actix_web::http::Uri; -use rpc::io_engine::IoEngineClientV0; +use std::time::Duration; use tokio::time::sleep; use tonic::transport::Channel; -use tracing::error; +use tracing::{error, info}; /// Timeout for gRPC. #[derive(Debug, Clone)] @@ -31,7 +31,7 @@ impl Timeouts { /// Context for Grpc client. #[derive(Debug, Clone)] -pub struct GrpcContext { +pub(crate) struct GrpcContext { endpoint: tonic::transport::Endpoint, timeouts: Timeouts, api_version: ApiVersion, @@ -50,21 +50,21 @@ impl GrpcContext { } } } -/// The V0 Mayastor client; -pub type MayaClientV0 = IoEngineClientV0; +/// The V0 Mayastor client. +type MayaClientV0 = IoEngineClientV0; /// The V1 PoolClient. 
-pub type PoolClient = rpc::v1::pool::pool_rpc_client::PoolRpcClient; +type PoolClient = rpc::v1::pool::pool_rpc_client::PoolRpcClient; /// A wrapper for client for the V1 dataplane interface. #[derive(Clone, Debug)] -pub struct MayaClientV1 { - pub pool: PoolClient, +pub(crate) struct MayaClientV1 { + pub(crate) pool: PoolClient, } -/// Grpc client +/// Dataplane grpc client. #[derive(Debug, Clone)] -pub struct GrpcClient { +pub(crate) struct GrpcClient { ctx: GrpcContext, v0_client: Option, v1_client: Option, @@ -72,7 +72,7 @@ pub struct GrpcClient { impl GrpcClient { /// Initialize gRPC client. - pub async fn new(context: GrpcContext) -> Result { + pub(crate) async fn new(context: GrpcContext) -> Result { let sleep_duration_sec = 10; loop { match context.api_version { @@ -112,11 +112,12 @@ impl GrpcClient { } Ok(result) => match result { Ok(pool) => { + info!("grpc connected successfully"); return Ok(Self { ctx: context.clone(), v0_client: None, v1_client: Some(MayaClientV1 { pool }), - }) + }); } Err(error) => { error!(error=%error, "Grpc client connection error, retrying after {}s",sleep_duration_sec); @@ -130,7 +131,7 @@ impl GrpcClient { } /// Get the v0 api client. - pub fn client_v0(&self) -> Result { + pub(crate) fn client_v0(&self) -> Result { match self.v0_client.clone() { Some(client) => Ok(client), None => Err(ExporterError::GrpcClientError( @@ -140,7 +141,7 @@ impl GrpcClient { } /// Get the v1 api client. - pub fn client_v1(&self) -> Result { + pub(crate) fn client_v1(&self) -> Result { match self.v1_client.clone() { Some(client) => Ok(client), None => Err(ExporterError::GrpcClientError( @@ -150,7 +151,23 @@ impl GrpcClient { } /// Get the api version. - pub fn api_version(&self) -> ApiVersion { + pub(crate) fn api_version(&self) -> ApiVersion { self.ctx.api_version.clone() } } + +/// Initialize mayastor grpc client. 
+pub(crate) async fn init_client(api_version: ApiVersion) -> Result { + let timeout = Timeouts::new(Duration::from_secs(1), Duration::from_secs(5)); + let pod_ip = get_pod_ip()?; + let _ = get_node_name()?; + let endpoint = Uri::builder() + .scheme("https") + .authority(format!("{pod_ip}:10124")) + .path_and_query("") + .build() + .map_err(|error| ExporterError::InvalidURI(error.to_string()))?; + let ctx = GrpcContext::new(endpoint, timeout, api_version); + let client = GrpcClient::new(ctx).await?; + Ok(client) +} diff --git a/exporter/src/pool/client/mod.rs b/metrics-exporter/src/bin/io_engine/client/mod.rs similarity index 92% rename from exporter/src/pool/client/mod.rs rename to metrics-exporter/src/bin/io_engine/client/mod.rs index 3c9c8b2d2..67c5a56c9 100644 --- a/exporter/src/pool/client/mod.rs +++ b/metrics-exporter/src/bin/io_engine/client/mod.rs @@ -1,6 +1,6 @@ /// Grpc client module. pub mod grpc_client; -/// Pool module. +/// PoolInfo module. pub mod pool; #[derive( diff --git a/exporter/src/pool/client/pool.rs b/metrics-exporter/src/bin/io_engine/client/pool.rs similarity index 73% rename from exporter/src/pool/client/pool.rs rename to metrics-exporter/src/bin/io_engine/client/pool.rs index 32d71e25d..be5864944 100644 --- a/exporter/src/pool/client/pool.rs +++ b/metrics-exporter/src/bin/io_engine/client/pool.rs @@ -1,65 +1,60 @@ -use serde::{Deserialize, Serialize}; +use crate::{client::grpc_client::GrpcClient, error::ExporterError, ApiVersion}; -use crate::pool::{ - client::{grpc_client::GrpcClient, ApiVersion}, - error::ExporterError, -}; +use serde::{Deserialize, Serialize}; -/// Pool resource. +/// This stores Capacity and state information of a pool. #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Pool { +pub(crate) struct PoolInfo { name: String, - disks: Vec, used: u64, capacity: u64, state: u64, committed: u64, } -impl Pool { +impl PoolInfo { /// Get name of the pool. 
- pub fn name(&self) -> &String { + pub(crate) fn name(&self) -> &String { &self.name } /// Get used capacity of the pool. - pub fn used(&self) -> u64 { + pub(crate) fn used(&self) -> u64 { self.used } /// Get total capacity of the pool. - pub fn capacity(&self) -> u64 { + pub(crate) fn capacity(&self) -> u64 { self.capacity } /// Get the pool commitment in bytes. - pub fn committed(&self) -> u64 { + pub(crate) fn committed(&self) -> u64 { self.committed } - /// Get state of the pool. - pub fn state(&self) -> u64 { + /// Get pool of the io_engine. + pub(crate) fn state(&self) -> u64 { self.state } } -/// Pools resource. +/// Array of PoolInfo objects. #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Pools { - pub pools: Vec, +pub(crate) struct Pools { + pub(crate) pools: Vec, } -/// Pool operations i.e wrapper over rpc calls to get pools data. +/// Trait to be implemented by grpc client to call pool rpc. #[tonic::async_trait] -pub trait PoolOperations: Send + Sync + Sized { +pub(crate) trait PoolOperations: Send + Sync + Sized { async fn list_pools(&self) -> Result; } -impl From for Pool { +impl From for PoolInfo { fn from(value: rpc::io_engine::Pool) -> Self { Self { name: value.name, - disks: value.disks, used: value.used, capacity: value.capacity, state: value.state as u64, @@ -67,11 +62,10 @@ impl From for Pool { } } } -impl From for Pool { +impl From for PoolInfo { fn from(value: rpc::v1::pool::Pool) -> Self { Self { name: value.name, - disks: value.disks, used: value.used, capacity: value.capacity, state: value.state as u64, @@ -82,7 +76,6 @@ impl From for Pool { #[tonic::async_trait] impl PoolOperations for GrpcClient { - // wrapper over list_pools rpc call async fn list_pools(&self) -> Result { let pools = match self.api_version() { ApiVersion::V0 => match self.client_v0()?.list_pools(rpc::io_engine::Null {}).await { @@ -90,7 +83,7 @@ impl PoolOperations for GrpcClient { .into_inner() .pools .into_iter() - .map(Pool::from) + .map(PoolInfo::from) 
.collect::>(), Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())), }, @@ -104,7 +97,7 @@ impl PoolOperations for GrpcClient { .into_inner() .pools .into_iter() - .map(Pool::from) + .map(PoolInfo::from) .collect::>(), Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())), }, diff --git a/exporter/src/pool/collector/mod.rs b/metrics-exporter/src/bin/io_engine/collector/mod.rs similarity index 56% rename from exporter/src/pool/collector/mod.rs rename to metrics-exporter/src/bin/io_engine/collector/mod.rs index 00b563a7d..c686087c5 100644 --- a/exporter/src/pool/collector/mod.rs +++ b/metrics-exporter/src/bin/io_engine/collector/mod.rs @@ -1,2 +1,2 @@ /// Module for pools collector. -pub mod pools_collector; +pub mod pool; diff --git a/exporter/src/pool/collector/pools_collector.rs b/metrics-exporter/src/bin/io_engine/collector/pool.rs similarity index 74% rename from exporter/src/pool/collector/pools_collector.rs rename to metrics-exporter/src/bin/io_engine/collector/pool.rs index f6f054e81..a24816fbf 100644 --- a/exporter/src/pool/collector/pools_collector.rs +++ b/metrics-exporter/src/bin/io_engine/collector/pool.rs @@ -1,31 +1,28 @@ -use std::{env, fmt::Debug, ops::DerefMut}; - +use crate::{cache::Cache, client::pool::PoolInfo, get_node_name}; use prometheus::{ core::{Collector, Desc}, GaugeVec, Opts, }; +use std::{fmt::Debug, ops::DerefMut}; use tracing::error; -use crate::pool::{cache::Cache, client::pool::Pool, error::ExporterError}; - -/// PoolsCollector contains the list of custom metrics that has to be exposed by exporter. +/// Collects Pool capacity metrics from cache. 
#[derive(Clone, Debug)] -pub struct PoolsCollector { +pub(crate) struct PoolCapacityCollector { pool_total_size: GaugeVec, pool_used_size: GaugeVec, pool_committed_size: GaugeVec, - pool_status: GaugeVec, descs: Vec, } -impl Default for PoolsCollector { +impl Default for PoolCapacityCollector { fn default() -> Self { Self::new() } } -impl PoolsCollector { - /// Initialize all the metrics to be defined for pools collector. +impl PoolCapacityCollector { + /// Initialize all the metrics to be defined for pools capacity collector. pub fn new() -> Self { let pool_total_size_opts = Opts::new("total_size_bytes", "Total size of the pool in bytes") .subsystem("disk_pool") @@ -39,9 +36,6 @@ impl PoolsCollector { ) .subsystem("disk_pool") .variable_labels(vec!["node".to_string(), "name".to_string()]); - let pool_status_opts = Opts::new("status", "Status of the pool") - .subsystem("disk_pool") - .variable_labels(vec!["node".to_string(), "name".to_string()]); let mut descs = Vec::new(); let pool_total_size = GaugeVec::new(pool_total_size_opts, &["node", "name"]) @@ -50,26 +44,21 @@ impl PoolsCollector { .expect("Unable to create gauge metric type for pool_used_size"); let pool_committed_size = GaugeVec::new(pool_committed_size_opts, &["node", "name"]) .expect("Unable to create gauge metric type for pool_committed_size"); - let pool_status = GaugeVec::new(pool_status_opts, &["node", "name"]) - .expect("Unable to create gauge metric type for pool_status"); // Descriptors for the custom metrics descs.extend(pool_total_size.desc().into_iter().cloned()); descs.extend(pool_used_size.desc().into_iter().cloned()); descs.extend(pool_committed_size.desc().into_iter().cloned()); - descs.extend(pool_status.desc().into_iter().cloned()); Self { pool_total_size, pool_used_size, pool_committed_size, - pool_status, descs, } } } -/// Prometheus collector implementation -impl Collector for PoolsCollector { +impl Collector for PoolCapacityCollector { fn desc(&self) -> Vec<&prometheus::core::Desc> 
{ self.descs.iter().collect() } @@ -83,7 +72,7 @@ impl Collector for PoolsCollector { } }; let cp = c.deref_mut(); - let mut metric_family = Vec::with_capacity(3 * cp.data_mut().pools().pools.capacity()); + let mut metric_family = Vec::with_capacity(3 * cp.pool_mut().pools.capacity()); let node_name = match get_node_name() { Ok(name) => name, Err(error) => { @@ -92,8 +81,8 @@ impl Collector for PoolsCollector { } }; - for i in &cp.data_mut().pools().pools { - let p: &Pool = i; + for i in &cp.pool_mut().pools { + let p: &PoolInfo = i; let pool_total_size = match self .pool_total_size @@ -136,7 +125,61 @@ impl Collector for PoolsCollector { pool_committed_size.set(p.committed() as f64); let mut x = pool_committed_size.collect(); metric_family.extend(x.pop()); + } + metric_family + } +} + +/// Collects pool status info from cache. +#[derive(Clone, Debug)] +pub(crate) struct PoolStatusCollector { + pool_status: GaugeVec, + descs: Vec, +} + +impl Default for PoolStatusCollector { + fn default() -> Self { + Self::new() + } +} +impl PoolStatusCollector { + /// Initialize all the metrics to be defined for pools status collector. 
+ pub fn new() -> Self { + let pool_status_opts = Opts::new("status", "Status of the pool") + .subsystem("disk_pool") + .variable_labels(vec!["node".to_string(), "name".to_string()]); + let mut descs = Vec::new(); + let pool_status = GaugeVec::new(pool_status_opts, &["node", "name"]) + .expect("Unable to create gauge metric type for pool_status"); + descs.extend(pool_status.desc().into_iter().cloned()); + Self { pool_status, descs } + } +} + +impl Collector for PoolStatusCollector { + fn desc(&self) -> Vec<&prometheus::core::Desc> { + self.descs.iter().collect() + } + fn collect(&self) -> Vec { + let mut c = match Cache::get_cache().lock() { + Ok(c) => c, + Err(error) => { + error!(%error,"Error while getting cache resource"); + return Vec::new(); + } + }; + let cp = c.deref_mut(); + let mut metric_family = Vec::with_capacity(3 * cp.pool_mut().pools.capacity()); + let node_name = match get_node_name() { + Ok(name) => name, + Err(error) => { + error!(?error, "Unable to get node name"); + return metric_family; + } + }; + for i in &cp.pool_mut().pools { + let p: &PoolInfo = i; let pool_status = match self .pool_status .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()]) @@ -154,9 +197,3 @@ impl Collector for PoolsCollector { metric_family } } - -/// Get node name from pod spec. -pub fn get_node_name() -> Result { - env::var("MY_NODE_NAME") - .map_err(|_| ExporterError::GetNodeError("Unable to get node name".to_string())) -} diff --git a/exporter/src/pool/config.rs b/metrics-exporter/src/bin/io_engine/config.rs similarity index 92% rename from exporter/src/pool/config.rs rename to metrics-exporter/src/bin/io_engine/config.rs index 3b3beac45..0673bc09b 100644 --- a/exporter/src/pool/config.rs +++ b/metrics-exporter/src/bin/io_engine/config.rs @@ -14,7 +14,7 @@ pub struct ExporterConfig { } impl ExporterConfig { - /// Initialize exporter configs. + /// Initialize metrics-exporter configs. 
pub fn initialize(addr: SocketAddr, polling_time: Duration) { CONFIG.get_or_init(|| Self { metrics_endpoint: addr, @@ -22,7 +22,7 @@ impl ExporterConfig { }); } - /// Get exporter config. + /// Get metrics-exporter config. pub fn get_config() -> &'static ExporterConfig { CONFIG.get().expect("Exporter config is not initialized") } diff --git a/exporter/src/pool/error.rs b/metrics-exporter/src/bin/io_engine/error.rs similarity index 78% rename from exporter/src/pool/error.rs rename to metrics-exporter/src/bin/io_engine/error.rs index 6a592b2e3..08693ec4b 100644 --- a/exporter/src/pool/error.rs +++ b/metrics-exporter/src/bin/io_engine/error.rs @@ -4,7 +4,8 @@ pub enum ExporterError { GrpcResponseError(String), GetNodeError(String), InvalidURI(String), - DeserializationError(String), PodIPError(String), GrpcClientError(String), + HttpServerError(String), + HttpBindError(String), } diff --git a/metrics-exporter/src/bin/io_engine/main.rs b/metrics-exporter/src/bin/io_engine/main.rs new file mode 100644 index 000000000..765ad74f2 --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/main.rs @@ -0,0 +1,102 @@ +use crate::{ + cache::store_data, + client::{grpc_client::init_client, ApiVersion}, + config::ExporterConfig, + error::ExporterError, + serve::metric_route, +}; +use actix_web::{middleware, HttpServer}; +use clap::Parser; +use std::{env, net::SocketAddr}; + +/// Cache module for exporter. +pub(crate) mod cache; +/// Grpc client module. +pub(crate) mod client; +/// Collector module. +pub(crate) mod collector; +/// Config module for metrics-exporter. +pub(crate) mod config; +/// Error module. +pub(crate) mod error; +/// Prometheus metrics handler module. +pub(crate) mod serve; + +/// Initialize metrics-exporter config that are passed through arguments. +fn initialize_exporter(args: &Cli) { + ExporterConfig::initialize(args.metrics_endpoint, args.polling_time.into()); +} + +/// Initialize cache. 
+async fn initialize_cache() { + cache::Cache::initialize(cache::Data::default()); +} + +/// Get pod ip from env. +fn get_pod_ip() -> Result<String, ExporterError> { + env::var("MY_POD_IP").map_err(|_| ExporterError::PodIPError("Unable to get pod ip".to_string())) +} + +/// Get node name from env. +fn get_node_name() -> Result<String, ExporterError> { + env::var("MY_NODE_NAME") + .map_err(|_| ExporterError::GetNodeError("Unable to get node name".to_string())) +} + +#[derive(Parser, Debug)] +#[clap(name = utils::package_description!(), version = utils::version_info_str!())] +struct Cli { + /// TCP address where prometheus endpoint will listen to + #[clap(long, short, default_value = "0.0.0.0:9502")] + metrics_endpoint: SocketAddr, + + /// Polling time in seconds to get pools data through gRPC calls + #[clap(short, long, default_value = "300s")] + polling_time: humantime::Duration, + + /// Io engine api versions + #[clap(short, long, value_delimiter = ',', required = true)] + api_versions: Vec<ApiVersion>, +} + +impl Cli { + fn args() -> Self { + Cli::parse() + } +} + +#[tokio::main] +async fn main() -> Result<(), ExporterError> { + let args = Cli::args(); + + utils::print_package_info!(); + + utils::tracing_telemetry::init_tracing("metrics-exporter-io_engine", vec![], None); + + initialize_exporter(&args); + + initialize_cache().await; + + // sort to get the latest api version + let mut api_versions = args.api_versions; + api_versions.sort_by(|a, b| b.cmp(a)); + + let client = init_client(api_versions.get(0).unwrap_or(&ApiVersion::V0).clone()).await?; + + store_data(client).await; + let app = move || { + actix_web::App::new() + .wrap(middleware::Logger::default()) + .configure(metric_route) + }; + HttpServer::new(app) + .bind(ExporterConfig::get_config().metrics_endpoint()) + .map_err(|_| { + ExporterError::HttpBindError("Failed to bind endpoint to http server".to_string()) + })?
+ .workers(1) + .run() + .await + .map_err(|_| ExporterError::HttpServerError("Failed to start http Service".to_string()))?; + Ok(()) +} diff --git a/metrics-exporter/src/bin/io_engine/serve/handler.rs b/metrics-exporter/src/bin/io_engine/serve/handler.rs new file mode 100644 index 000000000..2f2976beb --- /dev/null +++ b/metrics-exporter/src/bin/io_engine/serve/handler.rs @@ -0,0 +1,38 @@ +use crate::collector::pool::{PoolCapacityCollector, PoolStatusCollector}; +use actix_web::{http::header, HttpResponse, Responder}; +use prometheus::{Encoder, Registry}; +use tracing::{error, warn}; + +/// Handler for metrics. Initializes all collectors and serves data over HTTP. +pub(crate) async fn metrics_handler() -> impl Responder { + let pools_collector = PoolCapacityCollector::default(); + let pool_status_collector = PoolStatusCollector::default(); + // Create a new registry for Prometheus + let registry = Registry::default(); + // Register pools collector in the registry + if let Err(error) = Registry::register(&registry, Box::new(pools_collector)) { + warn!(%error, "Pools collector already registered"); + } + if let Err(error) = Registry::register(&registry, Box::new(pool_status_collector)) { + warn!(%error, "Pools status collector already registered"); + } + + let mut buffer = Vec::new(); + + let encoder = prometheus::TextEncoder::new(); + // Starts collecting metrics by calling gatherers + if let Err(error) = encoder.encode(&registry.gather(), &mut buffer) { + error!(%error, "Could not encode custom metrics"); + }; + + let res_custom = match String::from_utf8(buffer.clone()) { + Ok(v) => v, + Err(error) => { + error!(%error, "Prometheus metrics could not be parsed from_utf8'd"); + String::default() + } + }; + HttpResponse::Ok() + .insert_header(header::ContentType(mime::TEXT_PLAIN)) + .body(res_custom) +} diff --git a/metrics-exporter/src/bin/io_engine/serve/mod.rs b/metrics-exporter/src/bin/io_engine/serve/mod.rs new file mode 100644 index 000000000..9607c0013 --- /dev/null
+++ b/metrics-exporter/src/bin/io_engine/serve/mod.rs @@ -0,0 +1,7 @@ +use actix_web::web; +/// module for prometheus handlers. +mod handler; + +pub(crate) fn metric_route(cfg: &mut web::ServiceConfig) { + cfg.route("/metrics", web::get().to(handler::metrics_handler)); +} diff --git a/nix/pkgs/extensions/cargo-project.nix b/nix/pkgs/extensions/cargo-project.nix index 7a11c26b0..1003a0830 100644 --- a/nix/pkgs/extensions/cargo-project.nix +++ b/nix/pkgs/extensions/cargo-project.nix @@ -57,7 +57,7 @@ let src_list = [ "Cargo.lock" "Cargo.toml" - "exporter" + "metrics-exporter" "rpc" "console-logger" "call-home" @@ -119,7 +119,7 @@ in build = { buildType, cargoBuildFlags ? [ ] }: if allInOne then - builder { inherit buildType; cargoBuildFlags = [ "-p rpc" "-p exporter" "-p call-home" "-p upgrade" ]; } + builder { inherit buildType; cargoBuildFlags = [ "-p rpc" "-p metrics-exporter" "-p call-home" "-p upgrade" ]; } else builder { inherit buildType cargoBuildFlags; }; } diff --git a/nix/pkgs/extensions/default.nix b/nix/pkgs/extensions/default.nix index 4e0bfa9f7..de621d0a5 100644 --- a/nix/pkgs/extensions/default.nix +++ b/nix/pkgs/extensions/default.nix @@ -21,18 +21,18 @@ let }; components = { buildType, builder }: rec { - exporters = { - metrics = rec { + metrics = { + exporter = rec { recurseForDerivations = true; - metrics_builder = { buildType, builder, cargoBuildFlags ? [ "-p exporter" ] }: builder.build { inherit buildType cargoBuildFlags; }; + metrics_builder = { buildType, builder, cargoBuildFlags ? 
[ "-p metrics-exporter" ] }: builder.build { inherit buildType cargoBuildFlags; }; metrics_installer = { pname, src }: installer { inherit pname src; }; - pool = metrics_installer { + io-engine = metrics_installer { src = if allInOne then metrics_builder { inherit buildType builder; } else - metrics_builder { inherit buildType builder; cargoBuildFlags = [ "--bin metrics-exporter-pool" ]; }; - pname = "metrics-exporter-pool"; + metrics_builder { inherit buildType builder; cargoBuildFlags = [ "--bin metrics-exporter-io-engine" ]; }; + pname = "metrics-exporter-io-engine"; }; }; }; @@ -64,7 +64,7 @@ let stats = obs_installer { src = if allInOne then - obs_builder { inherit buildType builder; cargoBuildFlags = [ "-p call-home-stats" ]; } + obs_builder { inherit buildType builder; cargoBuildFlags = [ "-p call-home-stats" ]; } else obs_builder { inherit buildType builder; cargoBuildFlags = [ "--bin call-home-stats" ]; }; pname = "obs-callhome-stats"; diff --git a/nix/pkgs/images/default.nix b/nix/pkgs/images/default.nix index 3626e633d..570743fc6 100644 --- a/nix/pkgs/images/default.nix +++ b/nix/pkgs/images/default.nix @@ -19,9 +19,9 @@ let } // config; }; build-exporter-image = { buildType }: { - pool = build-extensions-image rec{ + io-engine = build-extensions-image rec{ inherit buildType; - package = extensions.${buildType}.exporters.metrics.pool; + package = extensions.${buildType}.metrics.exporter.io-engine; pname = package.pname; config = { ExposedPorts = { @@ -93,7 +93,7 @@ let in let build-exporter-images = { buildType }: { - metrics = build-exporter-image { + exporter = build-exporter-image { inherit buildType; }; }; @@ -116,7 +116,7 @@ let in let build-images = { buildType }: { - exporters = build-exporter-images { inherit buildType; } // { + metrics = build-exporter-images { inherit buildType; } // { recurseForDerivations = true; }; upgrade = build-upgrade-images { inherit buildType; } // { diff --git a/scripts/release.sh b/scripts/release.sh index 
bae44eefe..2f5ea20e7 100755 --- a/scripts/release.sh +++ b/scripts/release.sh @@ -67,7 +67,7 @@ HASH=`get_hash` BRANCH=`git rev-parse --abbrev-ref HEAD` BRANCH=${BRANCH////-} IMAGES= -DEFAULT_IMAGES="exporters.metrics.pool obs.callhome stats.aggregator upgrade.job" +DEFAULT_IMAGES="metrics.exporter.io-engine obs.callhome stats.aggregator upgrade.job" IMAGES_THAT_REQUIRE_HELM_CHART=("upgrade.job") UPLOAD= SKIP_PUBLISH=