Skip to content

Commit

Permalink
Only use conda if conda is present
Browse files Browse the repository at this point in the history
  • Loading branch information
haixuanTao committed Apr 10, 2024
1 parent c19658d commit c6948c1
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 12 deletions.
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn attach_dataflow(
let path = resolve_path(&python_source.source, &working_dir)
.wrap_err_with(|| {
format!("failed to resolve node source `{}`", python_source.source)
})?;
})?;
node_path_lookup
.insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone())));
}
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
Metadata,
};
use eyre::WrapErr;
use eyre::{ContextCompat, WrapErr};
use std::{
env::consts::EXE_EXTENSION,
path::{Path, PathBuf},
Expand Down
10 changes: 5 additions & 5 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{OperatorEvent, StopReason};
use dora_core::{
config::{NodeId, OperatorId},
descriptor::{source_is_url, Descriptor},
descriptor::{source_is_url, Descriptor, PythonSource},
};
use dora_download::download_file;
use dora_node_api::Event;
Expand Down Expand Up @@ -35,25 +35,25 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report {
pub fn run(
node_id: &NodeId,
operator_id: &OperatorId,
source: &str,
python_source: &PythonSource,
events_tx: Sender<OperatorEvent>,
incoming_events: flume::Receiver<Event>,
init_done: oneshot::Sender<Result<()>>,
dataflow_descriptor: &Descriptor,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let path = if source_is_url(&python_source.source) {
let target_path = Path::new("build")
.join(node_id.to_string())
.join(format!("{}.py", operator_id));
// try to download the shared library
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(download_file(source, &target_path))
rt.block_on(download_file(&python_source.source, &target_path))
.wrap_err("failed to download Python operator")?;
target_path
} else {
Path::new(source).to_owned()
Path::new(&python_source.source).to_owned()
};

if !path.exists() {
Expand Down
4 changes: 1 addition & 3 deletions examples/python-operator-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ nodes:

- id: plot
operator:
python:
source: plot.py
conda_env: base
python: plot.py
inputs:
image: webcam/image
bbox: object_detection/bbox
Expand Down
28 changes: 28 additions & 0 deletions examples/python-operator-dataflow/dataflow_conda.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
nodes:
- id: webcam
operator:
python: webcam.py
inputs:
tick: dora/timer/millis/50
outputs:
- image

- id: object_detection
operator:
send_stdout_as: stdout
python: object_detection.py
inputs:
image: webcam/image
outputs:
- bbox
- stdout

- id: plot
operator:
python:
source: plot.py
conda_env: base
inputs:
image: webcam/image
bbox: object_detection/bbox
assistant_message: object_detection/stdout
9 changes: 7 additions & 2 deletions examples/python-operator-dataflow/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ async fn main() -> eyre::Result<()> {
.await
.context("maturin develop failed")?;

let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;
if std::env::var("CONDA_EXE").is_ok() {
let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;
} else {
let dataflow = Path::new("dataflow_conda.yml");
run_dataflow(dataflow).await?;
}

Ok(())
}
Expand Down

0 comments on commit c6948c1

Please sign in to comment.