diff --git a/Cargo.lock b/Cargo.lock
index 1d999aeac..c485f497d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1983,6 +1983,28 @@ version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
 
+[[package]]
+name = "metrics-exporter"
+version = "0.1.0"
+dependencies = [
+ "actix-service",
+ "actix-web",
+ "clap",
+ "humantime",
+ "mime",
+ "once_cell",
+ "prometheus",
+ "rpc",
+ "serde",
+ "serde_json",
+ "strum",
+ "strum_macros",
+ "tokio",
+ "tonic",
+ "tracing",
+ "utils",
+]
+
 [[package]]
 name = "mime"
 version = "0.3.17"
diff --git a/Cargo.toml b/Cargo.toml
index ed555db7f..e7351fc32 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@ members = [
     "call-home",
     "console-logger",
-    "exporter",
+    "metrics-exporter",
     "k8s/plugin",
     "k8s/proxy",
     "k8s/supportability",
diff --git a/chart/templates/mayastor/io/io-engine-daemonset.yaml b/chart/templates/mayastor/io/io-engine-daemonset.yaml
index 83d999843..dc05032cd 100644
--- a/chart/templates/mayastor/io/io-engine-daemonset.yaml
+++ b/chart/templates/mayastor/io/io-engine-daemonset.yaml
@@ -38,8 +38,8 @@ spec:
       {{- include "base_init_containers" . }}
       containers:
         {{- if .Values.base.metrics.enabled }}
-        - name: metrics-exporter-pool
-          image: "{{ .Values.image.registry }}/{{ .Values.image.repo }}/{{ .Chart.Name }}-metrics-exporter-pool:{{ default .Values.image.tag .Values.image.repoTags.extensions }}"
+        - name: metrics-exporter-io-engine
+          image: "{{ .Values.image.registry }}/{{ .Values.image.repo }}/{{ .Chart.Name }}-metrics-exporter-io-engine:{{ default .Values.image.tag .Values.image.repoTags.extensions }}"
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           env:
            - name: MY_NODE_NAME
@@ -54,7 +54,7 @@ spec:
            - "-p{{ .Values.base.metrics.pollingInterval }}"
            - "--api-versions={{ .Values.io_engine.api }}"
           command:
-           - metrics-exporter-pool
+           - metrics-exporter-io-engine
           ports:
            - containerPort: 9502
              protocol: TCP
diff --git a/chart/templates/mayastor/metrics/metrics-exporter-pool-service.yaml b/chart/templates/mayastor/metrics/metrics-exporter-io-engine-service.yaml
similarity index 82%
rename from chart/templates/mayastor/metrics/metrics-exporter-pool-service.yaml
rename to chart/templates/mayastor/metrics/metrics-exporter-io-engine-service.yaml
index cc0024830..286a7089c 100644
--- a/chart/templates/mayastor/metrics/metrics-exporter-pool-service.yaml
+++ b/chart/templates/mayastor/metrics/metrics-exporter-io-engine-service.yaml
@@ -2,9 +2,9 @@ apiVersion: v1
 kind: Service
 metadata:
-  name: {{ .Release.Name }}-metrics-exporter-pool
+  name: {{ .Release.Name }}-metrics-exporter-io-engine
   labels:
-    app: metrics-exporter-pool
+    app: metrics-exporter-io-engine
     {{ include "label_prefix" . }}/release: {{ .Release.Name }}
     {{ include "label_prefix" . }}/version: {{ .Chart.Version }}
 spec:
diff --git a/exporter/src/lib.rs b/exporter/src/lib.rs
deleted file mode 100644
index 2263f18a5..000000000
--- a/exporter/src/lib.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-/// Pool Metrics Exporter.
-pub mod pool;
diff --git a/exporter/src/pool/bin/main.rs b/exporter/src/pool/bin/main.rs
deleted file mode 100644
index 1bb358491..000000000
--- a/exporter/src/pool/bin/main.rs
+++ /dev/null
@@ -1,147 +0,0 @@
-use actix_web::{http::header, middleware, web, HttpResponse, HttpServer, Responder};
-
-use actix_web::http::Uri;
-use clap::Parser;
-use exporter::pool::{
-    cache,
-    cache::{Cache, Data},
-    client::{
-        grpc_client::{GrpcClient, GrpcContext, Timeouts},
-        ApiVersion,
-    },
-    collector::pools_collector::{get_node_name, PoolsCollector},
-    config::ExporterConfig,
-    error::ExporterError,
-};
-use prometheus::{Encoder, Registry};
-use std::{env, net::SocketAddr, time::Duration};
-use tracing::{error, warn};
-
-/// Initialize exporter config that are passed through arguments
-fn initialize_exporter(args: &Cli) {
-    ExporterConfig::initialize(args.metrics_endpoint, args.polling_time.into());
-}
-
-/// Initialize mayastor grpc client
-async fn initialize_client(api_version: ApiVersion) -> Result<GrpcClient, ExporterError> {
-    let timeout = Timeouts::new(Duration::from_secs(1), Duration::from_secs(5));
-    let pod_ip = get_pod_ip()?;
-    let _ = get_node_name()?;
-    let endpoint = Uri::builder()
-        .scheme("https")
-        .authority(format!("{pod_ip}:10124"))
-        .path_and_query("")
-        .build()
-        .map_err(|error| ExporterError::InvalidURI(error.to_string()))?;
-    let ctx = GrpcContext::new(endpoint, timeout, api_version);
-    let client = GrpcClient::new(ctx).await?;
-    Ok(client)
-}
-
-/// Initialize cache
-async fn initialize_cache() {
-    Cache::initialize(Data::default());
-}
-
-#[derive(Parser, Debug)]
-#[clap(name = utils::package_description!(), version = utils::version_info_str!())]
-struct Cli {
-    /// TCP address where prometheus endpoint will listen to
-    #[clap(long, short, default_value = "0.0.0.0:9502")]
-    metrics_endpoint: SocketAddr,
-
-    /// Polling time in seconds to get pools data through gRPC calls
-    #[clap(short, long, default_value = "300s")]
-    polling_time: humantime::Duration,
-
-    /// Io engine api versions
-    #[clap(short, long, value_delimiter = ',', required = true)]
-    api_versions: Vec<ApiVersion>,
-}
-
-impl Cli {
-    fn args() -> Self {
-        Cli::parse()
-    }
-}
-
-#[tokio::main]
-async fn main() -> Result<(), String> {
-    let args = Cli::args();
-
-    utils::print_package_info!();
-
-    utils::tracing_telemetry::init_tracing("metrics-exporter-pool", vec![], None);
-
-    initialize_exporter(&args);
-
-    initialize_cache().await;
-
-    // sort to get the latest api version
-    let mut api_versions = args.api_versions;
-    api_versions.sort_by(|a, b| b.cmp(a));
-
-    let client = initialize_client(api_versions.get(0).unwrap_or(&ApiVersion::V0).clone())
-        .await
-        .expect("gRPC client not initialized");
-
-    tokio::spawn(async move {
-        cache::store_data(client)
-            .await
-            .expect("Unable to store data in cache");
-    });
-
-    let app = move || {
-        actix_web::App::new()
-            .wrap(middleware::Logger::default())
-            .configure(metric_route)
-    };
-
-    HttpServer::new(app)
-        .bind(ExporterConfig::get_config().metrics_endpoint())
-        .unwrap()
-        .workers(1)
-        .run()
-        .await
-        .expect("Port should be free to expose the metrics");
-    Ok(())
-}
-
-fn metric_route(cfg: &mut web::ServiceConfig) {
-    cfg.route("/metrics", web::get().to(metrics_handlers));
-}
-
-/// Handler for prometheus
-async fn metrics_handlers() -> impl Responder {
-    // Initialize pools collector
-    let pools_collector = PoolsCollector::default();
-    // Create a new registry for prometheus
-    let registry = Registry::default();
-    // Register pools collector in the registry
-    if let Err(error) = Registry::register(&registry, Box::new(pools_collector)) {
-        warn!(%error, "Pools collector already registered");
-    }
-    let mut buffer = Vec::new();
-
-    let encoder = prometheus::TextEncoder::new();
-    // Starts collecting metrics via calling gatherers
-    if let Err(error) = encoder.encode(&registry.gather(), &mut buffer) {
-        error!(%error, "Could not encode custom metrics");
-    };
-
-    let res_custom = match String::from_utf8(buffer.clone()) {
-        Ok(v) => v,
-        Err(error) => {
-            error!(%error, "Prometheus metrics could not be parsed from_utf8'd");
-            String::default()
-        }
-    };
-    HttpResponse::Ok()
-        .insert_header(header::ContentType(mime::TEXT_PLAIN))
-        .body(res_custom)
-}
-
-// get pod ip
-fn get_pod_ip() -> Result<String, ExporterError> {
-    env::var("MY_POD_IP").map_err(|_| ExporterError::PodIPError("Unable to get pod ip".to_string()))
-}
diff --git a/exporter/src/pool/cache.rs b/exporter/src/pool/cache.rs
deleted file mode 100644
index 314560682..000000000
--- a/exporter/src/pool/cache.rs
+++ /dev/null
@@ -1,111 +0,0 @@
-use std::{ops::DerefMut, sync::Mutex};
-
-use once_cell::sync::OnceCell;
-use serde::{Deserialize, Serialize};
-use tokio::time::sleep;
-use tracing::{debug, error};
-
-use crate::pool::{
-    client::{
-        grpc_client::GrpcClient,
-        pool::{PoolOperations, Pools},
-    },
-    config::ExporterConfig,
-};
-
-static CACHE: OnceCell<Mutex<Cache>> = OnceCell::new();
-
-/// Wrapper over all the data that has to be stored in cache.
-#[derive(Serialize, Deserialize, Debug)]
-pub struct Data {
-    pools: Pools,
-}
-
-impl Default for Data {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Data {
-    // initialize Data
-    fn new() -> Self {
-        Self {
-            pools: Pools { pools: vec![] },
-        }
-    }
-
-    /// Get pools data.
-    pub fn pools(&self) -> &Pools {
-        &self.pools
-    }
-
-    // Set pools data.
-    fn set_pools(&mut self, pools: Pools) {
-        self.pools = pools;
-    }
-
-    // Invalidate pools for cache.
-    fn invalidate_pools(&mut self) {
-        self.pools = Pools { pools: vec![] };
-    }
-}
-
-/// Cache to store data that has to be exposed though exporter.
-pub struct Cache {
-    data: Data,
-}
-
-impl Cache {
-    /// Initialize the cache with default value.
-    pub fn initialize(data: Data) {
-        CACHE.get_or_init(|| Mutex::new(Self { data }));
-    }
-
-    /// Returns cache.
-    pub fn get_cache() -> &'static Mutex<Cache> {
-        CACHE.get().expect("Cache is not initialized")
-    }
-
-    /// Get data field in cache.
-    pub fn data_mut(&mut self) -> &mut Data {
-        &mut self.data
-    }
-}
-
-/// To store pools related data in cache.
-pub async fn store_pool_data(client: GrpcClient) {
-    loop {
-        let pools = client.list_pools().await;
-        {
-            let mut cache = match Cache::get_cache().lock() {
-                Ok(cache) => cache,
-                Err(error) => {
-                    error!(%error, "Error while getting cache resource");
-                    continue;
-                }
-            };
-            let pools_cache = cache.deref_mut();
-            match pools {
-                // set pools in the cache
-                Ok(pools) => {
-                    debug!("Updated pool cache with latest metrics");
-                    pools_cache.data_mut().set_pools(pools);
-                }
-                // invalidate cache in case of error
-                Err(error) => {
-                    error!(?error, "Error getting pools data, invalidating pools cache");
-                    pools_cache.data_mut().invalidate_pools();
-                }
-            };
-        }
-        sleep(ExporterConfig::get_config().polling_time()).await;
-    }
-}
-
-/// To store data in shared variable i.e cache.
-pub async fn store_data(client: GrpcClient) -> Result<(), String> {
-    // Store pools data
-    store_pool_data(client).await;
-    Ok(())
-}
diff --git a/exporter/src/pool/mod.rs b/exporter/src/pool/mod.rs
deleted file mode 100644
index 77715dc9f..000000000
--- a/exporter/src/pool/mod.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-/// Cache module.
-pub mod cache;
-/// Grpc client module.
-pub mod client;
-/// Collector module.
-pub mod collector;
-/// Config module for exporter.
-pub mod config;
-/// Error module.
-pub mod error;
diff --git a/exporter/Cargo.toml b/metrics-exporter/Cargo.toml
similarity index 88%
rename from exporter/Cargo.toml
rename to metrics-exporter/Cargo.toml
index 4bdb7f6a6..960e5e549 100644
--- a/exporter/Cargo.toml
+++ b/metrics-exporter/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "exporter"
+name = "metrics-exporter"
 description = "Metrics Exporters"
 version = "0.1.0"
 edition = "2021"
@@ -7,8 +7,9 @@ authors = ["Sahil Raja "]
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [[bin]]
-name = "metrics-exporter-pool"
-path = "src/pool/bin/main.rs"
+name = "metrics-exporter-io-engine"
+path = "src/bin/io_engine/main.rs"
+
 
 [dependencies]
 actix-web = { version = "4.3.1", features = ["rustls"] }
diff --git a/exporter/README.md b/metrics-exporter/README.md
similarity index 100%
rename from exporter/README.md
rename to metrics-exporter/README.md
diff --git a/metrics-exporter/src/bin/io_engine/cache/mod.rs b/metrics-exporter/src/bin/io_engine/cache/mod.rs
new file mode 100644
index 000000000..5a951c9ef
--- /dev/null
+++ b/metrics-exporter/src/bin/io_engine/cache/mod.rs
@@ -0,0 +1,107 @@
+use std::sync::Mutex;
+
+use crate::{
+    client::{
+        grpc_client::GrpcClient,
+        pool::{PoolInfo, Pools},
+    },
+    ExporterConfig,
+};
+use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
+use tracing::error;
+
+mod pool;
+
+static CACHE: OnceCell<Mutex<Cache>> = OnceCell::new();
+
+/// Trait to be implemented by all Resource structs stored in Cache.
+pub trait ResourceOps {
+    type ResourceVec;
+    fn set(&mut self, val: Self::ResourceVec);
+    fn invalidate(&mut self);
+}
+
+pub async fn init_resource_cache(_client: GrpcClient) {
+    unimplemented!()
+}
+
+/// Cache to store data that has to be exposed through metrics-exporter.
+pub struct Cache {
+    data: Data,
+}
+
+impl Cache {
+    /// Initialize the cache with default value.
+    pub fn initialize(data: Data) {
+        CACHE.get_or_init(|| Mutex::new(Self { data }));
+    }
+
+    /// Returns cache.
+    pub fn get_cache() -> &'static Mutex<Cache> {
+        CACHE.get().expect("Cache is not initialized")
+    }
+
+    /// Get a mutable reference to the pools stored in the cache.
+    pub fn pool_mut(&mut self) -> &mut Pools {
+        &mut self.data.pools
+    }
+}
+
+/// Wrapper over all the data that has to be stored in cache.
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Data {
+    /// Contains the cached pools data.
+    pools: Pools,
+}
+
+impl Default for Data {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Data {
+    /// Constructor for Cache data.
+    fn new() -> Self {
+        Self {
+            pools: Pools { pools: vec![] },
+        }
+    }
+
+    /// Get pools data.
+    pub fn pools(&self) -> &Pools {
+        &self.pools
+    }
+}
+
+impl ResourceOps for Pools {
+    type ResourceVec = Vec<PoolInfo>;
+
+    fn set(&mut self, val: Self::ResourceVec) {
+        self.pools = val
+    }
+
+    fn invalidate(&mut self) {
+        self.pools = vec![]
+    }
+}
+
+/// To store data in shared variable i.e. cache.
+pub async fn store_data(client: GrpcClient) {
+    // Store pools data
+    tokio::spawn(async move {
+        store_resource_data(client).await;
+    });
+}
+
+/// To store pools related data in cache.
+pub async fn store_resource_data(client: GrpcClient) {
+    loop {
+        if pool::store_pool_info_data(client.clone()).await.is_err() {
+            error!("error storing pool cache")
+        }
+        sleep(ExporterConfig::get_config().polling_time()).await;
+    }
+}
diff --git a/metrics-exporter/src/bin/io_engine/cache/pool.rs b/metrics-exporter/src/bin/io_engine/cache/pool.rs
new file mode 100644
index 000000000..b00a44b0f
--- /dev/null
+++ b/metrics-exporter/src/bin/io_engine/cache/pool.rs
@@ -0,0 +1,34 @@
+use super::Cache;
+use crate::{
+    cache::ResourceOps,
+    client::{grpc_client::GrpcClient, pool::PoolOperations},
+};
+use std::ops::DerefMut;
+use tracing::{debug, error};
+
+/// To store pools related data in cache.
+pub async fn store_pool_info_data(client: GrpcClient) -> Result<(), ()> {
+    let pools = client.list_pools().await;
+    let mut cache = match Cache::get_cache().lock() {
+        Ok(cache) => cache,
+        Err(error) => {
+            error!(%error, "Error while getting cache resource");
+            return Err(());
+        }
+    };
+    let pools_cache = cache.deref_mut();
+    match pools {
+        // set pools in the cache
+        Ok(pools) => {
+            debug!("Updated io_engine cache with latest metrics");
+            pools_cache.pool_mut().set(pools.pools)
+        }
+        // invalidate cache in case of error
+        Err(error) => {
+            error!(?error, "Error getting pools data, invalidating pools cache");
+            pools_cache.pool_mut().invalidate()
+        }
+    };
+
+    Ok(())
+}
diff --git a/exporter/src/pool/client/grpc_client.rs b/metrics-exporter/src/bin/io_engine/client/grpc_client.rs
similarity index 80%
rename from exporter/src/pool/client/grpc_client.rs
rename to metrics-exporter/src/bin/io_engine/client/grpc_client.rs
index 9ecc9c10b..cad61e723 100644
--- a/exporter/src/pool/client/grpc_client.rs
+++ b/metrics-exporter/src/bin/io_engine/client/grpc_client.rs
@@ -1,11 +1,10 @@
-use std::time::Duration;
-
-use crate::pool::{client::ApiVersion, error::ExporterError};
+use crate::{error::ExporterError, ApiVersion};
 use actix_web::http::Uri;
 use rpc::io_engine::IoEngineClientV0;
+use std::{env, time::Duration};
 use tokio::time::sleep;
 use tonic::transport::Channel;
-use tracing::error;
+use tracing::{error, info};
 
 /// Timeout for gRPC.
 #[derive(Debug, Clone)]
@@ -112,11 +111,12 @@ impl GrpcClient {
             }
             Ok(result) => match result {
                 Ok(pool) => {
+                    info!("grpc connected successfully");
                     return Ok(Self {
                         ctx: context.clone(),
                         v0_client: None,
                         v1_client: Some(MayaClientV1 { pool }),
-                    })
+                    });
                 }
                 Err(error) => {
                     error!(error=%error, "Grpc client connection error, retrying after {}s",sleep_duration_sec);
@@ -154,3 +154,30 @@ impl GrpcClient {
         self.ctx.api_version.clone()
     }
 }
+
+/// Initialize mayastor grpc client
+pub async fn init_client(api_version: ApiVersion) -> Result<GrpcClient, ExporterError> {
+    let timeout = Timeouts::new(Duration::from_secs(1), Duration::from_secs(5));
+    let pod_ip = get_pod_ip()?;
+    let _ = get_node_name()?;
+    let endpoint = Uri::builder()
+        .scheme("https")
+        .authority(format!("{pod_ip}:10124"))
+        .path_and_query("")
+        .build()
+        .map_err(|error| ExporterError::InvalidURI(error.to_string()))?;
+    let ctx = GrpcContext::new(endpoint, timeout, api_version);
+    let client = GrpcClient::new(ctx).await?;
+    Ok(client)
+}
+
+/// Get pod ip.
+fn get_pod_ip() -> Result<String, ExporterError> {
+    env::var("MY_POD_IP").map_err(|_| ExporterError::PodIPError("Unable to get pod ip".to_string()))
+}
+
+/// Get node name from pod spec.
+pub fn get_node_name() -> Result<String, ExporterError> {
+    env::var("MY_NODE_NAME")
+        .map_err(|_| ExporterError::GetNodeError("Unable to get node name".to_string()))
+}
diff --git a/exporter/src/pool/client/mod.rs b/metrics-exporter/src/bin/io_engine/client/mod.rs
similarity index 92%
rename from exporter/src/pool/client/mod.rs
rename to metrics-exporter/src/bin/io_engine/client/mod.rs
index 3c9c8b2d2..67c5a56c9 100644
--- a/exporter/src/pool/client/mod.rs
+++ b/metrics-exporter/src/bin/io_engine/client/mod.rs
@@ -1,6 +1,6 @@
 /// Grpc client module.
 pub mod grpc_client;
-/// Pool module.
+/// PoolInfo module.
 pub mod pool;
 
 #[derive(
diff --git a/exporter/src/pool/client/pool.rs b/metrics-exporter/src/bin/io_engine/client/pool.rs
similarity index 77%
rename from exporter/src/pool/client/pool.rs
rename to metrics-exporter/src/bin/io_engine/client/pool.rs
index 32d71e25d..dee2b247f 100644
--- a/exporter/src/pool/client/pool.rs
+++ b/metrics-exporter/src/bin/io_engine/client/pool.rs
@@ -1,43 +1,38 @@
+use crate::{client::grpc_client::GrpcClient, error::ExporterError, ApiVersion};
 use serde::{Deserialize, Serialize};
 
-use crate::pool::{
-    client::{grpc_client::GrpcClient, ApiVersion},
-    error::ExporterError,
-};
-
-/// Pool resource.
+/// PoolInfo resource.
 #[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct Pool {
+pub struct PoolInfo {
     name: String,
-    disks: Vec<String>,
     used: u64,
     capacity: u64,
     state: u64,
     committed: u64,
 }
 
-impl Pool {
+impl PoolInfo {
     /// Get name of the pool.
     pub fn name(&self) -> &String {
         &self.name
     }
 
     /// Get used capacity of the pool.
     pub fn used(&self) -> u64 {
         self.used
     }
 
     /// Get total capacity of the pool.
     pub fn capacity(&self) -> u64 {
         self.capacity
     }
 
     /// Get the pool commitment in bytes.
     pub fn committed(&self) -> u64 {
         self.committed
    }
 
     /// Get state of the pool.
     pub fn state(&self) -> u64 {
         self.state
     }
 }
@@ -46,20 +41,19 @@ impl Pool {
 /// Pools resource.
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct Pools {
-    pub pools: Vec<Pool>,
+    pub pools: Vec<PoolInfo>,
 }
 
-/// Pool operations i.e wrapper over rpc calls to get pools data.
+/// PoolInfo operations i.e. wrapper over rpc calls to get pools data.
 #[tonic::async_trait]
 pub trait PoolOperations: Send + Sync + Sized {
     async fn list_pools(&self) -> Result<Pools, ExporterError>;
 }
 
-impl From<rpc::io_engine::Pool> for Pool {
+impl From<rpc::io_engine::Pool> for PoolInfo {
     fn from(value: rpc::io_engine::Pool) -> Self {
         Self {
             name: value.name,
-            disks: value.disks,
             used: value.used,
             capacity: value.capacity,
             state: value.state as u64,
@@ -67,11 +61,10 @@
         }
     }
 }
-impl From<rpc::v1::pool::Pool> for Pool {
+impl From<rpc::v1::pool::Pool> for PoolInfo {
     fn from(value: rpc::v1::pool::Pool) -> Self {
         Self {
             name: value.name,
-            disks: value.disks,
             used: value.used,
             capacity: value.capacity,
             state: value.state as u64,
@@ -90,7 +83,7 @@
                 .into_inner()
                 .pools
                 .into_iter()
-                .map(Pool::from)
+                .map(PoolInfo::from)
                 .collect::<Vec<_>>(),
             Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())),
         },
@@ -104,7 +97,7 @@
                 .into_inner()
                 .pools
                 .into_iter()
-                .map(Pool::from)
+                .map(PoolInfo::from)
                 .collect::<Vec<_>>(),
             Err(error) => return Err(ExporterError::GrpcResponseError(error.to_string())),
         },
diff --git a/exporter/src/pool/collector/mod.rs b/metrics-exporter/src/bin/io_engine/collector/mod.rs
similarity index 56%
rename from exporter/src/pool/collector/mod.rs
rename to metrics-exporter/src/bin/io_engine/collector/mod.rs
index 00b563a7d..c686087c5 100644
--- a/exporter/src/pool/collector/mod.rs
+++ b/metrics-exporter/src/bin/io_engine/collector/mod.rs
@@ -1,2 +1,2 @@
 /// Module for pools collector.
-pub mod pools_collector;
+pub mod pool;
diff --git a/exporter/src/pool/collector/pools_collector.rs b/metrics-exporter/src/bin/io_engine/collector/pool.rs
similarity index 68%
rename from exporter/src/pool/collector/pools_collector.rs
rename to metrics-exporter/src/bin/io_engine/collector/pool.rs
index f6f054e81..4edfc0cd2 100644
--- a/exporter/src/pool/collector/pools_collector.rs
+++ b/metrics-exporter/src/bin/io_engine/collector/pool.rs
@@ -1,47 +1,112 @@
 use std::{env, fmt::Debug, ops::DerefMut};
 
+use crate::{cache::Cache, client::pool::PoolInfo, error::ExporterError};
 use prometheus::{
     core::{Collector, Desc},
     GaugeVec, Opts,
 };
 use tracing::error;
 
-use crate::pool::{cache::Cache, client::pool::Pool, error::ExporterError};
-
-/// PoolsCollector contains the list of custom metrics that has to be exposed by exporter.
+/// Collects Pool capacity metrics from Cache.
 #[derive(Clone, Debug)]
-pub struct PoolsCollector {
+pub struct PoolCapacityCollector {
     pool_total_size: GaugeVec,
     pool_used_size: GaugeVec,
     pool_committed_size: GaugeVec,
+    descs: Vec<Desc>,
+}
+
+/// Collects pool status info from Cache.
+#[derive(Clone, Debug)]
+pub struct PoolStatusCollector {
     pool_status: GaugeVec,
     descs: Vec<Desc>,
 }
 
-impl Default for PoolsCollector {
+impl Default for PoolStatusCollector {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl PoolsCollector {
-    /// Initialize all the metrics to be defined for pools collector.
+impl PoolStatusCollector {
+    /// Initialize all the metrics to be defined for pools status collector.
     pub fn new() -> Self {
-        let pool_total_size_opts = Opts::new("total_size_bytes", "Total size of the pool in bytes")
-            .subsystem("disk_pool")
-            .variable_labels(vec!["node".to_string(), "name".to_string()]);
-        let pool_used_size_opts = Opts::new("used_size_bytes", "Used size of the pool in bytes")
+        let pool_status_opts = Opts::new("status", "Status of the pool")
             .subsystem("disk_pool")
             .variable_labels(vec!["node".to_string(), "name".to_string()]);
+        let mut descs = Vec::new();
+        let pool_status = GaugeVec::new(pool_status_opts, &["node", "name"])
+            .expect("Unable to create gauge metric type for pool_status");
+        descs.extend(pool_status.desc().into_iter().cloned());
+        Self { pool_status, descs }
+    }
+}
+
+impl Collector for PoolStatusCollector {
+    fn desc(&self) -> Vec<&prometheus::core::Desc> {
+        self.descs.iter().collect()
+    }
+    fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
+        let mut c = match Cache::get_cache().lock() {
+            Ok(c) => c,
+            Err(error) => {
+                error!(%error,"Error while getting cache resource");
+                return Vec::new();
+            }
+        };
+        let cp = c.deref_mut();
+        let mut metric_family = Vec::with_capacity(3 * cp.pool_mut().pools.capacity());
+        let node_name = match get_node_name() {
+            Ok(name) => name,
+            Err(error) => {
+                error!(?error, "Unable to get node name");
+                return metric_family;
+            }
+        };
+        for i in &cp.pool_mut().pools {
+            let p: &PoolInfo = i;
+            let pool_status = match self
+                .pool_status
+                .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()])
+            {
+                Ok(pool_status) => pool_status,
+                Err(error) => {
+                    error!(%error, "Error while creating metrics(pool_status) with label values");
+                    return metric_family;
+                }
+            };
+            pool_status.set(p.state() as f64);
+            let mut x = pool_status.collect();
+            metric_family.extend(x.pop());
+        }
+        metric_family
+    }
+}
+
+impl Default for PoolCapacityCollector {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PoolCapacityCollector {
+    /// Initialize all the metrics to be defined for pools capacity collector.
+    pub fn new() -> Self {
+        let pool_total_size_opts =
+            Opts::new("total_size_bytes", "Total size of the pool in bytes")
+                .subsystem("disk_pool")
+                .variable_labels(vec!["node".to_string(), "name".to_string()]);
+        let pool_used_size_opts =
+            Opts::new("used_size_bytes", "Used size of the pool in bytes")
+                .subsystem("disk_pool")
+                .variable_labels(vec!["node".to_string(), "name".to_string()]);
         let pool_committed_size_opts = Opts::new(
             "committed_size_bytes",
             "Committed size of the pool in bytes",
         )
         .subsystem("disk_pool")
         .variable_labels(vec!["node".to_string(), "name".to_string()]);
-        let pool_status_opts = Opts::new("status", "Status of the pool")
-            .subsystem("disk_pool")
-            .variable_labels(vec!["node".to_string(), "name".to_string()]);
 
         let mut descs = Vec::new();
         let pool_total_size = GaugeVec::new(pool_total_size_opts, &["node", "name"])
@@ -50,26 +115,22 @@
             .expect("Unable to create gauge metric type for pool_used_size");
         let pool_committed_size = GaugeVec::new(pool_committed_size_opts, &["node", "name"])
             .expect("Unable to create gauge metric type for pool_committed_size");
-        let pool_status = GaugeVec::new(pool_status_opts, &["node", "name"])
-            .expect("Unable to create gauge metric type for pool_status");
 
         // Descriptors for the custom metrics
         descs.extend(pool_total_size.desc().into_iter().cloned());
         descs.extend(pool_used_size.desc().into_iter().cloned());
         descs.extend(pool_committed_size.desc().into_iter().cloned());
-        descs.extend(pool_status.desc().into_iter().cloned());
 
         Self {
             pool_total_size,
             pool_used_size,
             pool_committed_size,
-            pool_status,
             descs,
         }
     }
 }
 
 /// Prometheus collector implementation
-impl Collector for PoolsCollector {
+impl Collector for PoolCapacityCollector {
     fn desc(&self) -> Vec<&prometheus::core::Desc> {
         self.descs.iter().collect()
     }
@@ -83,7 +144,7 @@
             }
         };
         let cp = c.deref_mut();
-        let mut metric_family = Vec::with_capacity(3 * cp.data_mut().pools().pools.capacity());
+        let mut metric_family = Vec::with_capacity(3 * cp.pool_mut().pools.capacity());
         let node_name = match get_node_name() {
             Ok(name) => name,
             Err(error) => {
@@ -92,8 +153,8 @@
             }
         };
 
-        for i in &cp.data_mut().pools().pools {
-            let p: &Pool = i;
+        for i in &cp.pool_mut().pools {
+            let p: &PoolInfo = i;
 
             let pool_total_size = match self
                 .pool_total_size
@@ -136,20 +197,6 @@
             pool_committed_size.set(p.committed() as f64);
             let mut x = pool_committed_size.collect();
             metric_family.extend(x.pop());
-
-            let pool_status = match self
-                .pool_status
-                .get_metric_with_label_values(&[node_name.clone().as_str(), p.name().as_str()])
-            {
-                Ok(pool_status) => pool_status,
-                Err(error) => {
-                    error!(%error, "Error while creating metrics(pool_status) with label values");
-                    return metric_family;
-                }
-            };
-            pool_status.set(p.state() as f64);
-            let mut x = pool_status.collect();
-            metric_family.extend(x.pop());
         }
         metric_family
     }
diff --git a/exporter/src/pool/config.rs b/metrics-exporter/src/bin/io_engine/config.rs
similarity index 92%
rename from exporter/src/pool/config.rs
rename to metrics-exporter/src/bin/io_engine/config.rs
index 3b3beac45..0673bc09b 100644
--- a/exporter/src/pool/config.rs
+++ b/metrics-exporter/src/bin/io_engine/config.rs
@@ -14,7 +14,7 @@ pub struct ExporterConfig {
 }
 
 impl ExporterConfig {
-    /// Initialize exporter configs.
+    /// Initialize metrics-exporter configs.
     pub fn initialize(addr: SocketAddr, polling_time: Duration) {
         CONFIG.get_or_init(|| Self {
             metrics_endpoint: addr,
@@ -22,7 +22,7 @@ impl ExporterConfig {
         });
     }
 
-    /// Get exporter config.
+    /// Get metrics-exporter config.
     pub fn get_config() -> &'static ExporterConfig {
         CONFIG.get().expect("Exporter config is not initialized")
     }
diff --git a/exporter/src/pool/error.rs b/metrics-exporter/src/bin/io_engine/error.rs
similarity index 100%
rename from exporter/src/pool/error.rs
rename to metrics-exporter/src/bin/io_engine/error.rs
diff --git a/metrics-exporter/src/bin/io_engine/main.rs b/metrics-exporter/src/bin/io_engine/main.rs
new file mode 100644
index 000000000..99387fbf5
--- /dev/null
+++ b/metrics-exporter/src/bin/io_engine/main.rs
@@ -0,0 +1,93 @@
+use crate::{
+    cache::store_data,
+    client::{grpc_client::init_client, ApiVersion},
+    config::ExporterConfig,
+    serve::metric_route,
+};
+use actix_web::{middleware, HttpServer};
+use clap::Parser;
+use std::{env, net::SocketAddr};
+use tracing::info;
+
+/// Cache module for exporter.
+pub mod cache;
+/// Grpc client module.
+pub mod client;
+/// Collector module.
+pub mod collector;
+/// Config module for metrics-exporter.
+pub mod config;
+/// Error module.
+pub mod error;
+/// Prometheus metrics handler module.
+pub mod serve;
+
+/// Initialize metrics-exporter config that are passed through arguments.
+fn initialize_exporter(args: &Cli) {
+    ExporterConfig::initialize(args.metrics_endpoint, args.polling_time.into());
+}
+
+/// Initialize cache.
+async fn initialize_cache() {
+    cache::Cache::initialize(cache::Data::default());
+}
+
+#[derive(Parser, Debug)]
+#[clap(name = utils::package_description!(), version = utils::version_info_str!())]
+struct Cli {
+    /// TCP address where prometheus endpoint will listen to
+    #[clap(long, short, default_value = "0.0.0.0:9502")]
+    metrics_endpoint: SocketAddr,
+
+    /// Polling time in seconds to get pools data through gRPC calls
+    #[clap(short, long, default_value = "300s")]
+    polling_time: humantime::Duration,
+
+    /// Io engine api versions
+    #[clap(short, long, value_delimiter = ',', required = true)]
+    api_versions: Vec<ApiVersion>,
+}
+
+impl Cli {
+    fn args() -> Self {
+        Cli::parse()
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), String> {
+    let args = Cli::args();
+
+    utils::print_package_info!();
+
+    utils::tracing_telemetry::init_tracing("metrics-exporter-io_engine", vec![], None);
+
+    initialize_exporter(&args);
+
+    initialize_cache().await;
+
+    // sort to get the latest api version
+    let mut api_versions = args.api_versions;
+    api_versions.sort_by(|a, b| b.cmp(a));
+
+    let client = init_client(api_versions.get(0).unwrap_or(&ApiVersion::V0).clone())
+        .await
+        .expect("gRPC client not initialized");
+
+    store_data(client).await;
+    info!("starting actix app");
+    let app = move || {
+        actix_web::App::new()
+            .wrap(middleware::Logger::default())
+            .configure(metric_route)
+    };
+    info!("starting endpoint");
+    HttpServer::new(app)
+        .bind(ExporterConfig::get_config().metrics_endpoint())
+        .unwrap()
+        .workers(1)
+        .run()
+        .await
+        .expect("Port should be free to expose the metrics");
+    Ok(())
+}
diff --git a/metrics-exporter/src/bin/io_engine/serve/handler.rs b/metrics-exporter/src/bin/io_engine/serve/handler.rs
new file mode 100644
index 000000000..8b059f1d6
--- /dev/null
+++ b/metrics-exporter/src/bin/io_engine/serve/handler.rs
@@ -0,0 +1,38 @@
+use crate::collector::pool::{PoolCapacityCollector, PoolStatusCollector};
+use actix_web::{http::header, HttpResponse, Responder};
+use prometheus::{Encoder, Registry};
+use tracing::{error, warn};
+
+/// Handler for metrics. Initializes all collectors and serves data over HTTP.
+pub async fn metrics_handler() -> impl Responder {
+    let pools_collector = PoolCapacityCollector::default();
+    let pool_status_collector = PoolStatusCollector::default();
+    // Create a new registry for prometheus
+    let registry = Registry::default();
+    // Register pools collector in the registry
+    if let Err(error) = Registry::register(&registry, Box::new(pools_collector)) {
+        warn!(%error, "Pools collector already registered");
+    }
+    if let Err(error) = Registry::register(&registry, Box::new(pool_status_collector)) {
+        warn!(%error, "Pools status collector already registered");
+    }
+
+    let mut buffer = Vec::new();
+
+    let encoder = prometheus::TextEncoder::new();
+    // Starts collecting metrics via calling gatherers
+    if let Err(error) = encoder.encode(&registry.gather(), &mut buffer) {
+        error!(%error, "Could not encode custom metrics");
+    };
+
+    let res_custom = match String::from_utf8(buffer.clone()) {
+        Ok(v) => v,
+        Err(error) => {
+            error!(%error, "Prometheus metrics could not be parsed from_utf8'd");
+            String::default()
+        }
+    };
+    HttpResponse::Ok()
+        .insert_header(header::ContentType(mime::TEXT_PLAIN))
+        .body(res_custom)
+}
diff --git a/metrics-exporter/src/bin/io_engine/serve/mod.rs b/metrics-exporter/src/bin/io_engine/serve/mod.rs
new file mode 100644
index 000000000..34d10ec28
--- /dev/null
+++ b/metrics-exporter/src/bin/io_engine/serve/mod.rs
@@ -0,0 +1,7 @@
+use actix_web::web;
+/// Module for prometheus handlers.
+mod handler;
+
+pub fn metric_route(cfg: &mut web::ServiceConfig) {
+    cfg.route("/metrics", web::get().to(handler::metrics_handler));
+}
diff --git a/nix/pkgs/extensions/cargo-project.nix b/nix/pkgs/extensions/cargo-project.nix
index 7a11c26b0..1003a0830 100644
--- a/nix/pkgs/extensions/cargo-project.nix
+++ b/nix/pkgs/extensions/cargo-project.nix
@@ -57,7 +57,7 @@ let
   src_list = [
     "Cargo.lock"
     "Cargo.toml"
-    "exporter"
+    "metrics-exporter"
     "rpc"
     "console-logger"
     "call-home"
@@ -119,7 +119,7 @@ in
   build = { buildType, cargoBuildFlags ? [ ] }:
     if allInOne then
-      builder { inherit buildType; cargoBuildFlags = [ "-p rpc" "-p exporter" "-p call-home" "-p upgrade" ]; }
+      builder { inherit buildType; cargoBuildFlags = [ "-p rpc" "-p metrics-exporter" "-p call-home" "-p upgrade" ]; }
     else
       builder { inherit buildType cargoBuildFlags; };
 }
diff --git a/nix/pkgs/extensions/default.nix b/nix/pkgs/extensions/default.nix
index 4e0bfa9f7..de621d0a5 100644
--- a/nix/pkgs/extensions/default.nix
+++ b/nix/pkgs/extensions/default.nix
@@ -21,18 +21,18 @@ let
   };
   components = { buildType, builder }: rec {
-    exporters = {
-      metrics = rec {
+    metrics = {
+      exporter = rec {
         recurseForDerivations = true;
-        metrics_builder = { buildType, builder, cargoBuildFlags ? [ "-p exporter" ] }: builder.build { inherit buildType cargoBuildFlags; };
+        metrics_builder = { buildType, builder, cargoBuildFlags ?
[ "-p metrics-exporter" ] }: builder.build { inherit buildType cargoBuildFlags; }; metrics_installer = { pname, src }: installer { inherit pname src; }; - pool = metrics_installer { + io-engine = metrics_installer { src = if allInOne then metrics_builder { inherit buildType builder; } else - metrics_builder { inherit buildType builder; cargoBuildFlags = [ "--bin metrics-exporter-pool" ]; }; - pname = "metrics-exporter-pool"; + metrics_builder { inherit buildType builder; cargoBuildFlags = [ "--bin metrics-exporter-io-engine" ]; }; + pname = "metrics-exporter-io-engine"; }; }; }; @@ -64,7 +64,7 @@ let stats = obs_installer { src = if allInOne then - obs_builder { inherit buildType builder; cargoBuildFlags = [ "-p call-home-stats" ]; } + obs_builder { inherit buildType builder; cargoBuildFlags = [ "-p call-home-stats" ]; } else obs_builder { inherit buildType builder; cargoBuildFlags = [ "--bin call-home-stats" ]; }; pname = "obs-callhome-stats"; diff --git a/nix/pkgs/images/default.nix b/nix/pkgs/images/default.nix index ab34d1110..cd171e23d 100644 --- a/nix/pkgs/images/default.nix +++ b/nix/pkgs/images/default.nix @@ -19,9 +19,9 @@ let } // config; }; build-exporter-image = { buildType }: { - pool = build-extensions-image rec{ + io-engine = build-extensions-image rec{ inherit buildType; - package = extensions.${buildType}.exporters.metrics.pool; + package = extensions.${buildType}.metrics.exporter.io-engine; pname = package.pname; config = { ExposedPorts = { @@ -90,7 +90,7 @@ let in let build-exporter-images = { buildType }: { - metrics = build-exporter-image { + exporter = build-exporter-image { inherit buildType; }; }; @@ -113,7 +113,7 @@ let in let build-images = { buildType }: { - exporters = build-exporter-images { inherit buildType; } // { + metrics = build-exporter-images { inherit buildType; } // { recurseForDerivations = true; }; upgrade = build-upgrade-images { inherit buildType; } // { diff --git a/scripts/release.sh b/scripts/release.sh index bae44eefe..2f5ea20e7 100755 --- a/scripts/release.sh +++ b/scripts/release.sh @@ -67,7 +67,7 @@ HASH=`get_hash` BRANCH=`git rev-parse --abbrev-ref HEAD` BRANCH=${BRANCH////-} IMAGES= -DEFAULT_IMAGES="exporters.metrics.pool obs.callhome stats.aggregator upgrade.job" +DEFAULT_IMAGES="metrics.exporter.io-engine obs.callhome stats.aggregator upgrade.job" IMAGES_THAT_REQUIRE_HELM_CHART=("upgrade.job") UPLOAD= SKIP_PUBLISH=