Skip to content

Commit

Permalink
Move table row metrics into autopilot
Browse files Browse the repository at this point in the history
  • Loading branch information
vkgnosis committed Jun 30, 2022
1 parent 61d569b commit c605680
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 75 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/autopilot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ path = "src/main.rs"
anyhow = "1.0"
async-trait = "0.1"
clap = { version = "3.1", features = ["derive", "env"] }
database = { path = "../database" }
prometheus = "0.13"
prometheus-metric-storage = { git = "https://github.com/cowprotocol/prometheus-metric-storage" , tag = "v0.4.0" }
shared= { path = "../shared" }
sqlx = { version = "0.6", default-features = false, features = ["bigdecimal", "chrono", "macros", "runtime-tokio-native-tls", "postgres"] }
tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "sync", "time", "signal"] }
tracing = "0.1"
url = "2.2"
6 changes: 6 additions & 0 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::SocketAddr;
use tracing::level_filters::LevelFilter;
use url::Url;

#[derive(clap::Parser)]
pub struct Arguments {
Expand All @@ -11,13 +12,18 @@ pub struct Arguments {

#[clap(long, env, default_value = "0.0.0.0:9589")]
pub metrics_address: SocketAddr,

/// Url of the Postgres database. By default connects to locally running postgres.
#[clap(long, env, default_value = "postgresql://")]
pub db_url: Url,
}

impl std::fmt::Display for Arguments {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "log_filter: {}", self.log_filter)?;
writeln!(f, "log_stderr_threshold: {}", self.log_stderr_threshold)?;
writeln!(f, "metrics_address: {}", self.metrics_address)?;
writeln!(f, "db_url: SECRET")?;
Ok(())
}
}
42 changes: 42 additions & 0 deletions crates/autopilot/src/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use sqlx::{Executor, PgPool, Row};

#[derive(Clone)]
pub struct Postgres(pub PgPool);

impl Postgres {
pub fn new(url: &str) -> sqlx::Result<Self> {
Ok(Self(PgPool::connect_lazy(url)?))
}

async fn count_rows_in_table(&self, table: &str) -> sqlx::Result<i64> {
let query = format!("SELECT COUNT(*) FROM {};", table);
let row = self.0.fetch_one(query.as_str()).await?;
row.try_get(0).map_err(Into::into)
}

pub async fn update_table_rows_metric(&self) -> sqlx::Result<()> {
let metrics = Metrics::get();
for &table in database::ALL_TABLES {
let count = self.count_rows_in_table(table).await?;
metrics.table_rows.with_label_values(&[table]).set(count);
}
Ok(())
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// Number of rows in db tables.
#[metric(labels("table"))]
table_rows: prometheus::IntGaugeVec,

/// Timing of db queries.
#[metric(labels("type"))]
database_queries: prometheus::HistogramVec,
}

impl Metrics {
fn get() -> &'static Self {
Metrics::instance(shared::metrics::get_metric_storage_registry()).unwrap()
}
}
42 changes: 20 additions & 22 deletions crates/autopilot/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
pub mod arguments;
pub mod database;

use crate::database::Postgres;
use anyhow::Result;
use shared::metrics::LivenessChecking;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::{sync::Arc, time::Duration};

#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// Number of seconds program has been running for.
seconds_alive: prometheus::IntGauge,
/// Assumes tracing and metrics registry have already been set up.
pub async fn main(args: arguments::Arguments) -> Result<()> {
let serve_metrics = shared::metrics::serve_metrics(Arc::new(Liveness), args.metrics_address);
let db = Postgres::new(args.db_url.as_str()).unwrap();
let db_metrics = database_metrics(db);
tokio::select! {
result = serve_metrics => tracing::error!(?result, "serve_metrics exited"),
_ = db_metrics => (),
};
Ok(())
}

struct Liveness;
Expand All @@ -20,19 +26,11 @@ impl LivenessChecking for Liveness {
}
}

/// Assumes tracing and metrics registry have already been set up.
pub async fn main(args: arguments::Arguments) {
let update_metrics = async {
let start = Instant::now();
let metrics = Metrics::instance(shared::metrics::get_metric_storage_registry()).unwrap();
loop {
metrics.seconds_alive.set(start.elapsed().as_secs() as i64);
tokio::time::sleep(Duration::from_secs(1)).await;
async fn database_metrics(db: Postgres) -> ! {
loop {
if let Err(err) = db.update_table_rows_metric().await {
tracing::error!(?err, "failed to update table rows metric");
}
};
let serve_metrics = shared::metrics::serve_metrics(Arc::new(Liveness), args.metrics_address);
tokio::select! {
result = serve_metrics => tracing::error!(?result, "serve_metrics exited"),
_ = update_metrics => (),
};
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
43 changes: 1 addition & 42 deletions crates/orderbook/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod quotes;
pub mod trades;

use anyhow::Result;
use sqlx::{Executor, PgPool, Row};
use sqlx::PgPool;

// TODO: There is remaining optimization potential by implementing sqlx encoding and decoding for
// U256 directly instead of going through BigDecimal. This is not very important as this is fast
Expand All @@ -24,29 +24,10 @@ impl Postgres {
pool: PgPool::connect_lazy(uri)?,
})
}

async fn count_rows_in_table(&self, table: &str) -> Result<i64> {
let query = format!("SELECT COUNT(*) FROM {};", table);
let row = self.pool.fetch_one(query.as_str()).await?;
row.try_get(0).map_err(Into::into)
}

pub async fn update_table_rows_metric(&self) -> Result<()> {
let metrics = Metrics::get();
for &table in database::ALL_TABLES {
let count = self.count_rows_in_table(table).await?;
metrics.table_rows.with_label_values(&[table]).set(count);
}
Ok(())
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// Number of rows in db tables.
#[metric(labels("table"))]
table_rows: prometheus::IntGaugeVec,

/// Timing of db queries.
#[metric(labels("type"))]
database_queries: prometheus::HistogramVec,
Expand All @@ -57,25 +38,3 @@ impl Metrics {
Metrics::instance(shared::metrics::get_metric_storage_registry()).unwrap()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::database::orders::OrderStoring;

#[tokio::test]
#[ignore]
async fn postgres_count_rows_in_tables_works() {
let db = Postgres::new("postgresql://").unwrap();
database::clear_DANGER(&db.pool).await.unwrap();

let count = db.count_rows_in_table("orders").await.unwrap();
assert_eq!(count, 0);

db.insert_order(&Default::default(), Default::default())
.await
.unwrap();
let count = db.count_rows_in_table("orders").await.unwrap();
assert_eq!(count, 1);
}
}
11 changes: 0 additions & 11 deletions crates/orderbook/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,6 @@ use shared::{
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::task;

pub async fn database_metrics(database: Postgres) -> ! {
loop {
if let Err(err) = database.update_table_rows_metric().await {
tracing::error!(?err, "failed to update table rows metric");
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
}

#[tokio::main]
async fn main() {
let args = orderbook::arguments::Arguments::parse();
Expand Down Expand Up @@ -581,7 +572,6 @@ async fn main() {
);
let maintenance_task =
task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream));
let db_metrics_task = task::spawn(database_metrics(postgres));

let mut metrics_address = args.bind_address;
metrics_address.set_port(DEFAULT_METRICS_PORT);
Expand All @@ -592,7 +582,6 @@ async fn main() {
tokio::select! {
result = &mut serve_api => tracing::error!(?result, "API task exited"),
result = maintenance_task => tracing::error!(?result, "maintenance task exited"),
result = db_metrics_task => tracing::error!(?result, "database metrics task exited"),
result = metrics_task => tracing::error!(?result, "metrics task exited"),
_ = shutdown_signal() => {
tracing::info!("Gracefully shutting down API");
Expand Down

0 comments on commit c605680

Please sign in to comment.