From c8880be509efd36c346cfac7f3dc39c2f0d6230b Mon Sep 17 00:00:00 2001 From: Paul Montag Date: Tue, 28 May 2024 08:12:40 -0500 Subject: [PATCH] Got a binary running --- Cargo.lock | 1 + Cargo.toml | 1 + src/bin/main.rs | 61 ++++++++++++++++++++-------- src/ccfg/memberlist.rs | 90 ++++++++++++++++++++++++++++++++++++------ src/ccfg/mod.rs | 49 ++--------------------- src/lib.rs | 2 +- 6 files changed, 128 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5f856d..7323503 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1254,6 +1254,7 @@ version = "0.1.0" dependencies = [ "arrayvec", "dapt", + "log", "memberlist", "metrics", "serde", diff --git a/Cargo.toml b/Cargo.toml index 566e1e3..db9254f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] arrayvec = "0.7.4" dapt = "0.1.2" +log = "0.4.21" memberlist = "0.2.1" metrics = "0.22.3" serde = "1.0.202" diff --git a/src/bin/main.rs b/src/bin/main.rs index 214bcba..9a086af 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -1,7 +1,11 @@ use std::env; use std::hash::{DefaultHasher, Hasher}; +use std::net::SocketAddr; use std::str::FromStr; +use std::sync::Arc; +use herman::ccfg::{HermanDelegate, WithHermanDelegate}; +use herman::Config; use memberlist::agnostic::tokio::TokioRuntime; use memberlist::net::resolver::address::NodeAddressResolver; use memberlist::net::stream_layer::tcp::Tcp; @@ -13,7 +17,10 @@ use memberlist::{Memberlist, Options}; use tokio::signal; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { + log::set_logger(&SimpleLogger).unwrap(); + log::set_max_level(log::LevelFilter::Info); + let mut args = env::args(); let _ = args.next(); // the first argument is the name of the binary let bind_address = args.next().expect("missing bind address"); @@ -33,21 +40,21 @@ async fn main() { NodeAddress::from_str(bind_address.as_str()).expect("invalid bind address"), ); - let (tx, rx) = tokio::sync::mpsc::channel(100); - - let messages = Subscriber::new(rx); - - let delegate = HermesDelegate::with_messages(tx); - - let m: Memberlist< - NetTransport< - u64, - NodeAddressResolver, - Tcp, - Lpe<_, _>, - TokioRuntime, + let (m, _cfg): ( + Arc< + Memberlist< + NetTransport< + u64, + NodeAddressResolver, + Tcp, + Lpe<_, _>, + TokioRuntime, + >, + HermanDelegate, + >, >, - > = Memberlist::with_delegate(delegate, transport_options, Options::default()) + Config, + ) = Memberlist::with_config(transport_options, Options::default()) .await .unwrap(); @@ -58,17 +65,37 @@ async fn main() { hasher.finish() }; - let _ = m + let _node = m .join(Node::new( member_addr_hash, MaybeResolvedAddress::Unresolved( NodeAddress::from_str(member_addr.as_str()).expect("invalid member address"), ), )) - .await; + .await?; } signal::ctrl_c().await.expect("failed to listen for event"); println!("{:?}", m.members().await); + + Ok(()) +} + +use log::{Level, Metadata, Record}; + +struct SimpleLogger; + +impl log::Log for SimpleLogger { + fn enabled(&self, metadata: &Metadata) -> bool { + metadata.level() <= Level::Info + } + + fn log(&self, record: &Record) { + if self.enabled(record.metadata()) { + println!("{} - {}", record.level(), record.args()); + } + } + + fn flush(&self) {} } diff --git a/src/ccfg/memberlist.rs b/src/ccfg/memberlist.rs index 4b32f00..c70bf98 100644 --- a/src/ccfg/memberlist.rs +++ b/src/ccfg/memberlist.rs @@ -15,13 +15,75 @@ use memberlist::{ CheapClone, Memberlist, }; use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; + +use crate::Config; use super::{ ledger::{Data, Entry, Key}, Broadcast, Subscribe, }; +pub trait WithHermanDelegate< + T: Transport, + K: Key + DeserializeOwned + Serialize, + D: Data + DeserializeOwned + Serialize, +> +{ + fn with_config( + transport_options: T::Options, + opts: memberlist::Options, + ) -> impl Future< + Output = Result< + ( + Arc< + Memberlist< + T, + HermanDelegate::ResolvedAddress>, + >, + >, + Config>, + ), + memberlist::error::Error< + T, + HermanDelegate::ResolvedAddress>, + >, + >, + >; +} + +impl< + T: Transport, + K: Key + DeserializeOwned + Serialize, + D: Data + DeserializeOwned + Serialize, + > WithHermanDelegate + for Memberlist::ResolvedAddress>> +{ + async fn with_config( + transport_options: T::Options, + opts: memberlist::Options, + ) -> Result< + (Arc, Config>), + memberlist::error::Error< + T, + HermanDelegate::ResolvedAddress>, + >, + > { + let (tx, rx) = mpsc::channel(100); + let delegate = HermanDelegate::with_messages(tx); + let mbrlist = Arc::new(Memberlist::with_delegate(delegate, transport_options, opts).await?); + + let cfg = Config::new(Broadcaster::new(mbrlist.clone(), 3)); + + let mut broadcast_listener = cfg.broadcast_listener(Subscriber::new(rx)); + tokio::spawn(async move { + broadcast_listener.run().await; + }); + + Ok((mbrlist, cfg)) + } +} + /// The broadcaster implements transporting data using memberlists send_reliable mode. It /// grabs all of the online_members when called, and sends the entry to the next num_friends /// in a sorted list of members. By increasing the number of friends, you increase the likelyhood @@ -34,16 +96,16 @@ use super::{ /// /// So I guess use with caution. pub struct Broadcaster { - memberlist: + memberlist: Arc< Memberlist::ResolvedAddress>>, + >, num_friends: usize, } -impl Broadcaster { +impl<'a, T: Transport> Broadcaster { pub fn new( - memberlist: Memberlist< - T, - HermanDelegate::ResolvedAddress>, + memberlist: Arc< + Memberlist::ResolvedAddress>>, >, num_friends: usize, ) -> Self { @@ -123,10 +185,6 @@ impl Broadcast for } } -pub trait WithCCFG { - fn with_ccfg() -> Broadcaster; -} - pub struct Subscriber { messages: Receiver, } @@ -280,11 +338,17 @@ impl EventDelegate for HermanDeleg type Id = I; type Address = A; - async fn notify_join(&self, _node: Arc>) {} + async fn notify_join(&self, node: Arc>) { + log::info!("{} joined", node.id); + } - async fn notify_leave(&self, _node: Arc>) {} + async fn notify_leave(&self, node: Arc>) { + log::info!("{} left", node.id); + } - async fn notify_update(&self, _node: Arc>) {} + async fn notify_update(&self, node: Arc>) { + log::info!("{} updated", node.id); + } } impl Delegate for HermanDelegate { diff --git a/src/ccfg/mod.rs b/src/ccfg/mod.rs index b14132c..c9a5b63 100644 --- a/src/ccfg/mod.rs +++ b/src/ccfg/mod.rs @@ -3,21 +3,13 @@ mod memberlist; use std::{future::Future, sync::Arc}; -use ::memberlist::{ - delegate::Delegate, - net::{AddressResolver, Transport}, - transport, Memberlist, -}; use ledger::Ledger; -use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio::sync::Mutex; -use self::{ - ledger::{Data, Entry, Key}, - memberlist::{Broadcaster, Subscriber}, -}; +use self::ledger::{Data, Entry, Key}; -pub use self::memberlist::{Broadcast, HermanDelegate, Subscribe}; +pub use memberlist::HermanDelegate; +pub use memberlist::WithHermanDelegate; /// Broadcase is used to send data to multiple nodes on the network. It is up to /// the implementor how this should be done. send_entry is used to send a single entry @@ -44,39 +36,6 @@ pub struct Config> { drop: Arc>, } -impl< - T: Key + Serialize + DeserializeOwned, - D: Data + Serialize + DeserializeOwned, - Tr: Transport, - > Config> -{ - pub async fn new_with_memberlist( - transport_options: Tr::Options, - opts: ::memberlist::Options, - ) -> Result< - Config>, - ::memberlist::error::Error< - Tr, - HermanDelegate::ResolvedAddress>, - >, - > { - // TODO: we need to make the channel size and the number of friends configurable - let (tx, rx) = mpsc::channel(100); - let delegate = HermanDelegate::with_messages(tx); - let subscriber = Subscriber::new(rx); - let memberlist = Memberlist::with_delegate(delegate, transport_options, opts).await?; - let broadcast = Broadcaster::new(memberlist, 3); - - // TODO start the threads - - Ok(Self { - ledger: Arc::new(Mutex::new(Ledger::new())), - broadcast: Arc::new(Mutex::new(broadcast)), - drop: Arc::new(Mutex::new(false)), - }) - } -} - impl> Config { /// new creates a new Config with the given ledger, broadcast, and subscriber. pub fn new(broadcast: B) -> Self { diff --git a/src/lib.rs b/src/lib.rs index 4a4b70c..4543588 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -mod ccfg; +pub mod ccfg; mod router; pub use ccfg::Config;