From 15af539729a44062c9fe4a155c89488c7eb26646 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 10 Apr 2024 13:39:24 +0200 Subject: [PATCH 1/4] Adding customizing `conda_env` within Python operator This commit makes it possible to specify the conda env that we want to use in a specific operator. ```yaml - id: robot operator: python: source: ../operators/robot.py conda_env: robomaster ``` This will call: ```bash conda run -n robomaster python -c "import dora; dora.start_runtime()" ``` --- Cargo.lock | 1 + binaries/cli/src/attach.rs | 8 +-- binaries/daemon/Cargo.toml | 1 + binaries/daemon/src/spawn.rs | 63 ++++++++++++++++++----- libraries/core/src/descriptor/mod.rs | 46 ++++++++++++++++- libraries/core/src/descriptor/validate.rs | 5 +- 6 files changed, 105 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f260d9ab..51e0deda3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,6 +1557,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "uuid", + "which", ] [[package]] diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 38a12aeed..c121fb0dd 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -38,10 +38,12 @@ pub fn attach_dataflow( CoreNodeKind::Custom(_cn) => (), CoreNodeKind::Runtime(rn) => { for op in rn.operators.iter() { - if let dora_core::descriptor::OperatorSource::Python(source) = &op.config.source + if let dora_core::descriptor::OperatorSource::Python(python_source) = + &op.config.source { - let path = resolve_path(source, &working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) + let path = resolve_path(&python_source.source, &working_dir) + .wrap_err_with(|| { + format!("failed to resolve node source `{}`", python_source.source) })?; node_path_lookup .insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone()))); diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 8086394e8..dbed43b6f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -37,3 +37,4 @@ bincode = "1.3.3" async-trait = "0.1.64" aligned-vec = "0.5.0" ctrlc = "3.2.5" +which = "5.0.0" diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 1681c0ad1..6f43ec645 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,7 +8,8 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, + resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, + ResolvedNode, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -149,26 +150,62 @@ pub async fn spawn_node( })? } dora_core::descriptor::CoreNodeKind::Runtime(n) => { - let has_python_operator = n + let python_operators: Vec<&OperatorDefinition> = n .operators .iter() - .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); + .filter(|x| matches!(x.config.source, OperatorSource::Python { .. })) + .collect(); - let has_other_operator = n + let other_operators = n .operators .iter() .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); - let mut command = if has_python_operator && !has_other_operator { + let mut command = if !python_operators.is_empty() && !other_operators { // Use python to spawn runtime if there is a python operator - let python = get_python_path().context("Could not find python in daemon")?; - let mut command = tokio::process::Command::new(python); - command.args([ - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - command - } else if !has_python_operator && has_other_operator { + + // TODO: Handle multi-operator runtime once sub-interpreter is supported + if python_operators.len() > 2 { + eyre::bail!( + "Runtime currently only support one Python Operator. + This is because pyo4 sub-interpreter is not yet available. + See: https://github.com/PyO4/pyo3/issues/576" + ); + } + + let python_operator = python_operators + .first() + .context("Runtime had no operators definition.")?; + + if let OperatorSource::Python(PythonSource { + source: _, + conda_env: Some(conda_env), + }) = &python_operator.config.source + { + let conda = which::which("conda").context( + "failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.", + )?; + let mut command = tokio::process::Command::new(conda); + command.args([ + "run", + "-n", + &conda_env, + "python", + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + command + } else { + let python = get_python_path() + .context("Could not find python path when spawning runtime node")?; + let mut command = tokio::process::Command::new(python); + command.args([ + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + command + } + } else if python_operators.is_empty() && other_operators { let mut cmd = tokio::process::Command::new( std::env::current_exe().wrap_err("failed to get current executable path")?, ); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 90ad74e68..ad4b7757a 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -260,9 +260,53 @@ pub struct OperatorConfig { #[serde(rename_all = "kebab-case")] pub enum OperatorSource { SharedLibrary(String), - Python(String), + Python(PythonSource), Wasm(String), } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde( + deny_unknown_fields, + from = "PythonSourceDef", + into = "PythonSourceDef" +)] +pub struct PythonSource { + pub source: String, + pub conda_env: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum PythonSourceDef { + SourceOnly(String), + WithOptions { + source: String, + conda_env: Option, + }, +} + +impl From for PythonSourceDef { + fn from(input: PythonSource) -> Self { + match input { + PythonSource { + source, + conda_env: None, + } => Self::SourceOnly(source), + PythonSource { source, conda_env } => Self::WithOptions { source, conda_env }, + } + } +} + +impl From for PythonSource { + fn from(value: PythonSourceDef) -> Self { + match value { + PythonSourceDef::SourceOnly(source) => Self { + source, + conda_env: None, + }, + PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env }, + } + } +} pub fn source_is_url(source: &str) -> bool { source.contains("://") diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 316d3c588..1da9328af 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -50,9 +50,10 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result } } } - OperatorSource::Python(path) => { + OperatorSource::Python(python_source) => { has_python_operator = true; - if source_is_url(path) { + let path = &python_source.source; + if source_is_url(&path) { info!("{path} is a URL."); // TODO: Implement url check. } else if !working_dir.join(path).exists() { bail!("no Python library at `{path}`"); From c19658d4a046d22428c6567f9115b0f0b7490dc1 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 10 Apr 2024 13:48:36 +0200 Subject: [PATCH 2/4] Add Ci/CD on custom conda env for operator --- .github/workflows/ci.yml | 4 ++++ examples/python-operator-dataflow/dataflow.yml | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 17b3e9942..c6bd86540 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -141,7 +141,11 @@ jobs: python-version: "3.10" - name: "Python Dataflow example" run: cargo run --example python-dataflow + - uses: conda-incubator/setup-miniconda@v2 + with: + auto-activate-base: false - name: "Python Operator Dataflow example" + shell: bash -l {0} run: cargo run --example python-operator-dataflow # ROS2 bridge examples diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 400f881f5..ba367da22 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -19,7 +19,9 @@ nodes: - id: plot operator: - python: plot.py + python: + source: plot.py + conda_env: base inputs: image: webcam/image bbox: object_detection/bbox From c6948c1644f327ef234debda6a3b505374413349 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 10 Apr 2024 13:54:56 +0200 Subject: [PATCH 3/4] Only use conda if conda is present --- binaries/cli/src/attach.rs | 2 +- binaries/daemon/src/spawn.rs | 2 +- binaries/runtime/src/operator/python.rs | 10 +++---- .../python-operator-dataflow/dataflow.yml | 4 +-- .../dataflow_conda.yml | 28 +++++++++++++++++++ examples/python-operator-dataflow/run.rs | 9 ++++-- 6 files changed, 43 insertions(+), 12 deletions(-) create mode 100644 examples/python-operator-dataflow/dataflow_conda.yml diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index c121fb0dd..d47306434 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -44,7 +44,7 @@ pub fn attach_dataflow( let path = resolve_path(&python_source.source, &working_dir) .wrap_err_with(|| { format!("failed to resolve node source `{}`", python_source.source) - })?; + })?; node_path_lookup .insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone()))); } diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 6f43ec645..6b6a3bff0 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -20,7 +20,7 @@ use dora_node_api::{ arrow_utils::{copy_array_into_sample, required_data_size}, Metadata, }; -use eyre::WrapErr; +use eyre::{ContextCompat, WrapErr}; use std::{ env::consts::EXE_EXTENSION, path::{Path, PathBuf}, diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 07a47c422..eec5b7ec5 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -3,7 +3,7 @@ use super::{OperatorEvent, StopReason}; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::{source_is_url, Descriptor}, + descriptor::{source_is_url, Descriptor, PythonSource}, }; use dora_download::download_file; use dora_node_api::Event; @@ -35,13 +35,13 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { pub fn run( node_id: &NodeId, operator_id: &OperatorId, - source: &str, + python_source: &PythonSource, events_tx: Sender, incoming_events: flume::Receiver, init_done: oneshot::Sender>, dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { - let path = if source_is_url(source) { + let path = if source_is_url(&python_source.source) { let target_path = Path::new("build") .join(node_id.to_string()) .join(format!("{}.py", operator_id)); @@ -49,11 +49,11 @@ pub fn run( let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(download_file(source, &target_path)) + rt.block_on(download_file(&python_source.source, &target_path)) .wrap_err("failed to download Python operator")?; target_path } else { - Path::new(source).to_owned() + Path::new(&python_source.source).to_owned() }; if !path.exists() { diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index ba367da22..400f881f5 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -19,9 +19,7 @@ nodes: - id: plot operator: - python: - source: plot.py - conda_env: base + python: plot.py inputs: image: webcam/image bbox: object_detection/bbox diff --git a/examples/python-operator-dataflow/dataflow_conda.yml b/examples/python-operator-dataflow/dataflow_conda.yml new file mode 100644 index 000000000..ba367da22 --- /dev/null +++ b/examples/python-operator-dataflow/dataflow_conda.yml @@ -0,0 +1,28 @@ +nodes: + - id: webcam + operator: + python: webcam.py + inputs: + tick: dora/timer/millis/50 + outputs: + - image + + - id: object_detection + operator: + send_stdout_as: stdout + python: object_detection.py + inputs: + image: webcam/image + outputs: + - bbox + - stdout + + - id: plot + operator: + python: + source: plot.py + conda_env: base + inputs: + image: webcam/image + bbox: object_detection/bbox + assistant_message: object_detection/stdout diff --git a/examples/python-operator-dataflow/run.rs b/examples/python-operator-dataflow/run.rs index dfc2e8345..d6534cf6b 100644 --- a/examples/python-operator-dataflow/run.rs +++ b/examples/python-operator-dataflow/run.rs @@ -73,8 +73,13 @@ async fn main() -> eyre::Result<()> { .await .context("maturin develop failed")?; - let dataflow = Path::new("dataflow.yml"); - run_dataflow(dataflow).await?; + if std::env::var("CONDA_EXE").is_ok() { + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + } else { + let dataflow = Path::new("dataflow_conda.yml"); + run_dataflow(dataflow).await?; + } Ok(()) } From b58904923ad35186447373b018b50ef2d331e049 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 10 Apr 2024 14:38:57 +0200 Subject: [PATCH 4/4] deactivate env before running CI/CD --- .github/workflows/ci.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6bd86540..d092c5558 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -143,10 +143,13 @@ jobs: run: cargo run --example python-dataflow - uses: conda-incubator/setup-miniconda@v2 with: - auto-activate-base: false + auto-activate-base: true + activate-environment: "" - name: "Python Operator Dataflow example" shell: bash -l {0} - run: cargo run --example python-operator-dataflow + run: | + conda deactivate + cargo run --example python-operator-dataflow # ROS2 bridge examples ros2-bridge-examples: