Skip to content

Commit

Permalink
Added some event listener stuff to the ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
d1ngd0 committed May 29, 2024
1 parent c8880be commit 7fbd46f
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use dapt::Dapt;
use herman::ccfg::{HermanDelegate, WithHermanDelegate};
use herman::Config;
use memberlist::agnostic::tokio::TokioRuntime;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
HermanDelegate<u64, SocketAddr>,
>,
>,
Config<u64, String, _>,
Config<u64, Dapt, _>,
) = Memberlist::with_config(transport_options, Options::default())
.await
.unwrap();
Expand Down
99 changes: 84 additions & 15 deletions src/ccfg/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,50 @@ impl<T: Eq + Clone + Hash + Send + Sync + 'static> Key for T {}
pub trait Data: Clone + Send + Sync + 'static {}
impl<T: Clone + Send + Sync + 'static> Data for T {}

pub struct Ledger<T: Key, D: Data> {
pub trait EventListener<T: Key, D: Data> {
fn on_event(&mut self, entry: &Entry<T, D>);
}

pub struct NoopEventListener {}

impl<T: Key, D: Data> EventListener<T, D> for NoopEventListener {
fn on_event(&mut self, _entry: &Entry<T, D>) {}
}

pub struct Ledger<T: Key, D: Data, Event: EventListener<T, D> = NoopEventListener> {
entries: Vec<Entry<T, D>>,
listener: Event,
}

#[derive(Debug)]
pub enum LedgerError {
EntryExists,
}

pub type LedgerResult<T> = Result<T, LedgerError>;

impl<T: Key, D: Data> Ledger<T, D> {
// new creates a new cluster config
pub fn new() -> Ledger<T, D> {
Ledger {
entries: Vec::new(),
listener: NoopEventListener {},
}
}
}

impl<T: Key, D: Data, Event: EventListener<T, D>> Ledger<T, D, Event> {
pub fn new_with_listener(listener: Event) -> Ledger<T, D, Event> {
Ledger {
entries: Vec::new(),
listener,
}
}

// put will add a new entry to the config and
// returns a pointer to the entry.
pub fn put(&mut self, key: T, value: D) -> &Entry<T, D> {
self.entries.push(Entry::Put(
pub fn put(&mut self, key: T, value: D) -> LedgerResult<&Entry<T, D>> {
if !self.add_entry(Entry::Put(
EntryMeta {
key: key.clone(),
at: SystemTime::now()
Expand All @@ -37,35 +65,67 @@ impl<T: Key, D: Data> Ledger<T, D> {
.as_millis(),
},
value,
));
self.entries.last().unwrap()
)) {
return Err(LedgerError::EntryExists);
}

Ok(self.get_entry(key).unwrap())
}

pub fn delete(&mut self, key: T) -> &Entry<T, D> {
self.entries.push(Entry::Delete(EntryMeta {
pub fn delete(&mut self, key: T) -> LedgerResult<&Entry<T, D>> {
if !self.add_entry(Entry::Delete(EntryMeta {
key: key.clone(),
at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
}));
self.entries.last().unwrap()
.as_nanos(),
})) {
return Err(LedgerError::EntryExists);
}

Ok(self.get_entry(key).unwrap())
}

// add_entry adds the entry to the config. If the entry is already present
// it will not be added, and we return false. Otherwise, we add it and
// return true.
pub fn add_entry(&mut self, entry: Entry<T, D>) -> bool {
let entry_meta = match &entry {
Entry::Put(meta, _) => meta,
Entry::Delete(meta) => meta,
};

for e in self.entries.iter() {
if e == &entry {
let meta = match e {
Entry::Put(meta, _) => meta,
Entry::Delete(meta) => meta,
};

// check for an entry under the same key that is newer or the same
// timestamp.
if meta.key == entry_meta.key && meta.at >= entry_meta.at {
return false;
}
}

self.entries.push(entry);
self.listener.on_event(self.entries.last().unwrap());
self.sort();
true
}

// this function expects the entries to be sorted
pub fn get_entry(&self, key: T) -> Option<&Entry<T, D>> {
let mut val = None;
for entry in self.entries.iter() {
if entry == &key {
val = Some(entry);
}
}

val
}

// get returns the matching key. It parses the whole config, making sure we
// grab the latest value or respect the most recent delete.
pub fn get<ET: PartialEq<T>>(&self, key: ET) -> Option<&D> {
Expand Down Expand Up @@ -153,6 +213,15 @@ impl<T: Key, D: Data> PartialEq for Entry<T, D> {
}
}

impl<T: Key, D: Data> PartialEq<T> for Entry<T, D> {
fn eq(&self, other: &T) -> bool {
match self {
Entry::Put(a, _) => &a.key == other,
Entry::Delete(a) => &a.key == other,
}
}
}

#[derive(PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct EntryMeta<T: Key> {
key: T,
Expand Down Expand Up @@ -185,15 +254,15 @@ mod test {
#[test]
fn test_cluster_config() {
let mut ccfg = Ledger::new();
ccfg.put("key1", "value1");
ccfg.put("key2", "value2");
ccfg.put("key3", "value3");
let _ = ccfg.put("key1", "value1");
let _ = ccfg.put("key2", "value2");
let _ = ccfg.put("key3", "value3");

assert_eq!(ccfg.get("key1"), Some(&"value1"));
assert_eq!(ccfg.get("key2"), Some(&"value2"));
assert_eq!(ccfg.get("key3"), Some(&"value3"));

ccfg.delete("key2");
let _ = ccfg.delete("key2");
assert_eq!(ccfg.get("key2"), None);

ccfg.compact(Duration::from_secs(1));
Expand Down
29 changes: 23 additions & 6 deletions src/ccfg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{future::Future, sync::Arc};
use ledger::Ledger;
use tokio::sync::Mutex;

use self::ledger::LedgerError;
use self::ledger::{Data, Entry, Key};

pub use memberlist::HermanDelegate;
Expand Down Expand Up @@ -36,6 +37,20 @@ pub struct Config<T: Key, D: Data, B: Broadcast<T, D>> {
drop: Arc<Mutex<bool>>,
}

pub enum Error {
EntryExists,
}

type ConfigResult<T> = Result<T, Error>;

impl From<LedgerError> for Error {
fn from(e: LedgerError) -> Self {
match e {
LedgerError::EntryExists => Error::EntryExists,
}
}
}

impl<T: Key, D: Data, B: Broadcast<T, D>> Config<T, D, B> {
/// new creates a new Config with the given ledger, broadcast, and subscriber.
pub fn new(broadcast: B) -> Self {
Expand All @@ -58,18 +73,20 @@ impl<T: Key, D: Data, B: Broadcast<T, D>> Config<T, D, B> {
}
}

pub async fn put(&mut self, key: T, value: D) {
pub async fn put(&mut self, key: T, value: D) -> ConfigResult<()> {
let mut ledger = self.ledger.lock().await;
let entry = ledger.put(key, value);
let entry = ledger.put(key, value)?;
self.broadcast.lock().await.send_entry(entry).await;
ledger.sort();
Ok(())
}

pub async fn delete(&mut self, key: T) {
pub async fn delete(&mut self, key: T) -> ConfigResult<()> {
let mut ledger = self.ledger.lock().await;
let entry = ledger.delete(key);
let entry = ledger.delete(key)?;
self.broadcast.lock().await.send_entry(entry).await;
ledger.sort();
Ok(())
}

pub async fn get<ET: PartialEq<T>>(&mut self, key: ET) -> Option<D> {
Expand Down Expand Up @@ -149,10 +166,10 @@ mod tests {
});

// create some entries in config
config
let _ = config
.put("something".to_string(), "value".to_string())
.await;
config
let _ = config
.put("another_thing".to_string(), "value2".to_string())
.await;

Expand Down
1 change: 1 addition & 0 deletions src/ccfg/state_engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub struct StateEngine {}

0 comments on commit 7fbd46f

Please sign in to comment.