Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLI get command improvements #331

Merged
merged 29 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
761c1fd
Go from async_trait to being explicit
faassen Oct 12, 2022
ac4d00d
Bring save_get_stream under test
faassen Oct 12, 2022
569aa90
Getting the fixture to produce the stream now
faassen Oct 12, 2022
872ce77
Clean up moving api ext into its own module
faassen Oct 12, 2022
d0ccc40
get now returns the root path
faassen Oct 12, 2022
7a15ead
Use new cid method on ipfs path
faassen Oct 12, 2022
c9738be
we can now test file output with trycmd!
faassen Oct 12, 2022
df4f62e
Slightly better names
faassen Oct 12, 2022
54c3fe4
Test for case where we have an explicit path
faassen Oct 12, 2022
c4d6208
We now fail if the output path already exists
faassen Oct 12, 2022
0763b0b
We fail if we try to overwrite a named directory
faassen Oct 12, 2022
6dafde0
The relative-path dependency only is needed in the testing feature
faassen Oct 12, 2022
3d6f35e
Rename away from `foo` as that's in .gitignore!
faassen Oct 13, 2022
518c2d8
The behavior is now to produce relative paths, which may be empty
faassen Oct 13, 2022
03e92a5
A test for a wrapped file
faassen Oct 13, 2022
3b1bd8e
Test the unwrapped file scenario
faassen Oct 13, 2022
9538328
Overwriting single files isn't allowed either
faassen Oct 13, 2022
96999ed
Lifetime elision is possible here
faassen Oct 13, 2022
377d5a6
avoid empty directories
faassen Oct 14, 2022
3cdb95b
This comment got deformatted
faassen Oct 14, 2022
5c8a89b
Make it so that we can get sub paths.
faassen Oct 14, 2022
671d10c
Move this line
faassen Oct 14, 2022
bf18274
add a note about doing this in the resolver instead in the future
faassen Oct 14, 2022
bb054cc
Placate clippy
faassen Oct 14, 2022
5901f56
when we get a specific path we need to save under that name
faassen Oct 14, 2022
a9ad5f1
Update to a more recent version of clap to fix CI issues
faassen Oct 14, 2022
9079cf9
made a nicer todo and comment
faassen Oct 14, 2022
645f3a0
Make a stronger argument for why this is the wrong place and attribut…
faassen Oct 14, 2022
2b71cf3
Maybe it's not touching the store now, could be, it's streaming after…
faassen Oct 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions iroh-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ futures = "0.3.21"
async-stream = "0.3.3"
mockall = { version = "0.11.2", optional = true }
serde = { version = "1.0", features = ["derive"] }
relative-path = "1.7.2"

[dev-dependencies]
tempdir = "0.3.7"
122 changes: 77 additions & 45 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,55 @@ use crate::config::{Config, CONFIG_FILE_NAME, ENV_PREFIX};
#[cfg(feature = "testing")]
use crate::p2p::MockP2p;
use crate::p2p::{ClientP2p, P2p};
use crate::{Cid, IpfsPath};
use anyhow::Result;
use async_trait::async_trait;
use cid::Cid;
use futures::future::{BoxFuture, LocalBoxFuture};
use futures::stream::LocalBoxStream;
use futures::FutureExt;
use futures::StreamExt;
use iroh_resolver::resolver::Path as IpfsPath;
use iroh_resolver::{resolver, unixfs_builder};
use iroh_resolver::unixfs_builder;
use iroh_rpc_client::Client;
use iroh_rpc_client::StatusTable;
use iroh_util::{iroh_config_path, make_config};
#[cfg(feature = "testing")]
use mockall::automock;
use relative_path::RelativePathBuf;
use tokio::io::AsyncRead;

pub struct Iroh {
client: Client,
}

pub enum OutType<T: resolver::ContentLoader> {
pub enum OutType {
Dir,
Reader(resolver::OutPrettyReader<T>),
Reader(Box<dyn AsyncRead + Unpin>),
}

// Note: `#[async_trait]` is deliberately not in use for this trait, because it
// became very hard to express what we wanted once streams were involved.
// Instead we spell things out explicitly without magic.

#[cfg_attr(feature= "testing", automock(type P = MockP2p;))]
#[async_trait(?Send)]
pub trait Api {
type P: P2p;

fn p2p(&self) -> Result<Self::P>;

async fn get<'a>(&self, ipfs_path: &IpfsPath, output: Option<&'a Path>) -> Result<()>;
async fn add(&self, path: &Path, recursive: bool, no_wrap: bool) -> Result<Cid>;
async fn check(&self) -> StatusTable;
async fn watch<'a>(&self) -> LocalBoxStream<'a, StatusTable>;
/// Produces a asynchronous stream of file descriptions
/// Each description is a tuple of a relative path, and either a `Directory` or a `Reader`
/// with the file contents.
fn get_stream(
&self,
ipfs_path: &IpfsPath,
) -> LocalBoxStream<'_, Result<(RelativePathBuf, OutType)>>;
fn add<'a>(
&'a self,
path: &'a Path,
recursive: bool,
no_wrap: bool,
) -> LocalBoxFuture<'_, Result<Cid>>;
fn check(&self) -> BoxFuture<'_, StatusTable>;
fn watch(&self) -> LocalBoxFuture<'static, LocalBoxStream<'static, StatusTable>>;
}

impl Iroh {
Expand Down Expand Up @@ -67,13 +83,8 @@ impl Iroh {
fn from_client(client: Client) -> Self {
Self { client }
}

pub(crate) fn get_client(&self) -> &Client {
&self.client
}
}

#[async_trait(?Send)]
impl Api for Iroh {
type P = ClientP2p;

Expand All @@ -82,45 +93,66 @@ impl Api for Iroh {
Ok(ClientP2p::new(p2p_client.clone()))
}

async fn get<'b>(&self, ipfs_path: &IpfsPath, output: Option<&'b Path>) -> Result<()> {
let blocks = self.get_stream(ipfs_path, output);
tokio::pin!(blocks);
while let Some(block) = blocks.next().await {
let (path, out) = block?;
match out {
OutType::Dir => {
tokio::fs::create_dir_all(path).await?;
fn get_stream(
&self,
ipfs_path: &IpfsPath,
) -> LocalBoxStream<'_, Result<(RelativePathBuf, OutType)>> {
tracing::debug!("get {:?}", ipfs_path);
let resolver = iroh_resolver::resolver::Resolver::new(self.client.clone());
let results = resolver.resolve_recursive_with_paths(ipfs_path.clone());
let sub_path = ipfs_path.to_relative_string();
async_stream::try_stream! {
tokio::pin!(results);
while let Some(res) = results.next().await {
let (relative_ipfs_path, out) = res?;
let relative_path = RelativePathBuf::from_path(&relative_ipfs_path.to_relative_string())?;
// TODO this focusing in on sub-paths should really be handled in the resolver,
// because it'll be testable there, and it allows potential optimizations we
// can't do here
faassen marked this conversation as resolved.
Show resolved Hide resolved
if !relative_path.starts_with(&sub_path) {
continue;
}
OutType::Reader(mut reader) => {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut f = tokio::fs::File::create(path).await?;
tokio::io::copy(&mut reader, &mut f).await?;
let relative_path = relative_path.strip_prefix(&sub_path).expect("should be a prefix").to_owned();
if out.is_dir() {
yield (relative_path, OutType::Dir);
} else {
let reader = out.pretty(resolver.clone(), Default::default())?;
yield (relative_path, OutType::Reader(Box::new(reader)));
}
}
}
Ok(())
.boxed_local()
}

async fn add(&self, path: &Path, recursive: bool, no_wrap: bool) -> Result<Cid> {
let providing_client = iroh_resolver::unixfs_builder::StoreAndProvideClient {
client: Box::new(self.get_client()),
};
if path.is_dir() {
unixfs_builder::add_dir(Some(&providing_client), path, !no_wrap, recursive).await
} else if path.is_file() {
unixfs_builder::add_file(Some(&providing_client), path, !no_wrap).await
} else {
anyhow::bail!("can only add files or directories");
fn add<'a>(
&'a self,
path: &'a Path,
recursive: bool,
no_wrap: bool,
) -> LocalBoxFuture<'_, Result<Cid>> {
async move {
let providing_client = iroh_resolver::unixfs_builder::StoreAndProvideClient {
client: Box::new(&self.client),
};
if path.is_dir() {
unixfs_builder::add_dir(Some(&providing_client), path, !no_wrap, recursive).await
} else if path.is_file() {
unixfs_builder::add_file(Some(&providing_client), path, !no_wrap).await
} else {
anyhow::bail!("can only add files or directories");
}
}
.boxed_local()
}

async fn check(&self) -> StatusTable {
self.client.check().await
fn check(&self) -> BoxFuture<'_, StatusTable> {
async { self.client.check().await }.boxed()
}

async fn watch<'b>(&self) -> LocalBoxStream<'b, iroh_rpc_client::StatusTable> {
self.client.clone().watch().await.boxed()
fn watch(
&self,
) -> LocalBoxFuture<'static, LocalBoxStream<'static, iroh_rpc_client::StatusTable>> {
let client = self.client.clone();
async { client.watch().await.boxed_local() }.boxed_local()
}
}
124 changes: 124 additions & 0 deletions iroh-api/src/api_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::path::{Path, PathBuf};

use crate::{Api, IpfsPath, OutType};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::Stream;
use futures::StreamExt;
use relative_path::RelativePathBuf;

#[async_trait(?Send)]
pub trait ApiExt: Api {
/// High level get, equivalent of CLI `iroh get`
async fn get<'a>(
&self,
ipfs_path: &IpfsPath,
output_path: Option<&'a Path>,
) -> Result<PathBuf> {
if ipfs_path.cid().is_none() {
return Err(anyhow!("IPFS path does not refer to a CID"));
}
let root_path = get_root_path(ipfs_path, output_path);
if root_path.exists() {
return Err(anyhow!(
"output path {} already exists",
root_path.display()
));
}
let blocks = self.get_stream(ipfs_path);
save_get_stream(&root_path, blocks).await?;
Ok(root_path)
}
}

impl<T> ApiExt for T where T: Api {}

/// take a stream of blocks as from `get_stream` and write them to the filesystem
async fn save_get_stream(
root_path: &Path,
blocks: impl Stream<Item = Result<(RelativePathBuf, OutType)>>,
) -> Result<()> {
tokio::pin!(blocks);
while let Some(block) = blocks.next().await {
let (path, out) = block?;
let full_path = path.to_path(root_path);
match out {
OutType::Dir => {
tokio::fs::create_dir_all(full_path).await?;
}
OutType::Reader(mut reader) => {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent.to_path(root_path)).await?;
}
let mut f = tokio::fs::File::create(full_path).await?;
tokio::io::copy(&mut reader, &mut f).await?;
}
}
}
Ok(())
}

/// Given an cid and an optional output path, determine root path
fn get_root_path(ipfs_path: &IpfsPath, output_path: Option<&Path>) -> PathBuf {
match output_path {
Some(path) => path.to_path_buf(),
None => {
if ipfs_path.tail().is_empty() {
PathBuf::from(ipfs_path.cid().unwrap().to_string())
} else {
PathBuf::from(ipfs_path.tail().last().unwrap())
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use tempdir::TempDir;

#[tokio::test]
async fn test_save_get_stream() {
let stream = Box::pin(futures::stream::iter(vec![
Ok((RelativePathBuf::from_path("a").unwrap(), OutType::Dir)),
Ok((
RelativePathBuf::from_path("b").unwrap(),
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
)),
]));
let tmp_dir = TempDir::new("test_save_get_stream").unwrap();
save_get_stream(tmp_dir.path(), stream).await.unwrap();
assert!(tmp_dir.path().join("a").is_dir());
assert_eq!(
std::fs::read_to_string(tmp_dir.path().join("b")).unwrap(),
"hello"
);
}

#[test]
fn test_get_root_path() {
let ipfs_path =
IpfsPath::from_str("/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N").unwrap();
assert_eq!(
get_root_path(&ipfs_path, None),
PathBuf::from("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
);
assert_eq!(
get_root_path(&ipfs_path, Some(Path::new("bar"))),
PathBuf::from("bar")
);
}

#[test]
fn test_get_root_path_with_tail() {
let ipfs_path =
IpfsPath::from_str("/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N/tail")
.unwrap();
assert_eq!(get_root_path(&ipfs_path, None), PathBuf::from("tail"));
assert_eq!(
get_root_path(&ipfs_path, Some(Path::new("bar"))),
PathBuf::from("bar")
);
}
}
Loading