Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC Postgres cache database design #1607

Merged
merged 22 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5efe0e7
create build-postgres action that runs on every commit
filipmacek Apr 22, 2024
281e42f
add redis docker service
filipmacek Apr 22, 2024
de87964
currency pyo3 cython transformers
filipmacek Apr 22, 2024
3ce212d
bootstrap CachePostgresAdapter
filipmacek Apr 22, 2024
3087ac1
implement currency fromRow
filipmacek Apr 22, 2024
b0c14d2
move postgres database code to infrastructure crate sql module
filipmacek Apr 22, 2024
9032f6c
remove old cache file unused
filipmacek Apr 23, 2024
44e6411
remove old database file unused
filipmacek Apr 23, 2024
a7eac2f
psql cache database all
filipmacek Apr 23, 2024
ace8ff0
add loging in sql statement execution
filipmacek Apr 23, 2024
229d03b
add postgres service to github action
filipmacek Apr 23, 2024
7019e93
implement postgres cache e2e tests
filipmacek Apr 23, 2024
3931c8b
fix precommit
filipmacek Apr 23, 2024
0773a00
refactor statement exeuction in init-postgres
filipmacek Apr 25, 2024
f0daa70
add nautilus-cli install and init-postgres in github action workflow
filipmacek Apr 25, 2024
6614b2c
refactor test cache database postgres without init and drop postgres
filipmacek Apr 25, 2024
b10f79a
fixup! add nautilus-cli install and init-postgres in github action wo…
filipmacek Apr 25, 2024
b61950f
update cli and database opt with schema dir path
filipmacek Apr 25, 2024
28c411a
refactor nautilus cli install and run for github build action
filipmacek Apr 25, 2024
1e2ee86
reset datebase any before tests
filipmacek Apr 25, 2024
7f62523
move github action change to offical build.yml file
filipmacek Apr 25, 2024
c748c70
delete nautilus_trader.iml
filipmacek Apr 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ jobs:
env:
BUILD_MODE: debug
RUST_BACKTRACE: 1
services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
postgres:
image: postgres
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DB: nautilus
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Free disk space (Ubuntu)
Expand Down Expand Up @@ -105,10 +124,16 @@ jobs:
# pre-commit run --hook-stage manual gitlint-ci
pre-commit run --all-files

- name: Install Redis (Linux)
- name: Install Nautilus CLI and run init postgres
run: |
sudo apt-get install redis-server
redis-server --daemonize yes
make install-cli
nautilus database init --schema ${{ github.workspace }}/schema
env:
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DATABASE: nautilus

- name: Run nautilus_core cargo tests (Linux)
run: |
Expand Down Expand Up @@ -224,6 +249,25 @@ jobs:
env:
BUILD_MODE: debug
RUST_BACKTRACE: 1
services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
postgres:
image: postgres
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DB: nautilus
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Checkout repository
Expand Down Expand Up @@ -290,10 +334,16 @@ jobs:
# pre-commit run --hook-stage manual gitlint-ci
pre-commit run --all-files

- name: Install Redis (macOS)
- name: Install Nautilus CLI and run init postgres
run: |
brew install redis
redis-server --daemonize yes
make install-cli
nautilus database init --schema ${{ github.workspace }}/schema
env:
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DATABASE: nautilus

- name: Run nautilus_core cargo tests (macOS)
run: |
Expand Down
2 changes: 1 addition & 1 deletion nautilus_core/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "src/bin/cli.rs"
nautilus-common = { path = "../common"}
nautilus-model = { path = "../model" }
nautilus-core = { path = "../core" }
nautilus-infrastructure = { path = "../infrastructure" , features = ['sql']}
nautilus-infrastructure = { path = "../infrastructure" , features = ['postgres']}
anyhow = { workspace = true }
tokio = {workspace = true}
log = { workspace = true }
Expand Down
177 changes: 4 additions & 173 deletions nautilus_core/cli/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,182 +13,12 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use log::{error, info};
use nautilus_infrastructure::sql::pg::{connect_pg, get_postgres_connect_options};
use sqlx::PgPool;
use nautilus_infrastructure::sql::pg::{
connect_pg, drop_postgres, get_postgres_connect_options, init_postgres,
};

use crate::opt::{DatabaseCommand, DatabaseOpt};

/// Scans current path with keyword `nautilus_trader` and build schema dir
fn get_schema_dir() -> anyhow::Result<String> {
std::env::var("SCHEMA_DIR").or_else(|_| {
let nautilus_git_repo_name = "nautilus_trader";
let binding = std::env::current_dir().unwrap();
let current_dir = binding.to_str().unwrap();
match current_dir.find(nautilus_git_repo_name){
Some(index) => {
let schema_path = current_dir[0..index + nautilus_git_repo_name.len()].to_string() + "/schema";
Ok(schema_path)
}
None => anyhow::bail!("Could not calculate schema dir from current directory path or SCHEMA_DIR env variable")
}
})
}

pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> anyhow::Result<()> {
info!("Initializing Postgres database with target permissions and schema");
// create public schema
match sqlx::query("CREATE SCHEMA IF NOT EXISTS public;")
.execute(pg)
.await
{
Ok(_) => info!("Schema public created successfully"),
Err(err) => error!("Error creating schema public: {:?}", err),
}
// create role if not exists
match sqlx::query(format!("CREATE ROLE {database} PASSWORD '{password}' LOGIN;").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Role {} created successfully", database),
Err(err) => {
if err.to_string().contains("already exists") {
info!("Role {} already exists", database);
} else {
error!("Error creating role {}: {:?}", database, err);
}
}
}
// execute all the sql files in schema dir
let schema_dir = get_schema_dir()?;
let mut sql_files =
std::fs::read_dir(schema_dir)?.collect::<Result<Vec<_>, std::io::Error>>()?;
for file in &mut sql_files {
let file_name = file.file_name();
info!("Executing schema file: {:?}", file_name);
let file_path = file.path();
let sql_content = std::fs::read_to_string(file_path.clone())?;
for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) {
sqlx::query(sql_statement).execute(pg).await?;
}
}
// grant connect
match sqlx::query(format!("GRANT CONNECT ON DATABASE {database} TO {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Connect privileges granted to role {}", database),
Err(err) => error!(
"Error granting connect privileges to role {}: {:?}",
database, err
),
}
// grant all schema privileges to the role
match sqlx::query(format!("GRANT ALL PRIVILEGES ON SCHEMA public TO {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("All schema privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}
// grant all table privileges to the role
match sqlx::query(
format!("GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("All tables privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}
// grant all sequence privileges to the role
match sqlx::query(
format!("GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("All sequences privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}
// grant all function privileges to the role
match sqlx::query(
format!("GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("All functions privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}

Ok(())
}

pub async fn drop_postgres(pg: &PgPool, database: String) -> anyhow::Result<()> {
// execute drop owned
match sqlx::query(format!("DROP OWNED BY {database}").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Dropped owned objects by role {}", database),
Err(err) => error!("Error dropping owned by role {}: {:?}", database, err),
}
// revoke connect
match sqlx::query(format!("REVOKE CONNECT ON DATABASE {database} FROM {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Revoked connect privileges from role {}", database),
Err(err) => error!(
"Error revoking connect privileges from role {}: {:?}",
database, err
),
}
// revoke privileges
match sqlx::query(
format!("REVOKE ALL PRIVILEGES ON DATABASE {database} FROM {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("Revoked all privileges from role {}", database),
Err(err) => error!(
"Error revoking all privileges from role {}: {:?}",
database, err
),
}
// execute drop schema
match sqlx::query("DROP SCHEMA IF EXISTS public CASCADE")
.execute(pg)
.await
{
Ok(_) => info!("Dropped schema public"),
Err(err) => error!("Error dropping schema public: {:?}", err),
}
// drop role
match sqlx::query(format!("DROP ROLE IF EXISTS {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Dropped role {}", database),
Err(err) => error!("Error dropping role {}: {:?}", database, err),
}
Ok(())
}

pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
let command = opt.command.clone();

Expand All @@ -207,6 +37,7 @@ pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
&pg,
pg_connect_options.database,
pg_connect_options.password,
config.schema,
)
.await?;
}
Expand Down
3 changes: 3 additions & 0 deletions nautilus_core/cli/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub struct DatabaseConfig {
/// Password for connecting to the database
#[arg(long)]
pub password: Option<String>,
/// Directory path to the schema files
#[arg(long)]
pub schema: Option<String>,
}

#[derive(Parser, Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion nautilus_core/infrastructure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ extension-module = [
]
python = ["pyo3"]
redis = ["dep:redis"]
sql = ["dep:sqlx"]
postgres = ["dep:sqlx"]
2 changes: 1 addition & 1 deletion nautilus_core/infrastructure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ pub mod python;
#[cfg(feature = "redis")]
pub mod redis;

#[cfg(feature = "sql")]
#[cfg(feature = "postgres")]
pub mod sql;
5 changes: 5 additions & 0 deletions nautilus_core/infrastructure/src/python/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#[cfg(feature = "redis")]
pub mod redis;

#[cfg(feature = "postgres")]
pub mod sql;

use pyo3::{prelude::*, pymodule};

#[pymodule]
Expand All @@ -26,5 +29,7 @@ pub fn infrastructure(_: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<crate::redis::cache::RedisCacheDatabase>()?;
#[cfg(feature = "redis")]
m.add_class::<crate::redis::msgbus::RedisMessageBusDatabase>()?;
#[cfg(feature = "postgres")]
m.add_class::<crate::sql::cache_database::PostgresCacheDatabase>()?;
Ok(())
}
Loading