Skip to content

Commit

Permalink
Add new send_stdout_as key for capturing stdout of custom nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
phil-opp authored and haixuanTao committed Feb 12, 2024
1 parent 39fbb6b commit e85ce44
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
13 changes: 7 additions & 6 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ pub async fn spawn_node(
clock.clone(),
)
.await?;
let outputs = node_outputs(&node);
let log_output = outputs.contains(&DataId::from("op/logs".to_string()));
let send_stdout_to = node.send_stdout_as().map(ToOwned::to_owned);

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
Expand Down Expand Up @@ -333,19 +332,21 @@ pub async fn spawn_node(
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
// If log is an output, we're sending the logs to the dataflow
if log_output {
if let Some(stdout_output_name) = &send_stdout_to {
// Convert logs to DataMessage
let array = message.into_arrow();

let array: ArrayData = array.into();
let total_len = required_data_size(&array);
let mut sample: AVec<u8, ConstAlign<128>> =
AVec::__from_elem(128, 0, total_len as usize);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

let type_info = copy_array_into_sample(&mut sample, &array);

let metadata = Metadata::new(uhlc.new_timestamp(), type_info);
let output_id = OutputId(node_id.clone(), DataId::from("op/logs".to_string()));
let output_id = OutputId(
node_id.clone(),
DataId::from(stdout_output_name.to_string()),
);
let event = DoraEvent::Logs {
dataflow_id,
output_id,
Expand Down
11 changes: 11 additions & 0 deletions libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ pub struct ResolvedNode {
pub kind: CoreNodeKind,
}

impl ResolvedNode {
pub fn send_stdout_as(&self) -> Option<&str> {
match &self.kind {
CoreNodeKind::Runtime(_) => None, // todo: add support for operator-level stdout capture
CoreNodeKind::Custom(n) => n.send_stdout_as.as_deref(),
}
}
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResolvedDeploy {
pub machine: String,
Expand Down Expand Up @@ -275,6 +284,8 @@ pub struct CustomNode {
pub envs: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,

#[serde(flatten)]
pub run_config: NodeRunConfig,
Expand Down

0 comments on commit e85ce44

Please sign in to comment.