From 77365ad0e5f7d633e93810ff92bcd98925c9e572 Mon Sep 17 00:00:00 2001 From: Max Fierro Date: Fri, 1 Nov 2024 05:04:03 -0700 Subject: [PATCH] Lots of volatile database --- doc | 2 +- src/database/mod.rs | 16 +- src/database/util.rs | 38 ++- src/database/volatile/error.rs | 3 + src/database/volatile/mod.rs | 148 +++++------- src/database/volatile/resource/manager.rs | 232 +++++++++++++++++++ src/database/volatile/resource/mod.rs | 39 ++++ src/database/volatile/transaction/manager.rs | 52 +++++ src/database/volatile/transaction/mod.rs | 178 ++++++++++++++ 9 files changed, 613 insertions(+), 95 deletions(-) create mode 100644 src/database/volatile/error.rs create mode 100644 src/database/volatile/resource/manager.rs create mode 100644 src/database/volatile/resource/mod.rs create mode 100644 src/database/volatile/transaction/manager.rs create mode 100644 src/database/volatile/transaction/mod.rs diff --git a/doc b/doc index 2f08bac..0843edd 160000 --- a/doc +++ b/doc @@ -1 +1 @@ -Subproject commit 2f08bacec3d6a62018c5b730efda7021377a1832 +Subproject commit 0843eddfdbad12e1be55a4adea762bf2908a41bd diff --git a/src/database/mod.rs b/src/database/mod.rs index a4d291b..3a08a4f 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -9,7 +9,10 @@ use anyhow::Result; use bitvec::prelude::{BitSlice, Msb0}; -use std::path::Path; +use std::{ + path::Path, + sync::{RwLockReadGuard, RwLockWriteGuard}, +}; use crate::{model::State, solver::RecordType}; @@ -105,10 +108,15 @@ pub trait Persistent { /// a database should be optimized for inter-table operations. In fact, this /// interface's semantics are such that its implementations optimize performance /// for cases of sequential operations on a single table. -pub trait Tabular { +pub trait Tabular { fn create_table(&self, id: &str, schema: Schema) -> Result<()>; - fn select_table(&self, id: &str) -> Result<()>; - fn delete_table(&self, id: &str) -> Result<()>; + fn delete_table(&mut self, id: &str) -> Result<()>; + + fn get_table(&mut self, id: &str) -> Result>>; + fn get_table_mut( + &mut self, + id: &str, + ) -> Result>>; } /// Allows a database implementation to read raw data from a record buffer. diff --git a/src/database/util.rs b/src/database/util.rs index a328b33..8085653 100644 --- a/src/database/util.rs +++ b/src/database/util.rs @@ -8,8 +8,11 @@ //! #### Authorship //! - Max Fierro, 2/24/2024 (maxfierro@berkeley.edu) +use anyhow::anyhow; use anyhow::Result; +use std::sync::Mutex; + use crate::database::error::DatabaseError; use crate::database::Attribute; use crate::database::Datatype; @@ -33,7 +36,13 @@ pub struct SchemaIterator<'a> { index: usize, } -/* BUILDER IMPLEMENTATION */ +/// Thread-safe generator for sequential database keys. +#[derive(Default)] +pub struct KeySequencer { + sequence_key: Mutex, +} + +/* SCHEMA BUILDER IMPLEMENTATION */ impl SchemaBuilder { /// Returns a new instance of a `SchemaBuilder`, which can be used to @@ -123,6 +132,33 @@ impl SchemaBuilder { } } +/* KEY SEQUENCER IMPLEMENTATION */ + +impl KeySequencer { + /// Returns a new instance of `KeySequencer`, which can be used to generate + /// sequential database keys in a thread-safe manner. The first generated + /// key will be `initial`. + pub fn new(initial: u64) -> Self { + Self { + sequence_key: Mutex::new(initial), + } + } + + /// Returns the next sequential database key in a thread-safe manner. + pub fn next(&self) -> Result { + let mut key = self + .sequence_key + .lock() + .map_err(|_| anyhow!("Key sequencer lock poisoned."))?; + + { + let cur = *key; + *key += 1; + Ok(cur) + } + } +} + /* UTILITY IMPLEMENTATIONS */ impl ToString for Datatype { diff --git a/src/database/volatile/error.rs b/src/database/volatile/error.rs new file mode 100644 index 0000000..2ac56cb --- /dev/null +++ b/src/database/volatile/error.rs @@ -0,0 +1,3 @@ +//! # Volatile Database Error +//! +//! TODO diff --git a/src/database/volatile/mod.rs b/src/database/volatile/mod.rs index fe37679..a904ee3 100644 --- a/src/database/volatile/mod.rs +++ b/src/database/volatile/mod.rs @@ -1,115 +1,85 @@ //! # Volatile Database //! //! This module provides a trivial database implementation backed by a volatile -//! in-memory hashmap. -//! -//! #### Authorship -//! - Max Fierro, 2/24/2024 (maxfierro@berkeley.edu) -//! - Casey Stanford, 4/10/2024 (cqstanford@berkeley.edu) +//! in-memory data structure arrangement. use anyhow::Result; -use bitvec::{order::Msb0, prelude::*, slice::BitSlice, store::BitStore}; -use std::collections::HashMap; +use std::sync::Arc; -use crate::{ - database::{KVStore, Record, Schema, Tabular}, - model::State, - solver::record::rem, -}; +use crate::database::util::KeySequencer; +use resource::ResourceManager; +use transaction::TransactionManager; -/// [`KVStore`] implementation backed by an in-memory [`HashMap`]. -/// Constrained by the space available in memory, growing at O(n) with the number of entries. -pub struct Database { - memory: HashMap>, -} +/* RE-EXPORTS */ -impl Database { - pub fn initialize() -> Self { - Self { - memory: HashMap::new(), - } - } -} +pub use resource::Request; +pub use resource::Resource; +pub use transaction::Transaction; -impl KVStore for Database { - fn put(&mut self, key: State, value: &R) { - let new = BitVec::from(value.raw()).clone(); - self.memory.insert(key, new); - } +/* MODULES */ - fn get(&self, key: State) -> Option<&BitSlice> { - if let Some(vect) = self.memory.get(&key) { - return Some(&vect[..]); - } else { - return None; - } - } +mod transaction; +mod resource; - fn del(&mut self, key: State) { - self.memory.remove(&key); - } +/* DEFINITIONS */ + +type SequenceKey = u64; +type TransactionID = SequenceKey; +type ResourceID = SequenceKey; + +pub struct Database { + transaction_manager: Arc, + resource_manager: Arc, + sequencer: Arc, } -impl Tabular for Database { - fn create_table(&self, id: &str, schema: Schema) -> Result<()> { - todo!() - } +#[derive(Default)] +struct Sequencer { + transaction: KeySequencer, + resource: KeySequencer, +} + +/* IMPLEMENTATION */ - fn select_table(&self, id: &str) -> Result<()> { - todo!() +impl Sequencer { + pub fn next_transaction(&self) -> Result { + self.transaction.next() } - fn delete_table(&self, id: &str) -> Result<()> { - todo!() + pub fn next_resource(&self) -> Result { + self.transaction.next() } } -#[cfg(test)] -mod tests { - - use crate::database::volatile::tests::rem::RecordBuffer; - - use super::*; - - /// This test: - /// - Creates an example state test_state and Record test_rec. - /// - Checks that that test_state is initially not mapped in the database. - /// - Puts test_rec in the database, with test_state as its key. - /// - Checks that test_state now maps to test_rec. - #[test] - fn put_data_and_get_it() { - let mut db: Database = Database::initialize(); - let test_state: State = 7; - assert!(db.get(test_state).is_none()); - let test_rec: RecordBuffer = RecordBuffer::new().unwrap(); - db.put(test_state, &test_rec); - if let Some(result_rec) = db.get(test_state) { - assert_eq!(result_rec, test_rec.raw()); - } else { - assert_eq!(1, 0); +impl Database { + fn new() -> Self { + let sequencer = Arc::new(Sequencer::default()); + let resource_manager = ResourceManager::new(sequencer.clone()); + let transaction_manager = TransactionManager::new( + resource_manager.clone(), + sequencer.clone(), + ); + + Self { + transaction_manager, + resource_manager, + sequencer, } } - /// This test - /// - Creates an example state test_state and Record test_rec. - /// - Puts test_rec in the database, with test_state as its key. - /// - Deletes test_state and any associated Record. - /// - Checks that test_state now, again, maps to nothing. - /// - Puts test_rec BACK in the database, and confirms that test_state now maps to it once again. - #[test] - fn put_remove_and_put() { - let mut db: Database = Database::initialize(); - let test_state: State = 7; - let test_rec: RecordBuffer = RecordBuffer::new().unwrap(); - db.put(test_state, &test_rec); - db.del(test_state); - assert!(db.get(test_state).is_none()); - db.put(test_state, &test_rec); - if let Some(result_rec) = db.get(test_state) { - assert_eq!(result_rec, test_rec.raw()); - } else { - assert_eq!(1, 0); + fn create_transaction(&self, request: Request) -> Result> { + let transaction = self + .resource_manager + .initialize_transaction( + request, + self.transaction_manager.clone(), + )?; + + { + self.transaction_manager + .add_transaction(transaction.clone()); + Ok(transaction) } } } diff --git a/src/database/volatile/resource/manager.rs b/src/database/volatile/resource/manager.rs new file mode 100644 index 0000000..20a579f --- /dev/null +++ b/src/database/volatile/resource/manager.rs @@ -0,0 +1,232 @@ +//! # Volatile Database Resource Manager +//! +//! TODO + +use anyhow::anyhow; +use anyhow::bail; +use anyhow::Result; + +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Condvar; +use std::sync::Mutex; +use std::sync::MutexGuard; +use std::sync::{Arc, RwLock}; + +use crate::database::volatile::resource::Resource; +use crate::database::volatile::transaction::ResourceHandles; +use crate::database::volatile::transaction::Transaction; +use crate::database::volatile::transaction::TransactionManager; +use crate::database::volatile::ResourceID; +use crate::database::volatile::Sequencer; +use crate::database::volatile::TransactionID; + +/* DEFINITIONS */ + +pub struct ResourceManager { + pub sequencer: Arc, + access: Mutex, + signal: Condvar, +} + +#[derive(Default)] +pub struct Request { + pub write: Vec, + pub read: Vec, +} + +#[derive(Default)] +struct AccessControl { + pool: HashMap>>, + owners: HashMap, + reading: HashMap, + writing: HashSet, + dropped: HashSet, +} + +/* IMPLEMENTATION */ + +impl AccessControl { + fn conflict(&self, request: &Request) -> bool { + request + .read + .iter() + .chain(&request.write) + .any(|id| self.writing.contains(id)) + } +} + +impl ResourceManager { + pub fn new(sequencer: Arc) -> Arc { + let access = Mutex::new(AccessControl::default()); + let signal = Condvar::new(); + let manager = Self { + sequencer, + access, + signal, + }; + + Arc::new(manager) + } + + pub fn add_resource(&self, resource: Resource) -> Result<()> { + let mut resources = self + .access + .lock() + .map_err(|_| anyhow!("Resource access lock poisoned."))?; + + let id = resource.id(); + let lock = RwLock::new(resource); + resources + .pool + .insert(id, Arc::new(lock)); + + Ok(()) + } + + pub fn drop_resource( + &self, + rid: ResourceID, + tid: TransactionID, + ) -> Result<()> { + let mut resources = self + .access + .lock() + .map_err(|_| anyhow!("Resource access lock poisoned."))?; + + if let Some(&owner) = resources.owners.get(&rid) { + if owner == tid && !resources.dropped.contains(&rid) { + resources.dropped.insert(rid); + resources.pool.remove(&rid); + } + } + + Err(anyhow!( + "Attempted to drop resource without ownership." + )) + } + + pub fn initialize_transaction( + &self, + request: Request, + manager: Arc, + ) -> Result> { + loop { + let mut resources = self.lock()?; + if request + .write + .iter() + .chain(&request.read) + .any(|id| !resources.pool.contains_key(id)) + { + bail!("Attempted to acquire non-existent resource."); + } + + if !resources.conflict(&request) { + let id = self.sequencer.next_transaction()?; + self.acquire_resources(&request, id, &mut resources); + let handles = self.generate_handles(&request, &mut resources); + let transaction = + Transaction::new(manager, handles, request, id); + + return Ok(transaction); + } + + self.signal + .wait(resources) + .map_err(|_| anyhow!("Resource access lock poisoned."))?; + } + } + + /* UTILITIES */ + + pub fn lock(&self) -> Result> { + self.access + .lock() + .map_err(|_| anyhow!("Resource access lock poisoned.")) + } + + pub fn signal_waiters(&self) { + self.signal.notify_all() + } + + pub fn acquire_resources( + &self, + request: &Request, + transaction: TransactionID, + resources: &mut MutexGuard, + ) { + request + .write + .iter() + .for_each(|id| { + resources.writing.insert(*id); + resources + .owners + .insert(*id, transaction); + }); + + request.read.iter().for_each(|id| { + let count = resources + .reading + .entry(*id) + .or_insert(0); + + *count += 1; + resources + .owners + .insert(*id, transaction); + }); + } + + pub fn release_resources( + &self, + request: &Request, + resources: &mut MutexGuard, + ) { + request + .write + .iter() + .for_each(|id| { + resources.owners.remove(id); + resources.writing.remove(id); + }); + + request.read.iter().for_each(|id| { + resources.owners.remove(id); + let count = resources + .reading + .entry(*id) + .or_insert(1); + + *count -= 1; + }); + } + + fn generate_handles( + &self, + request: &Request, + resources: &mut MutexGuard, + ) -> ResourceHandles { + let mut handles = ResourceHandles::default(); + for &id in request.write.iter() { + let lock = resources + .pool + .get(&id) + .expect("Attempted to fetch non-existent resource lock."); + + handles.add_writing(id, lock.clone()); + } + + for &id in request.read.iter() { + let lock = resources + .pool + .get(&id) + .expect("Attempted to fetch non-existent resource lock."); + + handles.add_reading(id, lock.clone()); + } + + handles + } +} diff --git a/src/database/volatile/resource/mod.rs b/src/database/volatile/resource/mod.rs new file mode 100644 index 0000000..48f786d --- /dev/null +++ b/src/database/volatile/resource/mod.rs @@ -0,0 +1,39 @@ +//! # Volatile Database Resource +//! +//! TODO + +use std::sync::{Arc, Weak}; + +use crate::database::volatile::ResourceID; + +/* RE-EXPORTS */ + +pub use manager::Request; +pub use manager::ResourceManager; + +/* MODULES */ + +mod manager; + +/* DEFINITIONS */ + +pub struct Resource { + manager: Weak, + id: ResourceID, +} + +/* IMPLEMENTATION */ + +impl Resource { + pub(in crate::database::volatile) fn new( + manager: Arc, + id: ResourceID, + ) -> Self { + let manager = Arc::downgrade(&manager); + Self { manager, id } + } + + pub fn id(&self) -> ResourceID { + self.id + } +} diff --git a/src/database/volatile/transaction/manager.rs b/src/database/volatile/transaction/manager.rs new file mode 100644 index 0000000..e9239c7 --- /dev/null +++ b/src/database/volatile/transaction/manager.rs @@ -0,0 +1,52 @@ +//! # Volatile Database Transaction Manager +//! +//! TODO + +use anyhow::anyhow; +use anyhow::Result; + +use std::collections::HashMap; +use std::sync::{Arc, RwLock, Weak}; + +use crate::database::volatile::transaction::Transaction; +use crate::database::volatile::ResourceManager; +use crate::database::volatile::Sequencer; +use crate::database::volatile::TransactionID; + +/* DEFINITIONS */ + +pub struct TransactionManager { + transactions: RwLock>>, + pub resource_manager: Arc, + pub sequencer: Arc, +} + +/* IMPLEMENTATION */ + +impl TransactionManager { + pub fn new( + resource_manager: Arc, + sequencer: Arc, + ) -> Arc { + let transactions = RwLock::new(HashMap::new()); + let manager = Self { + resource_manager, + transactions, + sequencer, + }; + + Arc::new(manager) + } + + pub fn add_transaction(&self, transaction: Arc) -> Result<()> { + let mut txns = self + .transactions + .write() + .map_err(|_| anyhow!("Transaction manager lock poisoned."))?; + + { + txns.insert(transaction.id(), transaction); + Ok(()) + } + } +} diff --git a/src/database/volatile/transaction/mod.rs b/src/database/volatile/transaction/mod.rs new file mode 100644 index 0000000..fa3e5c9 --- /dev/null +++ b/src/database/volatile/transaction/mod.rs @@ -0,0 +1,178 @@ +//! # Volatile Database Transaction +//! +//! TODO + +use anyhow::anyhow; +use anyhow::Result; + +use std::sync::RwLockReadGuard; +use std::{ + collections::HashMap, + sync::{Arc, RwLock, RwLockWriteGuard, Weak}, +}; + +use crate::database::volatile::resource::Resource; +use crate::database::volatile::Request; +use crate::database::volatile::{ResourceID, TransactionID}; + +/* RE-EXPORTS */ + +pub use manager::TransactionManager; + +/* MODULES */ + +mod manager; + +/* DEFINITIONS */ + +#[derive(Default)] +pub struct ResourceHandles { + write: HashMap, + read: HashMap, +} + +pub struct Transaction { + manager: Weak, + handles: ResourceHandles, + request: Request, + id: TransactionID, +} + +pub struct WriteLock(Arc>); +pub struct ReadLock(Arc>); + +/* IMPLEMENTATION */ + +impl WriteLock { + fn new(lock: Arc>) -> Self { + Self(lock) + } + + fn lock(&self) -> Result> { + self.0 + .write() + .map_err(|_| anyhow!("Resource lock poisoned.")) + } +} + +impl ReadLock { + fn new(lock: Arc>) -> Self { + Self(lock) + } + + fn lock(&self) -> Result> { + self.0 + .read() + .map_err(|_| anyhow!("Resource lock poisoned.")) + } +} + +impl ResourceHandles { + pub fn add_reading(&mut self, id: ResourceID, lock: Arc>) { + self.read + .insert(id, ReadLock::new(lock)); + } + + pub fn add_writing(&mut self, id: ResourceID, lock: Arc>) { + self.write + .insert(id, WriteLock::new(lock)); + } + + fn get_reading(&self, resource: ResourceID) -> Result<&ReadLock> { + if let Some(resource) = self.read.get(&resource) { + Ok(resource) + } else { + Err(anyhow!( + "Attempted read on unacquired resource {}.", + resource + )) + } + } + + fn get_writing(&self, resource: ResourceID) -> Result<&WriteLock> { + if let Some(resource) = self.write.get(&resource) { + Ok(resource) + } else { + Err(anyhow!( + "Attempted write on unacquired resource {}.", + resource + )) + } + } +} + +impl Transaction { + pub(in crate::database::volatile) fn new( + manager: Arc, + handles: ResourceHandles, + request: Request, + id: TransactionID, + ) -> Arc { + let manager = Arc::downgrade(&manager); + let transaction = Self { + manager, + handles, + request, + id, + }; + + Arc::new(transaction) + } + + pub fn create_resource(&self) -> Result { + if let Some(manager) = self.manager.upgrade() { + let id = manager.sequencer.next_resource()?; + let resource_manager = manager.resource_manager.clone(); + let resource = Resource::new(resource_manager.clone(), id); + resource_manager.add_resource(resource)?; + Ok(id) + } else { + Err(anyhow!("Transaction manager was dropped.")) + } + } + + pub fn drop_resource(&self, id: ResourceID) -> Result<()> { + if let Some(manager) = self.manager.upgrade() { + manager + .resource_manager + .drop_resource(id, self.id())?; + + Ok(()) + } else { + Err(anyhow!("Transaction manager was dropped.")) + } + } + + pub fn reading(&self, id: ResourceID, func: F) -> Result + where + F: FnOnce(&Resource) -> Result, + { + let resource = self.handles.get_reading(id)?; + let guard = resource.lock()?; + func(&guard) + } + + pub fn writing(&self, id: ResourceID, func: F) -> Result + where + F: FnOnce(&mut Resource) -> Result, + { + let resource = self.handles.get_writing(id)?; + let mut guard = resource.lock()?; + func(&mut guard) + } + + pub fn id(&self) -> TransactionID { + self.id + } +} + +impl Drop for Transaction { + fn drop(&mut self) { + if let Some(manager) = self.manager.upgrade() { + let resource_manager = &manager.resource_manager; + let mut resources = resource_manager.lock().unwrap(); + resource_manager.release_resources(&self.request, &mut resources); + resource_manager.signal_waiters(); + } + } +}