Skip to content

Commit

Permalink
Continue Databento historical client in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 21, 2024
1 parent d10e239 commit a7a5851
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 19 deletions.
82 changes: 68 additions & 14 deletions nautilus_core/adapters/src/databento/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,45 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use databento::{self};
use std::{num::NonZeroU64, sync::Arc};

use databento::{self, historical::timeseries::GetRangeParams};
use dbn;
use nautilus_core::python::to_pyvalue_err;
use pyo3::{exceptions::PyException, prelude::*, types::PyDict};
use time::macros::datetime;
use tokio::sync::Mutex;

#[cfg_attr(
feature = "python",
pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
pub struct DatabentoHistoricalClient {
key: String,
inner: Arc<Mutex<databento::HistoricalClient>>,
}

#[pymethods]
impl DatabentoHistoricalClient {
#[new]
pub fn py_new(key: String) -> PyResult<Self> {
Ok(Self { key })
}

#[pyo3(name = "get_dataset_range")]
fn py_get_dataset_range<'py>(&self, py: Python<'py>, dataset: &str) -> PyResult<&'py PyAny> {
let dataset_clone = dataset.to_string();

// TODO: Cheaper way of accessing client as mutable `Send` (Arc<Mutex alone doesn't work)
let mut client = databento::HistoricalClient::builder()
.key(self.key.clone())
let client = databento::HistoricalClient::builder()
.key(key)
.map_err(to_pyvalue_err)?
.build()
.map_err(to_pyvalue_err)?;

pyo3_asyncio::tokio::future_into_py(py, async move {
let response = client.metadata().get_dataset_range(&dataset_clone).await;
Ok(Self {
inner: Arc::new(Mutex::new(client)),
})
}

#[pyo3(name = "get_dataset_range")]
fn py_get_dataset_range<'py>(&self, py: Python<'py>, dataset: String) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

pyo3_asyncio::tokio::future_into_py(py, async move {
let mut client = client.lock().await;
let response = client.metadata().get_dataset_range(&dataset).await;
match response {
Ok(res) => Python::with_gil(|py| {
let dict = PyDict::new(py);
Expand All @@ -59,4 +65,52 @@ impl DatabentoHistoricalClient {
}
})
}

#[pyo3(name = "get_range")]
fn py_get_range(
&self,
_py: Python<'_>,
dataset: String,
symbols: String,
schema: dbn::Schema,
_limit: Option<usize>,
// ) -> PyResult<&'py PyAny> {
) {
let _client = self.inner.clone();

let _params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range((
datetime!(2022-06-06 00:00 UTC),
datetime!(2022-06-10 12:10 UTC),
))
.symbols(symbols)
.schema(schema)
.limit(NonZeroU64::new(1))
.build();

// WIP
// pyo3_asyncio::tokio::future_into_py(py, async move {
// let mut client = client.lock().await;
// let decoder = client
// .timeseries()
// .get_range(&params)
// .await
// .map_err(to_pyvalue_err)?;
//
// let mut result: Vec<&dbn::TradeMsg> = Vec::new();
// for rec in decoder
// .decode_record::<dbn::TradeMsg>()
// .await
// .map_err(to_pyvalue_err)?
// {
// // TODO: Parse event record to a Nautilus data object
// result.push(rec);
// }
//
// let output = Vec::new();
// let py_list = output.into_py(py);
// Ok(py_list)
// });
}
}
8 changes: 3 additions & 5 deletions nautilus_trader/adapters/databento/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from nautilus_trader.common.clock import LiveClock
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.core import nautilus_pyo3
from nautilus_trader.core.correctness import PyCondition
from nautilus_trader.core.nautilus_pyo3 import is_within_last_24_hours
from nautilus_trader.core.nautilus_pyo3 import last_weekday_nanos
Expand Down Expand Up @@ -118,6 +119,7 @@ def __init__(
self._mbo_subscriptions_delay: float | None = config.mbo_subscriptions_delay

# Clients
self._http_client_pyo3 = nautilus_pyo3.DatabentoHistoricalClient(http_client.key)
self._http_client: databento.Historical = http_client
self._live_clients: dict[Dataset, databento.Live] = {}
self._live_clients_mbo: dict[Dataset, databento.Live] = {}
Expand Down Expand Up @@ -244,11 +246,7 @@ async def _ensure_subscribed_for_instrument(self, instrument_id: InstrumentId) -
await self._subscribe_instrument(instrument_id)

async def _get_dataset_range(self, dataset: Dataset) -> tuple[pd.Timestamp, pd.Timestamp]:
response = await self._loop.run_in_executor(
None,
self._http_client.metadata.get_dataset_range,
dataset,
)
response = await self._http_client_pyo3.get_dataset_range(dataset)

start = pd.Timestamp(response["start_date"], tz=pytz.utc)
end = pd.Timestamp(response["end_date"], tz=pytz.utc)
Expand Down
9 changes: 9 additions & 0 deletions nautilus_trader/core/nautilus_pyo3.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1626,3 +1626,12 @@ class DatabentoDataLoader:
self,
path: PathLike[str] | str,
) -> None: ...


class DatabentoHistoricalClient:
def __init__(
self,
key: str,
) -> None: ...

async def get_dataset_range(self, dataset: str) -> dict[str, str]: ...

0 comments on commit a7a5851

Please sign in to comment.