Skip to content

Commit

Permalink
Make core clocks pyclass with Send and Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 26, 2023
1 parent d9610b1 commit 39fa0dd
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 15 deletions.
8 changes: 8 additions & 0 deletions nautilus_core/common/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub trait Clock {
fn cancel_timers(&mut self);
}

#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct TestClock {
time: AtomicTime,
timers: HashMap<String, TestTimer>,
Expand Down Expand Up @@ -249,6 +253,10 @@ impl Clock for TestClock {
}
}

#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct LiveClock {
internal: AtomicTime,
timers: HashMap<String, TestTimer>,
Expand Down
36 changes: 31 additions & 5 deletions nautilus_core/common/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{fmt, rc::Rc};
use std::{fmt, sync::Arc};

use nautilus_core::message::Message;
use pyo3::{ffi, prelude::*};
Expand All @@ -33,17 +33,39 @@ pub extern "C" fn dummy_callable(c: PyCallableWrapper) -> PyCallableWrapper {
c
}

#[allow(dead_code)]
#[derive(Clone)]
pub struct SafeMessageCallback {
pub callback: Arc<dyn Fn(Message) + Send>,
}

unsafe impl Send for SafeMessageCallback {}
unsafe impl Sync for SafeMessageCallback {}

#[allow(dead_code)]
#[derive(Clone)]
pub struct SafeTimeEventCallback {
pub callback: Arc<dyn Fn(TimeEvent) + Send>,
}

unsafe impl Send for SafeTimeEventCallback {}
unsafe impl Sync for SafeTimeEventCallback {}

// TODO: Make this more generic
#[derive(Clone)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct MessageHandler {
pub handler_id: Ustr,
_callback: Option<Rc<dyn Fn(Message)>>,
_callback: Option<SafeMessageCallback>,
}

impl MessageHandler {
// TODO: Validate exactly one of these is `Some`
#[must_use]
pub fn new(handler_id: Ustr, callback: Option<Rc<dyn Fn(Message)>>) -> Self {
pub fn new(handler_id: Ustr, callback: Option<SafeMessageCallback>) -> Self {
Self {
handler_id,
_callback: callback,
Expand All @@ -67,15 +89,19 @@ impl fmt::Debug for MessageHandler {

// TODO: Make this more generic
#[derive(Clone)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct EventHandler {
py_callback: Option<PyObject>,
_callback: Option<Rc<dyn Fn(TimeEvent)>>,
_callback: Option<SafeTimeEventCallback>,
}

impl EventHandler {
// TODO: Validate exactly one of these is `Some`
#[must_use]
pub fn new(py_callback: Option<PyObject>, callback: Option<Rc<dyn Fn(TimeEvent)>>) -> Self {
pub fn new(py_callback: Option<PyObject>, callback: Option<SafeTimeEventCallback>) -> Self {
Self {
py_callback,
_callback: callback,
Expand Down
14 changes: 8 additions & 6 deletions nautilus_core/common/src/msgbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,22 +459,24 @@ pub fn is_matching(topic: &Ustr, pattern: &Ustr) -> bool {
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
use std::rc::Rc;
use std::sync::Arc;

use nautilus_core::{message::Message, uuid::UUID4};
use rstest::*;

use super::*;
use crate::handlers::MessageHandler;
use crate::handlers::{MessageHandler, SafeMessageCallback};

fn stub_msgbus() -> MessageBus {
MessageBus::new(TraderId::from("trader-001"), UUID4::new(), None, None)
}

fn stub_rust_callback() -> Rc<dyn Fn(Message)> {
Rc::new(|m: Message| {
format!("{m:?}");
})
fn stub_rust_callback() -> SafeMessageCallback {
SafeMessageCallback {
callback: Arc::new(|m: Message| {
format!("{m:?}");
}),
}
}

#[rstest]
Expand Down
14 changes: 14 additions & 0 deletions nautilus_core/common/src/python/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------
1 change: 1 addition & 0 deletions nautilus_core/common/src/python/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

pub mod clock;
pub mod timer;

use pyo3::prelude::*;
Expand Down
10 changes: 6 additions & 4 deletions nautilus_core/core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

use std::{
ops::Deref,
rc::Rc,
sync::atomic::{AtomicU64, Ordering},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};

Expand Down Expand Up @@ -54,7 +56,7 @@ pub struct AtomicTime {
/// Atomic clock is operating in live or static mode
mode: ClockMode,
/// The last recorded time in nanoseconds for the clock
timestamp_ns: Rc<AtomicU64>,
timestamp_ns: Arc<AtomicU64>,
}

impl Deref for AtomicTime {
Expand All @@ -70,7 +72,7 @@ impl AtomicTime {
pub fn new(mode: ClockMode, time: u64) -> Self {
AtomicTime {
mode,
timestamp_ns: Rc::new(AtomicU64::new(time)),
timestamp_ns: Arc::new(AtomicU64::new(time)),
}
}

Expand Down

0 comments on commit 39fa0dd

Please sign in to comment.