Skip to content

Commit

Permalink
Fix StreamingConfig.include_types behavior and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 25, 2024
1 parent fd0e698 commit b09602a
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 8 deletions.
2 changes: 2 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ Released on TBD (UTC).
- Improved `register_serializable object` to also add type to internal `_EXTERNAL_PUBLIHSABLE_TYPES`

### Breaking Changes
- Changed `StreamingConfig.include_types` from `tuple[str]` to `list[type]` (better alignment with other type filters)
- Consolidated `clock` module into `component` module (reduce binary wheel size)
- Consolidated `logging` module into `component` module (reduce binary wheel size)

### Fixes
- Fixed Arrow serialization of `OrderUpdated` (`trigger_price` type was incorrect), thanks @benjaminsingleton
- Fixed `StreamingConfig.include_types` behavior (was not being honored for instrument writers), thanks for reporting @doublier1

---

Expand Down
1 change: 1 addition & 0 deletions examples/live/binance/binance_spot_market_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
# snapshot_orders=True,
# snapshot_positions=True,
# snapshot_positions_interval=5.0,
# streaming=StreamingConfig(catalog_path="catalog"),
data_clients={
"BINANCE": BinanceDataClientConfig(
api_key=None, # 'BINANCE_API_KEY' env var
Expand Down
5 changes: 4 additions & 1 deletion nautilus_trader/config/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,9 @@ class StreamingConfig(NautilusConfig, frozen=True):
The flush interval (milliseconds) for writing chunks.
replace_existing: bool, default False
If any existing feather files should be replaced.
include_types : list[type], optional
A list of Arrow serializable types to write.
If this is specified then *only* the included types will be written.
"""

Expand All @@ -490,7 +493,7 @@ class StreamingConfig(NautilusConfig, frozen=True):
fs_storage_options: dict | None = None
flush_interval_ms: int | None = None
replace_existing: bool = False
include_types: list[str] | None = None
include_types: list[type] | None = None

@property
def fs(self):
Expand Down
18 changes: 12 additions & 6 deletions nautilus_trader/persistence/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class StreamingFeatherWriter:
The flush interval (milliseconds) for writing chunks.
replace : bool, default False
If existing files at the given `path` should be replaced.
include_types : list[type], optional
A list of Arrow serializable types to write.
If this is specified then *only* the included types will be written.
"""

Expand All @@ -63,7 +66,7 @@ def __init__(
fs_protocol: str | None = "file",
flush_interval_ms: int | None = None,
replace: bool = False,
include_types: tuple[type] | None = None,
include_types: list[type] | None = None,
) -> None:
self.path = path
self.fs: fsspec.AbstractFileSystem = fsspec.filesystem(fs_protocol)
Expand Down Expand Up @@ -110,8 +113,9 @@ def is_closed(self) -> bool:
"""
return all(self._files[table_name].closed for table_name in self._files)

def _create_writer(self, cls: type, table_name: str | None = None):
if self.include_types is not None and cls.__name__ not in self.include_types:
def _create_writer(self, cls: type, table_name: str | None = None) -> None:
# Check if an include types filter has been specified
if self.include_types is not None and cls not in self.include_types:
return

table_name = class_to_filename(cls) if not table_name else table_name
Expand All @@ -134,9 +138,11 @@ def _create_writers(self) -> None:
self._create_writer(cls=cls)

def _create_instrument_writer(self, cls: type, obj: Any) -> None:
"""
Create an arrow writer with instrument specific metadata in the schema.
"""
# Check if an include types filter has been specified
if self.include_types is not None and cls not in self.include_types:
return

# Create an arrow writer with instrument specific metadata in the schema
metadata: dict[bytes, bytes] = self._extract_obj_metadata(obj)
mapped_cls = {OrderBookDeltas: OrderBookDelta}.get(cls, cls)
schema = self._schemas[mapped_cls].with_metadata(metadata)
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/system/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def _setup_streaming(self, config: StreamingConfig) -> None:
path=path,
fs_protocol=config.fs_protocol,
flush_interval_ms=config.flush_interval_ms,
include_types=config.include_types, # type: ignore # TODO(cs)
include_types=config.include_types,
)
self._trader.subscribe("*", self._writer.write)
self._log.info(f"Writing data & events to {path}")
Expand Down

0 comments on commit b09602a

Please sign in to comment.