From 3dc18ceb436013326d76f1be8819f67a46eaccae Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 16:42:23 +0200 Subject: [PATCH] fix precommit --- nautilus_core/cli/src/database/postgres.rs | 6 +- .../src/python/sql/cache_database.rs | 81 ++++++------------ .../infrastructure/src/python/sql/mod.rs | 2 +- .../infrastructure/src/sql/cache_database.rs | 82 +++++++++---------- nautilus_core/infrastructure/src/sql/mod.rs | 9 +- .../infrastructure/src/sql/models/general.rs | 3 +- .../infrastructure/src/sql/models/mod.rs | 2 +- .../infrastructure/src/sql/models/types.rs | 35 ++++---- nautilus_core/infrastructure/src/sql/pg.rs | 20 ++--- .../infrastructure/src/sql/queries.rs | 27 +++--- .../tests/test_cache_database_postgres.rs | 56 +++++++------ nautilus_core/model/src/types/currency.rs | 1 - nautilus_trader.iml | 35 ++++++++ nautilus_trader/cache/postgres/adapter.py | 12 ++- .../cache/postgres/transformers.py | 13 ++- nautilus_trader/core/nautilus_pyo3.pyi | 5 ++ .../test_cache_database_postgres.py | 5 +- 17 files changed, 201 insertions(+), 193 deletions(-) create mode 100644 nautilus_trader.iml diff --git a/nautilus_core/cli/src/database/postgres.rs b/nautilus_core/cli/src/database/postgres.rs index a2cf8cf41066..07f43d390b9b 100644 --- a/nautilus_core/cli/src/database/postgres.rs +++ b/nautilus_core/cli/src/database/postgres.rs @@ -13,9 +13,11 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use nautilus_infrastructure::sql::pg::{connect_pg, drop_postgres, get_postgres_connect_options, init_postgres}; -use crate::opt::{DatabaseCommand, DatabaseOpt}; +use nautilus_infrastructure::sql::pg::{ + connect_pg, drop_postgres, get_postgres_connect_options, init_postgres, +}; +use crate::opt::{DatabaseCommand, DatabaseOpt}; pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> { let command = opt.command.clone(); diff --git a/nautilus_core/infrastructure/src/python/sql/cache_database.rs b/nautilus_core/infrastructure/src/python/sql/cache_database.rs index a2c871401220..f1c7b181eb33 100644 --- a/nautilus_core/infrastructure/src/python/sql/cache_database.rs +++ b/nautilus_core/infrastructure/src/python/sql/cache_database.rs @@ -14,14 +14,16 @@ // ------------------------------------------------------------------------------------------------- use std::collections::HashMap; -use pyo3::prelude::*; -use nautilus_core::python::{to_pyruntime_err}; + use nautilus_common::runtime::get_runtime; +use nautilus_core::python::to_pyruntime_err; use nautilus_model::types::currency::Currency; -use crate::sql::cache_database::PostgresCacheDatabase; -use crate::sql::pg::delete_nautilus_postgres_tables; -use crate::sql::queries::DatabaseQueries; +use pyo3::prelude::*; +use crate::sql::{ + cache_database::PostgresCacheDatabase, pg::delete_nautilus_postgres_tables, + queries::DatabaseQueries, +}; #[pymethods] impl PostgresCacheDatabase { @@ -32,7 +34,7 @@ impl PostgresCacheDatabase { port: Option, username: Option, password: Option, - database: Option + database: Option, ) -> PyResult { let result = get_runtime().block_on(async { PostgresCacheDatabase::connect(host, port, username, password, database).await @@ -41,79 +43,48 @@ impl PostgresCacheDatabase { } #[pyo3(name = "load")] - fn py_load<'py>( - slf: PyRef<'_, Self>, - ) -> PyResult>> { - let result = get_runtime().block_on(async { - slf.load().await - }); + fn py_load(slf: PyRef<'_, Self>) -> PyResult>> { + let result = get_runtime().block_on(async { slf.load().await }); result.map_err(to_pyruntime_err) } - #[pyo3(name = "load_currency")] - fn py_load_currency( - slf: PyRef<'_, Self>, - code: &str, - ) -> PyResult> { - let result = get_runtime().block_on(async { - DatabaseQueries::load_currency(&slf.pool, code).await - }); + fn py_load_currency(slf: PyRef<'_, Self>, code: &str) -> PyResult> { + let result = + get_runtime().block_on(async { DatabaseQueries::load_currency(&slf.pool, code).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "load_currencies")] - fn py_load_currencies<'py>( - slf: PyRef<'_, Self>, - ) -> PyResult> { - let result = get_runtime().block_on(async { - DatabaseQueries::load_currencies(&slf.pool).await - }); + fn py_load_currencies(slf: PyRef<'_, Self>) -> PyResult> { + let result = + get_runtime().block_on(async { DatabaseQueries::load_currencies(&slf.pool).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "add")] - fn py_add( - slf: PyRef<'_, Self>, - key: String, - value: Vec - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - slf.add(key,value).await - }); + fn py_add(slf: PyRef<'_, Self>, key: String, value: Vec) -> PyResult<()> { + let result = get_runtime().block_on(async { slf.add(key, value).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "add_currency")] - fn py_add_currency( - slf: PyRef<'_, Self>, - currency: Currency, - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - slf.add_currency(currency).await - }); + fn py_add_currency(slf: PyRef<'_, Self>, currency: Currency) -> PyResult<()> { + let result = get_runtime().block_on(async { slf.add_currency(currency).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "flush_db")] - fn py_drop_schema( - slf: PyRef<'_, Self>, - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - delete_nautilus_postgres_tables(&slf.pool) - .await - }); + fn py_drop_schema(slf: PyRef<'_, Self>) -> PyResult<()> { + let result = + get_runtime().block_on(async { delete_nautilus_postgres_tables(&slf.pool).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "truncate")] - fn py_truncate( - slf: PyRef<'_, Self>, - table: String - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - DatabaseQueries::truncate(&slf.pool, table).await - }); + fn py_truncate(slf: PyRef<'_, Self>, table: String) -> PyResult<()> { + let result = + get_runtime().block_on(async { DatabaseQueries::truncate(&slf.pool, table).await }); result.map_err(to_pyruntime_err) } } diff --git a/nautilus_core/infrastructure/src/python/sql/mod.rs b/nautilus_core/infrastructure/src/python/sql/mod.rs index 0f12dc82fc2d..454f4be6bd37 100644 --- a/nautilus_core/infrastructure/src/python/sql/mod.rs +++ b/nautilus_core/infrastructure/src/python/sql/mod.rs @@ -13,4 +13,4 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -pub mod cache_database; \ No newline at end of file +pub mod cache_database; diff --git a/nautilus_core/infrastructure/src/sql/cache_database.rs b/nautilus_core/infrastructure/src/sql/cache_database.rs index baa1244ed331..ed6ab08a9e48 100644 --- a/nautilus_core/infrastructure/src/sql/cache_database.rs +++ b/nautilus_core/infrastructure/src/sql/cache_database.rs @@ -13,18 +13,23 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use std::collections::{HashMap, VecDeque}; -use tokio::sync::mpsc::{Receiver,Sender,channel}; -use std::time::{Duration, Instant}; -use sqlx::{PgPool}; -use sqlx::postgres::PgConnectOptions; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::time::sleep; -use nautilus_model::types::currency::Currency; -use crate::sql::models::general::GeneralRow; -use crate::sql::pg::{connect_pg, get_postgres_connect_options}; -use crate::sql::queries::DatabaseQueries; +use std::{ + collections::{HashMap, VecDeque}, + time::{Duration, Instant}, +}; +use nautilus_model::types::currency::Currency; +use sqlx::{postgres::PgConnectOptions, PgPool}; +use tokio::{ + sync::mpsc::{channel, error::TryRecvError, Receiver, Sender}, + time::sleep, +}; + +use crate::sql::{ + models::general::GeneralRow, + pg::{connect_pg, get_postgres_connect_options}, + queries::DatabaseQueries, +}; #[derive(Debug)] #[cfg_attr( @@ -36,15 +41,12 @@ pub struct PostgresCacheDatabase { tx: Sender, } - #[derive(Debug, Clone)] -pub enum DatabaseQuery{ - Add(String,Vec), +pub enum DatabaseQuery { + Add(String, Vec), AddCurrency(Currency), } - - fn get_buffer_interval() -> Duration { Duration::from_millis(0) } @@ -52,12 +54,12 @@ fn get_buffer_interval() -> Duration { async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque) { for cmd in buffer.drain(..) { match cmd { - DatabaseQuery::Add(key,value) => { - DatabaseQueries ::add(pool,key,value).await.unwrap(); - }, + DatabaseQuery::Add(key, value) => { + DatabaseQueries::add(pool, key, value).await.unwrap(); + } DatabaseQuery::AddCurrency(currency) => { - DatabaseQueries::add_currency(pool,currency).await.unwrap(); - }, + DatabaseQueries::add_currency(pool, currency).await.unwrap(); + } } } } @@ -68,9 +70,10 @@ impl PostgresCacheDatabase { port: Option, username: Option, password: Option, - database: Option - ) -> Result { - let pg_connect_options = get_postgres_connect_options(host,port,username,password,database).unwrap(); + database: Option, + ) -> Result { + let pg_connect_options = + get_postgres_connect_options(host, port, username, password, database).unwrap(); let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap(); let (tx, rx) = channel::(1000); // spawn a thread to handle messages @@ -80,10 +83,7 @@ impl PostgresCacheDatabase { Ok(PostgresCacheDatabase { pool, tx }) } - async fn handle_message( - mut rx: Receiver, - pg_connect_options: PgConnectOptions - ){ + async fn handle_message(mut rx: Receiver, pg_connect_options: PgConnectOptions) { let pool = connect_pg(pg_connect_options).await.unwrap(); // Buffering let mut buffer: VecDeque = VecDeque::new(); @@ -106,13 +106,13 @@ impl PostgresCacheDatabase { } } // rain any remaining message - if !buffer.is_empty(){ - drain_buffer(&pool,&mut buffer).await; + if !buffer.is_empty() { + drain_buffer(&pool, &mut buffer).await; } } - pub async fn load(&self) -> Result>,sqlx::Error> { - let query = sqlx::query_as::<_,GeneralRow>("SELECT * FROM general"); + pub async fn load(&self) -> Result>, sqlx::Error> { + let query = sqlx::query_as::<_, GeneralRow>("SELECT * FROM general"); let result = query.fetch_all(&self.pool).await; match result { Ok(rows) => { @@ -127,20 +127,18 @@ impl PostgresCacheDatabase { } } } - - pub async fn add(&self, key: String, value: Vec) -> anyhow::Result<()> { - let query = DatabaseQuery::Add(key,value); - self.tx.send(query) - .await - .map_err(|err| anyhow::anyhow!("Failed to send query to database message handler: {err}")) + let query = DatabaseQuery::Add(key, value); + self.tx.send(query).await.map_err(|err| { + anyhow::anyhow!("Failed to send query to database message handler: {err}") + }) } pub async fn add_currency(&self, currency: Currency) -> anyhow::Result<()> { let query = DatabaseQuery::AddCurrency(currency); - self.tx.send(query) - .await - .map_err(|err| anyhow::anyhow!("Failed to query add_currency to database message handler: {err}")) + self.tx.send(query).await.map_err(|err| { + anyhow::anyhow!("Failed to query add_currency to database message handler: {err}") + }) } -} \ No newline at end of file +} diff --git a/nautilus_core/infrastructure/src/sql/mod.rs b/nautilus_core/infrastructure/src/sql/mod.rs index 7e2558fdce60..6e14caa333d3 100644 --- a/nautilus_core/infrastructure/src/sql/mod.rs +++ b/nautilus_core/infrastructure/src/sql/mod.rs @@ -14,12 +14,9 @@ // ------------------------------------------------------------------------------------------------- // Be careful about ordering and foreign key constraints when deleting data. -pub const NAUTILUS_TABLES: [&str; 2] = [ - "general", - "currency", -]; +pub const NAUTILUS_TABLES: [&str; 2] = ["general", "currency"]; +pub mod cache_database; pub mod models; pub mod pg; -pub mod cache_database; -pub mod queries; \ No newline at end of file +pub mod queries; diff --git a/nautilus_core/infrastructure/src/sql/models/general.rs b/nautilus_core/infrastructure/src/sql/models/general.rs index 1c47f74a7f74..824714a2c0d3 100644 --- a/nautilus_core/infrastructure/src/sql/models/general.rs +++ b/nautilus_core/infrastructure/src/sql/models/general.rs @@ -13,9 +13,8 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- - #[derive(Debug, sqlx::FromRow)] pub struct GeneralRow { pub key: String, pub value: Vec, -} \ No newline at end of file +} diff --git a/nautilus_core/infrastructure/src/sql/models/mod.rs b/nautilus_core/infrastructure/src/sql/models/mod.rs index 7e2e8e9c01da..4fe4acea056d 100644 --- a/nautilus_core/infrastructure/src/sql/models/mod.rs +++ b/nautilus_core/infrastructure/src/sql/models/mod.rs @@ -13,6 +13,6 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -pub mod instruments; pub mod general; +pub mod instruments; pub mod types; diff --git a/nautilus_core/infrastructure/src/sql/models/types.rs b/nautilus_core/infrastructure/src/sql/models/types.rs index ede57202f430..a2d98170f723 100644 --- a/nautilus_core/infrastructure/src/sql/models/types.rs +++ b/nautilus_core/infrastructure/src/sql/models/types.rs @@ -14,29 +14,30 @@ // ------------------------------------------------------------------------------------------------- use std::str::FromStr; -use sqlx::{FromRow, Row}; -use sqlx::postgres::PgRow; -use nautilus_model::enums::CurrencyType; -use nautilus_model::types::currency::Currency; + +use nautilus_model::{enums::CurrencyType, types::currency::Currency}; +use sqlx::{postgres::PgRow, FromRow, Row}; pub struct CurrencyModel(pub Currency); -impl <'r> FromRow<'r, PgRow> for CurrencyModel { +impl<'r> FromRow<'r, PgRow> for CurrencyModel { fn from_row(row: &'r PgRow) -> Result { - let code = row.try_get::("code")?; - let precision = row.try_get::("precision")?; - let iso4217 = row.try_get::("iso4217")?; - let name = row.try_get::("name")?; - let currency_type = row.try_get::("currency_type") + let code = row.try_get::("code")?; + let precision = row.try_get::("precision")?; + let iso4217 = row.try_get::("iso4217")?; + let name = row.try_get::("name")?; + let currency_type = row + .try_get::("currency_type") .map(|res| CurrencyType::from_str(res.as_str()).unwrap())?; - + let currency = Currency::new( - code.as_str(), - precision as u8, - iso4217 as u16, - name.as_str(), - currency_type, - ).unwrap(); + code.as_str(), + precision as u8, + iso4217 as u16, + name.as_str(), + currency_type, + ) + .unwrap(); Ok(CurrencyModel(currency)) } } diff --git a/nautilus_core/infrastructure/src/sql/pg.rs b/nautilus_core/infrastructure/src/sql/pg.rs index e5ddf8f603a6..4871a183a6bf 100644 --- a/nautilus_core/infrastructure/src/sql/pg.rs +++ b/nautilus_core/infrastructure/src/sql/pg.rs @@ -167,7 +167,7 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a let result = sqlx::query(sql_statement).execute(pg).await; match result { Ok(_) => info!("Executed statement successfully"), - Err(err) =>{ + Err(err) => { if err.to_string().contains("already exists") { info!("Already exists error on statement, skipping"); } else { @@ -205,10 +205,10 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {};", database ) - .as_str(), + .as_str(), ) - .execute(pg) - .await + .execute(pg) + .await { Ok(_) => info!("All tables privileges granted to role {}", database), Err(err) => error!( @@ -222,10 +222,10 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {};", database ) - .as_str(), + .as_str(), ) - .execute(pg) - .await + .execute(pg) + .await { Ok(_) => info!("All sequences privileges granted to role {}", database), Err(err) => error!( @@ -239,10 +239,10 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {};", database ) - .as_str(), + .as_str(), ) - .execute(pg) - .await + .execute(pg) + .await { Ok(_) => info!("All functions privileges granted to role {}", database), Err(err) => error!( diff --git a/nautilus_core/infrastructure/src/sql/queries.rs b/nautilus_core/infrastructure/src/sql/queries.rs index 20f836913877..c0db3edc4bc3 100644 --- a/nautilus_core/infrastructure/src/sql/queries.rs +++ b/nautilus_core/infrastructure/src/sql/queries.rs @@ -13,16 +13,16 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- - use std::collections::HashMap; -use sqlx::{PgPool}; + use nautilus_model::types::currency::Currency; -use crate::sql::models::general::GeneralRow; -use crate::sql::models::types::CurrencyModel; +use sqlx::PgPool; + +use crate::sql::models::{general::GeneralRow, types::CurrencyModel}; pub struct DatabaseQueries; -impl DatabaseQueries{ +impl DatabaseQueries { pub async fn add(pool: &PgPool, key: String, value: Vec) -> anyhow::Result<()> { sqlx::query("INSERT INTO general (key, value) VALUES ($1, $2)") .bind(key) @@ -33,9 +33,8 @@ impl DatabaseQueries{ .map_err(|err| anyhow::anyhow!("Failed to insert into general table: {err}")) } - pub async fn load(pool: &PgPool) -> anyhow::Result>> { - sqlx::query_as::<_,GeneralRow>("SELECT * FROM general") + sqlx::query_as::<_, GeneralRow>("SELECT * FROM general") .fetch_all(pool) .await .map(|rows| { @@ -47,9 +46,8 @@ impl DatabaseQueries{ }) .map_err(|err| anyhow::anyhow!("Failed to load general table: {err}")) } - - pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> { + pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> { sqlx::query( "INSERT INTO currency (code, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (code) DO NOTHING" ) @@ -64,7 +62,7 @@ impl DatabaseQueries{ .map_err(|err| anyhow::anyhow!("Failed to insert into currency table: {err}")) } - pub async fn load_currencies(pool: &PgPool) -> anyhow::Result> { + pub async fn load_currencies(pool: &PgPool) -> anyhow::Result> { sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY code ASC") .fetch_all(pool) .await @@ -72,21 +70,20 @@ impl DatabaseQueries{ .map_err(|err| anyhow::anyhow!("Failed to load currencies: {err}")) } - pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result> { sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE code = $1") .bind(code) .fetch_optional(pool) .await - .map(|currency| currency.map(|row| {row.0}) ) + .map(|currency| currency.map(|row| row.0)) .map_err(|err| anyhow::anyhow!("Failed to load currency: {err}")) } - pub async fn truncate(pool: &PgPool, table: String) -> anyhow::Result<()>{ - sqlx::query(format!("TRUNCATE TABLE {} CASCADE",table).as_str()) + pub async fn truncate(pool: &PgPool, table: String) -> anyhow::Result<()> { + sqlx::query(format!("TRUNCATE TABLE {} CASCADE", table).as_str()) .execute(pool) .await .map(|_| ()) .map_err(|err| anyhow::anyhow!("Failed to truncate table: {err}")) } -} \ No newline at end of file +} diff --git a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs index f39079df502a..8fcc5ea2cf11 100644 --- a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs +++ b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs @@ -13,15 +13,18 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- - -use tokio::sync::Mutex; +use nautilus_infrastructure::sql::{ + cache_database::PostgresCacheDatabase, + pg::{ + connect_pg, delete_nautilus_postgres_tables, drop_postgres, init_postgres, + PostgresConnectOptions, + }, +}; use sqlx::PgPool; -use nautilus_infrastructure::sql::cache_database::PostgresCacheDatabase; -use nautilus_infrastructure::sql::pg::{connect_pg, delete_nautilus_postgres_tables, drop_postgres, init_postgres, PostgresConnectOptions}; +use tokio::sync::Mutex; static INITlIZED: Mutex = Mutex::const_new(false); - pub fn get_test_pg_connect_options() -> PostgresConnectOptions { PostgresConnectOptions::new( "localhost".to_string(), @@ -36,15 +39,19 @@ pub async fn get_pg() -> PgPool { connect_pg(pg_connect_options.into()).await.unwrap() } -pub async fn initialize() -> anyhow::Result<()>{ +pub async fn initialize() -> anyhow::Result<()> { let pg_pool = get_pg().await; let mut initialized = INITlIZED.lock().await; // 1. check if we need to init schema if !*initialized { // drop and init postgres commands dont throw, they just log // se we can use them here in init login in this order - drop_postgres(&pg_pool, "nautilus".to_string()).await.unwrap(); - init_postgres(&pg_pool, "nautilus".to_string(), "pass".to_string()).await.unwrap(); + drop_postgres(&pg_pool, "nautilus".to_string()) + .await + .unwrap(); + init_postgres(&pg_pool, "nautilus".to_string(), "pass".to_string()) + .await + .unwrap(); *initialized = true; } // truncate all table @@ -56,46 +63,49 @@ pub async fn initialize() -> anyhow::Result<()>{ pub async fn get_pg_cache_database() -> anyhow::Result { initialize().await.unwrap(); let connect_options = get_test_pg_connect_options(); - Ok( - PostgresCacheDatabase::connect( - Some(connect_options.host), + Ok(PostgresCacheDatabase::connect( + Some(connect_options.host), Some(connect_options.port), Some(connect_options.username), Some(connect_options.password), Some(connect_options.database), - ) - .await.unwrap() ) + .await + .unwrap()) } #[cfg(test)] -mod tests{ +mod tests { use std::time::Duration; - use crate::get_pg_cache_database; - + use crate::get_pg_cache_database; /// ----------------------------------- General ----------------------------------- #[tokio::test] - async fn test_load_general_objects_when_nothing_in_cache_returns_empty_hashmap(){ + async fn test_load_general_objects_when_nothing_in_cache_returns_empty_hashmap() { let pg_cache = get_pg_cache_database().await.unwrap(); let result = pg_cache.load().await.unwrap(); - println!("1: {:?}",result); + println!("1: {:?}", result); assert_eq!(result.len(), 0); } #[tokio::test] - async fn test_add_general_object_adds_to_cache(){ + async fn test_add_general_object_adds_to_cache() { let pg_cache = get_pg_cache_database().await.unwrap(); let test_id_value = String::from("test_value").into_bytes(); - pg_cache.add(String::from("test_id"),test_id_value.clone()).await.unwrap(); + pg_cache + .add(String::from("test_id"), test_id_value.clone()) + .await + .unwrap(); // sleep with tokio tokio::time::sleep(Duration::from_secs(1)).await; let result = pg_cache.load().await.unwrap(); - println!("2: {:?}",result); + println!("2: {:?}", result); assert_eq!(result.keys().len(), 1); - assert_eq!(result.keys().cloned().collect::>(), vec![String::from("test_id")]); // assert_eq!(result.get(&test_id_key).unwrap().to_owned(),&test_id_value.clone()); + assert_eq!( + result.keys().cloned().collect::>(), + vec![String::from("test_id")] + ); // assert_eq!(result.get(&test_id_key).unwrap().to_owned(),&test_id_value.clone()); assert_eq!(result.get("test_id").unwrap().to_owned(), test_id_value); } - } diff --git a/nautilus_core/model/src/types/currency.rs b/nautilus_core/model/src/types/currency.rs index 78a031cabe03..28d0576e3870 100644 --- a/nautilus_core/model/src/types/currency.rs +++ b/nautilus_core/model/src/types/currency.rs @@ -142,7 +142,6 @@ impl<'de> Deserialize<'de> for Currency { } } - //////////////////////////////////////////////////////////////////////////////// // Tests //////////////////////////////////////////////////////////////////////////////// diff --git a/nautilus_trader.iml b/nautilus_trader.iml new file mode 100644 index 000000000000..b7d9291c25aa --- /dev/null +++ b/nautilus_trader.iml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nautilus_trader/cache/postgres/adapter.py b/nautilus_trader/cache/postgres/adapter.py index d52a6b9d2e45..fe67593bc242 100644 --- a/nautilus_trader/cache/postgres/adapter.py +++ b/nautilus_trader/cache/postgres/adapter.py @@ -24,8 +24,8 @@ class CachePostgresAdapter(CacheDatabaseFacade): def __init__( - self, - config: CacheConfig = None, + self, + config: CacheConfig | None = None, ): if config: config = CacheConfig() @@ -46,14 +46,12 @@ def add_currency(self, currency: Currency): currency_pyo3 = transform_currency_to_pyo3(currency) self._backing.add_currency(currency_pyo3) - def load_currencies(self) -> dict[str,Currency]: - currencies = self._backing.load_currencies() - return { currency.code: transform_currency_from_pyo3(currency) for currency in currencies} + def load_currencies(self) -> dict[str, Currency]: + currencies = self._backing.load_currencies() + return {currency.code: transform_currency_from_pyo3(currency) for currency in currencies} def load_currency(self, code: str) -> Currency | None: currency_pyo3 = self._backing.load_currency(code) if currency_pyo3: return transform_currency_from_pyo3(currency_pyo3) return None - - diff --git a/nautilus_trader/cache/postgres/transformers.py b/nautilus_trader/cache/postgres/transformers.py index bf5a252ca491..65542490015d 100644 --- a/nautilus_trader/cache/postgres/transformers.py +++ b/nautilus_trader/cache/postgres/transformers.py @@ -13,32 +13,29 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- +from nautilus_trader.core import nautilus_pyo3 from nautilus_trader.model.enums import CurrencyType from nautilus_trader.model.objects import Currency -from nautilus_trader.core import nautilus_pyo3 ################################################################################ # Currency ################################################################################ -def transform_currency_from_pyo3(currency: nautilus_pyo3.Currency)-> Currency: +def transform_currency_from_pyo3(currency: nautilus_pyo3.Currency) -> Currency: return Currency( code=currency.code, precision=currency.precision, iso4217=currency.iso4217, name=currency.name, - currency_type=CurrencyType(currency.currency_type.value) + currency_type=CurrencyType(currency.currency_type.value), ) -def transform_currency_to_pyo3(currency: Currency)-> nautilus_pyo3.Currency: +def transform_currency_to_pyo3(currency: Currency) -> nautilus_pyo3.Currency: return nautilus_pyo3.Currency( code=currency.code, precision=currency.precision, iso4217=currency.iso4217, name=currency.name, - currency_type=nautilus_pyo3.CurrencyType.from_str(currency.currency_type.name) + currency_type=nautilus_pyo3.CurrencyType.from_str(currency.currency_type.name), ) - - - diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index ea41259eb7cf..457b1343fb78 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -682,6 +682,8 @@ class CurrencyType(Enum): CRYPTO = "CRYPTO" FIAT = "FIAT" COMMODITY_BACKED = "COMMODITY_BACKED" + @classmethod + def from_str(cls, value: str) -> CurrencyType: ... class InstrumentCloseType(Enum): END_OF_SESSION = "END_OF_SESSION" @@ -2250,6 +2252,9 @@ class RedisCacheDatabase: config: dict[str, Any], ) -> None: ... +class PostgresCacheDatabase: + pass + ################################################################################################### # Network ################################################################################################### diff --git a/tests/integration_tests/infrastructure/test_cache_database_postgres.py b/tests/integration_tests/infrastructure/test_cache_database_postgres.py index 3f878839ede6..ee69790d2a7e 100644 --- a/tests/integration_tests/infrastructure/test_cache_database_postgres.py +++ b/tests/integration_tests/infrastructure/test_cache_database_postgres.py @@ -13,12 +13,11 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- -import asyncio import os -from nautilus_trader.cache.postgres.adapter import CachePostgresAdapter import pytest +from nautilus_trader.cache.postgres.adapter import CachePostgresAdapter from nautilus_trader.common.component import MessageBus from nautilus_trader.common.component import TestClock from nautilus_trader.model.enums import CurrencyType @@ -31,6 +30,7 @@ from nautilus_trader.test_kit.stubs.identifiers import TestIdStubs from nautilus_trader.trading.strategy import Strategy + AUDUSD_SIM = TestInstrumentProvider.default_fx_ccy("AUD/USD") @@ -70,7 +70,6 @@ def setup(self): clock=self.clock, ) - def teardown(self): self.database.flush()