diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 17b3e9942..d092c5558 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -141,8 +141,15 @@ jobs: python-version: "3.10" - name: "Python Dataflow example" run: cargo run --example python-dataflow + - uses: conda-incubator/setup-miniconda@v2 + with: + auto-activate-base: true + activate-environment: "" - name: "Python Operator Dataflow example" - run: cargo run --example python-operator-dataflow + shell: bash -l {0} + run: | + conda deactivate + cargo run --example python-operator-dataflow # ROS2 bridge examples ros2-bridge-examples: diff --git a/Cargo.lock b/Cargo.lock index 2f260d9ab..51e0deda3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,6 +1557,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "uuid", + "which", ] [[package]] diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 38a12aeed..d47306434 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -38,11 +38,13 @@ pub fn attach_dataflow( CoreNodeKind::Custom(_cn) => (), CoreNodeKind::Runtime(rn) => { for op in rn.operators.iter() { - if let dora_core::descriptor::OperatorSource::Python(source) = &op.config.source + if let dora_core::descriptor::OperatorSource::Python(python_source) = + &op.config.source { - let path = resolve_path(source, &working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) - })?; + let path = resolve_path(&python_source.source, &working_dir) + .wrap_err_with(|| { + format!("failed to resolve node source `{}`", python_source.source) + })?; node_path_lookup .insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone()))); } diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 8086394e8..dbed43b6f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -37,3 +37,4 @@ bincode = "1.3.3" async-trait = "0.1.64" aligned-vec = "0.5.0" ctrlc = "3.2.5" +which = "5.0.0" diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 1681c0ad1..6b6a3bff0 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,7 +8,8 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, + resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, + ResolvedNode, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -19,7 +20,7 @@ use dora_node_api::{ arrow_utils::{copy_array_into_sample, required_data_size}, Metadata, }; -use eyre::WrapErr; +use eyre::{ContextCompat, WrapErr}; use std::{ env::consts::EXE_EXTENSION, path::{Path, PathBuf}, @@ -149,26 +150,62 @@ pub async fn spawn_node( })? } dora_core::descriptor::CoreNodeKind::Runtime(n) => { - let has_python_operator = n + let python_operators: Vec<&OperatorDefinition> = n .operators .iter() - .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); + .filter(|x| matches!(x.config.source, OperatorSource::Python { .. })) + .collect(); - let has_other_operator = n + let other_operators = n .operators .iter() .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); - let mut command = if has_python_operator && !has_other_operator { + let mut command = if !python_operators.is_empty() && !other_operators { // Use python to spawn runtime if there is a python operator - let python = get_python_path().context("Could not find python in daemon")?; - let mut command = tokio::process::Command::new(python); - command.args([ - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - command - } else if !has_python_operator && has_other_operator { + + // TODO: Handle multi-operator runtime once sub-interpreter is supported + if python_operators.len() > 2 { + eyre::bail!( + "Runtime currently only support one Python Operator. + This is because pyo4 sub-interpreter is not yet available. + See: https://github.com/PyO4/pyo3/issues/576" + ); + } + + let python_operator = python_operators + .first() + .context("Runtime had no operators definition.")?; + + if let OperatorSource::Python(PythonSource { + source: _, + conda_env: Some(conda_env), + }) = &python_operator.config.source + { + let conda = which::which("conda").context( + "failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.", + )?; + let mut command = tokio::process::Command::new(conda); + command.args([ + "run", + "-n", + &conda_env, + "python", + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + command + } else { + let python = get_python_path() + .context("Could not find python path when spawning runtime node")?; + let mut command = tokio::process::Command::new(python); + command.args([ + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + command + } + } else if python_operators.is_empty() && other_operators { let mut cmd = tokio::process::Command::new( std::env::current_exe().wrap_err("failed to get current executable path")?, ); diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 07a47c422..eec5b7ec5 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -3,7 +3,7 @@ use super::{OperatorEvent, StopReason}; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::{source_is_url, Descriptor}, + descriptor::{source_is_url, Descriptor, PythonSource}, }; use dora_download::download_file; use dora_node_api::Event; @@ -35,13 +35,13 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { pub fn run( node_id: &NodeId, operator_id: &OperatorId, - source: &str, + python_source: &PythonSource, events_tx: Sender, incoming_events: flume::Receiver, init_done: oneshot::Sender>, dataflow_descriptor: &Descriptor, ) -> eyre::Result<()> { - let path = if source_is_url(source) { + let path = if source_is_url(&python_source.source) { let target_path = Path::new("build") .join(node_id.to_string()) .join(format!("{}.py", operator_id)); @@ -49,11 +49,11 @@ pub fn run( let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(download_file(source, &target_path)) + rt.block_on(download_file(&python_source.source, &target_path)) .wrap_err("failed to download Python operator")?; target_path } else { - Path::new(source).to_owned() + Path::new(&python_source.source).to_owned() }; if !path.exists() { diff --git a/examples/python-operator-dataflow/dataflow_conda.yml b/examples/python-operator-dataflow/dataflow_conda.yml new file mode 100644 index 000000000..ba367da22 --- /dev/null +++ b/examples/python-operator-dataflow/dataflow_conda.yml @@ -0,0 +1,28 @@ +nodes: + - id: webcam + operator: + python: webcam.py + inputs: + tick: dora/timer/millis/50 + outputs: + - image + + - id: object_detection + operator: + send_stdout_as: stdout + python: object_detection.py + inputs: + image: webcam/image + outputs: + - bbox + - stdout + + - id: plot + operator: + python: + source: plot.py + conda_env: base + inputs: + image: webcam/image + bbox: object_detection/bbox + assistant_message: object_detection/stdout diff --git a/examples/python-operator-dataflow/run.rs b/examples/python-operator-dataflow/run.rs index dfc2e8345..d6534cf6b 100644 --- a/examples/python-operator-dataflow/run.rs +++ b/examples/python-operator-dataflow/run.rs @@ -73,8 +73,13 @@ async fn main() -> eyre::Result<()> { .await .context("maturin develop failed")?; - let dataflow = Path::new("dataflow.yml"); - run_dataflow(dataflow).await?; + if std::env::var("CONDA_EXE").is_ok() { + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + } else { + let dataflow = Path::new("dataflow_conda.yml"); + run_dataflow(dataflow).await?; + } Ok(()) } diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 90ad74e68..ad4b7757a 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -260,9 +260,53 @@ pub struct OperatorConfig { #[serde(rename_all = "kebab-case")] pub enum OperatorSource { SharedLibrary(String), - Python(String), + Python(PythonSource), Wasm(String), } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde( + deny_unknown_fields, + from = "PythonSourceDef", + into = "PythonSourceDef" +)] +pub struct PythonSource { + pub source: String, + pub conda_env: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum PythonSourceDef { + SourceOnly(String), + WithOptions { + source: String, + conda_env: Option, + }, +} + +impl From for PythonSourceDef { + fn from(input: PythonSource) -> Self { + match input { + PythonSource { + source, + conda_env: None, + } => Self::SourceOnly(source), + PythonSource { source, conda_env } => Self::WithOptions { source, conda_env }, + } + } +} + +impl From for PythonSource { + fn from(value: PythonSourceDef) -> Self { + match value { + PythonSourceDef::SourceOnly(source) => Self { + source, + conda_env: None, + }, + PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env }, + } + } +} pub fn source_is_url(source: &str) -> bool { source.contains("://") diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 316d3c588..1da9328af 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -50,9 +50,10 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result } } } - OperatorSource::Python(path) => { + OperatorSource::Python(python_source) => { has_python_operator = true; - if source_is_url(path) { + let path = &python_source.source; + if source_is_url(&path) { info!("{path} is a URL."); // TODO: Implement url check. } else if !working_dir.join(path).exists() { bail!("no Python library at `{path}`");