From a4f9f8f58663d9d273ceede91ec32861a5a9150b Mon Sep 17 00:00:00 2001
From: Chris Sellers <chris@cjdsellers.io>
Date: Sun, 5 Nov 2023 10:59:27 +1100
Subject: [PATCH] Continue core MessageBus

---
 nautilus_core/common/src/ffi/clock.rs  |  1 +
 nautilus_core/common/src/ffi/mod.rs    |  1 +
 nautilus_core/common/src/ffi/msgbus.rs | 91 ++++++++++++++++++++++++++
 nautilus_core/common/src/msgbus.rs     | 73 +++++++++++----------
 nautilus_core/core/src/lib.rs          |  1 +
 nautilus_core/core/src/message.rs      | 42 ++++++++++++
 nautilus_trader/core/includes/common.h | 49 ++++++++++++++
 nautilus_trader/core/rust/common.pxd   | 44 +++++++++++++
 8 files changed, 266 insertions(+), 36 deletions(-)
 create mode 100644 nautilus_core/common/src/ffi/msgbus.rs
 create mode 100644 nautilus_core/core/src/message.rs

diff --git a/nautilus_core/common/src/ffi/clock.rs b/nautilus_core/common/src/ffi/clock.rs
index 174ea0edec82..a23280bcd985 100644
--- a/nautilus_core/common/src/ffi/clock.rs
+++ b/nautilus_core/common/src/ffi/clock.rs
@@ -71,6 +71,7 @@ pub extern "C" fn test_clock_drop(clock: TestClock_API) {
 }
 
 /// # Safety
+///
 /// - Assumes `callback_ptr` is a valid `PyCallable` pointer.
 #[no_mangle]
 pub unsafe extern "C" fn test_clock_register_default_handler(
diff --git a/nautilus_core/common/src/ffi/mod.rs b/nautilus_core/common/src/ffi/mod.rs
index 59f543f6cb1a..23f8f3946b84 100644
--- a/nautilus_core/common/src/ffi/mod.rs
+++ b/nautilus_core/common/src/ffi/mod.rs
@@ -15,4 +15,5 @@
 
 pub mod clock;
 pub mod logging;
+pub mod msgbus;
 pub mod timer;
diff --git a/nautilus_core/common/src/ffi/msgbus.rs b/nautilus_core/common/src/ffi/msgbus.rs
new file mode 100644
index 000000000000..5c300e9ebd56
--- /dev/null
+++ b/nautilus_core/common/src/ffi/msgbus.rs
@@ -0,0 +1,91 @@
+// -------------------------------------------------------------------------------------------------
+//  Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved.
+//  https://nautechsystems.io
+//
+//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
+//  You may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+// -------------------------------------------------------------------------------------------------
+
+use std::{
+    collections::HashMap,
+    ffi::c_char,
+    ops::{Deref, DerefMut},
+};
+
+use nautilus_core::{ffi::string::cstr_to_string, uuid::UUID4};
+use pyo3::ffi::PyObject;
+
+use crate::msgbus::MessageBus;
+
+#[allow(dead_code)] // Temporary for development
+pub struct PythonSwitchboard {
+    subscriptions: HashMap<String, PyObject>,
+    patterns: HashMap<String, Vec<PyObject>>,
+    endpoints: HashMap<String, PyObject>,
+    correlation_index: HashMap<UUID4, PyObject>,
+}
+
+impl PythonSwitchboard {
+    pub fn new() -> Self {
+        Self {
+            subscriptions: HashMap::new(),
+            patterns: HashMap::new(),
+            endpoints: HashMap::new(),
+            correlation_index: HashMap::new(),
+        }
+    }
+}
+
+impl Default for PythonSwitchboard {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Provides a C compatible Foreign Function Interface (FFI) for an underlying [`MessageBus`].
+///
+/// This struct wraps `MessageBus` in a way that makes it compatible with C function
+/// calls, enabling interaction with `MessageBus` in a C environment.
+///
+/// It implements the `Deref` trait, allowing instances of `MessageBus_API` to be
+/// dereferenced to `MessageBus`, providing access to `TestClock`'s methods without
+/// having to manually access the underlying `MessageBus` instance.
+#[allow(non_camel_case_types)]
+#[repr(C)]
+pub struct MessageBus_API {
+    inner: Box<MessageBus>,
+    switchboard: Box<PythonSwitchboard>,
+}
+
+impl Deref for MessageBus_API {
+    type Target = MessageBus;
+
+    fn deref(&self) -> &Self::Target {
+        self.inner.deref()
+    }
+}
+
+impl DerefMut for MessageBus_API {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.inner.deref_mut()
+    }
+}
+
+/// # Safety
+///
+/// - Assumes `name_ptr` is a valid C string pointer.
+#[no_mangle]
+pub unsafe extern "C" fn test_msgbus_new(name_ptr: *const c_char) -> MessageBus_API {
+    let name = cstr_to_string(name_ptr);
+    MessageBus_API {
+        inner: Box::new(MessageBus::new(&name)),
+        switchboard: Box::new(PythonSwitchboard::new()),
+    }
+}
diff --git a/nautilus_core/common/src/msgbus.rs b/nautilus_core/common/src/msgbus.rs
index 5ed133815c49..a72da8e3d943 100644
--- a/nautilus_core/common/src/msgbus.rs
+++ b/nautilus_core/common/src/msgbus.rs
@@ -15,61 +15,62 @@
 
 use std::{collections::HashMap, rc::Rc};
 
-use nautilus_core::uuid::UUID4;
-
-use crate::clock::TestClock;
+use nautilus_core::{message::Message, uuid::UUID4};
 
 type Handler = Rc<dyn Fn(&Message)>;
 
+/// Provides a generic message bus to facilitate various messaging patterns.
+///
+/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
+/// well as direct point-to-point messaging to registered endpoints.
+///
+/// Pub/Sub wildcard patterns for hierarchical topics are possible:
+///  - `*` asterisk represents one or more characters in a pattern.
+///  - `?` question mark represents a single character in a pattern.
+///
+/// Given a topic and pattern potentially containing wildcard characters, i.e.
+/// `*` and `?`, where `?` can match any single character in the topic, and `*`
+/// can match any number of characters including zero characters.
+///
+/// The asterisk in a wildcard matches any character zero or more times. For
+/// example, `comp*` matches anything beginning with `comp` which means `comp`,
+/// `complete`, and `computer` are all matched.
+///
+/// A question mark matches a single character once. For example, `c?mp` matches
+/// `camp` and `comp`. The question mark can also be used more than once.
+/// For example, `c??p` would match both of the above examples and `coop`.
 #[allow(dead_code)]
-#[derive(Debug, Clone)]
-enum Message {
-    Command {
-        id: UUID4,
-        ts_init: u64,
-    },
-    Document {
-        id: UUID4,
-        ts_init: u64,
-    },
-    Event {
-        id: UUID4,
-        ts_init: u64,
-        ts_event: u64,
-    },
-    Request {
-        id: UUID4,
-        ts_init: u64,
-    },
-    Response {
-        id: UUID4,
-        ts_init: u64,
-        correlation_id: UUID4,
-    },
-}
-
-#[allow(dead_code)]
-struct MessageBus {
-    name: String,
-    clock: TestClock,
+pub struct MessageBus {
+    /// The name for the message bus.
+    pub name: String,
     /// mapping from topic to the corresponding handler
     /// a topic can be a string with wildcards
     /// * '?' - any character
     /// * '*' - any number of any characters
     subscriptions: HashMap<String, Handler>,
     /// maps a pattern to all the handlers registered for it
-    /// this is updated whenever a new subscription is created
+    /// this is updated whenever a new subscription is created.
     patterns: HashMap<String, Vec<Handler>>,
-    /// handles a message or a request destined for a specific endpoint
+    /// handles a message or a request destined for a specific endpoint.
     endpoints: HashMap<String, Handler>,
     /// Relates a request with a response
     /// a request maps it's id to a handler so that a response
-    /// with the same id can later be handled
+    /// with the same id can later be handled.
     correlation_index: HashMap<UUID4, Handler>,
 }
 
 #[allow(dead_code)]
 impl MessageBus {
+    pub fn new(name: &str) -> Self {
+        Self {
+            name: name.to_owned(),
+            subscriptions: HashMap::new(),
+            patterns: HashMap::new(),
+            endpoints: HashMap::new(),
+            correlation_index: HashMap::new(),
+        }
+    }
+
     // TODO: not needed accessible from struct field
     fn endpoints(&self) {}
 
diff --git a/nautilus_core/core/src/lib.rs b/nautilus_core/core/src/lib.rs
index 629d50f6e492..d7b429e41361 100644
--- a/nautilus_core/core/src/lib.rs
+++ b/nautilus_core/core/src/lib.rs
@@ -15,6 +15,7 @@
 
 pub mod correctness;
 pub mod datetime;
+pub mod message;
 pub mod parsing;
 pub mod serialization;
 pub mod time;
diff --git a/nautilus_core/core/src/message.rs b/nautilus_core/core/src/message.rs
new file mode 100644
index 000000000000..1fbf3a85017f
--- /dev/null
+++ b/nautilus_core/core/src/message.rs
@@ -0,0 +1,42 @@
+// -------------------------------------------------------------------------------------------------
+//  Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved.
+//  https://nautechsystems.io
+//
+//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
+//  You may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+// -------------------------------------------------------------------------------------------------
+
+use crate::{time::UnixNanos, uuid::UUID4};
+
+#[derive(Debug, Clone)]
+pub enum Message {
+    Command {
+        id: UUID4,
+        ts_init: UnixNanos,
+    },
+    Document {
+        id: UUID4,
+        ts_init: UnixNanos,
+    },
+    Event {
+        id: UUID4,
+        ts_init: UnixNanos,
+        ts_event: UnixNanos,
+    },
+    Request {
+        id: UUID4,
+        ts_init: UnixNanos,
+    },
+    Response {
+        id: UUID4,
+        ts_init: UnixNanos,
+        correlation_id: UUID4,
+    },
+}
diff --git a/nautilus_trader/core/includes/common.h b/nautilus_trader/core/includes/common.h
index 03c9903f5763..4f700523bdc7 100644
--- a/nautilus_trader/core/includes/common.h
+++ b/nautilus_trader/core/includes/common.h
@@ -203,6 +203,32 @@ typedef struct LiveClock LiveClock;
  */
 typedef struct Logger_t Logger_t;
 
+/**
+ * Provides a generic message bus to facilitate various messaging patterns.
+ *
+ * The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
+ * well as direct point-to-point messaging to registered endpoints.
+ *
+ * Pub/Sub wildcard patterns for hierarchical topics are possible:
+ *  - `*` asterisk represents one or more characters in a pattern.
+ *  - `?` question mark represents a single character in a pattern.
+ *
+ * Given a topic and pattern potentially containing wildcard characters, i.e.
+ * `*` and `?`, where `?` can match any single character in the topic, and `*`
+ * can match any number of characters including zero characters.
+ *
+ * The asterisk in a wildcard matches any character zero or more times. For
+ * example, `comp*` matches anything beginning with `comp` which means `comp`,
+ * `complete`, and `computer` are all matched.
+ *
+ * A question mark matches a single character once. For example, `c?mp` matches
+ * `camp` and `comp`. The question mark can also be used more than once.
+ * For example, `c??p` would match both of the above examples and `coop`.
+ */
+typedef struct MessageBus MessageBus;
+
+typedef struct PythonSwitchboard PythonSwitchboard;
+
 typedef struct TestClock TestClock;
 
 /**
@@ -248,6 +274,21 @@ typedef struct Logger_API {
     struct Logger_t *_0;
 } Logger_API;
 
+/**
+ * Provides a C compatible Foreign Function Interface (FFI) for an underlying [`MessageBus`].
+ *
+ * This struct wraps `MessageBus` in a way that makes it compatible with C function
+ * calls, enabling interaction with `MessageBus` in a C environment.
+ *
+ * It implements the `Deref` trait, allowing instances of `MessageBus_API` to be
+ * dereferenced to `MessageBus`, providing access to `TestClock`'s methods without
+ * having to manually access the underlying `MessageBus` instance.
+ */
+typedef struct MessageBus_API {
+    struct MessageBus *inner;
+    struct PythonSwitchboard *switchboard;
+} MessageBus_API;
+
 /**
  * Represents a time event occurring at the event timestamp.
  */
@@ -330,6 +371,7 @@ void test_clock_drop(struct TestClock_API clock);
 
 /**
  * # Safety
+ *
  * - Assumes `callback_ptr` is a valid `PyCallable` pointer.
  */
 void test_clock_register_default_handler(struct TestClock_API *clock, PyObject *callback_ptr);
@@ -455,6 +497,13 @@ void logger_log(struct Logger_API *logger,
                 const char *component_ptr,
                 const char *message_ptr);
 
+/**
+ * # Safety
+ *
+ * - Assumes `name_ptr` is a valid C string pointer.
+ */
+struct MessageBus_API test_msgbus_new(const char *name_ptr);
+
 /**
  * # Safety
  *
diff --git a/nautilus_trader/core/rust/common.pxd b/nautilus_trader/core/rust/common.pxd
index 587960153b72..a090c3ebb54f 100644
--- a/nautilus_trader/core/rust/common.pxd
+++ b/nautilus_trader/core/rust/common.pxd
@@ -110,6 +110,32 @@ cdef extern from "../includes/common.h":
     cdef struct Logger_t:
         pass
 
+    # Provides a generic message bus to facilitate various messaging patterns.
+    #
+    # The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
+    # well as direct point-to-point messaging to registered endpoints.
+    #
+    # Pub/Sub wildcard patterns for hierarchical topics are possible:
+    #  - `*` asterisk represents one or more characters in a pattern.
+    #  - `?` question mark represents a single character in a pattern.
+    #
+    # Given a topic and pattern potentially containing wildcard characters, i.e.
+    # `*` and `?`, where `?` can match any single character in the topic, and `*`
+    # can match any number of characters including zero characters.
+    #
+    # The asterisk in a wildcard matches any character zero or more times. For
+    # example, `comp*` matches anything beginning with `comp` which means `comp`,
+    # `complete`, and `computer` are all matched.
+    #
+    # A question mark matches a single character once. For example, `c?mp` matches
+    # `camp` and `comp`. The question mark can also be used more than once.
+    # For example, `c??p` would match both of the above examples and `coop`.
+    cdef struct MessageBus:
+        pass
+
+    cdef struct PythonSwitchboard:
+        pass
+
     cdef struct TestClock:
         pass
 
@@ -147,6 +173,18 @@ cdef extern from "../includes/common.h":
     cdef struct Logger_API:
         Logger_t *_0;
 
+    # Provides a C compatible Foreign Function Interface (FFI) for an underlying [`MessageBus`].
+    #
+    # This struct wraps `MessageBus` in a way that makes it compatible with C function
+    # calls, enabling interaction with `MessageBus` in a C environment.
+    #
+    # It implements the `Deref` trait, allowing instances of `MessageBus_API` to be
+    # dereferenced to `MessageBus`, providing access to `TestClock`'s methods without
+    # having to manually access the underlying `MessageBus` instance.
+    cdef struct MessageBus_API:
+        MessageBus *inner;
+        PythonSwitchboard *switchboard;
+
     # Represents a time event occurring at the event timestamp.
     cdef struct TimeEvent_t:
         # The event name.
@@ -202,6 +240,7 @@ cdef extern from "../includes/common.h":
     void test_clock_drop(TestClock_API clock);
 
     # # Safety
+    #
     # - Assumes `callback_ptr` is a valid `PyCallable` pointer.
     void test_clock_register_default_handler(TestClock_API *clock, PyObject *callback_ptr);
 
@@ -312,6 +351,11 @@ cdef extern from "../includes/common.h":
                     const char *component_ptr,
                     const char *message_ptr);
 
+    # # Safety
+    #
+    # - Assumes `name_ptr` is a valid C string pointer.
+    MessageBus_API test_msgbus_new(const char *name_ptr);
+
     # # Safety
     #
     # - Assumes `name_ptr` is borrowed from a valid Python UTF-8 `str`.