diff --git a/RELEASES.md b/RELEASES.md index d033084d9d1d..11c05205959c 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -7,11 +7,13 @@ Released on TBD (UTC). - Improved `register_serializable object` to also add type to internal `_EXTERNAL_PUBLIHSABLE_TYPES` ### Breaking Changes +- Changed `StreamingConfig.include_types` from `tuple[str]` to `list[type]` (better alignment with other type filters) - Consolidated `clock` module into `component` module (reduce binary wheel size) - Consolidated `logging` module into `component` module (reduce binary wheel size) ### Fixes - Fixed Arrow serialization of `OrderUpdated` (`trigger_price` type was incorrect), thanks @benjaminsingleton +- Fixed `StreamingConfig.include_types` behavior (was not being honored for instrument writers), thanks for reporting @doublier1 --- diff --git a/examples/live/binance/binance_spot_market_maker.py b/examples/live/binance/binance_spot_market_maker.py index b3b1c820cba3..623f4f53d231 100644 --- a/examples/live/binance/binance_spot_market_maker.py +++ b/examples/live/binance/binance_spot_market_maker.py @@ -69,6 +69,7 @@ # snapshot_orders=True, # snapshot_positions=True, # snapshot_positions_interval=5.0, + # streaming=StreamingConfig(catalog_path="catalog"), data_clients={ "BINANCE": BinanceDataClientConfig( api_key=None, # 'BINANCE_API_KEY' env var diff --git a/nautilus_trader/config/common.py b/nautilus_trader/config/common.py index 6f28c7d46eae..12a71f3e4c77 100644 --- a/nautilus_trader/config/common.py +++ b/nautilus_trader/config/common.py @@ -482,6 +482,9 @@ class StreamingConfig(NautilusConfig, frozen=True): The flush interval (milliseconds) for writing chunks. replace_existing: bool, default False If any existing feather files should be replaced. + include_types : list[type], optional + A list of Arrow serializable types to write. + If this is specified then *only* the included types will be written. """ @@ -490,7 +493,7 @@ class StreamingConfig(NautilusConfig, frozen=True): fs_storage_options: dict | None = None flush_interval_ms: int | None = None replace_existing: bool = False - include_types: list[str] | None = None + include_types: list[type] | None = None @property def fs(self): diff --git a/nautilus_trader/persistence/writer.py b/nautilus_trader/persistence/writer.py index 20dfd54ae605..99973697e7d8 100644 --- a/nautilus_trader/persistence/writer.py +++ b/nautilus_trader/persistence/writer.py @@ -54,6 +54,9 @@ class StreamingFeatherWriter: The flush interval (milliseconds) for writing chunks. replace : bool, default False If existing files at the given `path` should be replaced. + include_types : list[type], optional + A list of Arrow serializable types to write. + If this is specified then *only* the included types will be written. """ @@ -63,7 +66,7 @@ def __init__( fs_protocol: str | None = "file", flush_interval_ms: int | None = None, replace: bool = False, - include_types: tuple[type] | None = None, + include_types: list[type] | None = None, ) -> None: self.path = path self.fs: fsspec.AbstractFileSystem = fsspec.filesystem(fs_protocol) @@ -110,8 +113,9 @@ def is_closed(self) -> bool: """ return all(self._files[table_name].closed for table_name in self._files) - def _create_writer(self, cls: type, table_name: str | None = None): - if self.include_types is not None and cls.__name__ not in self.include_types: + def _create_writer(self, cls: type, table_name: str | None = None) -> None: + # Check if an include types filter has been specified + if self.include_types is not None and cls not in self.include_types: return table_name = class_to_filename(cls) if not table_name else table_name @@ -134,9 +138,11 @@ def _create_writers(self) -> None: self._create_writer(cls=cls) def _create_instrument_writer(self, cls: type, obj: Any) -> None: - """ - Create an arrow writer with instrument specific metadata in the schema. - """ + # Check if an include types filter has been specified + if self.include_types is not None and cls not in self.include_types: + return + + # Create an arrow writer with instrument specific metadata in the schema metadata: dict[bytes, bytes] = self._extract_obj_metadata(obj) mapped_cls = {OrderBookDeltas: OrderBookDelta}.get(cls, cls) schema = self._schemas[mapped_cls].with_metadata(metadata) diff --git a/nautilus_trader/system/kernel.py b/nautilus_trader/system/kernel.py index 5c130c8ae7d5..995038eca99d 100644 --- a/nautilus_trader/system/kernel.py +++ b/nautilus_trader/system/kernel.py @@ -474,7 +474,7 @@ def _setup_streaming(self, config: StreamingConfig) -> None: path=path, fs_protocol=config.fs_protocol, flush_interval_ms=config.flush_interval_ms, - include_types=config.include_types, # type: ignore # TODO(cs) + include_types=config.include_types, ) self._trader.subscribe("*", self._writer.write) self._log.info(f"Writing data & events to {path}")