Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-bytes): bring back validation #2107

Merged
merged 21 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.10.2", features = ["tokio_fsm"], default-features = false, optional = true }
bao-tree = { version = "0.11", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
data-encoding = { version = "2.3.3", optional = true }
hex = "0.4.3"
multibase = { version = "0.9.1", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.10.2", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.11", features = ["tokio_fsm"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
data-encoding = "2.3.3"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] }
flume = "0.11"
futures = "0.3.25"
futures-buffered = "0.2.4"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hex = "0.4.3"
iroh-base = { version = "0.12.0", features = ["redb"], path = "../iroh-base" }
Expand Down
7 changes: 6 additions & 1 deletion iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<Chun
let valid_from_data = ChunkRanges::from(..ByteNum(data_size).full_chunks());
// compute the valid range from just looking at the outboard file
let mut outboard = entry.outboard().await?;
let valid_from_outboard = bao_tree::io::fsm::valid_ranges(&mut outboard).await?;
let all = ChunkRanges::all();
let mut stream = bao_tree::io::fsm::valid_outboard_ranges(&mut outboard, &all);
let mut valid_from_outboard = ChunkRanges::empty();
while let Some(range) = stream.next().await {
valid_from_outboard |= ChunkRanges::from(range?);
}
let valid: ChunkRanges = valid_from_data.intersection(&valid_from_outboard);
log!("valid_from_data: {:?}", valid_from_data);
log!("valid_from_outboard: {:?}", valid_from_data);
Expand Down
54 changes: 28 additions & 26 deletions iroh-bytes/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ use crate::{
},
},
util::{
progress::{IdGenerator, IgnoreProgressSender, ProgressSendError, ProgressSender},
progress::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, LivenessTracker, MemOrFile,
},
Tag, TempTag, IROH_BLOCK_SIZE,
Expand All @@ -119,8 +122,8 @@ use self::test_support::EntryData;

use super::{
bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
temp_name, BaoBatchWriter, BaoBlobSize, EntryStatus, ExportMode, ExportProgressCb, ImportMode,
ImportProgress, Map, ReadableStore, TempCounterMap, ValidateProgress,
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
};

/// Location of the data.
Expand Down Expand Up @@ -460,37 +463,36 @@ impl ImportSource {
}
}

/// An entry for a partial or complete blob in the store.
#[derive(Debug, Clone, derive_more::From)]
pub struct Entry(BaoFileHandle);
/// Use BaoFileHandle as the entry type for the map.
pub type Entry = BaoFileHandle;

impl super::MapEntry for Entry {
fn hash(&self) -> Hash {
self.0.hash()
self.hash()
}

fn size(&self) -> BaoBlobSize {
let size = self.0.current_size().unwrap();
let size = self.current_size().unwrap();
tracing::trace!("redb::Entry::size() = {}", size);
BaoBlobSize::new(size, self.is_complete())
}

fn is_complete(&self) -> bool {
self.0.is_complete()
self.is_complete()
}

async fn outboard(&self) -> io::Result<impl Outboard> {
self.0.outboard()
self.outboard()
}

async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
Ok(self.0.data_reader())
Ok(self.data_reader())
}
}

impl super::MapEntryMut for Entry {
async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
Ok(self.0.writer())
Ok(self.writer())
}
}

Expand Down Expand Up @@ -641,9 +643,9 @@ pub(crate) enum ActorMessage {
///
/// Note that this will block the actor until it is done, so don't use it
/// on a node under load.
Validate {
Fsck {
repair: bool,
progress: tokio::sync::mpsc::Sender<ValidateProgress>,
progress: BoxedProgressSender<ConsistencyCheckProgress>,
tx: oneshot::Sender<ActorResult<()>>,
},
/// Internal method: notify the actor that a new gc epoch has started.
Expand Down Expand Up @@ -678,7 +680,7 @@ impl ActorMessage {
Self::UpdateInlineOptions { .. }
| Self::Sync { .. }
| Self::Shutdown { .. }
| Self::Validate { .. }
| Self::Fsck { .. }
| Self::ImportFlatStore { .. } => MessageCategory::TopLevel,
#[cfg(test)]
Self::EntryState { .. } => MessageCategory::ReadOnly,
Expand Down Expand Up @@ -946,7 +948,7 @@ impl StoreInner {

async fn complete(&self, entry: Entry) -> OuterResult<()> {
self.tx
.send_async(ActorMessage::OnComplete { handle: entry.0 })
.send_async(ActorMessage::OnComplete { handle: entry })
.await?;
Ok(())
}
Expand Down Expand Up @@ -994,14 +996,14 @@ impl StoreInner {
Ok(rx.await??)
}

async fn validate(
async fn consistency_check(
&self,
repair: bool,
progress: tokio::sync::mpsc::Sender<ValidateProgress>,
progress: BoxedProgressSender<ConsistencyCheckProgress>,
) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send_async(ActorMessage::Validate {
.send_async(ActorMessage::Fsck {
repair,
progress,
tx,
Expand Down Expand Up @@ -1284,7 +1286,7 @@ impl super::MapMut for Store {
type EntryMut = Entry;

async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
Ok(self.0.get_or_create(hash).await?.into())
Ok(self.0.get_or_create(hash).await?)
}

async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
Expand All @@ -1304,7 +1306,7 @@ impl super::MapMut for Store {
}
}

impl ReadableStore for Store {
impl super::ReadableStore for Store {
async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
Ok(Box::new(self.0.blobs().await?.into_iter()))
}
Expand All @@ -1321,12 +1323,12 @@ impl ReadableStore for Store {
Box::new(self.0.temp.read().unwrap().keys())
}

async fn validate(
async fn consistency_check(
&self,
repair: bool,
tx: tokio::sync::mpsc::Sender<ValidateProgress>,
tx: BoxedProgressSender<ConsistencyCheckProgress>,
) -> io::Result<()> {
self.0.validate(repair, tx).await?;
self.0.consistency_check(repair, tx.clone()).await?;
Ok(())
}

Expand Down Expand Up @@ -2141,12 +2143,12 @@ impl ActorState {
let res = self.update_inline_options(db, inline_options, reapply);
tx.send(res?).ok();
}
ActorMessage::Validate {
ActorMessage::Fsck {
repair,
progress,
tx,
} => {
let res = self.validate(db, repair, progress);
let res = self.consistency_check(db, repair, progress);
tx.send(res).ok();
}
ActorMessage::Sync { tx } => {
Expand Down
36 changes: 6 additions & 30 deletions iroh-bytes/src/store/fs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::time::Duration;
use crate::store::bao_file::test_support::{
decode_response_into_batch, make_wire_data, random_test_data, simulate_remote, validate,
};
use crate::store::{Map as _, MapEntry, MapEntryMut, MapMut, Store as _};
use crate::store::{Map as _, MapEntryMut, MapMut, ReadableStore, Store as _};
use crate::util::raw_outboard;

macro_rules! assert_matches {
Expand Down Expand Up @@ -80,13 +80,7 @@ async fn get_cases() {
.unwrap();
let res = db.get(&hash).await.unwrap();
let entry = res.expect("entry not found");
let actual = entry
.data_reader()
.await
.unwrap()
.read_to_end()
.await
.unwrap();
let actual = entry.data_reader().read_to_end().await.unwrap();
assert_eq!(actual, small);
drop(tt);
}
Expand All @@ -98,13 +92,7 @@ async fn get_cases() {
let tt = db.import_bytes(mid.clone(), BlobFormat::Raw).await.unwrap();
let res = db.get(&hash).await.unwrap();
let entry = res.expect("entry not found");
let actual = entry
.data_reader()
.await
.unwrap()
.read_to_end()
.await
.unwrap();
let actual = entry.data_reader().read_to_end().await.unwrap();
assert_eq!(actual, mid);
drop(tt);
}
Expand All @@ -119,13 +107,7 @@ async fn get_cases() {
.unwrap();
let res = db.get(&hash).await.unwrap();
let entry = res.expect("entry not found");
let actual = entry
.data_reader()
.await
.unwrap()
.read_to_end()
.await
.unwrap();
let actual = entry.data_reader().read_to_end().await.unwrap();
assert_eq!(actual, large);
drop(tt);
}
Expand All @@ -142,13 +124,7 @@ async fn get_cases() {
.unwrap();
let res = db.get(&hash).await.unwrap();
let entry = res.expect("entry not found");
let actual = entry
.data_reader()
.await
.unwrap()
.read_to_end()
.await
.unwrap();
let actual = entry.data_reader().read_to_end().await.unwrap();
assert_eq!(actual, mid);
drop(tt);
}
Expand Down Expand Up @@ -823,7 +799,7 @@ async fn actor_store_smoke() {
)
.await
.unwrap();
validate(&handle.0, &data, &ranges).await;
validate(&handle, &data, &ranges).await;
db.insert_complete(handle).await.unwrap();
db.sync().await.unwrap();
db.dump().await.unwrap();
Expand Down
Loading
Loading