Skip to content

Commit

Permalink
Initial MessageBus external publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 15, 2023
1 parent fce469c commit 0f0214c
Show file tree
Hide file tree
Showing 24 changed files with 379 additions and 91 deletions.
5 changes: 5 additions & 0 deletions examples/live/binance_spot_market_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
reconciliation_lookback_mins=1440,
),
cache_database=CacheDatabaseConfig(type="in-memory"),
# message_bus=MessageBusConfig(
# database=DatabaseConfig(), # No yet for operational use
# encoding="json",
# timestamps_as_iso8601=True,
# ),
data_clients={
"BINANCE": BinanceDataClientConfig(
api_key=None, # "YOUR_BINANCE_API_KEY"
Expand Down
32 changes: 32 additions & 0 deletions nautilus_core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nautilus_core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde_json = { workspace = true }
pyo3 = { workspace = true, optional = true }
strum = { workspace = true }
ustr = { workspace = true }
redis = "0.23.3"

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
23 changes: 21 additions & 2 deletions nautilus_core/common/src/ffi/msgbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use std::{
use nautilus_core::{
ffi::{
cvec::CVec,
string::{cstr_to_string, cstr_to_ustr, optional_cstr_to_string},
parsing::optional_bytes_to_json,
string::{cstr_to_string, cstr_to_ustr, cstr_to_vec, optional_cstr_to_string},
},
uuid::UUID4,
};
Expand Down Expand Up @@ -73,10 +74,12 @@ impl DerefMut for MessageBus_API {
pub unsafe extern "C" fn msgbus_new(
trader_id_ptr: *const c_char,
name_ptr: *const c_char,
config_ptr: *const c_char,
) -> MessageBus_API {
let trader_id = TraderId::from_str(&cstr_to_string(trader_id_ptr)).unwrap();
let name = optional_cstr_to_string(name_ptr);
MessageBus_API(Box::new(MessageBus::new(trader_id, name)))
let config = optional_bytes_to_json(config_ptr);
MessageBus_API(Box::new(MessageBus::new(trader_id, name, config)))
}

#[no_mangle]
Expand Down Expand Up @@ -391,3 +394,19 @@ pub unsafe extern "C" fn msgbus_is_matching(
let pattern = cstr_to_ustr(pattern_ptr);
is_matching(&topic, &pattern) as u8
}

/// # Safety
///
/// - Assumes `topic_ptr` is a valid C string pointer.
/// - Assumes `handler_id_ptr` is a valid C string pointer.
/// - Assumes `py_callable_ptr` points to a valid Python callable.
#[no_mangle]
pub unsafe extern "C" fn msgbus_publish_external(
bus: &mut MessageBus_API,
topic_ptr: *const c_char,
payload_ptr: *const c_char,
) {
let topic = cstr_to_string(topic_ptr);
let payload = cstr_to_vec(payload_ptr);
bus.publish_external(topic, payload);
}
97 changes: 81 additions & 16 deletions nautilus_core/common/src/msgbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,19 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::hash::{Hash, Hasher};
use std::{
collections::HashMap,
fmt,
hash::{Hash, Hasher},
sync::mpsc::{channel, Receiver, SendError, Sender},
thread,
};

use indexmap::IndexMap;
use nautilus_core::uuid::UUID4;
use nautilus_model::identifiers::trader_id::TraderId;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use ustr::Ustr;

use crate::handlers::MessageHandler;
Expand Down Expand Up @@ -80,6 +88,26 @@ impl Hash for Subscription {
}
}

/// Represents a bus message including a topic and payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BusMessage {
/// The topic to publish on.
topic: String,
/// The serialized payload for the message.
payload: Vec<u8>,
}

impl fmt::Display for BusMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[{}] {}",
self.topic,
String::from_utf8_lossy(&self.payload)
)
}
}

/// Provides a generic message bus to facilitate various messaging patterns.
///
/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
Expand All @@ -102,18 +130,7 @@ impl Hash for Subscription {
/// For example, `c??p` would match both of the above examples and `coop`.
#[derive(Clone)]
pub struct MessageBus {
/// The trader ID associated with the message bus.
pub trader_id: TraderId,
/// The name for the message bus.
pub name: String,
// The count of messages sent through the bus.
pub sent_count: u64,
// The count of requests processed by the bus.
pub req_count: u64,
// The count of responses processed by the bus.
pub res_count: u64,
/// The count of messages published by the bus.
pub pub_count: u64,
tx: Sender<BusMessage>,
/// mapping from topic to the corresponding handler
/// a topic can be a string with wildcards
/// * '?' - any character
Expand All @@ -128,13 +145,40 @@ pub struct MessageBus {
/// a request maps it's id to a handler so that a response
/// with the same id can later be handled.
correlation_index: IndexMap<UUID4, MessageHandler>,
/// The trader ID associated with the message bus.
pub trader_id: TraderId,
/// The name for the message bus.
pub name: String,
// The count of messages sent through the bus.
pub sent_count: u64,
// The count of requests processed by the bus.
pub req_count: u64,
// The count of responses processed by the bus.
pub res_count: u64,
/// The count of messages published by the bus.
pub pub_count: u64,
/// If the message bus is backed by a database.
pub has_backing: bool,
}

impl MessageBus {
/// Initializes a new instance of the [`MessageBus`].
#[must_use]
pub fn new(trader_id: TraderId, name: Option<String>) -> Self {
pub fn new(
trader_id: TraderId,
name: Option<String>,
config: Option<HashMap<String, Value>>,
) -> Self {
let (tx, rx) = channel::<BusMessage>();
let config = config.unwrap_or_default();
let has_backing = config.get("database").is_some();

thread::spawn(move || {
Self::handle_messages(trader_id.value.as_ref(), config, rx);
});

Self {
tx,
trader_id,
name: name.unwrap_or_else(|| stringify!(MessageBus).to_owned()),
sent_count: 0,
Expand All @@ -145,6 +189,7 @@ impl MessageBus {
patterns: IndexMap::new(),
endpoints: IndexMap::new(),
correlation_index: IndexMap::new(),
has_backing,
}
}

Expand Down Expand Up @@ -334,6 +379,26 @@ impl MessageBus {
}
})
}

pub fn publish_external(&self, topic: String, payload: Vec<u8>) {
let msg = BusMessage { topic, payload };
if let Err(SendError(e)) = self.tx.send(msg) {
eprintln!("Error publishing external message: {e}");
}
}

fn handle_messages(
_trader_id: &str,
_config: HashMap<String, Value>,
rx: Receiver<BusMessage>,
) {
// Continue to receive and handle bus messages until channel is hung up
while let Ok(msg) = rx.recv() {
println!("{}", msg);
}

// TODO: WIP
}
}

/// Match a topic and a string pattern
Expand Down Expand Up @@ -381,7 +446,7 @@ mod tests {
use crate::handlers::MessageHandler;

fn stub_msgbus() -> MessageBus {
MessageBus::new(TraderId::from("trader-001"), None)
MessageBus::new(TraderId::from("trader-001"), None, None)
}

fn stub_rust_callback() -> Rc<dyn Fn(Message)> {
Expand All @@ -393,7 +458,7 @@ mod tests {
#[rstest]
fn test_new() {
let trader_id = TraderId::from("trader-001");
let msgbus = MessageBus::new(trader_id, None);
let msgbus = MessageBus::new(trader_id, None, None);

assert_eq!(msgbus.trader_id, trader_id);
assert_eq!(msgbus.name, stringify!(MessageBus));
Expand Down
38 changes: 36 additions & 2 deletions nautilus_core/core/src/ffi/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,26 @@ pub unsafe fn pystr_to_string(ptr: *mut ffi::PyObject) -> String {
///
/// - If `ptr` is null.
#[must_use]
#[no_mangle]
pub unsafe extern "C" fn cstr_to_ustr(ptr: *const c_char) -> Ustr {
pub unsafe fn cstr_to_ustr(ptr: *const c_char) -> Ustr {
assert!(!ptr.is_null(), "`ptr` was NULL");
Ustr::from(CStr::from_ptr(ptr).to_str().expect("CStr::from_ptr failed"))
}

/// Convert a C string pointer into an owned `String`.
///
/// # Safety
///
/// - Assumes `ptr` is a valid C string pointer.
///
/// # Panics
///
/// - If `ptr` is null.
#[must_use]
pub unsafe fn cstr_to_vec(ptr: *const c_char) -> Vec<u8> {
assert!(!ptr.is_null(), "`ptr` was NULL");
CStr::from_ptr(ptr).to_bytes().to_vec()
}

/// Convert a C string pointer into an owned `Option<Ustr>`.
///
/// # Safety
Expand Down Expand Up @@ -192,6 +206,26 @@ mod tests {
};
}

#[rstest]
fn test_cstr_to_vec() {
// Create a valid C string pointer
let sample_c_string = CString::new("Hello, world!").expect("CString::new failed");
let cstr_ptr = sample_c_string.as_ptr();
let result = unsafe { cstr_to_vec(cstr_ptr) };
assert_eq!(result, b"Hello, world!");
assert_eq!(result.len(), 13);
}

#[rstest]
#[should_panic]
fn test_cstr_to_vec_with_null_ptr() {
// Create a null C string pointer
let ptr: *const c_char = std::ptr::null();
unsafe {
let _ = cstr_to_vec(ptr);
};
}

#[rstest]
fn test_optional_cstr_to_string_with_null_ptr() {
// Call optional_cstr_to_string with null pointer
Expand Down
Loading

0 comments on commit 0f0214c

Please sign in to comment.