diff --git a/Cargo.lock b/Cargo.lock index 2f260d9ab..51e0deda3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,6 +1557,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "uuid", + "which", ] [[package]] diff --git a/binaries/cli/src/attach.rs b/binaries/cli/src/attach.rs index 38a12aeed..c121fb0dd 100644 --- a/binaries/cli/src/attach.rs +++ b/binaries/cli/src/attach.rs @@ -38,10 +38,12 @@ pub fn attach_dataflow( CoreNodeKind::Custom(_cn) => (), CoreNodeKind::Runtime(rn) => { for op in rn.operators.iter() { - if let dora_core::descriptor::OperatorSource::Python(source) = &op.config.source + if let dora_core::descriptor::OperatorSource::Python(python_source) = + &op.config.source { - let path = resolve_path(source, &working_dir).wrap_err_with(|| { - format!("failed to resolve node source `{}`", source) + let path = resolve_path(&python_source.source, &working_dir) + .wrap_err_with(|| { + format!("failed to resolve node source `{}`", python_source.source) })?; node_path_lookup .insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone()))); diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 8086394e8..dbed43b6f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -37,3 +37,4 @@ bincode = "1.3.3" async-trait = "0.1.64" aligned-vec = "0.5.0" ctrlc = "3.2.5" +which = "5.0.0" diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 1681c0ad1..6f43ec645 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -8,7 +8,8 @@ use dora_core::{ config::{DataId, NodeRunConfig}, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ - resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, + resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, + ResolvedNode, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -149,26 +150,62 @@ pub async fn spawn_node( })? 
} dora_core::descriptor::CoreNodeKind::Runtime(n) => { - let has_python_operator = n + let python_operators: Vec<&OperatorDefinition> = n .operators .iter() - .any(|x| matches!(x.config.source, OperatorSource::Python { .. })); + .filter(|x| matches!(x.config.source, OperatorSource::Python { .. })) + .collect(); - let has_other_operator = n + let other_operators = n .operators .iter() .any(|x| !matches!(x.config.source, OperatorSource::Python { .. })); - let mut command = if has_python_operator && !has_other_operator { + let mut command = if !python_operators.is_empty() && !other_operators { // Use python to spawn runtime if there is a python operator - let python = get_python_path().context("Could not find python in daemon")?; - let mut command = tokio::process::Command::new(python); - command.args([ - "-c", - format!("import dora; dora.start_runtime() # {}", node.id).as_str(), - ]); - command - } else if !has_python_operator && has_other_operator { + + // TODO: Handle multi-operator runtime once sub-interpreter is supported + if python_operators.len() > 1 { + eyre::bail!( + "Runtime currently only support one Python Operator. + This is because pyo3 sub-interpreter is not yet available. + See: https://github.com/PyO3/pyo3/issues/576" + ); + } + + let python_operator = python_operators + .first() + .context("Runtime had no operators definition.")?; + + if let OperatorSource::Python(PythonSource { + source: _, + conda_env: Some(conda_env), + }) = &python_operator.config.source + { + let conda = which::which("conda").context( + "failed to find `conda`, yet a `conda_env` was defined. 
+ Make sure that `conda` is available.", + )?; + let mut command = tokio::process::Command::new(conda); + command.args([ + "run", + "-n", + &conda_env, + "python", + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + command + } else { + let python = get_python_path() + .context("Could not find python path when spawning runtime node")?; + let mut command = tokio::process::Command::new(python); + command.args([ + "-c", + format!("import dora; dora.start_runtime() # {}", node.id).as_str(), + ]); + command + } + } else if python_operators.is_empty() && other_operators { let mut cmd = tokio::process::Command::new( std::env::current_exe().wrap_err("failed to get current executable path")?, ); diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 90ad74e68..ad4b7757a 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -260,9 +260,53 @@ pub struct OperatorConfig { #[serde(rename_all = "kebab-case")] pub enum OperatorSource { SharedLibrary(String), - Python(String), + Python(PythonSource), Wasm(String), } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde( + deny_unknown_fields, + from = "PythonSourceDef", + into = "PythonSourceDef" +)] +pub struct PythonSource { + pub source: String, + pub conda_env: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum PythonSourceDef { + SourceOnly(String), + WithOptions { + source: String, + conda_env: Option<String>, + }, +} + +impl From<PythonSource> for PythonSourceDef { + fn from(input: PythonSource) -> Self { + match input { + PythonSource { + source, + conda_env: None, + } => Self::SourceOnly(source), + PythonSource { source, conda_env } => Self::WithOptions { source, conda_env }, + } + } +} + +impl From<PythonSourceDef> for PythonSource { + fn from(value: PythonSourceDef) -> Self { + match value { + PythonSourceDef::SourceOnly(source) => Self { + source, + conda_env: None, + }, 
+ PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env }, + } + } +} pub fn source_is_url(source: &str) -> bool { source.contains("://") diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 316d3c588..1da9328af 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -50,9 +50,10 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result } } } - OperatorSource::Python(path) => { + OperatorSource::Python(python_source) => { has_python_operator = true; - if source_is_url(path) { + let path = &python_source.source; + if source_is_url(&path) { info!("{path} is a URL."); // TODO: Implement url check. } else if !working_dir.join(path).exists() { bail!("no Python library at `{path}`");