From 28fdc2bee5130847173922198c2255b3fb5b5a20 Mon Sep 17 00:00:00 2001 From: Peter Heiss Date: Sun, 29 Oct 2023 11:43:12 +0100 Subject: [PATCH] Create atomic like types (#13) Implements ClockOrdered struct --- CHANGELOG.md | 5 +- Cargo.toml | 2 +- src/redis/clock.rs | 235 ++++++++++++++++++++++++++++++++ src/redis/generic.rs | 12 +- src/redis/list.rs | 1 + src/redis/mod.rs | 29 ++-- src/redis/{lock.rs => mutex.rs} | 30 ++-- 7 files changed, 276 insertions(+), 38 deletions(-) create mode 100644 src/redis/clock.rs rename src/redis/{lock.rs => mutex.rs} (95%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e4927b..c3984ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). -## [Unreleased] - DESCRIPTION - YYYY-MM-DD +## 0.2.2 - 2023-10-29 -- +- add ClockOrdered Type, which implements a counter based clock ordering +- minor additions to documentation ## 0.2 - Lock and ADT - 2023-09-22 diff --git a/Cargo.toml b/Cargo.toml index 2a5f2c6..20b70b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "micro_types" -version = "0.2.1" +version = "0.2.2" edition = "2021" readme = "README.md" license = "MIT" diff --git a/src/redis/clock.rs b/src/redis/clock.rs new file mode 100644 index 0000000..30d2a56 --- /dev/null +++ b/src/redis/clock.rs @@ -0,0 +1,235 @@ +use crate::redis::Generic; +use serde_json::from_str; +use std::ops::{Deref, DerefMut}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ClockOrderedError { + #[error("Ordering number is not greater than current number stored in redis.")] + OrderError, +} + +/// This is the set_load script. +/// It is used to set the value if order is greater than the current order. +/// Returns the current value and the current_ordering number. +/// +/// It takes 3 arguments: +/// 1. The key of value to set +/// 2. The order_number of the setting operation +/// 3. The value itself to set +const SET_LOAD_SCRIPT: &str = r#" +local key = ARGV[1] +local order = ARGV[2] +local current_order = redis.call("GET", key .. ":order") +if current_order == false or current_order < order then + redis.call("SET", key .. ":order", order) + redis.call("SET", key, ARGV[3]) + current_order = order +end +return {redis.call("GET", key), current_order} +"#; + +/// This is the load script. +/// It is used to load the value and the order number of the value. +/// Returns the current value and the current ordering number. +/// +/// It takes 1 argument: +/// 1. The key of value to load +const LOAD_SCRIPT: &str = r#" +local key = ARGV[1] +return {redis.call("GET", key), redis.call("GET", key .. ":order")} +"#; + +/// The ClockOrdered type. +/// +/// It is used to store a value in redis and load it in sync. +/// It tracks automatically an ordering number to ensure that the value is only stored if the order is greater than the current order, mostly from other instances. +/// The value is only stored if the order is greater than the current order. +/// +/// This helps to synchronize the value between multiple instances without any locking mechanism. +/// But can results in more network traffic in benefit of less wait time because of locks. +/// Mostly used in situations, where your value changes rarely but read often. +/// Another use case is, when it is okay for you, that the value could be not the latest or +/// computing a derived value multiple times is acceptable. +#[derive(Debug)] +pub struct ClockOrdered { + data: Generic, + counter: usize, +} + +impl ClockOrdered +where + T: serde::Serialize + serde::de::DeserializeOwned, +{ + /// Creates a new ClockOrdered. + /// The value is loaded from redis directly. + pub fn new(data: Generic) -> Self { + let mut s = Self { data, counter: 0 }; + s.load(); + s + } + + /// Stores the value in the redis server. + /// The value is only stored if the ordering_number is greater than the current number. + /// The order is incremented by one before each store. + /// + /// # Example + /// ``` + /// use dtypes::redis::Generic; + /// use dtypes::redis::ClockOrdered; + /// + /// let client = redis::Client::open("redis://localhost:6379").unwrap(); + /// let mut i32 = Generic::with_value(1, "test_add_clock_ordered_example1", client.clone()); + /// let mut clock_ordered = ClockOrdered::new(i32); + /// clock_ordered.store(2).unwrap(); + /// assert_eq!(*clock_ordered, 2); + /// ``` + /// + /// The store can fail if the order is not greater than the current order. + /// This happens, if the value was set from another instance before. + /// + /// # Example + /// ``` + /// use std::thread; + /// use dtypes::redis::Generic; + /// use dtypes::redis::ClockOrdered; + /// + /// let client = redis::Client::open("redis://localhost:6379").unwrap(); + /// let client2 = client.clone(); + /// + /// thread::scope(|s| { + /// let t1 = s.spawn(|| { + /// let mut i32: Generic = Generic::new("test_add_clock_ordered_example2", client2); + /// let mut clock_ordered = ClockOrdered::new(i32); + /// while let Err(_) = clock_ordered.store(2) {} + /// assert_eq!(*clock_ordered, 2); + /// }); + /// let mut i32: Generic = Generic::new("test_add_clock_ordered_example2", client); + /// let mut clock_ordered = ClockOrdered::new(i32); + /// while let Err(_) = clock_ordered.store(3) {} + /// assert_eq!(*clock_ordered, 3); + /// t1.join().unwrap(); + /// }); + /// ``` + pub fn store(&mut self, val: T) -> Result<(), ClockOrderedError> { + self.counter += 1; + let val_json = serde_json::to_string(&val).unwrap(); + let (v, order) = self.store_redis(&val_json); + + if let Some(v) = v { + if self.counter >= order && v == val_json { + self.data.cache = Some(val); + return Ok(()); + } + } + Err(ClockOrderedError::OrderError) + } + + /// Stores the value in the redis server and blocks until succeeds. + /// Everything else is equal to [ClockOrdered::store]. + /// + /// # Example + /// ``` + /// use std::thread; + /// use dtypes::redis::Generic; + /// use dtypes::redis::ClockOrdered; + /// + /// let client = redis::Client::open("redis://localhost:6379").unwrap(); + /// let client2 = client.clone(); + /// + /// thread::scope(|s| { + /// let t1 = s.spawn(|| { + /// let mut i32: Generic = Generic::new("test_add_clock_ordered_example3", client2); + /// let mut clock_ordered = ClockOrdered::new(i32); + /// clock_ordered.store_blocking(2).unwrap(); + /// assert_eq!(*clock_ordered, 2); + /// }); + /// let mut i32: Generic = Generic::new("test_add_clock_ordered_example3", client); + /// let mut clock_ordered = ClockOrdered::new(i32); + /// clock_ordered.store_blocking(3).unwrap(); + /// assert_eq!(*clock_ordered, 3); + /// t1.join().unwrap(); + /// }); + /// ``` + pub fn store_blocking(&mut self, val: T) -> Result<(), ClockOrderedError> { + let val_json = serde_json::to_string(&val).unwrap(); + let mut res = self.store_redis(&val_json); + + while self.counter < res.1 || res.0.is_none() || res.0.unwrap() != val_json { + self.counter = res.1 + 1; + res = self.store_redis(&val_json); + } + + self.data.cache = Some(val); + Ok(()) + } + + fn store_redis(&self, val: &str) -> (Option, usize) { + let mut conn = self.data.client.get_connection().unwrap(); + redis::Script::new(SET_LOAD_SCRIPT) + .arg(&self.data.key) + .arg(self.counter) + .arg(val) + .invoke(&mut conn) + .expect("Could not execute script") + } + + /// Loads the value from the redis server. + /// This is done automatically on creation. + /// Mostly used for synchronization. Reset the counter to order from redis or 0. + pub fn load(&mut self) { + let mut conn = self.data.client.get_connection().unwrap(); + let res: (Option, Option) = redis::Script::new(LOAD_SCRIPT) + .arg(&self.data.key) + .invoke(&mut conn) + .expect("Could not execute script"); + + match res { + (Some(v), Some(order)) => { + self.data.cache = Some(from_str(&v).unwrap()); + self.counter = order; + } + (Some(v), None) => { + self.data.cache = Some(from_str(&v).unwrap()); + self.counter = 0; + } + (None, Some(c)) => { + self.data.cache = None; + self.counter = c; + } + _ => { + self.data.cache = None; + self.counter = 0; + } + } + } +} + +impl Deref for ClockOrdered { + type Target = Generic; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +impl DerefMut for ClockOrdered { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_set_load() { + use crate::redis::ClockOrdered; + use crate::redis::Generic; + + let client = redis::Client::open("redis://localhost:6379").unwrap(); + let i32: Generic = Generic::new("test_add_clock_ordered", client.clone()); + let mut clock_ordered = ClockOrdered::new(i32); + clock_ordered.store(2).unwrap(); + assert_eq!(*clock_ordered, 2); + } +} diff --git a/src/redis/generic.rs b/src/redis/generic.rs index 9aebb9e..775eb82 100644 --- a/src/redis/generic.rs +++ b/src/redis/generic.rs @@ -1,8 +1,4 @@ //! This module contains the generic type. -//! The generic type is used to implement the common methods for all types. -//! The generic type is not meant to be used directly. -//! -//! use crate::redis::apply_operator; use redis::{Commands, RedisResult}; use serde::{de::DeserializeOwned, Serialize}; @@ -10,6 +6,10 @@ use std::fmt::{Debug, Display}; use std::ops; /// The generic type is used to implement the common methods for all types. +/// +/// The generic type is not meant to be used directly. Instead use one of the aliases. +/// +/// Mostly you will interact with the methods [Generic::store], [Generic::acquire] and [Generic::into_inner]. pub struct Generic { pub(crate) cache: Option, pub(crate) key: String, @@ -78,7 +78,7 @@ where new_type } - /// The set method sets the value of the type. + /// The store method sets the value of the type. pub fn store(&mut self, value: T) { let value = self.set(value); self.cache = Some(value); @@ -106,7 +106,7 @@ where res.expect("Failed to set value"); } - /// The get method returns a reference to the value stored in the type. + /// The acquire method returns a reference to the value stored in the type. /// Loads it from the redis directly. /// /// # Example diff --git a/src/redis/list.rs b/src/redis/list.rs index c1862ce..57e9842 100644 --- a/src/redis/list.rs +++ b/src/redis/list.rs @@ -1,3 +1,4 @@ +use crate::redis::Generic; use serde::de::DeserializeOwned; use serde::Serialize; use std::collections::VecDeque; diff --git a/src/redis/mod.rs b/src/redis/mod.rs index 63f7b08..8fc46e1 100644 --- a/src/redis/mod.rs +++ b/src/redis/mod.rs @@ -2,24 +2,25 @@ //! //! This crate provides a set of types that can be stored in Redis. The types are: //! -//! * [bool](crate::Dbool) +//! * [bool](redis::Dbool) //! * Integer types: -//! * signed Integer: [i8](crate::redis::Di8), [i16](crate::redis::Di16), [i32](crate::redis::Di32), [i64](crate::redis::Di64), [isize](crate::redis::Disize) -//! * unsigned Integer: [u8](crate::redis::Du8), [u16](crate::redis::Du16), [u32](crate::redis::Du32), [u64](crate::redis::Du64), [usize](crate::redis::Dusize) -//! * [String](crate::redis::DString) +//! * signed Integer: [i8](redis::Di8), [i16](redis::Di16), [i32](redis::Di32), [i64](redis::Di64), [isize](redis::Disize) +//! * unsigned Integer: [u8](redis::Du8), [u16](redis::Du16), [u32](redis::Du32), [u64](redis::Du64), [usize](redis::Dusize) +//! * [String](redis::DString) +//! * [List](redis::List) +//! * Sync types: +//! * [Mutex](redis::Mutex) +//! * [ClockOrdered](redis::ClockOrdered) //! //! This crate implements the most common traits for the primitive types, so it is frictionless to use them in place. +//! The methods of the types can be seen in the documentation of [Generic](redis::Generic). //! With this crate it is possible to create multiple services that shares the values via Redis. //! This is helpful if you want to create a distributed system and run multiple instances of the same service. //! Or you want to communicate between different services. All this kind of stuff can be done with this crate. //! //! # Upcoming Features //! -//! In a later release it will be possible to lock values like a Mutex or RwLock. -//! Also it will be possible to create happens-before relationships between store and load operations like atomic types. -//! So it will be possible to use the types in a concurrent environment in the same way as in a distributed one. -//! -//! Also it will be possible to create other backends than Redis. +//! It will be possible to create happens-before relationships between store and load operations like atomic types. //! //! # Usage //! @@ -33,26 +34,26 @@ //! assert_eq!(i32, 3); //! ``` //! -//! # Custom Types +//! More examples can be found on the doc pages of the types. //! -//! It is possible to implement your own complex types by implementing the [BackedType](crate::BackedType) trait. -//! But it should not be needed as long as your type implements some or all of the various [Ops](https://doc.rust-lang.org/std/ops/index.html) traits. mod bool_type; +mod clock; mod generic; mod helper; mod integer; mod list; -mod lock; +mod mutex; mod string; pub(crate) use helper::apply_operator; pub use bool_type::TBool as Dbool; +pub use clock::ClockOrdered; pub use generic::Generic; pub use integer::{ Ti16 as Di16, Ti32 as Di32, Ti64 as Di64, Ti8 as Di8, Tisize as Disize, Tu16 as Du16, Tu32 as Du32, Tu64 as Du64, Tu8 as Du8, Tusize as Dusize, }; pub use list::{List, ListCache, ListIter}; -pub use lock::{Guard, LockError, Mutex}; +pub use mutex::{Guard, LockError, Mutex}; pub use string::TString as DString; diff --git a/src/redis/lock.rs b/src/redis/mutex.rs similarity index 95% rename from src/redis/lock.rs rename to src/redis/mutex.rs index cf1a26a..d561d1d 100644 --- a/src/redis/lock.rs +++ b/src/redis/mutex.rs @@ -1,7 +1,6 @@ use crate::redis::Generic; use serde::de::DeserializeOwned; use serde::Serialize; -use std::fmt::Display; use std::ops::{Deref, DerefMut}; use thiserror::Error; @@ -42,12 +41,12 @@ impl From for LockNum { /// 2. The timeout in seconds, /// 3. The value to store. const LOCK_SCRIPT: &str = r#" - local val = redis.call("get", ARGV[1] .. ":lock") - if val == false or val == ARGV[3] then - redis.call("setex", ARGV[1] .. ":lock", ARGV[2], ARGV[3]) - return 1 - end - return 0"#; +local val = redis.call("get", ARGV[1] .. ":lock") +if val == false or val == ARGV[3] then + redis.call("setex", ARGV[1] .. ":lock", ARGV[2], ARGV[3]) + return 1 +end +return 0"#; /// The drop script. /// It is used to drop a value in Redis, so that only the instance that locked it can drop it. @@ -56,12 +55,12 @@ const LOCK_SCRIPT: &str = r#" /// 1. The key of the value to drop, /// 2. The value to check. const DROP_SCRIPT: &str = r#" - local current_lock = redis.call("get", ARGV[1] .. ":lock") - if current_lock == ARGV[2] then - redis.call("del", ARGV[1] .. ":lock") - return 1 - end - return 0"#; +local current_lock = redis.call("get", ARGV[1] .. ":lock") +if current_lock == ARGV[2] then + redis.call("del", ARGV[1] .. ":lock") + return 1 +end +return 0"#; /// The uuid script. /// It is used to generate a uuid for the lock. @@ -104,6 +103,7 @@ end return nil"#; /// The RedisMutex struct. +/// /// It is used to lock a value in Redis, so that only one instance can access it at a time. /// You have to use RedisGeneric as the data type. /// It is a wrapper around the data type you want to store like the Mutex in std. @@ -335,7 +335,7 @@ where impl Deref for Guard<'_, T> where - T: DeserializeOwned + Serialize + Display, + T: DeserializeOwned + Serialize, { type Target = Generic; @@ -347,7 +347,7 @@ where impl DerefMut for Guard<'_, T> where - T: DeserializeOwned + Serialize + Display, + T: DeserializeOwned + Serialize, { fn deref_mut(&mut self) -> &mut Self::Target { // Safety: The very existence of this Guard guarantees that we have exclusive access to the data.