Skip to content

Commit

Permalink
Add possibility to send stdout for operators and add warnings when th…
Browse files Browse the repository at this point in the history
…ere is multiple operators
  • Loading branch information
haixuanTao committed Feb 28, 2024
1 parent e85ce44 commit 9b3b7af
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
6 changes: 5 additions & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,11 @@ impl Daemon {
};

let Some(subscribers) = dataflow.mappings.get(&output_id) else {
tracing::warn!("No subscribers found for {:?} in {:?}", output_id, dataflow.mappings);
tracing::warn!(
"No subscribers found for {:?} in {:?}",
output_id,
dataflow.mappings
);
return Ok(RunStatus::Continue);
};

Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn spawn_node(
clock.clone(),
)
.await?;
let send_stdout_to = node.send_stdout_as().map(ToOwned::to_owned);
let send_stdout_to = node.send_stdout_as();

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
Expand Down
3 changes: 2 additions & 1 deletion examples/python-operator-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ nodes:
- id: object_detection
operator:
python: object_detection.py
send_stdout_as: stdout
inputs:
image: webcam/image
outputs:
- bbox
- logs
- stdout

- id: plot
operator:
Expand Down
27 changes: 24 additions & 3 deletions libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
fmt,
path::{Path, PathBuf},
};
use tracing::warn;
pub use visualize::collect_dora_timers;

mod validate;
Expand Down Expand Up @@ -165,10 +166,28 @@ pub struct ResolvedNode {
}

impl ResolvedNode {
pub fn send_stdout_as(&self) -> Option<&str> {
pub fn send_stdout_as(&self) -> Option<String> {
match &self.kind {
CoreNodeKind::Runtime(_) => None, // todo: add support for operator-level stdout capture
CoreNodeKind::Custom(n) => n.send_stdout_as.as_deref(),
// TODO: Split stdout between operators
CoreNodeKind::Runtime(n) => {
let count = n
.operators
.iter()
.filter(|op| op.config.send_stdout_as.is_some())
.count();
if count == 1 && n.operators.len() > 1 {
warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.")
} else if count > 1 {
warn!("More than one `send_stdout_as` operators for a runtime node. Selecting the first stdout operator.")
}
n.operators.iter().find_map(|op| {
op.config
.send_stdout_as
.clone()
.map(|stdout| format!("{}/{}", op.id, stdout))
})
}
CoreNodeKind::Custom(n) => n.send_stdout_as.clone(),
}
}
}
Expand Down Expand Up @@ -233,6 +252,8 @@ pub struct OperatorConfig {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down

0 comments on commit 9b3b7af

Please sign in to comment.