Skip to content

Commit

Permalink
In the middle of some stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
d1ngd0 committed May 26, 2024
1 parent bfff724 commit 632bc09
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 27 deletions.
75 changes: 73 additions & 2 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,74 @@
fn main() {
println!("Hello, world!");
use std::env;
use std::hash::{DefaultHasher, Hasher};
use std::str::FromStr;

use memberlist::agnostic::tokio::TokioRuntime;
use memberlist::net::resolver::address::NodeAddressResolver;
use memberlist::net::stream_layer::tcp::Tcp;
use memberlist::net::{
Lpe, MaybeResolvedAddress, NetTransport, NetTransportOptions, Node, NodeAddress,
};
use memberlist::{Memberlist, Options};

use tokio::signal;

#[tokio::main]
async fn main() {
let mut args = env::args();
let _ = args.next(); // the first argument is the name of the binary
let bind_address = args.next().expect("missing bind address");
let bind_hash = {
let mut hasher = DefaultHasher::new();
hasher.write(bind_address.as_bytes());
hasher.finish()
};

let mut transport_options: NetTransportOptions<
u64,
NodeAddressResolver<TokioRuntime>,
Tcp<TokioRuntime>,
> = NetTransportOptions::new(bind_hash);

transport_options.add_bind_address(
NodeAddress::from_str(bind_address.as_str()).expect("invalid bind address"),
);

let (tx, rx) = tokio::sync::mpsc::channel(100);

let messages = Subscriber::new(rx);

let delegate = HermesDelegate::with_messages(tx);

let m: Memberlist<
NetTransport<
u64,
NodeAddressResolver<TokioRuntime>,
Tcp<TokioRuntime>,
Lpe<_, _>,
TokioRuntime,
>,
> = Memberlist::with_delegate(delegate, transport_options, Options::default())
.await
.unwrap();

for member_addr in args {
let member_addr_hash = {
let mut hasher = DefaultHasher::new();
hasher.write(member_addr.as_bytes());
hasher.finish()
};

let _ = m
.join(Node::new(
member_addr_hash,
MaybeResolvedAddress::Unresolved(
NodeAddress::from_str(member_addr.as_str()).expect("invalid member address"),
),
))
.await;
}

signal::ctrl_c().await.expect("failed to listen for event");

println!("{:?}", m.members().await);
}
14 changes: 7 additions & 7 deletions src/ccfg/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use serde::Serialize;
use serde::{Deserialize, Serialize};

// Key is a trait that we use to define the key type for the Ledger.
pub trait Key: Eq + Clone + Hash {}
impl<T: Eq + Clone + Hash> Key for T {}
pub trait Key: Eq + Clone + Hash + Send + Sync + 'static {}
impl<T: Eq + Clone + Hash + Send + Sync + 'static> Key for T {}

pub trait Data: Clone {}
impl<T: Clone> Data for T {}
pub trait Data: Clone + Send + Sync + 'static {}
impl<T: Clone + Send + Sync + 'static> Data for T {}

pub struct Ledger<T: Key, D: Data> {
entries: Vec<Entry<T, D>>,
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<T: Key, D: Data> Ledger<T, D> {
}
}

#[derive(Clone, Serialize)]
#[derive(Clone, Serialize, Deserialize)]
pub enum Entry<T: Key, D: Data> {
Put(EntryMeta<T>, D),
Delete(EntryMeta<T>),
Expand All @@ -153,7 +153,7 @@ impl<T: Key, D: Data> PartialEq for Entry<T, D> {
}
}

#[derive(PartialEq, Eq, Hash, Clone, Serialize)]
#[derive(PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct EntryMeta<T: Key> {
key: T,
at: u128,
Expand Down
199 changes: 193 additions & 6 deletions src/ccfg/memberlist.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use std::{
future::Future,
hash::{BuildHasher, BuildHasherDefault, DefaultHasher},
sync::Arc,
};

use memberlist::{net::Transport, Memberlist};
use serde::Serialize;
use memberlist::{
bytes::Bytes,
delegate::{
AliveDelegate, ConflictDelegate, Delegate, EventDelegate, MergeDelegate, NodeDelegate,
PingDelegate, VoidDelegateError,
},
net::{AddressResolver, Id, Transport},
types::{Meta, NodeState, SmallVec, TinyVec},
CheapClone, Memberlist,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};

use super::{
ledger::{Data, Entry, Key},
Broadcast,
Broadcast, Subscribe,
};

/// The broadcaster implements transporting data using memberlists send_reliable mode. It
Expand All @@ -23,12 +34,19 @@ use super::{
///
/// So I guess use with caution.
pub struct Broadcaster<T: Transport> {
memberlist: Memberlist<T>,
memberlist:
Memberlist<T, HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
num_friends: usize,
}

impl<T: Transport> Broadcaster<T> {
pub fn new(memberlist: Memberlist<T>, num_friends: usize) -> Self {
pub fn new(
memberlist: Memberlist<
T,
HermanDelegate<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
>,
num_friends: usize,
) -> Self {
Self {
memberlist,
num_friends,
Expand Down Expand Up @@ -96,11 +114,180 @@ impl<T: Transport, K: Key + Serialize, D: Data + Serialize> Broadcast<K, D> for
id = %self.memberlist.local_id(),
peer_addr = %member,
err = %e,
"memberlist.broadcast: failed to send packet"
"memberlist.broadcast: failed to sen packet"
);
}
};
}
}
}
}

pub trait WithCCFG<T: Transport> {
fn with_ccfg<T: Transport>() -> Broadcaster<T>;
}

pub struct Subscriber {
messages: Receiver<Bytes>,
}

impl Subscriber {
pub fn new(messages: Receiver<Bytes>) -> Self {
Self { messages }
}
}

pub struct HermanDelegate<K: Id, A: CheapClone + Send + Sync + 'static> {
phantom: std::marker::PhantomData<(K, A)>,
messages: Option<Sender<Bytes>>,
}

impl<T: Key + DeserializeOwned, D: Data + DeserializeOwned> Subscribe<T, D> for Subscriber {
async fn watch(&mut self) -> Entry<T, D> {
loop {
match self.messages.recv().await {
Some(msg) => {
let msg = serde_cbor::from_slice::<Entry<T, D>>(&msg);
match msg {
Ok(entry) => {
return entry;
}
Err(e) => {
tracing::error!(err = %e, "failed to deserialize message");
}
}
}
None => {
tracing::error!("failed to receive message");
}
}
}
}
}

impl<K: Id, A: CheapClone + Send + Sync + 'static> HermanDelegate<K, A> {
pub fn new() -> Self {
Self {
phantom: std::marker::PhantomData,
messages: None,
}
}

pub fn with_messages(messages: Sender<Bytes>) -> Self {
Self {
phantom: std::marker::PhantomData,
messages: Some(messages),
}
}

pub fn set_messages(&mut self, messages: Sender<Bytes>) {
self.messages = Some(messages);
}
}

impl<I: Id, A: CheapClone + Send + Sync + 'static> AliveDelegate for HermanDelegate<I, A> {
type Error = VoidDelegateError;
type Id = I;
type Address = A;

async fn notify_alive(
&self,
_peer: Arc<NodeState<Self::Id, Self::Address>>,
) -> Result<(), Self::Error> {
Ok(())
}
}

impl<I: Id, A: CheapClone + Send + Sync + 'static> MergeDelegate for HermanDelegate<I, A> {
type Error = VoidDelegateError;
type Id = I;
type Address = A;

async fn notify_merge(
&self,
_peers: SmallVec<Arc<NodeState<Self::Id, Self::Address>>>,
) -> Result<(), Self::Error> {
Ok(())
}
}

impl<I: Id, A: CheapClone + Send + Sync + 'static> ConflictDelegate for HermanDelegate<I, A> {
type Id = I;
type Address = A;

async fn notify_conflict(
&self,
_existing: Arc<NodeState<Self::Id, Self::Address>>,
_other: Arc<NodeState<Self::Id, Self::Address>>,
) {
}
}

impl<K: Id, A: CheapClone + Send + Sync + 'static> PingDelegate for HermanDelegate<K, A> {
type Id = K;
type Address = A;

async fn ack_payload(&self) -> Bytes {
Bytes::new()
}

async fn notify_ping_complete(
&self,
_node: Arc<NodeState<Self::Id, Self::Address>>,
_rtt: std::time::Duration,
_payload: Bytes,
) {
}

fn disable_promised_pings(&self, _target: &Self::Id) -> bool {
false
}
}

impl<K: Id, A: CheapClone + Send + Sync + 'static> NodeDelegate for HermanDelegate<K, A> {
async fn node_meta(&self, _limit: usize) -> Meta {
Meta::empty()
}

async fn notify_message(&self, msg: Bytes) {
// we can't really do anything if it doesn't work, so
// we don't care about the answer, fire and forget.
if let Some(messages) = &self.messages.as_ref() {
let _ = messages.send(msg).await;
}
}

async fn broadcast_messages<F>(
&self,
_overhead: usize,
_limit: usize,
_encoded_len: F,
) -> TinyVec<Bytes>
where
F: Fn(Bytes) -> (usize, Bytes) + Send,
{
TinyVec::new()
}

async fn local_state(&self, _join: bool) -> Bytes {
Bytes::new()
}

async fn merge_remote_state(&self, _buf: Bytes, _join: bool) {}
}

impl<I: Id, A: CheapClone + Send + Sync + 'static> EventDelegate for HermanDelegate<I, A> {
type Id = I;
type Address = A;

async fn notify_join(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}

async fn notify_leave(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}

async fn notify_update(&self, _node: Arc<NodeState<Self::Id, Self::Address>>) {}
}

impl<K: Id, A: CheapClone + Send + Sync + 'static> Delegate for HermanDelegate<K, A> {
type Id = K;
type Address = A;
}
Loading

0 comments on commit 632bc09

Please sign in to comment.