Skip to content

Commit

Permalink
Refine LiveTimer with tokio timer and atomic bool
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Apr 17, 2024
1 parent 4ddf0d8 commit 9eb5577
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 31 deletions.
33 changes: 18 additions & 15 deletions nautilus_core/common/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{collections::HashMap, ops::Deref};
use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering};

use nautilus_core::{
correctness::{check_positive_u64, check_predicate_true, check_valid_string},
nanos::UnixNanos,
time::{get_atomic_clock_realtime, AtomicTime},
};
use tracing::error;
use ustr::Ustr;

use crate::{
Expand Down Expand Up @@ -298,15 +299,15 @@ impl Clock for LiveClock {
fn timer_names(&self) -> Vec<&str> {
self.timers
.iter()
.filter(|(_, timer)| !timer.is_expired)
.filter(|(_, timer)| !timer.is_expired.load(Ordering::SeqCst))
.map(|(k, _)| k.as_str())
.collect()
}

fn timer_count(&self) -> usize {
self.timers
.iter()
.filter(|(_, timer)| !timer.is_expired)
.filter(|(_, timer)| !timer.is_expired.load(Ordering::SeqCst))
.count()
}

Expand All @@ -323,7 +324,7 @@ impl Clock for LiveClock {
check_valid_string(name, stringify!(name)).unwrap();
assert!(
callback.is_some() | self.default_callback.is_some(),
"All Python callbacks were `None`"
"No callbacks provided",
);

let callback = match callback {
Expand All @@ -333,13 +334,9 @@ impl Clock for LiveClock {

let ts_now = self.get_time_ns();
alert_time_ns = std::cmp::max(alert_time_ns, ts_now);
let mut timer = LiveTimer::new(
name,
(alert_time_ns - ts_now).into(),
ts_now,
Some(alert_time_ns),
callback,
)?;
let interval_ns = (alert_time_ns - ts_now).into();
let mut timer = LiveTimer::new(name, interval_ns, ts_now, Some(alert_time_ns), callback)?;

timer.start();
self.timers.insert(Ustr::from(name), timer);
Ok(())
Expand All @@ -357,7 +354,7 @@ impl Clock for LiveClock {
check_positive_u64(interval_ns, stringify!(interval_ns))?;
check_predicate_true(
callback.is_some() | self.default_callback.is_some(),
"All Python callbacks were `None`",
"No callbacks provided",
)?;

let callback = match callback {
Expand All @@ -383,15 +380,21 @@ impl Clock for LiveClock {
let timer = self.timers.remove(&Ustr::from(name));
match timer {
None => {}
Some(mut timer) => timer.cancel(),
Some(mut timer) => {
if let Err(e) = timer.cancel() {
error!("Error on timer cancel: {:?}", e);
}
}
}
}

fn cancel_timers(&mut self) {
for timer in &mut self.timers.values_mut() {
timer.cancel();
if let Err(e) = timer.cancel() {
error!("Error on timer cancel: {:?}", e);
}
}
self.timers = HashMap::new();
self.timers.clear();
}
}

Expand Down
49 changes: 33 additions & 16 deletions nautilus_core/common/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ use std::{
cmp::Ordering,
ffi::c_char,
fmt::{Display, Formatter},
sync::{
atomic::{self, AtomicBool},
Arc,
},
};

use nautilus_core::{
Expand All @@ -27,7 +31,10 @@ use nautilus_core::{
};
#[cfg(feature = "python")]
use pyo3::{types::PyCapsule, IntoPy, PyObject, Python};
use tokio::{sync::oneshot, time::Duration};
use tokio::{
sync::oneshot,
time::{Duration, Instant},
};
use tracing::error;
use ustr::Ustr;

Expand Down Expand Up @@ -218,7 +225,7 @@ pub struct LiveTimer {
pub start_time_ns: UnixNanos,
pub stop_time_ns: Option<UnixNanos>,
pub next_time_ns: UnixNanos,
pub is_expired: bool,
pub is_expired: Arc<AtomicBool>,
callback: EventHandler,
canceler: Option<oneshot::Sender<()>>,
}
Expand All @@ -240,35 +247,43 @@ impl LiveTimer {
start_time_ns,
stop_time_ns,
next_time_ns: start_time_ns + interval_ns,
is_expired: false,
is_expired: Arc::new(AtomicBool::new(false)),
callback,
canceler: None,
})
}

pub fn start(&mut self) {
let event_name = self.name;
let mut start_time_ns = self.start_time_ns;
let start_time_ns = self.start_time_ns;
let stop_time_ns = self.stop_time_ns;
let interval_ns = self.interval_ns;
let is_expired = self.is_expired.clone();
let callback = self.callback.clone();

// Setup oneshot channel for cancelling timer task
let (cancel_tx, mut cancel_rx) = oneshot::channel();
self.canceler = Some(cancel_tx);

let clock = get_atomic_clock_realtime();
if start_time_ns == 0 {
start_time_ns = clock.get_time_ns();
}

let mut next_time_ns = start_time_ns + interval_ns;

let rt = get_runtime();
rt.spawn(async move {
let clock = get_atomic_clock_realtime();
let now_ns = clock.get_time_ns();
let start_instant = if start_time_ns <= now_ns {
Instant::now()
} else {
let delay_duration = Duration::from_nanos((start_time_ns - now_ns).into());
Instant::now() + delay_duration
};

let mut timer = tokio::time::interval_at(start_instant, Duration::from_millis(interval_ns));
let mut next_time_ns = start_time_ns + interval_ns;

loop {
// SAFETY: `timer.tick` is cancellation safe, if the cancel branch completes
// first then no tick has been consumed (no event was ready).
tokio::select! {
_ = tokio::time::sleep(Duration::from_nanos(next_time_ns.saturating_sub(clock.get_time_ns().as_u64()))) => {
_ = timer.tick() => {
call_python_with_time_event(event_name, next_time_ns, clock.get_time_ns(), &callback);

// Prepare next time interval
Expand All @@ -287,17 +302,19 @@ impl LiveTimer {
}
}

is_expired.store(true, atomic::Ordering::SeqCst);

Ok::<(), anyhow::Error>(())
});

self.is_expired = true;
}

/// Cancels the timer (the timer will not generate an event).
pub fn cancel(&mut self) {
pub fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(sender) = self.canceler.take() {
let _ = sender.send(());
// Send cancellation signal
sender.send(()).map_err(|e| anyhow::anyhow!("{:?}", e))?;
}
Ok(())
}
}

Expand Down

0 comments on commit 9eb5577

Please sign in to comment.