From 5efe0e74fcf2e4d819ab4eb63dd26895502d5b26 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Mon, 22 Apr 2024 08:54:03 +0200 Subject: [PATCH 01/22] create build-postgres action that runs on every commit --- .github/workflows/build_postgres.yml | 302 +++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 .github/workflows/build_postgres.yml diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml new file mode 100644 index 000000000000..1c96b2d3a777 --- /dev/null +++ b/.github/workflows/build_postgres.yml @@ -0,0 +1,302 @@ +name: build + +on: [push] + +jobs: + build: + strategy: + fail-fast: false + matrix: + arch: [x64] + os: [ubuntu-latest] + python-version: ["3.10", "3.11", "3.12"] + defaults: + run: + shell: bash + name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) + runs-on: ${{ matrix.os }} + env: + BUILD_MODE: debug + RUST_BACKTRACE: 1 + + steps: + - name: Free disk space (Ubuntu) + uses: jlumbroso/free-disk-space@main + with: + tool-cache: true + android: false + dotnet: false + haskell: false + large-packages: true + docker-images: true + swap-storage: true + + - name: Install runner dependencies + run: sudo apt-get install -y curl clang git libssl-dev make pkg-config + + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Get Rust version from rust-toolchain.toml + id: rust-version + run: | + version=$(awk -F\" '/version/ {print $2}' nautilus_core/rust-toolchain.toml) + echo "Rust toolchain version $version" + echo "RUST_VERSION=$version" >> $GITHUB_ENV + working-directory: ${{ github.workspace }} + + - name: Set up Rust tool-chain (Linux, Windows) stable + uses: actions-rust-lang/setup-rust-toolchain@v1.5 + with: + toolchain: ${{ env.RUST_VERSION }} + components: rustfmt, clippy + + - name: Set up Python environment + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Get Python version + run: | + version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:3])))") + echo "PYTHON_VERSION=$version" >> $GITHUB_ENV + + - name: Get Poetry version from poetry-version + run: | + version=$(cat poetry-version) + echo "POETRY_VERSION=$version" >> $GITHUB_ENV + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + version: ${{ env.POETRY_VERSION }} + + - name: Install build dependencies + run: python -m pip install --upgrade pip setuptools wheel pre-commit msgspec + + - name: Install TA-Lib (Linux) + run: | + make install-talib + poetry run pip install ta-lib + + - name: Setup cached pre-commit + id: cached-pre-commit + uses: actions/cache@v4 + with: + path: ~/.cache/pre-commit + key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} + + - name: Set poetry cache-dir + run: echo "POETRY_CACHE_DIR=$(poetry config cache-dir)" >> $GITHUB_ENV + + - name: Poetry cache + id: cached-poetry + uses: actions/cache@v4 + with: + path: ${{ env.POETRY_CACHE_DIR }} + key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-poetry-${{ hashFiles('**/poetry.lock') }} + + - name: Run pre-commit + run: | + # pre-commit run --hook-stage manual gitlint-ci + pre-commit run --all-files + + - name: Install Redis (Linux) + run: | + sudo apt-get install redis-server + redis-server --daemonize yes + + - name: Run nautilus_core cargo tests (Linux) + run: | + cargo install cargo-nextest + make cargo-test + + - name: Run tests (Linux) + run: | + make pytest + make test-examples + + build-windows: + if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/nightly' + strategy: + fail-fast: false + matrix: + arch: [x64] + os: [windows-latest] + python-version: ["3.10", "3.11", "3.12"] + defaults: + run: + shell: bash + name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) + runs-on: ${{ matrix.os }} + env: + BUILD_MODE: debug + RUST_BACKTRACE: 1 + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Get Rust version from rust-toolchain.toml + id: rust-version + run: | + version=$(awk -F\" '/version/ {print $2}' nautilus_core/rust-toolchain.toml) + echo "Rust toolchain version $version" + echo "RUST_VERSION=$version" >> $GITHUB_ENV + working-directory: ${{ github.workspace }} + + - name: Set up Rust tool-chain (Linux, Windows) stable + uses: actions-rust-lang/setup-rust-toolchain@v1.5 + with: + toolchain: ${{ env.RUST_VERSION }} + components: rustfmt, clippy + + - name: Set up Python environment + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Get Python version + run: | + version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:3])))") + echo "PYTHON_VERSION=$version" >> $GITHUB_ENV + + - name: Get Poetry version from poetry-version + run: | + version=$(cat poetry-version) + echo "POETRY_VERSION=$version" >> $GITHUB_ENV + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + version: ${{ env.POETRY_VERSION }} + + - name: Install build dependencies + run: python -m pip install --upgrade pip setuptools wheel pre-commit msgspec + + - name: Setup cached pre-commit + id: cached-pre-commit + uses: actions/cache@v4 + with: + path: ~/.cache/pre-commit + key: ${{ runner.os }}-${{ matrix.python-version }}-pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} + + - name: Set poetry cache-dir + run: echo "POETRY_CACHE_DIR=$(poetry config cache-dir)" >> $GITHUB_ENV + + - name: Poetry cache + id: cached-poetry + uses: actions/cache@v4 + with: + path: ${{ env.POETRY_CACHE_DIR }} + key: ${{ runner.os }}-${{ matrix.python-version }}-poetry-${{ hashFiles('**/poetry.lock') }} + + - name: Run pre-commit + run: | + # pre-commit run --hook-stage manual gitlint-ci + pre-commit run --all-files + + # Run tests without parallel build (avoids linker errors) + - name: Run tests (Windows) + run: | + poetry install --with test --all-extras + poetry run pytest --ignore=tests/performance_tests --new-first --failed-first + env: + PARALLEL_BUILD: false + + build-macos: + if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/nightly' + strategy: + fail-fast: false + matrix: + arch: [x64] + os: [macos-latest] + python-version: ["3.10", "3.11", "3.12"] + defaults: + run: + shell: bash + name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) + runs-on: ${{ matrix.os }} + env: + BUILD_MODE: debug + RUST_BACKTRACE: 1 + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Get Rust version from rust-toolchain.toml + id: rust-version + run: | + version=$(awk -F\" '/version/ {print $2}' nautilus_core/rust-toolchain.toml) + echo "Rust toolchain version $version" + echo "RUST_VERSION=$version" >> $GITHUB_ENV + working-directory: ${{ github.workspace }} + + # Work around as actions-rust-lang does not seem to work on macOS yet + - name: Set up Rust tool-chain (macOS) stable + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ env.RUST_VERSION }} + override: true + components: rustfmt, clippy + + - name: Set up Python environment + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Get Python version + run: | + version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:3])))") + echo "PYTHON_VERSION=$version" >> $GITHUB_ENV + + - name: Get Poetry version from poetry-version + run: | + version=$(cat poetry-version) + echo "POETRY_VERSION=$version" >> $GITHUB_ENV + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + version: ${{ env.POETRY_VERSION }} + + - name: Install build dependencies + run: python -m pip install --upgrade pip setuptools wheel pre-commit msgspec + + - name: Setup cached pre-commit + id: cached-pre-commit + uses: actions/cache@v4 + with: + path: ~/.cache/pre-commit + key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} + + - name: Set poetry cache-dir + run: echo "POETRY_CACHE_DIR=$(poetry config cache-dir)" >> $GITHUB_ENV + + - name: Poetry cache + id: cached-poetry + uses: actions/cache@v4 + with: + path: ${{ env.POETRY_CACHE_DIR }} + key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-poetry-${{ hashFiles('**/poetry.lock') }} + + - name: Run pre-commit + run: | + # pre-commit run --hook-stage manual gitlint-ci + pre-commit run --all-files + + - name: Install Redis (macOS) + run: | + brew install redis + redis-server --daemonize yes + + - name: Run nautilus_core cargo tests (macOS) + run: | + cargo install cargo-nextest + make cargo-test + + - name: Run tests (macOS) + run: | + make pytest + make test-examples From 281e42f137fe24e9ccf23b38f03d0a914a7c8d20 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Mon, 22 Apr 2024 12:25:12 +0200 Subject: [PATCH 02/22] add redis docker service --- .github/workflows/build_postgres.yml | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml index 1c96b2d3a777..ec245acefd77 100644 --- a/.github/workflows/build_postgres.yml +++ b/.github/workflows/build_postgres.yml @@ -19,6 +19,18 @@ jobs: BUILD_MODE: debug RUST_BACKTRACE: 1 + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: - name: Free disk space (Ubuntu) uses: jlumbroso/free-disk-space@main @@ -101,11 +113,6 @@ jobs: # pre-commit run --hook-stage manual gitlint-ci pre-commit run --all-files - - name: Install Redis (Linux) - run: | - sudo apt-get install redis-server - redis-server --daemonize yes - - name: Run nautilus_core cargo tests (Linux) run: | cargo install cargo-nextest @@ -286,11 +293,6 @@ jobs: # pre-commit run --hook-stage manual gitlint-ci pre-commit run --all-files - - name: Install Redis (macOS) - run: | - brew install redis - redis-server --daemonize yes - - name: Run nautilus_core cargo tests (macOS) run: | cargo install cargo-nextest From de879647a92afc8b72efc7da2fbb3eb43a24281c Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Mon, 22 Apr 2024 14:07:40 +0200 Subject: [PATCH 03/22] currency pyo3 cython transformers --- nautilus_trader/cache/postgres/__init__.py | 14 ++++++ .../cache/postgres/transformers.py | 44 +++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 nautilus_trader/cache/postgres/__init__.py create mode 100644 nautilus_trader/cache/postgres/transformers.py diff --git a/nautilus_trader/cache/postgres/__init__.py b/nautilus_trader/cache/postgres/__init__.py new file mode 100644 index 000000000000..3d34cab4588e --- /dev/null +++ b/nautilus_trader/cache/postgres/__init__.py @@ -0,0 +1,14 @@ +# ------------------------------------------------------------------------------------------------- +# Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +# https://nautechsystems.io +# +# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------------------------- diff --git a/nautilus_trader/cache/postgres/transformers.py b/nautilus_trader/cache/postgres/transformers.py new file mode 100644 index 000000000000..bf5a252ca491 --- /dev/null +++ b/nautilus_trader/cache/postgres/transformers.py @@ -0,0 +1,44 @@ +# ------------------------------------------------------------------------------------------------- +# Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +# https://nautechsystems.io +# +# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------------------------- + +from nautilus_trader.model.enums import CurrencyType +from nautilus_trader.model.objects import Currency +from nautilus_trader.core import nautilus_pyo3 + + +################################################################################ +# Currency +################################################################################ +def transform_currency_from_pyo3(currency: nautilus_pyo3.Currency)-> Currency: + return Currency( + code=currency.code, + precision=currency.precision, + iso4217=currency.iso4217, + name=currency.name, + currency_type=CurrencyType(currency.currency_type.value) + ) + + +def transform_currency_to_pyo3(currency: Currency)-> nautilus_pyo3.Currency: + return nautilus_pyo3.Currency( + code=currency.code, + precision=currency.precision, + iso4217=currency.iso4217, + name=currency.name, + currency_type=nautilus_pyo3.CurrencyType.from_str(currency.currency_type.name) + ) + + + From 3ce212d708f677d4da40ebd5d21944bf8cce14e7 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Mon, 22 Apr 2024 14:07:56 +0200 Subject: [PATCH 04/22] bootstrap CachePostgresAdapter --- nautilus_trader/cache/postgres/adapter.py | 59 +++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 nautilus_trader/cache/postgres/adapter.py diff --git a/nautilus_trader/cache/postgres/adapter.py b/nautilus_trader/cache/postgres/adapter.py new file mode 100644 index 000000000000..d52a6b9d2e45 --- /dev/null +++ b/nautilus_trader/cache/postgres/adapter.py @@ -0,0 +1,59 @@ +# ------------------------------------------------------------------------------------------------- +# Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +# https://nautechsystems.io +# +# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------------------------- + +from nautilus_trader.cache.config import CacheConfig +from nautilus_trader.cache.facade import CacheDatabaseFacade +from nautilus_trader.cache.postgres.transformers import transform_currency_from_pyo3 +from nautilus_trader.cache.postgres.transformers import transform_currency_to_pyo3 +from nautilus_trader.core.nautilus_pyo3 import PostgresCacheDatabase +from nautilus_trader.model.objects import Currency + + +class CachePostgresAdapter(CacheDatabaseFacade): + + def __init__( + self, + config: CacheConfig = None, + ): + if config: + config = CacheConfig() + super().__init__(config) + self._backing: PostgresCacheDatabase = PostgresCacheDatabase.connect() + + def flush(self): + self._backing.flush_db() + + def load(self): + data = self._backing.load() + return {key: bytes(value) for key, value in data.items()} + + def add(self, key: str, value: bytes): + self._backing.add(key, value) + + def add_currency(self, currency: Currency): + currency_pyo3 = transform_currency_to_pyo3(currency) + self._backing.add_currency(currency_pyo3) + + def load_currencies(self) -> dict[str,Currency]: + currencies = self._backing.load_currencies() + return { currency.code: transform_currency_from_pyo3(currency) for currency in currencies} + + def load_currency(self, code: str) -> Currency | None: + currency_pyo3 = self._backing.load_currency(code) + if currency_pyo3: + return transform_currency_from_pyo3(currency_pyo3) + return None + + From 3087ac120334c1f384d22d47fa52ee3c637bb1fb Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Mon, 22 Apr 2024 14:24:32 +0200 Subject: [PATCH 05/22] implement currency fromRow --- nautilus_core/model/src/types/currency.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/nautilus_core/model/src/types/currency.rs b/nautilus_core/model/src/types/currency.rs index 28d0576e3870..2ebf57de6f06 100644 --- a/nautilus_core/model/src/types/currency.rs +++ b/nautilus_core/model/src/types/currency.rs @@ -142,6 +142,27 @@ impl<'de> Deserialize<'de> for Currency { } } +impl <'r> FromRow<'r, PgRow> for Currency { + fn from_row(row: &'r PgRow) -> Result { + let code = row.try_get::("code")?; + let precision = row.try_get::("precision")?; + let iso4217 = row.try_get::("iso4217")?; + let name = row.try_get::("name")?; + let currency_type = row.try_get::("currency_type") + .map(|res| CurrencyType::from_str(res.as_str()).unwrap())?; + + Ok( + Currency::new( + code.as_str(), + precision as u8, + iso4217 as u16, + name.as_str(), + currency_type, + ).unwrap()) + } +} + + //////////////////////////////////////////////////////////////////////////////// // Tests //////////////////////////////////////////////////////////////////////////////// From b0c14d2d6fcb70436f21e5f6286ec28262239524 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Mon, 22 Apr 2024 14:27:13 +0200 Subject: [PATCH 06/22] move postgres database code to infrastructure crate sql module --- nautilus_core/cli/src/database/postgres.rs | 169 ------------------- nautilus_core/infrastructure/src/sql/pg.rs | 181 +++++++++++++++++++++ 2 files changed, 181 insertions(+), 169 deletions(-) diff --git a/nautilus_core/cli/src/database/postgres.rs b/nautilus_core/cli/src/database/postgres.rs index 5536276b5499..904fae18974a 100644 --- a/nautilus_core/cli/src/database/postgres.rs +++ b/nautilus_core/cli/src/database/postgres.rs @@ -19,175 +19,6 @@ use sqlx::PgPool; use crate::opt::{DatabaseCommand, DatabaseOpt}; -/// Scans current path with keyword `nautilus_trader` and build schema dir -fn get_schema_dir() -> anyhow::Result { - std::env::var("SCHEMA_DIR").or_else(|_| { - let nautilus_git_repo_name = "nautilus_trader"; - let binding = std::env::current_dir().unwrap(); - let current_dir = binding.to_str().unwrap(); - match current_dir.find(nautilus_git_repo_name){ - Some(index) => { - let schema_path = current_dir[0..index + nautilus_git_repo_name.len()].to_string() + "/schema"; - Ok(schema_path) - } - None => anyhow::bail!("Could not calculate schema dir from current directory path or SCHEMA_DIR env variable") - } - }) -} - -pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> anyhow::Result<()> { - info!("Initializing Postgres database with target permissions and schema"); - // create public schema - match sqlx::query("CREATE SCHEMA IF NOT EXISTS public;") - .execute(pg) - .await - { - Ok(_) => info!("Schema public created successfully"), - Err(err) => error!("Error creating schema public: {:?}", err), - } - // create role if not exists - match sqlx::query(format!("CREATE ROLE {database} PASSWORD '{password}' LOGIN;").as_str()) - .execute(pg) - .await - { - Ok(_) => info!("Role {} created successfully", database), - Err(err) => { - if err.to_string().contains("already exists") { - info!("Role {} already exists", database); - } else { - error!("Error creating role {}: {:?}", database, err); - } - } - } - // execute all the sql files in schema dir - let schema_dir = get_schema_dir()?; - let mut sql_files = - std::fs::read_dir(schema_dir)?.collect::, std::io::Error>>()?; - for file in &mut sql_files { - let file_name = file.file_name(); - info!("Executing schema file: {:?}", file_name); - let file_path = file.path(); - let sql_content = std::fs::read_to_string(file_path.clone())?; - for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) { - sqlx::query(sql_statement).execute(pg).await?; - } - } - // grant connect - match sqlx::query(format!("GRANT CONNECT ON DATABASE {database} TO {database};").as_str()) - .execute(pg) - .await - { - Ok(_) => info!("Connect privileges granted to role {}", database), - Err(err) => error!( - "Error granting connect privileges to role {}: {:?}", - database, err - ), - } - // grant all schema privileges to the role - match sqlx::query(format!("GRANT ALL PRIVILEGES ON SCHEMA public TO {database};").as_str()) - .execute(pg) - .await - { - Ok(_) => info!("All schema privileges granted to role {}", database), - Err(err) => error!( - "Error granting all privileges to role {}: {:?}", - database, err - ), - } - // grant all table privileges to the role - match sqlx::query( - format!("GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {database};").as_str(), - ) - .execute(pg) - .await - { - Ok(_) => info!("All tables privileges granted to role {}", database), - Err(err) => error!( - "Error granting all privileges to role {}: {:?}", - database, err - ), - } - // grant all sequence privileges to the role - match sqlx::query( - format!("GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {database};").as_str(), - ) - .execute(pg) - .await - { - Ok(_) => info!("All sequences privileges granted to role {}", database), - Err(err) => error!( - "Error granting all privileges to role {}: {:?}", - database, err - ), - } - // grant all function privileges to the role - match sqlx::query( - format!("GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {database};").as_str(), - ) - .execute(pg) - .await - { - Ok(_) => info!("All functions privileges granted to role {}", database), - Err(err) => error!( - "Error granting all privileges to role {}: {:?}", - database, err - ), - } - - Ok(()) -} - -pub async fn drop_postgres(pg: &PgPool, database: String) -> anyhow::Result<()> { - // execute drop owned - match sqlx::query(format!("DROP OWNED BY {database}").as_str()) - .execute(pg) - .await - { - Ok(_) => info!("Dropped owned objects by role {}", database), - Err(err) => error!("Error dropping owned by role {}: {:?}", database, err), - } - // revoke connect - match sqlx::query(format!("REVOKE CONNECT ON DATABASE {database} FROM {database};").as_str()) - .execute(pg) - .await - { - Ok(_) => info!("Revoked connect privileges from role {}", database), - Err(err) => error!( - "Error revoking connect privileges from role {}: {:?}", - database, err - ), - } - // revoke privileges - match sqlx::query( - format!("REVOKE ALL PRIVILEGES ON DATABASE {database} FROM {database};").as_str(), - ) - .execute(pg) - .await - { - Ok(_) => info!("Revoked all privileges from role {}", database), - Err(err) => error!( - "Error revoking all privileges from role {}: {:?}", - database, err - ), - } - // execute drop schema - match sqlx::query("DROP SCHEMA IF EXISTS public CASCADE") - .execute(pg) - .await - { - Ok(_) => info!("Dropped schema public"), - Err(err) => error!("Error dropping schema public: {:?}", err), - } - // drop role - match sqlx::query(format!("DROP ROLE IF EXISTS {database};").as_str()) - .execute(pg) - .await - { - Ok(_) => info!("Dropped role {}", database), - Err(err) => error!("Error dropping role {}: {:?}", database, err), - } - Ok(()) -} pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> { let command = opt.command.clone(); diff --git a/nautilus_core/infrastructure/src/sql/pg.rs b/nautilus_core/infrastructure/src/sql/pg.rs index c86a5245e0d3..ab78e25124e2 100644 --- a/nautilus_core/infrastructure/src/sql/pg.rs +++ b/nautilus_core/infrastructure/src/sql/pg.rs @@ -14,6 +14,7 @@ // ------------------------------------------------------------------------------------------------- use sqlx::{postgres::PgConnectOptions, query, ConnectOptions, PgPool}; +use tracing::log::{error, info}; use crate::sql::NAUTILUS_TABLES; @@ -112,3 +113,183 @@ pub async fn delete_nautilus_postgres_tables(db: &PgPool) -> anyhow::Result<()> pub async fn connect_pg(options: PgConnectOptions) -> anyhow::Result { Ok(PgPool::connect_with(options).await.unwrap()) } + +/// Scans current path with keyword nautilus_trader and build schema dir +fn get_schema_dir() -> anyhow::Result { + std::env::var("SCHEMA_DIR").or_else(|_| { + let nautilus_git_repo_name = "nautilus_trader"; + let binding = std::env::current_dir().unwrap(); + let current_dir = binding.to_str().unwrap(); + match current_dir.find(nautilus_git_repo_name){ + Some(index) => { + let schema_path = current_dir[0..index + nautilus_git_repo_name.len()].to_string() + "/schema"; + Ok(schema_path) + } + None => anyhow::bail!("Could not calculate schema dir from current directory path or SCHEMA_DIR env variable") + } + }) +} + +pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> anyhow::Result<()> { + info!("Initializing Postgres database with target permissions and schema"); + // create public schema + match sqlx::query("CREATE SCHEMA IF NOT EXISTS public;") + .execute(pg) + .await + { + Ok(_) => info!("Schema public created successfully"), + Err(err) => error!("Error creating schema public: {:?}", err), + } + // create role if not exists + match sqlx::query(format!("CREATE ROLE {} PASSWORD '{}' LOGIN;", database, password).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("Role {} created successfully", database), + Err(err) => { + if err.to_string().contains("already exists") { + info!("Role {} already exists", database); + } else { + error!("Error creating role {}: {:?}", database, err); + } + } + } + // execute all the sql files in schema dir + let schema_dir = get_schema_dir()?; + let mut sql_files = + std::fs::read_dir(schema_dir)?.collect::, std::io::Error>>()?; + for file in &mut sql_files { + let file_name = file.file_name(); + info!("Executing schema file: {:?}", file_name); + let file_path = file.path(); + let sql_content = std::fs::read_to_string(file_path.clone())?; + for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) { + sqlx::query(sql_statement).execute(pg).await?; + } + } + // grant connect + match sqlx::query(format!("GRANT CONNECT ON DATABASE {0} TO {0};", database).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("Connect privileges granted to role {}", database), + Err(err) => error!( + "Error granting connect privileges to role {}: {:?}", + database, err + ), + } + // grant all schema privileges to the role + match sqlx::query(format!("GRANT ALL PRIVILEGES ON SCHEMA public TO {};", database).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("All schema privileges granted to role {}", database), + Err(err) => error!( + "Error granting all privileges to role {}: {:?}", + database, err + ), + } + // grant all table privileges to the role + match sqlx::query( + format!( + "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {};", + database + ) + .as_str(), + ) + .execute(pg) + .await + { + Ok(_) => info!("All tables privileges granted to role {}", database), + Err(err) => error!( + "Error granting all privileges to role {}: {:?}", + database, err + ), + } + // grant all sequence privileges to the role + match sqlx::query( + format!( + "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {};", + database + ) + .as_str(), + ) + .execute(pg) + .await + { + Ok(_) => info!("All sequences privileges granted to role {}", database), + Err(err) => error!( + "Error granting all privileges to role {}: {:?}", + database, err + ), + } + // grant all function privileges to the role + match sqlx::query( + format!( + "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {};", + database + ) + .as_str(), + ) + .execute(pg) + .await + { + Ok(_) => info!("All functions privileges granted to role {}", database), + Err(err) => error!( + "Error granting all privileges to role {}: {:?}", + database, err + ), + } + + Ok(()) +} + +pub async fn drop_postgres(pg: &PgPool, database: String) -> anyhow::Result<()> { + // execute drop owned + match sqlx::query(format!("DROP OWNED BY {}", database).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("Dropped owned objects by role {}", database), + Err(err) => error!("Error dropping owned by role {}: {:?}", database, err), + } + // revoke connect + match sqlx::query(format!("REVOKE CONNECT ON DATABASE {0} FROM {0};", database).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("Revoked connect privileges from role {}", database), + Err(err) => error!( + "Error revoking connect privileges from role {}: {:?}", + database, err + ), + } + // revoke privileges + match sqlx::query(format!("REVOKE ALL PRIVILEGES ON DATABASE {0} FROM {0};", database).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("Revoked all privileges from role {}", database), + Err(err) => error!( + "Error revoking all privileges from role {}: {:?}", + database, err + ), + } + // execute drop schema + match sqlx::query("DROP SCHEMA IF EXISTS public CASCADE") + .execute(pg) + .await + { + Ok(_) => info!("Dropped schema public"), + Err(err) => error!("Error dropping schema public: {:?}", err), + } + // drop role + match sqlx::query(format!("DROP ROLE IF EXISTS {};", database).as_str()) + .execute(pg) + .await + { + Ok(_) => info!("Dropped role {}", database), + Err(err) => error!("Error dropping role {}: {:?}", database, err), + } + Ok(()) +} From 9032f6cd0071d6d5d23b494bf8a17215e2dca264 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 10:43:12 +0200 Subject: [PATCH 07/22] remove old cache file unused --- nautilus_core/infrastructure/src/sql/cache.rs | 100 ------------------ 1 file changed, 100 deletions(-) delete mode 100644 nautilus_core/infrastructure/src/sql/cache.rs diff --git a/nautilus_core/infrastructure/src/sql/cache.rs b/nautilus_core/infrastructure/src/sql/cache.rs deleted file mode 100644 index fecf1072d482..000000000000 --- a/nautilus_core/infrastructure/src/sql/cache.rs +++ /dev/null @@ -1,100 +0,0 @@ -// ------------------------------------------------------------------------------------------------- -// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. -// https://nautechsystems.io -// -// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ------------------------------------------------------------------------------------------------ - -use nautilus_model::identifiers::trader_id::TraderId; -use sqlx::Error; - -use crate::sql::{database::Database, schema::GeneralItem}; - -pub struct SqlCacheDatabase { - trader_id: TraderId, - db: Database, -} - -impl SqlCacheDatabase { - #[must_use] - pub fn new(trader_id: TraderId, database: Database) -> Self { - Self { - trader_id, - db: database, - } - } - #[must_use] - pub fn key_trader(&self) -> String { - format!("trader-{}", self.trader_id) - } - - #[must_use] - pub fn key_general(&self) -> String { - format!("{}:general:", self.key_trader()) - } - - pub async fn add(&self, key: String, value: String) -> Result { - let query = format!( - "INSERT INTO general (key, value) VALUES ('{key}', '{value}') ON CONFLICT (key) DO NOTHING;" - ); - self.db.execute(query.as_str()).await - } - - pub async fn get(&self, key: String) -> Vec { - let query = format!("SELECT * FROM general WHERE key = '{key}'"); - self.db - .fetch_all::(query.as_str()) - .await - .unwrap() - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Tests -//////////////////////////////////////////////////////////////////////////////// -#[cfg(test)] -mod tests { - use nautilus_model::identifiers::stubs::trader_id; - - use super::SqlCacheDatabase; - use crate::sql::database::{init_db_schema, setup_test_database}; - - async fn setup_sql_cache_database() -> SqlCacheDatabase { - let db = setup_test_database().await; - let schema_dir = "../../schema"; - init_db_schema(&db, schema_dir) - .await - .expect("Failed to init db schema"); - let trader = trader_id(); - SqlCacheDatabase::new(trader, db) - } - - #[tokio::test] - async fn test_keys() { - let cache = setup_sql_cache_database().await; - assert_eq!(cache.key_trader(), "trader-TRADER-001"); - assert_eq!(cache.key_general(), "trader-TRADER-001:general:"); - } - - #[tokio::test] - async fn test_add_get_general() { - let cache = setup_sql_cache_database().await; - cache - .add(String::from("key1"), String::from("value1")) - .await - .expect("Failed to add key"); - let value = cache.get(String::from("key1")).await; - assert_eq!(value.len(), 1); - let item = value.first().unwrap(); - assert_eq!(item.key, "key1"); - assert_eq!(item.value, "value1"); - } -} From 44e64117a63bb7d6768aea07d50a931c359215e0 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 10:43:28 +0200 Subject: [PATCH 08/22] remove old database file unused --- .../infrastructure/src/sql/database.rs | 226 ------------------ 1 file changed, 226 deletions(-) delete mode 100644 nautilus_core/infrastructure/src/sql/database.rs diff --git a/nautilus_core/infrastructure/src/sql/database.rs b/nautilus_core/infrastructure/src/sql/database.rs deleted file mode 100644 index f757025b99ae..000000000000 --- a/nautilus_core/infrastructure/src/sql/database.rs +++ /dev/null @@ -1,226 +0,0 @@ -// ------------------------------------------------------------------------------------------------- -// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. -// https://nautechsystems.io -// -// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ------------------------------------------------------------------------------------------------- - -use std::{path::Path, str::FromStr}; - -use sqlx::{ - any::{install_default_drivers, AnyConnectOptions}, - sqlite::SqliteConnectOptions, - Error, Pool, SqlitePool, -}; - -#[derive(Clone)] -pub struct Database { - pub pool: Pool, -} - -pub enum DatabaseEngine { - POSTGRES, - SQLITE, -} - -fn str_to_database_engine(engine_str: &str) -> DatabaseEngine { - match engine_str { - "POSTGRES" | "postgres" => DatabaseEngine::POSTGRES, - "SQLITE" | "sqlite" => DatabaseEngine::SQLITE, - _ => panic!("Invalid database engine: {engine_str}"), - } -} - -impl Database { - pub async fn new(engine: Option, conn_string: Option<&str>) -> Self { - install_default_drivers(); - let db_options = Self::get_db_options(engine, conn_string); - let db = sqlx::pool::PoolOptions::new() - .max_connections(20) - .connect_with(db_options) - .await; - match db { - Ok(pool) => Self { pool }, - Err(err) => { - panic!("Failed to connect to database: {err}") - } - } - } - - #[must_use] - pub fn get_db_options( - engine: Option, - conn_string: Option<&str>, - ) -> AnyConnectOptions { - let connection_string = match conn_string { - Some(conn_string) => Ok(conn_string.to_string()), - None => std::env::var("DATABASE_URL"), - }; - let database_engine: DatabaseEngine = match engine { - Some(engine) => engine, - None => str_to_database_engine( - std::env::var("DATABASE_ENGINE") - .unwrap_or("SQLITE".to_string()) - .as_str(), - ), - }; - match connection_string { - Ok(connection_string) => match database_engine { - DatabaseEngine::POSTGRES => AnyConnectOptions::from_str(connection_string.as_str()) - .expect("Invalid PostgresSQL connection string"), - DatabaseEngine::SQLITE => AnyConnectOptions::from_str(connection_string.as_str()) - .expect("Invalid SQLITE connection string"), - }, - Err(err) => { - panic!("Failed to connect to database: {err}") - } - } - } - - pub async fn execute(&self, query_str: &str) -> Result { - let result = sqlx::query(query_str).execute(&self.pool).await?; - - Ok(result.rows_affected()) - } - - pub async fn fetch_all(&self, query_str: &str) -> Result, Error> - where - T: for<'r> sqlx::FromRow<'r, sqlx::any::AnyRow> + Unpin, - { - let rows = sqlx::query(query_str).fetch_all(&self.pool).await?; - - let mut objects = Vec::new(); - for row in rows { - let obj = T::from_row(&row)?; - objects.push(obj); - } - - Ok(objects) - } -} - -pub async fn init_db_schema(db: &Database, schema_dir: &str) -> anyhow::Result<()> { - // scan all the files in the current directory - let mut sql_files = - std::fs::read_dir(schema_dir)?.collect::, std::io::Error>>()?; - - for file in &mut sql_files { - let file_name = file.file_name(); - println!("Executing SQL file: {file_name:?}"); - let file_path = file.path(); - let sql_content = std::fs::read_to_string(file_path.clone())?; - for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) { - db.execute(sql_statement).await.unwrap_or_else(|e| { - panic!( - "Failed to execute SQL statement: {} with reason {}", - file_path.display(), - e - ) - }); - } - } - Ok(()) -} - -pub async fn setup_test_database() -> Database { - // check if test_db.sqlite exists,if not, create it - let db_path = std::env::var("TEST_DB_PATH").unwrap_or("test_db.sqlite".to_string()); - let db_file_path = Path::new(db_path.as_str()); - let exists = db_file_path.exists(); - if !exists { - SqlitePool::connect_with( - SqliteConnectOptions::new() - .filename(db_file_path) - .create_if_missing(true), - ) - .await - .expect("Failed to create test_db.sqlite"); - } - Database::new(Some(DatabaseEngine::SQLITE), Some("sqlite:test_db.sqlite")).await -} - -//////////////////////////////////////////////////////////////////////////////// -// Tests -//////////////////////////////////////////////////////////////////////////////// -#[cfg(test)] -mod tests { - - use sqlx::{FromRow, Row}; - - use crate::sql::database::{setup_test_database, Database}; - - async fn init_item_table(database: &Database) { - database - .execute("CREATE TABLE IF NOT EXISTS items (key TEXT PRIMARY KEY, value TEXT)") - .await - .expect("Failed to create table item"); - } - - async fn drop_table(database: &Database) { - database - .execute("DROP TABLE items") - .await - .expect("Failed to drop table items"); - } - - #[tokio::test] - async fn test_database() { - let db = setup_test_database().await; - let rows_affected = db.execute("SELECT 1").await.unwrap(); - // it will not fail and give 0 rows affected - assert_eq!(rows_affected, 0); - } - - #[tokio::test] - async fn test_database_fetch_all() { - let db = setup_test_database().await; - struct SimpleValue { - value: i32, - } - impl FromRow<'_, sqlx::any::AnyRow> for SimpleValue { - fn from_row(row: &sqlx::any::AnyRow) -> Result { - Ok(Self { - value: row.try_get(0)?, - }) - } - } - let result = db.fetch_all::("SELECT 3").await.unwrap(); - assert_eq!(result[0].value, 3); - } - - #[tokio::test] - async fn test_insert_and_select() { - let db = setup_test_database().await; - init_item_table(&db).await; - // insert some value - db.execute("INSERT INTO items (key, value) VALUES ('key1', 'value1')") - .await - .unwrap(); - // fetch item, impl Data struct - struct Item { - key: String, - value: String, - } - impl FromRow<'_, sqlx::any::AnyRow> for Item { - fn from_row(row: &sqlx::any::AnyRow) -> Result { - Ok(Self { - key: row.try_get(0)?, - value: row.try_get(1)?, - }) - } - } - let result = db.fetch_all::("SELECT * FROM items").await.unwrap(); - assert_eq!(result.len(), 1); - assert_eq!(result[0].key, "key1"); - assert_eq!(result[0].value, "value1"); - drop_table(&db).await; - } -} From a7eac2fb8d91934e05a01b83dda903af24ab41b6 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 15:31:58 +0200 Subject: [PATCH 09/22] psql cache database all --- nautilus_core/Cargo.lock | 4 + nautilus_core/cli/Cargo.toml | 2 +- nautilus_core/infrastructure/Cargo.toml | 2 +- nautilus_core/infrastructure/src/lib.rs | 2 +- .../infrastructure/src/python/mod.rs | 5 + .../src/python/sql/cache_database.rs | 119 ++++++++++++++ .../infrastructure/src/python/sql/mod.rs | 16 ++ .../infrastructure/src/sql/cache_database.rs | 146 ++++++++++++++++++ nautilus_core/infrastructure/src/sql/mod.rs | 13 +- .../src/sql/{schema.rs => models/general.rs} | 9 +- .../infrastructure/src/sql/models/mod.rs | 2 + .../infrastructure/src/sql/models/types.rs | 42 +++++ .../infrastructure/src/sql/queries.rs | 92 +++++++++++ nautilus_core/model/src/types/currency.rs | 20 --- nautilus_core/persistence/Cargo.toml | 12 +- .../test_cache_database_postgres.py | 124 +++++++++++++++ 16 files changed, 573 insertions(+), 37 deletions(-) create mode 100644 nautilus_core/infrastructure/src/python/sql/cache_database.rs create mode 100644 nautilus_core/infrastructure/src/python/sql/mod.rs create mode 100644 nautilus_core/infrastructure/src/sql/cache_database.rs rename nautilus_core/infrastructure/src/sql/{schema.rs => models/general.rs} (91%) create mode 100644 nautilus_core/infrastructure/src/sql/models/types.rs create mode 100644 nautilus_core/infrastructure/src/sql/queries.rs create mode 100644 tests/integration_tests/infrastructure/test_cache_database_postgres.py diff --git a/nautilus_core/Cargo.lock b/nautilus_core/Cargo.lock index 588d4ba3d425..32c7a71899d8 100644 --- a/nautilus_core/Cargo.lock +++ b/nautilus_core/Cargo.lock @@ -2801,14 +2801,18 @@ dependencies = [ "datafusion", "dotenv", "futures", + "nautilus-accounting", + "nautilus-common", "nautilus-core", "nautilus-model", "procfs", "pyo3", + "pyo3-asyncio", "quickcheck", "quickcheck_macros", "rand", "rstest", + "sqlx", "thiserror", "tokio", ] diff --git a/nautilus_core/cli/Cargo.toml b/nautilus_core/cli/Cargo.toml index 941f013ea46e..163f78e96bb8 100644 --- a/nautilus_core/cli/Cargo.toml +++ b/nautilus_core/cli/Cargo.toml @@ -14,7 +14,7 @@ path = "src/bin/cli.rs" nautilus-common = { path = "../common"} nautilus-model = { path = "../model" } nautilus-core = { path = "../core" } -nautilus-infrastructure = { path = "../infrastructure" , features = ['sql']} +nautilus-infrastructure = { path = "../infrastructure" , features = ['postgres']} anyhow = { workspace = true } tokio = {workspace = true} log = { workspace = true } diff --git a/nautilus_core/infrastructure/Cargo.toml b/nautilus_core/infrastructure/Cargo.toml index 95a2ad9eff91..594ca1448b37 100644 --- a/nautilus_core/infrastructure/Cargo.toml +++ b/nautilus_core/infrastructure/Cargo.toml @@ -50,4 +50,4 @@ extension-module = [ ] python = ["pyo3"] redis = ["dep:redis"] -sql = ["dep:sqlx"] +postgres = ["dep:sqlx"] diff --git a/nautilus_core/infrastructure/src/lib.rs b/nautilus_core/infrastructure/src/lib.rs index 6f87ec9dc619..e34994a90fa7 100644 --- a/nautilus_core/infrastructure/src/lib.rs +++ b/nautilus_core/infrastructure/src/lib.rs @@ -34,5 +34,5 @@ pub mod python; #[cfg(feature = "redis")] pub mod redis; -#[cfg(feature = "sql")] +#[cfg(feature = "postgres")] pub mod sql; diff --git a/nautilus_core/infrastructure/src/python/mod.rs b/nautilus_core/infrastructure/src/python/mod.rs index 6c07e5f0dbfc..6bf67c7c8340 100644 --- a/nautilus_core/infrastructure/src/python/mod.rs +++ b/nautilus_core/infrastructure/src/python/mod.rs @@ -18,6 +18,9 @@ #[cfg(feature = "redis")] pub mod redis; +#[cfg(feature = "postgres")] +pub mod sql; + use pyo3::{prelude::*, pymodule}; #[pymodule] @@ -26,5 +29,7 @@ pub fn infrastructure(_: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; #[cfg(feature = "redis")] m.add_class::()?; + #[cfg(feature = "postgres")] + m.add_class::()?; Ok(()) } diff --git a/nautilus_core/infrastructure/src/python/sql/cache_database.rs b/nautilus_core/infrastructure/src/python/sql/cache_database.rs new file mode 100644 index 000000000000..a2c871401220 --- /dev/null +++ b/nautilus_core/infrastructure/src/python/sql/cache_database.rs @@ -0,0 +1,119 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + +use std::collections::HashMap; +use pyo3::prelude::*; +use nautilus_core::python::{to_pyruntime_err}; +use nautilus_common::runtime::get_runtime; +use nautilus_model::types::currency::Currency; +use crate::sql::cache_database::PostgresCacheDatabase; +use crate::sql::pg::delete_nautilus_postgres_tables; +use crate::sql::queries::DatabaseQueries; + + +#[pymethods] +impl PostgresCacheDatabase { + #[staticmethod] + #[pyo3(name = "connect")] + fn py_connect( + host: Option, + port: Option, + username: Option, + password: Option, + database: Option + ) -> PyResult { + let result = get_runtime().block_on(async { + PostgresCacheDatabase::connect(host, port, username, password, database).await + }); + result.map_err(to_pyruntime_err) + } + + #[pyo3(name = "load")] + fn py_load<'py>( + slf: PyRef<'_, Self>, + ) -> PyResult>> { + let result = get_runtime().block_on(async { + slf.load().await + }); + result.map_err(to_pyruntime_err) + } + + + #[pyo3(name = "load_currency")] + fn py_load_currency( + slf: PyRef<'_, Self>, + code: &str, + ) -> PyResult> { + let result = get_runtime().block_on(async { + DatabaseQueries::load_currency(&slf.pool, code).await + }); + result.map_err(to_pyruntime_err) + } + + #[pyo3(name = "load_currencies")] + fn py_load_currencies<'py>( + slf: PyRef<'_, Self>, + ) -> PyResult> { + let result = get_runtime().block_on(async { + DatabaseQueries::load_currencies(&slf.pool).await + }); + result.map_err(to_pyruntime_err) + } + + #[pyo3(name = "add")] + fn py_add( + slf: PyRef<'_, Self>, + key: String, + value: Vec + ) -> PyResult<()> { + let result = get_runtime().block_on(async { + slf.add(key,value).await + }); + result.map_err(to_pyruntime_err) + } + + #[pyo3(name = "add_currency")] + fn py_add_currency( + slf: PyRef<'_, Self>, + currency: Currency, + ) -> PyResult<()> { + let result = get_runtime().block_on(async { + slf.add_currency(currency).await + }); + result.map_err(to_pyruntime_err) + } + + #[pyo3(name = "flush_db")] + fn py_drop_schema( + slf: PyRef<'_, Self>, + ) -> PyResult<()> { + let result = get_runtime().block_on(async { + delete_nautilus_postgres_tables(&slf.pool) + .await + }); + result.map_err(to_pyruntime_err) + } + + #[pyo3(name = "truncate")] + fn py_truncate( + slf: PyRef<'_, Self>, + table: String + ) -> PyResult<()> { + let result = get_runtime().block_on(async { + DatabaseQueries::truncate(&slf.pool, table).await + }); + result.map_err(to_pyruntime_err) + } +} diff --git a/nautilus_core/infrastructure/src/python/sql/mod.rs b/nautilus_core/infrastructure/src/python/sql/mod.rs new file mode 100644 index 000000000000..0f12dc82fc2d --- /dev/null +++ b/nautilus_core/infrastructure/src/python/sql/mod.rs @@ -0,0 +1,16 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + +pub mod cache_database; \ No newline at end of file diff --git a/nautilus_core/infrastructure/src/sql/cache_database.rs b/nautilus_core/infrastructure/src/sql/cache_database.rs new file mode 100644 index 000000000000..baa1244ed331 --- /dev/null +++ b/nautilus_core/infrastructure/src/sql/cache_database.rs @@ -0,0 +1,146 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + +use std::collections::{HashMap, VecDeque}; +use tokio::sync::mpsc::{Receiver,Sender,channel}; +use std::time::{Duration, Instant}; +use sqlx::{PgPool}; +use sqlx::postgres::PgConnectOptions; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::time::sleep; +use nautilus_model::types::currency::Currency; +use crate::sql::models::general::GeneralRow; +use crate::sql::pg::{connect_pg, get_postgres_connect_options}; +use crate::sql::queries::DatabaseQueries; + + +#[derive(Debug)] +#[cfg_attr( + feature = "python", + pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.persistence") +)] +pub struct PostgresCacheDatabase { + pub pool: PgPool, + tx: Sender, +} + + +#[derive(Debug, Clone)] +pub enum DatabaseQuery{ + Add(String,Vec), + AddCurrency(Currency), +} + + + +fn get_buffer_interval() -> Duration { + Duration::from_millis(0) +} + +async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque) { + for cmd in buffer.drain(..) { + match cmd { + DatabaseQuery::Add(key,value) => { + DatabaseQueries ::add(pool,key,value).await.unwrap(); + }, + DatabaseQuery::AddCurrency(currency) => { + DatabaseQueries::add_currency(pool,currency).await.unwrap(); + }, + } + } +} + +impl PostgresCacheDatabase { + pub async fn connect( + host: Option, + port: Option, + username: Option, + password: Option, + database: Option + ) -> Result { + let pg_connect_options = get_postgres_connect_options(host,port,username,password,database).unwrap(); + let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap(); + let (tx, rx) = channel::(1000); + // spawn a thread to handle messages + let _join_handle = tokio::spawn(async move { + PostgresCacheDatabase::handle_message(rx, pg_connect_options.clone().into()).await; + }); + Ok(PostgresCacheDatabase { pool, tx }) + } + + async fn handle_message( + mut rx: Receiver, + pg_connect_options: PgConnectOptions + ){ + let pool = connect_pg(pg_connect_options).await.unwrap(); + // Buffering + let mut buffer: VecDeque = VecDeque::new(); + let mut last_drain = Instant::now(); + let buffer_interval = get_buffer_interval(); + let recv_interval = Duration::from_millis(1); + + loop { + if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() { + // drain buffer + drain_buffer(&pool, &mut buffer).await; + last_drain = Instant::now(); + } else { + // Continue to receive and handle messages until channel is hung up + match rx.try_recv() { + Ok(msg) => buffer.push_back(msg), + Err(TryRecvError::Empty) => sleep(recv_interval).await, + Err(TryRecvError::Disconnected) => break, + } + } + } + // rain any remaining message + if !buffer.is_empty(){ + drain_buffer(&pool,&mut buffer).await; + } + } + + pub async fn load(&self) -> Result>,sqlx::Error> { + let query = sqlx::query_as::<_,GeneralRow>("SELECT * FROM general"); + let result = query.fetch_all(&self.pool).await; + match result { + Ok(rows) => { + let mut cache: HashMap> = HashMap::new(); + for row in rows { + cache.insert(row.key, row.value); + } + Ok(cache) + } + Err(err) => { + panic!("Failed to load general table: {err}") + } + } + } + + + + pub async fn add(&self, key: String, value: Vec) -> anyhow::Result<()> { + let query = DatabaseQuery::Add(key,value); + self.tx.send(query) + .await + .map_err(|err| anyhow::anyhow!("Failed to send query to database message handler: {err}")) + } + + pub async fn add_currency(&self, currency: Currency) -> anyhow::Result<()> { + let query = DatabaseQuery::AddCurrency(currency); + self.tx.send(query) + .await + .map_err(|err| anyhow::anyhow!("Failed to query add_currency to database message handler: {err}")) + } +} \ No newline at end of file diff --git a/nautilus_core/infrastructure/src/sql/mod.rs b/nautilus_core/infrastructure/src/sql/mod.rs index f098c3cf9d49..7e2558fdce60 100644 --- a/nautilus_core/infrastructure/src/sql/mod.rs +++ b/nautilus_core/infrastructure/src/sql/mod.rs @@ -13,12 +13,13 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -/// Be careful about ordering and foreign key constraints when deleting data. -/// We can use this list for manual truncation of tables. -pub const NAUTILUS_TABLES: [&str; 5] = - ["general", "instrument", "currency", "order", "order_event"]; +// Be careful about ordering and foreign key constraints when deleting data. +pub const NAUTILUS_TABLES: [&str; 2] = [ + "general", + "currency", +]; -pub mod database; pub mod models; pub mod pg; -pub mod schema; +pub mod cache_database; +pub mod queries; \ No newline at end of file diff --git a/nautilus_core/infrastructure/src/sql/schema.rs b/nautilus_core/infrastructure/src/sql/models/general.rs similarity index 91% rename from nautilus_core/infrastructure/src/sql/schema.rs rename to nautilus_core/infrastructure/src/sql/models/general.rs index 2a551437f16c..1c47f74a7f74 100644 --- a/nautilus_core/infrastructure/src/sql/schema.rs +++ b/nautilus_core/infrastructure/src/sql/models/general.rs @@ -13,8 +13,9 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -#[derive(sqlx::FromRow)] -pub struct GeneralItem { + +#[derive(Debug, sqlx::FromRow)] +pub struct GeneralRow { pub key: String, - pub value: String, -} + pub value: Vec, +} \ No newline at end of file diff --git a/nautilus_core/infrastructure/src/sql/models/mod.rs b/nautilus_core/infrastructure/src/sql/models/mod.rs index 02d78ede908a..7e2e8e9c01da 100644 --- a/nautilus_core/infrastructure/src/sql/models/mod.rs +++ b/nautilus_core/infrastructure/src/sql/models/mod.rs @@ -14,3 +14,5 @@ // ------------------------------------------------------------------------------------------------- pub mod instruments; +pub mod general; +pub mod types; diff --git a/nautilus_core/infrastructure/src/sql/models/types.rs b/nautilus_core/infrastructure/src/sql/models/types.rs new file mode 100644 index 000000000000..ede57202f430 --- /dev/null +++ b/nautilus_core/infrastructure/src/sql/models/types.rs @@ -0,0 +1,42 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + +use std::str::FromStr; +use sqlx::{FromRow, Row}; +use sqlx::postgres::PgRow; +use nautilus_model::enums::CurrencyType; +use nautilus_model::types::currency::Currency; + +pub struct CurrencyModel(pub Currency); + +impl <'r> FromRow<'r, PgRow> for CurrencyModel { + fn from_row(row: &'r PgRow) -> Result { + let code = row.try_get::("code")?; + let precision = row.try_get::("precision")?; + let iso4217 = row.try_get::("iso4217")?; + let name = row.try_get::("name")?; + let currency_type = row.try_get::("currency_type") + .map(|res| CurrencyType::from_str(res.as_str()).unwrap())?; + + let currency = Currency::new( + code.as_str(), + precision as u8, + iso4217 as u16, + name.as_str(), + currency_type, + ).unwrap(); + Ok(CurrencyModel(currency)) + } +} diff --git a/nautilus_core/infrastructure/src/sql/queries.rs b/nautilus_core/infrastructure/src/sql/queries.rs new file mode 100644 index 000000000000..20f836913877 --- /dev/null +++ b/nautilus_core/infrastructure/src/sql/queries.rs @@ -0,0 +1,92 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + + +use std::collections::HashMap; +use sqlx::{PgPool}; +use nautilus_model::types::currency::Currency; +use crate::sql::models::general::GeneralRow; +use crate::sql::models::types::CurrencyModel; + +pub struct DatabaseQueries; + +impl DatabaseQueries{ + pub async fn add(pool: &PgPool, key: String, value: Vec) -> anyhow::Result<()> { + sqlx::query("INSERT INTO general (key, value) VALUES ($1, $2)") + .bind(key) + .bind(value) + .execute(pool) + .await + .map(|_| ()) + .map_err(|err| anyhow::anyhow!("Failed to insert into general table: {err}")) + } + + + pub async fn load(pool: &PgPool) -> anyhow::Result>> { + sqlx::query_as::<_,GeneralRow>("SELECT * FROM general") + .fetch_all(pool) + .await + .map(|rows| { + let mut cache: HashMap> = HashMap::new(); + for row in rows { + cache.insert(row.key, row.value); + } + cache + }) + .map_err(|err| anyhow::anyhow!("Failed to load general table: {err}")) + } + + + pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> { + sqlx::query( + "INSERT INTO currency (code, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (code) DO NOTHING" + ) + .bind(currency.code.as_str()) + .bind(currency.precision as i32) + .bind(currency.iso4217 as i32) + .bind(currency.name.as_str()) + .bind(currency.currency_type.to_string()) + .execute(pool) + .await + .map(|_| ()) + .map_err(|err| anyhow::anyhow!("Failed to insert into currency table: {err}")) + } + + pub async fn load_currencies(pool: &PgPool) -> anyhow::Result> { + sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY code ASC") + .fetch_all(pool) + .await + .map(|rows| rows.into_iter().map(|row| row.0).collect()) + .map_err(|err| anyhow::anyhow!("Failed to load currencies: {err}")) + } + + + pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result> { + sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE code = $1") + .bind(code) + .fetch_optional(pool) + .await + .map(|currency| currency.map(|row| {row.0}) ) + .map_err(|err| anyhow::anyhow!("Failed to load currency: {err}")) + } + + pub async fn truncate(pool: &PgPool, table: String) -> anyhow::Result<()>{ + sqlx::query(format!("TRUNCATE TABLE {} CASCADE",table).as_str()) + .execute(pool) + .await + .map(|_| ()) + .map_err(|err| anyhow::anyhow!("Failed to truncate table: {err}")) + } +} \ No newline at end of file diff --git a/nautilus_core/model/src/types/currency.rs b/nautilus_core/model/src/types/currency.rs index 2ebf57de6f06..78a031cabe03 100644 --- a/nautilus_core/model/src/types/currency.rs +++ b/nautilus_core/model/src/types/currency.rs @@ -142,26 +142,6 @@ impl<'de> Deserialize<'de> for Currency { } } -impl <'r> FromRow<'r, PgRow> for Currency { - fn from_row(row: &'r PgRow) -> Result { - let code = row.try_get::("code")?; - let precision = row.try_get::("precision")?; - let iso4217 = row.try_get::("iso4217")?; - let name = row.try_get::("name")?; - let currency_type = row.try_get::("currency_type") - .map(|res| CurrencyType::from_str(res.as_str()).unwrap())?; - - Ok( - Currency::new( - code.as_str(), - precision as u8, - iso4217 as u16, - name.as_str(), - currency_type, - ).unwrap()) - } -} - //////////////////////////////////////////////////////////////////////////////// // Tests diff --git a/nautilus_core/persistence/Cargo.toml b/nautilus_core/persistence/Cargo.toml index 59e60ca1c617..22c850a69d09 100644 --- a/nautilus_core/persistence/Cargo.toml +++ b/nautilus_core/persistence/Cargo.toml @@ -12,10 +12,13 @@ crate-type = ["rlib", "staticlib", "cdylib"] [dependencies] nautilus-core = { path = "../core" } -nautilus-model = { path = "../model" } +nautilus-model = { path = "../model", features = ["stubs"] } +nautilus-common = { path = "../common" } +nautilus-accounting = { path = "../accounting" } anyhow = { workspace = true } futures = { workspace = true } pyo3 = { workspace = true, optional = true } +pyo3-asyncio = { workspace = true, optional = true } rand = { workspace = true } tokio = { workspace = true } thiserror = { workspace = true } @@ -23,6 +26,7 @@ binary-heap-plus = "0.5.0" compare = "0.1.0" datafusion = { version = "37.1.0", default-features = false, features = ["compression", "regex_expressions", "unicode_expressions", "pyarrow"] } dotenv = "0.15.0" +sqlx = { workspace = true} [dev-dependencies] criterion = { workspace = true } @@ -35,12 +39,12 @@ procfs = "0.16.0" [features] default = ["ffi", "python"] extension-module = [ - "pyo3/extension-module", - "nautilus-core/extension-module", + "pyo3/extension-module", + "nautilus-core/extension-module", "nautilus-model/extension-module", ] ffi = ["nautilus-core/ffi", "nautilus-model/ffi"] -python = ["pyo3", "nautilus-core/python", "nautilus-model/python"] +python = ["pyo3", "nautilus-core/python", "nautilus-model/python", "nautilus-accounting/python"] [[bench]] name = "bench_persistence" diff --git a/tests/integration_tests/infrastructure/test_cache_database_postgres.py b/tests/integration_tests/infrastructure/test_cache_database_postgres.py new file mode 100644 index 000000000000..3f878839ede6 --- /dev/null +++ b/tests/integration_tests/infrastructure/test_cache_database_postgres.py @@ -0,0 +1,124 @@ +# ------------------------------------------------------------------------------------------------- +# Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +# https://nautechsystems.io +# +# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------------------------- + +import asyncio +import os + +from nautilus_trader.cache.postgres.adapter import CachePostgresAdapter +import pytest + +from nautilus_trader.common.component import MessageBus +from nautilus_trader.common.component import TestClock +from nautilus_trader.model.enums import CurrencyType +from nautilus_trader.model.objects import Currency +from nautilus_trader.portfolio.portfolio import Portfolio +from nautilus_trader.test_kit.functions import eventually +from nautilus_trader.test_kit.providers import TestInstrumentProvider +from nautilus_trader.test_kit.stubs.component import TestComponentStubs +from nautilus_trader.test_kit.stubs.data import TestDataStubs +from nautilus_trader.test_kit.stubs.identifiers import TestIdStubs +from nautilus_trader.trading.strategy import Strategy + +AUDUSD_SIM = TestInstrumentProvider.default_fx_ccy("AUD/USD") + + +class TestCachePostgresAdapter: + def setup(self): + # set envs + os.environ["POSTGRES_HOST"] = "localhost" + os.environ["POSTGRES_PORT"] = "5432" + os.environ["POSTGRES_USERNAME"] = "nautilus" + os.environ["POSTGRES_PASSWORD"] = "pass" + os.environ["POSTGRES_DATABASE"] = "nautilus" + self.database: CachePostgresAdapter = CachePostgresAdapter() + self.clock = TestClock() + + self.trader_id = TestIdStubs.trader_id() + + self.msgbus = MessageBus( + trader_id=self.trader_id, + clock=self.clock, + ) + + self.cache = TestComponentStubs.cache() + + self.portfolio = Portfolio( + msgbus=self.msgbus, + cache=self.cache, + clock=self.clock, + ) + + # Init strategy + self.strategy = Strategy() + self.strategy.register( + trader_id=self.trader_id, + portfolio=self.portfolio, + msgbus=self.msgbus, + cache=self.cache, + clock=self.clock, + ) + + + def teardown(self): + self.database.flush() + + @pytest.mark.asyncio + async def test_load_general_objects_when_nothing_in_cache_returns_empty_dict(self): + # Arrange, Act + result = self.database.load() + + # Assert + assert result == {} + + @pytest.mark.asyncio + async def test_add_general_object_adds_to_cache(self): + # Arrange + bar = TestDataStubs.bar_5decimal() + key = str(bar.bar_type) + "-" + str(bar.ts_event) + + # Act + self.database.add(key, str(bar).encode()) + + # Allow MPSC thread to insert + await eventually(lambda: self.database.load()) + + # Assert + assert self.database.load() == {key: str(bar).encode()} + + ################################################################################ + # Currency + ################################################################################ + @pytest.mark.asyncio + async def test_add_currency(self): + # Arrange + currency = Currency( + code="BTC", + precision=8, + iso4217=0, + name="BTC", + currency_type=CurrencyType.CRYPTO, + ) + + # Act + self.database.add_currency(currency) + + # Allow MPSC thread to insert + await eventually(lambda: self.database.load_currency(currency.code)) + + # Assert + assert self.database.load_currency(currency.code) == currency + + currencies = self.database.load_currencies() + assert list(currencies.keys()) == ["BTC"] From ace8ff08e43e9bffab75142ecacf8e1e0f7cb336 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 16:39:08 +0200 Subject: [PATCH 10/22] add loging in sql statement execution --- nautilus_core/infrastructure/src/sql/pg.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/nautilus_core/infrastructure/src/sql/pg.rs b/nautilus_core/infrastructure/src/sql/pg.rs index ab78e25124e2..e5ddf8f603a6 100644 --- a/nautilus_core/infrastructure/src/sql/pg.rs +++ b/nautilus_core/infrastructure/src/sql/pg.rs @@ -164,7 +164,17 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a let file_path = file.path(); let sql_content = std::fs::read_to_string(file_path.clone())?; for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) { - sqlx::query(sql_statement).execute(pg).await?; + let result = sqlx::query(sql_statement).execute(pg).await; + match result { + Ok(_) => info!("Executed statement successfully"), + Err(err) =>{ + if err.to_string().contains("already exists") { + info!("Already exists error on statement, skipping"); + } else { + panic!("Error executing statement: {:?}", err) + } + } + } } } // grant connect From 229d03b62c20ff5bb5f3d1363cef21bc43e6e12b Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 16:39:26 +0200 Subject: [PATCH 11/22] add postgres service to github action --- .github/workflows/build_postgres.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml index ec245acefd77..92a768a91bfc 100644 --- a/.github/workflows/build_postgres.yml +++ b/.github/workflows/build_postgres.yml @@ -29,6 +29,15 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 5 + postgres: + image: postgres + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: pass + POSTGRES_DB: postgres + ports: + - 5432:5432 + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 steps: From 7019e933a456027154e5e6e385585e8b707f2f0a Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 16:39:36 +0200 Subject: [PATCH 12/22] implement postgres cache e2e tests --- .../tests/test_cache_database_postgres.rs | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 nautilus_core/infrastructure/tests/test_cache_database_postgres.rs diff --git a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs new file mode 100644 index 000000000000..f39079df502a --- /dev/null +++ b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs @@ -0,0 +1,101 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2024 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + + +use tokio::sync::Mutex; +use sqlx::PgPool; +use nautilus_infrastructure::sql::cache_database::PostgresCacheDatabase; +use nautilus_infrastructure::sql::pg::{connect_pg, delete_nautilus_postgres_tables, drop_postgres, init_postgres, PostgresConnectOptions}; + +static INITlIZED: Mutex = Mutex::const_new(false); + + +pub fn get_test_pg_connect_options() -> PostgresConnectOptions { + PostgresConnectOptions::new( + "localhost".to_string(), + 5432, + "nautilus".to_string(), + "pass".to_string(), + "nautilus".to_string(), + ) +} +pub async fn get_pg() -> PgPool { + let pg_connect_options = get_test_pg_connect_options(); + connect_pg(pg_connect_options.into()).await.unwrap() +} + +pub async fn initialize() -> anyhow::Result<()>{ + let pg_pool = get_pg().await; + let mut initialized = INITlIZED.lock().await; + // 1. check if we need to init schema + if !*initialized { + // drop and init postgres commands dont throw, they just log + // se we can use them here in init login in this order + drop_postgres(&pg_pool, "nautilus".to_string()).await.unwrap(); + init_postgres(&pg_pool, "nautilus".to_string(), "pass".to_string()).await.unwrap(); + *initialized = true; + } + // truncate all table + println!("deleting all tables"); + delete_nautilus_postgres_tables(&pg_pool).await.unwrap(); + Ok(()) +} + +pub async fn get_pg_cache_database() -> anyhow::Result { + initialize().await.unwrap(); + let connect_options = get_test_pg_connect_options(); + Ok( + PostgresCacheDatabase::connect( + Some(connect_options.host), + Some(connect_options.port), + Some(connect_options.username), + Some(connect_options.password), + Some(connect_options.database), + ) + .await.unwrap() + ) +} + +#[cfg(test)] +mod tests{ + use std::time::Duration; + use crate::get_pg_cache_database; + + + + /// ----------------------------------- General ----------------------------------- + #[tokio::test] + async fn test_load_general_objects_when_nothing_in_cache_returns_empty_hashmap(){ + let pg_cache = get_pg_cache_database().await.unwrap(); + let result = pg_cache.load().await.unwrap(); + println!("1: {:?}",result); + assert_eq!(result.len(), 0); + } + + #[tokio::test] + async fn test_add_general_object_adds_to_cache(){ + let pg_cache = get_pg_cache_database().await.unwrap(); + let test_id_value = String::from("test_value").into_bytes(); + pg_cache.add(String::from("test_id"),test_id_value.clone()).await.unwrap(); + // sleep with tokio + tokio::time::sleep(Duration::from_secs(1)).await; + let result = pg_cache.load().await.unwrap(); + println!("2: {:?}",result); + assert_eq!(result.keys().len(), 1); + assert_eq!(result.keys().cloned().collect::>(), vec![String::from("test_id")]); // assert_eq!(result.get(&test_id_key).unwrap().to_owned(),&test_id_value.clone()); + assert_eq!(result.get("test_id").unwrap().to_owned(), test_id_value); + } + +} From 3931c8bc8b34f218261091d5a3488eac27ddb95c Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Tue, 23 Apr 2024 16:42:23 +0200 Subject: [PATCH 13/22] fix precommit --- nautilus_core/Cargo.lock | 4 - nautilus_core/cli/src/database/postgres.rs | 7 +- .../src/python/sql/cache_database.rs | 81 ++++++------------ .../infrastructure/src/python/sql/mod.rs | 2 +- .../infrastructure/src/sql/cache_database.rs | 82 +++++++++---------- nautilus_core/infrastructure/src/sql/mod.rs | 9 +- .../infrastructure/src/sql/models/general.rs | 3 +- .../infrastructure/src/sql/models/mod.rs | 2 +- .../infrastructure/src/sql/models/types.rs | 35 ++++---- nautilus_core/infrastructure/src/sql/pg.rs | 20 ++--- .../infrastructure/src/sql/queries.rs | 27 +++--- .../tests/test_cache_database_postgres.rs | 60 ++++++++------ nautilus_core/model/src/types/currency.rs | 1 - nautilus_core/persistence/Cargo.toml | 6 +- nautilus_trader.iml | 29 +++++++ nautilus_trader/cache/postgres/adapter.py | 12 ++- .../cache/postgres/transformers.py | 13 ++- nautilus_trader/core/nautilus_pyo3.pyi | 20 +++++ .../test_cache_database_postgres.py | 5 +- 19 files changed, 212 insertions(+), 206 deletions(-) create mode 100644 nautilus_trader.iml diff --git a/nautilus_core/Cargo.lock b/nautilus_core/Cargo.lock index 32c7a71899d8..588d4ba3d425 100644 --- a/nautilus_core/Cargo.lock +++ b/nautilus_core/Cargo.lock @@ -2801,18 +2801,14 @@ dependencies = [ "datafusion", "dotenv", "futures", - "nautilus-accounting", - "nautilus-common", "nautilus-core", "nautilus-model", "procfs", "pyo3", - "pyo3-asyncio", "quickcheck", "quickcheck_macros", "rand", "rstest", - "sqlx", "thiserror", "tokio", ] diff --git a/nautilus_core/cli/src/database/postgres.rs b/nautilus_core/cli/src/database/postgres.rs index 904fae18974a..7e1d64a534a5 100644 --- a/nautilus_core/cli/src/database/postgres.rs +++ b/nautilus_core/cli/src/database/postgres.rs @@ -13,13 +13,12 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use log::{error, info}; -use nautilus_infrastructure::sql::pg::{connect_pg, get_postgres_connect_options}; -use sqlx::PgPool; +use nautilus_infrastructure::sql::pg::{ + connect_pg, drop_postgres, get_postgres_connect_options, init_postgres, +}; use crate::opt::{DatabaseCommand, DatabaseOpt}; - pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> { let command = opt.command.clone(); diff --git a/nautilus_core/infrastructure/src/python/sql/cache_database.rs b/nautilus_core/infrastructure/src/python/sql/cache_database.rs index a2c871401220..f1c7b181eb33 100644 --- a/nautilus_core/infrastructure/src/python/sql/cache_database.rs +++ b/nautilus_core/infrastructure/src/python/sql/cache_database.rs @@ -14,14 +14,16 @@ // ------------------------------------------------------------------------------------------------- use std::collections::HashMap; -use pyo3::prelude::*; -use nautilus_core::python::{to_pyruntime_err}; + use nautilus_common::runtime::get_runtime; +use nautilus_core::python::to_pyruntime_err; use nautilus_model::types::currency::Currency; -use crate::sql::cache_database::PostgresCacheDatabase; -use crate::sql::pg::delete_nautilus_postgres_tables; -use crate::sql::queries::DatabaseQueries; +use pyo3::prelude::*; +use crate::sql::{ + cache_database::PostgresCacheDatabase, pg::delete_nautilus_postgres_tables, + queries::DatabaseQueries, +}; #[pymethods] impl PostgresCacheDatabase { @@ -32,7 +34,7 @@ impl PostgresCacheDatabase { port: Option, username: Option, password: Option, - database: Option + database: Option, ) -> PyResult { let result = get_runtime().block_on(async { PostgresCacheDatabase::connect(host, port, username, password, database).await @@ -41,79 +43,48 @@ impl PostgresCacheDatabase { } #[pyo3(name = "load")] - fn py_load<'py>( - slf: PyRef<'_, Self>, - ) -> PyResult>> { - let result = get_runtime().block_on(async { - slf.load().await - }); + fn py_load(slf: PyRef<'_, Self>) -> PyResult>> { + let result = get_runtime().block_on(async { slf.load().await }); result.map_err(to_pyruntime_err) } - #[pyo3(name = "load_currency")] - fn py_load_currency( - slf: PyRef<'_, Self>, - code: &str, - ) -> PyResult> { - let result = get_runtime().block_on(async { - DatabaseQueries::load_currency(&slf.pool, code).await - }); + fn py_load_currency(slf: PyRef<'_, Self>, code: &str) -> PyResult> { + let result = + get_runtime().block_on(async { DatabaseQueries::load_currency(&slf.pool, code).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "load_currencies")] - fn py_load_currencies<'py>( - slf: PyRef<'_, Self>, - ) -> PyResult> { - let result = get_runtime().block_on(async { - DatabaseQueries::load_currencies(&slf.pool).await - }); + fn py_load_currencies(slf: PyRef<'_, Self>) -> PyResult> { + let result = + get_runtime().block_on(async { DatabaseQueries::load_currencies(&slf.pool).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "add")] - fn py_add( - slf: PyRef<'_, Self>, - key: String, - value: Vec - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - slf.add(key,value).await - }); + fn py_add(slf: PyRef<'_, Self>, key: String, value: Vec) -> PyResult<()> { + let result = get_runtime().block_on(async { slf.add(key, value).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "add_currency")] - fn py_add_currency( - slf: PyRef<'_, Self>, - currency: Currency, - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - slf.add_currency(currency).await - }); + fn py_add_currency(slf: PyRef<'_, Self>, currency: Currency) -> PyResult<()> { + let result = get_runtime().block_on(async { slf.add_currency(currency).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "flush_db")] - fn py_drop_schema( - slf: PyRef<'_, Self>, - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - delete_nautilus_postgres_tables(&slf.pool) - .await - }); + fn py_drop_schema(slf: PyRef<'_, Self>) -> PyResult<()> { + let result = + get_runtime().block_on(async { delete_nautilus_postgres_tables(&slf.pool).await }); result.map_err(to_pyruntime_err) } #[pyo3(name = "truncate")] - fn py_truncate( - slf: PyRef<'_, Self>, - table: String - ) -> PyResult<()> { - let result = get_runtime().block_on(async { - DatabaseQueries::truncate(&slf.pool, table).await - }); + fn py_truncate(slf: PyRef<'_, Self>, table: String) -> PyResult<()> { + let result = + get_runtime().block_on(async { DatabaseQueries::truncate(&slf.pool, table).await }); result.map_err(to_pyruntime_err) } } diff --git a/nautilus_core/infrastructure/src/python/sql/mod.rs b/nautilus_core/infrastructure/src/python/sql/mod.rs index 0f12dc82fc2d..454f4be6bd37 100644 --- a/nautilus_core/infrastructure/src/python/sql/mod.rs +++ b/nautilus_core/infrastructure/src/python/sql/mod.rs @@ -13,4 +13,4 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -pub mod cache_database; \ No newline at end of file +pub mod cache_database; diff --git a/nautilus_core/infrastructure/src/sql/cache_database.rs b/nautilus_core/infrastructure/src/sql/cache_database.rs index baa1244ed331..ed6ab08a9e48 100644 --- a/nautilus_core/infrastructure/src/sql/cache_database.rs +++ b/nautilus_core/infrastructure/src/sql/cache_database.rs @@ -13,18 +13,23 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use std::collections::{HashMap, VecDeque}; -use tokio::sync::mpsc::{Receiver,Sender,channel}; -use std::time::{Duration, Instant}; -use sqlx::{PgPool}; -use sqlx::postgres::PgConnectOptions; -use tokio::sync::mpsc::error::TryRecvError; -use tokio::time::sleep; -use nautilus_model::types::currency::Currency; -use crate::sql::models::general::GeneralRow; -use crate::sql::pg::{connect_pg, get_postgres_connect_options}; -use crate::sql::queries::DatabaseQueries; +use std::{ + collections::{HashMap, VecDeque}, + time::{Duration, Instant}, +}; +use nautilus_model::types::currency::Currency; +use sqlx::{postgres::PgConnectOptions, PgPool}; +use tokio::{ + sync::mpsc::{channel, error::TryRecvError, Receiver, Sender}, + time::sleep, +}; + +use crate::sql::{ + models::general::GeneralRow, + pg::{connect_pg, get_postgres_connect_options}, + queries::DatabaseQueries, +}; #[derive(Debug)] #[cfg_attr( @@ -36,15 +41,12 @@ pub struct PostgresCacheDatabase { tx: Sender, } - #[derive(Debug, Clone)] -pub enum DatabaseQuery{ - Add(String,Vec), +pub enum DatabaseQuery { + Add(String, Vec), AddCurrency(Currency), } - - fn get_buffer_interval() -> Duration { Duration::from_millis(0) } @@ -52,12 +54,12 @@ fn get_buffer_interval() -> Duration { async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque) { for cmd in buffer.drain(..) { match cmd { - DatabaseQuery::Add(key,value) => { - DatabaseQueries ::add(pool,key,value).await.unwrap(); - }, + DatabaseQuery::Add(key, value) => { + DatabaseQueries::add(pool, key, value).await.unwrap(); + } DatabaseQuery::AddCurrency(currency) => { - DatabaseQueries::add_currency(pool,currency).await.unwrap(); - }, + DatabaseQueries::add_currency(pool, currency).await.unwrap(); + } } } } @@ -68,9 +70,10 @@ impl PostgresCacheDatabase { port: Option, username: Option, password: Option, - database: Option - ) -> Result { - let pg_connect_options = get_postgres_connect_options(host,port,username,password,database).unwrap(); + database: Option, + ) -> Result { + let pg_connect_options = + get_postgres_connect_options(host, port, username, password, database).unwrap(); let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap(); let (tx, rx) = channel::(1000); // spawn a thread to handle messages @@ -80,10 +83,7 @@ impl PostgresCacheDatabase { Ok(PostgresCacheDatabase { pool, tx }) } - async fn handle_message( - mut rx: Receiver, - pg_connect_options: PgConnectOptions - ){ + async fn handle_message(mut rx: Receiver, pg_connect_options: PgConnectOptions) { let pool = connect_pg(pg_connect_options).await.unwrap(); // Buffering let mut buffer: VecDeque = VecDeque::new(); @@ -106,13 +106,13 @@ impl PostgresCacheDatabase { } } // rain any remaining message - if !buffer.is_empty(){ - drain_buffer(&pool,&mut buffer).await; + if !buffer.is_empty() { + drain_buffer(&pool, &mut buffer).await; } } - pub async fn load(&self) -> Result>,sqlx::Error> { - let query = sqlx::query_as::<_,GeneralRow>("SELECT * FROM general"); + pub async fn load(&self) -> Result>, sqlx::Error> { + let query = sqlx::query_as::<_, GeneralRow>("SELECT * FROM general"); let result = query.fetch_all(&self.pool).await; match result { Ok(rows) => { @@ -127,20 +127,18 @@ impl PostgresCacheDatabase { } } } - - pub async fn add(&self, key: String, value: Vec) -> anyhow::Result<()> { - let query = DatabaseQuery::Add(key,value); - self.tx.send(query) - .await - .map_err(|err| anyhow::anyhow!("Failed to send query to database message handler: {err}")) + let query = DatabaseQuery::Add(key, value); + self.tx.send(query).await.map_err(|err| { + anyhow::anyhow!("Failed to send query to database message handler: {err}") + }) } pub async fn add_currency(&self, currency: Currency) -> anyhow::Result<()> { let query = DatabaseQuery::AddCurrency(currency); - self.tx.send(query) - .await - .map_err(|err| anyhow::anyhow!("Failed to query add_currency to database message handler: {err}")) + self.tx.send(query).await.map_err(|err| { + anyhow::anyhow!("Failed to query add_currency to database message handler: {err}") + }) } -} \ No newline at end of file +} diff --git a/nautilus_core/infrastructure/src/sql/mod.rs b/nautilus_core/infrastructure/src/sql/mod.rs index 7e2558fdce60..6e14caa333d3 100644 --- a/nautilus_core/infrastructure/src/sql/mod.rs +++ b/nautilus_core/infrastructure/src/sql/mod.rs @@ -14,12 +14,9 @@ // ------------------------------------------------------------------------------------------------- // Be careful about ordering and foreign key constraints when deleting data. -pub const NAUTILUS_TABLES: [&str; 2] = [ - "general", - "currency", -]; +pub const NAUTILUS_TABLES: [&str; 2] = ["general", "currency"]; +pub mod cache_database; pub mod models; pub mod pg; -pub mod cache_database; -pub mod queries; \ No newline at end of file +pub mod queries; diff --git a/nautilus_core/infrastructure/src/sql/models/general.rs b/nautilus_core/infrastructure/src/sql/models/general.rs index 1c47f74a7f74..824714a2c0d3 100644 --- a/nautilus_core/infrastructure/src/sql/models/general.rs +++ b/nautilus_core/infrastructure/src/sql/models/general.rs @@ -13,9 +13,8 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- - #[derive(Debug, sqlx::FromRow)] pub struct GeneralRow { pub key: String, pub value: Vec, -} \ No newline at end of file +} diff --git a/nautilus_core/infrastructure/src/sql/models/mod.rs b/nautilus_core/infrastructure/src/sql/models/mod.rs index 7e2e8e9c01da..4fe4acea056d 100644 --- a/nautilus_core/infrastructure/src/sql/models/mod.rs +++ b/nautilus_core/infrastructure/src/sql/models/mod.rs @@ -13,6 +13,6 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -pub mod instruments; pub mod general; +pub mod instruments; pub mod types; diff --git a/nautilus_core/infrastructure/src/sql/models/types.rs b/nautilus_core/infrastructure/src/sql/models/types.rs index ede57202f430..a2d98170f723 100644 --- a/nautilus_core/infrastructure/src/sql/models/types.rs +++ b/nautilus_core/infrastructure/src/sql/models/types.rs @@ -14,29 +14,30 @@ // ------------------------------------------------------------------------------------------------- use std::str::FromStr; -use sqlx::{FromRow, Row}; -use sqlx::postgres::PgRow; -use nautilus_model::enums::CurrencyType; -use nautilus_model::types::currency::Currency; + +use nautilus_model::{enums::CurrencyType, types::currency::Currency}; +use sqlx::{postgres::PgRow, FromRow, Row}; pub struct CurrencyModel(pub Currency); -impl <'r> FromRow<'r, PgRow> for CurrencyModel { +impl<'r> FromRow<'r, PgRow> for CurrencyModel { fn from_row(row: &'r PgRow) -> Result { - let code = row.try_get::("code")?; - let precision = row.try_get::("precision")?; - let iso4217 = row.try_get::("iso4217")?; - let name = row.try_get::("name")?; - let currency_type = row.try_get::("currency_type") + let code = row.try_get::("code")?; + let precision = row.try_get::("precision")?; + let iso4217 = row.try_get::("iso4217")?; + let name = row.try_get::("name")?; + let currency_type = row + .try_get::("currency_type") .map(|res| CurrencyType::from_str(res.as_str()).unwrap())?; - + let currency = Currency::new( - code.as_str(), - precision as u8, - iso4217 as u16, - name.as_str(), - currency_type, - ).unwrap(); + code.as_str(), + precision as u8, + iso4217 as u16, + name.as_str(), + currency_type, + ) + .unwrap(); Ok(CurrencyModel(currency)) } } diff --git a/nautilus_core/infrastructure/src/sql/pg.rs b/nautilus_core/infrastructure/src/sql/pg.rs index e5ddf8f603a6..4871a183a6bf 100644 --- a/nautilus_core/infrastructure/src/sql/pg.rs +++ b/nautilus_core/infrastructure/src/sql/pg.rs @@ -167,7 +167,7 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a let result = sqlx::query(sql_statement).execute(pg).await; match result { Ok(_) => info!("Executed statement successfully"), - Err(err) =>{ + Err(err) => { if err.to_string().contains("already exists") { info!("Already exists error on statement, skipping"); } else { @@ -205,10 +205,10 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {};", database ) - .as_str(), + .as_str(), ) - .execute(pg) - .await + .execute(pg) + .await { Ok(_) => info!("All tables privileges granted to role {}", database), Err(err) => error!( @@ -222,10 +222,10 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {};", database ) - .as_str(), + .as_str(), ) - .execute(pg) - .await + .execute(pg) + .await { Ok(_) => info!("All sequences privileges granted to role {}", database), Err(err) => error!( @@ -239,10 +239,10 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a "GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {};", database ) - .as_str(), + .as_str(), ) - .execute(pg) - .await + .execute(pg) + .await { Ok(_) => info!("All functions privileges granted to role {}", database), Err(err) => error!( diff --git a/nautilus_core/infrastructure/src/sql/queries.rs b/nautilus_core/infrastructure/src/sql/queries.rs index 20f836913877..c0db3edc4bc3 100644 --- a/nautilus_core/infrastructure/src/sql/queries.rs +++ b/nautilus_core/infrastructure/src/sql/queries.rs @@ -13,16 +13,16 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- - use std::collections::HashMap; -use sqlx::{PgPool}; + use nautilus_model::types::currency::Currency; -use crate::sql::models::general::GeneralRow; -use crate::sql::models::types::CurrencyModel; +use sqlx::PgPool; + +use crate::sql::models::{general::GeneralRow, types::CurrencyModel}; pub struct DatabaseQueries; -impl DatabaseQueries{ +impl DatabaseQueries { pub async fn add(pool: &PgPool, key: String, value: Vec) -> anyhow::Result<()> { sqlx::query("INSERT INTO general (key, value) VALUES ($1, $2)") .bind(key) @@ -33,9 +33,8 @@ impl DatabaseQueries{ .map_err(|err| anyhow::anyhow!("Failed to insert into general table: {err}")) } - pub async fn load(pool: &PgPool) -> anyhow::Result>> { - sqlx::query_as::<_,GeneralRow>("SELECT * FROM general") + sqlx::query_as::<_, GeneralRow>("SELECT * FROM general") .fetch_all(pool) .await .map(|rows| { @@ -47,9 +46,8 @@ impl DatabaseQueries{ }) .map_err(|err| anyhow::anyhow!("Failed to load general table: {err}")) } - - pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> { + pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> { sqlx::query( "INSERT INTO currency (code, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (code) DO NOTHING" ) @@ -64,7 +62,7 @@ impl DatabaseQueries{ .map_err(|err| anyhow::anyhow!("Failed to insert into currency table: {err}")) } - pub async fn load_currencies(pool: &PgPool) -> anyhow::Result> { + pub async fn load_currencies(pool: &PgPool) -> anyhow::Result> { sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY code ASC") .fetch_all(pool) .await @@ -72,21 +70,20 @@ impl DatabaseQueries{ .map_err(|err| anyhow::anyhow!("Failed to load currencies: {err}")) } - pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result> { sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE code = $1") .bind(code) .fetch_optional(pool) .await - .map(|currency| currency.map(|row| {row.0}) ) + .map(|currency| currency.map(|row| row.0)) .map_err(|err| anyhow::anyhow!("Failed to load currency: {err}")) } - pub async fn truncate(pool: &PgPool, table: String) -> anyhow::Result<()>{ - sqlx::query(format!("TRUNCATE TABLE {} CASCADE",table).as_str()) + pub async fn truncate(pool: &PgPool, table: String) -> anyhow::Result<()> { + sqlx::query(format!("TRUNCATE TABLE {} CASCADE", table).as_str()) .execute(pool) .await .map(|_| ()) .map_err(|err| anyhow::anyhow!("Failed to truncate table: {err}")) } -} \ No newline at end of file +} diff --git a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs index f39079df502a..59409df8fd13 100644 --- a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs +++ b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs @@ -13,14 +13,17 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- - -use tokio::sync::Mutex; +use nautilus_infrastructure::sql::{ + cache_database::PostgresCacheDatabase, + pg::{ + connect_pg, delete_nautilus_postgres_tables, drop_postgres, init_postgres, + PostgresConnectOptions, + }, +}; use sqlx::PgPool; -use nautilus_infrastructure::sql::cache_database::PostgresCacheDatabase; -use nautilus_infrastructure::sql::pg::{connect_pg, delete_nautilus_postgres_tables, drop_postgres, init_postgres, PostgresConnectOptions}; - -static INITlIZED: Mutex = Mutex::const_new(false); +use tokio::sync::Mutex; +static INIT: Mutex = Mutex::const_new(false); pub fn get_test_pg_connect_options() -> PostgresConnectOptions { PostgresConnectOptions::new( @@ -36,15 +39,19 @@ pub async fn get_pg() -> PgPool { connect_pg(pg_connect_options.into()).await.unwrap() } -pub async fn initialize() -> anyhow::Result<()>{ +pub async fn initialize() -> anyhow::Result<()> { let pg_pool = get_pg().await; - let mut initialized = INITlIZED.lock().await; + let mut initialized = INIT.lock().await; // 1. check if we need to init schema if !*initialized { // drop and init postgres commands dont throw, they just log // se we can use them here in init login in this order - drop_postgres(&pg_pool, "nautilus".to_string()).await.unwrap(); - init_postgres(&pg_pool, "nautilus".to_string(), "pass".to_string()).await.unwrap(); + drop_postgres(&pg_pool, "nautilus".to_string()) + .await + .unwrap(); + init_postgres(&pg_pool, "nautilus".to_string(), "pass".to_string()) + .await + .unwrap(); *initialized = true; } // truncate all table @@ -56,46 +63,49 @@ pub async fn initialize() -> anyhow::Result<()>{ pub async fn get_pg_cache_database() -> anyhow::Result { initialize().await.unwrap(); let connect_options = get_test_pg_connect_options(); - Ok( - PostgresCacheDatabase::connect( - Some(connect_options.host), + Ok(PostgresCacheDatabase::connect( + Some(connect_options.host), Some(connect_options.port), Some(connect_options.username), Some(connect_options.password), Some(connect_options.database), - ) - .await.unwrap() ) + .await + .unwrap()) } #[cfg(test)] -mod tests{ +mod tests { use std::time::Duration; - use crate::get_pg_cache_database; - + use crate::get_pg_cache_database; /// ----------------------------------- General ----------------------------------- #[tokio::test] - async fn test_load_general_objects_when_nothing_in_cache_returns_empty_hashmap(){ + async fn test_load_general_objects_when_nothing_in_cache_returns_empty_hashmap() { let pg_cache = get_pg_cache_database().await.unwrap(); let result = pg_cache.load().await.unwrap(); - println!("1: {:?}",result); + println!("1: {:?}", result); assert_eq!(result.len(), 0); } #[tokio::test] - async fn test_add_general_object_adds_to_cache(){ + async fn test_add_general_object_adds_to_cache() { let pg_cache = get_pg_cache_database().await.unwrap(); let test_id_value = String::from("test_value").into_bytes(); - pg_cache.add(String::from("test_id"),test_id_value.clone()).await.unwrap(); + pg_cache + .add(String::from("test_id"), test_id_value.clone()) + .await + .unwrap(); // sleep with tokio tokio::time::sleep(Duration::from_secs(1)).await; let result = pg_cache.load().await.unwrap(); - println!("2: {:?}",result); + println!("2: {:?}", result); assert_eq!(result.keys().len(), 1); - assert_eq!(result.keys().cloned().collect::>(), vec![String::from("test_id")]); // assert_eq!(result.get(&test_id_key).unwrap().to_owned(),&test_id_value.clone()); + assert_eq!( + result.keys().cloned().collect::>(), + vec![String::from("test_id")] + ); // assert_eq!(result.get(&test_id_key).unwrap().to_owned(),&test_id_value.clone()); assert_eq!(result.get("test_id").unwrap().to_owned(), test_id_value); } - } diff --git a/nautilus_core/model/src/types/currency.rs b/nautilus_core/model/src/types/currency.rs index 78a031cabe03..28d0576e3870 100644 --- a/nautilus_core/model/src/types/currency.rs +++ b/nautilus_core/model/src/types/currency.rs @@ -142,7 +142,6 @@ impl<'de> Deserialize<'de> for Currency { } } - //////////////////////////////////////////////////////////////////////////////// // Tests //////////////////////////////////////////////////////////////////////////////// diff --git a/nautilus_core/persistence/Cargo.toml b/nautilus_core/persistence/Cargo.toml index 22c850a69d09..d95388b88c95 100644 --- a/nautilus_core/persistence/Cargo.toml +++ b/nautilus_core/persistence/Cargo.toml @@ -13,12 +13,9 @@ crate-type = ["rlib", "staticlib", "cdylib"] [dependencies] nautilus-core = { path = "../core" } nautilus-model = { path = "../model", features = ["stubs"] } -nautilus-common = { path = "../common" } -nautilus-accounting = { path = "../accounting" } anyhow = { workspace = true } futures = { workspace = true } pyo3 = { workspace = true, optional = true } -pyo3-asyncio = { workspace = true, optional = true } rand = { workspace = true } tokio = { workspace = true } thiserror = { workspace = true } @@ -26,7 +23,6 @@ binary-heap-plus = "0.5.0" compare = "0.1.0" datafusion = { version = "37.1.0", default-features = false, features = ["compression", "regex_expressions", "unicode_expressions", "pyarrow"] } dotenv = "0.15.0" -sqlx = { workspace = true} [dev-dependencies] criterion = { workspace = true } @@ -44,7 +40,7 @@ extension-module = [ "nautilus-model/extension-module", ] ffi = ["nautilus-core/ffi", "nautilus-model/ffi"] -python = ["pyo3", "nautilus-core/python", "nautilus-model/python", "nautilus-accounting/python"] +python = ["pyo3", "nautilus-core/python", "nautilus-model/python"] [[bench]] name = "bench_persistence" diff --git a/nautilus_trader.iml b/nautilus_trader.iml new file mode 100644 index 000000000000..7b5f44196c75 --- /dev/null +++ b/nautilus_trader.iml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nautilus_trader/cache/postgres/adapter.py b/nautilus_trader/cache/postgres/adapter.py index d52a6b9d2e45..fe67593bc242 100644 --- a/nautilus_trader/cache/postgres/adapter.py +++ b/nautilus_trader/cache/postgres/adapter.py @@ -24,8 +24,8 @@ class CachePostgresAdapter(CacheDatabaseFacade): def __init__( - self, - config: CacheConfig = None, + self, + config: CacheConfig | None = None, ): if config: config = CacheConfig() @@ -46,14 +46,12 @@ def add_currency(self, currency: Currency): currency_pyo3 = transform_currency_to_pyo3(currency) self._backing.add_currency(currency_pyo3) - def load_currencies(self) -> dict[str,Currency]: - currencies = self._backing.load_currencies() - return { currency.code: transform_currency_from_pyo3(currency) for currency in currencies} + def load_currencies(self) -> dict[str, Currency]: + currencies = self._backing.load_currencies() + return {currency.code: transform_currency_from_pyo3(currency) for currency in currencies} def load_currency(self, code: str) -> Currency | None: currency_pyo3 = self._backing.load_currency(code) if currency_pyo3: return transform_currency_from_pyo3(currency_pyo3) return None - - diff --git a/nautilus_trader/cache/postgres/transformers.py b/nautilus_trader/cache/postgres/transformers.py index bf5a252ca491..65542490015d 100644 --- a/nautilus_trader/cache/postgres/transformers.py +++ b/nautilus_trader/cache/postgres/transformers.py @@ -13,32 +13,29 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- +from nautilus_trader.core import nautilus_pyo3 from nautilus_trader.model.enums import CurrencyType from nautilus_trader.model.objects import Currency -from nautilus_trader.core import nautilus_pyo3 ################################################################################ # Currency ################################################################################ -def transform_currency_from_pyo3(currency: nautilus_pyo3.Currency)-> Currency: +def transform_currency_from_pyo3(currency: nautilus_pyo3.Currency) -> Currency: return Currency( code=currency.code, precision=currency.precision, iso4217=currency.iso4217, name=currency.name, - currency_type=CurrencyType(currency.currency_type.value) + currency_type=CurrencyType(currency.currency_type.value), ) -def transform_currency_to_pyo3(currency: Currency)-> nautilus_pyo3.Currency: +def transform_currency_to_pyo3(currency: Currency) -> nautilus_pyo3.Currency: return nautilus_pyo3.Currency( code=currency.code, precision=currency.precision, iso4217=currency.iso4217, name=currency.name, - currency_type=nautilus_pyo3.CurrencyType.from_str(currency.currency_type.name) + currency_type=nautilus_pyo3.CurrencyType.from_str(currency.currency_type.name), ) - - - diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index ea41259eb7cf..d027c8213435 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -682,6 +682,8 @@ class CurrencyType(Enum): CRYPTO = "CRYPTO" FIAT = "FIAT" COMMODITY_BACKED = "COMMODITY_BACKED" + @classmethod + def from_str(cls, value: str) -> CurrencyType: ... class InstrumentCloseType(Enum): END_OF_SESSION = "END_OF_SESSION" @@ -2250,6 +2252,24 @@ class RedisCacheDatabase: config: dict[str, Any], ) -> None: ... +class PostgresCacheDatabase: + @classmethod + def connect( + cls, + host: str | None = None, + port: str | None = None, + username: str | None = None, + password: str | None = None, + database: str | None = None, + )-> PostgresCacheDatabase: ... + def load(self) -> dict[str,str]: ... + def add(self, key: str, value: bytes) -> None: ... + def add_currency(self,currency: Currency) -> None: ... + def load_currency(self, code: str) -> Currency | None: ... + def load_currencies(self) -> list[Currency]: ... + def flush_db(self) -> None: ... + def truncate(self, table: str) -> None: ... + ################################################################################################### # Network ################################################################################################### diff --git a/tests/integration_tests/infrastructure/test_cache_database_postgres.py b/tests/integration_tests/infrastructure/test_cache_database_postgres.py index 3f878839ede6..ee69790d2a7e 100644 --- a/tests/integration_tests/infrastructure/test_cache_database_postgres.py +++ b/tests/integration_tests/infrastructure/test_cache_database_postgres.py @@ -13,12 +13,11 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- -import asyncio import os -from nautilus_trader.cache.postgres.adapter import CachePostgresAdapter import pytest +from nautilus_trader.cache.postgres.adapter import CachePostgresAdapter from nautilus_trader.common.component import MessageBus from nautilus_trader.common.component import TestClock from nautilus_trader.model.enums import CurrencyType @@ -31,6 +30,7 @@ from nautilus_trader.test_kit.stubs.identifiers import TestIdStubs from nautilus_trader.trading.strategy import Strategy + AUDUSD_SIM = TestInstrumentProvider.default_fx_ccy("AUD/USD") @@ -70,7 +70,6 @@ def setup(self): clock=self.clock, ) - def teardown(self): self.database.flush() From 0773a008ab82d2c4bb4870e4047991550a2c2d9e Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 11:09:51 +0200 Subject: [PATCH 14/22] refactor statement exeuction in init-postgres --- nautilus_core/infrastructure/src/sql/pg.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/nautilus_core/infrastructure/src/sql/pg.rs b/nautilus_core/infrastructure/src/sql/pg.rs index 4871a183a6bf..ad9c504e7be4 100644 --- a/nautilus_core/infrastructure/src/sql/pg.rs +++ b/nautilus_core/infrastructure/src/sql/pg.rs @@ -164,17 +164,20 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a let file_path = file.path(); let sql_content = std::fs::read_to_string(file_path.clone())?; for sql_statement in sql_content.split(';').filter(|s| !s.trim().is_empty()) { - let result = sqlx::query(sql_statement).execute(pg).await; - match result { - Ok(_) => info!("Executed statement successfully"), - Err(err) => { + sqlx::query(sql_statement) + .execute(pg) + .await + .map_err(|err| { if err.to_string().contains("already exists") { info!("Already exists error on statement, skipping"); } else { - panic!("Error executing statement: {:?}", err) + panic!( + "Error executing statement {} with error: {:?}", + sql_statement, err + ) } - } - } + }) + .unwrap(); } } // grant connect From f0daa705134415adbcf912b8cd35ccb0767c1316 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 11:10:35 +0200 Subject: [PATCH 15/22] add nautilus-cli install and init-postgres in github action workflow --- .github/workflows/build_postgres.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml index 92a768a91bfc..2b991c2afff2 100644 --- a/.github/workflows/build_postgres.yml +++ b/.github/workflows/build_postgres.yml @@ -34,7 +34,7 @@ jobs: env: POSTGRES_USER: postgres POSTGRES_PASSWORD: pass - POSTGRES_DB: postgres + POSTGRES_DB: nautilus ports: - 5432:5432 options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 @@ -122,6 +122,11 @@ jobs: # pre-commit run --hook-stage manual gitlint-ci pre-commit run --all-files + - name: Install Nautilus CLI and run init postgres + run: | + make install-cli + nautilus database init --host localhost --port 5432 --user postgres --password pass --database nautilus + - name: Run nautilus_core cargo tests (Linux) run: | cargo install cargo-nextest From 6614b2c52ba2ef82ff34f79acfe8162e7048e6d4 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 11:12:56 +0200 Subject: [PATCH 16/22] refactor test cache database postgres without init and drop postgres --- .../tests/test_cache_database_postgres.rs | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs index 59409df8fd13..c0b10aa4821e 100644 --- a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs +++ b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs @@ -21,48 +21,32 @@ use nautilus_infrastructure::sql::{ }, }; use sqlx::PgPool; -use tokio::sync::Mutex; -static INIT: Mutex = Mutex::const_new(false); - -pub fn get_test_pg_connect_options() -> PostgresConnectOptions { +pub fn get_test_pg_connect_options(username: &str) -> PostgresConnectOptions { PostgresConnectOptions::new( "localhost".to_string(), 5432, - "nautilus".to_string(), + username.to_string(), "pass".to_string(), "nautilus".to_string(), ) } -pub async fn get_pg() -> PgPool { - let pg_connect_options = get_test_pg_connect_options(); +pub async fn get_pg(username: &str) -> PgPool { + let pg_connect_options = get_test_pg_connect_options(username); connect_pg(pg_connect_options.into()).await.unwrap() } pub async fn initialize() -> anyhow::Result<()> { - let pg_pool = get_pg().await; - let mut initialized = INIT.lock().await; - // 1. check if we need to init schema - if !*initialized { - // drop and init postgres commands dont throw, they just log - // se we can use them here in init login in this order - drop_postgres(&pg_pool, "nautilus".to_string()) - .await - .unwrap(); - init_postgres(&pg_pool, "nautilus".to_string(), "pass".to_string()) - .await - .unwrap(); - *initialized = true; - } - // truncate all table - println!("deleting all tables"); + // get pg pool with root postgres user to drop & create schema + let pg_pool = get_pg("postgres").await; delete_nautilus_postgres_tables(&pg_pool).await.unwrap(); Ok(()) } pub async fn get_pg_cache_database() -> anyhow::Result { initialize().await.unwrap(); - let connect_options = get_test_pg_connect_options(); + // run tests as nautilus user + let connect_options = get_test_pg_connect_options("nautilus"); Ok(PostgresCacheDatabase::connect( Some(connect_options.host), Some(connect_options.port), @@ -80,7 +64,6 @@ mod tests { use crate::get_pg_cache_database; - /// ----------------------------------- General ----------------------------------- #[tokio::test] async fn test_load_general_objects_when_nothing_in_cache_returns_empty_hashmap() { let pg_cache = get_pg_cache_database().await.unwrap(); From b10f79acf38769d99b3be62b59b017b7c242fa72 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 11:29:11 +0200 Subject: [PATCH 17/22] fixup! add nautilus-cli install and init-postgres in github action workflow --- .github/workflows/build_postgres.yml | 2 +- .../infrastructure/tests/test_cache_database_postgres.rs | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml index 2b991c2afff2..5f5af7a376d5 100644 --- a/.github/workflows/build_postgres.yml +++ b/.github/workflows/build_postgres.yml @@ -125,7 +125,7 @@ jobs: - name: Install Nautilus CLI and run init postgres run: | make install-cli - nautilus database init --host localhost --port 5432 --user postgres --password pass --database nautilus + nautilus database init --host localhost --port 5432 --username postgres --password pass --database nautilus - name: Run nautilus_core cargo tests (Linux) run: | diff --git a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs index c0b10aa4821e..05d3cc39ff1a 100644 --- a/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs +++ b/nautilus_core/infrastructure/tests/test_cache_database_postgres.rs @@ -15,10 +15,7 @@ use nautilus_infrastructure::sql::{ cache_database::PostgresCacheDatabase, - pg::{ - connect_pg, delete_nautilus_postgres_tables, drop_postgres, init_postgres, - PostgresConnectOptions, - }, + pg::{connect_pg, delete_nautilus_postgres_tables, PostgresConnectOptions}, }; use sqlx::PgPool; From b61950f2e884350070a38a1581cf8c573d4c469c Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 13:55:03 +0200 Subject: [PATCH 18/22] update cli and database opt with schema dir path --- nautilus_core/cli/src/database/postgres.rs | 1 + nautilus_core/cli/src/opt.rs | 3 +++ nautilus_core/infrastructure/src/sql/pg.rs | 9 +++++++-- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/nautilus_core/cli/src/database/postgres.rs b/nautilus_core/cli/src/database/postgres.rs index 7e1d64a534a5..39f374881df0 100644 --- a/nautilus_core/cli/src/database/postgres.rs +++ b/nautilus_core/cli/src/database/postgres.rs @@ -37,6 +37,7 @@ pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> { &pg, pg_connect_options.database, pg_connect_options.password, + config.schema, ) .await?; } diff --git a/nautilus_core/cli/src/opt.rs b/nautilus_core/cli/src/opt.rs index 8ef5b9c2b1d2..1df35a0e78f2 100644 --- a/nautilus_core/cli/src/opt.rs +++ b/nautilus_core/cli/src/opt.rs @@ -51,6 +51,9 @@ pub struct DatabaseConfig { /// Password for connecting to the database #[arg(long)] pub password: Option, + /// Directory path to the schema files + #[arg(long)] + pub schema: Option, } #[derive(Parser, Debug, Clone)] diff --git a/nautilus_core/infrastructure/src/sql/pg.rs b/nautilus_core/infrastructure/src/sql/pg.rs index ad9c504e7be4..fd31fea4c917 100644 --- a/nautilus_core/infrastructure/src/sql/pg.rs +++ b/nautilus_core/infrastructure/src/sql/pg.rs @@ -130,7 +130,12 @@ fn get_schema_dir() -> anyhow::Result { }) } -pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> anyhow::Result<()> { +pub async fn init_postgres( + pg: &PgPool, + database: String, + password: String, + schema_dir: Option, +) -> anyhow::Result<()> { info!("Initializing Postgres database with target permissions and schema"); // create public schema match sqlx::query("CREATE SCHEMA IF NOT EXISTS public;") @@ -155,7 +160,7 @@ pub async fn init_postgres(pg: &PgPool, database: String, password: String) -> a } } // execute all the sql files in schema dir - let schema_dir = get_schema_dir()?; + let schema_dir = schema_dir.unwrap_or_else(|| get_schema_dir().unwrap()); let mut sql_files = std::fs::read_dir(schema_dir)?.collect::, std::io::Error>>()?; for file in &mut sql_files { From 28c411aacc5e03ec53310f5aded02fbc6f1cff5d Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 13:55:25 +0200 Subject: [PATCH 19/22] refactor nautilus cli install and run for github build action --- .github/workflows/build_postgres.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml index 5f5af7a376d5..3c34f865dafb 100644 --- a/.github/workflows/build_postgres.yml +++ b/.github/workflows/build_postgres.yml @@ -125,7 +125,14 @@ jobs: - name: Install Nautilus CLI and run init postgres run: | make install-cli - nautilus database init --host localhost --port 5432 --username postgres --password pass --database nautilus + nautilus database init --schema ${{ github.workspace }}/schema + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 + POSTGRES_USERNAME: postgres + POSTGRES_PASSWORD: pass + POSTGRES_DATABASE: nautilus + - name: Run nautilus_core cargo tests (Linux) run: | From 1e2ee86bc81d92f0cf7f46ffbfb478d8effc8f87 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 15:06:36 +0200 Subject: [PATCH 20/22] reset datebase any before tests --- .../infrastructure/test_cache_database_postgres.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration_tests/infrastructure/test_cache_database_postgres.py b/tests/integration_tests/infrastructure/test_cache_database_postgres.py index ee69790d2a7e..759a3de3fa75 100644 --- a/tests/integration_tests/infrastructure/test_cache_database_postgres.py +++ b/tests/integration_tests/infrastructure/test_cache_database_postgres.py @@ -43,6 +43,8 @@ def setup(self): os.environ["POSTGRES_PASSWORD"] = "pass" os.environ["POSTGRES_DATABASE"] = "nautilus" self.database: CachePostgresAdapter = CachePostgresAdapter() + # reset database + self.database.flush() self.clock = TestClock() self.trader_id = TestIdStubs.trader_id() From 7f62523adda2fc926166f71893d813d73d1a5235 Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 19:05:17 +0200 Subject: [PATCH 21/22] move github action change to offical build.yml file --- .github/workflows/build.yml | 62 ++++- .github/workflows/build_postgres.yml | 325 --------------------------- 2 files changed, 56 insertions(+), 331 deletions(-) delete mode 100644 .github/workflows/build_postgres.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5e53f546cadd..8c0dae0bb979 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,6 +22,25 @@ jobs: env: BUILD_MODE: debug RUST_BACKTRACE: 1 + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + postgres: + image: postgres + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: pass + POSTGRES_DB: nautilus + ports: + - 5432:5432 + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 steps: - name: Free disk space (Ubuntu) @@ -105,10 +124,16 @@ jobs: # pre-commit run --hook-stage manual gitlint-ci pre-commit run --all-files - - name: Install Redis (Linux) + - name: Install Nautilus CLI and run init postgres run: | - sudo apt-get install redis-server - redis-server --daemonize yes + make install-cli + nautilus database init --schema ${{ github.workspace }}/schema + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 + POSTGRES_USERNAME: postgres + POSTGRES_PASSWORD: pass + POSTGRES_DATABASE: nautilus - name: Run nautilus_core cargo tests (Linux) run: | @@ -224,6 +249,25 @@ jobs: env: BUILD_MODE: debug RUST_BACKTRACE: 1 + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + postgres: + image: postgres + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: pass + POSTGRES_DB: nautilus + ports: + - 5432:5432 + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 steps: - name: Checkout repository @@ -290,10 +334,16 @@ jobs: # pre-commit run --hook-stage manual gitlint-ci pre-commit run --all-files - - name: Install Redis (macOS) + - name: Install Nautilus CLI and run init postgres run: | - brew install redis - redis-server --daemonize yes + make install-cli + nautilus database init --schema ${{ github.workspace }}/schema + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 + POSTGRES_USERNAME: postgres + POSTGRES_PASSWORD: pass + POSTGRES_DATABASE: nautilus - name: Run nautilus_core cargo tests (macOS) run: | diff --git a/.github/workflows/build_postgres.yml b/.github/workflows/build_postgres.yml deleted file mode 100644 index 3c34f865dafb..000000000000 --- a/.github/workflows/build_postgres.yml +++ /dev/null @@ -1,325 +0,0 @@ -name: build - -on: [push] - -jobs: - build: - strategy: - fail-fast: false - matrix: - arch: [x64] - os: [ubuntu-latest] - python-version: ["3.10", "3.11", "3.12"] - defaults: - run: - shell: bash - name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) - runs-on: ${{ matrix.os }} - env: - BUILD_MODE: debug - RUST_BACKTRACE: 1 - - services: - redis: - image: redis - ports: - - 6379:6379 - options: >- - --health-cmd "redis-cli ping" - --health-interval 10s - --health-timeout 5s - --health-retries 5 - postgres: - image: postgres - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: pass - POSTGRES_DB: nautilus - ports: - - 5432:5432 - options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 - - - steps: - - name: Free disk space (Ubuntu) - uses: jlumbroso/free-disk-space@main - with: - tool-cache: true - android: false - dotnet: false - haskell: false - large-packages: true - docker-images: true - swap-storage: true - - - name: Install runner dependencies - run: sudo apt-get install -y curl clang git libssl-dev make pkg-config - - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Get Rust version from rust-toolchain.toml - id: rust-version - run: | - version=$(awk -F\" '/version/ {print $2}' nautilus_core/rust-toolchain.toml) - echo "Rust toolchain version $version" - echo "RUST_VERSION=$version" >> $GITHUB_ENV - working-directory: ${{ github.workspace }} - - - name: Set up Rust tool-chain (Linux, Windows) stable - uses: actions-rust-lang/setup-rust-toolchain@v1.5 - with: - toolchain: ${{ env.RUST_VERSION }} - components: rustfmt, clippy - - - name: Set up Python environment - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Get Python version - run: | - version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:3])))") - echo "PYTHON_VERSION=$version" >> $GITHUB_ENV - - - name: Get Poetry version from poetry-version - run: | - version=$(cat poetry-version) - echo "POETRY_VERSION=$version" >> $GITHUB_ENV - - - name: Install Poetry - uses: snok/install-poetry@v1 - with: - version: ${{ env.POETRY_VERSION }} - - - name: Install build dependencies - run: python -m pip install --upgrade pip setuptools wheel pre-commit msgspec - - - name: Install TA-Lib (Linux) - run: | - make install-talib - poetry run pip install ta-lib - - - name: Setup cached pre-commit - id: cached-pre-commit - uses: actions/cache@v4 - with: - path: ~/.cache/pre-commit - key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} - - - name: Set poetry cache-dir - run: echo "POETRY_CACHE_DIR=$(poetry config cache-dir)" >> $GITHUB_ENV - - - name: Poetry cache - id: cached-poetry - uses: actions/cache@v4 - with: - path: ${{ env.POETRY_CACHE_DIR }} - key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-poetry-${{ hashFiles('**/poetry.lock') }} - - - name: Run pre-commit - run: | - # pre-commit run --hook-stage manual gitlint-ci - pre-commit run --all-files - - - name: Install Nautilus CLI and run init postgres - run: | - make install-cli - nautilus database init --schema ${{ github.workspace }}/schema - env: - POSTGRES_HOST: localhost - POSTGRES_PORT: 5432 - POSTGRES_USERNAME: postgres - POSTGRES_PASSWORD: pass - POSTGRES_DATABASE: nautilus - - - - name: Run nautilus_core cargo tests (Linux) - run: | - cargo install cargo-nextest - make cargo-test - - - name: Run tests (Linux) - run: | - make pytest - make test-examples - - build-windows: - if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/nightly' - strategy: - fail-fast: false - matrix: - arch: [x64] - os: [windows-latest] - python-version: ["3.10", "3.11", "3.12"] - defaults: - run: - shell: bash - name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) - runs-on: ${{ matrix.os }} - env: - BUILD_MODE: debug - RUST_BACKTRACE: 1 - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Get Rust version from rust-toolchain.toml - id: rust-version - run: | - version=$(awk -F\" '/version/ {print $2}' nautilus_core/rust-toolchain.toml) - echo "Rust toolchain version $version" - echo "RUST_VERSION=$version" >> $GITHUB_ENV - working-directory: ${{ github.workspace }} - - - name: Set up Rust tool-chain (Linux, Windows) stable - uses: actions-rust-lang/setup-rust-toolchain@v1.5 - with: - toolchain: ${{ env.RUST_VERSION }} - components: rustfmt, clippy - - - name: Set up Python environment - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Get Python version - run: | - version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:3])))") - echo "PYTHON_VERSION=$version" >> $GITHUB_ENV - - - name: Get Poetry version from poetry-version - run: | - version=$(cat poetry-version) - echo "POETRY_VERSION=$version" >> $GITHUB_ENV - - - name: Install Poetry - uses: snok/install-poetry@v1 - with: - version: ${{ env.POETRY_VERSION }} - - - name: Install build dependencies - run: python -m pip install --upgrade pip setuptools wheel pre-commit msgspec - - - name: Setup cached pre-commit - id: cached-pre-commit - uses: actions/cache@v4 - with: - path: ~/.cache/pre-commit - key: ${{ runner.os }}-${{ matrix.python-version }}-pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} - - - name: Set poetry cache-dir - run: echo "POETRY_CACHE_DIR=$(poetry config cache-dir)" >> $GITHUB_ENV - - - name: Poetry cache - id: cached-poetry - uses: actions/cache@v4 - with: - path: ${{ env.POETRY_CACHE_DIR }} - key: ${{ runner.os }}-${{ matrix.python-version }}-poetry-${{ hashFiles('**/poetry.lock') }} - - - name: Run pre-commit - run: | - # pre-commit run --hook-stage manual gitlint-ci - pre-commit run --all-files - - # Run tests without parallel build (avoids linker errors) - - name: Run tests (Windows) - run: | - poetry install --with test --all-extras - poetry run pytest --ignore=tests/performance_tests --new-first --failed-first - env: - PARALLEL_BUILD: false - - build-macos: - if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/nightly' - strategy: - fail-fast: false - matrix: - arch: [x64] - os: [macos-latest] - python-version: ["3.10", "3.11", "3.12"] - defaults: - run: - shell: bash - name: build - Python ${{ matrix.python-version }} (${{ matrix.arch }} ${{ matrix.os }}) - runs-on: ${{ matrix.os }} - env: - BUILD_MODE: debug - RUST_BACKTRACE: 1 - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Get Rust version from rust-toolchain.toml - id: rust-version - run: | - version=$(awk -F\" '/version/ {print $2}' nautilus_core/rust-toolchain.toml) - echo "Rust toolchain version $version" - echo "RUST_VERSION=$version" >> $GITHUB_ENV - working-directory: ${{ github.workspace }} - - # Work around as actions-rust-lang does not seem to work on macOS yet - - name: Set up Rust tool-chain (macOS) stable - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ env.RUST_VERSION }} - override: true - components: rustfmt, clippy - - - name: Set up Python environment - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Get Python version - run: | - version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:3])))") - echo "PYTHON_VERSION=$version" >> $GITHUB_ENV - - - name: Get Poetry version from poetry-version - run: | - version=$(cat poetry-version) - echo "POETRY_VERSION=$version" >> $GITHUB_ENV - - - name: Install Poetry - uses: snok/install-poetry@v1 - with: - version: ${{ env.POETRY_VERSION }} - - - name: Install build dependencies - run: python -m pip install --upgrade pip setuptools wheel pre-commit msgspec - - - name: Setup cached pre-commit - id: cached-pre-commit - uses: actions/cache@v4 - with: - path: ~/.cache/pre-commit - key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-pre-commit-${{ hashFiles('.pre-commit-config.yaml') }} - - - name: Set poetry cache-dir - run: echo "POETRY_CACHE_DIR=$(poetry config cache-dir)" >> $GITHUB_ENV - - - name: Poetry cache - id: cached-poetry - uses: actions/cache@v4 - with: - path: ${{ env.POETRY_CACHE_DIR }} - key: ${{ runner.os }}-${{ env.PYTHON_VERSION }}-poetry-${{ hashFiles('**/poetry.lock') }} - - - name: Run pre-commit - run: | - # pre-commit run --hook-stage manual gitlint-ci - pre-commit run --all-files - - - name: Run nautilus_core cargo tests (macOS) - run: | - cargo install cargo-nextest - make cargo-test - - - name: Run tests (macOS) - run: | - make pytest - make test-examples From c748c70ec07d3d1609a8676c188d33d46caca60e Mon Sep 17 00:00:00 2001 From: Filip Macek Date: Thu, 25 Apr 2024 19:06:44 +0200 Subject: [PATCH 22/22] delete nautilus_trader.iml --- nautilus_trader.iml | 29 ----------------------------- 1 file changed, 29 deletions(-) delete mode 100644 nautilus_trader.iml diff --git a/nautilus_trader.iml b/nautilus_trader.iml deleted file mode 100644 index 7b5f44196c75..000000000000 --- a/nautilus_trader.iml +++ /dev/null @@ -1,29 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file