Skip to content

Commit

Permalink
Add a customizable `conda_env` option to the Python operator
Browse files Browse the repository at this point in the history
This commit makes it possible to specify, per operator, the conda environment
in which a Python operator should be run.

```yaml
  - id: robot
    operator:
      python:
        source: ../operators/robot.py
        conda_env: robomaster
```

This will call:
```bash
conda run -n robomaster python -c "import dora; dora.start_runtime()"
```
  • Loading branch information
haixuanTao committed Apr 10, 2024
1 parent cea23e8 commit 15af539
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ pub fn attach_dataflow(
CoreNodeKind::Custom(_cn) => (),
CoreNodeKind::Runtime(rn) => {
for op in rn.operators.iter() {
if let dora_core::descriptor::OperatorSource::Python(source) = &op.config.source
if let dora_core::descriptor::OperatorSource::Python(python_source) =
&op.config.source
{
let path = resolve_path(source, &working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
let path = resolve_path(&python_source.source, &working_dir)
.wrap_err_with(|| {
format!("failed to resolve node source `{}`", python_source.source)
})?;
node_path_lookup
.insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone())));
Expand Down
1 change: 1 addition & 0 deletions binaries/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ bincode = "1.3.3"
async-trait = "0.1.64"
aligned-vec = "0.5.0"
ctrlc = "3.2.5"
which = "5.0.0"
63 changes: 50 additions & 13 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use dora_core::{
config::{DataId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE,
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
Expand Down Expand Up @@ -149,26 +150,62 @@ pub async fn spawn_node(
})?
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
let has_python_operator = n
let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
.any(|x| matches!(x.config.source, OperatorSource::Python { .. }));
.filter(|x| matches!(x.config.source, OperatorSource::Python { .. }))
.collect();

let has_other_operator = n
let other_operators = n
.operators
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if has_python_operator && !has_other_operator {
let mut command = if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator
let python = get_python_path().context("Could not find python in daemon")?;
let mut command = tokio::process::Command::new(python);
command.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
} else if !has_python_operator && has_other_operator {

// TODO: Handle multi-operator runtime once sub-interpreter is supported
if python_operators.len() > 2 {
eyre::bail!(
"Runtime currently only support one Python Operator.
This is because pyo4 sub-interpreter is not yet available.
See: https://github.com/PyO4/pyo3/issues/576"
);
}

let python_operator = python_operators
.first()
.context("Runtime had no operators definition.")?;

if let OperatorSource::Python(PythonSource {
source: _,
conda_env: Some(conda_env),
}) = &python_operator.config.source
{
let conda = which::which("conda").context(
"failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.",
)?;
let mut command = tokio::process::Command::new(conda);
command.args([
"run",
"-n",
&conda_env,
"python",
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
} else {
let python = get_python_path()
.context("Could not find python path when spawning runtime node")?;
let mut command = tokio::process::Command::new(python);
command.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe().wrap_err("failed to get current executable path")?,
);
Expand Down
46 changes: 45 additions & 1 deletion libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,53 @@ pub struct OperatorConfig {
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(String),
Python(String),
Python(PythonSource),
Wasm(String),
}
/// Source description of a Python operator.
///
/// Serialization and deserialization are delegated to [`PythonSourceDef`]
/// through the `from`/`into` serde attributes below, so the accepted input
/// shapes are the ones that enum defines.
///
/// NOTE(review): because deserialization goes through `PythonSourceDef`,
/// `deny_unknown_fields` on this struct presumably has no effect on the
/// fields accepted by the `WithOptions` variant — confirm against the serde
/// container-attribute documentation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(
deny_unknown_fields,
from = "PythonSourceDef",
into = "PythonSourceDef"
)]
pub struct PythonSource {
/// Location of the operator's Python source.
pub source: String,
/// Optional conda environment name for this operator; `None` when no
/// environment was specified.
pub conda_env: Option<String>,
}

/// Serialized representation of [`PythonSource`].
///
/// The enum is `untagged`, so no variant name appears in the serialized data:
/// serde tries the variants in declaration order and keeps the first one that
/// matches. Do not reorder the variants without checking the effect on
/// existing descriptors.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PythonSourceDef {
/// Short form: only the source string is given.
SourceOnly(String),
/// Structured form carrying the source together with optional settings.
WithOptions {
/// Location of the operator's Python source.
source: String,
/// Optional conda environment name.
conda_env: Option<String>,
},
}

impl From<PythonSource> for PythonSourceDef {
    /// Converts a [`PythonSource`] into its serialized representation.
    ///
    /// A source without a conda environment becomes the short
    /// `SourceOnly` form; otherwise the structured `WithOptions` form is
    /// produced with the environment preserved.
    fn from(input: PythonSource) -> Self {
        let PythonSource { source, conda_env } = input;
        if conda_env.is_some() {
            Self::WithOptions { source, conda_env }
        } else {
            Self::SourceOnly(source)
        }
    }
}

impl From<PythonSourceDef> for PythonSource {
    /// Builds a [`PythonSource`] from its serialized representation.
    ///
    /// The short `SourceOnly` form yields no conda environment; the
    /// `WithOptions` form carries its fields over unchanged.
    fn from(value: PythonSourceDef) -> Self {
        let (source, conda_env) = match value {
            PythonSourceDef::SourceOnly(path) => (path, None),
            PythonSourceDef::WithOptions { source, conda_env } => (source, conda_env),
        };
        Self { source, conda_env }
    }
}

pub fn source_is_url(source: &str) -> bool {
source.contains("://")
Expand Down
5 changes: 3 additions & 2 deletions libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result
}
}
}
OperatorSource::Python(path) => {
OperatorSource::Python(python_source) => {
has_python_operator = true;
if source_is_url(path) {
let path = &python_source.source;
if source_is_url(&path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !working_dir.join(path).exists() {
bail!("no Python library at `{path}`");
Expand Down

0 comments on commit 15af539

Please sign in to comment.