diff --git a/boltzr/Cargo.lock b/boltzr/Cargo.lock index 8f8cd761..fbd85151 100644 --- a/boltzr/Cargo.lock +++ b/boltzr/Cargo.lock @@ -720,6 +720,12 @@ version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "ark-ff" version = "0.3.0" @@ -1373,6 +1379,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b" +dependencies = [ + "axum 0.8.1", + "axum-core 0.5.0", + "bytes", + "futures-util", + "headers", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-prometheus" version = "0.6.1" @@ -1636,6 +1664,7 @@ dependencies = [ "async-trait", "async-tungstenite", "axum 0.8.1", + "axum-extra", "axum-prometheus", "base64 0.22.1", "bech32 0.9.1", @@ -1659,6 +1688,7 @@ dependencies = [ "flate2", "futures", "futures-util", + "http-body-util", "lightning", "lightning-invoice", "metrics 0.24.1", @@ -1676,6 +1706,7 @@ dependencies = [ "r2d2", "rand", "rcgen", + "redis", "reqwest", "rstest", "rust-s3", @@ -1689,6 +1720,7 @@ dependencies = [ "toml", "tonic 0.12.3", "tonic-build 0.12.3", + "tower 0.5.2", "tracing", "tracing-loki", "tracing-opentelemetry", @@ -1944,6 +1976,20 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "const-hex" version = "1.14.0" @@ -2888,6 +2934,30 @@ dependencies = [ "serde", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 1.2.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.2.0", +] + [[package]] name = "heck" version = "0.5.0" @@ -4865,6 +4935,29 @@ dependencies = [ "yasna", ] +[[package]] +name = "redis" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f89727cba9cec05cc579942321ff6dd09fe57a8b3217f52f952301efa010da5" +dependencies = [ + "arc-swap", + "bytes", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "r2d2", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -5651,6 +5744,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" diff --git a/boltzr/Cargo.toml b/boltzr/Cargo.toml index f4d8bf7c..776b28ea 100644 --- a/boltzr/Cargo.toml +++ b/boltzr/Cargo.toml @@ -90,6 +90,8 @@ flate2 = "1.0.35" pyroscope = { version = "0.5.8", optional = true } pyroscope_pprofrs = { version = "0.2.8", optional = true } csv = "1.3.1" +axum-extra = { version = "0.10.0", features = ["typed-header"] } +redis = { version = "0.28.1", features = ["tokio-comp", "r2d2"] } [build-dependencies] built = { version = "0.7.5", features = ["git2"] } @@ -97,7 +99,9 @@ tonic-build = "0.12.3" [dev-dependencies] eventsource-client = "0.13.0" +http-body-util = "0.1.2" mockall = "0.13.1" rand = "0.8.5" rstest = "0.24.0" serial_test = "3.2.0" +tower = { version = "0.5.2", features = ["util"] } diff --git a/boltzr/src/api/errors.rs b/boltzr/src/api/errors.rs new file mode 100644 index 00000000..6ab99988 --- /dev/null +++ b/boltzr/src/api/errors.rs @@ -0,0 +1,169 @@ +use axum::body::Body; +use axum::extract::Request; +use axum::http::header::CONTENT_TYPE; +use axum::http::StatusCode; +use axum::middleware::Next; +use axum::response::{IntoResponse, Response}; +use axum::Json; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize)] +pub struct ApiError { + pub error: String, +} + +pub struct AxumError(anyhow::Error); + +impl IntoResponse for AxumError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError { + error: format!("{}", self.0), + }), + ) + .into_response() + } +} + +impl From for AxumError +where + E: Into, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} + +pub async fn error_middleware(request: Request, next: Next) -> Response { + let response = next.run(request).await; + + if response.status().is_server_error() || response.status().is_client_error() { + let is_json = match response.headers().get(CONTENT_TYPE) { + Some(content_type) => content_type == "application/json", + None => false, + }; + if is_json { + return response; + } + + let (parts, body) = response.into_parts(); + let body_str = match axum::body::to_bytes(body, 1024 * 32).await { + Ok(bytes) => { + if !bytes.is_empty() { + match std::str::from_utf8(&bytes) { + Ok(str) => str.to_string(), + Err(_) => return Response::from_parts(parts, Body::from(bytes)), + } + } else { + return Response::from_parts(parts, Body::from(bytes)); + } + } + Err(err) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError { + error: format!("could not handle body: {}", err), + }), + ) + .into_response() + } + }; + + return (parts.status, Json(ApiError { error: body_str })).into_response(); + } + + response +} + +#[cfg(test)] +mod test { + use super::*; + use axum::routing::get; + use axum::Router; + use http_body_util::BodyExt; + use rstest::rstest; + use tower::util::ServiceExt; + + #[tokio::test] + async fn test_error_middleware_ignore_success() { + let msg = "gm"; + + let router = Router::new() + .route( + "/", + get(move || async move { (StatusCode::CREATED, msg).into_response() }), + ) + .layer(axum::middleware::from_fn(error_middleware)); + + let res = router + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::CREATED); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(std::str::from_utf8(&body).unwrap(), msg); + } + + #[rstest] + #[case(StatusCode::BAD_REQUEST)] + #[case(StatusCode::NOT_IMPLEMENTED)] + #[tokio::test] + async fn test_error_middleware_already_json(#[case] code: StatusCode) { + let msg = "ngmi"; + + let router = Router::new() + .route( + "/", + get(move || async move { + ( + code, + Json(ApiError { + error: msg.to_owned(), + }), + ) + .into_response() + }), + ) + .layer(axum::middleware::from_fn(error_middleware)); + + let res = router + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(res.status(), code); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + let body: ApiError = serde_json::from_slice(&body).unwrap(); + assert_eq!(body.error, msg); + } + + #[rstest] + #[case(StatusCode::BAD_REQUEST)] + #[case(StatusCode::NOT_IMPLEMENTED)] + #[tokio::test] + async fn test_error_middleware_to_json(#[case] code: StatusCode) { + let msg = "ngmi"; + + let router = Router::new() + .route( + "/", + get(move || async move { (code, msg.to_owned()).into_response() }), + ) + .layer(axum::middleware::from_fn(error_middleware)); + + let res = router + .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(res.status(), code); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + let body: ApiError = serde_json::from_slice(&body).unwrap(); + assert_eq!(body.error, msg); + } +} diff --git a/boltzr/src/api/headers.rs b/boltzr/src/api/headers.rs new file mode 100644 index 00000000..8785ac99 --- /dev/null +++ b/boltzr/src/api/headers.rs @@ -0,0 +1,82 @@ +use axum_extra::headers::{Error, Header, HeaderName, HeaderValue}; + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Referral(String); + +impl Referral { + pub fn inner(&self) -> &str { + &self.0 + } +} + +impl Header for Referral { + fn name() -> &'static HeaderName { + static NAME: HeaderName = HeaderName::from_static("referral"); + &NAME + } + + fn decode<'i, I>(values: &mut I) -> Result + where + Self: Sized, + I: Iterator, + { + let value = values.next().ok_or_else(Error::invalid)?; + value + .to_str() + .map_err(|_| Error::invalid()) + .map(|value| Self(value.to_owned())) + } + + fn encode>(&self, values: &mut E) { + values.extend(std::iter::once( + HeaderValue::from_str(&self.0).expect("invalid header value"), + )); + } +} + +#[cfg(test)] +mod test { + use super::*; + use axum::body::Body; + use axum::extract::Request; + use axum::http::StatusCode; + use axum::response::IntoResponse; + use axum::routing::get; + use axum::Router; + use axum_extra::TypedHeader; + use http_body_util::BodyExt; + use rstest::rstest; + use tower::ServiceExt; + + #[rstest] + #[case("Referral")] + #[case("referral")] + #[case("refeRral")] + #[tokio::test] + async fn test_referral_decode(#[case] key: &str) { + let expected = "pro"; + + let router = Router::new().route( + "/", + get( + move |TypedHeader(referral): TypedHeader| async move { + (StatusCode::CREATED, referral.inner().to_owned()).into_response() + }, + ), + ); + + let res = router + .oneshot( + Request::builder() + .uri("/") + .header(key, expected) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(std::str::from_utf8(&body).unwrap(), expected); + } +} diff --git a/boltzr/src/api/mod.rs b/boltzr/src/api/mod.rs index bf08226a..6da94a93 100644 --- a/boltzr/src/api/mod.rs +++ b/boltzr/src/api/mod.rs @@ -1,4 +1,7 @@ +use crate::api::errors::error_middleware; use crate::api::sse::sse_handler; +use crate::api::stats::get_stats; +use crate::service::Service; use axum::routing::get; use axum::{Extension, Router}; use serde::{Deserialize, Serialize}; @@ -12,7 +15,10 @@ use ws::types::SwapStatus; #[cfg(feature = "metrics")] use crate::metrics::server::MetricsLayer; +mod errors; +mod headers; mod sse; +mod stats; pub mod ws; #[derive(Deserialize, Serialize, PartialEq, Clone, Debug)] @@ -22,13 +28,18 @@ pub struct Config { } pub struct Server { - swap_infos: S, config: Config, cancellation_token: CancellationToken, + + service: Arc, + + swap_infos: S, swap_status_update_tx: tokio::sync::broadcast::Sender>, } struct ServerState { + service: Arc, + swap_infos: S, swap_status_update_tx: tokio::sync::broadcast::Sender>, } @@ -40,11 +51,13 @@ where pub fn new( config: Config, cancellation_token: CancellationToken, + service: Arc, swap_infos: S, swap_status_update_tx: tokio::sync::broadcast::Sender>, ) -> Self { Server { config, + service, swap_infos, cancellation_token, swap_status_update_tx, @@ -79,6 +92,7 @@ where axum::serve( listener, router.layer(Extension(Arc::new(ServerState { + service: self.service.clone(), swap_infos: self.swap_infos.clone(), swap_status_update_tx: self.swap_status_update_tx.clone(), }))), @@ -95,24 +109,33 @@ where } fn add_routes(router: Router) -> Router { - router.route("/streamswapstatus", get(sse_handler::)) + router + .route("/streamswapstatus", get(sse_handler::)) + .route( + "/v2/swap/{swap_type}/stats/{from}/{to}", + get(get_stats::), + ) + .layer(axum::middleware::from_fn(error_middleware)) } } #[cfg(test)] -mod test { +pub mod test { use crate::api::ws::status::SwapInfos; use crate::api::ws::types::SwapStatus; use crate::api::{Config, Server}; + use crate::cache::Redis; + use crate::service::Service; use async_trait::async_trait; use reqwest::StatusCode; + use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::Sender; use tokio_util::sync::CancellationToken; #[derive(Debug, Clone)] - struct Fetcher { - status_tx: Sender>, + pub struct Fetcher { + pub status_tx: Sender>, } #[async_trait] @@ -150,6 +173,7 @@ mod test { host: "127.0.0.1".to_string(), }, cancel.clone(), + Arc::new(Service::new::(None, None, None)), Fetcher { status_tx: status_tx.clone(), }, diff --git a/boltzr/src/api/stats.rs b/boltzr/src/api/stats.rs new file mode 100644 index 00000000..1665141b --- /dev/null +++ b/boltzr/src/api/stats.rs @@ -0,0 +1,225 @@ +use crate::api::errors::{ApiError, AxumError}; +use crate::api::headers::Referral; +use crate::api::ws::status::SwapInfos; +use crate::api::ServerState; +use crate::db::models::SwapType; +use anyhow::{anyhow, Result}; +use axum::extract::Path; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::{Extension, Json}; +use axum_extra::TypedHeader; +use serde::Deserialize; +use std::sync::Arc; + +const PRO_REFERRAL: &str = "pro"; + +fn parse_swap_type(swap_type: &str) -> Result { + match swap_type { + "submarine" => Ok(SwapType::Submarine), + "reverse" => Ok(SwapType::Reverse), + "chain" => Ok(SwapType::Chain), + _ => Err(anyhow!("invalid swap type: {}", swap_type)), + } +} + +#[derive(Deserialize)] +pub struct StatsParams { + swap_type: String, + from: String, + to: String, +} + +pub async fn get_stats( + Extension(state): Extension>>, + TypedHeader(referral): TypedHeader, + Path(StatsParams { + to, + from, + swap_type, + }): Path, +) -> Result +where + S: SwapInfos + Send + Sync + Clone + 'static, +{ + let referral = referral.inner(); + if referral != PRO_REFERRAL { + return Ok(( + StatusCode::BAD_REQUEST, + Json(ApiError { + error: "allowed only for Boltz Pro".to_string(), + }), + ) + .into_response()); + } + + let swap_type = match parse_swap_type(&swap_type) { + Ok(swap_type) => swap_type, + Err(err) => { + return Ok(( + StatusCode::BAD_REQUEST, + Json(ApiError { + error: err.to_string(), + }), + ) + .into_response()) + } + }; + + let pair_stats = if let Some(stats) = &state.service.pair_stats { + stats + } else { + return Ok(( + StatusCode::NOT_IMPLEMENTED, + Json(ApiError { + error: "historical data not available".to_string(), + }), + ) + .into_response()); + }; + + let res = pair_stats + .get_pair_stats(&format!("{}/{}", from, to), swap_type, referral) + .await?; + + Ok(match res { + Some(res) => (StatusCode::OK, Json(res)).into_response(), + None => ( + StatusCode::NOT_FOUND, + Json(ApiError { + error: "invalid pair".to_string(), + }), + ) + .into_response(), + }) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::api::test::Fetcher; + use crate::api::ws::types::SwapStatus; + use crate::api::Server; + use crate::service::test::PairStats; + use crate::service::Service; + use axum::body::Body; + use axum::extract::Request; + use axum::Router; + use http_body_util::BodyExt; + use rstest::rstest; + use tower::ServiceExt; + + fn setup_router(with_pair_stats: bool) -> Router { + let (status_tx, _) = tokio::sync::broadcast::channel::>(1); + + Server::::add_routes(Router::new()).layer(Extension(Arc::new(ServerState { + service: Arc::new(Service::new_mocked_prometheus(with_pair_stats)), + swap_status_update_tx: status_tx.clone(), + swap_infos: Fetcher { status_tx }, + }))) + } + + #[tokio::test] + async fn get_stats() { + let res = setup_router(true) + .oneshot( + Request::builder() + .uri("/v2/swap/submarine/stats/BTC/BTC") + .header("Referral", PRO_REFERRAL) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::OK); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + assert!(serde_json::from_slice::(&body).is_ok()); + } + + #[tokio::test] + async fn get_stats_historical_data_not_available() { + let res = setup_router(false) + .oneshot( + Request::builder() + .uri("/v2/swap/submarine/stats/BTC/BTC") + .header("Referral", PRO_REFERRAL) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::NOT_IMPLEMENTED); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + let error: ApiError = serde_json::from_slice(&body).unwrap(); + assert_eq!(error.error, "historical data not available"); + } + + #[rstest] + #[case("invalid")] + #[case("asdf")] + #[case("123")] + #[tokio::test] + async fn get_stats_invalid_swap_type(#[case] swap_type: &str) { + let res = setup_router(true) + .oneshot( + Request::builder() + .uri(format!("/v2/swap/{}/stats/BTC/BTC", swap_type)) + .header("Referral", PRO_REFERRAL) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::BAD_REQUEST); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + let error: ApiError = serde_json::from_slice(&body).unwrap(); + assert_eq!(error.error, format!("invalid swap type: {}", swap_type)); + } + + #[rstest] + #[case("default")] + #[case("not-pro")] + #[case("invalid")] + #[tokio::test] + async fn get_stats_non_pro(#[case] referral: &str) { + let res = setup_router(true) + .oneshot( + Request::builder() + .uri("/v2/swap/submarine/stats/BTC/BTC") + .header("Referral", referral) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::BAD_REQUEST); + + let body = res.into_body().collect().await.unwrap().to_bytes(); + let error: ApiError = serde_json::from_slice(&body).unwrap(); + assert_eq!(error.error, "allowed only for Boltz Pro"); + } + + #[rstest] + #[case("submarine", SwapType::Submarine)] + #[case("reverse", SwapType::Reverse)] + #[case("chain", SwapType::Chain)] + fn test_parse_swap_type(#[case] input: &str, #[case] expected: SwapType) { + assert_eq!(parse_swap_type(input).unwrap(), expected); + } + + #[test] + fn test_parse_swap_type_invalid() { + let input = "invalid"; + assert_eq!( + parse_swap_type(input).err().unwrap().to_string(), + format!("invalid swap type: {}", input) + ); + } +} diff --git a/boltzr/src/cache/mod.rs b/boltzr/src/cache/mod.rs new file mode 100644 index 00000000..975e560d --- /dev/null +++ b/boltzr/src/cache/mod.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; + +mod redis; + +pub use redis::*; + +fn expiry_seconds_default() -> u64 { + 120 +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct CacheConfig { + #[serde(rename = "redisEndpoint")] + pub redis_endpoint: String, + + #[serde(rename = "defaultExpiry", default = "expiry_seconds_default")] + pub default_expiry: u64, +} + +#[async_trait] +pub trait Cache { + async fn get(&self, key: &str) -> Result>; + async fn set(&self, key: &str, value: &V) -> Result<()>; +} + +#[cfg(test)] +pub mod test { + use async_trait::async_trait; + use dashmap::DashMap; + use serde::de::DeserializeOwned; + use serde::Serialize; + use std::sync::Arc; + + #[derive(Clone, Debug)] + pub struct MemCache { + pub map: Arc>, + } + + impl MemCache { + pub fn new() -> Self { + Self { + map: Arc::new(DashMap::new()), + } + } + } + + #[async_trait] + impl super::Cache for MemCache { + async fn get(&self, key: &str) -> anyhow::Result> { + let res = self.map.get(key); + Ok(match res { + Some(res) => Some(serde_json::from_str(res.value())?), + None => None, + }) + } + + async fn set(&self, key: &str, value: &V) -> anyhow::Result<()> { + self.map + .insert(key.to_owned(), serde_json::to_string(value)?.to_string()); + Ok(()) + } + } +} diff --git a/boltzr/src/cache/redis.rs b/boltzr/src/cache/redis.rs new file mode 100644 index 00000000..e4bd1bc2 --- /dev/null +++ b/boltzr/src/cache/redis.rs @@ -0,0 +1,127 @@ +use crate::cache::{Cache, CacheConfig}; +use anyhow::Result; +use async_trait::async_trait; +use redis::aio::MultiplexedConnection; +use redis::Client; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tracing::info; + +#[derive(Debug, Clone)] +pub struct Redis { + default_expiry: u64, + connection: MultiplexedConnection, +} + +impl Redis { + pub async fn new(config: &CacheConfig) -> Result { + let client = Client::open(&*config.redis_endpoint)?; + + let cache = Self { + default_expiry: config.default_expiry, + connection: client.get_multiplexed_tokio_connection().await?, + }; + info!("Connected to Redis cache"); + Ok(cache) + } +} + +#[async_trait] +impl Cache for Redis { + async fn get(&self, key: &str) -> Result> { + let res: Option = redis::cmd("GET") + .arg(key) + .query_async(&mut self.connection.clone()) + .await?; + + Ok(match res { + Some(res) => Some(serde_json::from_str(&res)?), + None => None, + }) + } + + async fn set(&self, key: &str, value: &V) -> Result<()> { + redis::cmd("SET") + .arg(key) + .arg(serde_json::to_string(value)?) + .arg("EX") + .arg(self.default_expiry) + .exec_async(&mut self.connection.clone()) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use serde::Deserialize; + + pub const REDIS_ENDPOINT: &str = "redis://127.0.0.1:6379"; + + #[derive(Serialize, Deserialize, PartialEq, Debug)] + struct Data { + data: String, + } + + #[tokio::test] + async fn test_get_set() { + let cache = Redis::new(&CacheConfig { + redis_endpoint: REDIS_ENDPOINT.to_string(), + default_expiry: 120, + }) + .await + .unwrap(); + + let key = "test:data"; + let data = Data { + data: "is super hard to compute".to_string(), + }; + + cache.set(key, &data).await.unwrap(); + assert_eq!(cache.get::(key).await.unwrap().unwrap(), data); + } + + #[tokio::test] + async fn test_get_empty() { + let cache = Redis::new(&CacheConfig { + redis_endpoint: REDIS_ENDPOINT.to_string(), + default_expiry: 120, + }) + .await + .unwrap(); + + assert!(cache.get::("empty").await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_set_ttl() { + let default_expiry = 120; + let cache = Redis::new(&CacheConfig { + default_expiry, + redis_endpoint: REDIS_ENDPOINT.to_string(), + }) + .await + .unwrap(); + + let key = "test:ttl"; + + cache + .set( + key, + &Data { + data: "ttl".to_string(), + }, + ) + .await + .unwrap(); + + let ttl: u64 = redis::cmd("TTL") + .arg(key) + .query_async(&mut cache.connection.clone()) + .await + .unwrap(); + + assert!(ttl >= default_expiry - 1 && ttl <= default_expiry); + } +} diff --git a/boltzr/src/config.rs b/boltzr/src/config.rs index 5d96cd24..467634bc 100644 --- a/boltzr/src/config.rs +++ b/boltzr/src/config.rs @@ -58,6 +58,9 @@ pub struct GlobalConfig { #[serde(rename = "marking")] pub marking: Option, + pub cache: Option, + pub historical: Option, + pub backup: Option, pub notification: Option, diff --git a/boltzr/src/grpc/server.rs b/boltzr/src/grpc/server.rs index cdb4154e..c37a72b4 100644 --- a/boltzr/src/grpc/server.rs +++ b/boltzr/src/grpc/server.rs @@ -157,6 +157,7 @@ where mod server_test { use crate::api::ws; use crate::api::ws::types::SwapStatus; + use crate::cache::Redis; use crate::chain::utils::Transaction; use crate::currencies::Currency; use crate::db::helpers::web_hook::WebHookHelper; @@ -249,7 +250,7 @@ mod server_test { disable_ssl: Some(true), }, ReloadHandler::new(), - Arc::new(Service::new(None)), + Arc::new(Service::new::(None, None, None)), Arc::new(make_mock_manager()), status_tx, Box::new(make_mock_hook_helper()), @@ -377,7 +378,7 @@ mod server_test { disable_ssl: Some(false), }, ReloadHandler::new(), - Arc::new(Service::new(None)), + Arc::new(Service::new::(None, None, None)), Arc::new(make_mock_manager()), status_tx, Box::new(make_mock_hook_helper()), diff --git a/boltzr/src/grpc/service.rs b/boltzr/src/grpc/service.rs index 6f720ed6..aff8812c 100644 --- a/boltzr/src/grpc/service.rs +++ b/boltzr/src/grpc/service.rs @@ -656,6 +656,7 @@ where } } + #[instrument(name = "grpc::is_marked", skip_all)] async fn is_marked( &self, request: Request, @@ -668,7 +669,7 @@ where })), Err(err) => Err(Status::new( Code::InvalidArgument, - format!("could not parse ip: {}", err), + format!("could not parse IP: {}", err), )), } } @@ -725,6 +726,7 @@ fn extract_parent_context(request: &Request) { #[cfg(test)] mod test { use crate::api::ws; + use crate::cache::Redis; use crate::chain::utils::Transaction; use crate::currencies::Currency; use crate::db::helpers::web_hook::WebHookHelper; @@ -1098,7 +1100,7 @@ mod test { token.clone(), BoltzService::new( ReloadHandler::new(), - Arc::new(Service::new(None)), + Arc::new(Service::new::(None, None, None)), Arc::new(make_mock_manager()), StatusFetcher::new(), status_tx, diff --git a/boltzr/src/main.rs b/boltzr/src/main.rs index b4b2f8a8..503632b2 100644 --- a/boltzr/src/main.rs +++ b/boltzr/src/main.rs @@ -11,6 +11,7 @@ use tracing::{debug, error, info, trace, warn}; mod api; mod backup; +mod cache; mod chain; mod config; mod currencies; @@ -94,6 +95,19 @@ async fn main() { std::process::exit(1); }); + let cache = if let Some(config) = config.cache { + Some(match cache::Redis::new(&config).await { + Ok(cache) => cache, + Err(err) => { + error!("Could not connect to cache: {}", err); + std::process::exit(1); + } + }) + } else { + warn!("No cache was configured"); + None + }; + // TODO: move to currencies let refund_signer = if let Some(rsk_config) = config.rsk { Some( @@ -114,7 +128,7 @@ async fn main() { let cancellation_token = tokio_util::sync::CancellationToken::new(); - let service = Arc::new(Service::new(config.marking)); + let service = Arc::new(Service::new(config.marking, config.historical, cache)); { let service = service.clone(); let cancellation_token = cancellation_token.clone(); @@ -225,7 +239,7 @@ async fn main() { cancellation_token.clone(), config.sidecar.grpc, log_reload_handler, - service, + service.clone(), swap_manager.clone(), swap_status_update_tx.clone(), Box::new(db::helpers::web_hook::WebHookHelperDatabase::new(db_pool)), @@ -240,6 +254,7 @@ async fn main() { let api_server = api::Server::new( config.sidecar.api, cancellation_token.clone(), + service, grpc_server.status_fetcher(), swap_status_update_tx.clone(), ); diff --git a/boltzr/src/service/mod.rs b/boltzr/src/service/mod.rs index a890ee99..482c540b 100644 --- a/boltzr/src/service/mod.rs +++ b/boltzr/src/service/mod.rs @@ -1,19 +1,51 @@ +use crate::cache::Cache; +use crate::service::country_codes::CountryCodes; +use crate::service::pair_stats::PairStatsFetcher; +use crate::service::prometheus::{CachedPrometheusClient, RawPrometheusClient}; use anyhow::Result; -pub use country_codes::MarkingsConfig; +use std::fmt::Debug; +use std::sync::Arc; +use tracing::warn; mod country_codes; +mod pair_stats; +mod prometheus; -use crate::service::country_codes::CountryCodes; +pub use country_codes::MarkingsConfig; +pub use pair_stats::HistoricalConfig; -#[derive(Debug)] pub struct Service { + pub pair_stats: Option, pub country_codes: CountryCodes, } impl Service { - pub fn new(markings_config: Option) -> Self { + pub fn new( + markings_config: Option, + historical_config: Option, + cache: Option, + ) -> Self { Self { country_codes: CountryCodes::new(markings_config), + pair_stats: if let Some(config) = historical_config { + Some(PairStatsFetcher::new( + if let Some(cache) = cache { + Arc::new(CachedPrometheusClient::new( + RawPrometheusClient::new(&config.prometheus_endpoint), + cache, + // The cached result being a little stale is fine + // for historical pair stats + false, + )) + } else { + Arc::new(RawPrometheusClient::new(&config.prometheus_endpoint)) + }, + config.instance, + )) + } else { + warn!("Historical data config is missing"); + None + }, } } @@ -22,3 +54,27 @@ impl Service { Ok(()) } } + +#[cfg(test)] +pub mod test { + use super::*; + use crate::service::prometheus::test::MockPrometheus; + + pub use pair_stats::PairStats; + + impl Service { + pub fn new_mocked_prometheus(with_pair_stats: bool) -> Self { + Self { + country_codes: CountryCodes::new(None), + pair_stats: if with_pair_stats { + Some(PairStatsFetcher::new( + Arc::new(MockPrometheus {}), + "".to_string(), + )) + } else { + None + }, + } + } + } +} diff --git a/boltzr/src/service/pair_stats.rs b/boltzr/src/service/pair_stats.rs new file mode 100644 index 00000000..20d2902b --- /dev/null +++ b/boltzr/src/service/pair_stats.rs @@ -0,0 +1,126 @@ +use crate::db::models::SwapType; +use crate::service::prometheus::{ + max_routing_fee_query, pair_fees_query, PrometheusClient, PROMETHEUS_QUERY_STEP, +}; +use anyhow::Result; +use futures_util::try_join; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct HistoricalConfig { + #[serde(rename = "prometheusEndpoint")] + pub prometheus_endpoint: String, + + pub instance: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PairStats { + pub fee: Vec<(u64, f64)>, + + #[serde(rename = "maximalRoutingFee", skip_serializing_if = "Option::is_none")] + pub maximal_routing_fee: Option>, +} + +#[derive(Clone)] +pub struct PairStatsFetcher { + prometheus_client: Arc, + + instance: String, +} + +impl PairStatsFetcher { + pub fn new(p: Arc, instance: String) -> Self { + Self { + instance, + prometheus_client: p, + } + } + + pub async fn get_pair_stats( + &self, + pair: &str, + kind: SwapType, + referral: &str, + ) -> Result> { + let (start, end) = Self::get_start_end()?; + let (fee, maximal_routing_fee) = try_join!( + self.fetch_fees(pair, kind, referral, start, end), + self.fetch_max_routing_fee(pair, kind, referral, start, end) + )?; + + Ok(fee.map(|fee| PairStats { + fee, + maximal_routing_fee, + })) + } + + async fn fetch_fees( + &self, + pair: &str, + kind: SwapType, + referral: &str, + start: u64, + end: u64, + ) -> Result>> { + let fees = self + .prometheus_client + .query( + &pair_fees_query(&self.instance, pair, kind, referral), + PROMETHEUS_QUERY_STEP, + start, + end, + ) + .await?; + if fees.len() != 1 { + // No info available for the pair + return Ok(None); + } + + Ok(Some(Self::format_values(&fees[0].values))) + } + + async fn fetch_max_routing_fee( + &self, + pair: &str, + kind: SwapType, + referral: &str, + start: u64, + end: u64, + ) -> Result>> { + if kind != SwapType::Submarine { + return Ok(None); + } + + let max_routing_fees = self + .prometheus_client + .query( + &max_routing_fee_query(&self.instance, pair, referral), + PROMETHEUS_QUERY_STEP, + start, + end, + ) + .await?; + if max_routing_fees.len() != 1 { + // No info available for the pair + return Ok(None); + } + + Ok(Some(Self::format_values(&max_routing_fees[0].values))) + } + + fn format_values(values: &[(u64, String)]) -> Vec<(u64, f64)> { + values + .iter() + .map(|(time, value)| (*time, value.parse().unwrap_or(0.0))) + .collect() + } + + fn get_start_end() -> Result<(u64, u64)> { + let end = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + Ok((end - 60 * 60 * 24 * 7, end)) + } +} diff --git a/boltzr/src/service/prometheus.rs b/boltzr/src/service/prometheus.rs new file mode 100644 index 00000000..e53fdf5e --- /dev/null +++ b/boltzr/src/service/prometheus.rs @@ -0,0 +1,286 @@ +use crate::cache::Cache; +use crate::db::models::SwapType; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::hash::{DefaultHasher, Hash, Hasher}; + +pub const PROMETHEUS_QUERY_STEP: &str = "10m"; + +#[derive(Deserialize, Debug)] +struct PrometheusError { + error: String, +} + +#[derive(Deserialize, Serialize, PartialEq, Debug)] +pub struct RangeResult { + pub values: Vec<(u64, String)>, +} + +#[derive(Deserialize, Debug)] +struct QueryRangeData { + result: Vec, +} + +#[derive(Deserialize, Debug)] +struct QueryRangeSuccess { + data: QueryRangeData, +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "status")] +enum QueryRangeResponse { + #[serde(rename = "success")] + Success(QueryRangeSuccess), + #[serde(rename = "error")] + Error(PrometheusError), +} + +#[async_trait] +pub trait PrometheusClient { + async fn query( + &self, + query: &str, + step: &str, + start: u64, + end: u64, + ) -> Result>; +} + +#[derive(Debug, Clone)] +pub struct RawPrometheusClient { + endpoint: String, +} + +impl RawPrometheusClient { + pub fn new(endpoint: &str) -> Self { + Self { + endpoint: endpoint.strip_suffix("/").unwrap_or(endpoint).to_string(), + } + } +} + +#[async_trait] +impl PrometheusClient for RawPrometheusClient { + async fn query( + &self, + query: &str, + step: &str, + start: u64, + end: u64, + ) -> Result> { + let url = reqwest::Url::parse_with_params( + &format!("{}/api/v1/query_range", self.endpoint), + &[ + ("query", query), + ("step", step), + ("start", &start.to_string()), + ("end", &end.to_string()), + ], + )?; + let res = reqwest::get(url) + .await? + .json::() + .await?; + match res { + QueryRangeResponse::Success(res) => Ok(res.data.result), + QueryRangeResponse::Error(err) => Err(anyhow!(err.error)), + } + } +} + +#[derive(Debug, Clone)] +pub struct CachedPrometheusClient +where + P: PrometheusClient + Clone + Debug + Sync, + C: Cache + Clone + Debug + Sync, +{ + cache: C, + client: P, + include_start_end_in_key: bool, +} + +impl CachedPrometheusClient +where + P: PrometheusClient + Clone + Debug + Sync, + C: Cache + Clone + Debug + Sync, +{ + pub fn new(client: P, cache: C, include_start_end_in_key: bool) -> Self { + Self { + cache, + client, + include_start_end_in_key, + } + } + + fn get_cache_key(&self, query: &str, step: &str, start: u64, end: u64) -> String { + // Some caches might not like the unescaped query string, so we hash it + let mut hasher = DefaultHasher::new(); + query.hash(&mut hasher); + let key = format!("prometheus:query:{}:{}", hasher.finish(), step); + + if self.include_start_end_in_key { + format!("{}:{}:{}", key, start, end) + } else { + key + } + } +} + +#[async_trait] +impl PrometheusClient for CachedPrometheusClient +where + P: PrometheusClient + Clone + Debug + Sync, + C: Cache + Clone + Debug + Sync, +{ + async fn query( + &self, + query: &str, + step: &str, + start: u64, + end: u64, + ) -> Result> { + let cache_key = self.get_cache_key(query, step, start, end); + if let Some(cached) = self.cache.get(&cache_key).await? { + return Ok(cached); + } + + let res = self.client.query(query, step, start, end).await?; + self.cache.set(&cache_key, &res).await?; + + Ok(res) + } +} + +pub fn pair_fees_query(instance: &str, pair: &str, kind: SwapType, referral: &str) -> String { + format!( + "boltz_pair_fees{{instance=\"{}\", pair=\"{}\", type=\"{}\", referral=\"{}\"}}", + instance, + pair, + format_kind(kind), + referral, + ) +} + +pub fn max_routing_fee_query(instance: &str, pair: &str, referral: &str) -> String { + format!( + "boltz_pair_max_routing_fee{{instance=\"{}\", pair=\"{}\", referral=\"{}\"}}", + instance, pair, referral + ) +} + +fn format_kind(kind: SwapType) -> String { + match kind { + SwapType::Submarine => "swap", + SwapType::Reverse => "reverse", + SwapType::Chain => "chain", + } + .to_string() +} + +#[cfg(test)] +pub mod test { + use super::*; + use crate::cache::test::MemCache; + use rstest::rstest; + + #[derive(Clone, Debug)] + pub struct MockPrometheus {} + + #[async_trait] + impl PrometheusClient for MockPrometheus { + async fn query( + &self, + _query: &str, + _step: &str, + _start: u64, + _end: u64, + ) -> Result> { + Ok(vec![RangeResult { + values: vec![(1, "data".to_owned())], + }]) + } + } + + #[tokio::test] + async fn test_cache_query_set() { + let prom = MockPrometheus {}; + let cache = MemCache::new(); + + let client = CachedPrometheusClient::new(prom.clone(), cache.clone(), false); + + let query = "query"; + let step = "10m"; + + assert!(cache.map.is_empty()); + + let res = client.query(query, step, 1, 2).await.unwrap(); + assert_eq!(res, prom.query(query, step, 1, 2).await.unwrap()); + + assert_eq!(cache.map.len(), 1); + assert_eq!( + cache + .map + .get(&client.get_cache_key(query, step, 1, 2)) + .unwrap() + .value(), + &serde_json::to_string(&res).unwrap() + ); + + let cached_res = client.query(query, step, 1, 2).await.unwrap(); + assert_eq!(cached_res, res); + } + + #[rstest] + #[case("cpu", "10m", 1, 2)] + #[case("ram", "1m", 21, 42)] + #[case("network", "1h", 100, 250)] + fn test_get_cache_key( + #[case] query: &str, + #[case] step: &str, + #[case] start: u64, + #[case] end: u64, + ) { + let client = CachedPrometheusClient::new(MockPrometheus {}, MemCache::new(), false); + let mut hasher = DefaultHasher::new(); + query.hash(&mut hasher); + assert_eq!( + client.get_cache_key(query, step, start, end), + format!("prometheus:query:{}:{}", hasher.finish(), step) + ); + } + + #[rstest] + #[case("cpu", "10m", 1, 2)] + #[case("ram", "1m", 21, 42)] + #[case("network", "1h", 100, 250)] + fn test_get_cache_key_include_start_end_in_key( + #[case] query: &str, + #[case] step: &str, + #[case] start: u64, + #[case] end: u64, + ) { + let client = CachedPrometheusClient::new(MockPrometheus {}, MemCache::new(), true); + let mut hasher = DefaultHasher::new(); + query.hash(&mut hasher); + assert_eq!( + client.get_cache_key(query, step, start, end), + format!( + "prometheus:query:{}:{}:{}:{}", + hasher.finish(), + step, + start, + end + ) + ); + } + + #[rstest] + #[case(SwapType::Submarine, "swap")] + #[case(SwapType::Reverse, "reverse")] + #[case(SwapType::Chain, "chain")] + fn test_format_kind(#[case] kind: SwapType, #[case] expected: &str) { + assert_eq!(format_kind(kind), expected); + } +} diff --git a/lib/Prometheus.ts b/lib/Prometheus.ts index 55e5c28a..d3c2670f 100644 --- a/lib/Prometheus.ts +++ b/lib/Prometheus.ts @@ -139,7 +139,7 @@ class Prometheus { labelNames: ['pair', 'type', 'extrema', 'referral'], help: 'pair limits', collect: async function () { - iterateAllPairs((pair, pairId, type, referral) => { + await iterateAllPairs((pair, pairId, type, referral) => { this.set( { type, @@ -170,7 +170,7 @@ class Prometheus { labelNames: ['pair', 'type', 'referral'], help: 'pair fees', collect: async function () { - iterateAllPairs((pair, pairId, type, referral) => { + await iterateAllPairs((pair, pairId, type, referral) => { this.set( { type, @@ -190,7 +190,7 @@ class Prometheus { labelNames: ['pair', 'referral'], help: 'pair max routing fee', collect: async function () { - iterateAllPairs( + await iterateAllPairs( (pair, pairId, _, referral) => { this.set( { diff --git a/package.json b/package.json index 412d15c7..55159550 100644 --- a/package.json +++ b/package.json @@ -20,9 +20,9 @@ "prettier": "npx prettier docs '{lib,test}/**/*.ts' bin", "prettier:write": "npm run prettier -- --write", "prettier:check": "npm run prettier -- --check", - "db:start": "docker run --name boltz-db --rm -v ~/.boltz/db:/var/lib/postgresql/data -e PGDATA=/var/lib/postgresql/data/pgdata -e POSTGRES_DB=boltz -e POSTGRES_USER=boltz -e POSTGRES_PASSWORD=boltz -d -p 5432:5432 postgres:14-alpine && npm run db:setup", + "db:start": "npm run docker:dragonfly:start && docker run --name boltz-db --rm -v ~/.boltz/db:/var/lib/postgresql/data -e PGDATA=/var/lib/postgresql/data/pgdata -e POSTGRES_DB=boltz -e POSTGRES_USER=boltz -e POSTGRES_PASSWORD=boltz -d -p 5432:5432 postgres:14-alpine && npm run db:setup", "db:setup": "docker exec boltz-db sh -c \"sleep 5 && psql -U boltz -tc \\\"SELECT 1 FROM pg_database WHERE datname = 'boltz_test'\\\" | grep -q 1 || psql -U boltz -c \\\"CREATE DATABASE boltz_test\\\"\"", - "db:stop": "docker stop boltz-db", + "db:stop": "docker stop boltz-db && npm run docker:dragonfly:stop", "docker:build": "docker build . -t boltz/backend:latest -f ./docker/boltz/Dockerfile", "docker:regtest": "./docker/regtest/startRegtest.sh", "docker:solidity": "docker run -d --name anvil -p 8545:8545 ghcr.io/foundry-rs/foundry:nightly-95015894110734539c53ffad97cd64ca116fce5e \"anvil --host 0.0.0.0 --chain-id 33\"", @@ -31,6 +31,8 @@ "docker:start": "npm run docker:regtest && npm run docker:solidity && npm run docker:solidity:deploy && npm run docker:solidity:fund", "docker:cln:hold": "cd hold && cargo build && cp target/debug/hold ../docker/regtest/data/cln/plugins && docker exec regtest lightning-cli plugin start /root/.lightning/plugins/hold && docker exec regtest chmod -R 777 /root/.lightning/regtest/hold", "docker:cln:plugins": "npm run docker:cln:hold", + "docker:dragonfly:start": "docker run -d -p 6379:6379 --ulimit memlock=-1 --name dragonfly docker.dragonflydb.io/dragonflydb/dragonfly --cache_mode --maxmemory 8g", + "docker:dragonfly:stop": "docker stop dragonfly && docker rm dragonfly", "docker:stop": "docker kill regtest && docker rm regtest && docker kill anvil && docker rm anvil", "test": "npm run test:unit && npm run docker:start && npm run test:int && npm run docker:stop", "test:nodocker": "npm run test:unit && npm run test:int",