Skip to content

Commit

Permalink
Enhance wranglers performance (#1590)
Browse files Browse the repository at this point in the history
- Fix pandas FutureWarning and enhance wranglers performance
  • Loading branch information
rsmb7z authored Apr 13, 2024
1 parent 5f60ce1 commit ce38cc3
Show file tree
Hide file tree
Showing 5 changed files with 418 additions and 184 deletions.
15 changes: 11 additions & 4 deletions nautilus_trader/core/datetime.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ from libc.stdint cimport uint64_t

from nautilus_trader.core.correctness cimport Condition

from pandas.api.types import is_datetime64_ns_dtype


# UNIX epoch is the UTC time at 00:00:00 on 1/1/1970
# https://en.wikipedia.org/wiki/Unix_time
Expand Down Expand Up @@ -246,12 +248,17 @@ cpdef object as_utc_index(data: pd.DataFrame):
if data.empty:
return data

# Ensure the index is localized to UTC
if data.index.tzinfo is None: # tz-naive
return data.tz_localize(pytz.utc)
data = data.tz_localize(pytz.utc)
elif data.index.tzinfo != pytz.utc:
return data.tz_convert(None).tz_localize(pytz.utc)
else:
return data # Already UTC
data = data.tz_convert(None).tz_localize(pytz.utc)

# Check if the index is in nanosecond resolution, convert if not
if not is_datetime64_ns_dtype(data.index.dtype):
data.index = data.index.astype('datetime64[ns, UTC]')

return data


cpdef str format_iso8601(datetime dt):
Expand Down
26 changes: 26 additions & 0 deletions nautilus_trader/model/data.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ cdef class QuoteTick(Data):
uint64_t ts_init,
)

@staticmethod
cdef list[QuoteTick] from_raw_arrays_to_list_c(
InstrumentId instrument_id,
uint8_t price_prec,
uint8_t size_prec,
int64_t[:] bid_prices_raw,
int64_t[:] ask_prices_raw,
uint64_t[:] bid_sizes_raw,
uint64_t[:] ask_sizes_raw,
uint64_t[:] ts_events,
uint64_t[:] ts_inits,
)

@staticmethod
cdef QuoteTick from_mem_c(QuoteTick_t mem)

Expand Down Expand Up @@ -406,6 +419,19 @@ cdef class TradeTick(Data):
uint64_t ts_init,
)

@staticmethod
cdef list[TradeTick] from_raw_arrays_to_list_c(
InstrumentId instrument_id,
uint8_t price_prec,
uint8_t size_prec,
int64_t[:] prices_raw,
uint64_t[:] sizes_raw,
uint8_t[:] aggressor_sides,
list[str] trade_ids,
uint64_t[:] ts_events,
uint64_t[:] ts_inits,
)

@staticmethod
cdef TradeTick from_mem_c(TradeTick_t mem)

Expand Down
132 changes: 132 additions & 0 deletions nautilus_trader/model/data.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ from nautilus_trader.model.identifiers cimport Symbol
from nautilus_trader.model.identifiers cimport Venue
from nautilus_trader.model.objects cimport Price
from nautilus_trader.model.objects cimport Quantity
import numpy as np
from nautilus_trader.model.identifiers cimport TradeId


cdef inline BookOrder order_from_mem_c(BookOrder_t mem):
Expand Down Expand Up @@ -3493,6 +3495,70 @@ cdef class QuoteTick(Data):
)
return quote

@staticmethod
cdef list[QuoteTick] from_raw_arrays_to_list_c(
InstrumentId instrument_id,
uint8_t price_prec,
uint8_t size_prec,
int64_t[:] bid_prices_raw,
int64_t[:] ask_prices_raw,
uint64_t[:] bid_sizes_raw,
uint64_t[:] ask_sizes_raw,
uint64_t[:] ts_events,
uint64_t[:] ts_inits,
):
Condition.type(instrument_id, InstrumentId, "instrument_id")
Condition.not_negative(price_prec, "price_prec")
Condition.not_negative(size_prec, "size_prec")
Condition.true(len(bid_prices_raw) == len(ask_prices_raw) == len(bid_sizes_raw) == len(ask_sizes_raw)
== len(ts_events) == len(ts_inits), "Array lengths must be equal")

cdef int i
cdef int count = ts_events.shape[0]
cdef list ticks = []
cdef QuoteTick quote
for i in range(count):
quote = QuoteTick.__new__(QuoteTick)
quote._mem = quote_tick_new(
instrument_id._mem,
bid_prices_raw[i],
ask_prices_raw[i],
price_prec,
price_prec,
bid_sizes_raw[i],
ask_sizes_raw[i],
size_prec,
size_prec,
ts_events[i],
ts_inits[i],
)
ticks.append(quote)
return ticks

@staticmethod
def from_raw_arrays_to_list(
instrument_id: InstrumentId,
price_prec: int,
size_prec: int,
bid_prices_raw: np.ndarray,
ask_prices_raw: np.ndarray,
bid_sizes_raw: np.ndarray,
ask_sizes_raw: np.ndarray,
ts_events: np.ndarray,
ts_inits: np.ndarray,
) -> list[QuoteTick]:
return QuoteTick.from_raw_arrays_to_list_c(
instrument_id,
price_prec,
size_prec,
bid_prices_raw,
ask_prices_raw,
bid_sizes_raw,
ask_sizes_raw,
ts_events,
ts_inits,
)

@staticmethod
cdef list[QuoteTick] capsule_to_list_c(object capsule):
# SAFETY: Do NOT deallocate the capsule here
Expand Down Expand Up @@ -3984,6 +4050,72 @@ cdef class TradeTick(Data):
)
return trade

@staticmethod
cdef list[TradeTick] from_raw_arrays_to_list_c(
InstrumentId instrument_id,
uint8_t price_prec,
uint8_t size_prec,
int64_t[:] prices_raw,
uint64_t[:] sizes_raw,
uint8_t[:] aggressor_sides,
list[str] trade_ids,
uint64_t[:] ts_events,
uint64_t[:] ts_inits,
):
Condition.type(instrument_id, InstrumentId, "instrument_id")
Condition.not_negative(price_prec, "price_prec")
Condition.not_negative(size_prec, "size_prec")
Condition.true(len(prices_raw) == len(sizes_raw) == len(aggressor_sides) == len(trade_ids) ==
len(ts_events) == len(ts_inits), "Array lengths must be equal")

cdef int i
cdef int count = ts_events.shape[0]
cdef list trades = []
cdef TradeTick trade
cdef AggressorSide aggressor_side
cdef TradeId trade_id
for i in range(count):
aggressor_side = <AggressorSide>aggressor_sides[i]
trade_id = TradeId(trade_ids[i])
trade = TradeTick.__new__(TradeTick)
trade._mem = trade_tick_new(
instrument_id._mem,
prices_raw[i],
price_prec,
sizes_raw[i],
size_prec,
aggressor_side,
trade_id._mem,
ts_events[i],
ts_inits[i],
)
trades.append(trade)
return trades

@staticmethod
def from_raw_arrays_to_list(
InstrumentId instrument_id,
uint8_t price_prec,
uint8_t size_prec,
int64_t[:] prices_raw,
uint64_t[:] sizes_raw,
uint8_t[:] aggressor_sides,
list[str] trade_ids,
uint64_t[:] ts_events,
uint64_t[:] ts_inits,
) -> list[TradeTick]:
return TradeTick.from_raw_arrays_to_list_c(
instrument_id,
price_prec,
size_prec,
prices_raw,
sizes_raw,
aggressor_sides,
trade_ids,
ts_events,
ts_inits,
)

@staticmethod
cdef list[TradeTick] capsule_to_list_c(capsule):
# SAFETY: Do NOT deallocate the capsule here
Expand Down
Loading

0 comments on commit ce38cc3

Please sign in to comment.