diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 6f43ec645..6b6a3bff0 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -20,7 +20,7 @@ use dora_node_api::{ arrow_utils::{copy_array_into_sample, required_data_size}, Metadata, }; -use eyre::WrapErr; +use eyre::{ContextCompat, WrapErr}; use std::{ env::consts::EXE_EXTENSION, path::{Path, PathBuf}, diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 07a47c422..eec5b7ec5 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -3,7 +3,7 @@ use super::{OperatorEvent, StopReason}; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::{source_is_url, Descriptor}, + descriptor::{source_is_url, Descriptor, PythonSource}, }; use dora_download::download_file; use dora_node_api::Event; @@ -35,13 +35,13 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { pub fn run( node_id: &NodeId, operator_id: &OperatorId, - source: &str, + python_source: &PythonSource, events_tx: Sender, incoming_events: flume::Receiver, init_done: oneshot::Sender>, dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { - let path = if source_is_url(source) { + let path = if source_is_url(&python_source.source) { let target_path = Path::new("build") .join(node_id.to_string()) .join(format!("{}.py", operator_id)); @@ -49,11 +49,11 @@ pub fn run( let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(download_file(source, &target_path)) + rt.block_on(download_file(&python_source.source, &target_path)) .wrap_err("failed to download Python operator")?; target_path } else { - Path::new(source).to_owned() + Path::new(&python_source.source).to_owned() }; if !path.exists() { diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index ba367da22..400f881f5 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -19,9 +19,7 @@ nodes: - id: plot operator: - python: - source: plot.py - conda_env: base + python: plot.py inputs: image: webcam/image bbox: object_detection/bbox diff --git a/examples/python-operator-dataflow/dataflow_conda.yml b/examples/python-operator-dataflow/dataflow_conda.yml new file mode 100644 index 000000000..ba367da22 --- /dev/null +++ b/examples/python-operator-dataflow/dataflow_conda.yml @@ -0,0 +1,28 @@ +nodes: + - id: webcam + operator: + python: webcam.py + inputs: + tick: dora/timer/millis/50 + outputs: + - image + + - id: object_detection + operator: + send_stdout_as: stdout + python: object_detection.py + inputs: + image: webcam/image + outputs: + - bbox + - stdout + + - id: plot + operator: + python: + source: plot.py + conda_env: base + inputs: + image: webcam/image + bbox: object_detection/bbox + assistant_message: object_detection/stdout diff --git a/examples/python-operator-dataflow/run.rs b/examples/python-operator-dataflow/run.rs index dfc2e8345..7a33abf4a 100644 --- a/examples/python-operator-dataflow/run.rs +++ b/examples/python-operator-dataflow/run.rs @@ -73,8 +73,13 @@ async fn main() -> eyre::Result<()> { .await .context("maturin develop failed")?; - let dataflow = Path::new("dataflow.yml"); - run_dataflow(dataflow).await?; + if env!("CONDA_EXE").is_empty() { + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + } else { + let dataflow = Path::new("dataflow_conda.yml"); + run_dataflow(dataflow).await?; + } Ok(()) }