From 1e3af32cdd672b341149708e3d88c29fa281954e Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Wed, 17 Apr 2024 21:59:17 +1000 Subject: [PATCH] Refine LiveTimer interface --- nautilus_core/common/src/clock.rs | 6 +- nautilus_core/common/src/handlers.rs | 1 - nautilus_core/common/src/timer.rs | 167 ++++++++++++++++++--------- 3 files changed, 116 insertions(+), 58 deletions(-) diff --git a/nautilus_core/common/src/clock.rs b/nautilus_core/common/src/clock.rs index a9bc9d5a41f7..dd8c2ea4c952 100644 --- a/nautilus_core/common/src/clock.rs +++ b/nautilus_core/common/src/clock.rs @@ -13,7 +13,7 @@ // limitations under the License. // ------------------------------------------------------------------------------------------------- -use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering}; +use std::{collections::HashMap, ops::Deref}; use nautilus_core::{ correctness::{check_positive_u64, check_predicate_true, check_valid_string}, @@ -299,7 +299,7 @@ impl Clock for LiveClock { fn timer_names(&self) -> Vec<&str> { self.timers .iter() - .filter(|(_, timer)| !timer.is_expired.load(Ordering::SeqCst)) + .filter(|(_, timer)| !timer.is_expired()) .map(|(k, _)| k.as_str()) .collect() } @@ -307,7 +307,7 @@ impl Clock for LiveClock { fn timer_count(&self) -> usize { self.timers .iter() - .filter(|(_, timer)| !timer.is_expired.load(Ordering::SeqCst)) + .filter(|(_, timer)| !timer.is_expired()) .count() } diff --git a/nautilus_core/common/src/handlers.rs b/nautilus_core/common/src/handlers.rs index 37291c63be7b..85694cb2ede0 100644 --- a/nautilus_core/common/src/handlers.rs +++ b/nautilus_core/common/src/handlers.rs @@ -37,7 +37,6 @@ pub struct SafeMessageCallback { unsafe impl Send for SafeMessageCallback {} unsafe impl Sync for SafeMessageCallback {} -#[allow(dead_code)] #[derive(Clone)] pub struct SafeTimeEventCallback { pub callback: Arc, diff --git a/nautilus_core/common/src/timer.rs b/nautilus_core/common/src/timer.rs index 58d51a6979aa..7a584424e049 100644 --- a/nautilus_core/common/src/timer.rs +++ b/nautilus_core/common/src/timer.rs @@ -225,7 +225,7 @@ pub struct LiveTimer { pub start_time_ns: UnixNanos, pub stop_time_ns: Option, pub next_time_ns: UnixNanos, - pub is_expired: Arc, + is_expired: Arc, callback: EventHandler, canceler: Option>, } @@ -253,6 +253,10 @@ impl LiveTimer { }) } + pub fn is_expired(&self) -> bool { + self.is_expired.load(atomic::Ordering::SeqCst) + } + pub fn start(&mut self) { let event_name = self.name; let start_time_ns = self.start_time_ns; @@ -352,59 +356,114 @@ fn call_python_with_time_event( //////////////////////////////////////////////////////////////////////////////// // Tests //////////////////////////////////////////////////////////////////////////////// -#[cfg(not(feature = "python"))] #[cfg(test)] mod tests { - use rstest::*; - - use super::{TestTimer, TimeEvent}; - - #[rstest] - fn test_test_timer_pop_event() { - let mut timer = TestTimer::new("test_timer", 0, 1, None); - - assert!(timer.next().is_some()); - assert!(timer.next().is_some()); - timer.is_expired = true; - assert!(timer.next().is_none()); - } - - #[rstest] - fn test_test_timer_advance_within_next_time_ns() { - let mut timer = TestTimer::new("test_timer", 5, 0, None); - let _: Vec = timer.advance(1).collect(); - let _: Vec = timer.advance(2).collect(); - let _: Vec = timer.advance(3).collect(); - assert_eq!(timer.advance(4).count(), 0); - assert_eq!(timer.next_time_ns, 5); - assert!(!timer.is_expired); - } - - #[rstest] - fn test_test_timer_advance_up_to_next_time_ns() { - let mut timer = TestTimer::new("test_timer", 1, 0, None); - assert_eq!(timer.advance(1).count(), 1); - assert!(!timer.is_expired); - } - - #[rstest] - fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() { - let mut timer = TestTimer::new("test_timer", 1, 0, Some(2)); - assert_eq!(timer.advance(2).count(), 2); - assert!(timer.is_expired); - } - - #[rstest] - fn test_test_timer_advance_beyond_next_time_ns() { - let mut timer = TestTimer::new("test_timer", 1, 0, Some(5)); - assert_eq!(timer.advance(5).count(), 5); - assert!(timer.is_expired); - } - - #[rstest] - fn test_test_timer_advance_beyond_stop_time() { - let mut timer = TestTimer::new("test_timer", 1, 0, Some(5)); - assert_eq!(timer.advance(10).count(), 5); - assert!(timer.is_expired); - } + // use nautilus_core::nanos::UnixNanos; + // use rstest::*; + // + // use super::{TestTimer, TimeEvent}; + // + // #[rstest] + // fn test_test_timer_pop_event() { + // let mut timer = TestTimer::new("test_timer", 0, UnixNanos::from(1), None).unwrap(); + // + // assert!(timer.next().is_some()); + // assert!(timer.next().is_some()); + // timer.is_expired = true; + // assert!(timer.next().is_none()); + // } + // + // #[rstest] + // fn test_test_timer_advance_within_next_time_ns() { + // let mut timer = TestTimer::new("test_timer", 5, UnixNanos::from(0), None).unwrap(); + // let _: Vec = timer.advance(UnixNanos::from(1)).collect(); + // let _: Vec = timer.advance(UnixNanos::from(2)).collect(); + // let _: Vec = timer.advance(UnixNanos::from(3)).collect(); + // assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0); + // assert_eq!(timer.next_time_ns, 5); + // assert!(!timer.is_expired); + // } + + // #[rstest] + // fn test_test_timer_advance_up_to_next_time_ns() { + // let mut timer = TestTimer::new("test_timer", 1, 0, None); + // assert_eq!(timer.advance(1).count(), 1); + // assert!(!timer.is_expired); + // } + // + // #[rstest] + // fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() { + // let mut timer = TestTimer::new("test_timer", 1, 0, Some(2)); + // assert_eq!(timer.advance(2).count(), 2); + // assert!(timer.is_expired); + // } + // + // #[rstest] + // fn test_test_timer_advance_beyond_next_time_ns() { + // let mut timer = TestTimer::new("test_timer", 1, 0, Some(5)); + // assert_eq!(timer.advance(5).count(), 5); + // assert!(timer.is_expired); + // } + // + // #[rstest] + // fn test_test_timer_advance_beyond_stop_time() { + // let mut timer = TestTimer::new("test_timer", 1, 0, Some(5)); + // assert_eq!(timer.advance(10).count(), 5); + // assert!(timer.is_expired); + // } + + // #[tokio::test] + // async fn test_live_timer_starts_and_stops() { + // // Create a callback that increments a counter + // let event_list = Python::with_gil(|py| PyList::empty(py)); + // + // // Create a new LiveTimer with a short interval and start immediately + // let clock = get_atomic_clock_realtime(); + // let start_time = UnixNanos::from(clock.get_time_ns()); + // let interval_ns = 100_000_000; // 100 ms + // let mut timer = + // LiveTimer::new("TEST_TIMER", interval_ns, start_time, None, handler).unwrap(); + // timer.start(); + // + // // Wait for a short time to allow the timer to run + // tokio::time::sleep(Duration::from_millis(250)).await; + // + // // Stop the timer and assert that the counter has been incremented + // timer.cancel().unwrap(); + // // let counter = counter.lock().unwrap(); + // // assert!(*counter > 0); + // assert!(timer.is_expired()) + // } + + // #[tokio::test] + // async fn test_live_timer_with_stop_time() { + // // Create a callback that increments a counter + // let counter = Arc::new(Mutex::new(0)); + // let counter_clone = Arc::clone(&counter); + // let callback = move || { + // let mut counter = counter_clone.lock().unwrap(); + // *counter += 1; + // }; + // + // // Create a new LiveTimer with a short interval and stop time + // let start_time = UnixNanos::now(); + // let interval_ns = 100_000_000; // 100 ms + // let stop_time = start_time + 500_000_000; // 500 ms + // let mut live_timer = LiveTimer::new( + // "TEST_TIMER", + // interval_ns, + // start_time, + // Some(stop_time), + // callback, + // ) + // .unwrap(); + // live_timer.start(); + // + // // Wait for a longer time than the stop time + // tokio::time::sleep(Duration::from_millis(750)).await; + // + // // Check that the counter has not been incremented beyond the stop time + // let counter = counter.lock().unwrap(); + // assert!(*counter <= 5); // 500 ms / 100 ms = 5 increments + // } }