diff --git a/Cargo.toml b/Cargo.toml index cf606190..2aac69fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,13 +15,15 @@ exclude = ["fuzz/"] crate-type = ["cdylib", "rlib"] [build-dependencies] -pyo3-build-config = "0.20.0" +pyo3-build-config = { version = "0.20.0", optional = true } [dependencies] -libc = "0.2.104" log = {version = "0.4.17", optional = true } pyo3 = {version = "0.20.0", features=["extension-module", "abi3-py37"], optional = true } +[target.'cfg(unix)'.dependencies] +libc = "0.2.104" + # Common test/bench dependencies [dev-dependencies] rand = "0.8" @@ -45,7 +47,7 @@ io-uring = "0.6.2" [features] # This feature is still experimental, and is not considered stable -python = ["pyo3"] +python = [ "pyo3", "pyo3-build-config" ] # Enables log messages logging = ["log"] # Enable cache hit metrics diff --git a/build.rs b/build.rs index dace4a9b..76f9c70b 100644 --- a/build.rs +++ b/build.rs @@ -1,3 +1,11 @@ fn main() { + if std::env::var("CARGO_CFG_FUZZING").is_ok() + && std::env::var("CARGO_CFG_TARGET_OS").as_deref() == Ok("macos") + { + println!("cargo:rustc-cdylib-link-arg=-undefined"); + println!("cargo:rustc-cdylib-link-arg=dynamic_lookup"); + } + + #[cfg(feature = "python")] pyo3_build_config::add_extension_module_link_args(); } diff --git a/src/db.rs b/src/db.rs index de8b1226..d42f4b4f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -10,8 +10,10 @@ use crate::{ StorageError, }; use crate::{ReadTransaction, Result, WriteTransaction}; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; + use std::fs::{File, OpenOptions}; +use std::io; use std::io::ErrorKind; use std::marker::PhantomData; use std::ops::RangeFull; @@ -26,6 +28,25 @@ use crate::transactions::SAVEPOINT_TABLE; #[cfg(feature = "logging")] use log::{info, warn}; +#[allow(clippy::len_without_is_empty)] +/// Implements persistent storage for a database. +pub trait StorageBackend: 'static + Debug + Send + Sync { + /// Gets the current length of the storage. + fn len(&self) -> Result; + + /// Reads the specified array of bytes from the storage. + fn read(&self, offset: u64, len: usize) -> Result, io::Error>; + + /// Sets the length of the storage. + fn set_len(&self, len: u64) -> Result<(), io::Error>; + + /// Syncs all buffered data with the persistent storage. + fn sync_data(&self, eventual: bool) -> Result<(), io::Error>; + + /// Writes the specified array to the storage. + fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error>; +} + struct AtomicTransactionId { inner: AtomicU64, } @@ -608,7 +629,7 @@ impl Database { } fn new( - file: File, + file: Box, page_size: usize, region_size: Option, read_cache_size_bytes: usize, @@ -793,7 +814,7 @@ impl Builder { .open(path)?; Database::new( - file, + Box::new(crate::FileBackend::new(file)?), self.page_size, self.region_size, self.read_cache_size_bytes, @@ -810,7 +831,7 @@ impl Builder { } Database::new( - file, + Box::new(crate::FileBackend::new(file)?), self.page_size, None, self.read_cache_size_bytes, @@ -823,7 +844,18 @@ impl Builder { /// The file must be empty or contain a valid database. pub fn create_file(&self, file: File) -> Result { Database::new( - file, + Box::new(crate::FileBackend::new(file)?), + self.page_size, + self.region_size, + self.read_cache_size_bytes, + self.write_cache_size_bytes, + ) + } + + /// Open an existing or create a new database with the given backend. + pub fn create_backend(&self, backend: impl StorageBackend) -> Result { + Database::new( + Box::new(backend), self.page_size, self.region_size, self.read_cache_size_bytes, diff --git a/src/lib.rs b/src/lib.rs index 1abf7c16..83d17b52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,8 +54,8 @@ //! [design]: https://github.com/cberner/redb/blob/master/docs/design.md pub use db::{ - Builder, Database, MultimapTableDefinition, MultimapTableHandle, TableDefinition, TableHandle, - UntypedMultimapTableHandle, UntypedTableHandle, + Builder, Database, MultimapTableDefinition, MultimapTableHandle, StorageBackend, + TableDefinition, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle, }; pub use error::{ CommitError, CompactionError, DatabaseError, Error, SavepointError, StorageError, TableError, @@ -66,7 +66,8 @@ pub use multimap_table::{ }; pub use table::{Drain, DrainFilter, Range, ReadOnlyTable, ReadableTable, Table}; pub use transactions::{DatabaseStats, Durability, ReadTransaction, WriteTransaction}; -pub use tree_store::{AccessGuard, AccessGuardMut, Savepoint}; +pub use tree_store::file_backend::FileBackend; +pub use tree_store::{AccessGuard, AccessGuardMut, InMemoryBackend, Savepoint}; pub use types::{RedbKey, RedbValue, TypeName}; type Result = std::result::Result; diff --git a/src/tree_store/mod.rs b/src/tree_store/mod.rs index 02b11b4e..bb453f98 100644 --- a/src/tree_store/mod.rs +++ b/src/tree_store/mod.rs @@ -12,7 +12,7 @@ pub(crate) use btree_base::{LeafAccessor, LeafMutator, RawLeafBuilder, BRANCH, L pub(crate) use btree_iters::{ AllPageNumbersBtreeIter, BtreeDrain, BtreeDrainFilter, BtreeRangeIter, }; -pub use page_store::Savepoint; +pub use page_store::{file_backend, InMemoryBackend, Savepoint}; pub(crate) use page_store::{ CachePriority, Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory, FILE_FORMAT_VERSION, MAX_VALUE_LENGTH, PAGE_SIZE, diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs index 778ae6ac..1a950baf 100644 --- a/src/tree_store/page_store/cached_file.rs +++ b/src/tree_store/page_store/cached_file.rs @@ -1,14 +1,10 @@ use crate::tree_store::page_store::base::PageHint; -use crate::tree_store::page_store::file_lock::LockedFile; use crate::tree_store::LEAF; -use crate::{DatabaseError, Result, StorageError}; +use crate::{DatabaseError, Result, StorageBackend, StorageError}; use std::collections::BTreeMap; -use std::fs::File; use std::io; use std::mem; use std::ops::{Index, IndexMut}; -#[cfg(any(target_os = "linux", all(unix, not(fuzzing))))] -use std::os::unix::io::AsRawFd; use std::slice::SliceIndex; #[cfg(any(fuzzing, test, feature = "cache_metrics"))] use std::sync::atomic::AtomicU64; @@ -225,7 +221,7 @@ impl PrioritizedWriteCache { } pub(super) struct PagedCachedFile { - file: LockedFile, + file: Box, page_size: u64, max_read_cache_bytes: usize, read_cache_bytes: AtomicUsize, @@ -245,7 +241,7 @@ pub(super) struct PagedCachedFile { impl PagedCachedFile { pub(super) fn new( - file: File, + file: Box, page_size: u64, max_read_cache_bytes: usize, max_write_buffer_bytes: usize, @@ -255,17 +251,8 @@ impl PagedCachedFile { read_cache.push(RwLock::new(PrioritizedCache::new())); } - let lock = LockedFile::new(file)?; - - // Try to flush any pages in the page cache that are out of sync with disk. - // See here for why: - #[cfg(target_os = "linux")] - unsafe { - libc::posix_fadvise64(lock.file().as_raw_fd(), 0, 0, libc::POSIX_FADV_DONTNEED); - } - Ok(Self { - file: lock, + file, page_size, max_read_cache_bytes, read_cache_bytes: AtomicUsize::new(0), @@ -284,7 +271,7 @@ impl PagedCachedFile { } pub(crate) fn raw_file_len(&self) -> Result { - Ok(self.file.file().metadata()?.len()) + self.file.len().map_err(StorageError::from) } #[cfg(any(fuzzing, test))] @@ -338,29 +325,18 @@ impl PagedCachedFile { // TODO: be more fine-grained about this invalidation self.invalidate_cache_all(); - self.file.file().set_len(len).map_err(StorageError::from) + self.file.set_len(len).map_err(StorageError::from) } - pub(super) fn flush(&self) -> Result { + pub(super) fn flush(&self, #[allow(unused_variables)] eventual: bool) -> Result { self.check_fsync_failure()?; self.flush_write_buffer()?; // Disable fsync when fuzzing, since it doesn't test crash consistency #[cfg(not(fuzzing))] { - let res = self.file.file().sync_data().map_err(StorageError::from); + let res = self.file.sync_data(eventual).map_err(StorageError::from); if res.is_err() { self.set_fsync_failed(true); - // Try to flush any pages in the page cache that are out of sync with disk. - // See here for why: - #[cfg(target_os = "linux")] - unsafe { - libc::posix_fadvise64( - self.file.file().as_raw_fd(), - 0, - 0, - libc::POSIX_FADV_DONTNEED, - ); - } return res; } } @@ -368,26 +344,6 @@ impl PagedCachedFile { Ok(()) } - pub(super) fn eventual_flush(&self) -> Result { - self.check_fsync_failure()?; - - #[cfg(not(target_os = "macos"))] - { - self.flush()?; - } - #[cfg(all(target_os = "macos", not(fuzzing)))] - { - self.flush_write_buffer()?; - let code = unsafe { libc::fcntl(self.file.file().as_raw_fd(), libc::F_BARRIERFSYNC) }; - if code == -1 { - self.set_fsync_failed(true); - return Err(io::Error::last_os_error().into()); - } - } - - Ok(()) - } - // Make writes visible to readers, but does not guarantee any durability pub(super) fn write_barrier(&self) -> Result { self.flush_write_buffer() diff --git a/src/tree_store/page_store/file_backend/mod.rs b/src/tree_store/page_store/file_backend/mod.rs new file mode 100644 index 00000000..ef556225 --- /dev/null +++ b/src/tree_store/page_store/file_backend/mod.rs @@ -0,0 +1,14 @@ +#[cfg(any(unix, target_os = "wasi"))] +mod unix; +#[cfg(any(unix, target_os = "wasi"))] +pub use unix::FileBackend; + +#[cfg(windows)] +mod windows; +#[cfg(windows)] +pub use windows::FileBackend; + +#[cfg(not(any(windows, unix, target_os = "wasi")))] +mod unsupported; +#[cfg(not(any(windows, unix, target_os = "wasi")))] +pub use unsupported::FileBackend; diff --git a/src/tree_store/page_store/file_backend/unix.rs b/src/tree_store/page_store/file_backend/unix.rs new file mode 100644 index 00000000..29e06e15 --- /dev/null +++ b/src/tree_store/page_store/file_backend/unix.rs @@ -0,0 +1,110 @@ +// TODO once Rust's libc has flock implemented for WASI, this file needs to be revisited. +// What needs to be changed is commented below. +// See also: https://github.com/WebAssembly/wasi-filesystem/issues/2 + +// Remove this line once wasi-libc has flock +#![cfg_attr(target_os = "wasi", allow(unused_imports))] + +use crate::{DatabaseError, Result, StorageBackend}; +use std::fs::File; +use std::io; + +#[cfg(unix)] +use std::os::unix::{fs::FileExt, io::AsRawFd}; + +#[cfg(target_os = "wasi")] +use std::os::wasi::{fs::FileExt, io::AsRawFd}; + +/// Stores a database as a file on-disk. +#[derive(Debug)] +pub struct FileBackend { + file: File, +} + +impl FileBackend { + /// Creates a new backend which stores data to the given file. + // This is a no-op until we get flock in wasi-libc. + // Delete this function when we get flock. + #[cfg(target_os = "wasi")] + pub fn new(file: File) -> Result { + Ok(Self { file }) + } + + /// Creates a new backend which stores data to the given file. + #[cfg(unix)] // remove this line when wasi-libc gets flock + pub fn new(file: File) -> Result { + let fd = file.as_raw_fd(); + let result = unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) }; + if result != 0 { + let err = io::Error::last_os_error(); + if err.kind() == io::ErrorKind::WouldBlock { + Err(DatabaseError::DatabaseAlreadyOpen) + } else { + Err(err.into()) + } + } else { + // Try to flush any pages in the page cache that are out of sync with disk. + // See here for why: + #[cfg(target_os = "linux")] + unsafe { + libc::posix_fadvise64(fd, 0, 0, libc::POSIX_FADV_DONTNEED); + } + Ok(Self { file }) + } + } +} + +impl StorageBackend for FileBackend { + fn len(&self) -> Result { + Ok(self.file.metadata()?.len()) + } + + fn read(&self, offset: u64, len: usize) -> Result, io::Error> { + let mut buffer = vec![0; len]; + self.file.read_exact_at(&mut buffer, offset)?; + Ok(buffer) + } + + fn set_len(&self, len: u64) -> Result<(), io::Error> { + self.file.set_len(len) + } + + #[cfg(not(target_os = "macos"))] + fn sync_data(&self, _: bool) -> Result<(), io::Error> { + let res = self.file.sync_data(); + #[cfg(target_os = "linux")] + if res.is_err() { + // Try to flush any pages in the page cache that are out of sync with disk. + // See here for why: + unsafe { + libc::posix_fadvise64(self.file.as_raw_fd(), 0, 0, libc::POSIX_FADV_DONTNEED); + } + } + res + } + + #[cfg(target_os = "macos")] + fn sync_data(&self, eventual: bool) -> Result<(), io::Error> { + if eventual { + let code = unsafe { libc::fcntl(self.file.as_raw_fd(), libc::F_BARRIERFSYNC) }; + if code == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } else { + self.file.sync_data() + } + } + + fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> { + self.file.write_all_at(data, offset) + } +} + +#[cfg(unix)] // remove this line when wasi-libc gets flock +impl Drop for FileBackend { + fn drop(&mut self) { + unsafe { libc::flock(self.file.as_raw_fd(), libc::LOCK_UN) }; + } +} diff --git a/src/tree_store/page_store/file_backend/unsupported.rs b/src/tree_store/page_store/file_backend/unsupported.rs new file mode 100644 index 00000000..9e2dfab6 --- /dev/null +++ b/src/tree_store/page_store/file_backend/unsupported.rs @@ -0,0 +1,36 @@ +use crate::{DatabaseError, Result, StorageBackend}; +use std::fs::File; +use std::io; + +/// Stores a database as a file on-disk. +#[derive(Debug)] +pub struct FileBackend; + +impl FileBackend { + /// Creates a new backend which stores data to the given file. + pub fn new(_: File) -> Result { + unimplemented!("Not supported on this platform.") + } +} + +impl StorageBackend for FileBackend { + fn len(&self) -> Result { + unimplemented!() + } + + fn read(&self, _: u64, _: usize) -> Result, io::Error> { + unimplemented!() + } + + fn set_len(&self, _: u64) -> Result<(), io::Error> { + unimplemented!() + } + + fn sync_data(&self, _: bool) -> Result<(), io::Error> { + unimplemented!() + } + + fn write(&self, _: u64, _: &[u8]) -> Result<(), io::Error> { + unimplemented!() + } +} diff --git a/src/tree_store/page_store/file_lock/windows.rs b/src/tree_store/page_store/file_backend/windows.rs similarity index 72% rename from src/tree_store/page_store/file_lock/windows.rs rename to src/tree_store/page_store/file_backend/windows.rs index b288ebcf..0a2c263f 100644 --- a/src/tree_store/page_store/file_lock/windows.rs +++ b/src/tree_store/page_store/file_backend/windows.rs @@ -1,6 +1,6 @@ #![allow(clippy::upper_case_acronyms)] -use crate::{DatabaseError, Result}; +use crate::{DatabaseError, Result, StorageBackend}; use std::fs::File; use std::io; use std::os::windows::fs::FileExt; @@ -30,12 +30,15 @@ extern "system" { ) -> i32; } -pub(crate) struct LockedFile { +/// Stores a database as a file on-disk. +#[derive(Debug)] +pub struct FileBackend { file: File, } -impl LockedFile { - pub(crate) fn new(file: File) -> Result { +impl FileBackend { + /// Creates a new backend which stores data to the given file. + pub fn new(file: File) -> Result { let handle = file.as_raw_handle(); unsafe { let result = LockFile(handle, 0, 0, u32::MAX, u32::MAX); @@ -54,8 +57,14 @@ impl LockedFile { Ok(Self { file }) } +} + +impl StorageBackend for FileBackend { + fn len(&self) -> Result { + Ok(self.file.metadata()?.len()) + } - pub(crate) fn read(&self, mut offset: u64, len: usize) -> Result, io::Error> { + fn read(&self, mut offset: u64, len: usize) -> Result, io::Error> { let mut buffer = vec![0; len]; let mut data_offset = 0; while data_offset < buffer.len() { @@ -66,7 +75,15 @@ impl LockedFile { Ok(buffer) } - pub(crate) fn write(&self, mut offset: u64, data: &[u8]) -> Result<(), io::Error> { + fn set_len(&self, len: u64) -> Result<(), io::Error> { + self.file.set_len(len) + } + + fn sync_data(&self, _: bool) -> Result<(), io::Error> { + self.file.sync_data() + } + + fn write(&self, mut offset: u64, data: &[u8]) -> Result<(), io::Error> { let mut data_offset = 0; while data_offset < data.len() { let written = self.file.seek_write(&data[data_offset..], offset)?; @@ -75,13 +92,9 @@ impl LockedFile { } Ok(()) } - - pub(crate) fn file(&self) -> &File { - &self.file - } } -impl Drop for LockedFile { +impl Drop for FileBackend { fn drop(&mut self) { unsafe { UnlockFile(self.file.as_raw_handle(), 0, 0, u32::MAX, u32::MAX) }; } diff --git a/src/tree_store/page_store/file_lock/mod.rs b/src/tree_store/page_store/file_lock/mod.rs deleted file mode 100644 index c9108dd5..00000000 --- a/src/tree_store/page_store/file_lock/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -#[cfg(any(unix, target_os = "wasi"))] -mod unix; -#[cfg(any(unix, target_os = "wasi"))] -pub(super) use unix::LockedFile; - -#[cfg(windows)] -mod windows; -#[cfg(windows)] -pub(super) use windows::LockedFile; diff --git a/src/tree_store/page_store/file_lock/unix.rs b/src/tree_store/page_store/file_lock/unix.rs deleted file mode 100644 index cbfadc53..00000000 --- a/src/tree_store/page_store/file_lock/unix.rs +++ /dev/null @@ -1,66 +0,0 @@ -// TODO once Rust's libc has flock implemented for WASI, this file needs to be revisited. -// What needs to be changed is commented below. -// See also: https://github.com/WebAssembly/wasi-filesystem/issues/2 - -// Remove this line once wasi-libc has flock -#![cfg_attr(target_os = "wasi", allow(unused_imports))] - -use crate::{DatabaseError, Result}; -use std::fs::File; -use std::io; - -#[cfg(unix)] -use std::os::unix::{fs::FileExt, io::AsRawFd}; - -#[cfg(target_os = "wasi")] -use std::os::wasi::{fs::FileExt, io::AsRawFd}; - -pub(crate) struct LockedFile { - file: File, -} - -impl LockedFile { - // This is a no-op until we get flock in wasi-libc. - // Delete this function when we get flock. - #[cfg(target_os = "wasi")] - pub(crate) fn new(file: File) -> Result { - Ok(Self { file }) - } - - #[cfg(unix)] // remove this line when wasi-libc gets flock - pub(crate) fn new(file: File) -> Result { - let fd = file.as_raw_fd(); - let result = unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) }; - if result != 0 { - let err = io::Error::last_os_error(); - if err.kind() == io::ErrorKind::WouldBlock { - Err(DatabaseError::DatabaseAlreadyOpen) - } else { - Err(err.into()) - } - } else { - Ok(Self { file }) - } - } - - pub(crate) fn file(&self) -> &File { - &self.file - } - - pub(crate) fn read(&self, offset: u64, len: usize) -> Result, io::Error> { - let mut buffer = vec![0; len]; - self.file.read_exact_at(&mut buffer, offset)?; - Ok(buffer) - } - - pub(crate) fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> { - self.file.write_all_at(data, offset) - } -} - -#[cfg(unix)] // remove this line when wasi-libc gets flock -impl Drop for LockedFile { - fn drop(&mut self) { - unsafe { libc::flock(self.file.as_raw_fd(), libc::LOCK_UN) }; - } -} diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 4bd43116..119d991f 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -413,7 +413,7 @@ mod test { use crate::tree_store::page_store::TransactionalMemory; #[cfg(not(target_os = "windows"))] use crate::StorageError; - use crate::{Database, ReadableTable}; + use crate::{Database, FileBackend, ReadableTable}; use std::fs::OpenOptions; use std::io::{Read, Seek, SeekFrom, Write}; use std::mem::size_of; @@ -468,10 +468,16 @@ mod test { .unwrap(); file.write_all(&[0; size_of::()]).unwrap(); - assert!(TransactionalMemory::new(file, PAGE_SIZE, None, 0, 0) - .unwrap() - .needs_repair() - .unwrap()); + assert!(TransactionalMemory::new( + Box::new(FileBackend::new(file).unwrap()), + PAGE_SIZE, + None, + 0, + 0 + ) + .unwrap() + .needs_repair() + .unwrap()); #[allow(unused_mut)] let mut db2 = Database::create(tmpfile.path()).unwrap(); @@ -551,10 +557,16 @@ mod test { buffer[0] |= RECOVERY_REQUIRED; file.write_all(&buffer).unwrap(); - assert!(TransactionalMemory::new(file, PAGE_SIZE, None, 0, 0) - .unwrap() - .needs_repair() - .unwrap()); + assert!(TransactionalMemory::new( + Box::new(FileBackend::new(file).unwrap()), + PAGE_SIZE, + None, + 0, + 0 + ) + .unwrap() + .needs_repair() + .unwrap()); Database::open(tmpfile.path()).unwrap(); } @@ -597,10 +609,16 @@ mod test { buffer[0] |= RECOVERY_REQUIRED; file.write_all(&buffer).unwrap(); - assert!(TransactionalMemory::new(file, PAGE_SIZE, None, 0, 0) - .unwrap() - .needs_repair() - .unwrap()); + assert!(TransactionalMemory::new( + Box::new(FileBackend::new(file).unwrap()), + PAGE_SIZE, + None, + 0, + 0 + ) + .unwrap() + .needs_repair() + .unwrap()); Database::open(tmpfile.path()).unwrap(); } diff --git a/src/tree_store/page_store/in_memory_backend.rs b/src/tree_store/page_store/in_memory_backend.rs new file mode 100644 index 00000000..4ab0c783 --- /dev/null +++ b/src/tree_store/page_store/in_memory_backend.rs @@ -0,0 +1,77 @@ +use crate::StorageBackend; +use std::io; +use std::sync::*; + +/// Acts as temporal in-memory database storage. +#[derive(Debug, Default)] +pub struct InMemoryBackend(RwLock>); + +impl InMemoryBackend { + fn out_of_range() -> io::Error { + io::Error::new(io::ErrorKind::InvalidInput, "Index out-of-range.") + } +} + +impl InMemoryBackend { + /// Creates a new, empty memory backend. + pub fn new() -> Self { + Self::default() + } + + /// Gets a read guard for this backend. + fn read(&self) -> RwLockReadGuard<'_, Vec> { + self.0.read().expect("Could not acquire read lock.") + } + + /// Gets a write guard for this backend. + fn write(&self) -> RwLockWriteGuard<'_, Vec> { + self.0.write().expect("Could not acquire write lock.") + } +} + +impl StorageBackend for InMemoryBackend { + fn len(&self) -> Result { + Ok(self.read().len() as u64) + } + + fn read(&self, offset: u64, len: usize) -> Result, io::Error> { + let guard = self.read(); + let offset = usize::try_from(offset).map_err(|_| Self::out_of_range())?; + if offset + len <= guard.len() { + Ok(guard[offset..offset + len].to_owned()) + } else { + Err(Self::out_of_range()) + } + } + + fn set_len(&self, len: u64) -> Result<(), io::Error> { + let mut guard = self.write(); + let len = usize::try_from(len).map_err(|_| Self::out_of_range())?; + if guard.len() < len { + let additional = len - guard.len(); + guard.reserve(additional); + for _ in 0..additional { + guard.push(0); + } + } else { + guard.truncate(len); + } + + Ok(()) + } + + fn sync_data(&self, _: bool) -> Result<(), io::Error> { + Ok(()) + } + + fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> { + let mut guard = self.write(); + let offset = usize::try_from(offset).map_err(|_| Self::out_of_range())?; + if offset + data.len() <= guard.len() { + guard[offset..offset + data.len()].copy_from_slice(data); + Ok(()) + } else { + Err(Self::out_of_range()) + } + } +} diff --git a/src/tree_store/page_store/mod.rs b/src/tree_store/page_store/mod.rs index dfa50a80..174515b5 100644 --- a/src/tree_store/page_store/mod.rs +++ b/src/tree_store/page_store/mod.rs @@ -2,8 +2,9 @@ mod base; mod bitmap; mod buddy_allocator; mod cached_file; -mod file_lock; +pub mod file_backend; mod header; +mod in_memory_backend; mod layout; mod page_manager; mod region; @@ -13,6 +14,7 @@ mod xxh3; pub(crate) use base::{Page, PageHint, PageNumber, MAX_VALUE_LENGTH}; pub(crate) use header::PAGE_SIZE; +pub use in_memory_backend::InMemoryBackend; pub(crate) use page_manager::{xxh3_checksum, TransactionalMemory, FILE_FORMAT_VERSION}; pub use savepoint::Savepoint; pub(crate) use savepoint::SerializedSavepoint; diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 1741702a..126501ce 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -8,6 +8,7 @@ use crate::tree_store::page_store::layout::DatabaseLayout; use crate::tree_store::page_store::region::{Allocators, RegionTracker}; use crate::tree_store::page_store::{hash128_with_seed, PageImpl, PageMut}; use crate::tree_store::{Page, PageNumber}; +use crate::StorageBackend; use crate::{DatabaseError, Result, StorageError}; #[cfg(feature = "logging")] use log::warn; @@ -16,7 +17,6 @@ use std::cmp::{max, min}; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryInto; -use std::fs::File; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; @@ -98,7 +98,7 @@ pub(crate) struct TransactionalMemory { impl TransactionalMemory { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - file: File, + file: Box, page_size: usize, requested_region_size: Option, read_cache_size_bytes: usize, @@ -180,14 +180,14 @@ impl TransactionalMemory { .copy_from_slice(&header.to_bytes(false, false)); allocators.flush_to(tracker_page, layout, &storage)?; - storage.flush()?; + storage.flush(false)?; // Write the magic number only after the data structure is initialized and written to disk // to ensure that it's crash safe storage .write(0, DB_HEADER_SIZE, true, |_| CachePriority::High)? .mem_mut() .copy_from_slice(&header.to_bytes(true, false)); - storage.flush()?; + storage.flush(false)?; } let header_bytes = storage.read_direct(0, DB_HEADER_SIZE)?; let (mut header, repair_info) = DatabaseHeader::from_bytes(&header_bytes); @@ -244,7 +244,7 @@ impl TransactionalMemory { .write(0, DB_HEADER_SIZE, true, |_| CachePriority::High)? .mem_mut() .copy_from_slice(&header.to_bytes(true, false)); - storage.flush()?; + storage.flush(false)?; } let layout = header.layout(); @@ -284,7 +284,7 @@ impl TransactionalMemory { pub(crate) fn clear_cache_and_reload(&mut self) -> Result { assert!(self.allocated_since_commit.lock().unwrap().is_empty()); - self.storage.flush()?; + self.storage.flush(false)?; self.storage.invalidate_cache_all(); let header_bytes = self.storage.read_direct(0, DB_HEADER_SIZE)?; @@ -319,7 +319,7 @@ impl TransactionalMemory { .write(0, DB_HEADER_SIZE, true, |_| CachePriority::High)? .mem_mut() .copy_from_slice(&header.to_bytes(true, false)); - self.storage.flush()?; + self.storage.flush(false)?; } self.needs_recovery @@ -335,7 +335,7 @@ impl TransactionalMemory { assert!(!state.header.recovery_required); state.header.recovery_required = true; self.write_header(&state.header, false)?; - self.storage.flush() + self.storage.flush(false) } pub(crate) fn needs_repair(&self) -> Result { @@ -402,7 +402,7 @@ impl TransactionalMemory { let mut state = self.state.lock().unwrap(); state.header.set_region_tracker(tracker_page); self.write_header(&state.header, false)?; - self.storage.flush()?; + self.storage.flush(false)?; state .allocators @@ -410,7 +410,7 @@ impl TransactionalMemory { state.header.recovery_required = false; self.write_header(&state.header, false)?; - let result = self.storage.flush(); + let result = self.storage.flush(false); self.needs_recovery.store(false, Ordering::Release); result @@ -538,20 +538,12 @@ impl TransactionalMemory { // Use 2-phase commit, if checksums are disabled if two_phase { - if eventual { - self.storage.eventual_flush()?; - } else { - self.storage.flush()?; - } + self.storage.flush(eventual)?; } // Swap the primary bit on-disk self.write_header(&header, true)?; - if eventual { - self.storage.eventual_flush()?; - } else { - self.storage.flush()?; - } + self.storage.flush(eventual)?; // Only swap the in-memory primary bit after the fsync is successful header.swap_primary_slot(); @@ -1105,10 +1097,10 @@ impl Drop for TransactionalMemory { return; } - if self.storage.flush().is_ok() && !self.needs_recovery.load(Ordering::Acquire) { + if self.storage.flush(false).is_ok() && !self.needs_recovery.load(Ordering::Acquire) { state.header.recovery_required = false; let _ = self.write_header(&state.header, false); - let _ = self.storage.flush(); + let _ = self.storage.flush(false); } } }