Skip to content

Commit

Permalink
Implement DatabentoLiveClient session handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 30, 2024
1 parent 991fb49 commit c93eee9
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 65 deletions.
27 changes: 13 additions & 14 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.enums import BookType
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import TraderId
from nautilus_trader.trading.strategy import Strategy
Expand All @@ -43,15 +42,15 @@
# For correct subscription operation, you must specify all instruments to be immediately
# subscribed for as part of the data client configuration
instrument_ids = [
# InstrumentId.from_str("AAPL.XCHI"),
# InstrumentId.from_str("ESH4.GLBX"),
InstrumentId.from_str("AAPL.XCHI"),
InstrumentId.from_str("ESH4.GLBX"),
InstrumentId.from_str("ESM4.GLBX"),
]

# Configure the trading node
config_node = TradingNodeConfig(
trader_id=TraderId("TESTER-001"),
logging=LoggingConfig(log_level="INFO"),
logging=LoggingConfig(log_level="DEBUG"),
exec_engine=LiveExecEngineConfig(
reconciliation=False, # Not applicable
inflight_check_interval_ms=0, # Not applicable
Expand Down Expand Up @@ -131,7 +130,7 @@ def on_start(self) -> None:
"""
Actions to be performed when the strategy is started.
Here we specify the 'DATABENTO' client for subscriptions.
Here we specify the 'DATABENTO' client_id for subscriptions.
"""
for instrument_id in self.instrument_ids:
Expand All @@ -140,15 +139,15 @@ def on_start(self) -> None:
# book_type=BookType.L3_MBO,
# client_id=DATABENTO_CLIENT_ID,
# )
self.subscribe_order_book_snapshots(
instrument_id=instrument_id,
book_type=BookType.L2_MBP,
depth=10,
client_id=DATABENTO_CLIENT_ID,
interval_ms=100,
)
# self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.subscribe_order_book_snapshots(
# instrument_id=instrument_id,
# book_type=BookType.L2_MBP,
# depth=10,
# client_id=DATABENTO_CLIENT_ID,
# interval_ms=100,
# )
self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.request_quote_ticks(instrument_id)
# self.request_trade_ticks(instrument_id)
# self.request_bars(BarType.from_str(f"{instrument_id}-1-MINUTE-LAST-EXTERNAL"))
Expand Down
1 change: 1 addition & 0 deletions nautilus_core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nautilus_core/adapters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pyo3-asyncio = { workspace = true, optional = true }
rand = { workspace = true }
rust_decimal = { workspace = true }
rust_decimal_macros = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
Expand Down
14 changes: 11 additions & 3 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::{Arc, OnceLock};

use anyhow::Result;
use databento::live::Subscription;
use dbn::{PitSymbolMap, RType, Record, SymbolIndex};
use dbn::{PitSymbolMap, RType, Record, SymbolIndex, VersionUpgradePolicy};
use indexmap::IndexMap;
use log::{error, info};
use nautilus_core::python::to_pyruntime_err;
Expand Down Expand Up @@ -59,6 +59,7 @@ impl DatabentoLiveClient {
databento::LiveClient::builder()
.key(&self.key)?
.dataset(&self.dataset)
.upgrade_policy(VersionUpgradePolicy::Upgrade)
.build()
.await
}
Expand Down Expand Up @@ -109,6 +110,8 @@ impl DatabentoLiveClient {
let arc_client = self.get_inner_client().map_err(to_pyruntime_err)?;

pyo3_asyncio::tokio::future_into_py(py, async move {
// TODO: Attempt to obtain the mutex guard, if the client has already started then
// this will not be possible currently.
let mut client = arc_client.lock().await;

// TODO: This can be tidied up, conditionally calling `if let Some(start)` on
Expand All @@ -130,6 +133,9 @@ impl DatabentoLiveClient {
.build(),
};

// TODO: Temporary debug logging
println!("{:?}", subscription);

client
.subscribe(&subscription)
.await
Expand All @@ -148,7 +154,9 @@ impl DatabentoLiveClient {
let mut client = arc_client.lock().await;
let mut symbol_map = PitSymbolMap::new();

while let Some(record) = client.next_record().await.map_err(to_pyvalue_err)? {
client.start().await.map_err(to_pyruntime_err)?;

while let Some(record) = client.next_record().await.map_err(to_pyruntime_err)? {
let rtype = record.rtype().expect("Invalid `rtype`");
match rtype {
RType::SymbolMapping => {
Expand All @@ -163,7 +171,7 @@ impl DatabentoLiveClient {
continue;
}
RType::System => {
println!("{record:?}"); // TODO: Just print stderr for now
println!("{record:?}"); // TODO: Just print stdout for now
info!("{:?}", record);
continue;
}
Expand Down
11 changes: 6 additions & 5 deletions nautilus_trader/adapters/databento/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pathlib import Path

from nautilus_trader.adapters.databento.enums import DatabentoSchema
from nautilus_trader.adapters.databento.types import DatabentoPublisher
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.model.data import BarType
Expand Down Expand Up @@ -79,7 +80,7 @@ def nautilus_instrument_id_from_databento(
return InstrumentId(Symbol(raw_symbol), Venue(publisher.venue))


def databento_schema_from_nautilus_bar_type(bar_type: BarType) -> str:
def databento_schema_from_nautilus_bar_type(bar_type: BarType) -> DatabentoSchema:
"""
Return the Databento bar aggregate schema string for the given Nautilus `bar_type`.
Expand Down Expand Up @@ -117,13 +118,13 @@ def databento_schema_from_nautilus_bar_type(bar_type: BarType) -> str:

match bar_type.spec.aggregation:
case BarAggregation.SECOND:
return "ohlcv-1s"
return DatabentoSchema.OHLCV_1S
case BarAggregation.MINUTE:
return "ohlcv-1m"
return DatabentoSchema.OHLCV_1M
case BarAggregation.HOUR:
return "ohlcv-1h"
return DatabentoSchema.OHLCV_1H
case BarAggregation.DAY:
return "ohlcv-1d"
return DatabentoSchema.OHLCV_1D
case _:
raise ValueError(
f"Invalid bar type '{bar_type}'. "
Expand Down
106 changes: 70 additions & 36 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from nautilus_trader.adapters.databento.constants import DATABENTO_CLIENT_ID
from nautilus_trader.adapters.databento.constants import PUBLISHERS_PATH
from nautilus_trader.adapters.databento.enums import DatabentoRecordFlags
from nautilus_trader.adapters.databento.enums import DatabentoSchema
from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.adapters.databento.providers import DatabentoInstrumentProvider
from nautilus_trader.adapters.databento.types import Dataset
Expand Down Expand Up @@ -259,7 +260,7 @@ def _get_live_client_mbo(self, dataset: Dataset) -> nautilus_pyo3.DatabentoLiveC

return live_client

def _check_live_client_started(
async def _check_live_client_started(
self,
dataset: Dataset,
live_client: nautilus_pyo3.DatabentoLiveClient,
Expand Down Expand Up @@ -365,11 +366,15 @@ async def _subscribe_instrument(self, instrument_id: InstrumentId) -> None:
try:
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema="definition",
symbols=instrument_id.symbol.value,
future = asyncio.ensure_future(
live_client.subscribe(
schema=DatabentoSchema.DEFINITION.value,
symbols=instrument_id.symbol.value,
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_instrument` was canceled while still pending.")

Expand All @@ -380,12 +385,16 @@ async def _subscribe_parent_symbols(
) -> None:
try:
live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema="definition",
symbols=",".join(sorted(parent_symbols)),
stype_in="parent",
future = asyncio.ensure_future(
live_client.subscribe(
schema=DatabentoSchema.DEFINITION.value,
symbols=",".join(sorted(parent_symbols)),
stype_in="parent",
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_parent_symbols` was canceled while still pending.")

Expand All @@ -396,11 +405,15 @@ async def _subscribe_instrument_ids(
) -> None:
try:
live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema="definition",
symbols=",".join(sorted([i.symbol.value for i in instrument_ids])),
future = asyncio.ensure_future(
live_client.subscribe(
schema=DatabentoSchema.DEFINITION.value,
symbols=",".join(sorted([i.symbol.value for i in instrument_ids])),
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_instrument_ids` was canceled while still pending.")

Expand Down Expand Up @@ -469,13 +482,18 @@ async def _subscribe_order_book_deltas_batch(

dataset: Dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue)
live_client = self._get_live_client_mbo(dataset)
await live_client.subscribe(
schema="mbo",
symbols=",".join(sorted([i.symbol.value for i in instrument_ids])),
start=0, # Must subscribe from start of week to get 'Sunday snapshot' for now
future = asyncio.ensure_future(
live_client.subscribe(
schema=DatabentoSchema.MBO.value,
symbols=",".join(sorted([i.symbol.value for i in instrument_ids])),
start=0, # Must subscribe from start of week to get 'Sunday snapshot' for now
),
)
self._live_client_futures.add(future)
await future
future = asyncio.ensure_future(live_client.start(self._handle_record))
self._live_client_futures.add(future)
await future
except asyncio.CancelledError:
self._log.warning(
"`_subscribe_order_book_deltas_batch` was canceled while still pending.",
Expand All @@ -493,9 +511,9 @@ async def _subscribe_order_book_snapshots(

match depth:
case 1:
schema = "mbp-1"
schema = DatabentoSchema.MBP_1.value
case 10:
schema = "mbp-10"
schema = DatabentoSchema.MBP_10.value
case _:
self._log.error(
f"Cannot subscribe for order book snapshots of depth {depth}, use either 1 or 10.",
Expand All @@ -504,11 +522,15 @@ async def _subscribe_order_book_snapshots(

dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema=schema,
symbols=",".join(sorted([instrument_id.symbol.value])),
future = asyncio.ensure_future(
live_client.subscribe(
schema=schema,
symbols=",".join(sorted([instrument_id.symbol.value])),
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_order_book_snapshots` was canceled while still pending.")

Expand All @@ -518,11 +540,15 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:

dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema="mbp-1",
symbols=",".join(sorted([instrument_id.symbol.value])),
future = asyncio.ensure_future(
live_client.subscribe(
schema=DatabentoSchema.MBP_1.value,
symbols=",".join(sorted([instrument_id.symbol.value])),
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_quote_ticks` was canceled while still pending.")

Expand All @@ -535,11 +561,15 @@ async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:

dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)
live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema="trades",
symbols=instrument_id.symbol.value,
future = asyncio.ensure_future(
live_client.subscribe(
schema=DatabentoSchema.TRADES.value,
symbols=instrument_id.symbol.value,
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_trade_ticks` was canceled while still pending.")

Expand All @@ -554,11 +584,15 @@ async def _subscribe_bars(self, bar_type: BarType) -> None:
return

live_client = self._get_live_client(dataset)
await live_client.subscribe(
schema=schema,
symbols=bar_type.instrument_id.symbol.value,
future = asyncio.ensure_future(
live_client.subscribe(
schema=schema.value,
symbols=bar_type.instrument_id.symbol.value,
),
)
self._check_live_client_started(dataset, live_client)
self._live_client_futures.add(future)
await future
await self._check_live_client_started(dataset, live_client)
except asyncio.CancelledError:
self._log.warning("`_subscribe_bars` was canceled while still pending.")

Expand Down
Loading

0 comments on commit c93eee9

Please sign in to comment.