Skip to content

Commit

Permalink
Refine DatabentoLiveClient session handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 31, 2024
1 parent 5399de0 commit 0533208
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
# Configure the trading node
config_node = TradingNodeConfig(
trader_id=TraderId("TESTER-001"),
logging=LoggingConfig(log_level="DEBUG"),
logging=LoggingConfig(log_level="INFO"),
exec_engine=LiveExecEngineConfig(
reconciliation=False, # Not applicable
inflight_check_interval_ms=0, # Not applicable
Expand Down
30 changes: 21 additions & 9 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use nautilus_model::identifiers::venue::Venue;
use pyo3::prelude::*;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration};

use crate::databento::parsing::{parse_instrument_def_msg, parse_record};
use crate::databento::types::{DatabentoPublisher, PublisherId};
Expand Down Expand Up @@ -110,8 +111,6 @@ impl DatabentoLiveClient {
let arc_client = self.get_inner_client().map_err(to_pyruntime_err)?;

pyo3_asyncio::tokio::future_into_py(py, async move {
// TODO: Attempt to obtain the mutex guard, if the client has already started then
// this will not be possible currently.
let mut client = arc_client.lock().await;

// TODO: This can be tidied up, conditionally calling `if let Some(start)` on
Expand All @@ -134,7 +133,7 @@ impl DatabentoLiveClient {
};

// TODO: Temporary debug logging
println!("{:?}", subscription);
// println!("{:?}", subscription);

client
.subscribe(&subscription)
Expand All @@ -154,26 +153,39 @@ impl DatabentoLiveClient {
let mut client = arc_client.lock().await;
let mut symbol_map = PitSymbolMap::new();

let timeout_duration = Duration::from_millis(10);
client.start().await.map_err(to_pyruntime_err)?;

while let Some(record) = client.next_record().await.map_err(to_pyruntime_err)? {
loop {
drop(client);
client = arc_client.lock().await;

let result = timeout(timeout_duration, client.next_record()).await;
let record = match result {
Ok(Ok(Some(record))) => record,
Ok(Ok(None)) => break, // Session ended normally
Ok(Err(e)) => {
// Fail session entirely for now
return Err(to_pyruntime_err(e));
}
Err(_) => continue, // Timeout
};

let rtype = record.rtype().expect("Invalid `rtype`");

match rtype {
RType::SymbolMapping => {
symbol_map.on_record(record).unwrap_or_else(|_| {
panic!("Error updating `symbol_map` with {record:?}")
});
continue;
}
RType::Error => {
eprintln!("{record:?}"); // TODO: Just print stderr for now
error!("{:?}", record);
continue;
}
RType::System => {
println!("{record:?}"); // TODO: Just print stdout for now
info!("{:?}", record);
continue;
}
RType::InstrumentDef => {
let msg = record
Expand All @@ -186,7 +198,7 @@ impl DatabentoLiveClient {

match result {
Ok(instrument) => {
// TODO: Improve the efficiency of this constant GIL aquisition
// TODO: Optimize this by reducing the frequency of acquiring the GIL if possible
Python::with_gil(|py| {
let py_obj =
convert_instrument_to_pyobject(py, instrument).unwrap();
Expand Down Expand Up @@ -217,7 +229,7 @@ impl DatabentoLiveClient {
parse_record(&record, rtype, instrument_id, 2, Some(ts_init))
.map_err(to_pyvalue_err)?;

// TODO: Improve the efficiency of this constant GIL aquisition
// TODO: Optimize this by reducing the frequency of acquiring the GIL if possible
Python::with_gil(|py| {
let py_obj = match data {
Data::Delta(delta) => delta.into_py(py),
Expand Down
3 changes: 2 additions & 1 deletion nautilus_trader/adapters/databento/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import asyncio
import datetime as dt
from typing import Any

import pandas as pd
import pytz
Expand Down Expand Up @@ -124,7 +125,7 @@ async def load_ids_async(

pyo3_instruments = []

def receive_instruments(pyo3_instrument) -> None:
def receive_instruments(pyo3_instrument: Any) -> None:
pyo3_instruments.append(pyo3_instrument)
instrument_ids_to_decode.discard(pyo3_instrument.id.value)
# TODO: Improve how to handle decode completion
Expand Down

0 comments on commit 0533208

Please sign in to comment.