Skip to content

Commit

Permalink
feat: Implement RedbBlockstore (#12)
Browse files Browse the repository at this point in the history
* feat: Implement `RedbBlockstore`

* Update src/redb_blockstore.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

* Update src/redb_blockstore.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>

---------

Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
oblique and fl0rek authored Apr 12, 2024
1 parent 69173e8 commit 111bc56
Showing 5 changed files with 266 additions and 3 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -81,6 +81,11 @@ jobs:
steps:
- uses: actions/checkout@v1

- name: Install cargo-hack
uses: taiki-e/cache-cargo-install-action@v1
with:
tool: cargo-hack

- name: Set up cargo cache
uses: actions/cache@v3
with:
@@ -95,8 +100,11 @@ jobs:
cargo-${{ hashFiles('**/Cargo.lock') }}
cargo-

- name: Build (native)
run: cargo hack build --feature-powerset --no-dev-deps --exclude-features indexeddb

- name: Run tests
run: cargo test --all-features
run: cargo hack test --each-feature --exclude-features indexeddb


unused-deps:
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ multihash = "0.19.1"
thiserror = "1.0.40"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
redb = { version = "2", optional = true }
# Upgrading this dependency invalidates existing persistent dbs.
# Those can be restored by migrating between versions:
# https://docs.rs/sled/latest/sled/struct.Db.html#examples-1
@@ -45,9 +46,10 @@ multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] }
wasm-bindgen-test = "0.3.41"

[features]
indexeddb = ["dep:js-sys", "dep:rexie", "dep:wasm-bindgen"]
lru = ["dep:lru"]
redb = ["dep:redb", "dep:tokio"]
sled = ["dep:sled", "dep:tokio"]
indexeddb = ["dep:js-sys", "dep:rexie", "dep:wasm-bindgen"]

[package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docs_rs"]
18 changes: 17 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ mod in_memory_blockstore;
mod indexed_db_blockstore;
#[cfg(feature = "lru")]
mod lru_blockstore;
#[cfg(all(not(target_arch = "wasm32"), feature = "redb"))]
mod redb_blockstore;
#[cfg(all(not(target_arch = "wasm32"), feature = "sled"))]
mod sled_blockstore;

@@ -28,6 +30,9 @@ pub use crate::indexed_db_blockstore::IndexedDbBlockstore;
#[cfg(feature = "lru")]
#[cfg_attr(docs_rs, doc(cfg(feature = "lru")))]
pub use crate::lru_blockstore::LruBlockstore;
#[cfg(all(not(target_arch = "wasm32"), feature = "redb"))]
#[cfg_attr(docs_rs, doc(cfg(all(not(target_arch = "wasm32"), feature = "redb"))))]
pub use crate::redb_blockstore::RedbBlockstore;
#[cfg(all(not(target_arch = "wasm32"), feature = "sled"))]
#[cfg_attr(docs_rs, doc(cfg(all(not(target_arch = "wasm32"), feature = "sled"))))]
pub use crate::sled_blockstore::SledBlockstore;
@@ -62,7 +67,7 @@ pub enum BlockstoreError {

type Result<T, E = BlockstoreError> = std::result::Result<T, E>;

#[cfg(all(not(target_arch = "wasm32"), feature = "sled"))]
#[cfg(all(not(target_arch = "wasm32"), any(feature = "sled", feature = "redb")))]
impl From<tokio::task::JoinError> for BlockstoreError {
fn from(e: tokio::task::JoinError) -> BlockstoreError {
BlockstoreError::ExecutorError(e.to_string())
@@ -182,6 +187,7 @@ pub(crate) mod tests {
#[rstest]
#[case(new_in_memory::<64>())]
#[cfg_attr(feature = "lru", case(new_lru::<64>()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "redb"), case(new_redb()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))]
#[cfg_attr(
all(target_arch = "wasm32", feature = "indexeddb"),
@@ -210,6 +216,7 @@ pub(crate) mod tests {
#[rstest]
#[case(new_in_memory::<64>())]
#[cfg_attr(feature = "lru", case(new_lru::<64>()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "redb"), case(new_redb()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))]
#[cfg_attr(
all(target_arch = "wasm32", feature = "indexeddb"),
@@ -233,6 +240,7 @@ pub(crate) mod tests {
#[rstest]
#[case(new_in_memory::<128>())]
#[cfg_attr(feature = "lru", case(new_lru::<128>()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "redb"), case(new_redb()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))]
#[cfg_attr(
all(target_arch = "wasm32", feature = "indexeddb"),
@@ -287,6 +295,7 @@ pub(crate) mod tests {
#[rstest]
#[case(new_in_memory::<8>())]
#[cfg_attr(feature = "lru", case(new_lru::<8>()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "redb"), case(new_redb()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))]
#[cfg_attr(
all(target_arch = "wasm32", feature = "indexeddb"),
@@ -308,6 +317,7 @@ pub(crate) mod tests {
#[rstest]
#[case(new_in_memory::<8>())]
#[cfg_attr(feature = "lru", case(new_lru::<8>()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "redb"), case(new_redb()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))]
#[cfg_attr(
all(target_arch = "wasm32", feature = "indexeddb"),
@@ -355,6 +365,7 @@ pub(crate) mod tests {
#[rstest]
#[case(new_in_memory::<8>())]
#[cfg_attr(feature = "lru", case(new_lru::<8>()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "redb"), case(new_redb()))]
#[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))]
#[cfg_attr(
all(target_arch = "wasm32", feature = "indexeddb"),
@@ -393,6 +404,11 @@ pub(crate) mod tests {
LruBlockstore::new(std::num::NonZeroUsize::new(128).unwrap())
}

#[cfg(all(not(target_arch = "wasm32"), feature = "redb"))]
async fn new_redb() -> RedbBlockstore {
RedbBlockstore::in_memory().unwrap()
}

#[cfg(all(not(target_arch = "wasm32"), feature = "sled"))]
async fn new_sled() -> SledBlockstore {
let path = tempfile::TempDir::with_prefix("sled-blockstore-test")
227 changes: 227 additions & 0 deletions src/redb_blockstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
use std::path::Path;
use std::sync::Arc;

use cid::CidGeneric;
use redb::{
CommitError, Database, ReadTransaction, ReadableTable, StorageError, TableDefinition,
TableError, TransactionError, WriteTransaction,
};
use tokio::task::spawn_blocking;

use crate::{Blockstore, BlockstoreError, Result};

const BLOCKS_TABLE: TableDefinition<'static, &[u8], &[u8]> =
TableDefinition::new("BLOCKSTORE.BLOCKS");

/// A [`Blockstore`] implementation backed by a [`redb`] database.
#[derive(Debug)]
pub struct RedbBlockstore {
db: Arc<Database>,
}

impl RedbBlockstore {
/// Open a persistent [`redb`] store.
pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_owned();

let db = spawn_blocking(|| Database::create(path))
.await?
.map_err(|e| BlockstoreError::FatalDatabaseError(e.to_string()))?;

Ok(RedbBlockstore::new(Arc::new(db)))
}

/// Open an in memory [`redb`] store.
pub fn in_memory() -> Result<Self> {
let db = Database::builder()
.create_with_backend(redb::backends::InMemoryBackend::new())
.map_err(|e| BlockstoreError::FatalDatabaseError(e.to_string()))?;

Ok(RedbBlockstore::new(Arc::new(db)))
}

/// Create a new `RedbBlockstore` with the already opened [`redb::Database`].
///
/// # Example
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use std::sync::Arc;
/// use blockstore::RedbBlockstore;
/// use tokio::task::spawn_blocking;
///
/// let db = spawn_blocking(|| redb::Database::create("path/to/db")).await??;
/// let db = Arc::new(db);
/// let blockstore = RedbBlockstore::new(db);
/// # Ok(())
/// # }
/// ```
pub fn new(db: Arc<Database>) -> Self {
RedbBlockstore { db }
}

/// Returns the raw [`redb::Database`].
///
/// This is useful if you want to pass the database handle to any other
/// stores (e.g. [`blockstore`]).
pub fn raw_db(&self) -> Arc<Database> {
self.db.clone()
}

/// Execute a read transaction.
async fn read_tx<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut ReadTransaction) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let db = self.db.clone();

spawn_blocking(move || {
let mut tx = db.begin_read()?;
f(&mut tx)
})
.await?
}

/// Execute a write transaction.
///
/// If closure returns an error the store state is not changed, otherwise transaction is commited.
async fn write_tx<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut WriteTransaction) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let db = self.db.clone();

spawn_blocking(move || {
let mut tx = db.begin_write()?;
let res = f(&mut tx);

if res.is_ok() {
tx.commit()?;
} else {
tx.abort()?;
}

res
})
.await?
}
}

impl Blockstore for RedbBlockstore {
async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>> {
let cid = cid.to_bytes();

self.read_tx(move |tx| {
let blocks_table = match tx.open_table(BLOCKS_TABLE) {
Ok(val) => val,
Err(TableError::TableDoesNotExist(_)) => return Ok(None),
Err(e) => return Err(e.into()),
};

Ok(blocks_table
.get(&cid[..])?
.map(|guard| guard.value().to_owned()))
})
.await
}

async fn put_keyed<const S: usize>(&self, cid: &CidGeneric<S>, data: &[u8]) -> Result<()> {
let cid = cid.to_bytes();
let data = data.to_vec();

self.write_tx(move |tx| {
let mut blocks_table = tx.open_table(BLOCKS_TABLE)?;

if blocks_table.get(&cid[..])?.is_none() {
blocks_table.insert(&cid[..], &data[..])?;
}

Ok(())
})
.await
}

async fn has<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<bool> {
let cid = cid.to_bytes();

self.read_tx(move |tx| {
let blocks_table = match tx.open_table(BLOCKS_TABLE) {
Ok(val) => val,
Err(TableError::TableDoesNotExist(_)) => return Ok(false),
Err(e) => return Err(e.into()),
};

Ok(blocks_table.get(&cid[..])?.is_some())
})
.await
}
}

impl From<TransactionError> for BlockstoreError {
fn from(e: TransactionError) -> Self {
match e {
TransactionError::ReadTransactionStillInUse(_) => {
unreachable!("redb::ReadTransaction::close is never used")
}
e => BlockstoreError::FatalDatabaseError(e.to_string()),
}
}
}

impl From<TableError> for BlockstoreError {
fn from(e: TableError) -> Self {
match e {
TableError::Storage(e) => e.into(),
TableError::TableAlreadyOpen(table, location) => {
unreachable!("Table {table} already opened from: {location}")
}
e @ TableError::TableDoesNotExist(_) => {
unreachable!("redb::ReadTransaction::open_table result not handled correctly: {e}");
}
e => BlockstoreError::StoredDataError(e.to_string()),
}
}
}

impl From<StorageError> for BlockstoreError {
fn from(e: StorageError) -> Self {
match e {
StorageError::ValueTooLarge(_) => BlockstoreError::ValueTooLarge,
e => BlockstoreError::FatalDatabaseError(e.to_string()),
}
}
}

impl From<CommitError> for BlockstoreError {
fn from(e: CommitError) -> Self {
BlockstoreError::FatalDatabaseError(e.to_string())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::cid_v1;

#[tokio::test]
async fn store_persists() {
let dir = tempfile::TempDir::with_prefix("redb-blockstore-test")
.unwrap()
.into_path();
let db_path = dir.join("db");

let store = RedbBlockstore::open(&db_path).await.unwrap();
let cid = cid_v1::<64>(b"1");
let data = b"data";

store.put_keyed(&cid, data).await.unwrap();

spawn_blocking(move || drop(store)).await.unwrap();

let store = RedbBlockstore::open(&db_path).await.unwrap();
let received = store.get(&cid).await.unwrap();

assert_eq!(received, Some(data.to_vec()));
}
}

0 comments on commit 111bc56

Please sign in to comment.