diff --git a/nautilus_core/model/src/data/depth.rs b/nautilus_core/model/src/data/depth.rs index d434f1738ed0..3201d2963d7a 100644 --- a/nautilus_core/model/src/data/depth.rs +++ b/nautilus_core/model/src/data/depth.rs @@ -13,8 +13,12 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use std::fmt::{Display, Formatter}; +use std::{ + collections::HashMap, + fmt::{Display, Formatter}, +}; +use indexmap::IndexMap; use nautilus_core::{serialization::Serializable, time::UnixNanos}; use pyo3::prelude::*; use serde::{Deserialize, Serialize}; @@ -86,6 +90,89 @@ impl OrderBookDepth10 { ts_init, } } + + /// Returns the metadata for the type, for use with serialization formats. + pub fn get_metadata( + instrument_id: &InstrumentId, + price_precision: u8, + size_precision: u8, + ) -> HashMap { + let mut metadata = HashMap::new(); + metadata.insert("instrument_id".to_string(), instrument_id.to_string()); + metadata.insert("price_precision".to_string(), price_precision.to_string()); + metadata.insert("size_precision".to_string(), size_precision.to_string()); + metadata + } + + /// Returns the field map for the type, for use with Arrow schemas. 
+ pub fn get_fields() -> IndexMap { + let mut metadata = IndexMap::new(); + metadata.insert("bid_price_0".to_string(), "Int64".to_string()); + metadata.insert("bid_price_1".to_string(), "Int64".to_string()); + metadata.insert("bid_price_2".to_string(), "Int64".to_string()); + metadata.insert("bid_price_3".to_string(), "Int64".to_string()); + metadata.insert("bid_price_4".to_string(), "Int64".to_string()); + metadata.insert("bid_price_5".to_string(), "Int64".to_string()); + metadata.insert("bid_price_6".to_string(), "Int64".to_string()); + metadata.insert("bid_price_7".to_string(), "Int64".to_string()); + metadata.insert("bid_price_8".to_string(), "Int64".to_string()); + metadata.insert("bid_price_9".to_string(), "Int64".to_string()); + metadata.insert("ask_price_0".to_string(), "Int64".to_string()); + metadata.insert("ask_price_1".to_string(), "Int64".to_string()); + metadata.insert("ask_price_2".to_string(), "Int64".to_string()); + metadata.insert("ask_price_3".to_string(), "Int64".to_string()); + metadata.insert("ask_price_4".to_string(), "Int64".to_string()); + metadata.insert("ask_price_5".to_string(), "Int64".to_string()); + metadata.insert("ask_price_6".to_string(), "Int64".to_string()); + metadata.insert("ask_price_7".to_string(), "Int64".to_string()); + metadata.insert("ask_price_8".to_string(), "Int64".to_string()); + metadata.insert("ask_price_9".to_string(), "Int64".to_string()); + metadata.insert("bid_size_0".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_1".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_2".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_3".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_4".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_5".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_6".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_7".to_string(), "UInt64".to_string()); + metadata.insert("bid_size_8".to_string(), 
"UInt64".to_string()); + metadata.insert("bid_size_9".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_0".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_1".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_2".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_3".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_4".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_5".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_6".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_7".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_8".to_string(), "UInt64".to_string()); + metadata.insert("ask_size_9".to_string(), "UInt64".to_string()); + metadata.insert("bid_count_0".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_1".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_2".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_3".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_4".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_5".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_6".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_7".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_8".to_string(), "UInt32".to_string()); + metadata.insert("bid_count_9".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_0".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_1".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_2".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_3".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_4".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_5".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_6".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_7".to_string(), "UInt32".to_string()); + 
metadata.insert("ask_count_8".to_string(), "UInt32".to_string()); + metadata.insert("ask_count_9".to_string(), "UInt32".to_string()); + metadata.insert("flags".to_string(), "UInt8".to_string()); + metadata.insert("sequence".to_string(), "UInt64".to_string()); + metadata.insert("ts_event".to_string(), "UInt64".to_string()); + metadata.insert("ts_init".to_string(), "UInt64".to_string()); + metadata + } } // TODO: Exact format for Debug and Display TBD diff --git a/nautilus_core/model/src/python/data/depth.rs b/nautilus_core/model/src/python/data/depth.rs index 99a4485a13ae..20f5cf333aea 100644 --- a/nautilus_core/model/src/python/data/depth.rs +++ b/nautilus_core/model/src/python/data/depth.rs @@ -14,12 +14,16 @@ // ------------------------------------------------------------------------------------------------- use std::{ - collections::hash_map::DefaultHasher, + collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, }; -use nautilus_core::time::UnixNanos; -use pyo3::{prelude::*, pyclass::CompareOp}; +use nautilus_core::{ + python::{serialization::from_dict_pyo3, to_pyvalue_err}, + serialization::Serializable, + time::UnixNanos, +}; +use pyo3::{prelude::*, pyclass::CompareOp, types::PyDict}; use crate::{ data::{ @@ -130,4 +134,74 @@ impl OrderBookDepth10 { fn py_fully_qualified_name() -> String { format!("{}:{}", PY_MODULE_MODEL, stringify!(OrderBookDepth10)) } + + /// Return a dictionary representation of the object. + #[pyo3(name = "as_dict")] + fn py_as_dict(&self, py: Python<'_>) -> PyResult> { + // Serialize object to JSON bytes + let json_str = serde_json::to_string(self).map_err(to_pyvalue_err)?; + // Parse JSON into a Python dictionary + let py_dict: Py = PyModule::import(py, "json")? + .call_method("loads", (json_str,), None)? + .extract()?; + Ok(py_dict) + } + + /// Return a new object from the given dictionary representation. 
+ #[staticmethod] + #[pyo3(name = "from_dict")] + fn py_from_dict(py: Python<'_>, values: Py) -> PyResult { + from_dict_pyo3(py, values) + } + + #[staticmethod] + #[pyo3(name = "get_metadata")] + fn py_get_metadata( + instrument_id: &InstrumentId, + price_precision: u8, + size_precision: u8, + ) -> PyResult> { + Ok(Self::get_metadata( + instrument_id, + price_precision, + size_precision, + )) + } + + #[staticmethod] + #[pyo3(name = "get_fields")] + fn py_get_fields(py: Python<'_>) -> PyResult<&PyDict> { + let py_dict = PyDict::new(py); + for (k, v) in Self::get_fields() { + py_dict.set_item(k, v)?; + } + + Ok(py_dict) + } + + #[staticmethod] + #[pyo3(name = "from_json")] + fn py_from_json(data: Vec) -> PyResult { + Self::from_json_bytes(data).map_err(to_pyvalue_err) + } + + #[staticmethod] + #[pyo3(name = "from_msgpack")] + fn py_from_msgpack(data: Vec) -> PyResult { + Self::from_msgpack_bytes(data).map_err(to_pyvalue_err) + } + + /// Return JSON encoded bytes representation of the object. + #[pyo3(name = "as_json")] + fn py_as_json(&self, py: Python<'_>) -> Py { + // Unwrapping is safe when serializing a valid object + self.as_json_bytes().unwrap().into_py(py) + } + + /// Return MsgPack encoded bytes representation of the object. 
+ #[pyo3(name = "as_msgpack")] + fn py_as_msgpack(&self, py: Python<'_>) -> Py { + // Unwrapping is safe when serializing a valid object + self.as_msgpack_bytes().unwrap().into_py(py) + } } diff --git a/nautilus_core/model/src/python/mod.rs b/nautilus_core/model/src/python/mod.rs index e75a96eca6d2..817e724d4355 100644 --- a/nautilus_core/model/src/python/mod.rs +++ b/nautilus_core/model/src/python/mod.rs @@ -223,7 +223,7 @@ mod tests { /// Loaded as nautilus_pyo3.model #[pymodule] pub fn model(_: Python<'_>, m: &PyModule) -> PyResult<()> { - // data + // Data m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -233,7 +233,7 @@ pub fn model(_: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - // enums + // Enums m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -258,7 +258,7 @@ pub fn model(_: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - // identifiers + // Identifiers m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -273,7 +273,7 @@ pub fn model(_: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - // orders + // Orders m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -286,7 +286,7 @@ pub fn model(_: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - // instruments + // Instruments m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -294,7 +294,7 @@ pub fn model(_: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - // events + // Events m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/nautilus_core/persistence/src/arrow/depth.rs b/nautilus_core/persistence/src/arrow/depth.rs new file mode 100644 index 000000000000..9d53f478ef3a --- /dev/null +++ b/nautilus_core/persistence/src/arrow/depth.rs @@ -0,0 +1,584 @@ +// 
------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + +use std::{collections::HashMap, str::FromStr, sync::Arc}; + +use datafusion::arrow::{ + array::{Array, Int64Array, UInt32Array, UInt64Array, UInt8Array}, + datatypes::{DataType, Field, Schema}, + error::ArrowError, + record_batch::RecordBatch, +}; +use nautilus_model::{ + data::{ + depth::{OrderBookDepth10, DEPTH10_LEN}, + order::BookOrder, + }, + enums::OrderSide, + identifiers::instrument_id::InstrumentId, + types::{price::Price, quantity::Quantity}, +}; + +use super::{ + extract_column, DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, + KEY_PRICE_PRECISION, KEY_SIZE_PRECISION, +}; +use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch}; + +impl ArrowSchemaProvider for OrderBookDepth10 { + fn get_schema(metadata: Option>) -> Schema { + let fields = vec![ + Field::new("bid_price_0", DataType::Int64, false), + Field::new("bid_price_1", DataType::Int64, false), + Field::new("bid_price_2", DataType::Int64, false), + Field::new("bid_price_3", DataType::Int64, false), + Field::new("bid_price_4", DataType::Int64, false), + Field::new("bid_price_5", 
DataType::Int64, false), + Field::new("bid_price_6", DataType::Int64, false), + Field::new("bid_price_7", DataType::Int64, false), + Field::new("bid_price_8", DataType::Int64, false), + Field::new("bid_price_9", DataType::Int64, false), + Field::new("ask_price_0", DataType::Int64, false), + Field::new("ask_price_1", DataType::Int64, false), + Field::new("ask_price_2", DataType::Int64, false), + Field::new("ask_price_3", DataType::Int64, false), + Field::new("ask_price_4", DataType::Int64, false), + Field::new("ask_price_5", DataType::Int64, false), + Field::new("ask_price_6", DataType::Int64, false), + Field::new("ask_price_7", DataType::Int64, false), + Field::new("ask_price_8", DataType::Int64, false), + Field::new("ask_price_9", DataType::Int64, false), + Field::new("bid_size_0", DataType::UInt64, false), + Field::new("bid_size_1", DataType::UInt64, false), + Field::new("bid_size_2", DataType::UInt64, false), + Field::new("bid_size_3", DataType::UInt64, false), + Field::new("bid_size_4", DataType::UInt64, false), + Field::new("bid_size_5", DataType::UInt64, false), + Field::new("bid_size_6", DataType::UInt64, false), + Field::new("bid_size_7", DataType::UInt64, false), + Field::new("bid_size_8", DataType::UInt64, false), + Field::new("bid_size_9", DataType::UInt64, false), + Field::new("ask_size_0", DataType::UInt64, false), + Field::new("ask_size_1", DataType::UInt64, false), + Field::new("ask_size_2", DataType::UInt64, false), + Field::new("ask_size_3", DataType::UInt64, false), + Field::new("ask_size_4", DataType::UInt64, false), + Field::new("ask_size_5", DataType::UInt64, false), + Field::new("ask_size_6", DataType::UInt64, false), + Field::new("ask_size_7", DataType::UInt64, false), + Field::new("ask_size_8", DataType::UInt64, false), + Field::new("ask_size_9", DataType::UInt64, false), + Field::new("bid_count_0", DataType::UInt32, false), + Field::new("bid_count_1", DataType::UInt32, false), + Field::new("bid_count_2", DataType::UInt32, false), + 
Field::new("bid_count_3", DataType::UInt32, false), + Field::new("bid_count_4", DataType::UInt32, false), + Field::new("bid_count_5", DataType::UInt32, false), + Field::new("bid_count_6", DataType::UInt32, false), + Field::new("bid_count_7", DataType::UInt32, false), + Field::new("bid_count_8", DataType::UInt32, false), + Field::new("bid_count_9", DataType::UInt32, false), + Field::new("ask_count_0", DataType::UInt32, false), + Field::new("ask_count_1", DataType::UInt32, false), + Field::new("ask_count_2", DataType::UInt32, false), + Field::new("ask_count_3", DataType::UInt32, false), + Field::new("ask_count_4", DataType::UInt32, false), + Field::new("ask_count_5", DataType::UInt32, false), + Field::new("ask_count_6", DataType::UInt32, false), + Field::new("ask_count_7", DataType::UInt32, false), + Field::new("ask_count_8", DataType::UInt32, false), + Field::new("ask_count_9", DataType::UInt32, false), + Field::new("flags", DataType::UInt8, false), + Field::new("sequence", DataType::UInt64, false), + Field::new("ts_event", DataType::UInt64, false), + Field::new("ts_init", DataType::UInt64, false), + ]; + + match metadata { + Some(metadata) => Schema::new_with_metadata(fields, metadata), + None => Schema::new(fields), + } + } +} + +fn parse_metadata( + metadata: &HashMap, +) -> Result<(InstrumentId, u8, u8), EncodingError> { + let instrument_id_str = metadata + .get(KEY_INSTRUMENT_ID) + .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?; + let instrument_id = InstrumentId::from_str(instrument_id_str) + .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?; + + let price_precision = metadata + .get(KEY_PRICE_PRECISION) + .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))? + .parse::() + .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?; + + let size_precision = metadata + .get(KEY_SIZE_PRECISION) + .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))? 
+ .parse::() + .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?; + + Ok((instrument_id, price_precision, size_precision)) +} + +impl EncodeToRecordBatch for OrderBookDepth10 { + fn encode_batch( + metadata: &HashMap, + data: &[Self], + ) -> Result { + // Create array builders + let mut bid_price_builders = Vec::with_capacity(DEPTH10_LEN); + let mut ask_price_builders = Vec::with_capacity(DEPTH10_LEN); + let mut bid_size_builders = Vec::with_capacity(DEPTH10_LEN); + let mut ask_size_builders = Vec::with_capacity(DEPTH10_LEN); + let mut bid_count_builders = Vec::with_capacity(DEPTH10_LEN); + let mut ask_count_builders = Vec::with_capacity(DEPTH10_LEN); + + for _ in 0..DEPTH10_LEN { + bid_price_builders.push(Int64Array::builder(data.len())); + ask_price_builders.push(Int64Array::builder(data.len())); + bid_size_builders.push(UInt64Array::builder(data.len())); + ask_size_builders.push(UInt64Array::builder(data.len())); + bid_count_builders.push(UInt32Array::builder(data.len())); + ask_count_builders.push(UInt32Array::builder(data.len())); + } + + let mut flags_builder = UInt8Array::builder(data.len()); + let mut sequence_builder = UInt64Array::builder(data.len()); + let mut ts_event_builder = UInt64Array::builder(data.len()); + let mut ts_init_builder = UInt64Array::builder(data.len()); + + // Iterate over data + for depth in data { + for i in 0..DEPTH10_LEN { + bid_price_builders[i].append_value(depth.bids[i].price.raw); + ask_price_builders[i].append_value(depth.asks[i].price.raw); + bid_size_builders[i].append_value(depth.bids[i].size.raw); + ask_size_builders[i].append_value(depth.asks[i].size.raw); + bid_count_builders[i].append_value(depth.bid_counts[i]); + ask_count_builders[i].append_value(depth.ask_counts[i]); + } + + flags_builder.append_value(depth.flags); + sequence_builder.append_value(depth.sequence); + ts_event_builder.append_value(depth.ts_event); + ts_init_builder.append_value(depth.ts_init); + } + + // Build arrays + let 
bid_price_arrays = bid_price_builders + .into_iter() + .map(|mut b| Arc::new(b.finish()) as Arc) + .collect::>(); + let ask_price_arrays = ask_price_builders + .into_iter() + .map(|mut b| Arc::new(b.finish()) as Arc) + .collect::>(); + let bid_size_arrays = bid_size_builders + .into_iter() + .map(|mut b| Arc::new(b.finish()) as Arc) + .collect::>(); + let ask_size_arrays = ask_size_builders + .into_iter() + .map(|mut b| Arc::new(b.finish()) as Arc) + .collect::>(); + let bid_count_arrays = bid_count_builders + .into_iter() + .map(|mut b| Arc::new(b.finish()) as Arc) + .collect::>(); + let ask_count_arrays = ask_count_builders + .into_iter() + .map(|mut b| Arc::new(b.finish()) as Arc) + .collect::>(); + + let flags_array = Arc::new(flags_builder.finish()); + let sequence_array = Arc::new(sequence_builder.finish()); + let ts_event_array = Arc::new(ts_event_builder.finish()); + let ts_init_array = Arc::new(ts_init_builder.finish()); + + // Build record batch + let mut columns = Vec::new(); + columns.extend_from_slice(&bid_price_arrays); + columns.extend_from_slice(&ask_price_arrays); + columns.extend_from_slice(&bid_size_arrays); + columns.extend_from_slice(&ask_size_arrays); + columns.extend_from_slice(&bid_count_arrays); + columns.extend_from_slice(&ask_count_arrays); + columns.push(flags_array); + columns.push(sequence_array); + columns.push(ts_event_array); + columns.push(ts_init_array); + + RecordBatch::try_new(Self::get_schema(Some(metadata.clone())).into(), columns) + } +} + +impl DecodeFromRecordBatch for OrderBookDepth10 { + fn decode_batch( + metadata: &HashMap, + record_batch: RecordBatch, + ) -> Result, EncodingError> { + // Parse and validate metadata + let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?; + + // Extract field value arrays + let cols = record_batch.columns(); + + let bid_price_col_names = [ + "bid_price_0", + "bid_price_1", + "bid_price_2", + "bid_price_3", + "bid_price_4", + "bid_price_5", + "bid_price_6", + 
"bid_price_7", + "bid_price_8", + "bid_price_9", + ]; + + let ask_price_col_names = [ + "ask_price_0", + "ask_price_1", + "ask_price_2", + "ask_price_3", + "ask_price_4", + "ask_price_5", + "ask_price_6", + "ask_price_7", + "ask_price_8", + "ask_price_9", + ]; + + let bid_size_col_names = [ + "bid_size_0", + "bid_size_1", + "bid_size_2", + "bid_size_3", + "bid_size_4", + "bid_size_5", + "bid_size_6", + "bid_size_7", + "bid_size_8", + "bid_size_9", + ]; + + let ask_size_col_names = [ + "ask_size_0", + "ask_size_1", + "ask_size_2", + "ask_size_3", + "ask_size_4", + "ask_size_5", + "ask_size_6", + "ask_size_7", + "ask_size_8", + "ask_size_9", + ]; + + let bid_count_col_names = [ + "bid_count_0", + "bid_count_1", + "bid_count_2", + "bid_count_3", + "bid_count_4", + "bid_count_5", + "bid_count_6", + "bid_count_7", + "bid_count_8", + "bid_count_9", + ]; + + let ask_count_col_names = [ + "ask_count_0", + "ask_count_1", + "ask_count_2", + "ask_count_3", + "ask_count_4", + "ask_count_5", + "ask_count_6", + "ask_count_7", + "ask_count_8", + "ask_count_9", + ]; + + let mut bid_prices = Vec::with_capacity(DEPTH10_LEN); + let mut ask_prices = Vec::with_capacity(DEPTH10_LEN); + let mut bid_sizes = Vec::with_capacity(DEPTH10_LEN); + let mut ask_sizes = Vec::with_capacity(DEPTH10_LEN); + let mut bid_counts = Vec::with_capacity(DEPTH10_LEN); + let mut ask_counts = Vec::with_capacity(DEPTH10_LEN); + + for i in 0..DEPTH10_LEN { + bid_prices.push(extract_column::( + cols, + bid_price_col_names[i], + i, + DataType::Int64, + )?); + ask_prices.push(extract_column::( + cols, + ask_price_col_names[i], + DEPTH10_LEN + i, + DataType::Int64, + )?); + bid_sizes.push(extract_column::( + cols, + bid_size_col_names[i], + 2 * DEPTH10_LEN + i, + DataType::UInt64, + )?); + ask_sizes.push(extract_column::( + cols, + ask_size_col_names[i], + 3 * DEPTH10_LEN + i, + DataType::UInt64, + )?); + bid_counts.push(extract_column::( + cols, + bid_count_col_names[i], + 4 * DEPTH10_LEN + i, + DataType::UInt32, + 
)?); + ask_counts.push(extract_column::( + cols, + ask_count_col_names[i], + 5 * DEPTH10_LEN + i, + DataType::UInt32, + )?); + } + + let flags = + extract_column::(cols, "flags", 6 * DEPTH10_LEN, DataType::UInt8)?; + let sequence = + extract_column::(cols, "sequence", 6 * DEPTH10_LEN + 1, DataType::UInt64)?; + let ts_event = + extract_column::(cols, "ts_event", 6 * DEPTH10_LEN + 2, DataType::UInt64)?; + let ts_init = + extract_column::(cols, "ts_init", 6 * DEPTH10_LEN + 3, DataType::UInt64)?; + + // Map record batch rows to vector of OrderBookDepth10 + let result: Result, EncodingError> = (0..record_batch.num_rows()) + .map(|i| { + let mut bids = [BookOrder::default(); DEPTH10_LEN]; + let mut asks = [BookOrder::default(); DEPTH10_LEN]; + let mut bid_count_arr = [0u32; DEPTH10_LEN]; + let mut ask_count_arr = [0u32; DEPTH10_LEN]; + + for j in 0..DEPTH10_LEN { + // Assuming Price and Quantity can be constructed from the raw values + bids[j] = BookOrder::new( + OrderSide::Buy, + Price::from_raw(bid_prices[j].value(i), price_precision).unwrap(), + Quantity::from_raw(bid_sizes[j].value(i), size_precision).unwrap(), + 0, // Order ID always zero + ); + + asks[j] = BookOrder::new( + OrderSide::Sell, + Price::from_raw(ask_prices[j].value(i), price_precision).unwrap(), + Quantity::from_raw(ask_sizes[j].value(i), size_precision).unwrap(), + 0, // Order ID always zero + ); + bid_count_arr[j] = bid_counts[j].value(i); + ask_count_arr[j] = ask_counts[j].value(i); + } + + Ok(Self { + instrument_id, + bids, + asks, + bid_counts: bid_count_arr, + ask_counts: ask_count_arr, + flags: flags.value(i), + sequence: sequence.value(i), + ts_event: ts_event.value(i), + ts_init: ts_init.value(i), + }) + }) + .collect(); + + result + } +} + +impl DecodeDataFromRecordBatch for OrderBookDepth10 { + fn decode_data_batch( + metadata: &HashMap, + record_batch: RecordBatch, + ) -> Result, EncodingError> { + let depths: Vec = Self::decode_batch(metadata, record_batch)?; +
Ok(depths.into_iter().map(Data::from).collect()) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////////////// +#[cfg(test)] +mod tests { + + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use rstest::rstest; + + use super::*; + + #[rstest] + fn test_get_schema() { + let instrument_id = InstrumentId::from("AAPL.XNAS"); + let metadata = OrderBookDepth10::get_metadata(&instrument_id, 2, 0); + let schema = OrderBookDepth10::get_schema(Some(metadata.clone())); + let expected_fields = vec![ + Field::new("bid_price_0", DataType::Int64, false), + Field::new("bid_price_1", DataType::Int64, false), + Field::new("bid_price_2", DataType::Int64, false), + Field::new("bid_price_3", DataType::Int64, false), + Field::new("bid_price_4", DataType::Int64, false), + Field::new("bid_price_5", DataType::Int64, false), + Field::new("bid_price_6", DataType::Int64, false), + Field::new("bid_price_7", DataType::Int64, false), + Field::new("bid_price_8", DataType::Int64, false), + Field::new("bid_price_9", DataType::Int64, false), + Field::new("ask_price_0", DataType::Int64, false), + Field::new("ask_price_1", DataType::Int64, false), + Field::new("ask_price_2", DataType::Int64, false), + Field::new("ask_price_3", DataType::Int64, false), + Field::new("ask_price_4", DataType::Int64, false), + Field::new("ask_price_5", DataType::Int64, false), + Field::new("ask_price_6", DataType::Int64, false), + Field::new("ask_price_7", DataType::Int64, false), + Field::new("ask_price_8", DataType::Int64, false), + Field::new("ask_price_9", DataType::Int64, false), + Field::new("bid_size_0", DataType::UInt64, false), + Field::new("bid_size_1", DataType::UInt64, false), + Field::new("bid_size_2", DataType::UInt64, false), + Field::new("bid_size_3", DataType::UInt64, false), + Field::new("bid_size_4", DataType::UInt64, false), + Field::new("bid_size_5", DataType::UInt64, 
false), + Field::new("bid_size_6", DataType::UInt64, false), + Field::new("bid_size_7", DataType::UInt64, false), + Field::new("bid_size_8", DataType::UInt64, false), + Field::new("bid_size_9", DataType::UInt64, false), + Field::new("ask_size_0", DataType::UInt64, false), + Field::new("ask_size_1", DataType::UInt64, false), + Field::new("ask_size_2", DataType::UInt64, false), + Field::new("ask_size_3", DataType::UInt64, false), + Field::new("ask_size_4", DataType::UInt64, false), + Field::new("ask_size_5", DataType::UInt64, false), + Field::new("ask_size_6", DataType::UInt64, false), + Field::new("ask_size_7", DataType::UInt64, false), + Field::new("ask_size_8", DataType::UInt64, false), + Field::new("ask_size_9", DataType::UInt64, false), + Field::new("bid_count_0", DataType::UInt32, false), + Field::new("bid_count_1", DataType::UInt32, false), + Field::new("bid_count_2", DataType::UInt32, false), + Field::new("bid_count_3", DataType::UInt32, false), + Field::new("bid_count_4", DataType::UInt32, false), + Field::new("bid_count_5", DataType::UInt32, false), + Field::new("bid_count_6", DataType::UInt32, false), + Field::new("bid_count_7", DataType::UInt32, false), + Field::new("bid_count_8", DataType::UInt32, false), + Field::new("bid_count_9", DataType::UInt32, false), + Field::new("ask_count_0", DataType::UInt32, false), + Field::new("ask_count_1", DataType::UInt32, false), + Field::new("ask_count_2", DataType::UInt32, false), + Field::new("ask_count_3", DataType::UInt32, false), + Field::new("ask_count_4", DataType::UInt32, false), + Field::new("ask_count_5", DataType::UInt32, false), + Field::new("ask_count_6", DataType::UInt32, false), + Field::new("ask_count_7", DataType::UInt32, false), + Field::new("ask_count_8", DataType::UInt32, false), + Field::new("ask_count_9", DataType::UInt32, false), + Field::new("flags", DataType::UInt8, false), + Field::new("sequence", DataType::UInt64, false), + Field::new("ts_event", DataType::UInt64, false), + 
Field::new("ts_init", DataType::UInt64, false), + ]; + let expected_schema = Schema::new_with_metadata(expected_fields, metadata); + assert_eq!(schema, expected_schema); + } + + #[ignore] // WIP + #[rstest] + fn test_get_schema_map() { + let schema_map = OrderBookDepth10::get_schema_map(); + let mut expected_map = HashMap::new(); + expected_map.insert("bid_price_0".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_1".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_2".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_3".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_4".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_5".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_6".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_7".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_8".to_string(), "Int64".to_string()); + expected_map.insert("bid_price_9".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_0".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_1".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_2".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_3".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_4".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_5".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_6".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_7".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_8".to_string(), "Int64".to_string()); + expected_map.insert("ask_price_9".to_string(), "Int64".to_string()); + expected_map.insert("bid_size_0".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_1".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_2".to_string(), "UInt64".to_string()); + 
expected_map.insert("bid_size_3".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_4".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_5".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_6".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_7".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_8".to_string(), "UInt64".to_string()); + expected_map.insert("bid_size_9".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_0".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_1".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_2".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_3".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_4".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_5".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_6".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_7".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_8".to_string(), "UInt64".to_string()); + expected_map.insert("ask_size_9".to_string(), "UInt64".to_string()); + expected_map.insert("bid_count_0".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_1".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_2".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_3".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_4".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_5".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_6".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_7".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_8".to_string(), "UInt32".to_string()); + expected_map.insert("bid_count_9".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_0".to_string(), "UInt32".to_string()); + 
expected_map.insert("ask_count_1".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_2".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_3".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_4".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_5".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_6".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_7".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_8".to_string(), "UInt32".to_string()); + expected_map.insert("ask_count_9".to_string(), "UInt32".to_string()); + assert_eq!(schema_map, expected_map); + } +} diff --git a/nautilus_core/persistence/src/arrow/mod.rs b/nautilus_core/persistence/src/arrow/mod.rs index 8c4464e7d40d..39a677905fb8 100644 --- a/nautilus_core/persistence/src/arrow/mod.rs +++ b/nautilus_core/persistence/src/arrow/mod.rs @@ -15,6 +15,7 @@ pub mod bar; pub mod delta; +pub mod depth; pub mod quote; pub mod trade; diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index 75089ca5e069..933af5f15b94 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -266,6 +266,10 @@ class OrderBookDelta: @staticmethod def get_fields() -> dict[str, str]: ... +class OrderBookDepth10: + @staticmethod + def get_fields() -> dict[str, str]: ... 
+ class QuoteTick: def __init__( self, diff --git a/nautilus_trader/serialization/arrow/schema.py b/nautilus_trader/serialization/arrow/schema.py index 7d2ef2d37070..15e67f59e492 100644 --- a/nautilus_trader/serialization/arrow/schema.py +++ b/nautilus_trader/serialization/arrow/schema.py @@ -24,6 +24,7 @@ from nautilus_trader.model.data import InstrumentClose from nautilus_trader.model.data import InstrumentStatus from nautilus_trader.model.data import OrderBookDelta +from nautilus_trader.model.data import OrderBookDepth10 from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import Ticker from nautilus_trader.model.data import TradeTick @@ -51,6 +52,12 @@ for k, v in nautilus_pyo3.OrderBookDelta.get_fields().items() ], ), + OrderBookDepth10: pa.schema( + [ + pa.field(k, pa.type_for_alias(v), False) + for k, v in nautilus_pyo3.OrderBookDepth10.get_fields().items() + ], + ), QuoteTick: pa.schema( [ pa.field(k, pa.type_for_alias(v), False)