
Optimize Arrow encoding
cjdsellers committed Jan 7, 2024
1 parent d07d839 commit ba8d021
Showing 13 changed files with 210 additions and 89 deletions.
7 changes: 7 additions & 0 deletions nautilus_core/model/src/python/data/bar.rs
@@ -16,6 +16,7 @@
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
str::FromStr,
};

use nautilus_core::{
@@ -116,6 +117,12 @@ impl BarType {
fn py_fully_qualified_name() -> String {
format!("{}:{}", PY_MODULE_MODEL, stringify!(BarType))
}

#[staticmethod]
#[pyo3(name = "from_str")]
fn py_from_str(value: &str) -> PyResult<Self> {
BarType::from_str(value).map_err(to_pyvalue_err)
}
}

#[pymethods]
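
For context on the `BarType.from_str` addition above: it exposes the Rust `FromStr` implementation to Python, so a pyo3 `BarType` can be parsed directly from its specification string, with parse failures mapped to `ValueError` via `to_pyvalue_err`. A minimal usage sketch (the import path follows the stub file further down; the bar type string is illustrative only):

```python
# Sketch only: parse a pyo3 BarType from its specification string.
# The bar type string below is an illustrative example.
from nautilus_trader.core.nautilus_pyo3 import BarType

bar_type = BarType.from_str("AUD/USD.SIM-1-MINUTE-BID-EXTERNAL")
print(bar_type)  # expected to round-trip back to the same specification string
```
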
77 changes: 55 additions & 22 deletions nautilus_core/persistence/src/python/backend/transformer.rs
@@ -20,8 +20,8 @@ use datafusion::arrow::{
};
use nautilus_core::python::to_pyvalue_err;
use nautilus_model::data::{
bar::Bar, delta::OrderBookDelta, is_monotonically_increasing_by_init, quote::QuoteTick,
trade::TradeTick,
bar::Bar, delta::OrderBookDelta, depth::OrderBookDepth10, is_monotonically_increasing_by_init,
quote::QuoteTick, trade::TradeTick,
};
use pyo3::{
exceptions::{PyRuntimeError, PyTypeError, PyValueError},
@@ -102,21 +102,20 @@ impl DataTransformer {
}

/// Transforms the given record `batches` into Python `bytes`.
fn record_batches_to_pybytes(
fn record_batch_to_pybytes(
py: Python<'_>,
batches: Vec<RecordBatch>,
batch: RecordBatch,
schema: Schema,
) -> PyResult<Py<PyBytes>> {
// Create a cursor to write to a byte array in memory
let mut cursor = Cursor::new(Vec::new());
{
let mut writer = StreamWriter::try_new(&mut cursor, &schema)
.map_err(|err| PyRuntimeError::new_err(format!("{err}")))?;
for batch in batches {
writer
.write(&batch)
.map_err(|err| PyRuntimeError::new_err(format!("{err}")))?;
}

writer
.write(&batch)
.map_err(|err| PyRuntimeError::new_err(format!("{err}")))?;

writer
.finish()
@@ -137,6 +136,7 @@
let cls_str: &str = cls.getattr("__name__")?.extract()?;
let result_map = match cls_str {
stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
stringify!(QuoteTick) => QuoteTick::get_schema_map(),
stringify!(TradeTick) => TradeTick::get_schema_map(),
stringify!(Bar) => Bar::get_schema_map(),
@@ -153,7 +153,7 @@
/// Return Python `bytes` from the given list of 'legacy' data objects, which can be passed
/// to `pa.ipc.open_stream` to create a `RecordBatchReader`.
#[staticmethod]
pub fn pyobjects_to_batches_bytes(
pub fn pyobjects_to_record_batch_bytes(
py: Python<'_>,
data: Vec<PyObject>,
) -> PyResult<Py<PyBytes>> {
@@ -172,19 +172,19 @@ match data_type.as_str() {
match data_type.as_str() {
stringify!(OrderBookDelta) => {
let deltas = Self::pyobjects_to_order_book_deltas(py, data)?;
Self::pyo3_order_book_deltas_to_batches_bytes(py, deltas)
Self::pyo3_order_book_deltas_to_record_batch_bytes(py, deltas)
}
stringify!(QuoteTick) => {
let quotes = Self::pyobjects_to_quote_ticks(py, data)?;
Self::pyo3_quote_ticks_to_batch_bytes(py, quotes)
Self::pyo3_quote_ticks_to_record_batch_bytes(py, quotes)
}
stringify!(TradeTick) => {
let trades = Self::pyobjects_to_trade_ticks(py, data)?;
Self::pyo3_trade_ticks_to_batches_bytes(py, trades)
Self::pyo3_trade_ticks_to_record_batch_bytes(py, trades)
}
stringify!(Bar) => {
let bars = Self::pyobjects_to_bars(py, data)?;
Self::pyo3_bars_to_batches_bytes(py, bars)
Self::pyo3_bars_to_record_batch_bytes(py, bars)
}
_ => Err(PyValueError::new_err(format!(
"unsupported data type: {data_type}"
@@ -193,7 +193,7 @@
}

#[staticmethod]
pub fn pyo3_order_book_deltas_to_batches_bytes(
pub fn pyo3_order_book_deltas_to_record_batch_bytes(
py: Python<'_>,
data: Vec<OrderBookDelta>,
) -> PyResult<Py<PyBytes>> {
@@ -216,14 +216,44 @@
match result {
Ok(batch) => {
let schema = OrderBookDelta::get_schema(Some(metadata));
Self::record_batches_to_pybytes(py, vec![batch], schema)
Self::record_batch_to_pybytes(py, batch, schema)
}
Err(e) => Err(to_pyvalue_err(e)),
}
}

#[staticmethod]
pub fn pyo3_order_book_depth10_to_record_batch_bytes(
py: Python<'_>,
data: Vec<OrderBookDepth10>,
) -> PyResult<Py<PyBytes>> {
if data.is_empty() {
return Err(PyValueError::new_err(ERROR_EMPTY_DATA));
}

// Take first element and extract metadata
// SAFETY: Unwrap safe as already checked that `data` not empty
let first = data.first().unwrap();
let metadata = OrderBookDepth10::get_metadata(
&first.instrument_id,
first.bids[0].price.precision,
first.bids[0].size.precision,
);

let result: Result<RecordBatch, ArrowError> =
OrderBookDepth10::encode_batch(&metadata, &data);

match result {
Ok(batch) => {
let schema = OrderBookDepth10::get_schema(Some(metadata));
Self::record_batch_to_pybytes(py, batch, schema)
}
Err(e) => Err(to_pyvalue_err(e)),
}
}

#[staticmethod]
pub fn pyo3_quote_ticks_to_batch_bytes(
pub fn pyo3_quote_ticks_to_record_batch_bytes(
py: Python<'_>,
data: Vec<QuoteTick>,
) -> PyResult<Py<PyBytes>> {
@@ -245,14 +275,14 @@
match result {
Ok(batch) => {
let schema = QuoteTick::get_schema(Some(metadata));
Self::record_batches_to_pybytes(py, vec![batch], schema)
Self::record_batch_to_pybytes(py, batch, schema)
}
Err(e) => Err(to_pyvalue_err(e)),
}
}

#[staticmethod]
pub fn pyo3_trade_ticks_to_batches_bytes(
pub fn pyo3_trade_ticks_to_record_batch_bytes(
py: Python<'_>,
data: Vec<TradeTick>,
) -> PyResult<Py<PyBytes>> {
@@ -274,14 +304,17 @@
match result {
Ok(batch) => {
let schema = TradeTick::get_schema(Some(metadata));
Self::record_batches_to_pybytes(py, vec![batch], schema)
Self::record_batch_to_pybytes(py, batch, schema)
}
Err(e) => Err(to_pyvalue_err(e)),
}
}

#[staticmethod]
pub fn pyo3_bars_to_batches_bytes(py: Python<'_>, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
pub fn pyo3_bars_to_record_batch_bytes(
py: Python<'_>,
data: Vec<Bar>,
) -> PyResult<Py<PyBytes>> {
if data.is_empty() {
return Err(to_pyvalue_err(ERROR_EMPTY_DATA));
}
@@ -300,7 +333,7 @@
match result {
Ok(batch) => {
let schema = Bar::get_schema(Some(metadata));
Self::record_batches_to_pybytes(py, vec![batch], schema)
Self::record_batch_to_pybytes(py, batch, schema)
}
Err(e) => Err(to_pyvalue_err(e)),
}
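
The net effect of the changes above is that each encoder now writes exactly one `RecordBatch` per Arrow IPC stream through `record_batch_to_pybytes`, instead of looping over a `Vec<RecordBatch>`. As the docstring notes, the returned bytes can be handed to `pa.ipc.open_stream`; a sketch of the round trip from Python (assuming `quotes` is a non-empty list of pyo3 `QuoteTick` objects, construction not shown):

```python
# Sketch: encode pyo3 QuoteTicks to Arrow IPC stream bytes, then read them back.
import pyarrow as pa

from nautilus_trader.core.nautilus_pyo3 import DataTransformer

# `quotes` is assumed to be a non-empty list of pyo3 QuoteTick objects;
# an empty list is rejected with ValueError (see the ERROR_EMPTY_DATA checks above).
data: bytes = DataTransformer.pyo3_quote_ticks_to_record_batch_bytes(quotes)

reader = pa.ipc.open_stream(data)
batch = reader.read_next_batch()  # a single batch per stream after this change
print(batch.num_rows, reader.schema.metadata)
```
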
2 changes: 1 addition & 1 deletion nautilus_core/persistence/src/python/wranglers/bar.rs
@@ -60,7 +60,7 @@ impl BarDataWrangler {
self.size_precision
}

fn process_record_batches_bytes(&self, _py: Python, data: &[u8]) -> PyResult<Vec<Bar>> {
fn process_record_batch_bytes(&self, _py: Python, data: &[u8]) -> PyResult<Vec<Bar>> {
// Create a StreamReader (from Arrow IPC)
let cursor = Cursor::new(data);
let reader = match StreamReader::try_new(cursor, None) {
2 changes: 1 addition & 1 deletion nautilus_core/persistence/src/python/wranglers/delta.rs
@@ -61,7 +61,7 @@ impl OrderBookDeltaDataWrangler {
self.size_precision
}

fn process_record_batches_bytes(
fn process_record_batch_bytes(
&self,
_py: Python,
data: &[u8],
2 changes: 1 addition & 1 deletion nautilus_core/persistence/src/python/wranglers/quote.rs
@@ -60,7 +60,7 @@ impl QuoteTickDataWrangler {
self.size_precision
}

fn process_record_batches_bytes(&self, _py: Python, data: &[u8]) -> PyResult<Vec<QuoteTick>> {
fn process_record_batch_bytes(&self, _py: Python, data: &[u8]) -> PyResult<Vec<QuoteTick>> {
// Create a StreamReader (from Arrow IPC)
let cursor = Cursor::new(data);
let reader = match StreamReader::try_new(cursor, None) {
2 changes: 1 addition & 1 deletion nautilus_core/persistence/src/python/wranglers/trade.rs
@@ -60,7 +60,7 @@ impl TradeTickDataWrangler {
self.size_precision
}

fn process_record_batches_bytes(&self, _py: Python, data: &[u8]) -> PyResult<Vec<TradeTick>> {
fn process_record_batch_bytes(&self, _py: Python, data: &[u8]) -> PyResult<Vec<TradeTick>> {
// Create a StreamReader (from Arrow IPC)
let cursor = Cursor::new(data);
let reader = match StreamReader::try_new(cursor, None) {
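
The wrangler renames above mirror the encoder renames: `process_record_batch_bytes` is the decode side of the same Arrow IPC stream bytes. A sketch of that path (the wrangler constructor arguments are elided in this diff, so `wrangler` is assumed to be an already-constructed pyo3 `QuoteTickDataWrangler`, and `data` the bytes produced by `DataTransformer.pyo3_quote_ticks_to_record_batch_bytes`):

```python
# Sketch only: decode Arrow IPC stream bytes back into pyo3 QuoteTick objects.
# `wrangler` and `data` are assumed to exist as described above.
quotes = wrangler.process_record_batch_bytes(data)

assert isinstance(quotes, list)
print(len(quotes), wrangler.price_precision, wrangler.size_precision)
```
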
22 changes: 10 additions & 12 deletions nautilus_trader/core/nautilus_pyo3.pyi
@@ -8,8 +8,6 @@ from decimal import Decimal
from enum import Enum
from typing import Any

from pyarrow import RecordBatch

from nautilus_trader.core.data import Data


@@ -1280,17 +1278,17 @@ class DataTransformer:
@staticmethod
def get_schema_map(data_cls: type) -> dict[str, str]: ...
@staticmethod
def pyobjects_to_batches_bytes(data: list[Data]) -> bytes: ...
def pyobjects_to_record_batch_bytes(data: list[Data]) -> bytes: ...
@staticmethod
def pyo3_order_book_deltas_to_batches_bytes(data: list[OrderBookDelta]) -> bytes: ...
def pyo3_order_book_deltas_to_record_batch_bytes(data: list[OrderBookDelta]) -> bytes: ...
@staticmethod
def pyo3_quote_ticks_to_batch_bytes(data: list[QuoteTick]) -> bytes: ...
def pyo3_order_book_depth10_to_record_batch_bytes(data: list[OrderBookDepth10]) -> bytes: ...
@staticmethod
def pyo3_trade_ticks_to_batches_bytes(data: list[TradeTick]) -> bytes: ...
def pyo3_quote_ticks_to_record_batch_bytes(data: list[QuoteTick]) -> bytes: ...
@staticmethod
def pyo3_bars_to_batches_bytes(data: list[Bar]) -> bytes: ...
def pyo3_trade_ticks_to_record_batch_bytes(data: list[TradeTick]) -> bytes: ...
@staticmethod
def record_batches_to_pybytes(batches: list[RecordBatch], schema: Any) -> bytes: ...
def pyo3_bars_to_record_batch_bytes(data: list[Bar]) -> bytes: ...

class BarDataWrangler:
def __init__(
@@ -1305,7 +1303,7 @@ class BarDataWrangler:
def price_precision(self) -> int: ...
@property
def size_precision(self) -> int: ...
def process_record_batches_bytes(self, data: bytes) -> list[Bar]: ...
def process_record_batch_bytes(self, data: bytes) -> list[Bar]: ...

class OrderBookDeltaDataWrangler:
def __init__(
@@ -1320,7 +1318,7 @@ class OrderBookDeltaDataWrangler:
def price_precision(self) -> int: ...
@property
def size_precision(self) -> int: ...
def process_record_batches_bytes(self, data: bytes) -> list[OrderBookDelta]: ...
def process_record_batch_bytes(self, data: bytes) -> list[OrderBookDelta]: ...

class QuoteTickDataWrangler:
def __init__(
@@ -1335,7 +1333,7 @@ class QuoteTickDataWrangler:
def price_precision(self) -> int: ...
@property
def size_precision(self) -> int: ...
def process_record_batches_bytes(self, data: bytes) -> list[QuoteTick]: ...
def process_record_batch_bytes(self, data: bytes) -> list[QuoteTick]: ...

class TradeTickDataWrangler:
def __init__(
@@ -1350,7 +1348,7 @@ class TradeTickDataWrangler:
def price_precision(self) -> int: ...
@property
def size_precision(self) -> int: ...
def process_record_batches_bytes(self, data: bytes) -> list[TradeTick]: ...
def process_record_batch_bytes(self, data: bytes) -> list[TradeTick]: ...


###################################################################################################
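
With `OrderBookDepth10` added to the schema-map lookup in `transformer.rs`, all five data classes can now be introspected the same way from Python. A short sketch (the availability of `OrderBookDepth10` as an importable pyo3 class is assumed from the stub signatures above):

```python
# Sketch: query the field -> Arrow type mapping for each supported data class.
from nautilus_trader.core.nautilus_pyo3 import (
    Bar,
    DataTransformer,
    OrderBookDelta,
    OrderBookDepth10,
    QuoteTick,
    TradeTick,
)

for data_cls in (OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, Bar):
    print(data_cls.__name__, DataTransformer.get_schema_map(data_cls))
```
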
(Remaining 6 changed files not shown.)
