Skip to content

Commit

Permalink
fix precommit
Browse files Browse the repository at this point in the history
  • Loading branch information
filipmacek committed Apr 23, 2024
1 parent 6d50216 commit 3dc18ce
Show file tree
Hide file tree
Showing 17 changed files with 201 additions and 193 deletions.
6 changes: 4 additions & 2 deletions nautilus_core/cli/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use nautilus_infrastructure::sql::pg::{connect_pg, drop_postgres, get_postgres_connect_options, init_postgres};
use crate::opt::{DatabaseCommand, DatabaseOpt};
use nautilus_infrastructure::sql::pg::{
connect_pg, drop_postgres, get_postgres_connect_options, init_postgres,
};

use crate::opt::{DatabaseCommand, DatabaseOpt};

pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
let command = opt.command.clone();
Expand Down
81 changes: 26 additions & 55 deletions nautilus_core/infrastructure/src/python/sql/cache_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
// -------------------------------------------------------------------------------------------------

use std::collections::HashMap;
use pyo3::prelude::*;
use nautilus_core::python::{to_pyruntime_err};

use nautilus_common::runtime::get_runtime;
use nautilus_core::python::to_pyruntime_err;
use nautilus_model::types::currency::Currency;
use crate::sql::cache_database::PostgresCacheDatabase;
use crate::sql::pg::delete_nautilus_postgres_tables;
use crate::sql::queries::DatabaseQueries;
use pyo3::prelude::*;

use crate::sql::{
cache_database::PostgresCacheDatabase, pg::delete_nautilus_postgres_tables,
queries::DatabaseQueries,
};

#[pymethods]
impl PostgresCacheDatabase {
Expand All @@ -32,7 +34,7 @@ impl PostgresCacheDatabase {
port: Option<u16>,
username: Option<String>,
password: Option<String>,
database: Option<String>
database: Option<String>,
) -> PyResult<Self> {
let result = get_runtime().block_on(async {
PostgresCacheDatabase::connect(host, port, username, password, database).await
Expand All @@ -41,79 +43,48 @@ impl PostgresCacheDatabase {
}

#[pyo3(name = "load")]
fn py_load<'py>(
slf: PyRef<'_, Self>,
) -> PyResult<HashMap<String, Vec<u8>>> {
let result = get_runtime().block_on(async {
slf.load().await
});
fn py_load(slf: PyRef<'_, Self>) -> PyResult<HashMap<String, Vec<u8>>> {
let result = get_runtime().block_on(async { slf.load().await });
result.map_err(to_pyruntime_err)
}


#[pyo3(name = "load_currency")]
fn py_load_currency(
slf: PyRef<'_, Self>,
code: &str,
) -> PyResult<Option<Currency>> {
let result = get_runtime().block_on(async {
DatabaseQueries::load_currency(&slf.pool, code).await
});
fn py_load_currency(slf: PyRef<'_, Self>, code: &str) -> PyResult<Option<Currency>> {
let result =
get_runtime().block_on(async { DatabaseQueries::load_currency(&slf.pool, code).await });
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "load_currencies")]
fn py_load_currencies<'py>(
slf: PyRef<'_, Self>,
) -> PyResult<Vec<Currency>> {
let result = get_runtime().block_on(async {
DatabaseQueries::load_currencies(&slf.pool).await
});
fn py_load_currencies(slf: PyRef<'_, Self>) -> PyResult<Vec<Currency>> {
let result =
get_runtime().block_on(async { DatabaseQueries::load_currencies(&slf.pool).await });
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "add")]
fn py_add(
slf: PyRef<'_, Self>,
key: String,
value: Vec<u8>
) -> PyResult<()> {
let result = get_runtime().block_on(async {
slf.add(key,value).await
});
fn py_add(slf: PyRef<'_, Self>, key: String, value: Vec<u8>) -> PyResult<()> {
let result = get_runtime().block_on(async { slf.add(key, value).await });
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "add_currency")]
fn py_add_currency(
slf: PyRef<'_, Self>,
currency: Currency,
) -> PyResult<()> {
let result = get_runtime().block_on(async {
slf.add_currency(currency).await
});
fn py_add_currency(slf: PyRef<'_, Self>, currency: Currency) -> PyResult<()> {
let result = get_runtime().block_on(async { slf.add_currency(currency).await });
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "flush_db")]
fn py_drop_schema(
slf: PyRef<'_, Self>,
) -> PyResult<()> {
let result = get_runtime().block_on(async {
delete_nautilus_postgres_tables(&slf.pool)
.await
});
fn py_drop_schema(slf: PyRef<'_, Self>) -> PyResult<()> {
let result =
get_runtime().block_on(async { delete_nautilus_postgres_tables(&slf.pool).await });
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "truncate")]
fn py_truncate(
slf: PyRef<'_, Self>,
table: String
) -> PyResult<()> {
let result = get_runtime().block_on(async {
DatabaseQueries::truncate(&slf.pool, table).await
});
fn py_truncate(slf: PyRef<'_, Self>, table: String) -> PyResult<()> {
let result =
get_runtime().block_on(async { DatabaseQueries::truncate(&slf.pool, table).await });
result.map_err(to_pyruntime_err)
}
}
2 changes: 1 addition & 1 deletion nautilus_core/infrastructure/src/python/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

pub mod cache_database;
pub mod cache_database;
82 changes: 40 additions & 42 deletions nautilus_core/infrastructure/src/sql/cache_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,23 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc::{Receiver,Sender,channel};
use std::time::{Duration, Instant};
use sqlx::{PgPool};
use sqlx::postgres::PgConnectOptions;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::time::sleep;
use nautilus_model::types::currency::Currency;
use crate::sql::models::general::GeneralRow;
use crate::sql::pg::{connect_pg, get_postgres_connect_options};
use crate::sql::queries::DatabaseQueries;
use std::{
collections::{HashMap, VecDeque},
time::{Duration, Instant},
};

use nautilus_model::types::currency::Currency;
use sqlx::{postgres::PgConnectOptions, PgPool};
use tokio::{
sync::mpsc::{channel, error::TryRecvError, Receiver, Sender},
time::sleep,
};

use crate::sql::{
models::general::GeneralRow,
pg::{connect_pg, get_postgres_connect_options},
queries::DatabaseQueries,
};

#[derive(Debug)]
#[cfg_attr(
Expand All @@ -36,28 +41,25 @@ pub struct PostgresCacheDatabase {
tx: Sender<DatabaseQuery>,
}


#[derive(Debug, Clone)]
pub enum DatabaseQuery{
Add(String,Vec<u8>),
pub enum DatabaseQuery {
Add(String, Vec<u8>),
AddCurrency(Currency),
}



fn get_buffer_interval() -> Duration {
Duration::from_millis(0)
}

async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
for cmd in buffer.drain(..) {
match cmd {
DatabaseQuery::Add(key,value) => {
DatabaseQueries ::add(pool,key,value).await.unwrap();
},
DatabaseQuery::Add(key, value) => {
DatabaseQueries::add(pool, key, value).await.unwrap();
}
DatabaseQuery::AddCurrency(currency) => {
DatabaseQueries::add_currency(pool,currency).await.unwrap();
},
DatabaseQueries::add_currency(pool, currency).await.unwrap();
}
}
}
}
Expand All @@ -68,9 +70,10 @@ impl PostgresCacheDatabase {
port: Option<u16>,
username: Option<String>,
password: Option<String>,
database: Option<String>
) -> Result<Self,sqlx::Error> {
let pg_connect_options = get_postgres_connect_options(host,port,username,password,database).unwrap();
database: Option<String>,
) -> Result<Self, sqlx::Error> {
let pg_connect_options =
get_postgres_connect_options(host, port, username, password, database).unwrap();
let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
let (tx, rx) = channel::<DatabaseQuery>(1000);
// spawn a thread to handle messages
Expand All @@ -80,10 +83,7 @@ impl PostgresCacheDatabase {
Ok(PostgresCacheDatabase { pool, tx })
}

async fn handle_message(
mut rx: Receiver<DatabaseQuery>,
pg_connect_options: PgConnectOptions
){
async fn handle_message(mut rx: Receiver<DatabaseQuery>, pg_connect_options: PgConnectOptions) {
let pool = connect_pg(pg_connect_options).await.unwrap();
// Buffering
let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
Expand All @@ -106,13 +106,13 @@ impl PostgresCacheDatabase {
}
}
// rain any remaining message
if !buffer.is_empty(){
drain_buffer(&pool,&mut buffer).await;
if !buffer.is_empty() {
drain_buffer(&pool, &mut buffer).await;
}
}

pub async fn load(&self) -> Result<HashMap<String, Vec<u8>>,sqlx::Error> {
let query = sqlx::query_as::<_,GeneralRow>("SELECT * FROM general");
pub async fn load(&self) -> Result<HashMap<String, Vec<u8>>, sqlx::Error> {
let query = sqlx::query_as::<_, GeneralRow>("SELECT * FROM general");
let result = query.fetch_all(&self.pool).await;
match result {
Ok(rows) => {
Expand All @@ -127,20 +127,18 @@ impl PostgresCacheDatabase {
}
}
}



pub async fn add(&self, key: String, value: Vec<u8>) -> anyhow::Result<()> {
let query = DatabaseQuery::Add(key,value);
self.tx.send(query)
.await
.map_err(|err| anyhow::anyhow!("Failed to send query to database message handler: {err}"))
let query = DatabaseQuery::Add(key, value);
self.tx.send(query).await.map_err(|err| {
anyhow::anyhow!("Failed to send query to database message handler: {err}")
})
}

pub async fn add_currency(&self, currency: Currency) -> anyhow::Result<()> {
let query = DatabaseQuery::AddCurrency(currency);
self.tx.send(query)
.await
.map_err(|err| anyhow::anyhow!("Failed to query add_currency to database message handler: {err}"))
self.tx.send(query).await.map_err(|err| {
anyhow::anyhow!("Failed to query add_currency to database message handler: {err}")
})
}
}
}
9 changes: 3 additions & 6 deletions nautilus_core/infrastructure/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@
// -------------------------------------------------------------------------------------------------

// Be careful about ordering and foreign key constraints when deleting data.
pub const NAUTILUS_TABLES: [&str; 2] = [
"general",
"currency",
];
pub const NAUTILUS_TABLES: [&str; 2] = ["general", "currency"];

pub mod cache_database;
pub mod models;
pub mod pg;
pub mod cache_database;
pub mod queries;
pub mod queries;
3 changes: 1 addition & 2 deletions nautilus_core/infrastructure/src/sql/models/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------


#[derive(Debug, sqlx::FromRow)]
pub struct GeneralRow {
pub key: String,
pub value: Vec<u8>,
}
}
2 changes: 1 addition & 1 deletion nautilus_core/infrastructure/src/sql/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

pub mod instruments;
pub mod general;
pub mod instruments;
pub mod types;
35 changes: 18 additions & 17 deletions nautilus_core/infrastructure/src/sql/models/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,30 @@
// -------------------------------------------------------------------------------------------------

use std::str::FromStr;
use sqlx::{FromRow, Row};
use sqlx::postgres::PgRow;
use nautilus_model::enums::CurrencyType;
use nautilus_model::types::currency::Currency;

use nautilus_model::{enums::CurrencyType, types::currency::Currency};
use sqlx::{postgres::PgRow, FromRow, Row};

pub struct CurrencyModel(pub Currency);

impl <'r> FromRow<'r, PgRow> for CurrencyModel {
impl<'r> FromRow<'r, PgRow> for CurrencyModel {
fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
let code = row.try_get::<String,_>("code")?;
let precision = row.try_get::<i32,_>("precision")?;
let iso4217 = row.try_get::<i32,_>("iso4217")?;
let name = row.try_get::<String,_>("name")?;
let currency_type = row.try_get::<String,_>("currency_type")
let code = row.try_get::<String, _>("code")?;
let precision = row.try_get::<i32, _>("precision")?;
let iso4217 = row.try_get::<i32, _>("iso4217")?;
let name = row.try_get::<String, _>("name")?;
let currency_type = row
.try_get::<String, _>("currency_type")
.map(|res| CurrencyType::from_str(res.as_str()).unwrap())?;

let currency = Currency::new(
code.as_str(),
precision as u8,
iso4217 as u16,
name.as_str(),
currency_type,
).unwrap();
code.as_str(),
precision as u8,
iso4217 as u16,
name.as_str(),
currency_type,
)
.unwrap();
Ok(CurrencyModel(currency))
}
}
Loading

0 comments on commit 3dc18ce

Please sign in to comment.