From a56c912036f519cf44c4449b5d879e5ce8a895c2 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sun, 28 Apr 2024 09:31:03 +1000 Subject: [PATCH] Add Cache methods in Rust --- nautilus_core/common/src/cache/database.rs | 2 +- nautilus_core/common/src/cache/mod.rs | 679 +++++++++++++++++++-- 2 files changed, 620 insertions(+), 61 deletions(-) diff --git a/nautilus_core/common/src/cache/database.rs b/nautilus_core/common/src/cache/database.rs index 6eb5c019e66d..50025b35f19c 100644 --- a/nautilus_core/common/src/cache/database.rs +++ b/nautilus_core/common/src/cache/database.rs @@ -261,7 +261,7 @@ impl CacheDatabaseAdapter { todo!() // TODO } - pub fn update_account(&self) -> anyhow::Result<()> { + pub fn update_account(&self, account: &dyn Account) -> anyhow::Result<()> { todo!() // TODO } diff --git a/nautilus_core/common/src/cache/mod.rs b/nautilus_core/common/src/cache/mod.rs index f95a99852d5e..2acea0f8db98 100644 --- a/nautilus_core/common/src/cache/mod.rs +++ b/nautilus_core/common/src/cache/mod.rs @@ -21,9 +21,12 @@ pub mod database; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + time::{SystemTime, UNIX_EPOCH}, +}; -use log::{debug, info}; +use log::{debug, error, info, warn}; use nautilus_core::correctness::{check_key_not_in_map, check_slice_not_empty, check_valid_string}; use nautilus_model::{ data::{ @@ -31,7 +34,7 @@ use nautilus_model::{ quote::QuoteTick, trade::TradeTick, }, - enums::{AggregationSource, OrderSide, PositionSide, PriceType, TriggerType}, + enums::{AggregationSource, OmsType, OrderSide, PositionSide, PriceType, TriggerType}, identifiers::{ account_id::AccountId, client_id::ClientId, client_order_id::ClientOrderId, component_id::ComponentId, exec_algorithm_id::ExecAlgorithmId, instrument_id::InstrumentId, @@ -252,6 +255,7 @@ impl Cache { // -- COMMANDS -------------------------------------------------------------------------------- + /// Clear the current general cache and load the general objects from the cache database. pub fn cache_general(&mut self) -> anyhow::Result<()> { self.general = match &self.database { Some(db) => db.load()?, @@ -265,6 +269,7 @@ impl Cache { Ok(()) } + /// Clear the current currencies cache and load currencies from the cache database. pub fn cache_currencies(&mut self) -> anyhow::Result<()> { self.currencies = match &self.database { Some(db) => db.load_currencies()?, @@ -275,6 +280,7 @@ impl Cache { Ok(()) } + /// Clear the current instruments cache and load instruments from the cache database. pub fn cache_instruments(&mut self) -> anyhow::Result<()> { self.instruments = match &self.database { Some(db) => db.load_instruments()?, @@ -285,6 +291,8 @@ impl Cache { Ok(()) } + /// Clear the current synthetic instruments cache and load synthetic instruments from the cache + /// database. pub fn cache_synthetics(&mut self) -> anyhow::Result<()> { self.synthetics = match &self.database { Some(db) => db.load_synthetics()?, @@ -298,6 +306,7 @@ impl Cache { Ok(()) } + /// Clear the current accounts cache and load accounts from the cache database. pub fn cache_accounts(&mut self) -> anyhow::Result<()> { self.accounts = match &self.database { Some(db) => db.load_accounts()?, @@ -311,6 +320,7 @@ impl Cache { Ok(()) } + /// Clear the current orders cache and load orders from the cache database. pub fn cache_orders(&mut self) -> anyhow::Result<()> { self.orders = match &self.database { Some(db) => db.load_orders()?, @@ -321,6 +331,7 @@ impl Cache { Ok(()) } + /// Clear the current positions cache and load positions from the cache database. pub fn cache_positions(&mut self) -> anyhow::Result<()> { self.positions = match &self.database { Some(db) => db.load_positions()?, @@ -331,6 +342,7 @@ impl Cache { Ok(()) } + /// Clear the current cache index and re-build. pub fn build_index(&mut self) { self.index.clear(); debug!("Building index"); @@ -343,7 +355,7 @@ impl Cache { } // Index orders - for (client_order_id, order) in self.orders.iter() { + for (client_order_id, order) in &self.orders { let instrument_id = order.instrument_id(); let venue = instrument_id.venue; let strategy_id = order.strategy_id(); @@ -441,7 +453,7 @@ impl Cache { } // Index positions - for (position_id, position) in self.positions.iter() { + for (position_id, position) in &self.positions { let instrument_id = position.instrument_id; let venue = instrument_id.venue; let strategy_id = position.strategy_id; @@ -497,15 +509,394 @@ impl Cache { } } + /// Check integrity of data within the cache. + /// + /// All data should be loaded from the database prior to this call. + /// If an error is found then a log error message will also be produced. #[must_use] - pub fn check_integrity(&self) -> bool { - true // TODO + fn check_integrity(&mut self) -> bool { + let mut error_count = 0; + let failure = "Integrity failure"; + + // Get current timestamp in microseconds + let timestamp_us = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_micros(); + + info!("Checking data integrity"); + + // Check object caches + for account_id in self.accounts.keys() { + if !self + .index + .venue_account + .contains_key(&account_id.get_issuer()) + { + error!( + "{} in accounts: {} not found in `self.index.venue_account`", + failure, account_id + ); + error_count += 1; + } + } + + for (client_order_id, order) in &self.orders { + if !self.index.order_strategy.contains_key(client_order_id) { + error!( + "{} in orders: {} not found in `self.index.order_strategy`", + failure, client_order_id + ); + error_count += 1; + } + if !self.index.orders.contains(client_order_id) { + error!( + "{} in orders: {} not found in `self.index.orders`", + failure, client_order_id + ); + error_count += 1; + } + if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) { + error!( + "{} in orders: {} not found in `self.index.orders_inflight`", + failure, client_order_id + ); + error_count += 1; + } + if order.is_open() && !self.index.orders_open.contains(client_order_id) { + error!( + "{} in orders: {} not found in `self.index.orders_open`", + failure, client_order_id + ); + error_count += 1; + } + if order.is_closed() && !self.index.orders_closed.contains(client_order_id) { + error!( + "{} in orders: {} not found in `self.index.orders_closed`", + failure, client_order_id + ); + error_count += 1; + } + if let Some(exec_algorithm_id) = order.exec_algorithm_id() { + if !self + .index + .exec_algorithm_orders + .contains_key(&exec_algorithm_id) + { + error!( + "{} in orders: {} not found in `self.index.exec_algorithm_orders`", + failure, exec_algorithm_id + ); + error_count += 1; + } + if order.exec_spawn_id().is_none() + && !self.index.exec_spawn_orders.contains_key(client_order_id) + { + error!( + "{} in orders: {} not found in `self.index.exec_spawn_orders`", + failure, exec_algorithm_id + ); + error_count += 1; + } + } + } + + for (position_id, position) in &self.positions { + if !self.index.position_strategy.contains_key(position_id) { + error!( + "{} in positions: {} not found in `self.index.position_strategy`", + failure, position_id + ); + error_count += 1; + } + if !self.index.position_orders.contains_key(position_id) { + error!( + "{} in positions: {} not found in `self.index.position_orders`", + failure, position_id + ); + error_count += 1; + } + if !self.index.positions.contains(position_id) { + error!( + "{} in positions: {} not found in `self.index.positions`", + failure, position_id + ); + error_count += 1; + } + if position.is_open() && !self.index.positions_open.contains(position_id) { + error!( + "{} in positions: {} not found in `self.index.positions_open`", + failure, position_id + ); + error_count += 1; + } + if position.is_closed() && !self.index.positions_closed.contains(position_id) { + error!( + "{} in positions: {} not found in `self.index.positions_closed`", + failure, position_id + ); + error_count += 1; + } + } + + // Check indexes + for account_id in self.index.venue_account.values() { + if !self.accounts.contains_key(account_id) { + error!( + "{} in `index.venue_account`: {} not found in `self.accounts`", + failure, account_id + ); + error_count += 1; + } + } + + for client_order_id in self.index.order_ids.values() { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.order_ids`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for client_order_id in self.index.order_position.keys() { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.order_position`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + // Check indexes + for client_order_id in self.index.order_strategy.keys() { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.order_strategy`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for position_id in self.index.position_strategy.keys() { + if !self.positions.contains_key(position_id) { + error!( + "{} in `index.position_strategy`: {} not found in `self.positions`", + failure, position_id + ); + error_count += 1; + } + } + + for position_id in self.index.position_orders.keys() { + if !self.positions.contains_key(position_id) { + error!( + "{} in `index.position_orders`: {} not found in `self.positions`", + failure, position_id + ); + error_count += 1; + } + } + + for (instrument_id, client_order_ids) in &self.index.instrument_orders { + for client_order_id in client_order_ids { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.instrument_orders`: {} not found in `self.orders`", + failure, instrument_id + ); + error_count += 1; + } + } + } + + for instrument_id in self.index.instrument_positions.keys() { + if !self.index.instrument_orders.contains_key(instrument_id) { + error!( + "{} in `index.instrument_positions`: {} not found in `index.instrument_orders`", + failure, instrument_id + ); + error_count += 1; + } + } + + for client_order_ids in self.index.strategy_orders.values() { + for client_order_id in client_order_ids { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.strategy_orders`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + } + + for position_ids in self.index.strategy_positions.values() { + for position_id in position_ids { + if !self.positions.contains_key(position_id) { + error!( + "{} in `index.strategy_positions`: {} not found in `self.positions`", + failure, position_id + ); + error_count += 1; + } + } + } + + for client_order_id in &self.index.orders { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.orders`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for client_order_id in &self.index.orders_emulated { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.orders_emulated`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for client_order_id in &self.index.orders_inflight { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.orders_inflight`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for client_order_id in &self.index.orders_open { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.orders_open`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for client_order_id in &self.index.orders_closed { + if !self.orders.contains_key(client_order_id) { + error!( + "{} in `index.orders_closed`: {} not found in `self.orders`", + failure, client_order_id + ); + error_count += 1; + } + } + + for position_id in &self.index.positions { + if !self.positions.contains_key(position_id) { + error!( + "{} in `index.positions`: {} not found in `self.positions`", + failure, position_id + ); + error_count += 1; + } + } + + for position_id in &self.index.positions_open { + if !self.positions.contains_key(position_id) { + error!( + "{} in `index.positions_open`: {} not found in `self.positions`", + failure, position_id + ); + error_count += 1; + } + } + + for position_id in &self.index.positions_closed { + if !self.positions.contains_key(position_id) { + error!( + "{} in `index.positions_closed`: {} not found in `self.positions`", + failure, position_id + ); + error_count += 1; + } + } + + for strategy_id in &self.index.strategies { + if !self.index.strategy_orders.contains_key(strategy_id) { + error!( + "{} in `index.strategies`: {} not found in `index.strategy_orders`", + failure, strategy_id + ); + error_count += 1; + } + } + + for exec_algorithm_id in &self.index.exec_algorithms { + if !self + .index + .exec_algorithm_orders + .contains_key(exec_algorithm_id) + { + error!( + "{} in `index.exec_algorithms`: {} not found in `index.exec_algorithm_orders`", + failure, exec_algorithm_id + ); + error_count += 1; + } + } + + // Finally + let total_us = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_micros() + - timestamp_us; + + if error_count == 0 { + info!("Integrity check passed in {}μs", total_us); + true + } else { + error!( + "Integrity check failed with {} error{} in {}μs", + error_count, + if error_count == 1 { "" } else { "s" }, + total_us + ); + false + } } - pub fn check_residuals(&self) { - todo!() // Needs order query methods + /// Check for any residual open state and log warnings if any are found. + /// + ///'Open state' is considered to be open orders and open positions. + #[must_use] + pub fn check_residuals(&self) -> bool { + debug!("Checking residuals"); + + let mut residuals = false; + + // Check for any open orders + for order in self.orders_open(None, None, None, None) { + residuals = true; + warn!("Residual {:?}", order); + } + + // Check for any open positions + for position in self.positions_open(None, None, None, None) { + residuals = true; + warn!("Residual {}", position); + } + + residuals } + /// Clear the caches index. pub fn clear_index(&mut self) { self.index.clear(); debug!("Cleared index"); @@ -535,6 +926,7 @@ impl Cache { info!("Reset cache"); } + /// Dispose of the cache which will close any underlying database adapter. pub fn dispose(&self) -> anyhow::Result<()> { if let Some(database) = &self.database { // TODO: Log operations in database adapter @@ -543,6 +935,7 @@ impl Cache { Ok(()) } + /// Flush the caches database which permanently removes all persisted data. pub fn flush_db(&self) -> anyhow::Result<()> { if let Some(database) = &self.database { // TODO: Log operations in database adapter @@ -551,6 +944,10 @@ impl Cache { Ok(()) } + /// Add the given general object to the cache. + /// + /// The cache is agnostic to what the object actually is (and how it may be serialized), + /// offering maximum flexibility. pub fn add(&mut self, key: &str, value: Vec) -> anyhow::Result<()> { check_valid_string(key, stringify!(key))?; check_slice_not_empty(value.as_slice(), stringify!(value))?; @@ -564,12 +961,14 @@ impl Cache { Ok(()) } + /// Add the given order `book` to the cache. pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> { debug!("Add `OrderBook` {}", book.instrument_id); self.books.insert(book.instrument_id, book); Ok(()) } + /// Add the given `quote` tick to the cache. pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> { debug!("Add `QuoteTick` {}", quote.instrument_id); let quotes_deque = self @@ -580,6 +979,7 @@ impl Cache { Ok(()) } + /// Add the given `quotes` to the cache. pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> { check_slice_not_empty(quotes, stringify!(quotes))?; @@ -596,6 +996,7 @@ impl Cache { Ok(()) } + /// Add the given `trade` tick to the cache. pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> { debug!("Add `TradeTick` {}", trade.instrument_id); let trades_deque = self @@ -606,6 +1007,7 @@ impl Cache { Ok(()) } + /// Add the give `trades` to the cache. pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> { check_slice_not_empty(trades, stringify!(trades))?; @@ -622,6 +1024,7 @@ impl Cache { Ok(()) } + /// Add the given `bar` to the cache. pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> { debug!("Add `Bar` {}", bar.bar_type); let bars = self @@ -632,6 +1035,7 @@ impl Cache { Ok(()) } + /// Add the given `bars` to the cache. pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> { check_slice_not_empty(bars, stringify!(bars))?; @@ -648,6 +1052,7 @@ impl Cache { Ok(()) } + /// Add the given `currency` to the cache. pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> { debug!("Add `Currency` {}", currency.code); @@ -659,6 +1064,7 @@ impl Cache { Ok(()) } + /// Add the given `instrument` to the cache. pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> { debug!("Add `Instrument` {}", instrument.id()); @@ -670,6 +1076,7 @@ impl Cache { Ok(()) } + /// Add the given `synthetic` instrument to the cache. pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> { debug!("Add `SyntheticInstrument` {}", synthetic.id); @@ -681,6 +1088,7 @@ impl Cache { Ok(()) } + /// Add the given `account` to the cache. pub fn add_account(&mut self, account: Box) -> anyhow::Result<()> { debug!("Add `Account` {}", account.id()); @@ -715,7 +1123,7 @@ impl Cache { let client_order_id = order.client_order_id(); let strategy_id = order.strategy_id(); let exec_algorithm_id = order.exec_algorithm_id(); - let _exec_spawn_id = order.exec_spawn_id(); + let exec_spawn_id = order.exec_spawn_id(); if !replace_existing { check_key_not_in_map( @@ -753,56 +1161,42 @@ impl Cache { self.index.strategies.insert(strategy_id); // Update venue -> orders index - if let Some(venue_orders) = self.index.venue_orders.get_mut(&venue) { - venue_orders.insert(client_order_id); - } else { - let mut new_set = HashSet::new(); - new_set.insert(client_order_id); - self.index.venue_orders.insert(venue, new_set); - } + self.index + .venue_orders + .entry(venue) + .or_default() + .insert(client_order_id); // Update instrument -> orders index - if let Some(instrument_orders) = self.index.instrument_orders.get_mut(&instrument_id) { - instrument_orders.insert(client_order_id); - } else { - let mut new_set = HashSet::new(); - new_set.insert(client_order_id); - self.index.instrument_orders.insert(instrument_id, new_set); - } + self.index + .instrument_orders + .entry(instrument_id) + .or_default() + .insert(client_order_id); // Update strategy -> orders index - if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id) { - strategy_orders.insert(client_order_id); - } else { - let mut new_set = HashSet::new(); - new_set.insert(client_order_id); - self.index.strategy_orders.insert(strategy_id, new_set); - } + self.index + .strategy_orders + .entry(strategy_id) + .or_default() + .insert(client_order_id); // Update exec_algorithm -> orders index if let Some(exec_algorithm_id) = exec_algorithm_id { self.index.exec_algorithms.insert(exec_algorithm_id); - if let Some(exec_algorithm_orders) = - self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id) - { - exec_algorithm_orders.insert(client_order_id); - } else { - let mut new_set = HashSet::new(); - new_set.insert(client_order_id); - self.index - .exec_algorithm_orders - .insert(exec_algorithm_id, new_set); - } + self.index + .exec_algorithm_orders + .entry(exec_algorithm_id) + .or_default() + .insert(client_order_id); - // TODO: Implement - // if let Some(exec_spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id) { - // exec_spawn_orders.insert(client_order_id.clone()); - // } else { - // let mut new_set = HashSet::new(); - // new_set.insert(client_order_id.clone()); - // self.index.exec_spawn_orders.insert(exec_spawn_id, new_set); - // } + // SAFETY: We can guarantee the `exec_spawn_id` is Some + self.index + .exec_spawn_orders + .entry(exec_spawn_id.unwrap()) + .or_default() + .insert(client_order_id); } // TODO: Change emulation trigger setup @@ -816,16 +1210,15 @@ impl Cache { // } // } - // TODO: Implement // Index position ID if provided - // if let Some(position_id) = position_id { - // self.add_position_id( - // position_id, - // order.instrument_id().venue, - // client_order_id.clone(), - // strategy_id, - // ); - // } + if let Some(position_id) = order.position_id() { + self.add_position_id( + &position_id, + &order.instrument_id().venue, + &client_order_id, + &strategy_id, + )?; + } // Index client ID if provided if let Some(client_id) = client_id { @@ -833,7 +1226,6 @@ impl Cache { log::debug!("Indexed {:?}", client_id); } - // Update database if available if let Some(database) = &mut self.database { database.add_order(&order)?; // TODO: Implement @@ -847,6 +1239,173 @@ impl Cache { Ok(()) } + /// Index the given `position_id` with the other given IDs. + pub fn add_position_id( + &mut self, + position_id: &PositionId, + venue: &Venue, + client_order_id: &ClientOrderId, + strategy_id: &StrategyId, + ) -> anyhow::Result<()> { + self.index + .order_position + .insert(*client_order_id, *position_id); + + // Index: ClientOrderId -> PositionId + if let Some(database) = &mut self.database { + database.index_order_position(*client_order_id, *position_id)?; + } + + // Index: PositionId -> StrategyId + self.index + .position_strategy + .insert(*position_id, *strategy_id); + + // Index: PositionId -> set[ClientOrderId] + self.index + .position_orders + .entry(*position_id) + .or_default() + .insert(*client_order_id); + + // Index: StrategyId -> set[PositionId] + self.index + .strategy_positions + .entry(*strategy_id) + .or_default() + .insert(*position_id); + + Ok(()) + } + + pub fn add_position(&mut self, position: Position, oms_type: OmsType) -> anyhow::Result<()> { + self.positions.insert(position.id, position.clone()); + self.index.positions.insert(position.id); + self.index.positions_open.insert(position.id); + + self.add_position_id( + &position.id, + &position.instrument_id.venue, + &position.opening_order_id, + &position.strategy_id, + )?; + + let venue = position.instrument_id.venue; + let venue_positions = self.index.venue_positions.entry(venue).or_default(); + venue_positions.insert(position.id); + + // Index: InstrumentId -> HashSet + let instrument_id = position.instrument_id; + let instrument_positions = self + .index + .instrument_positions + .entry(instrument_id) + .or_default(); + instrument_positions.insert(position.id); + + log::debug!( + "Added Position(id={}, strategy_id={})", + position.id, + position.strategy_id, + ); + + if let Some(database) = &mut self.database { + database.add_position(&position)?; + // TODO: Implement position snapshots + // if self.snapshot_positions { + // database.snapshot_position_state( + // position, + // position.ts_last, + // self.calculate_unrealized_pnl(&position), + // )?; + // } + } + + Ok(()) + } + + /// Update the given `account` in the cache. + pub fn update_account(&mut self, account: &dyn Account) -> anyhow::Result<()> { + if let Some(database) = &mut self.database { + database.update_account(account)?; + } + Ok(()) + } + + /// Update the given `order` in the cache. + pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> { + let client_order_id = order.client_order_id(); + + // Update venue order ID + if let Some(venue_order_id) = order.venue_order_id() { + // Assumes order_id does not change + self.index.order_ids.insert(venue_order_id, client_order_id); + } + + // Update in-flight state + if order.is_inflight() { + self.index.orders_inflight.insert(client_order_id); + } else { + self.index.orders_inflight.remove(&client_order_id); + } + + // Update open/closed state + if order.is_open() { + self.index.orders_closed.remove(&client_order_id); + self.index.orders_open.insert(client_order_id); + } else if order.is_closed() { + self.index.orders_open.remove(&client_order_id); + self.index.orders_pending_cancel.remove(&client_order_id); + self.index.orders_closed.insert(client_order_id); + } + + // Update emulation + if let Some(emulation_trigger) = order.emulation_trigger() { + match emulation_trigger { + TriggerType::NoTrigger => self.index.orders_emulated.remove(&client_order_id), + _ => self.index.orders_emulated.insert(client_order_id), + }; + } + + if let Some(database) = &mut self.database { + database.update_order(order)?; + // TODO: Implement order snapshots + // if self.snapshot_orders { + // database.snapshot_order_state(order)?; + // } + } + + Ok(()) + } + + /// Update the given `order` as pending cancel locally. + pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) { + self.index + .orders_pending_cancel + .insert(order.client_order_id()); + } + + /// Update the given `position` in the cache. + pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> { + // Update open/closed state + if position.is_open() { + self.index.positions_open.insert(position.id); + self.index.positions_closed.remove(&position.id); + } else { + self.index.positions_closed.insert(position.id); + self.index.positions_open.remove(&position.id); + } + + if let Some(database) = &mut self.database { + database.update_position(position)?; + // TODO: Implement order snapshots + // if self.snapshot_orders { + // database.snapshot_order_state(order)?; + // } + } + Ok(()) + } + // -- IDENTIFIER QUERIES ---------------------------------------------------------------------- fn build_order_query_filter_set(