Enable downloading remote dataflow #682

Merged · 2 commits · Oct 11, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions binaries/cli/Cargo.toml

@@ -24,6 +24,7 @@ dora-core = { workspace = true }
 dora-message = { workspace = true }
 dora-node-api-c = { workspace = true }
 dora-operator-api-c = { workspace = true }
+dora-download = { workspace = true }
 serde = { version = "1.0.136", features = ["derive"] }
 serde_yaml = "0.9.11"
 webbrowser = "0.8.3"
7 changes: 5 additions & 2 deletions binaries/cli/src/build.rs

@@ -5,8 +5,11 @@ use dora_core::{
 use eyre::{eyre, Context};
 use std::{path::Path, process::Command};
 
-pub fn build(dataflow: &Path) -> eyre::Result<()> {
-    let descriptor = Descriptor::blocking_read(dataflow)?;
+use crate::resolve_dataflow;
+
+pub fn build(dataflow: String) -> eyre::Result<()> {
+    let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
+    let descriptor = Descriptor::blocking_read(&dataflow)?;
     let dataflow_absolute = if dataflow.is_relative() {
         std::env::current_dir().unwrap().join(dataflow)
     } else {
32 changes: 25 additions & 7 deletions binaries/cli/src/main.rs

@@ -4,13 +4,14 @@ use colored::Colorize;
 use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
 use dora_coordinator::Event;
 use dora_core::{
-    descriptor::Descriptor,
+    descriptor::{source_is_url, Descriptor},
     topics::{
         DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
         DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
     },
 };
 use dora_daemon::Daemon;
+use dora_download::download_file;
 use dora_message::{
     cli_to_coordinator::ControlRequest,
     coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
@@ -21,7 +22,7 @@ use dora_tracing::set_up_tracing_opts;
 use duration_str::parse;
 use eyre::{bail, Context};
 use formatting::FormatDataflowError;
-use std::{io::Write, net::SocketAddr};
+use std::{env::current_dir, io::Write, net::SocketAddr};
 use std::{
     net::{IpAddr, Ipv4Addr},
     path::PathBuf,
@@ -80,8 +81,8 @@ enum Command {
     /// Run build commands provided in the given dataflow.
     Build {
         /// Path to the dataflow descriptor file
-        #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
-        dataflow: PathBuf,
+        #[clap(value_name = "PATH")]
+        dataflow: String,
     },
     /// Generate a new project or node. Choose the language between Rust, Python, C or C++.
     New {
@@ -111,8 +112,8 @@ enum Command {
     /// Start the given dataflow path. Attach a name to the running dataflow by using --name.
     Start {
         /// Path to the dataflow descriptor file
-        #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
-        dataflow: PathBuf,
+        #[clap(value_name = "PATH")]
+        dataflow: String,
         /// Assign a name to the dataflow
        #[clap(long)]
         name: Option<String>,
@@ -324,7 +325,7 @@ fn run() -> eyre::Result<()> {
             graph::create(dataflow, mermaid, open)?;
         }
         Command::Build { dataflow } => {
-            build::build(&dataflow)?;
+            build::build(dataflow)?;
         }
         Command::New {
             args,
@@ -366,6 +367,7 @@ fn run() -> eyre::Result<()> {
             detach,
             hot_reload,
         } => {
+            let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
             let dataflow_descriptor =
                 Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
             let working_dir = dataflow
@@ -656,3 +658,19 @@ fn connect_to_coordinator(
 ) -> std::io::Result<Box<TcpRequestReplyConnection>> {
     TcpLayer::new().connect(coordinator_addr)
 }
+
+fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
+    let dataflow = if source_is_url(&dataflow) {
+        // try to download the shared library

Collaborator (inline comment on the line above), suggesting:

-        // try to download the shared library
+        // try to download the dataflow YAML file

+        let target_path = current_dir().context("Could not access the current dir")?;
+        let rt = Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .context("tokio runtime failed")?;
+        rt.block_on(async { download_file(&dataflow, &target_path).await })
+            .wrap_err("failed to download dataflow yaml file")?
+    } else {
+        PathBuf::from(dataflow)
+    };
+    Ok(dataflow)
+}
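
A minimal illustration of the local-path branch of the new resolve_dataflow helper (a hypothetical test, not part of this PR; it assumes the helper is reachable from a test module in the CLI crate):

// Hypothetical sketch: a plain local path is returned unchanged as a PathBuf,
// so no download and no I/O happen in this branch.
#[test]
fn local_paths_are_used_as_is() {
    let resolved = resolve_dataflow("dataflow.yml".to_string()).unwrap();
    assert_eq!(resolved, std::path::PathBuf::from("dataflow.yml"));
}
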
10 changes: 3 additions & 7 deletions binaries/daemon/src/spawn.rs

@@ -27,7 +27,6 @@
 };
 use eyre::{ContextCompat, WrapErr};
 use std::{
-    env::consts::EXE_EXTENSION,
     path::{Path, PathBuf},
     process::Stdio,
     sync::Arc,
@@ -101,13 +100,10 @@
         source => {
             let resolved_path = if source_is_url(source) {
                 // try to download the shared library
-                let target_path = Path::new("build")
-                    .join(node_id.to_string())
-                    .with_extension(EXE_EXTENSION);
-                download_file(source, &target_path)
+                let target_dir = Path::new("build");
+                download_file(source, &target_dir)
                     .await
-                    .wrap_err("failed to download custom node")?;
-                target_path.clone()
+                    .wrap_err("failed to download custom node")?

Check warning (GitHub Actions / Clippy) on line 104 in binaries/daemon/src/spawn.rs: this expression creates a reference which is immediately dereferenced by the compiler.

Comment on lines -104 to +106:

Collaborator:

Could you clarify the reason for this breaking change?

By renaming the downloaded files we avoided collisions. For example, consider:

nodes:
  - id: camera
    path: example.com/camera/node
    inputs:
      tick: dora/timer/millis/20
    outputs:
      - image

  - id: plot
    path: example.com/plot/node
    inputs:
      image: camera/image

With the new logic, the second download overwrites the first, as both nodes are stored as build/node. It probably still works as expected on Linux because the executables are started directly, but I could imagine that there will be some errors on Windows (since Windows disallows removing executable files that are still running). Also, this collision would prevent us from adding some kind of caching again in the future.

Collaborator Author:

I understand your reasoning.

But I really don't think we should rename files on the fly, as that makes it very hard to keep track of which file corresponds to what.

If people name everything "node.py" and the names collide, I don't think that renaming every file is the right solution.

Collaborator Author (@haixuanTao, Oct 12, 2024):

I would very much prefer that people use build for caching, along the lines of:

  - id: plot
    build: wget -O plot.py example.com/plot/node
    path: node.py
    inputs:
      image: camera/image

Collaborator:

> If people name everything "node.py" and the names collide, I don't think that renaming every file is the right solution.

The thing is that the two node.py files might come from different sources. For example, they could be part of different git repositories that are not your own. In this case, you have no way to avoid the name conflict.

> But I really don't think we should rename files on the fly, as that makes it very hard to keep track of which file corresponds to what.

For the above example, you don't have the source for one of the nodes at all (because it's overwritten).

> I would very much prefer that people use build for caching, along the lines of:

Sure, this is a good alternative. But if we support pointing path to remote URLs, we should be able to deal with potential name collisions.

Collaborator Author:

> Sure, this is a good alternative. But if we support pointing path to remote URLs, we should be able to deal with potential name collisions.

OK, so I guess what we could do is fail on pre-existing files. That would make the most sense to me.

Collaborator:

The other alternative would be renaming, either always or only on conflict. If we don't want to do that, I agree that failing is better than silently overwriting the files of other nodes.

             } else {
                 resolve_path(source, working_dir).wrap_err_with(|| {
                     format!("failed to resolve node source `{}`", source)
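
As a side note on the "fail on pre-existing files" idea floated in the thread above, here is a hedged sketch of what such a guard could look like; the function name and placement are assumptions, and nothing like this is part of this PR:

use std::path::{Path, PathBuf};

use eyre::bail;

// Hypothetical guard: refuse to overwrite a file that another node may
// already have downloaded into the shared `build` directory.
fn ensure_free(target_dir: &Path, filename: &str) -> eyre::Result<PathBuf> {
    let path = target_dir.join(filename);
    if path.exists() {
        bail!(
            "refusing to overwrite existing file `{}`; another node may have downloaded it",
            path.display()
        );
    }
    Ok(path)
}
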
7 changes: 2 additions & 5 deletions binaries/runtime/src/operator/python.rs

@@ -42,16 +42,13 @@
     dataflow_descriptor: &Descriptor,
 ) -> eyre::Result<()> {
     let path = if source_is_url(&python_source.source) {
-        let target_path = Path::new("build")
-            .join(node_id.to_string())
-            .join(format!("{}.py", operator_id));
+        let target_path = Path::new("build");
         // try to download the shared library
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()?;
         rt.block_on(download_file(&python_source.source, &target_path))
-            .wrap_err("failed to download Python operator")?;
-        target_path
+            .wrap_err("failed to download Python operator")?
     } else {
         Path::new(&python_source.source).to_owned()
     };

Check warning (GitHub Actions / Clippy) on line 50 in binaries/runtime/src/operator/python.rs: this expression creates a reference which is immediately dereferenced by the compiler.
13 changes: 4 additions & 9 deletions binaries/runtime/src/operator/shared_lib.rs

@@ -27,26 +27,21 @@
 use tracing::{field, span};
 
 pub fn run(
-    node_id: &NodeId,
-    operator_id: &OperatorId,
+    _node_id: &NodeId,
+    _operator_id: &OperatorId,
     source: &str,
     events_tx: Sender<OperatorEvent>,
     incoming_events: flume::Receiver<Event>,
     init_done: oneshot::Sender<Result<()>>,
 ) -> eyre::Result<()> {
     let path = if source_is_url(source) {
-        let target_path = adjust_shared_library_path(
-            &Path::new("build")
-                .join(node_id.to_string())
-                .join(operator_id.to_string()),
-        )?;
+        let target_path = &Path::new("build");

Comment thread on the line above:

Collaborator:

The reason for this adjust_shared_library_path was that you can provide cross-platform URLs. E.g. example.com/operator would become example.com/liboperator.so on Linux and example.com/operator.dll on Windows. This is a bit hacky, but without this adjustment you need different dataflow files for Windows and Linux.

Collaborator Author:

I understand, but I honestly find it super hard to keep track of all these rules compared to just having a plain link.

I would rather have users implement their own URL resolver for cross-platform support than choose a URL convention that people need to remember.

This might also resolve some issues, as you may want to deal with the architecture as well (x86 or ARM), and I don't think we should expect people to build the library for every platform.

Collaborator Author (@haixuanTao, Oct 12, 2024):

Let's say I want to use the current status quo for something like libopencv_plot.so:

-> I need to push it to a server, and it must be referenced by the name url.com/libopencv_plot.so; any other link will not work.
-> I need to remember to reference it in my node as url.com/opencv_plot, which is not obvious.
-> And if there is a bug from me or someone else, the error will say something like "Something went wrong with build/libplot.so", which I then need to remember maps to url.com/opencv_plot, which is my libopencv_plot.so.

Knowing that none of this is going to be properly documented in the near future...

Collaborator Author (@haixuanTao, Oct 12, 2024):

Being cross-platform could be as simple as:

  - id: plot
    build: |
        wget -O plot.zip example.com/plot/$OSTYPE/node/plot.zip
        unzip plot.zip
    path: libplot.so
    inputs:
      image: camera/image

Collaborator:

I fully agree that the status quo solution was hacky and not ideal. Of course, things get simpler if we simply ignore the platform differences. However, if we want dora to be cross-platform, we should provide some way to use this feature for cross-platform dataflows.

> Being cross-platform could be as simple as:
> [...]
> path: libplot.so
> [...]

The issue is that the file would need to be named plot.dll if you want to run it on Windows.

Collaborator:

I guess we decided to remove operator support in the near future, so we might not have to deal with this anymore. However, the same thing also applies to executables, which need an .exe suffix on Windows.

Would you be fine with example.com/path/node expanding to example.com/path/node.exe when run on Windows? Or would you prefer some solution with explicit placeholders, e.g. example.com/path/node{EXE_SUFFIX}? Or do you want to make this a "one-platform-only" feature that does not do any path adjustments at all (so that you need a separate dataflow_Windows.yml with adjusted paths)?

Collaborator Author:

On Windows you can run an exe without the .exe suffix, so we can always call something like target/release/node without the extension. I would prefer defaulting to this.

See: https://stackoverflow.com/questions/44441265/calling-an-exe-without-the-extension-but-with-i

FYI, I hope that most binaries we ship for dora can be packaged in a way that is pip- or cargo-installable, so that everything is on PATH by default and easy to find and deploy.

Collaborator:

That's useful for local paths, but unfortunately not for URLs. We need different binaries on Windows and Linux, so downloading the same file would not help. Also, the downloaded file would probably need an .exe extension to be recognized as an executable file on Windows, no?

However, even if we added a .exe extension automatically, we would still have the issue that there are different CPU architectures etc. So placeholders are probably the way to go, e.g. example.com/path/{target}/node, which would expand to example.com/path/x86_64-unknown-linux-gnu/node on an x86_64 Linux system. Alternatively, we could of course support separate URLs for each target, e.g.:

path:
  x86_64-unknown-linux-gnu: https://example.com/path/x86/linux/node
  x86_64-pc-windows-msvc: https://example.com/path/x86/win/node.exe

(Of course, this is not a high priority right now.)

         // try to download the shared library
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()?;
         rt.block_on(download_file(source, &target_path))
-            .wrap_err("failed to download shared library operator")?;
-        target_path
+            .wrap_err("failed to download shared library operator")?
     } else {
         adjust_shared_library_path(Path::new(source))?
     };

Check warning (GitHub Actions / Clippy) on line 43 in binaries/runtime/src/operator/shared_lib.rs: this expression creates a reference which is immediately dereferenced by the compiler.
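
To make the placeholder idea from the thread above concrete, here is a hedged sketch of a {target} substitution; the placeholder name and the way the target triple is obtained are assumptions, not something this PR implements:

// Hypothetical sketch only. `TARGET_TRIPLE` would have to be baked in at
// compile time (e.g. by a build script emitting `cargo:rustc-env=TARGET=...`);
// it is hard-coded here for illustration.
const TARGET_TRIPLE: &str = "x86_64-unknown-linux-gnu";

fn expand_target_placeholder(url: &str) -> String {
    // "example.com/path/{target}/node" -> "example.com/path/x86_64-unknown-linux-gnu/node"
    url.replace("{target}", TARGET_TRIPLE)
}
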
52 changes: 34 additions & 18 deletions libraries/extensions/download/src/lib.rs

@@ -1,35 +1,51 @@
-use eyre::Context;
+use eyre::{Context, ContextCompat};
 #[cfg(unix)]
 use std::os::unix::prelude::PermissionsExt;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use tokio::io::AsyncWriteExt;
 use tracing::info;
 
-pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport>
-where
-    T: reqwest::IntoUrl + std::fmt::Display + Copy,
-{
-    if target_path.exists() {
-        info!("Using cache: {:?}", target_path.to_str());
-        return Ok(());
+fn get_filename(response: &reqwest::Response) -> Option<String> {
+    if let Some(content_disposition) = response.headers().get("content-disposition") {
+        if let Ok(filename) = content_disposition.to_str() {
+            if let Some(name) = filename.split("filename=").nth(1) {
+                return Some(name.trim_matches('"').to_string());
+            }
+        }
     }
 
-    if let Some(parent) = target_path.parent() {
-        tokio::fs::create_dir_all(parent)
-            .await
-            .wrap_err("failed to create parent folder")?;
+    // If Content-Disposition header is not available, extract from URL
+    let path = Path::new(response.url().as_str());
+    if let Some(name) = path.file_name() {
+        if let Some(filename) = name.to_str() {
+            return Some(filename.to_string());
+        }
     }
 
+    None
+}
+
+pub async fn download_file<T>(url: T, target_dir: &Path) -> Result<PathBuf, eyre::ErrReport>
+where
+    T: reqwest::IntoUrl + std::fmt::Display + Copy,
+{
+    tokio::fs::create_dir_all(&target_dir)
+        .await
+        .wrap_err("failed to create parent folder")?;
+
     let response = reqwest::get(url)
         .await
-        .wrap_err_with(|| format!("failed to request operator from `{url}`"))?
+        .wrap_err_with(|| format!("failed to request operator from `{url}`"))?;
+
+    let filename = get_filename(&response).context("Could not find a filename")?;
+    let bytes = response
         .bytes()
         .await
         .wrap_err("failed to read operator from `{uri}`")?;
-    let mut file = tokio::fs::File::create(target_path)
+    let path = target_dir.join(filename);
+    let mut file = tokio::fs::File::create(&path)
         .await
         .wrap_err("failed to create target file")?;
-    file.write_all(&response)
+    file.write_all(&bytes)
         .await
         .wrap_err("failed to write downloaded operator to file")?;
     file.sync_all().await.wrap_err("failed to `sync_all`")?;
@@ -39,5 +55,5 @@ where
         .await
         .wrap_err("failed to make downloaded file executable")?;
 
-    Ok(())
+    Ok(path.to_path_buf())
 }
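
For reference, a hedged sketch of how the reworked download_file can be called from a small async program; the URL and output directory are placeholders, and only the download_file signature itself is taken from this diff:

use std::path::Path;

use dora_download::download_file;

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // The file name is derived from the Content-Disposition header or, failing
    // that, from the last segment of the URL ("dataflow.yml" here).
    let url = "https://example.com/dataflow.yml"; // placeholder URL
    let downloaded = download_file(url, Path::new(".")).await?;
    println!("downloaded to {}", downloaded.display());
    Ok(())
}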