From 63e1b7b74508aded0472cd0255e332bcc1112980 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Wed, 25 Oct 2023 20:16:34 +1100 Subject: [PATCH] Update ParquetDataCatalog.write_data method and docs --- docs/concepts/data.md | 22 +++++++++++-- docs/getting_started/quickstart.md | 5 +++ .../persistence/catalog/parquet.py | 31 +++++++++++++++++-- 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/docs/concepts/data.md b/docs/concepts/data.md index 0a70447ff902..8c4c69c9148c 100644 --- a/docs/concepts/data.md +++ b/docs/concepts/data.md @@ -149,9 +149,25 @@ The following example shows the above list of Binance `OrderBookDelta` objects b catalog.write_data(deltas) ``` +### Basename template +Nautilus makes no assumptions about how data may be partitioned between files for a particular +data type and instrument ID. + +The `basename_template` keyword argument is an additional optional naming component for the output files. +The template should include placeholders that will be filled in with actual values at runtime. +These values can be automatically derived from the data or provided as additional keyword arguments. + +For example, using a basename template like `"{date}"` for AUD/USD.SIM quote tick data, +and assuming `"date"` is a provided or derivable field, could result in a filename like +`"2023-01-01.parquet"` under the `"quote_tick/audusd.sim/"` catalog directory. +If not provided, a default naming scheme will be applied. This parameter should be specified as a +keyword argument, like `write_data(data, basename_template="{date}")`. + ```{warning} -Existing data for the same data type, `instrument_id`, and date will be overwritten without prior warning. -Ensure you have appropriate backups or safeguards in place before performing this action. +Any data which already exists under a filename will be overwritten. +If a `basename_template` is not provided, then it's very likely existing data for the data type and instrument ID will +be overwritten. 
To prevent data loss, ensure that the `basename_template` (or the default naming scheme) +generates unique filenames for different data sets. ``` Rust Arrow schema implementations and available for the follow data types (enhanced performance): @@ -166,6 +182,7 @@ Any stored data can then we read back into memory: from nautilus_trader.core.datetime import dt_to_unix_nanos import pandas as pd + start = dt_to_unix_nanos(pd.Timestamp("2020-01-03", tz=pytz.utc)) end = dt_to_unix_nanos(pd.Timestamp("2020-01-04", tz=pytz.utc)) @@ -180,6 +197,7 @@ The following example shows how to achieve this by initializing a `BacktestDataC from nautilus_trader.config import BacktestDataConfig from nautilus_trader.model.data import OrderBookDelta + data_config = BacktestDataConfig( catalog_path=str(catalog.path), data_cls=OrderBookDelta, diff --git a/docs/getting_started/quickstart.md b/docs/getting_started/quickstart.md index b76d6e12b450..fc7bd2778c5f 100644 --- a/docs/getting_started/quickstart.md +++ b/docs/getting_started/quickstart.md @@ -47,6 +47,7 @@ If everything worked correctly, you should be able to see a single EUR/USD instr ```python from nautilus_trader.persistence.catalog import ParquetDataCatalog + # You can also use `ParquetDataCatalog.from_env()` which will use the `NAUTILUS_PATH` environment variable # catalog = ParquetDataCatalog.from_env() catalog = ParquetDataCatalog("./catalog") @@ -191,6 +192,7 @@ FX trading is typically done on margin with Non-Deliverable Forward, Swap or CFD ```python from nautilus_trader.config import BacktestVenueConfig + venue = BacktestVenueConfig( name="SIM", oms_type="NETTING", @@ -221,6 +223,7 @@ adding the `QuoteTick`(s) for our EUR/USD instrument: from nautilus_trader.config import BacktestDataConfig from nautilus_trader.model.data import QuoteTick + data = BacktestDataConfig( catalog_path=str(catalog.path), data_cls=QuoteTick, @@ -243,6 +246,7 @@ from nautilus_trader.config import BacktestEngineConfig from nautilus_trader.config 
import ImportableStrategyConfig from nautilus_trader.config import LoggingConfig + engine = BacktestEngineConfig( strategies=[ ImportableStrategyConfig( @@ -302,6 +306,7 @@ The engine(s) can provide additional reports and information. from nautilus_trader.backtest.engine import BacktestEngine from nautilus_trader.model.identifiers import Venue + engine: BacktestEngine = node.get_engine(config.id) engine.trader.generate_order_fills_report() diff --git a/nautilus_trader/persistence/catalog/parquet.py b/nautilus_trader/persistence/catalog/parquet.py index f082c00b6d5d..f76d32165973 100644 --- a/nautilus_trader/persistence/catalog/parquet.py +++ b/nautilus_trader/persistence/catalog/parquet.py @@ -228,6 +228,7 @@ def write_chunk( data: list[Data], data_cls: type[Data], instrument_id: str | None = None, + basename_template: str = "part-{i}", **kwargs: Any, ) -> None: table = self._objects_to_table(data, data_cls=data_cls) @@ -235,12 +236,18 @@ def write_chunk( kw = dict(**self.dataset_kwargs, **kwargs) if "partitioning" not in kw: - self._fast_write(table=table, path=path, fs=self.fs) + self._fast_write( + table=table, + path=path, + fs=self.fs, + basename_template=basename_template, + ) else: # Write parquet file pds.write_dataset( data=table, base_dir=path, + basename_template=basename_template, format="parquet", filesystem=self.fs, min_rows_per_group=self.min_rows_per_group, @@ -254,7 +261,7 @@ def _fast_write( table: pa.Table, path: str, fs: fsspec.AbstractFileSystem, - basename_template: str = "part-{i}", + basename_template: str, ) -> None: name = basename_template.format(i=0) fs.mkdirs(path, exist_ok=True) @@ -265,7 +272,12 @@ def _fast_write( row_group_size=self.max_rows_per_group, ) - def write_data(self, data: list[Data | Event], **kwargs: Any) -> None: + def write_data( + self, + data: list[Data | Event], + basename_template: str = "part-{i}", + **kwargs: Any, + ) -> None: """ Write the given `data` to the catalog. 
@@ -277,9 +289,21 @@ def write_data(self, data: list[Data | Event], **kwargs: Any) -> None: ---------- data : list[Data | Event] The data or event objects to be written to the catalog. + basename_template : str, default 'part-{i}' + A template string used to generate basenames of written data files. + The token '{i}' will be replaced with an automatically incremented + integer as files are partitioned. + If not specified, it defaults to 'part-{i}' + the default extension '.parquet'. kwargs : Any Additional keyword arguments to be passed to the `write_chunk` method. + Warnings + -------- + Any data which already exists under a filename will be overwritten. + If a `basename_template` is not provided, then it's very likely existing data for the data type and instrument ID will + be overwritten. To prevent data loss, ensure that the `basename_template` (or the default naming scheme) + generates unique filenames for different data sets. + Notes ----- - All data of the same type is expected to be monotonically increasing, or non-decreasing @@ -311,6 +335,7 @@ def key(obj: Any) -> tuple[str, str | None]: data=list(single_type), data_cls=name_to_cls[cls_name], instrument_id=instrument_id, + basename_template=basename_template, **kwargs, )