Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Fallible store traits #2005

Merged
merged 11 commits into from
Feb 8, 2024
4 changes: 2 additions & 2 deletions iroh-bytes/src/format/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ impl Collection {
where
D: crate::store::Map,
{
let links_entry = db.get(root).context("links not found")?;
let links_entry = db.get(root)?.context("links not found")?;
anyhow::ensure!(links_entry.is_complete(), "links not complete");
let links_bytes = links_entry.data_reader().await?.read_to_end().await?;
let mut links = HashSeq::try_from(links_bytes)?;
let meta_hash = links.pop_front().context("meta hash not found")?;
let meta_entry = db.get(&meta_hash).context("meta not found")?;
let meta_entry = db.get(&meta_hash)?.context("meta not found")?;
anyhow::ensure!(links_entry.is_complete(), "links not complete");
let meta_bytes = meta_entry.data_reader().await?.read_to_end().await?;
let meta: CollectionMeta = postcard::from_bytes(&meta_bytes)?;
Expand Down
8 changes: 4 additions & 4 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn get_blob<
hash: &Hash,
progress: impl ProgressSender<Msg = DownloadProgress> + IdGenerator,
) -> Result<Stats, GetError> {
let end = match db.get_possibly_partial(hash) {
let end = match db.get_possibly_partial(hash)? {
PossiblyPartialEntry::Complete(entry) => {
tracing::info!("already got entire blob");
progress
Expand Down Expand Up @@ -287,7 +287,7 @@ async fn get_blob_inner_partial<D: BaoStore>(
///
/// This will compute the valid ranges for partial blobs, so it is somewhat expensive for those.
pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<D>> {
io::Result::Ok(match db.get_possibly_partial(hash) {
io::Result::Ok(match db.get_possibly_partial(hash)? {
PossiblyPartialEntry::Partial(entry) => {
let valid_ranges = valid_ranges::<D>(&entry)
.await
Expand Down Expand Up @@ -324,7 +324,7 @@ async fn get_hash_seq<
) -> Result<Stats, GetError> {
use tracing::info as log;
let finishing =
if let PossiblyPartialEntry::Complete(entry) = db.get_possibly_partial(root_hash) {
if let PossiblyPartialEntry::Complete(entry) = db.get_possibly_partial(root_hash)? {
log!("already got collection - doing partial download");
// send info that we have the hashseq itself entirely
sender
Expand Down Expand Up @@ -436,7 +436,7 @@ async fn get_hash_seq<
let end_root = get_blob_inner(db, header, sender.clone()).await?;
// read the collection fully for now
let entry = db
.get(root_hash)
.get(root_hash)?
.ok_or_else(|| GetError::LocalFailure(anyhow!("just downloaded but not in db")))?;
let reader = entry.data_reader().await?;
let (mut collection, count) = parse_hash_seq(reader).await.map_err(|err| {
Expand Down
4 changes: 2 additions & 2 deletions iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ pub async fn handle_get<D: Map, E: EventSender>(
.await;

// 4. Attempt to find hash
match db.get(&hash) {
match db.get(&hash)? {
// Collection or blob request
Some(entry) => {
let mut stats = Box::<TransferStats>::default();
Expand Down Expand Up @@ -492,7 +492,7 @@ pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
ranges: &RangeSpec,
writer: W,
) -> Result<(SentStatus, u64, SliceReaderStats)> {
match db.get(&name) {
match db.get(&name)? {
Some(entry) => {
let outboard = entry.outboard().await?;
let size = outboard.tree().size().0;
Expand Down
145 changes: 83 additions & 62 deletions iroh-bytes/src/store/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ use crate::util::{LivenessTracker, Tag};
use crate::{BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE};
use bao_tree::io::outboard::{PostOrderMemOutboard, PreOrderOutboard};
use bao_tree::io::sync::ReadAt;
use bao_tree::{blake3, ChunkRanges};
use bao_tree::ChunkRanges;
use bao_tree::{BaoTree, ByteNum};
use bytes::Bytes;
use futures::future::BoxFuture;
Expand Down Expand Up @@ -241,7 +241,7 @@ impl PartialEntryData {
}

impl MapEntry<Store> for PartialEntry {
fn hash(&self) -> blake3::Hash {
fn hash(&self) -> Hash {
self.hash
}

Expand All @@ -257,7 +257,7 @@ impl MapEntry<Store> for PartialEntry {
async move {
let file = File::open(self.outboard_path.clone()).await?;
Ok(PreOrderOutboard {
root: self.hash,
root: self.hash.into(),
tree: BaoTree::new(ByteNum(self.size), IROH_BLOCK_SIZE),
data: MemOrFile::File(file),
})
Expand Down Expand Up @@ -294,7 +294,7 @@ impl PartialMapEntry<Store> for PartialEntry {
.await?;
writer.write_at(0, &size.to_le_bytes()).await?;
Ok(PreOrderOutboard {
root: hash,
root: hash.into(),
tree,
data: writer,
})
Expand All @@ -321,22 +321,22 @@ impl PartialMap for Store {

type PartialEntry = PartialEntry;

fn entry_status(&self, hash: &Hash) -> EntryStatus {
fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
let state = self.0.state.read().unwrap();
if state.complete.contains_key(hash) {
Ok(if state.complete.contains_key(hash) {
EntryStatus::Complete
} else if state.partial.contains_key(hash) {
EntryStatus::Partial
} else {
EntryStatus::NotFound
}
})
}

fn get_possibly_partial(&self, hash: &Hash) -> PossiblyPartialEntry<Self> {
fn get_possibly_partial(&self, hash: &Hash) -> io::Result<PossiblyPartialEntry<Self>> {
let state = self.0.state.read().unwrap();
if let Some(entry) = state.partial.get(hash) {
Ok(if let Some(entry) = state.partial.get(hash) {
PossiblyPartialEntry::Partial(PartialEntry {
hash: blake3::Hash::from(*hash),
hash: *hash,
size: entry.size,
data_path: self.0.options.partial_data_path(*hash, &entry.uuid),
outboard_path: self.0.options.partial_outboard_path(*hash, &entry.uuid),
Expand All @@ -348,7 +348,7 @@ impl PartialMap for Store {
.unwrap_or(PossiblyPartialEntry::NotFound)
} else {
PossiblyPartialEntry::NotFound
}
})
}

fn get_or_create_partial(&self, hash: Hash, size: u64) -> io::Result<Self::PartialEntry> {
Expand All @@ -373,7 +373,7 @@ impl PartialMap for Store {
let data_path = self.0.options.partial_data_path(hash, &entry.uuid);
let outboard_path = self.0.options.partial_outboard_path(hash, &entry.uuid);
Ok(PartialEntry {
hash: blake3::Hash::from(hash),
hash,
size: entry.size,
data_path,
outboard_path,
Expand Down Expand Up @@ -448,13 +448,13 @@ pub struct Store(Arc<Inner>);
#[derive(Debug, Clone)]
pub struct Entry {
/// the hash is not part of the entry itself
hash: blake3::Hash,
hash: Hash,
entry: EntryData,
is_complete: bool,
}

impl MapEntry<Store> for Entry {
fn hash(&self) -> blake3::Hash {
fn hash(&self) -> Hash {
self.hash
}

Expand All @@ -474,7 +474,7 @@ impl MapEntry<Store> for Entry {
let size = self.entry.size();
let data = self.entry.outboard_reader().await?;
Ok(PreOrderOutboard {
root: self.hash,
root: self.hash.into(),
tree: BaoTree::new(ByteNum(size), IROH_BLOCK_SIZE),
data,
})
Expand Down Expand Up @@ -581,7 +581,7 @@ fn needs_outboard(size: u64) -> bool {
/// The [PartialMapEntry] implementation for [Store].
#[derive(Debug, Clone)]
pub struct PartialEntry {
hash: blake3::Hash,
hash: Hash,
size: u64,
data_path: PathBuf,
outboard_path: PathBuf,
Expand All @@ -591,9 +591,9 @@ impl Map for Store {
type Entry = Entry;
type Outboard = PreOrderOutboard<MemOrFile>;
type DataReader = MemOrFile;
fn get(&self, hash: &Hash) -> Option<Self::Entry> {
fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
let state = self.0.state.read().unwrap();
if let Some(entry) = state.complete.get(hash) {
Ok(if let Some(entry) = state.complete.get(hash) {
state.get_entry(hash, entry, &self.0.options)
} else if let Some(entry) = state.partial.get(hash) {
let data_path = self.0.options.partial_data_path(*hash, &entry.uuid);
Expand All @@ -605,7 +605,7 @@ impl Map for Store {
hex::encode(entry.uuid)
);
Some(Entry {
hash: blake3::Hash::from(*hash),
hash: *hash,
is_complete: false,
entry: EntryData {
data: Either::Right((data_path, entry.size)),
Expand All @@ -615,15 +615,22 @@ impl Map for Store {
} else {
tracing::trace!("got none {}", hash);
None
}
})
}
}

impl ReadableStore for Store {
fn blobs(&self) -> Box<dyn Iterator<Item = Hash> + Send + Sync + 'static> {
fn blobs(
&self,
) -> io::Result<Box<dyn Iterator<Item = io::Result<Hash>> + Send + Sync + 'static>> {
let inner = self.0.state.read().unwrap();
let items = inner.complete.keys().copied().collect::<Vec<_>>();
Box::new(items.into_iter())
let items = inner
.complete
.keys()
.copied()
.map(io::Result::Ok)
.collect::<Vec<_>>();
Ok(Box::new(items.into_iter()))
}

fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
Expand All @@ -632,23 +639,35 @@ impl ReadableStore for Store {
Box::new(items)
}

fn tags(&self) -> Box<dyn Iterator<Item = (Tag, HashAndFormat)> + Send + Sync + 'static> {
fn tags(
&self,
) -> io::Result<
Box<dyn Iterator<Item = io::Result<(Tag, HashAndFormat)>> + Send + Sync + 'static>,
> {
let inner = self.0.tags.read().unwrap();
let items = inner
.iter()
.map(|(k, v)| (k.clone(), *v))
.map(io::Result::Ok)
.collect::<Vec<_>>();
Box::new(items.into_iter())
Ok(Box::new(items.into_iter()))
}

fn validate(&self, _tx: mpsc::Sender<ValidateProgress>) -> BoxFuture<'_, anyhow::Result<()>> {
fn validate(&self, _tx: mpsc::Sender<ValidateProgress>) -> BoxFuture<'_, io::Result<()>> {
unimplemented!()
}

fn partial_blobs(&self) -> Box<dyn Iterator<Item = Hash> + Send + Sync + 'static> {
fn partial_blobs(
&self,
) -> io::Result<Box<dyn Iterator<Item = io::Result<Hash>> + Send + Sync + 'static>> {
let lock = self.0.state.read().unwrap();
let res = lock.partial.keys().cloned().collect::<Vec<_>>();
Box::new(res.into_iter())
let res = lock
.partial
.keys()
.cloned()
.map(io::Result::Ok)
.collect::<Vec<_>>();
Ok(Box::new(res.into_iter()))
}

fn export(
Expand Down Expand Up @@ -757,11 +776,10 @@ impl super::Store for Store {
state.live.contains(hash) || state.temp.contains(hash)
}

fn delete(&self, hash: &Hash) -> BoxFuture<'_, io::Result<()>> {
tracing::debug!("delete: {:?}", hash);
fn delete(&self, hashes: Vec<Hash>) -> BoxFuture<'_, io::Result<()>> {
tracing::debug!("delete: {:?}", hashes);
let this = self.clone();
let hash = *hash;
tokio::task::spawn_blocking(move || this.delete_sync(hash))
tokio::task::spawn_blocking(move || this.delete_sync(hashes))
.map(flatten_to_io)
.boxed()
}
Expand Down Expand Up @@ -800,7 +818,7 @@ impl State {
// check if we have the data cached
let data = self.data.get(hash).cloned();
Some(Entry {
hash: blake3::Hash::from(*hash),
hash: *hash,
is_complete: true,
entry: EntryData {
data: if let Some(data) = data {
Expand Down Expand Up @@ -1007,47 +1025,50 @@ impl Store {
Ok(tag)
}

fn delete_sync(&self, hash: Hash) -> io::Result<()> {
let mut data = None;
let mut outboard = None;
let mut paths = None;
let mut partial_data = None;
let mut partial_outboard = None;
fn delete_sync(&self, hashes: Vec<Hash>) -> io::Result<()> {
let mut data = Vec::new();
let mut outboard = Vec::new();
let mut paths = Vec::new();
let mut partial_data = Vec::new();
let mut partial_outboard = Vec::new();
let complete_io_guard = self.0.complete_io_mutex.lock().unwrap();
let mut state = self.0.state.write().unwrap();
if let Some(entry) = state.complete.remove(&hash) {
if entry.owned_data {
data = Some(self.owned_data_path(&hash));
}
if needs_outboard(entry.size) {
outboard = Some(self.owned_outboard_path(&hash));
}
if !entry.external.is_empty() {
paths = Some(self.0.options.paths_path(hash));
for hash in hashes {
if let Some(entry) = state.complete.remove(&hash) {
if entry.owned_data {
data.push(self.owned_data_path(&hash));
}
if needs_outboard(entry.size) {
outboard.push(self.owned_outboard_path(&hash));
}
if !entry.external.is_empty() {
paths.push(self.0.options.paths_path(hash));
}
}
}
if let Some(partial) = state.partial.remove(&hash) {
partial_data = Some(self.0.options.partial_data_path(hash, &partial.uuid));
if needs_outboard(partial.size) {
partial_outboard = Some(self.0.options.partial_outboard_path(hash, &partial.uuid));
if let Some(partial) = state.partial.remove(&hash) {
partial_data.push(self.0.options.partial_data_path(hash, &partial.uuid));
if needs_outboard(partial.size) {
partial_outboard
.push(self.0.options.partial_outboard_path(hash, &partial.uuid));
}
}
state.outboard.remove(&hash);
state.data.remove(&hash);
}
state.outboard.remove(&hash);
state.data.remove(&hash);
drop(state);
if let Some(data) = data {
for data in data {
tracing::debug!("deleting data {}", data.display());
if let Err(cause) = std::fs::remove_file(data) {
tracing::warn!("failed to delete data file: {}", cause);
}
}
if let Some(external) = paths {
for external in paths {
tracing::debug!("deleting paths file {}", external.display());
if let Err(cause) = std::fs::remove_file(external) {
tracing::warn!("failed to delete paths file: {}", cause);
}
}
if let Some(outboard) = outboard {
for outboard in outboard {
tracing::debug!("deleting outboard {}", outboard.display());
if let Err(cause) = std::fs::remove_file(outboard) {
tracing::warn!("failed to delete outboard file: {}", cause);
Expand All @@ -1056,12 +1077,12 @@ impl Store {
drop(complete_io_guard);
// deleting the partial data and outboard files can happen at any time.
// there is no race condition since these are unique names.
if let Some(partial_data) = partial_data {
for partial_data in partial_data {
if let Err(cause) = std::fs::remove_file(partial_data) {
tracing::warn!("failed to delete partial data file: {}", cause);
}
}
if let Some(partial_outboard) = partial_outboard {
for partial_outboard in partial_outboard {
if let Err(cause) = std::fs::remove_file(partial_outboard) {
tracing::warn!("failed to delete partial outboard file: {}", cause);
}
Expand All @@ -1070,7 +1091,7 @@ impl Store {
}

fn insert_complete_sync(&self, entry: PartialEntry) -> io::Result<()> {
let hash = entry.hash.into();
let hash = entry.hash;
let data_path = self.0.options.owned_data_path(&hash);
let size = entry.size;
let temp_data_path = entry.data_path;
Expand Down
Loading
Loading