diff --git a/nautilus_trader/core/datetime.pyx b/nautilus_trader/core/datetime.pyx index 525418780e78..e22cf97325a3 100644 --- a/nautilus_trader/core/datetime.pyx +++ b/nautilus_trader/core/datetime.pyx @@ -38,6 +38,8 @@ from libc.stdint cimport uint64_t from nautilus_trader.core.correctness cimport Condition +from pandas.api.types import is_datetime64_ns_dtype + # UNIX epoch is the UTC time at 00:00:00 on 1/1/1970 # https://en.wikipedia.org/wiki/Unix_time @@ -246,12 +248,17 @@ cpdef object as_utc_index(data: pd.DataFrame): if data.empty: return data + # Ensure the index is localized to UTC if data.index.tzinfo is None: # tz-naive - return data.tz_localize(pytz.utc) + data = data.tz_localize(pytz.utc) elif data.index.tzinfo != pytz.utc: - return data.tz_convert(None).tz_localize(pytz.utc) - else: - return data # Already UTC + data = data.tz_convert(None).tz_localize(pytz.utc) + + # Check if the index is in nanosecond resolution, convert if not + if not is_datetime64_ns_dtype(data.index.dtype): + data.index = data.index.astype('datetime64[ns, UTC]') + + return data cpdef str format_iso8601(datetime dt): diff --git a/nautilus_trader/model/data.pxd b/nautilus_trader/model/data.pxd index 3627aff9f339..cf6d8ff2a330 100644 --- a/nautilus_trader/model/data.pxd +++ b/nautilus_trader/model/data.pxd @@ -366,6 +366,19 @@ cdef class QuoteTick(Data): uint64_t ts_init, ) + @staticmethod + cdef list[QuoteTick] from_raw_arrays_to_list_c( + InstrumentId instrument_id, + uint8_t price_prec, + uint8_t size_prec, + int64_t[:] bid_prices_raw, + int64_t[:] ask_prices_raw, + uint64_t[:] bid_sizes_raw, + uint64_t[:] ask_sizes_raw, + uint64_t[:] ts_events, + uint64_t[:] ts_inits, + ) + @staticmethod cdef QuoteTick from_mem_c(QuoteTick_t mem) @@ -406,6 +419,19 @@ cdef class TradeTick(Data): uint64_t ts_init, ) + @staticmethod + cdef list[TradeTick] from_raw_arrays_to_list_c( + InstrumentId instrument_id, + uint8_t price_prec, + uint8_t size_prec, + int64_t[:] prices_raw, + uint64_t[:] 
sizes_raw, + uint8_t[:] aggressor_sides, + list[str] trade_ids, + uint64_t[:] ts_events, + uint64_t[:] ts_inits, + ) + @staticmethod cdef TradeTick from_mem_c(TradeTick_t mem) diff --git a/nautilus_trader/model/data.pyx b/nautilus_trader/model/data.pyx index fa5d23614a77..98b070f3164e 100644 --- a/nautilus_trader/model/data.pyx +++ b/nautilus_trader/model/data.pyx @@ -133,6 +133,8 @@ from nautilus_trader.model.identifiers cimport Symbol from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.objects cimport Price from nautilus_trader.model.objects cimport Quantity +import numpy as np +from nautilus_trader.model.identifiers cimport TradeId cdef inline BookOrder order_from_mem_c(BookOrder_t mem): @@ -3493,6 +3495,70 @@ cdef class QuoteTick(Data): ) return quote + @staticmethod + cdef list[QuoteTick] from_raw_arrays_to_list_c( + InstrumentId instrument_id, + uint8_t price_prec, + uint8_t size_prec, + int64_t[:] bid_prices_raw, + int64_t[:] ask_prices_raw, + uint64_t[:] bid_sizes_raw, + uint64_t[:] ask_sizes_raw, + uint64_t[:] ts_events, + uint64_t[:] ts_inits, + ): + Condition.type(instrument_id, InstrumentId, "instrument_id") + Condition.not_negative(price_prec, "price_prec") + Condition.not_negative(size_prec, "size_prec") + Condition.true(len(bid_prices_raw) == len(ask_prices_raw) == len(bid_sizes_raw) == len(ask_sizes_raw) + == len(ts_events) == len(ts_inits), "Array lengths must be equal") + + cdef int i + cdef int count = ts_events.shape[0] + cdef list ticks = [] + cdef QuoteTick quote + for i in range(count): + quote = QuoteTick.__new__(QuoteTick) + quote._mem = quote_tick_new( + instrument_id._mem, + bid_prices_raw[i], + ask_prices_raw[i], + price_prec, + price_prec, + bid_sizes_raw[i], + ask_sizes_raw[i], + size_prec, + size_prec, + ts_events[i], + ts_inits[i], + ) + ticks.append(quote) + return ticks + + @staticmethod + def from_raw_arrays_to_list( + instrument_id: InstrumentId, + price_prec: int, + size_prec: int, + bid_prices_raw: 
np.ndarray, + ask_prices_raw: np.ndarray, + bid_sizes_raw: np.ndarray, + ask_sizes_raw: np.ndarray, + ts_events: np.ndarray, + ts_inits: np.ndarray, + ) -> list[QuoteTick]: + return QuoteTick.from_raw_arrays_to_list_c( + instrument_id, + price_prec, + size_prec, + bid_prices_raw, + ask_prices_raw, + bid_sizes_raw, + ask_sizes_raw, + ts_events, + ts_inits, + ) + @staticmethod cdef list[QuoteTick] capsule_to_list_c(object capsule): # SAFETY: Do NOT deallocate the capsule here @@ -3984,6 +4050,72 @@ cdef class TradeTick(Data): ) return trade + @staticmethod + cdef list[TradeTick] from_raw_arrays_to_list_c( + InstrumentId instrument_id, + uint8_t price_prec, + uint8_t size_prec, + int64_t[:] prices_raw, + uint64_t[:] sizes_raw, + uint8_t[:] aggressor_sides, + list[str] trade_ids, + uint64_t[:] ts_events, + uint64_t[:] ts_inits, + ): + Condition.type(instrument_id, InstrumentId, "instrument_id") + Condition.not_negative(price_prec, "price_prec") + Condition.not_negative(size_prec, "size_prec") + Condition.true(len(prices_raw) == len(sizes_raw) == len(aggressor_sides) == len(trade_ids) == + len(ts_events) == len(ts_inits), "Array lengths must be equal") + + cdef int i + cdef int count = ts_events.shape[0] + cdef list trades = [] + cdef TradeTick trade + cdef AggressorSide aggressor_side + cdef TradeId trade_id + for i in range(count): + aggressor_side = aggressor_sides[i] + trade_id = TradeId(trade_ids[i]) + trade = TradeTick.__new__(TradeTick) + trade._mem = trade_tick_new( + instrument_id._mem, + prices_raw[i], + price_prec, + sizes_raw[i], + size_prec, + aggressor_side, + trade_id._mem, + ts_events[i], + ts_inits[i], + ) + trades.append(trade) + return trades + + @staticmethod + def from_raw_arrays_to_list( + InstrumentId instrument_id, + uint8_t price_prec, + uint8_t size_prec, + int64_t[:] prices_raw, + uint64_t[:] sizes_raw, + uint8_t[:] aggressor_sides, + list[str] trade_ids, + uint64_t[:] ts_events, + uint64_t[:] ts_inits, + ) -> list[TradeTick]: + return 
TradeTick.from_raw_arrays_to_list_c( + instrument_id, + price_prec, + size_prec, + prices_raw, + sizes_raw, + aggressor_sides, + trade_ids, + ts_events, + ts_inits, + ) + @staticmethod cdef list[TradeTick] capsule_to_list_c(capsule): # SAFETY: Do NOT deallocate the capsule here diff --git a/nautilus_trader/persistence/wranglers.pyx b/nautilus_trader/persistence/wranglers.pyx index 20ad73fbfe06..34447a4eed1b 100644 --- a/nautilus_trader/persistence/wranglers.pyx +++ b/nautilus_trader/persistence/wranglers.pyx @@ -27,11 +27,7 @@ from libc.stdint cimport uint8_t from libc.stdint cimport uint64_t from nautilus_trader.core.correctness cimport Condition -from nautilus_trader.core.data cimport Data from nautilus_trader.core.datetime cimport as_utc_index -from nautilus_trader.core.datetime cimport dt_to_unix_nanos -from nautilus_trader.core.rust.core cimport CVec -from nautilus_trader.core.rust.core cimport secs_to_nanos from nautilus_trader.core.rust.model cimport AggressorSide from nautilus_trader.core.rust.model cimport BookAction from nautilus_trader.core.rust.model cimport OrderSide @@ -46,6 +42,132 @@ from nautilus_trader.model.objects cimport Price from nautilus_trader.model.objects cimport Quantity +BAR_PRICES = ('open', 'high', 'low', 'close') +BAR_COLUMNS = (*BAR_PRICES, 'volume') + + +def preprocess_bar_data(data: pd.DataFrame, is_raw: bool): + """ + Preprocess financial bar data to a standardized format. + + Ensures the DataFrame index is labeled as "timestamp", converts the index to UTC, removes time zone awareness, + drops rows with NaN values in critical columns, and optionally scales the data. + + Parameters + ---------- + data : pd.DataFrame + The input DataFrame containing financial bar data. + is_raw : bool + A flag to determine whether the data should be scaled. If False, scales the data by 1e9. + + Returns + ------- + pd.DataFrame: The preprocessed DataFrame with a cleaned and standardized structure. 
+ + """ + # Ensure index is timestamp + if data.index.name != "timestamp": + data.index.name = "timestamp" + + # Standardize index to UTC and remove time zone awareness + data = as_utc_index(data) + data.index = data.index.tz_localize(None).astype("datetime64[ns]") + + # Drop rows with NaN values in critical columns + data = data.dropna(subset=BAR_COLUMNS) + + # Scale data if not raw + if not is_raw: + data[list(BAR_COLUMNS)] = data[list(BAR_COLUMNS)].multiply(1e9) + + return data + + +def calculate_bar_price_offsets(num_records, timestamp_is_close: bool, offset_interval_ms: int, random_seed=None): + """ + Calculate the time offsets for the open, high, low and close prices of each bar, optionally swapping high and low at random. + + Parameters + ---------- + num_records : int + The number of records for which offsets are to be generated. + timestamp_is_close : bool + If bar timestamps are at the close. If True then open, high, low offsets are before the timestamp; if False then high, low, close offsets are after it. + offset_interval_ms : int + The interval in milliseconds between consecutive open, high, low and close offsets. + random_seed : Optional[int] + The seed for random number generation to ensure reproducibility. If None, high and low are never swapped. + + Returns + ------- + dict: A dictionary with arrays of offsets for open, high, low, and close prices. If random_seed is provided, + the high and low offsets of each record are swapped with a 50% chance.
+ """ + # Initialize offsets + offsets = { + "open": np.full(num_records, np.timedelta64((-3 if timestamp_is_close else 0) * offset_interval_ms, "ms")), + "high": np.full(num_records, np.timedelta64((-2 if timestamp_is_close else 1) * offset_interval_ms, "ms")), + "low": np.full(num_records, np.timedelta64((-1 if timestamp_is_close else 2) * offset_interval_ms, "ms")), + "close": np.full(num_records, np.timedelta64((0 if timestamp_is_close else 3) * offset_interval_ms, "ms")), + } + + # Randomize high and low if seed is given + if random_seed is not None: + local_random = random.Random(random_seed) + for i in range(num_records): + if local_random.getrandbits(1): # With a 50% chance, swap high and low + offsets['high'][i], offsets['low'][i] = offsets['low'][i], offsets['high'][i] + + return offsets + + +def calculate_volume_quarter(volume: np.ndarray, precision: int): + """ + Divide raw volume data by four and round it to the given precision. + + Args: + volume : np.ndarray + An array of raw volume data to be quartered. + precision : int + The decimal precision of the volume; the quartered raw values are rounded to `precision - 9` decimals. + + Returns: + np.ndarray: The quartered and rounded volume data as unsigned 64-bit integers. + """ + # Quarter the raw volume, round, and cast to uint64 + return np.round(volume / 4, precision - 9).astype(np.uint64) + + +def align_bid_ask_bar_data(bid_data: pd.DataFrame, ask_data: pd.DataFrame): + """ + Merge bid and ask data into a single DataFrame with prefixed column names. + + Args: + bid_data : pd.DataFrame + The DataFrame containing bid data. + ask_data : pd.DataFrame + The DataFrame containing ask data. + + Returns: + pd.DataFrame: A merged DataFrame with columns prefixed by 'bid_' for bid data and 'ask_' for ask data, inner-joined on their indexes (rows present in only one input are dropped).
+ """ + bid_prefixed = bid_data.add_prefix('bid_') + ask_prefixed = ask_data.add_prefix('ask_') + merged_data = pd.merge(bid_prefixed, ask_prefixed, left_index=True, right_index=True, how='inner') + return merged_data + + +def prepare_event_and_init_timestamps( + index: pd.DatetimeIndex, + ts_init_delta: int, +): + Condition.type(index, pd.DatetimeIndex, "index") + Condition.not_negative(ts_init_delta, "ts_init_delta") + ts_events = index.view(np.uint64) + ts_inits = ts_events + ts_init_delta + return ts_events, ts_inits + + cdef class OrderBookDeltaDataWrangler: """ Provides a means of building lists of Nautilus `OrderBookDelta` objects. @@ -85,8 +207,7 @@ cdef class OrderBookDeltaDataWrangler: Condition.false(data.empty, "data.empty") data = as_utc_index(data) - cdef uint64_t[:] ts_events = np.ascontiguousarray([dt_to_unix_nanos(dt) for dt in data.index], dtype=np.uint64) # noqa - cdef uint64_t[:] ts_inits = np.ascontiguousarray([ts_event + ts_init_delta for ts_event in ts_events], dtype=np.uint64) # noqa + ts_events, ts_inits = prepare_event_and_init_timestamps(data.index, ts_init_delta) if is_raw: return list(map( @@ -172,54 +293,6 @@ cdef class OrderBookDeltaDataWrangler: ) -def prepare_tick_data_from_bars( - *, - data_open: dict, - data_high: dict, - data_low: dict, - data_close: dict, - offset_interval_ms: int, - random_seed: int | None, - ts_init_delta: int, - timestamp_is_close: bool, -): - df_ticks_o = pd.DataFrame(data=data_open) - df_ticks_h = pd.DataFrame(data=data_high) - df_ticks_l = pd.DataFrame(data=data_low) - df_ticks_c = pd.DataFrame(data=data_close) - - # Latency offsets - if timestamp_is_close: - df_ticks_o.index = df_ticks_o.index.shift(periods=-3 * offset_interval_ms, freq="ms") - df_ticks_h.index = df_ticks_h.index.shift(periods=-2 * offset_interval_ms, freq="ms") - df_ticks_l.index = df_ticks_l.index.shift(periods=-1 * offset_interval_ms, freq="ms") - else: # timestamp is open - df_ticks_h.index = df_ticks_h.index.shift(periods=1 * 
offset_interval_ms, freq="ms") - df_ticks_l.index = df_ticks_l.index.shift(periods=2 * offset_interval_ms, freq="ms") - df_ticks_c.index = df_ticks_c.index.shift(periods=3 * offset_interval_ms, freq="ms") - - # Merge tick data - df_ticks_final = pd.concat([df_ticks_o, df_ticks_h, df_ticks_l, df_ticks_c]) - df_ticks_final = df_ticks_final.dropna() - df_ticks_final = df_ticks_final.sort_index(axis=0, kind="mergesort") - - cdef int i - # Randomly shift high low prices - if random_seed is not None: - random.seed(random_seed) - for i in range(0, len(df_ticks_final), 4): - if random.getrandbits(1): - high = copy(df_ticks_final.iloc[i + 1]) - low = copy(df_ticks_final.iloc[i + 2]) - df_ticks_final.iloc[i + 1] = low - df_ticks_final.iloc[i + 2] = high - - cdef uint64_t[:] ts_events = np.ascontiguousarray([secs_to_nanos(dt.timestamp()) for dt in df_ticks_final.index], dtype=np.uint64) # noqa - cdef uint64_t[:] ts_inits = np.ascontiguousarray([ts_event + ts_init_delta for ts_event in ts_events], dtype=np.uint64) # noqa - - return df_ticks_final, ts_events, ts_inits - - cdef class QuoteTickDataWrangler: """ Provides a means of building lists of Nautilus `QuoteTick` objects. 
@@ -265,7 +338,7 @@ cdef class QuoteTickDataWrangler: Condition.false(data.empty, "data.empty") Condition.not_none(default_volume, "default_volume") - as_utc_index(data) + data = as_utc_index(data) columns = { "bid": "bid_price", @@ -278,8 +351,7 @@ cdef class QuoteTickDataWrangler: if "ask_size" not in data.columns: data["ask_size"] = float(default_volume) - cdef uint64_t[:] ts_events = np.ascontiguousarray([dt_to_unix_nanos(dt) for dt in data.index], dtype=np.uint64) # noqa - cdef uint64_t[:] ts_inits = np.ascontiguousarray([ts_event + ts_init_delta for ts_event in ts_events], dtype=np.uint64) # noqa + ts_events, ts_inits = prepare_event_and_init_timestamps(data.index, ts_init_delta) return list(map( self._build_tick, @@ -301,6 +373,7 @@ cdef class QuoteTickDataWrangler: bint timestamp_is_close: bool = True, random_seed: int | None = None, bint is_raw: bool = False, + bint sort_data: bool = True, ): """ Process the given bar datasets into Nautilus `QuoteTick` objects. @@ -333,85 +406,93 @@ cdef class QuoteTickDataWrangler: If bar timestamps are at the close. If True then open, high, low timestamps are offset before the close timestamp. If False then high, low, close timestamps are offset after the open timestamp. + sort_data : bool, default True + If the data should be sorted by timestamp. 
""" - Condition.not_none(bid_data, "bid_data") - Condition.not_none(ask_data, "ask_data") + Condition.type(bid_data, pd.DataFrame, "bid_data") + Condition.type(ask_data, pd.DataFrame, "ask_data") Condition.false(bid_data.empty, "bid_data.empty") Condition.false(ask_data.empty, "ask_data.empty") + Condition.type(bid_data.index, pd.DatetimeIndex, "bid_data.index") + Condition.type(ask_data.index, pd.DatetimeIndex, "ask_data.index") Condition.not_none(default_volume, "default_volume") + for col in BAR_PRICES: + Condition.is_in(col, bid_data.columns, col, "bid_data.columns") + Condition.is_in(col, ask_data.columns, col, "ask_data.columns") if random_seed is not None: Condition.type(random_seed, int, "random_seed") - # Ensure index is tz-aware UTC - bid_data = as_utc_index(bid_data) - ask_data = as_utc_index(ask_data) - + # Add default volume if not present if "volume" not in bid_data: - bid_data["volume"] = float(default_volume * 4) - + bid_data.loc[:, "volume"] = float(default_volume * 4) * (1e9 if is_raw else 1) if "volume" not in ask_data: - ask_data["volume"] = float(default_volume * 4) + ask_data.loc[:, "volume"] = float(default_volume * 4) * (1e9 if is_raw else 1) - cdef dict data_open = { - "bid_price": bid_data["open"], - "ask_price": ask_data["open"], - "bid_size": bid_data["volume"] / 4, - "ask_size": ask_data["volume"] / 4, - } + # Standardize and preprocess data + bid_data = preprocess_bar_data(bid_data, is_raw) + ask_data = preprocess_bar_data(ask_data, is_raw) - cdef dict data_high = { - "bid_price": bid_data["high"], - "ask_price": ask_data["high"], - "bid_size": bid_data["volume"] / 4, - "ask_size": ask_data["volume"] / 4, - } + merged_data = align_bid_ask_bar_data(bid_data, ask_data) + offsets = calculate_bar_price_offsets(len(merged_data), timestamp_is_close, offset_interval_ms, random_seed) + ticks_final = self._create_quote_ticks_array(merged_data, is_raw, self.instrument, offsets, ts_init_delta) - cdef dict data_low = { - "bid_price": 
bid_data["low"], - "ask_price": ask_data["low"], - "bid_size": bid_data["volume"] / 4, - "ask_size": ask_data["volume"] / 4, - } + # Sort data by timestamp, if required + if sort_data: + sorted_indices = np.argsort(ticks_final['timestamp']) + ticks_final = ticks_final[sorted_indices] - cdef dict data_close = { - "bid_price": bid_data["close"], - "ask_price": ask_data["close"], - "bid_size": bid_data["volume"] / 4, - "ask_size": ask_data["volume"] / 4, - } + ts_events = ticks_final["timestamp"].view(np.uint64) + ts_inits = ts_events + ts_init_delta - df_ticks_final, ts_events, ts_inits = prepare_tick_data_from_bars( - data_open=data_open, - data_high=data_high, - data_low=data_low, - data_close=data_close, - offset_interval_ms=offset_interval_ms, - random_seed=random_seed, - ts_init_delta=ts_init_delta, - timestamp_is_close=timestamp_is_close, + return QuoteTick.from_raw_arrays_to_list_c( + self.instrument.id, + self.instrument.price_precision, + self.instrument.size_precision, + ticks_final["bid_price_raw"], + ticks_final["ask_price_raw"], + ticks_final["bid_size_raw"], + ticks_final["ask_size_raw"], + ts_events, + ts_inits, ) - if is_raw: - return list(map( - self._build_tick_from_raw, - df_ticks_final["bid_price"], - df_ticks_final["ask_price"], - df_ticks_final["bid_size"], - df_ticks_final["ask_size"], - ts_events, - ts_inits, - )) - else: - return list(map( - self._build_tick, - df_ticks_final["bid_price"], - df_ticks_final["ask_price"], - df_ticks_final["bid_size"], - df_ticks_final["ask_size"], - ts_events, - ts_inits, - )) + def _create_quote_ticks_array( + self, + merged_data, + is_raw, + instrument: Instrument, + offsets, + ts_init_delta, + ): + dtype = [ + ('bid_price_raw', np.int64), ('ask_price_raw', np.int64), + ('bid_size_raw', np.uint64), ('ask_size_raw', np.uint64), + ('timestamp', 'datetime64[ns]') + ] + + size_precision = instrument.size_precision + merged_data.loc[:, 'bid_volume'] = calculate_volume_quarter(merged_data['bid_volume'], 
size_precision) + merged_data.loc[:, 'ask_volume'] = calculate_volume_quarter(merged_data['ask_volume'], size_precision) + + # Convert to record array + records = merged_data.to_records() + + # Create structured array + total_records = len(records) * 4 # For open, high, low, close + tick_data = np.empty(total_records, dtype=dtype) + + for i, price_key in enumerate(BAR_PRICES): + start_index = i * len(records) + end_index = start_index + len(records) + + tick_data['bid_price_raw'][start_index:end_index] = records[f'bid_{price_key}'].astype(np.int64) + tick_data['ask_price_raw'][start_index:end_index] = records[f'ask_{price_key}'].astype(np.int64) + tick_data['bid_size_raw'][start_index:end_index] = records['bid_volume'].astype(np.uint64) + tick_data['ask_size_raw'][start_index:end_index] = records['ask_volume'].astype(np.uint64) + tick_data['timestamp'][start_index:end_index] = records['timestamp'] + offsets[price_key] + + return tick_data # cpdef method for Python wrap() (called with map) cpdef QuoteTick _build_tick_from_raw( @@ -502,8 +583,7 @@ cdef class TradeTickDataWrangler: Condition.false(data.empty, "data.empty") data = as_utc_index(data) - cdef uint64_t[:] ts_events = np.ascontiguousarray([dt_to_unix_nanos(dt) for dt in data.index], dtype=np.uint64) # noqa - cdef uint64_t[:] ts_inits = np.ascontiguousarray([ts_event + ts_init_delta for ts_event in ts_events], dtype=np.uint64) # noqa + ts_events, ts_inits = prepare_event_and_init_timestamps(data.index, ts_init_delta) if is_raw: return list(map( @@ -534,6 +614,7 @@ cdef class TradeTickDataWrangler: bint timestamp_is_close: bool = True, random_seed: int | None = None, bint is_raw: bool = False, + bint sort_data: bool = True, ): """ Process the given bar datasets into Nautilus `QuoteTick` objects. @@ -562,75 +643,64 @@ cdef class TradeTickDataWrangler: If bar timestamps are at the close. If True then open, high, low timestamps are offset before the close timestamp. 
If False then high, low, close timestamps are offset after the open timestamp. + sort_data : bool, default True + If the data should be sorted by timestamp. """ - Condition.not_none(data, "data") + Condition.type(data, pd.DataFrame, "data") Condition.false(data.empty, "data.empty") + Condition.type(data.index, pd.DatetimeIndex, "data.index") + for col in BAR_COLUMNS: + Condition.is_in(col, data.columns, col, "data.columns") if random_seed is not None: Condition.type(random_seed, int, "random_seed") - # Ensure index is tz-aware UTC - data = as_utc_index(data) + # Standardize and preprocess data + data = preprocess_bar_data(data, is_raw) + data.loc[:, 'volume'] = calculate_volume_quarter(data['volume'], self.instrument.size_precision) + data.loc[:, 'trade_id'] = data.index.view(np.uint64).astype(str) - cdef dict data_open = { - "price": data["open"], - "size": data["volume"] / 4, - } + records = data.to_records() + offsets = calculate_bar_price_offsets(len(records), timestamp_is_close, offset_interval_ms, random_seed) + ticks_final = self._create_trade_ticks_array(records, offsets) - cdef dict data_high = { - "price": data["high"], - "size": data["volume"] / 4, - } + # Sort data by timestamp, if required + if sort_data: + sorted_indices = np.argsort(ticks_final['timestamp']) + ticks_final = ticks_final[sorted_indices] - cdef dict data_low = { - "price": data["low"], - "size": data["volume"] / 4, - } + ts_events = ticks_final["timestamp"].view(np.uint64) + ts_inits = ts_events + ts_init_delta - cdef dict data_close = { - "price": data["close"], - "size": data["volume"] / 4, - } + cdef uint8_t[:] aggressor_sides = np.full(len(ts_events), AggressorSide.NO_AGGRESSOR, dtype=np.uint8) - df_ticks_final, ts_events, ts_inits = prepare_tick_data_from_bars( - data_open=data_open, - data_high=data_high, - data_low=data_low, - data_close=data_close, - offset_interval_ms=offset_interval_ms, - random_seed=random_seed, - ts_init_delta=ts_init_delta, - 
timestamp_is_close=timestamp_is_close, + return TradeTick.from_raw_arrays_to_list_c( + self.instrument.id, + self.instrument.price_precision, + self.instrument.size_precision, + ticks_final["price"], + ticks_final["size"], + aggressor_sides, + ts_events.astype(str).tolist(), + ts_events, + ts_inits, ) - df_ticks_final["trade_id"] = df_ticks_final.index.view(np.uint64).astype(str) - # Adjust size precision - size_precision = self.instrument.size_precision - if is_raw: - df_ticks_final["size"] = df_ticks_final["size"].apply(lambda x: round(x, size_precision - 9)) - else: - df_ticks_final["size"] = df_ticks_final["size"].round(size_precision) + def _create_trade_ticks_array( + self, + records, + offsets, + ): + dtype = [("price", np.int64), ("size", np.uint64), ("timestamp", "datetime64[ns]")] + tick_data = np.empty(len(records) * 4, dtype=dtype) + for i, price_key in enumerate(BAR_PRICES): + start_index = i * len(records) + end_index = start_index + len(records) + tick_data["price"][start_index:end_index] = records[price_key].astype(np.int64) + tick_data["size"][start_index:end_index] = records["volume"].astype(np.uint64) + tick_data["timestamp"][start_index:end_index] = records["timestamp"] + offsets[price_key] - if is_raw: - return list(map( - self._build_tick_from_raw, - df_ticks_final["price"], - df_ticks_final["size"], - self._create_side_if_not_exist(df_ticks_final), - df_ticks_final["trade_id"], - ts_events, - ts_inits, - )) - else: - return list(map( - self._build_tick, - df_ticks_final["price"], - df_ticks_final["size"], - self._create_side_if_not_exist(df_ticks_final), - df_ticks_final["trade_id"], - ts_events, - ts_inits, - )) + return tick_data def _create_side_if_not_exist(self, data): if "side" in data.columns: @@ -752,8 +822,7 @@ cdef class BarDataWrangler: if "volume" not in data: data["volume"] = float(default_volume) - cdef uint64_t[:] ts_events = np.ascontiguousarray([secs_to_nanos(dt.timestamp()) for dt in data.index], dtype=np.uint64) # noqa - 
cdef uint64_t[:] ts_inits = np.ascontiguousarray([ts_event + ts_init_delta for ts_event in ts_events], dtype=np.uint64) # noqa + ts_events, ts_inits = prepare_event_and_init_timestamps(data.index, ts_init_delta) return list(map( self._build_bar, diff --git a/tests/unit_tests/persistence/test_wranglers.py b/tests/unit_tests/persistence/test_wranglers.py index d4a382ace201..8fbddf0a9d6e 100644 --- a/tests/unit_tests/persistence/test_wranglers.py +++ b/tests/unit_tests/persistence/test_wranglers.py @@ -59,9 +59,9 @@ def test_load_binance_deltas() -> None: False, 50, 1359676800000000000, - 1359676800049999872, + 1359676800050000000, 1359676800100000000, - 1359676800150000128, + 1359676800150000000, ], ], )