Skip to content

Commit

Permalink
Got a binary running
Browse files Browse the repository at this point in the history
  • Loading branch information
d1ngd0 committed May 28, 2024
1 parent 632bc09 commit c8880be
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
arrayvec = "0.7.4"
dapt = "0.1.2"
log = "0.4.21"
memberlist = "0.2.1"
metrics = "0.22.3"
serde = "1.0.202"
Expand Down
61 changes: 44 additions & 17 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::env;
use std::hash::{DefaultHasher, Hasher};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use herman::ccfg::{HermanDelegate, WithHermanDelegate};
use herman::Config;
use memberlist::agnostic::tokio::TokioRuntime;
use memberlist::net::resolver::address::NodeAddressResolver;
use memberlist::net::stream_layer::tcp::Tcp;
Expand All @@ -13,7 +17,10 @@ use memberlist::{Memberlist, Options};
use tokio::signal;

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::set_logger(&SimpleLogger).unwrap();
log::set_max_level(log::LevelFilter::Info);

let mut args = env::args();
let _ = args.next(); // the first argument is the name of the binary
let bind_address = args.next().expect("missing bind address");
Expand All @@ -33,21 +40,21 @@ async fn main() {
NodeAddress::from_str(bind_address.as_str()).expect("invalid bind address"),
);

let (tx, rx) = tokio::sync::mpsc::channel(100);

let messages = Subscriber::new(rx);

let delegate = HermesDelegate::with_messages(tx);

let m: Memberlist<
NetTransport<
u64,
NodeAddressResolver<TokioRuntime>,
Tcp<TokioRuntime>,
Lpe<_, _>,
TokioRuntime,
let (m, _cfg): (
Arc<
Memberlist<
NetTransport<
u64,
NodeAddressResolver<TokioRuntime>,
Tcp<TokioRuntime>,
Lpe<_, _>,
TokioRuntime,
>,
HermanDelegate<u64, SocketAddr>,
>,
>,
> = Memberlist::with_delegate(delegate, transport_options, Options::default())
Config<u64, String, _>,
) = Memberlist::with_config(transport_options, Options::default())
.await
.unwrap();

Expand All @@ -58,17 +65,37 @@ async fn main() {
hasher.finish()
};

let _ = m
let _node = m
.join(Node::new(
member_addr_hash,
MaybeResolvedAddress::Unresolved(
NodeAddress::from_str(member_addr.as_str()).expect("invalid member address"),
),
))
.await;
.await?;
}

signal::ctrl_c().await.expect("failed to listen for event");

println!("{:?}", m.members().await);

Ok(())
}

use log::{Level, Metadata, Record};

struct SimpleLogger;

impl log::Log for SimpleLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Info
}

fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
println!("{} - {}", record.level(), record.args());
}
}

fn flush(&self) {}
}
90 changes: 77 additions & 13 deletions src/ccfg/memberlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,75 @@ use memberlist::{
CheapClone, Memberlist,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::Config;

use super::{
ledger::{Data, Entry, Key},
Broadcast, Subscribe,
};

pub trait WithHermanDelegate<
T: Transport,
K: Key + DeserializeOwned + Serialize,
D: Data + DeserializeOwned + Serialize,
>
{
fn with_config(
transport_options: T::Options,
opts: memberlist::Options,
) -> impl Future<
Output = Result<
(
Arc<
Memberlist<
T,
HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
>,
>,
Config<K, D, Broadcaster<T>>,
),
memberlist::error::Error<
T,
HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
>,
>,
>;
}

impl<
T: Transport,
K: Key + DeserializeOwned + Serialize,
D: Data + DeserializeOwned + Serialize,
> WithHermanDelegate<T, K, D>
for Memberlist<T, HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>
{
async fn with_config(
transport_options: T::Options,
opts: memberlist::Options,
) -> Result<
(Arc<Self>, Config<K, D, Broadcaster<T>>),
memberlist::error::Error<
T,
HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
>,
> {
let (tx, rx) = mpsc::channel(100);
let delegate = HermanDelegate::with_messages(tx);
let mbrlist = Arc::new(Memberlist::with_delegate(delegate, transport_options, opts).await?);

let cfg = Config::new(Broadcaster::new(mbrlist.clone(), 3));

let mut broadcast_listener = cfg.broadcast_listener(Subscriber::new(rx));
tokio::spawn(async move {
broadcast_listener.run().await;
});

Ok((mbrlist, cfg))
}
}

/// The broadcaster implements transporting data using memberlists send_reliable mode. It
/// grabs all of the online_members when called, and sends the entry to the next num_friends
/// in a sorted list of members. By increasing the number of friends, you increase the likelyhood
Expand All @@ -34,16 +96,16 @@ use super::{
///
/// So I guess use with caution.
pub struct Broadcaster<T: Transport> {
memberlist:
memberlist: Arc<
Memberlist<T, HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
>,
num_friends: usize,
}

impl<T: Transport> Broadcaster<T> {
impl<'a, T: Transport> Broadcaster<T> {
pub fn new(
memberlist: Memberlist<
T,
HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
memberlist: Arc<
Memberlist<T, HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
>,
num_friends: usize,
) -> Self {
Expand Down Expand Up @@ -123,10 +185,6 @@ impl<T: Transport, K: Key + Serialize, D: Data + Serialize> Broadcast<K, D> for
}
}

pub trait WithCCFG<T: Transport> {
fn with_ccfg<T: Transport>() -> Broadcaster<T>;
}

pub struct Subscriber {
messages: Receiver<Bytes>,
}
Expand Down Expand Up @@ -280,11 +338,17 @@ impl<I: Id, A: CheapClone + Send + Sync + 'static> EventDelegate for HermanDeleg
type Id = I;
type Address = A;

async fn notify_join(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
async fn notify_join(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
log::info!("{} joined", node.id);
}

async fn notify_leave(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
async fn notify_leave(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
log::info!("{} left", node.id);
}

async fn notify_update(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
async fn notify_update(&self, node: Arc<NodeState<Self::Id, Self::Address>>) {
log::info!("{} updated", node.id);
}
}

impl<K: Id, A: CheapClone + Send + Sync + 'static> Delegate for HermanDelegate<K, A> {
Expand Down
49 changes: 4 additions & 45 deletions src/ccfg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,13 @@ mod memberlist;

use std::{future::Future, sync::Arc};

use ::memberlist::{
delegate::Delegate,
net::{AddressResolver, Transport},
transport, Memberlist,
};
use ledger::Ledger;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::sync::Mutex;

use self::{
ledger::{Data, Entry, Key},
memberlist::{Broadcaster, Subscriber},
};
use self::ledger::{Data, Entry, Key};

pub use self::memberlist::{Broadcast, HermanDelegate, Subscribe};
pub use memberlist::HermanDelegate;
pub use memberlist::WithHermanDelegate;

/// Broadcase is used to send data to multiple nodes on the network. It is up to
/// the implementor how this should be done. send_entry is used to send a single entry
Expand All @@ -44,39 +36,6 @@ pub struct Config<T: Key, D: Data, B: Broadcast<T, D>> {
drop: Arc<Mutex<bool>>,
}

impl<
T: Key + Serialize + DeserializeOwned,
D: Data + Serialize + DeserializeOwned,
Tr: Transport,
> Config<T, D, Broadcaster<Tr>>
{
pub async fn new_with_memberlist(
transport_options: Tr::Options,
opts: ::memberlist::Options,
) -> Result<
Config<T, D, Broadcaster<Tr>>,
::memberlist::error::Error<
Tr,
HermanDelegate<Tr::Id, <Tr::Resolver as AddressResolver>::ResolvedAddress>,
>,
> {
// TODO: we need to make the channel size and the number of friends configurable
let (tx, rx) = mpsc::channel(100);
let delegate = HermanDelegate::with_messages(tx);
let subscriber = Subscriber::new(rx);
let memberlist = Memberlist::with_delegate(delegate, transport_options, opts).await?;
let broadcast = Broadcaster::new(memberlist, 3);

// TODO start the threads

Ok(Self {
ledger: Arc::new(Mutex::new(Ledger::new())),
broadcast: Arc::new(Mutex::new(broadcast)),
drop: Arc::new(Mutex::new(false)),
})
}
}

impl<T: Key, D: Data, B: Broadcast<T, D>> Config<T, D, B> {
/// new creates a new Config with the given ledger, broadcast, and subscriber.
pub fn new(broadcast: B) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod ccfg;
pub mod ccfg;
mod router;

pub use ccfg::Config;
Expand Down

0 comments on commit c8880be

Please sign in to comment.