Skip to content

Commit

Permalink
Add Postgres cache database (#1607)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipmacek authored Apr 26, 2024
1 parent 2fd9337 commit 6513d20
Show file tree
Hide file tree
Showing 25 changed files with 1,012 additions and 520 deletions.
62 changes: 56 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ jobs:
env:
BUILD_MODE: debug
RUST_BACKTRACE: 1
services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
postgres:
image: postgres
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DB: nautilus
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Free disk space (Ubuntu)
Expand Down Expand Up @@ -105,10 +124,16 @@ jobs:
# pre-commit run --hook-stage manual gitlint-ci
pre-commit run --all-files
- name: Install Redis (Linux)
- name: Install Nautilus CLI and run init postgres
run: |
sudo apt-get install redis-server
redis-server --daemonize yes
make install-cli
nautilus database init --schema ${{ github.workspace }}/schema
env:
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DATABASE: nautilus

- name: Run nautilus_core cargo tests (Linux)
run: |
Expand Down Expand Up @@ -224,6 +249,25 @@ jobs:
env:
BUILD_MODE: debug
RUST_BACKTRACE: 1
services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
postgres:
image: postgres
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DB: nautilus
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Checkout repository
Expand Down Expand Up @@ -290,10 +334,16 @@ jobs:
# pre-commit run --hook-stage manual gitlint-ci
pre-commit run --all-files
- name: Install Redis (macOS)
- name: Install Nautilus CLI and run init postgres
run: |
brew install redis
redis-server --daemonize yes
make install-cli
nautilus database init --schema ${{ github.workspace }}/schema
env:
POSTGRES_HOST: localhost
POSTGRES_PORT: 5432
POSTGRES_USERNAME: postgres
POSTGRES_PASSWORD: pass
POSTGRES_DATABASE: nautilus

- name: Run nautilus_core cargo tests (macOS)
run: |
Expand Down
2 changes: 1 addition & 1 deletion nautilus_core/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "src/bin/cli.rs"
nautilus-common = { path = "../common"}
nautilus-model = { path = "../model" }
nautilus-core = { path = "../core" }
nautilus-infrastructure = { path = "../infrastructure" , features = ['sql']}
nautilus-infrastructure = { path = "../infrastructure" , features = ['postgres']}
anyhow = { workspace = true }
tokio = {workspace = true}
log = { workspace = true }
Expand Down
177 changes: 4 additions & 173 deletions nautilus_core/cli/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,182 +13,12 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use log::{error, info};
use nautilus_infrastructure::sql::pg::{connect_pg, get_postgres_connect_options};
use sqlx::PgPool;
use nautilus_infrastructure::sql::pg::{
connect_pg, drop_postgres, get_postgres_connect_options, init_postgres,
};

use crate::opt::{DatabaseCommand, DatabaseOpt};

/// Scans current path with keyword `nautilus_trader` and build schema dir
fn get_schema_dir() -> anyhow::Result<String> {
std::env::var("SCHEMA_DIR").or_else(|_| {
let nautilus_git_repo_name = "nautilus_trader";
let binding = std::env::current_dir().unwrap();
let current_dir = binding.to_str().unwrap();
match current_dir.find(nautilus_git_repo_name){
Some(index) => {
let schema_path = current_dir[0..index + nautilus_git_repo_name.len()].to_string() + "/schema";
Ok(schema_path)
}
None => anyhow::bail!("Could not calculate schema dir from current directory path or SCHEMA_DIR env variable")
}
})
}

pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> anyhow::Result<()> {
info!("Initializing Postgres database with target permissions and schema");
// create public schema
match sqlx::query("CREATE SCHEMA IF NOT EXISTS public;")
.execute(pg)
.await
{
Ok(_) => info!("Schema public created successfully"),
Err(err) => error!("Error creating schema public: {:?}", err),
}
// create role if not exists
match sqlx::query(format!("CREATE ROLE {database} PASSWORD '{password}' LOGIN;").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Role {} created successfully", database),
Err(err) => {
if err.to_string().contains("already exists") {
info!("Role {} already exists", database);
} else {
error!("Error creating role {}: {:?}", database, err);
}
}
}
// execute all the sql files in schema dir
let schema_dir = get_schema_dir()?;
let mut sql_files =
std::fs::read_dir(schema_dir)?.collect::<Result<Vec<_>, std::io::Error>>()?;
for file in &mut sql_files {
let file_name = file.file_name();
info!("Executing schema file: {:?}", file_name);
let file_path = file.path();
let sql_content = std::fs::read_to_string(file_path.clone())?;
for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) {
sqlx::query(sql_statement).execute(pg).await?;
}
}
// grant connect
match sqlx::query(format!("GRANT CONNECT ON DATABASE {database} TO {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Connect privileges granted to role {}", database),
Err(err) => error!(
"Error granting connect privileges to role {}: {:?}",
database, err
),
}
// grant all schema privileges to the role
match sqlx::query(format!("GRANT ALL PRIVILEGES ON SCHEMA public TO {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("All schema privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}
// grant all table privileges to the role
match sqlx::query(
format!("GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("All tables privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}
// grant all sequence privileges to the role
match sqlx::query(
format!("GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("All sequences privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}
// grant all function privileges to the role
match sqlx::query(
format!("GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("All functions privileges granted to role {}", database),
Err(err) => error!(
"Error granting all privileges to role {}: {:?}",
database, err
),
}

Ok(())
}

pub async fn drop_postgres(pg: &PgPool, database: String) -> anyhow::Result<()> {
// execute drop owned
match sqlx::query(format!("DROP OWNED BY {database}").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Dropped owned objects by role {}", database),
Err(err) => error!("Error dropping owned by role {}: {:?}", database, err),
}
// revoke connect
match sqlx::query(format!("REVOKE CONNECT ON DATABASE {database} FROM {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Revoked connect privileges from role {}", database),
Err(err) => error!(
"Error revoking connect privileges from role {}: {:?}",
database, err
),
}
// revoke privileges
match sqlx::query(
format!("REVOKE ALL PRIVILEGES ON DATABASE {database} FROM {database};").as_str(),
)
.execute(pg)
.await
{
Ok(_) => info!("Revoked all privileges from role {}", database),
Err(err) => error!(
"Error revoking all privileges from role {}: {:?}",
database, err
),
}
// execute drop schema
match sqlx::query("DROP SCHEMA IF EXISTS public CASCADE")
.execute(pg)
.await
{
Ok(_) => info!("Dropped schema public"),
Err(err) => error!("Error dropping schema public: {:?}", err),
}
// drop role
match sqlx::query(format!("DROP ROLE IF EXISTS {database};").as_str())
.execute(pg)
.await
{
Ok(_) => info!("Dropped role {}", database),
Err(err) => error!("Error dropping role {}: {:?}", database, err),
}
Ok(())
}

pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
let command = opt.command.clone();

Expand All @@ -207,6 +37,7 @@ pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
&pg,
pg_connect_options.database,
pg_connect_options.password,
config.schema,
)
.await?;
}
Expand Down
3 changes: 3 additions & 0 deletions nautilus_core/cli/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub struct DatabaseConfig {
/// Password for connecting to the database
#[arg(long)]
pub password: Option<String>,
/// Directory path to the schema files
#[arg(long)]
pub schema: Option<String>,
}

#[derive(Parser, Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion nautilus_core/infrastructure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ extension-module = [
]
python = ["pyo3"]
redis = ["dep:redis"]
sql = ["dep:sqlx"]
postgres = ["dep:sqlx"]
2 changes: 1 addition & 1 deletion nautilus_core/infrastructure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ pub mod python;
#[cfg(feature = "redis")]
pub mod redis;

#[cfg(feature = "sql")]
#[cfg(feature = "postgres")]
pub mod sql;
5 changes: 5 additions & 0 deletions nautilus_core/infrastructure/src/python/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#[cfg(feature = "redis")]
pub mod redis;

#[cfg(feature = "postgres")]
pub mod sql;

use pyo3::{prelude::*, pymodule};

#[pymodule]
Expand All @@ -26,5 +29,7 @@ pub fn infrastructure(_: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<crate::redis::cache::RedisCacheDatabase>()?;
#[cfg(feature = "redis")]
m.add_class::<crate::redis::msgbus::RedisMessageBusDatabase>()?;
#[cfg(feature = "postgres")]
m.add_class::<crate::sql::cache_database::PostgresCacheDatabase>()?;
Ok(())
}
Loading

0 comments on commit 6513d20

Please sign in to comment.