Skip to content

Commit

Permalink
Refine clocks and timers
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Apr 16, 2024
1 parent 834d2a5 commit a872874
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 240 deletions.
54 changes: 29 additions & 25 deletions nautilus_core/common/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::{collections::HashMap, ops::Deref};

use nautilus_core::{
correctness::check_valid_string,
correctness::{check_positive_u64, check_predicate_true, check_valid_string},
nanos::UnixNanos,
time::{get_atomic_clock_realtime, AtomicTime},
};
Expand Down Expand Up @@ -49,7 +49,7 @@ pub trait Clock {
name: &str,
alert_time_ns: UnixNanos,
callback: Option<EventHandler>,
);
) -> anyhow::Result<()>;

/// Set a `Timer` to start alerting at every interval
/// between start and stop time. Optional callback gets
Expand All @@ -61,7 +61,7 @@ pub trait Clock {
start_time_ns: UnixNanos,
stop_time_ns: Option<UnixNanos>,
callback: Option<EventHandler>,
);
) -> anyhow::Result<()>;

fn next_time_ns(&self, name: &str) -> UnixNanos;
fn cancel_timer(&mut self, name: &str);
Expand Down Expand Up @@ -184,29 +184,28 @@ impl Clock for TestClock {
name: &str,
alert_time_ns: UnixNanos,
callback: Option<EventHandler>,
) {
check_valid_string(name, stringify!(name)).unwrap();
assert!(
) -> anyhow::Result<()> {
check_valid_string(name, stringify!(name))?;
check_predicate_true(
callback.is_some() | self.default_callback.is_some(),
"All Python callbacks were `None`"
);
"All Python callbacks were `None`",
)?;

let name_ustr = Ustr::from(name);
match callback {
Some(callback_py) => self.callbacks.insert(name_ustr, callback_py),
None => None,
};

// TODO: should the atomic clock be shared
// currently share timestamp nanoseconds
let time_ns = self.time.get_time_ns();
let timer = TestTimer::new(
name,
(alert_time_ns - time_ns).into(),
time_ns,
Some(alert_time_ns),
);
)?;
self.timers.insert(name_ustr, timer);
Ok(())
}

fn set_timer_ns(
Expand All @@ -216,21 +215,23 @@ impl Clock for TestClock {
start_time_ns: UnixNanos,
stop_time_ns: Option<UnixNanos>,
callback: Option<EventHandler>,
) {
check_valid_string(name, "name").unwrap();
assert!(
) -> anyhow::Result<()> {
check_valid_string(name, "name")?;
check_positive_u64(interval_ns, stringify!(interval_ns))?;
check_predicate_true(
callback.is_some() | self.default_callback.is_some(),
"All Python callbacks were `None`"
);
"All Python callbacks were `None`",
)?;

let name_ustr = Ustr::from(name);
match callback {
Some(callback_py) => self.callbacks.insert(name_ustr, callback_py),
None => None,
};

let timer = TestTimer::new(name, interval_ns, start_time_ns, stop_time_ns);
let timer = TestTimer::new(name, interval_ns, start_time_ns, stop_time_ns)?;
self.timers.insert(name_ustr, timer);
Ok(())
}

fn next_time_ns(&self, name: &str) -> UnixNanos {
Expand Down Expand Up @@ -318,7 +319,7 @@ impl Clock for LiveClock {
name: &str,
mut alert_time_ns: UnixNanos,
callback: Option<EventHandler>,
) {
) -> anyhow::Result<()> {
check_valid_string(name, stringify!(name)).unwrap();
assert!(
callback.is_some() | self.default_callback.is_some(),
Expand All @@ -338,9 +339,10 @@ impl Clock for LiveClock {
ts_now,
Some(alert_time_ns),
callback,
);
)?;
timer.start();
self.timers.insert(Ustr::from(name), timer);
Ok(())
}

fn set_timer_ns(
Expand All @@ -350,21 +352,23 @@ impl Clock for LiveClock {
start_time_ns: UnixNanos,
stop_time_ns: Option<UnixNanos>,
callback: Option<EventHandler>,
) {
check_valid_string(name, stringify!(name)).unwrap();
assert!(
) -> anyhow::Result<()> {
check_valid_string(name, stringify!(name))?;
check_positive_u64(interval_ns, stringify!(interval_ns))?;
check_predicate_true(
callback.is_some() | self.default_callback.is_some(),
"All Python callbacks were `None`"
);
"All Python callbacks were `None`",
)?;

let callback = match callback {
Some(callback) => callback,
None => self.default_callback.clone().unwrap(),
};

let mut timer = LiveTimer::new(name, interval_ns, start_time_ns, stop_time_ns, callback);
let mut timer = LiveTimer::new(name, interval_ns, start_time_ns, stop_time_ns, callback)?;
timer.start();
self.timers.insert(Ustr::from(name), timer);
Ok(())
}

fn next_time_ns(&self, name: &str) -> UnixNanos {
Expand Down
16 changes: 12 additions & 4 deletions nautilus_core/common/src/ffi/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ pub unsafe extern "C" fn test_clock_set_time_alert(
}
};

clock.set_time_alert_ns(name, alert_time_ns, handler);
clock
.set_time_alert_ns(name, alert_time_ns, handler)
.unwrap();
}

/// # Safety
Expand Down Expand Up @@ -183,7 +185,9 @@ pub unsafe extern "C" fn test_clock_set_timer(
}
};

clock.set_timer_ns(name, interval_ns, start_time_ns, stop_time_ns, handler);
clock
.set_timer_ns(name, interval_ns, start_time_ns, stop_time_ns, handler)
.unwrap();
}

/// # Safety
Expand Down Expand Up @@ -353,7 +357,9 @@ pub unsafe extern "C" fn live_clock_set_time_alert(
}
};

clock.set_time_alert_ns(name, alert_time_ns, handler);
clock
.set_time_alert_ns(name, alert_time_ns, handler)
.unwrap();
}

/// # Safety
Expand Down Expand Up @@ -385,7 +391,9 @@ pub unsafe extern "C" fn live_clock_set_timer(
}
};

clock.set_timer_ns(name, interval_ns, start_time_ns, stop_time_ns, handler);
clock
.set_timer_ns(name, interval_ns, start_time_ns, stop_time_ns, handler)
.unwrap();
}

/// # Safety
Expand Down
24 changes: 18 additions & 6 deletions nautilus_core/common/src/python/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ mod tests {
test_clock.register_default_handler(handler);

let timer_name = "TEST_TIME1";
test_clock.set_timer_ns(timer_name, 10, 0.into(), None, None);
test_clock
.set_timer_ns(timer_name, 10, 0.into(), None, None)
.unwrap();

assert_eq!(test_clock.timer_names(), [timer_name]);
assert_eq!(test_clock.timer_count(), 1);
Expand All @@ -73,7 +75,9 @@ mod tests {
test_clock.register_default_handler(handler);

let timer_name = "TEST_TIME1";
test_clock.set_timer_ns(timer_name, 10, 0.into(), None, None);
test_clock
.set_timer_ns(timer_name, 10, 0.into(), None, None)
.unwrap();
test_clock.cancel_timer(timer_name);

assert!(test_clock.timer_names().is_empty());
Expand All @@ -92,7 +96,9 @@ mod tests {
test_clock.register_default_handler(handler);

let timer_name = "TEST_TIME1";
test_clock.set_timer_ns(timer_name, 10, 0.into(), None, None);
test_clock
.set_timer_ns(timer_name, 10, 0.into(), None, None)
.unwrap();
test_clock.cancel_timers();

assert!(test_clock.timer_names().is_empty());
Expand All @@ -111,7 +117,9 @@ mod tests {
test_clock.register_default_handler(handler);

let timer_name = "TEST_TIME1";
test_clock.set_timer_ns(timer_name, 1, 1.into(), Some(UnixNanos::from(3)), None);
test_clock
.set_timer_ns(timer_name, 1, 1.into(), Some(UnixNanos::from(3)), None)
.unwrap();
test_clock.advance_time(2.into(), true);

assert_eq!(test_clock.timer_names(), [timer_name]);
Expand All @@ -129,7 +137,9 @@ mod tests {
let handler = EventHandler::new(py_append);
test_clock.register_default_handler(handler);

test_clock.set_timer_ns("TEST_TIME1", 2, 0.into(), Some(UnixNanos::from(3)), None);
test_clock
.set_timer_ns("TEST_TIME1", 2, 0.into(), Some(UnixNanos::from(3)), None)
.unwrap();
test_clock.advance_time(3.into(), true);

assert_eq!(test_clock.timer_names().len(), 1);
Expand All @@ -148,7 +158,9 @@ mod tests {
let handler = EventHandler::new(py_append);
test_clock.register_default_handler(handler);

test_clock.set_timer_ns("TEST_TIME1", 2, 0.into(), Some(UnixNanos::from(3)), None);
test_clock
.set_timer_ns("TEST_TIME1", 2, 0.into(), Some(UnixNanos::from(3)), None)
.unwrap();
test_clock.advance_time(3.into(), false);

assert_eq!(test_clock.timer_names().len(), 1);
Expand Down
10 changes: 10 additions & 0 deletions nautilus_core/common/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ use tokio::runtime::Runtime;

static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

/// Retrieves a reference to a globally shared Tokio runtime.
/// The runtime is lazily initialized on the first call and reused thereafter.
///
/// This global runtime is intended for use cases where passing a runtime
/// around is impractical. It uses default configuration values.
///
/// # Panics
///
/// Panics if the runtime could not be created, which typically indicates
/// an inability to spawn threads or allocate necessary resources.
pub fn get_runtime() -> &'static tokio::runtime::Runtime {
// Using default configuration values for now
RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
Expand Down
Loading

0 comments on commit a872874

Please sign in to comment.