Skip to content

Commit

Permalink
Continue core MessageBus
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 5, 2023
1 parent 6fe09a1 commit a4f9f8f
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 36 deletions.
1 change: 1 addition & 0 deletions nautilus_core/common/src/ffi/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub extern "C" fn test_clock_drop(clock: TestClock_API) {
}

/// # Safety
///
/// - Assumes `callback_ptr` is a valid `PyCallable` pointer.
#[no_mangle]
pub unsafe extern "C" fn test_clock_register_default_handler(
Expand Down
1 change: 1 addition & 0 deletions nautilus_core/common/src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@

pub mod clock;
pub mod logging;
pub mod msgbus;
pub mod timer;
91 changes: 91 additions & 0 deletions nautilus_core/common/src/ffi/msgbus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{
collections::HashMap,
ffi::c_char,
ops::{Deref, DerefMut},
};

use nautilus_core::{ffi::string::cstr_to_string, uuid::UUID4};
use pyo3::ffi::PyObject;

use crate::msgbus::MessageBus;

#[allow(dead_code)] // Temporary for development
pub struct PythonSwitchboard {
subscriptions: HashMap<String, PyObject>,
patterns: HashMap<String, Vec<PyObject>>,
endpoints: HashMap<String, PyObject>,
correlation_index: HashMap<UUID4, PyObject>,
}

impl PythonSwitchboard {
pub fn new() -> Self {
Self {
subscriptions: HashMap::new(),
patterns: HashMap::new(),
endpoints: HashMap::new(),
correlation_index: HashMap::new(),
}
}
}

impl Default for PythonSwitchboard {
fn default() -> Self {
Self::new()
}
}

/// Provides a C compatible Foreign Function Interface (FFI) for an underlying [`MessageBus`].
///
/// This struct wraps `MessageBus` in a way that makes it compatible with C function
/// calls, enabling interaction with `MessageBus` in a C environment.
///
/// It implements the `Deref` trait, allowing instances of `MessageBus_API` to be
/// dereferenced to `MessageBus`, providing access to `TestClock`'s methods without
/// having to manually access the underlying `MessageBus` instance.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct MessageBus_API {
inner: Box<MessageBus>,
switchboard: Box<PythonSwitchboard>,
}

impl Deref for MessageBus_API {
type Target = MessageBus;

fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}

impl DerefMut for MessageBus_API {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.deref_mut()
}
}

/// # Safety
///
/// - Assumes `name_ptr` is a valid C string pointer.
#[no_mangle]
pub unsafe extern "C" fn test_msgbus_new(name_ptr: *const c_char) -> MessageBus_API {
let name = cstr_to_string(name_ptr);
MessageBus_API {
inner: Box::new(MessageBus::new(&name)),
switchboard: Box::new(PythonSwitchboard::new()),
}
}
73 changes: 37 additions & 36 deletions nautilus_core/common/src/msgbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,62 @@

use std::{collections::HashMap, rc::Rc};

use nautilus_core::uuid::UUID4;

use crate::clock::TestClock;
use nautilus_core::{message::Message, uuid::UUID4};

type Handler = Rc<dyn Fn(&Message)>;

/// Provides a generic message bus to facilitate various messaging patterns.
///
/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
/// well as direct point-to-point messaging to registered endpoints.
///
/// Pub/Sub wildcard patterns for hierarchical topics are possible:
/// - `*` asterisk represents one or more characters in a pattern.
/// - `?` question mark represents a single character in a pattern.
///
/// Given a topic and pattern potentially containing wildcard characters, i.e.
/// `*` and `?`, where `?` can match any single character in the topic, and `*`
/// can match any number of characters including zero characters.
///
/// The asterisk in a wildcard matches any character zero or more times. For
/// example, `comp*` matches anything beginning with `comp` which means `comp`,
/// `complete`, and `computer` are all matched.
///
/// A question mark matches a single character once. For example, `c?mp` matches
/// `camp` and `comp`. The question mark can also be used more than once.
/// For example, `c??p` would match both of the above examples and `coop`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
enum Message {
Command {
id: UUID4,
ts_init: u64,
},
Document {
id: UUID4,
ts_init: u64,
},
Event {
id: UUID4,
ts_init: u64,
ts_event: u64,
},
Request {
id: UUID4,
ts_init: u64,
},
Response {
id: UUID4,
ts_init: u64,
correlation_id: UUID4,
},
}

#[allow(dead_code)]
struct MessageBus {
name: String,
clock: TestClock,
pub struct MessageBus {
/// The name for the message bus.
pub name: String,
/// mapping from topic to the corresponding handler
/// a topic can be a string with wildcards
/// * '?' - any character
/// * '*' - any number of any characters
subscriptions: HashMap<String, Handler>,
/// maps a pattern to all the handlers registered for it
/// this is updated whenever a new subscription is created
/// this is updated whenever a new subscription is created.
patterns: HashMap<String, Vec<Handler>>,
/// handles a message or a request destined for a specific endpoint
/// handles a message or a request destined for a specific endpoint.
endpoints: HashMap<String, Handler>,
/// Relates a request with a response
/// a request maps it's id to a handler so that a response
/// with the same id can later be handled
/// with the same id can later be handled.
correlation_index: HashMap<UUID4, Handler>,
}

#[allow(dead_code)]
impl MessageBus {
pub fn new(name: &str) -> Self {
Self {
name: name.to_owned(),
subscriptions: HashMap::new(),
patterns: HashMap::new(),
endpoints: HashMap::new(),
correlation_index: HashMap::new(),
}
}

// TODO: not needed accessible from struct field
fn endpoints(&self) {}

Expand Down
1 change: 1 addition & 0 deletions nautilus_core/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

pub mod correctness;
pub mod datetime;
pub mod message;
pub mod parsing;
pub mod serialization;
pub mod time;
Expand Down
42 changes: 42 additions & 0 deletions nautilus_core/core/src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use crate::{time::UnixNanos, uuid::UUID4};

#[derive(Debug, Clone)]
pub enum Message {
Command {
id: UUID4,
ts_init: UnixNanos,
},
Document {
id: UUID4,
ts_init: UnixNanos,
},
Event {
id: UUID4,
ts_init: UnixNanos,
ts_event: UnixNanos,
},
Request {
id: UUID4,
ts_init: UnixNanos,
},
Response {
id: UUID4,
ts_init: UnixNanos,
correlation_id: UUID4,
},
}
49 changes: 49 additions & 0 deletions nautilus_trader/core/includes/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,32 @@ typedef struct LiveClock LiveClock;
*/
typedef struct Logger_t Logger_t;

/**
* Provides a generic message bus to facilitate various messaging patterns.
*
* The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
* well as direct point-to-point messaging to registered endpoints.
*
* Pub/Sub wildcard patterns for hierarchical topics are possible:
* - `*` asterisk represents one or more characters in a pattern.
* - `?` question mark represents a single character in a pattern.
*
* Given a topic and pattern potentially containing wildcard characters, i.e.
* `*` and `?`, where `?` can match any single character in the topic, and `*`
* can match any number of characters including zero characters.
*
* The asterisk in a wildcard matches any character zero or more times. For
* example, `comp*` matches anything beginning with `comp` which means `comp`,
* `complete`, and `computer` are all matched.
*
* A question mark matches a single character once. For example, `c?mp` matches
* `camp` and `comp`. The question mark can also be used more than once.
* For example, `c??p` would match both of the above examples and `coop`.
*/
typedef struct MessageBus MessageBus;

typedef struct PythonSwitchboard PythonSwitchboard;

typedef struct TestClock TestClock;

/**
Expand Down Expand Up @@ -248,6 +274,21 @@ typedef struct Logger_API {
struct Logger_t *_0;
} Logger_API;

/**
* Provides a C compatible Foreign Function Interface (FFI) for an underlying [`MessageBus`].
*
* This struct wraps `MessageBus` in a way that makes it compatible with C function
* calls, enabling interaction with `MessageBus` in a C environment.
*
* It implements the `Deref` trait, allowing instances of `MessageBus_API` to be
* dereferenced to `MessageBus`, providing access to `TestClock`'s methods without
* having to manually access the underlying `MessageBus` instance.
*/
typedef struct MessageBus_API {
struct MessageBus *inner;
struct PythonSwitchboard *switchboard;
} MessageBus_API;

/**
* Represents a time event occurring at the event timestamp.
*/
Expand Down Expand Up @@ -330,6 +371,7 @@ void test_clock_drop(struct TestClock_API clock);

/**
* # Safety
*
* - Assumes `callback_ptr` is a valid `PyCallable` pointer.
*/
void test_clock_register_default_handler(struct TestClock_API *clock, PyObject *callback_ptr);
Expand Down Expand Up @@ -455,6 +497,13 @@ void logger_log(struct Logger_API *logger,
const char *component_ptr,
const char *message_ptr);

/**
* # Safety
*
* - Assumes `name_ptr` is a valid C string pointer.
*/
struct MessageBus_API test_msgbus_new(const char *name_ptr);

/**
* # Safety
*
Expand Down
Loading

0 comments on commit a4f9f8f

Please sign in to comment.