use even newer bao-tree
rklaehn committed Apr 10, 2024
1 parent ceaf168 commit ea6ee24
Showing 13 changed files with 102 additions and 51 deletions.
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion iroh-base/Cargo.toml
@@ -16,7 +16,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.12", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
data-encoding = { version = "2.3.3", optional = true }
hex = "0.4.3"
multibase = { version = "0.9.1", optional = true }
4 changes: 2 additions & 2 deletions iroh-bytes/Cargo.toml
@@ -17,7 +17,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
data-encoding = "2.3.3"
@@ -28,7 +28,7 @@ futures-buffered = "0.2.4"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hex = "0.4.3"
iroh-base = { version = "0.13.0", features = ["redb"], path = "../iroh-base" }
iroh-io = { version = "0.4.0", features = ["stats"] }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.13.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.13.0", path = "../iroh-net", optional = true }
num_cpus = "1.15.0"
34 changes: 17 additions & 17 deletions iroh-bytes/src/get.rs
Expand Up @@ -70,8 +70,10 @@ pub mod fsm {
BaoTree, ChunkRanges, TreeNode,
};
use derive_more::From;
-use iroh_io::AsyncSliceWriter;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
+use tokio::io::AsyncWriteExt;

+type WrappedRecvStream = TrackingReader<TokioStreamReader<RecvStream>>;

self_cell::self_cell! {
struct RangesIterInner {
@@ -142,7 +144,7 @@ pub mod fsm {
pub async fn next(self) -> Result<AtConnected, quinn::ConnectionError> {
let start = Instant::now();
let (writer, reader) = self.connection.open_bi().await?;
-let reader = TrackingReader::new(reader);
+let reader = TrackingReader::new(TokioStreamReader::new(reader));
let writer = TrackingWriter::new(writer);
Ok(AtConnected {
start,
@@ -157,7 +159,7 @@
#[derive(Debug)]
pub struct AtConnected {
start: Instant,
-reader: TrackingReader<quinn::RecvStream>,
+reader: WrappedRecvStream,
writer: TrackingWriter<quinn::SendStream>,
request: GetRequest,
}
@@ -292,7 +294,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartRoot {
ranges: ChunkRanges,
-reader: TrackingReader<quinn::RecvStream>,
+reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
@@ -301,7 +303,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartChild {
ranges: ChunkRanges,
-reader: TrackingReader<quinn::RecvStream>,
+reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
misc: Box<Misc>,
child_offset: u64,
}
@@ -376,7 +378,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtBlobHeader {
ranges: ChunkRanges,
-reader: TrackingReader<quinn::RecvStream>,
+reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
@@ -412,7 +414,7 @@ pub mod fsm {
impl AtBlobHeader {
/// Read the size header, returning it and going into the `Content` state.
pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
-let size = self.reader.read_u64_le().await.map_err(|cause| {
+let size = self.reader.read::<8>().await.map_err(|cause| {
if cause.kind() == io::ErrorKind::UnexpectedEof {
AtBlobHeaderNextError::NotFound
} else if let Some(e) = cause
@@ -424,6 +426,7 @@ pub mod fsm {
AtBlobHeaderNextError::Io(cause)
}
})?;
+let size = u64::from_le_bytes(size);
let stream = ResponseDecoder::new(
self.hash.into(),
self.ranges,
@@ -513,7 +516,7 @@
/// State while we are reading content
#[derive(Debug)]
pub struct AtBlobContent {
-stream: ResponseDecoder<TrackingReader<RecvStream>>,
+stream: ResponseDecoder<WrappedRecvStream>,
misc: Box<Misc>,
}

@@ -792,7 +795,7 @@
/// State after we have read all the content for a blob
#[derive(Debug)]
pub struct AtEndBlob {
-stream: TrackingReader<RecvStream>,
+stream: WrappedRecvStream,
misc: Box<Misc>,
}

@@ -826,16 +829,12 @@
#[derive(Debug)]
pub struct AtClosing {
misc: Box<Misc>,
-reader: TrackingReader<RecvStream>,
+reader: WrappedRecvStream,
check_extra_data: bool,
}

impl AtClosing {
-fn new(
-misc: Box<Misc>,
-reader: TrackingReader<RecvStream>,
-check_extra_data: bool,
-) -> Self {
+fn new(misc: Box<Misc>, reader: WrappedRecvStream, check_extra_data: bool) -> Self {
Self {
misc,
reader,
@@ -846,7 +845,8 @@
/// Finish the get response, returning statistics
pub async fn next(self) -> result::Result<Stats, quinn::ReadError> {
// Shut down the stream
-let (mut reader, bytes_read) = self.reader.into_parts();
+let (reader, bytes_read) = self.reader.into_parts();
+let mut reader = reader.into_inner();
if self.check_extra_data {
if let Some(chunk) = reader.read_chunk(8, false).await? {
reader.stop(0u8.into()).ok();
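The thread running through this file: iroh-io 0.6 replaces direct tokio reads with its `AsyncStreamReader` trait, so the quinn `RecvStream` is wrapped in a `TokioStreamReader` (aliased as `WrappedRecvStream`), and the 8-byte little-endian size prefix is now read with the const-generic `read::<8>` instead of tokio's `read_u64_le`. A minimal sketch of the new read path, assuming iroh-io 0.6 and quinn as used above (the helper function is hypothetical):

```rust
use iroh_io::{AsyncStreamReader, TokioStreamReader};

// Hypothetical helper: wrap a quinn receive stream and read the
// 8-byte little-endian size header that precedes the bao encoding.
async fn read_size_prefix(
    stream: quinn::RecvStream,
) -> std::io::Result<(u64, TokioStreamReader<quinn::RecvStream>)> {
    let mut reader = TokioStreamReader::new(stream);
    // `read::<N>` returns an owned `[u8; N]`, so the caller decodes it.
    let size = u64::from_le_bytes(reader.read::<8>().await?);
    Ok((size, reader))
}
```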
2 changes: 1 addition & 1 deletion iroh-bytes/src/get/db.rs
@@ -145,7 +145,7 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<Chun
use tracing::trace as log;
// compute the valid range from just looking at the data file
let mut data_reader = entry.data_reader().await?;
-let data_size = data_reader.len().await?;
+let data_size = data_reader.size().await?;
let valid_from_data = ChunkRanges::from(..ChunkNum::full_chunks(data_size));
// compute the valid range from just looking at the outboard file
let mut outboard = entry.outboard().await?;
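The only change here is the iroh-io 0.6 rename of `AsyncSliceReader::len` to `AsyncSliceReader::size`; the logic is untouched. A sketch of the updated call site, assuming the bao-tree and iroh-io types already in scope (the standalone function is hypothetical):

```rust
use bao_tree::{ChunkNum, ChunkRanges};
use iroh_io::AsyncSliceReader;

// Chunk ranges fully covered by the data file, via the renamed `size()`.
async fn valid_from_data<R: AsyncSliceReader>(reader: &mut R) -> std::io::Result<ChunkRanges> {
    let data_size = reader.size().await?;
    Ok(ChunkRanges::from(..ChunkNum::full_chunks(data_size)))
}
```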
5 changes: 4 additions & 1 deletion iroh-bytes/src/provider.rs
@@ -217,6 +217,8 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
// wrap the data reader in a tracking reader so we can get some stats for reading
let mut tracking_reader = TrackingSliceReader::new(&mut data);
// send the root
+tw.write(outboard.tree().size().to_le_bytes().as_slice())
+.await?;
encode_ranges_validated(
&mut tracking_reader,
&mut outboard,
@@ -490,13 +492,14 @@ pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
db: &D,
name: Hash,
ranges: &RangeSpec,
-writer: W,
+mut writer: W,
) -> Result<(SentStatus, u64, SliceReaderStats)> {
match db.get(&name).await? {
Some(entry) => {
let outboard = entry.outboard().await?;
let size = outboard.tree().size();
let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?);
+writer.write(size.to_le_bytes().as_slice()).await?;
let res = encode_ranges_validated(
&mut file_reader,
outboard,
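The send side mirrors the read side: bao-tree 0.13's `encode_ranges_validated` no longer emits the size header itself, so the provider writes it before encoding. A sketch of that ordering, assuming iroh-io 0.6's `AsyncStreamWriter::write` (the helper is hypothetical):

```rust
use iroh_io::AsyncStreamWriter;

// Write the 8-byte little-endian size header that bao-tree 0.13
// expects the caller to produce ahead of the encoded ranges.
async fn write_size_prefix<W: AsyncStreamWriter>(writer: &mut W, size: u64) -> std::io::Result<()> {
    writer.write(size.to_le_bytes().as_slice()).await
}
```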
57 changes: 41 additions & 16 deletions iroh-bytes/src/store/bao_file.rs
Expand Up @@ -426,7 +426,7 @@ impl AsyncSliceReader for DataReader {
.await
}

-async fn len(&mut self) -> io::Result<u64> {
+async fn size(&mut self) -> io::Result<u64> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
@@ -458,7 +458,7 @@ impl AsyncSliceReader for OutboardReader {
.await
}

-async fn len(&mut self) -> io::Result<u64> {
+async fn size(&mut self) -> io::Result<u64> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
@@ -720,7 +720,10 @@ impl BaoBatchWriter for BaoFileWriter {

#[cfg(test)]
pub mod test_support {
-use std::{io::Cursor, ops::Range};
+use std::{
+io::{Cursor, Write},
+ops::Range,
+};

use bao_tree::{
io::{
@@ -732,9 +735,9 @@ pub mod test_support {
BlockSize, ChunkRanges,
};
use futures::{Future, Stream, StreamExt};
+use iroh_io::AsyncStreamReader;
use rand::RngCore;
use range_collections::RangeSet2;
-use tokio::io::{AsyncRead, AsyncReadExt};

use crate::util::limited_range;

@@ -751,10 +754,11 @@
mut target: W,
) -> io::Result<()>
where
-R: AsyncRead + Unpin,
+R: AsyncStreamReader,
W: BaoBatchWriter,
{
-let size = encoded.read_u64_le().await?;
+let size = encoded.read::<8>().await?;
+let size = u64::from_le_bytes(size);
let mut reading =
ResponseDecoder::new(root.into(), ranges, BaoTree::new(size, block_size), encoded);
let mut stack = Vec::new();
@@ -793,6 +797,9 @@ pub mod test_support {
pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
let mut encoded = Vec::new();
+encoded
+.write_all(&data.len().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::sync::encode_ranges_validated(
data,
&outboard,
@@ -824,6 +831,9 @@
// compute the outboard
let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip();
let mut encoded = Vec::new();
+encoded
+.write_all(&data.len().to_le_bytes().as_slice())
+.unwrap();
encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap();
(outboard.root.into(), chunk_ranges, encoded)
}
@@ -866,8 +876,11 @@

#[cfg(test)]
mod tests {
+use std::io::Write;

use bao_tree::{blake3, ChunkNum, ChunkRanges};
use futures::StreamExt;
+use iroh_io::TokioStreamReader;
use tests::test_support::{
decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
};
@@ -900,7 +913,7 @@ mod tests {
let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
.map(io::Result::Ok)
.boxed();
-let trickle = tokio_util::io::StreamReader::new(trickle);
+let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
let _task = tasks.spawn_local(async move {
decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file)
.await
@@ -912,18 +925,22 @@
println!(
"len {:?} {:?}",
handle,
-handle.data_reader().len().await.unwrap()
+handle.data_reader().size().await.unwrap()
);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [1024 * 16..1024 * 48];
validate(&handle, &test_data, &ranges).await;

// let ranges =
// let full_chunks = bao_tree::io::full_chunk_groups();
-let encoded = Vec::new();
+let mut encoded = Vec::new();
+let ob = handle.outboard().unwrap();
+encoded
+.write_all(ob.tree.size().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
-handle.outboard().unwrap(),
+ob,
&ChunkRanges::from(ChunkNum(16)..ChunkNum(48)),
encoded,
)
@@ -957,7 +974,7 @@
let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
.map(io::Result::Ok)
.boxed();
-let trickle = tokio_util::io::StreamReader::new(trickle);
+let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
let task = local.spawn_pinned(move || async move {
decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await
});
@@ -969,16 +986,20 @@
println!(
"len {:?} {:?}",
handle,
-handle.data_reader().len().await.unwrap()
+handle.data_reader().size().await.unwrap()
);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [0..n];
validate(&handle, &test_data, &ranges).await;

-let encoded = Vec::new();
+let mut encoded = Vec::new();
+let ob = handle.outboard().unwrap();
+encoded
+.write_all(ob.tree.size().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
-handle.outboard().unwrap(),
+ob,
&ChunkRanges::all(),
encoded,
)
@@ -1013,10 +1034,14 @@
.unwrap();
validate(&handle, &test_data, &ranges).await;

-let encoded = Vec::new();
+let mut encoded = Vec::new();
+let ob = handle.outboard().unwrap();
+encoded
+.write_all(ob.tree.size().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
-handle.outboard().unwrap(),
+ob,
&ChunkRanges::all(),
encoded,
)
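Each test now repeats the same three steps before encoding: make the encode buffer mutable, grab the outboard, and hand-write the size header. A sketch of that pattern as one helper, assuming bao-tree 0.13's sync API as used in the test_support helpers above (the function name is made up):

```rust
use std::io::Write;

use bao_tree::{io::outboard::PostOrderMemOutboard, BlockSize, ChunkRanges};

// Hypothetical helper: encode `ranges` of `data`, prepending the size
// prefix that bao-tree 0.13 leaves to the caller.
fn encode_with_size_prefix(data: &[u8], block_size: BlockSize, ranges: &ChunkRanges) -> Vec<u8> {
    let outboard = PostOrderMemOutboard::create(data, block_size).flip();
    let mut encoded = Vec::new();
    encoded.write_all(&(data.len() as u64).to_le_bytes()).unwrap();
    bao_tree::io::sync::encode_ranges_validated(data, &outboard, ranges, &mut encoded).unwrap();
    encoded
}
```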
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/fs/tests.rs
@@ -793,7 +793,7 @@ async fn actor_store_smoke() {
hash,
IROH_BLOCK_SIZE,
chunk_ranges.clone(),
-Cursor::new(wire_data),
+Cursor::new(wire_data.as_slice()),
handle.batch_writer().await.unwrap(),
)
.await
