Skip to content

Commit

Permalink
feat(iroh): add Sync bound and Stream impl for BlobReader (#2063)
Browse files Browse the repository at this point in the history
Depends on n0-computer/quic-rpc#68

This allows usage in contexts where the `BlobReader` is passed to eg
hyper handlers

---------

Co-authored-by: Rüdiger Klaehn <[email protected]>
  • Loading branch information
dignifiedquire and rklaehn authored Mar 12, 2024
1 parent f4d3fab commit 09e3e52
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 16 additions & 5 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::task::{Context, Poll};

use anyhow::{anyhow, Context as AnyhowContext, Result};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_base::ticket::BlobTicket;
use iroh_bytes::export::ExportProgress;
Expand All @@ -28,7 +27,7 @@ use iroh_sync::store::DownloadPolicy;
use iroh_sync::{store::Query, AuthorId, CapabilityKind, NamespaceId};
use iroh_sync::{ContentStatus, RecordIdentifier};
use quic_rpc::message::RpcMsg;
use quic_rpc::{RpcClient, ServiceConnection};
use quic_rpc::{client::BoxStreamSync, RpcClient, ServiceConnection};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
Expand Down Expand Up @@ -667,15 +666,15 @@ pub struct BlobReader {
response_size: u64,
is_complete: bool,
#[debug("StreamReader")]
stream: tokio_util::io::StreamReader<BoxStream<'static, io::Result<Bytes>>, Bytes>,
stream: tokio_util::io::StreamReader<BoxStreamSync<'static, io::Result<Bytes>>, Bytes>,
}

impl BlobReader {
fn new(
size: u64,
response_size: u64,
is_complete: bool,
stream: BoxStream<'static, io::Result<Bytes>>,
stream: BoxStreamSync<'static, io::Result<Bytes>>,
) -> Self {
Self {
size,
Expand Down Expand Up @@ -717,7 +716,7 @@ impl BlobReader {
let len = len
.map(|l| l as u64)
.unwrap_or_else(|| size.value() - offset);
Ok(Self::new(size.value(), len, is_complete, stream.boxed()))
Ok(Self::new(size.value(), len, is_complete, Box::pin(stream)))
}

/// Total size of this blob.
Expand Down Expand Up @@ -750,6 +749,18 @@ impl AsyncRead for BlobReader {
}
}

impl Stream for BlobReader {
type Item = io::Result<Bytes>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.stream).get_pin_mut().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.get_ref().size_hint()
}
}

/// Document handle
#[derive(Debug, Clone)]
pub struct Doc<C: ServiceConnection<ProviderService>>(Arc<DocInner<C>>);
Expand Down

0 comments on commit 09e3e52

Please sign in to comment.