Skip to content

Commit

Permalink
Added some more stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
d1ngd0 committed May 22, 2024
1 parent c3c0b61 commit 6b11bd5
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ edition = "2021"
[dependencies]
arrayvec = "0.7.4"
dapt = "0.1.2"
serde = "1.0.202"
serde_json = "1.0.117"
tokio = { version = "1.37.0", features = ["full"] }
215 changes: 215 additions & 0 deletions src/ccfg/ledger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use std::{
collections::HashMap,
hash::Hash,
time::{Duration, SystemTime, UNIX_EPOCH},
};

// Key is a trait that we use to define the key type for the Ledger.
pub trait Key: Eq + Clone + Hash {}
impl<T: Eq + Clone + Hash> Key for T {}

pub trait Data: Clone {}
impl<T: Clone> Data for T {}

pub struct Ledger<T: Key, D: Data> {
entries: Vec<Entry<T, D>>,
}

impl<T: Key, D: Data> Ledger<T, D> {
// new creates a new cluster config
pub fn new() -> Ledger<T, D> {
Ledger {
entries: Vec::new(),
}
}

// put will add a new entry to the config and
// returns a pointer to the entry.
pub fn put(&mut self, key: T, value: D) -> &Entry<T, D> {
self.entries.push(Entry::Put(
EntryMeta {
key: key.clone(),
at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
},
value,
));
self.entries.last().unwrap()
}

pub fn delete(&mut self, key: T) -> &Entry<T, D> {
self.entries.push(Entry::Delete(EntryMeta {
key: key.clone(),
at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
}));
self.entries.last().unwrap()
}

// add_entry adds the entry to the config. If the entry is already present
// it will not be added, and we return false. Otherwise, we add it and
// return true.
pub fn add_entry(&mut self, entry: Entry<T, D>) -> bool {
for e in self.entries.iter() {
if e == &entry {
return false;
}
}

self.entries.push(entry);
true
}

// get returns the matching key. It parses the whole config, making sure we
// grab the latest value or respect the most recent delete.
pub fn get<ET: PartialEq<T>>(&self, key: ET) -> Option<&D> {
let mut val = None;
for entry in self.entries.iter() {
match entry {
Entry::Put(meta, value) => {
if key == meta.key {
val = Some(value);
}
}
Entry::Delete(meta) => {
if key == meta.key {
val = None;
}
}
}
}

val
}

pub fn sort(&mut self) {
self.entries.sort_by(|a, b| match (a, b) {
(Entry::Put(a, _), Entry::Put(b, _)) => a.at.cmp(&b.at),
(Entry::Delete(a), Entry::Delete(b)) => a.at.cmp(&b.at),
(Entry::Put(a, _), Entry::Delete(b)) => a.at.cmp(&b.at),
(Entry::Delete(a), Entry::Put(b, _)) => a.at.cmp(&b.at),
});
}

pub fn compact(&mut self, upto: Duration) {
// fetch the timestamp
let upto = SystemTime::now()
.checked_sub(upto)
.expect("Invalid duration")
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();

// grab everything that needs to be removed
let remove: Vec<T> = self
.entries
.iter()
.filter_map(|entry| match entry {
Entry::Delete(meta) => {
if meta.at < upto {
Some(meta.key.clone())
} else {
None
}
}
_ => None,
})
.collect();

self.entries.retain(|entry| {
let meta = match entry {
Entry::Put(meta, _) => meta,
Entry::Delete(meta) => meta,
};

if meta.at < upto {
!remove.contains(&meta.key)
} else {
true
}
});
}
}

#[derive(Clone)]
pub enum Entry<T: Key, D: Data> {
Put(EntryMeta<T>, D),
Delete(EntryMeta<T>),
}

impl<T: Key, D: Data> PartialEq for Entry<T, D> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Entry::Put(a, _), Entry::Put(b, _)) => a == b,
(Entry::Delete(a), Entry::Delete(b)) => a == b,
_ => false,
}
}
}

#[derive(PartialEq, Eq, Hash, Clone)]
pub struct EntryMeta<T: Key> {
key: T,
at: u128,
}

impl<T: Key, D: Data> From<Ledger<T, D>> for HashMap<T, D> {
fn from(ccfg: Ledger<T, D>) -> HashMap<T, D> {
let mut map = HashMap::new();

for entry in ccfg.entries {
match entry {
Entry::Put(meta, value) => {
map.insert(meta.key, value);
}
Entry::Delete(meta) => {
map.remove(&meta.key);
}
}
}

map
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_cluster_config() {
let mut ccfg = Ledger::new();
ccfg.put("key1", "value1");
ccfg.put("key2", "value2");
ccfg.put("key3", "value3");

assert_eq!(ccfg.get("key1"), Some(&"value1"));
assert_eq!(ccfg.get("key2"), Some(&"value2"));
assert_eq!(ccfg.get("key3"), Some(&"value3"));

ccfg.delete("key2");
assert_eq!(ccfg.get("key2"), None);

ccfg.compact(Duration::from_secs(1));
assert_eq!(ccfg.get("key1"), Some(&"value1"));
assert_eq!(ccfg.get("key3"), Some(&"value3"));

let settings: HashMap<&str, &str> = ccfg.into();
assert_eq!(settings.len(), 2);
assert_eq!(settings.get("key1"), Some(&"value1"));
assert_eq!(settings.get("key3"), Some(&"value3"));
}

#[test]
fn test_add_entry() {
let mut ccfg = Ledger::new();
let entry = Entry::Put(EntryMeta { key: "key1", at: 0 }, "value1");

assert!(ccfg.add_entry(entry.clone()));
assert!(!ccfg.add_entry(entry.clone()));
}
}
167 changes: 167 additions & 0 deletions src/ccfg/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
mod ledger;

use std::{future::Future, sync::Arc};

use ledger::Ledger;
use tokio::sync::Mutex;

use self::ledger::{Data, Entry, Key};

/// Broadcase is used to send data to multiple nodes on the network. It is up to
/// the implementor how this should be done. send_entry is used to send a single entry
/// whenever we encounter new data, or at regular intervals all entries within the ledger
/// are sent to ensure all nodes are in sync with one another.
pub trait Broadcast<T: Key, D: Data> {
/// send_entry is used to send an entry to one or more nodes on the network.
fn send_entry(&mut self, entry: &Entry<T, D>) -> impl Future<Output = ()>;
// TODO: We will likely need an error from this
}

/// Subscriber is used to watch for incoming changes. It is the partner of the Broadcast
/// trait. The subscriber implements a future trait that is ued to grab an entry
/// from the broadcast of another node.
pub trait Subscriber<T: Key, D: Data> {
fn watch(&mut self) -> impl Future<Output = Entry<T, D>>;
}

/// Config holds the config data, and handles the broadcast and subscribers for the
/// ledger.
pub struct Config<T: Key, D: Data, B: Broadcast<T, D>> {
ledger: Arc<Mutex<Ledger<T, D>>>,
broadcast: Arc<Mutex<B>>,
drop: Arc<Mutex<bool>>,
}

impl<T: Key, D: Data, B: Broadcast<T, D>> Config<T, D, B> {
/// new creates a new Config with the given ledger, broadcast, and subscriber.
pub fn new(broadcast: B) -> Self {
Self {
ledger: Arc::new(Mutex::new(Ledger::new())),
broadcast: Arc::new(Mutex::new(broadcast)),
drop: Arc::new(Mutex::new(false)),
}
}

pub fn broadcast_listener<S: Subscriber<T, D>>(
&self,
subscriber: S,
) -> BroadcastListener<T, D, B, S> {
BroadcastListener {
ledger: self.ledger.clone(),
broadcast: self.broadcast.clone(),
drop: self.drop.clone(),
subscriber,
}
}

pub async fn put(&mut self, key: T, value: D) {
let mut ledger = self.ledger.lock().await;
let entry = ledger.put(key, value);
self.broadcast.lock().await.send_entry(entry).await;
ledger.sort();
}

pub async fn delete(&mut self, key: T) {
let mut ledger = self.ledger.lock().await;
let entry = ledger.delete(key);
self.broadcast.lock().await.send_entry(entry).await;
ledger.sort();
}

pub async fn get<ET: PartialEq<T>>(&mut self, key: ET) -> Option<D> {
let ledger = self.ledger.lock().await;
ledger.get(key).cloned()
}
}

impl<T: Key, D: Data, B: Broadcast<T, D>> Drop for Config<T, D, B> {
fn drop(&mut self) {
let drop = self.drop.clone();
tokio::spawn(async move {
let mut drop = drop.lock().await;
*drop = true;
});
}
}

pub struct BroadcastListener<T: Key, D: Data, B: Broadcast<T, D>, S: Subscriber<T, D>> {
ledger: Arc<Mutex<Ledger<T, D>>>,
broadcast: Arc<Mutex<B>>,
drop: Arc<Mutex<bool>>,
subscriber: S,
}

// TODO: We need to add a way to stop the listener
impl<T: Key, D: Data, B: Broadcast<T, D>, S: Subscriber<T, D>> BroadcastListener<T, D, B, S> {
pub async fn run(&mut self) {
loop {
let entry = self.subscriber.watch().await;
let mut ledger = self.ledger.lock().await;
if ledger.add_entry(entry.clone()) {
self.broadcast.lock().await.send_entry(&entry).await;
}
ledger.sort();
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::{sync::broadcast, time::sleep};

struct TestBroadcast<T: Key, D: Data> {
tx: broadcast::Sender<Entry<T, D>>,
}

impl<T: Key, D: Data> Broadcast<T, D> for TestBroadcast<T, D> {
fn send_entry(&mut self, entry: &Entry<T, D>) -> impl Future<Output = ()> {
let _ = self.tx.send(entry.clone());
async {}
}
}

struct TestSubscriber<T: Key, D: Data> {
rx: broadcast::Receiver<Entry<T, D>>,
}

impl<T: Key, D: Data> Subscriber<T, D> for TestSubscriber<T, D> {
fn watch(&mut self) -> impl Future<Output = Entry<T, D>> {
async { self.rx.recv().await.unwrap() }
}
}

#[tokio::test]
async fn test_config() {
// set up some local broadcasting stuff
let (tx, rx) = broadcast::channel(10);
let broadcast = TestBroadcast { tx: tx.clone() };
let subscriber = TestSubscriber { rx };
let mut config = Config::new(broadcast);

let mut dup_config = Config::new(TestBroadcast { tx: tx.clone() });
let mut runner = dup_config.broadcast_listener(subscriber);
tokio::spawn(async move {
runner.run().await;
});

// create some entries in config
config
.put("something".to_string(), "value".to_string())
.await;
config
.put("another_thing".to_string(), "value2".to_string())
.await;

// wait for convergence
sleep(std::time::Duration::from_secs(1)).await;

// Check that they are now in the duplicate config
assert_eq!(dup_config.get("something").await, Some("value".to_string()));
assert_eq!(
dup_config.get("another_thing").await,
Some("value2".to_string())
);
assert_eq!(dup_config.get("nothing").await, None);
}
}
Loading

0 comments on commit 6b11bd5

Please sign in to comment.