Skip to content

Commit

Permalink
Enable downloading remote dataflow (#682)
Browse files Browse the repository at this point in the history
This PR makes it possible to start a dataflow from scratch without
having to git clone a repository.

## Example

```bash
dora up
dora build https://gist.githubusercontent.com/haixuanTao/b0399d7b82273c4d2a32d4394bd80942/raw/09f7f2927d51c1431ffd7834bbab558cd7343cbb/camera.yaml
dora start https://gist.githubusercontent.com/haixuanTao/b0399d7b82273c4d2a32d4394bd80942/raw/09f7f2927d51c1431ffd7834bbab558cd7343cbb/camera.yaml
```

## Video

[Screencast from 2024-10-09
12-11-57.webm](https://github.com/user-attachments/assets/041ca5a0-7cd2-4dc6-bc34-aa94e6010166)

## Small breaking change

- This slightly changes the logic of the `download_file` function to use the
predefined downloaded file name instead of changing it when downloading.
- It removes the caching of `download_file`, as there is now no way to
know beforehand whether the filename and path are going to be the same. I
think that if users want caching, they should link the path
beforehand instead of using the URL. This can be done in conjunction with
a build `wget` or `curl` command.
  • Loading branch information
haixuanTao authored Oct 11, 2024
2 parents 2d8b9ee + b494a47 commit 903cb23
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dora-core = { workspace = true }
dora-message = { workspace = true }
dora-node-api-c = { workspace = true }
dora-operator-api-c = { workspace = true }
dora-download = { workspace = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
webbrowser = "0.8.3"
Expand Down
7 changes: 5 additions & 2 deletions binaries/cli/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use dora_core::{
use eyre::{eyre, Context};
use std::{path::Path, process::Command};

pub fn build(dataflow: &Path) -> eyre::Result<()> {
let descriptor = Descriptor::blocking_read(dataflow)?;
use crate::resolve_dataflow;

pub fn build(dataflow: String) -> eyre::Result<()> {
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let descriptor = Descriptor::blocking_read(&dataflow)?;
let dataflow_absolute = if dataflow.is_relative() {
std::env::current_dir().unwrap().join(dataflow)
} else {
Expand Down
32 changes: 25 additions & 7 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use colored::Colorize;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
descriptor::{source_is_url, Descriptor},
topics::{
DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
use dora_download::download_file;
use dora_message::{
cli_to_coordinator::ControlRequest,
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
Expand All @@ -21,7 +22,7 @@ use dora_tracing::set_up_tracing_opts;
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
use std::{io::Write, net::SocketAddr};
use std::{env::current_dir, io::Write, net::SocketAddr};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
Expand Down Expand Up @@ -80,8 +81,8 @@ enum Command {
/// Run build commands provided in the given dataflow.
Build {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
#[clap(value_name = "PATH")]
dataflow: String,
},
/// Generate a new project or node. Choose the language between Rust, Python, C or C++.
New {
Expand Down Expand Up @@ -111,8 +112,8 @@ enum Command {
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: PathBuf,
#[clap(value_name = "PATH")]
dataflow: String,
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
Expand Down Expand Up @@ -324,7 +325,7 @@ fn run() -> eyre::Result<()> {
graph::create(dataflow, mermaid, open)?;
}
Command::Build { dataflow } => {
build::build(&dataflow)?;
build::build(dataflow)?;
}
Command::New {
args,
Expand Down Expand Up @@ -366,6 +367,7 @@ fn run() -> eyre::Result<()> {
detach,
hot_reload,
} => {
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let working_dir = dataflow
Expand Down Expand Up @@ -656,3 +658,19 @@ fn connect_to_coordinator(
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(coordinator_addr)
}

/// Turns the `dataflow` command-line argument into a local filesystem path.
///
/// If `source_is_url` reports the argument as a URL, the file is fetched with
/// `download_file` into the current working directory and the path returned by
/// the download is used. Otherwise the argument is converted to a `PathBuf`
/// unchanged.
///
/// # Errors
///
/// Returns an error if the current directory cannot be accessed, if the tokio
/// runtime cannot be built, or if the download fails.
fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
    // Local path: nothing to fetch.
    if !source_is_url(&dataflow) {
        return Ok(PathBuf::from(dataflow));
    }

    // Remote descriptor: download the dataflow yaml file next to where the
    // command was invoked.
    let download_dir = current_dir().context("Could not access the current dir")?;

    // `download_file` is async, so drive it to completion on a
    // current-thread runtime.
    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .context("tokio runtime failed")?;
    let local_path = runtime
        .block_on(async { download_file(&dataflow, &download_dir).await })
        .wrap_err("failed to download dataflow yaml file")?;

    Ok(local_path)
}
10 changes: 3 additions & 7 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use dora_node_api::{
};
use eyre::{ContextCompat, WrapErr};
use std::{
env::consts::EXE_EXTENSION,
path::{Path, PathBuf},
process::Stdio,
sync::Arc,
Expand Down Expand Up @@ -101,13 +100,10 @@ pub async fn spawn_node(
source => {
let resolved_path = if source_is_url(source) {
// try to download the shared library
let target_path = Path::new("build")
.join(node_id.to_string())
.with_extension(EXE_EXTENSION);
download_file(source, &target_path)
let target_dir = Path::new("build");
download_file(source, &target_dir)

Check warning on line 104 in binaries/daemon/src/spawn.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
.await
.wrap_err("failed to download custom node")?;
target_path.clone()
.wrap_err("failed to download custom node")?
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
Expand Down
7 changes: 2 additions & 5 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,13 @@ pub fn run(
dataflow_descriptor: &Descriptor,
) -> eyre::Result<()> {
let path = if source_is_url(&python_source.source) {
let target_path = Path::new("build")
.join(node_id.to_string())
.join(format!("{}.py", operator_id));
let target_path = Path::new("build");
// try to download the shared library
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(download_file(&python_source.source, &target_path))

Check warning on line 50 in binaries/runtime/src/operator/python.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
.wrap_err("failed to download Python operator")?;
target_path
.wrap_err("failed to download Python operator")?
} else {
Path::new(&python_source.source).to_owned()
};
Expand Down
13 changes: 4 additions & 9 deletions binaries/runtime/src/operator/shared_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,21 @@ use tokio::sync::{mpsc::Sender, oneshot};
use tracing::{field, span};

pub fn run(
node_id: &NodeId,
operator_id: &OperatorId,
_node_id: &NodeId,
_operator_id: &OperatorId,
source: &str,
events_tx: Sender<OperatorEvent>,
incoming_events: flume::Receiver<Event>,
init_done: oneshot::Sender<Result<()>>,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let target_path = adjust_shared_library_path(
&Path::new("build")
.join(node_id.to_string())
.join(operator_id.to_string()),
)?;
let target_path = &Path::new("build");
// try to download the shared library
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(download_file(source, &target_path))

Check warning on line 43 in binaries/runtime/src/operator/shared_lib.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
.wrap_err("failed to download shared library operator")?;
target_path
.wrap_err("failed to download shared library operator")?
} else {
adjust_shared_library_path(Path::new(source))?
};
Expand Down
52 changes: 34 additions & 18 deletions libraries/extensions/download/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,51 @@
use eyre::Context;
use eyre::{Context, ContextCompat};
#[cfg(unix)]
use std::os::unix::prelude::PermissionsExt;
use std::path::Path;
use std::path::{Path, PathBuf};
use tokio::io::AsyncWriteExt;
use tracing::info;

pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,
{
if target_path.exists() {
info!("Using cache: {:?}", target_path.to_str());
return Ok(());
fn get_filename(response: &reqwest::Response) -> Option<String> {
if let Some(content_disposition) = response.headers().get("content-disposition") {
if let Ok(filename) = content_disposition.to_str() {
if let Some(name) = filename.split("filename=").nth(1) {
return Some(name.trim_matches('"').to_string());
}
}
}

if let Some(parent) = target_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.wrap_err("failed to create parent folder")?;
// If Content-Disposition header is not available, extract from URL
let path = Path::new(response.url().as_str());
if let Some(name) = path.file_name() {
if let Some(filename) = name.to_str() {
return Some(filename.to_string());
}
}

None
}

pub async fn download_file<T>(url: T, target_dir: &Path) -> Result<PathBuf, eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,
{
tokio::fs::create_dir_all(&target_dir)
.await
.wrap_err("failed to create parent folder")?;

let response = reqwest::get(url)
.await
.wrap_err_with(|| format!("failed to request operator from `{url}`"))?
.wrap_err_with(|| format!("failed to request operator from `{url}`"))?;

let filename = get_filename(&response).context("Could not find a filename")?;
let bytes = response
.bytes()
.await
.wrap_err("failed to read operator from `{uri}`")?;
let mut file = tokio::fs::File::create(target_path)
let path = target_dir.join(filename);
let mut file = tokio::fs::File::create(&path)
.await
.wrap_err("failed to create target file")?;
file.write_all(&response)
file.write_all(&bytes)
.await
.wrap_err("failed to write downloaded operator to file")?;
file.sync_all().await.wrap_err("failed to `sync_all`")?;
Expand All @@ -39,5 +55,5 @@ where
.await
.wrap_err("failed to make downloaded file executable")?;

Ok(())
Ok(path.to_path_buf())
}

0 comments on commit 903cb23

Please sign in to comment.