From 7944d686fd05673cc1b621b0f20d67443a5cf8bf Mon Sep 17 00:00:00 2001 From: Valentin <77051586+vkgnosis@users.noreply.github.com> Date: Wed, 20 Jul 2022 14:20:52 +0200 Subject: [PATCH] Move quotes module into database --- Cargo.lock | 1 + crates/database/Cargo.toml | 1 + crates/database/src/lib.rs | 1 + crates/database/src/quotes.rs | 346 ++++++++++++++++++++++++ crates/orderbook/src/database/quotes.rs | 339 +++-------------------- 5 files changed, 384 insertions(+), 304 deletions(-) create mode 100644 crates/database/src/quotes.rs diff --git a/Cargo.lock b/Cargo.lock index 6ce0a3a605..6fe610dff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -639,6 +639,7 @@ name = "database" version = "0.1.0" dependencies = [ "bigdecimal", + "chrono", "const_format", "futures", "hex", diff --git a/crates/database/Cargo.toml b/crates/database/Cargo.toml index b44458311d..bb840edd45 100644 --- a/crates/database/Cargo.toml +++ b/crates/database/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] bigdecimal = "0.3" +chrono = { version = "0.4", default-features = false, features = ["clock"] } const_format = "0.2" futures = { version = "0.3", default-features = false, features = ["std"] } hex = "0.4" diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index 2bc1d1b37e..c753c2176b 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -1,6 +1,7 @@ pub mod byte_array; pub mod events; pub mod orders; +pub mod quotes; use byte_array::ByteArray; use sqlx::{Executor, PgPool}; diff --git a/crates/database/src/quotes.rs b/crates/database/src/quotes.rs new file mode 100644 index 0000000000..9bbb63a63d --- /dev/null +++ b/crates/database/src/quotes.rs @@ -0,0 +1,346 @@ +use crate::{orders::OrderKind, Address}; +use bigdecimal::BigDecimal; +use sqlx::{ + types::chrono::{DateTime, Utc}, + PgConnection, +}; + +pub type QuoteId = i64; + +/// One row in the `quotes` table. +#[derive(Clone, Debug, PartialEq, sqlx::FromRow)] +pub struct Quote { + pub id: QuoteId, + pub sell_token: Address, + pub buy_token: Address, + pub sell_amount: BigDecimal, + pub buy_amount: BigDecimal, + pub gas_amount: f64, + pub gas_price: f64, + pub sell_token_price: f64, + pub order_kind: OrderKind, + pub expiration_timestamp: DateTime, +} + +/// Stores the quote and returns the id. The id of the quote parameter is not used. +pub async fn save(ex: &mut PgConnection, quote: &Quote) -> Result { + const QUERY: &str = r#" +INSERT INTO quotes ( + sell_token, + buy_token, + sell_amount, + buy_amount, + gas_amount, + gas_price, + sell_token_price, + order_kind, + expiration_timestamp +) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) +RETURNING id + "#; + let (id,) = sqlx::query_as(QUERY) + .bind("e.sell_token) + .bind("e.buy_token) + .bind("e.sell_amount) + .bind("e.buy_amount) + .bind(quote.gas_amount) + .bind(quote.gas_price) + .bind(quote.sell_token_price) + .bind(quote.order_kind) + .bind(quote.expiration_timestamp) + .fetch_one(ex) + .await?; + Ok(id) +} + +pub async fn get(ex: &mut PgConnection, id: QuoteId) -> Result, sqlx::Error> { + const QUERY: &str = r#" +SELECT * +FROM quotes +WHERE id = $1 + "#; + sqlx::query_as(QUERY).bind(id).fetch_optional(ex).await +} + +/// Fields for searching stored quotes. +#[derive(Clone)] +pub struct QuoteSearchParameters { + pub sell_token: Address, + pub buy_token: Address, + pub sell_amount_0: BigDecimal, + pub sell_amount_1: BigDecimal, + pub buy_amount: BigDecimal, + pub kind: OrderKind, + pub expiration: DateTime, +} + +pub async fn find( + ex: &mut PgConnection, + params: &QuoteSearchParameters, +) -> Result, sqlx::Error> { + const QUERY: &str = r#" +SELECT * +FROM quotes +WHERE + sell_token = $1 AND + buy_token = $2 AND + ( + (order_kind = 'sell' AND sell_amount = $3) OR + (order_kind = 'sell' AND sell_amount = $4) OR + (order_kind = 'buy' AND buy_amount = $5) + ) AND + order_kind = $6 AND + expiration_timestamp >= $7 +ORDER BY gas_amount * gas_price * sell_token_price ASC +LIMIT 1 + "#; + sqlx::query_as(QUERY) + .bind(¶ms.sell_token) + .bind(¶ms.buy_token) + .bind(¶ms.sell_amount_0) + .bind(¶ms.sell_amount_1) + .bind(¶ms.buy_amount) + .bind(params.kind) + .bind(params.expiration) + .fetch_optional(ex) + .await +} + +pub async fn remove_expired_quotes( + ex: &mut PgConnection, + max_expiry: DateTime, +) -> Result<(), sqlx::Error> { + const QUERY: &str = r#" +DELETE FROM quotes +WHERE expiration_timestamp < $1 + "#; + sqlx::query(QUERY) + .bind(max_expiry) + .execute(ex) + .await + .map(|_| ()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::byte_array::ByteArray; + use chrono::Duration; + use sqlx::{types::chrono::TimeZone, Connection}; + + /// The postgres database in our CI has different datetime precision than + /// the `DateTime` uses. This leads to issues comparing round-tripped data. + /// Work around the issue by created `DateTime`s with lower precision. + fn low_precision_now() -> DateTime { + Utc.timestamp(Utc::now().timestamp(), 0) + } + + #[tokio::test] + #[ignore] + async fn postgres_save_and_get_quote_by_id() { + let mut db = PgConnection::connect("postgresql://").await.unwrap(); + let mut db = db.begin().await.unwrap(); + + let now = low_precision_now(); + let mut quote = Quote { + id: Default::default(), + sell_token: ByteArray([1; 20]), + buy_token: ByteArray([2; 20]), + sell_amount: 3.into(), + buy_amount: 4.into(), + gas_amount: 5., + gas_price: 6., + sell_token_price: 7., + order_kind: OrderKind::Sell, + expiration_timestamp: now, + }; + let id = save(&mut db, "e).await.unwrap(); + quote.id = id; + assert_eq!(get(&mut db, id).await.unwrap().unwrap(), quote); + + remove_expired_quotes(&mut db, now + Duration::seconds(30)) + .await + .unwrap(); + assert_eq!(get(&mut db, id).await.unwrap(), None); + } + + #[tokio::test] + #[ignore] + async fn postgres_save_and_find_quote() { + let mut db = PgConnection::connect("postgresql://").await.unwrap(); + let mut db = db.begin().await.unwrap(); + + let now = low_precision_now(); + let token_a = ByteArray([1; 20]); + let quote_a = Quote { + id: Default::default(), + sell_token: token_a, + buy_token: ByteArray([3; 20]), + sell_amount: 4.into(), + buy_amount: 5.into(), + order_kind: OrderKind::Sell, + gas_amount: 1., + gas_price: 1., + sell_token_price: 1., + expiration_timestamp: now, + }; + + let token_b = ByteArray([2; 20]); + let quote_b = Quote { + id: Default::default(), + sell_token: token_b, + buy_token: token_a, + sell_amount: 200.into(), + buy_amount: 100.into(), + order_kind: OrderKind::Buy, + gas_amount: 20_000_u32.into(), + gas_price: 1., + sell_token_price: 1., + expiration_timestamp: now, + }; + + // Save two measurements for token_a + let quotes_a = [ + { + let mut quote = Quote { + expiration_timestamp: now, + gas_amount: 100_u32.into(), + ..quote_a.clone() + }; + let id = save(&mut db, "e).await.unwrap(); + quote.id = id; + quote + }, + { + let mut quote = Quote { + expiration_timestamp: now + Duration::seconds(60), + gas_amount: 200_u32.into(), + ..quote_a.clone() + }; + let id = save(&mut db, "e).await.unwrap(); + quote.id = id; + quote + }, + ]; + + // Save one measurement for token_b + let quotes_b = [{ + let mut quote = Quote { + expiration_timestamp: now, + gas_amount: 10_u32.into(), + ..quote_b.clone() + }; + let id = save(&mut db, "e).await.unwrap(); + quote.id = id; + quote + }]; + + // Token A has readings valid until now and in 30s + let search_a = QuoteSearchParameters { + sell_token: quote_a.sell_token, + buy_token: quote_a.buy_token, + sell_amount_0: quote_a.sell_amount.clone(), + sell_amount_1: quote_a.sell_amount.clone(), + buy_amount: 1.into(), + kind: quote_a.order_kind, + expiration: now, + }; + assert_eq!( + find(&mut db, &search_a).await.unwrap().unwrap(), + quotes_a[0], + ); + assert_eq!( + find( + &mut db, + &QuoteSearchParameters { + expiration: now + Duration::seconds(30), + ..search_a.clone() + } + ) + .await + .unwrap() + .unwrap(), + quotes_a[1] + ); + + // Token A has readings for sell + fee amount equal to quoted amount. + assert_eq!( + find( + &mut db, + &QuoteSearchParameters { + sell_amount_0: quote_a.sell_amount.clone() - BigDecimal::from(1), + sell_amount_1: quote_a.sell_amount.clone(), + ..search_a.clone() + }, + ) + .await + .unwrap() + .unwrap(), + quotes_a[0], + ); + + // Token A has no reading for wrong filter + assert_eq!( + find( + &mut db, + &QuoteSearchParameters { + sell_amount_0: quote_a.sell_amount.clone() - BigDecimal::from(1), + sell_amount_1: quote_a.sell_amount.clone() - BigDecimal::from(1), + ..search_a.clone() + } + ) + .await + .unwrap(), + None + ); + + // Token B only has readings valid until now + let search_b = QuoteSearchParameters { + sell_token: quote_b.sell_token, + buy_token: quote_b.buy_token, + sell_amount_0: 999.into(), + sell_amount_1: 999.into(), + buy_amount: quote_b.buy_amount, + kind: quote_b.order_kind, + expiration: now, + }; + assert_eq!( + find(&mut db, &search_b).await.unwrap().unwrap(), + quotes_b[0], + ); + assert_eq!( + find( + &mut db, + &QuoteSearchParameters { + expiration: now + Duration::seconds(30), + ..search_b.clone() + } + ) + .await + .unwrap(), + None + ); + + // Token B has no reading for wrong filter + assert_eq!( + find( + &mut db, + &QuoteSearchParameters { + buy_amount: 99.into(), + ..search_b.clone() + } + ) + .await + .unwrap(), + None + ); + + // Query that previously succeeded after cleaning up expired measurements. + remove_expired_quotes(&mut db, now + Duration::seconds(120)) + .await + .unwrap(); + assert_eq!(find(&mut db, &search_a).await.unwrap(), None); + assert_eq!(find(&mut db, &search_b).await.unwrap(), None); + } +} diff --git a/crates/orderbook/src/database/quotes.rs b/crates/orderbook/src/database/quotes.rs index cdabad9f70..4b909425ff 100644 --- a/crates/orderbook/src/database/quotes.rs +++ b/crates/orderbook/src/database/quotes.rs @@ -8,27 +8,15 @@ use crate::{ order_quoting::{QuoteData, QuoteSearchParameters, QuoteStoring}, }; use anyhow::{Context, Result}; -use bigdecimal::BigDecimal; use chrono::{DateTime, Utc}; -use database::orders::OrderKind as DbOrderKind; +use database::{ + byte_array::ByteArray, + quotes::{Quote as QuoteRow, QuoteSearchParameters as DbQuoteSearchParameters}, +}; use model::quote::QuoteId; use primitive_types::H160; use shared::maintenance::Maintaining; -#[derive(sqlx::FromRow)] -struct QuoteRow { - id: QuoteId, - sell_token: database::Address, - buy_token: database::Address, - sell_amount: BigDecimal, - buy_amount: BigDecimal, - gas_amount: f64, - gas_price: f64, - sell_token_price: f64, - order_kind: DbOrderKind, - expiration_timestamp: DateTime, -} - impl TryFrom for QuoteData { type Error = anyhow::Error; @@ -59,36 +47,20 @@ impl QuoteStoring for Postgres { .with_label_values(&["save_quote"]) .start_timer(); - const QUERY: &str = r#" - INSERT INTO quotes ( - sell_token, - buy_token, - sell_amount, - buy_amount, - gas_amount, - gas_price, - sell_token_price, - order_kind, - expiration_timestamp - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - RETURNING id - ;"#; - - let (id,) = sqlx::query_as(QUERY) - .bind(data.sell_token.as_bytes()) - .bind(data.buy_token.as_bytes()) - .bind(u256_to_big_decimal(&data.quoted_sell_amount)) - .bind(u256_to_big_decimal(&data.quoted_buy_amount)) - .bind(&data.fee_parameters.gas_amount) - .bind(&data.fee_parameters.gas_price) - .bind(&data.fee_parameters.sell_token_price) - .bind(order_kind_into(data.kind)) - .bind(data.expiration) - .fetch_one(&self.pool) - .await - .context("failed to insert quote")?; - + let mut ex = self.pool.acquire().await?; + let row = QuoteRow { + id: Default::default(), + sell_token: ByteArray(data.sell_token.0), + buy_token: ByteArray(data.buy_token.0), + sell_amount: u256_to_big_decimal(&data.quoted_sell_amount), + buy_amount: u256_to_big_decimal(&data.quoted_buy_amount), + gas_amount: data.fee_parameters.gas_amount, + gas_price: data.fee_parameters.gas_price, + sell_token_price: data.fee_parameters.sell_token_price, + order_kind: order_kind_into(data.kind), + expiration_timestamp: data.expiration, + }; + let id = database::quotes::save(&mut ex, &row).await?; Ok(Some(id)) } @@ -98,24 +70,14 @@ impl QuoteStoring for Postgres { .with_label_values(&["get_quote"]) .start_timer(); - const QUERY: &str = r#" - SELECT * - FROM quotes - WHERE id = $1 - ;"#; - - let quote: Option = sqlx::query_as(QUERY) - .bind(id) - .fetch_optional(&self.pool) - .await - .context("failed to get quote by ID")?; - + let mut ex = self.pool.acquire().await?; + let quote = database::quotes::get(&mut ex, id).await?; quote.map(TryFrom::try_from).transpose() } async fn find( &self, - parameters: QuoteSearchParameters, + params: QuoteSearchParameters, expiration: DateTime, ) -> Result> { let _timer = super::Metrics::get() @@ -123,37 +85,19 @@ impl QuoteStoring for Postgres { .with_label_values(&["find_quote"]) .start_timer(); - const QUERY: &str = r#" - SELECT * - FROM quotes - WHERE - sell_token = $1 AND - buy_token = $2 AND - ( - (order_kind = 'sell' AND sell_amount = $3) OR - (order_kind = 'sell' AND sell_amount = $4) OR - (order_kind = 'buy' AND buy_amount = $5) - ) AND - order_kind = $6 AND - expiration_timestamp >= $7 - ORDER BY gas_amount * gas_price * sell_token_price ASC - LIMIT 1 - ;"#; - - let quote: Option = sqlx::query_as(QUERY) - .bind(parameters.sell_token.as_bytes()) - .bind(parameters.buy_token.as_bytes()) - .bind(u256_to_big_decimal(¶meters.sell_amount)) - .bind(u256_to_big_decimal( - &(parameters.sell_amount + parameters.fee_amount), - )) - .bind(u256_to_big_decimal(¶meters.buy_amount)) - .bind(order_kind_into(parameters.kind)) - .bind(expiration) - .fetch_optional(&self.pool) + let mut ex = self.pool.acquire().await?; + let params = DbQuoteSearchParameters { + sell_token: ByteArray(params.sell_token.0), + buy_token: ByteArray(params.buy_token.0), + sell_amount_0: u256_to_big_decimal(¶ms.sell_amount), + sell_amount_1: u256_to_big_decimal(&(params.sell_amount + params.fee_amount)), + buy_amount: u256_to_big_decimal(¶ms.buy_amount), + kind: order_kind_into(params.kind), + expiration, + }; + let quote = database::quotes::find(&mut ex, ¶ms) .await .context("failed finding quote by parameters")?; - quote .map(|quote| Ok((quote.id, quote.try_into()?))) .transpose() @@ -167,13 +111,9 @@ impl Postgres { .with_label_values(&["remove_expired_quotes"]) .start_timer(); - const QUERY: &str = "DELETE FROM quotes WHERE expiration_timestamp < $1;"; - sqlx::query(QUERY) - .bind(max_expiry) - .execute(&self.pool) - .await - .context("remove_expired_quotes failed") - .map(|_| ()) + let mut ex = self.pool.acquire().await?; + database::quotes::remove_expired_quotes(&mut ex, max_expiry).await?; + Ok(()) } } @@ -185,212 +125,3 @@ impl Maintaining for Postgres { .context("fee measurement maintenance error") } } - -#[cfg(test)] -mod tests { - use super::*; - use chrono::{Duration, TimeZone as _}; - use ethcontract::U256; - use model::order::OrderKind; - use primitive_types::H160; - - /// The postgres database in our CI has different datetime precision than - /// the `DateTime` uses. This leads to issues comparing round-tripped data. - /// Work around the issue by created `DateTime`s with lower precision. - fn low_precision_now() -> DateTime { - Utc.timestamp(Utc::now().timestamp(), 0) - } - - #[tokio::test] - #[ignore] - async fn postgres_save_and_get_quote_by_id() { - let db = Postgres::new("postgresql://").unwrap(); - database::clear_DANGER(&db.pool).await.unwrap(); - - let now = low_precision_now(); - let quote = QuoteData { - sell_token: H160([1; 20]), - buy_token: H160([2; 20]), - quoted_sell_amount: 3.into(), - quoted_buy_amount: 4.into(), - fee_parameters: 5_u32.into(), - kind: OrderKind::Sell, - expiration: now, - }; - let id = db.save(quote.clone()).await.unwrap().unwrap(); - - assert_eq!(db.get(id).await.unwrap().unwrap(), quote); - - db.remove_expired_quotes(now + Duration::seconds(30)) - .await - .unwrap(); - - assert_eq!(db.get(id).await.unwrap(), None); - } - - #[tokio::test] - #[ignore] - async fn postgres_save_and_find_quote() { - let db = Postgres::new("postgresql://").unwrap(); - database::clear_DANGER(&db.pool).await.unwrap(); - - let now = low_precision_now(); - let token_a = H160::from_low_u64_be(1); - let quote_a = QuoteData { - sell_token: token_a, - buy_token: H160::from_low_u64_be(3), - quoted_sell_amount: 4.into(), - quoted_buy_amount: 5.into(), - kind: OrderKind::Sell, - ..Default::default() - }; - - let token_b = H160::from_low_u64_be(2); - let quote_b = QuoteData { - sell_token: token_b, - buy_token: token_a, - quoted_sell_amount: 200.into(), - quoted_buy_amount: 100.into(), - fee_parameters: 20_000_u32.into(), - kind: OrderKind::Buy, - expiration: now, - }; - - // Save two measurements for token_a - let quotes_a = [ - { - let quote = QuoteData { - expiration: now, - fee_parameters: 100_u32.into(), - ..quote_a.clone() - }; - let id = db.save(quote.clone()).await.unwrap().unwrap(); - - (id, quote) - }, - { - let quote = QuoteData { - expiration: now + Duration::seconds(60), - fee_parameters: 200_u32.into(), - ..quote_a.clone() - }; - let id = db.save(quote.clone()).await.unwrap().unwrap(); - - (id, quote) - }, - ]; - - // Save one measurement for token_b - let quotes_b = [{ - let quote = QuoteData { - expiration: now, - fee_parameters: 10_u32.into(), - ..quote_b.clone() - }; - let id = db.save(quote.clone()).await.unwrap().unwrap(); - - (id, quote) - }]; - - // Token A has readings valid until now and in 30s - let search_a = QuoteSearchParameters { - sell_token: quote_a.sell_token, - buy_token: quote_a.buy_token, - sell_amount: quote_a.quoted_sell_amount, - buy_amount: 1.into(), - fee_amount: 0.into(), - kind: quote_a.kind, - ..Default::default() - }; - assert_eq!( - db.find(search_a.clone(), now).await.unwrap().unwrap(), - quotes_a[0], - ); - assert_eq!( - db.find(search_a.clone(), now + Duration::seconds(30)) - .await - .unwrap() - .unwrap(), - quotes_a[1], - ); - - // Token A has readings for sell + fee amount equal to quoted amount. - assert_eq!( - db.find( - QuoteSearchParameters { - sell_amount: quote_a.quoted_sell_amount - U256::from(1), - fee_amount: 1.into(), - ..search_a.clone() - }, - now - ) - .await - .unwrap() - .unwrap(), - quotes_a[0], - ); - assert_eq!( - db.find(search_a.clone(), now + Duration::seconds(30)) - .await - .unwrap() - .unwrap(), - quotes_a[1], - ); - - // Token A has no reading for wrong filter - assert_eq!( - db.find( - QuoteSearchParameters { - sell_amount: quote_a.quoted_sell_amount - U256::from(1), - ..search_a.clone() - }, - now - ) - .await - .unwrap(), - None - ); - - // Token B only has readings valid until now - let search_b = QuoteSearchParameters { - sell_token: quote_b.sell_token, - buy_token: quote_b.buy_token, - sell_amount: 999.into(), - buy_amount: quote_b.quoted_buy_amount, - fee_amount: 0.into(), - kind: quote_b.kind, - ..Default::default() - }; - assert_eq!( - db.find(search_b.clone(), now).await.unwrap().unwrap(), - quotes_b[0], - ); - assert_eq!( - db.find(search_b.clone(), now + Duration::seconds(30)) - .await - .unwrap(), - None - ); - - // Token B has no reading for wrong filter - assert_eq!( - db.find( - QuoteSearchParameters { - buy_amount: 99.into(), - ..search_b.clone() - }, - now - ) - .await - .unwrap(), - None - ); - - // Query that previously succeeded after cleaning up expired measurements. - db.remove_expired_quotes(now + Duration::seconds(120)) - .await - .unwrap(); - assert_eq!(db.find(search_a, now).await.unwrap(), None); - assert_eq!(db.find(search_b, now).await.unwrap(), None); - } -}