Update ParquetDataCatalog.write_data method and docs
cjdsellers committed Oct 25, 2023
1 parent 4ac01ad commit 63e1b7b
Showing 3 changed files with 53 additions and 5 deletions.
22 changes: 20 additions & 2 deletions docs/concepts/data.md
@@ -149,9 +149,25 @@ The following example shows the above list of Binance `OrderBookDelta` objects being written to the catalog:
catalog.write_data(deltas)
```

### Basename template
Nautilus makes no assumptions about how data may be partitioned between files for a particular
data type and instrument ID.

The `basename_template` keyword argument is an additional optional naming component for the output files.
The template should include placeholders that will be filled in with actual values at runtime.
These values can be automatically derived from the data or provided as additional keyword arguments.

For example, using a basename template like `"{date}"` for AUD/USD.SIM quote tick data,
and assuming `"date"` is a provided or derivable field, could result in a filename like
`"2023-01-01.parquet"` under the `"quote_tick/audusd.sim/"` catalog directory.
If not provided, a default naming scheme will be applied. This parameter should be specified as a
keyword argument, like `write_data(data, basename_template="{date}")`.

```{warning}
Any existing data which already exists under a filename will be overwritten.
If a `basename_template` is not provided, then it's very likely that existing data for the data type and instrument ID will
be overwritten. To prevent data loss, ensure that the `basename_template` (or the default naming scheme)
generates unique filenames for different data sets.
```
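As a rough sketch of the naming behaviour described above (the `.parquet` extension handling here is an assumption based on the default scheme, not the library's exact code — `expand_basename` is a hypothetical helper):

```python
# Hypothetical helper mirroring how a basename template could expand into a
# filename; `_fast_write` calls `basename_template.format(i=0)` similarly.
def expand_basename(template: str, i: int = 0, ext: str = ".parquet") -> str:
    name = template.format(i=i)
    return name if name.endswith(ext) else name + ext

print(expand_basename("part-{i}"))    # -> 'part-0.parquet' (the default template)
print(expand_basename("2023-01-01"))  # -> '2023-01-01.parquet' (a date-style template)
```

Note that two writes using the same template (and the same data type and instrument ID) expand to the same filename, which is exactly the overwrite scenario the warning above describes.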

Rust Arrow schema implementations are available for the following data types (enhanced performance):
@@ -166,6 +182,7 @@ Any stored data can then be read back into memory:
from nautilus_trader.core.datetime import dt_to_unix_nanos
import pandas as pd
import pytz


start = dt_to_unix_nanos(pd.Timestamp("2020-01-03", tz=pytz.utc))
end = dt_to_unix_nanos(pd.Timestamp("2020-01-04", tz=pytz.utc))

@@ -180,6 +197,7 @@ The following example shows how to achieve this by initializing a `BacktestDataConfig`:
from nautilus_trader.config import BacktestDataConfig
from nautilus_trader.model.data import OrderBookDelta


data_config = BacktestDataConfig(
catalog_path=str(catalog.path),
data_cls=OrderBookDelta,
5 changes: 5 additions & 0 deletions docs/getting_started/quickstart.md
@@ -47,6 +47,7 @@ If everything worked correctly, you should be able to see a single EUR/USD instrument:
```python
from nautilus_trader.persistence.catalog import ParquetDataCatalog


# You can also use `ParquetDataCatalog.from_env()` which will use the `NAUTILUS_PATH` environment variable
# catalog = ParquetDataCatalog.from_env()
catalog = ParquetDataCatalog("./catalog")
@@ -191,6 +192,7 @@ FX trading is typically done on margin with Non-Deliverable Forward, Swap or CFD
```python
from nautilus_trader.config import BacktestVenueConfig


venue = BacktestVenueConfig(
name="SIM",
oms_type="NETTING",
@@ -221,6 +223,7 @@ adding the `QuoteTick`(s) for our EUR/USD instrument:
from nautilus_trader.config import BacktestDataConfig
from nautilus_trader.model.data import QuoteTick


data = BacktestDataConfig(
catalog_path=str(catalog.path),
data_cls=QuoteTick,
@@ -243,6 +246,7 @@ from nautilus_trader.config import BacktestEngineConfig
from nautilus_trader.config import ImportableStrategyConfig
from nautilus_trader.config import LoggingConfig


engine = BacktestEngineConfig(
strategies=[
ImportableStrategyConfig(
@@ -302,6 +306,7 @@ The engine(s) can provide additional reports and information.
from nautilus_trader.backtest.engine import BacktestEngine
from nautilus_trader.model.identifiers import Venue


engine: BacktestEngine = node.get_engine(config.id)

engine.trader.generate_order_fills_report()
31 changes: 28 additions & 3 deletions nautilus_trader/persistence/catalog/parquet.py
@@ -228,19 +228,26 @@ def write_chunk(
data: list[Data],
data_cls: type[Data],
instrument_id: str | None = None,
basename_template: str = "part-{i}",
**kwargs: Any,
) -> None:
table = self._objects_to_table(data, data_cls=data_cls)
path = self._make_path(data_cls=data_cls, instrument_id=instrument_id)
kw = dict(**self.dataset_kwargs, **kwargs)

if "partitioning" not in kw:
self._fast_write(table=table, path=path, fs=self.fs)
self._fast_write(
table=table,
path=path,
fs=self.fs,
basename_template=basename_template,
)
else:
# Write parquet file
pds.write_dataset(
data=table,
base_dir=path,
basename_template=basename_template,
format="parquet",
filesystem=self.fs,
min_rows_per_group=self.min_rows_per_group,
@@ -254,7 +261,7 @@ def _fast_write(
table: pa.Table,
path: str,
fs: fsspec.AbstractFileSystem,
basename_template: str = "part-{i}",
basename_template: str,
) -> None:
name = basename_template.format(i=0)
fs.mkdirs(path, exist_ok=True)
@@ -265,7 +272,12 @@ def _fast_write(
row_group_size=self.max_rows_per_group,
)

def write_data(self, data: list[Data | Event], **kwargs: Any) -> None:
def write_data(
self,
data: list[Data | Event],
basename_template: str = "part-{i}",
**kwargs: Any,
) -> None:
"""
Write the given `data` to the catalog.
@@ -277,9 +289,21 @@ def write_data(self, data: list[Data | Event], **kwargs: Any) -> None:
----------
data : list[Data | Event]
The data or event objects to be written to the catalog.
basename_template : str, default 'part-{i}'
A template string used to generate basenames of written data files.
The token '{i}' will be replaced with an automatically incremented
integer as files are partitioned.
If not specified, it defaults to 'part-{i}' + the default extension '.parquet'.
kwargs : Any
Additional keyword arguments to be passed to the `write_chunk` method.

Warnings
--------
Any existing data which already exists under a filename will be overwritten.
If a `basename_template` is not provided, then it's very likely that existing data for the data type and instrument ID will
be overwritten. To prevent data loss, ensure that the `basename_template` (or the default naming scheme)
generates unique filenames for different data sets.

Notes
-----
- All data of the same type is expected to be monotonically increasing, or non-decreasing
@@ -311,6 +335,7 @@ def key(obj: Any) -> tuple[str, str | None]:
data=list(single_type),
data_cls=name_to_cls[cls_name],
instrument_id=instrument_id,
basename_template=basename_template,
**kwargs,
)
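To avoid the overwrite pitfall called out in the docstring's warning, one option is to derive a unique template per write, for example from the data's date. This is a sketch only; `unique_template` is a hypothetical helper, not part of the catalog API:

```python
import datetime as dt

def unique_template(day: dt.date) -> str:
    # Prefix the auto-incremented part index with the day so writes for
    # different days never expand to the same filename.
    return f"{day.isoformat()}-part-{{i}}"

template = unique_template(dt.date(2023, 1, 1))
print(template)  # -> '2023-01-01-part-{i}'
# catalog.write_data(ticks, basename_template=template)  # as in the API above
```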
