Merge pull request dora-rs#388 from dora-rs/output-logs
Add option to send `stdout` as node/operator output
haixuanTao authored Feb 29, 2024
2 parents ac6eaec + b32a7e4 commit 2615b04
Showing 13 changed files with 180 additions and 21 deletions.
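With this change, a node or operator that sets `send_stdout_as` in the dataflow descriptor has its captured stdout forwarded by the daemon as a regular dataflow output, which downstream nodes can subscribe to like any other input (the `examples/python-operator-dataflow` changes below wire `object_detection/stdout` into the `plot` operator). As a rough, non-authoritative sketch of how a downstream Rust node might consume such an output, the following assumes the `dora-node-api` event loop (`DoraNode::init_from_env`, `EventStream::recv`, `Event::Input`) and a hypothetical input named `logs` mapped to the upstream node's `stdout` output in the YAML; it is not part of this commit.

```rust
// Sketch only: a downstream node that treats forwarded stdout lines as a
// normal input. Assumes an input named `logs` is mapped to the upstream
// node's `stdout` output in the dataflow descriptor.
use dora_node_api::{DoraNode, Event};

fn main() -> eyre::Result<()> {
    let (_node, mut events) = DoraNode::init_from_env()?;
    let mut count = 0usize;

    while let Some(event) = events.recv() {
        if let Event::Input { id, .. } = event {
            // Each forwarded stdout line arrives as Arrow-encoded string data.
            if id.as_str() == "logs" {
                count += 1;
                println!("received {count} stdout line(s) from upstream");
            }
        }
    }
    Ok(())
}
```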
2 changes: 2 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

2 changes: 1 addition & 1 deletion apis/python/operator/src/lib.rs
@@ -171,7 +171,7 @@ mod tests {
let size = required_data_size(arrow_array);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, size);

let info = copy_array_into_sample(&mut sample, arrow_array)?;
let info = copy_array_into_sample(&mut sample, arrow_array);

let serialized_deserialized_arrow_array = RawData::Vec(sample)
.into_arrow_array(&info)
13 changes: 5 additions & 8 deletions apis/rust/node/src/node/arrow_utils.rs
@@ -20,10 +20,7 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
}
}

pub fn copy_array_into_sample(
target_buffer: &mut [u8],
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
let mut next_offset = 0;
copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
}
@@ -32,7 +29,7 @@ fn copy_array_into_sample_inner(
target_buffer: &mut [u8],
next_offset: &mut usize,
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
) -> ArrowTypeInfo {
let mut buffer_offsets = Vec::new();
let layout = arrow::array::layout(arrow_array.data_type());
for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) {
@@ -58,17 +55,17 @@ fn copy_array_into_sample_inner(

let mut child_data = Vec::new();
for child in arrow_array.child_data() {
let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?;
let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child);
child_data.push(child_type_info);
}

Ok(ArrowTypeInfo {
ArrowTypeInfo {
data_type: arrow_array.data_type().clone(),
len: arrow_array.len(),
null_count: arrow_array.null_count(),
validity: arrow_array.nulls().map(|b| b.validity().to_owned()),
offset: arrow_array.offset(),
buffer_offsets,
child_data,
})
}
}
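`copy_array_into_sample` and its inner helper no longer return `eyre::Result<ArrowTypeInfo>` but `ArrowTypeInfo` directly, so the trailing `?` disappears at every call site touched later in this diff. Below is a minimal sketch of the updated calling pattern, using the same crates and the aligned-buffer allocation style shown in those call sites; treat it as illustrative, not as part of the commit.

```rust
// Sketch of the new, infallible calling convention for copy_array_into_sample.
use aligned_vec::{AVec, ConstAlign};
use dora_node_api::{
    arrow::array::{Array, Int64Array},
    arrow_utils::{copy_array_into_sample, required_data_size},
};

fn main() {
    // Any Arrow array works; a small Int64 array keeps the example short.
    let array = Int64Array::from(vec![1_i64, 2, 3]).to_data();

    // Compute the exact byte size, allocate a 128-byte-aligned sample, copy.
    let total_len = required_data_size(&array);
    let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

    // Previously `copy_array_into_sample(...)?`; the trailing `?` is gone.
    let type_info = copy_array_into_sample(&mut sample, &array);

    println!("copied the array into a {total_len}-byte sample");
    let _ = type_info; // ArrowTypeInfo describing the copied layout
}
```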
2 changes: 1 addition & 1 deletion apis/rust/node/src/node/mod.rs
@@ -153,7 +153,7 @@ impl DoraNode {
let total_len = required_data_size(&arrow_array);

let mut sample = self.allocate_data_sample(total_len)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array);

self.send_output_sample(output_id, type_info, parameters, Some(sample))
.wrap_err("failed to send output")?;
2 changes: 2 additions & 0 deletions binaries/daemon/Cargo.toml
@@ -27,6 +27,8 @@ dora-core = { workspace = true }
flume = "0.10.14"
dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }
dora-arrow-convert = { workspace = true }
dora-node-api = { workspace = true }
serde_yaml = "0.8.23"
uuid = { version = "1.1.2", features = ["v4"] }
futures = "0.3.25"
58 changes: 56 additions & 2 deletions binaries/daemon/src/lib.rs
@@ -4,7 +4,7 @@ use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped};
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, MetadataParameters};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
@@ -14,6 +14,7 @@ use dora_core::{
},
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
};

use eyre::{bail, eyre, Context, ContextCompat};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
@@ -971,6 +972,53 @@ impl Daemon {
dataflow.subscribe_channels.remove(id);
}
}
DoraEvent::Logs {
dataflow_id,
output_id,
message,
metadata,
} => {
let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
return Ok(RunStatus::Continue);
};

let Some(subscribers) = dataflow.mappings.get(&output_id) else {
tracing::warn!(
"No subscribers found for {:?} in {:?}",
output_id,
dataflow.mappings
);
return Ok(RunStatus::Continue);
};

let mut closed = Vec::new();
for (receiver_id, input_id) in subscribers {
let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
tracing::warn!("No subscriber channel found for {:?}", output_id);
continue;
};

let send_result = send_with_timestamp(
channel,
daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: Some(message.clone()),
},
&self.clock,
);
match send_result {
Ok(()) => {}
Err(_) => {
closed.push(receiver_id);
}
}
}
for id in closed {
dataflow.subscribe_channels.remove(id);
}
}
DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id,
@@ -1397,7 +1445,7 @@ impl RunningDataflow {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct OutputId(NodeId, DataId);
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

struct DropTokenInformation {
@@ -1465,6 +1513,12 @@ pub enum DoraEvent {
interval: Duration,
metadata: dora_core::message::Metadata,
},
Logs {
dataflow_id: DataflowId,
output_id: OutputId,
message: DataMessage,
metadata: Metadata,
},
SpawnedNodeResult {
dataflow_id: DataflowId,
node_id: NodeId,
49 changes: 46 additions & 3 deletions binaries/daemon/src/spawn.rs
@@ -1,17 +1,24 @@
use crate::{
log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs,
runtime_node_outputs, DoraEvent, Event, NodeExitStatus,
runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId,
};
use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_core::{
config::NodeRunConfig,
daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped},
config::{DataId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
};
use dora_download::download_file;
use dora_node_api::{
arrow::array::ArrayData,
arrow_utils::{copy_array_into_sample, required_data_size},
Metadata,
};
use eyre::WrapErr;
use std::{
env::{consts::EXE_EXTENSION, temp_dir},
@@ -51,6 +58,9 @@ pub async fn spawn_node(
clock.clone(),
)
.await?;
let send_stdout_to = node
.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
@@ -265,6 +275,8 @@ pub async fn spawn_node(
// Stderr listener stream
let stderr_tx = tx.clone();
let node_id = node.id.clone();
let uhlc = clock.clone();
let daemon_tx_log = daemon_tx.clone();
tokio::spawn(async move {
let mut buffer = String::new();
let mut finished = false;
@@ -317,9 +329,40 @@ pub async fn spawn_node(
let _ = daemon_tx.send(event).await;
});

let node_id = node.id.clone();
// Log to file stream.
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
// If log is an output, we're sending the logs to the dataflow
if let Some(stdout_output_name) = &send_stdout_to {
// Convert logs to DataMessage
let array = message.into_arrow();

let array: ArrayData = array.into();
let total_len = required_data_size(&array);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

let type_info = copy_array_into_sample(&mut sample, &array);

let metadata = Metadata::new(uhlc.new_timestamp(), type_info);
let output_id = OutputId(
node_id.clone(),
DataId::from(stdout_output_name.to_string()),
);
let event = DoraEvent::Logs {
dataflow_id,
output_id,
metadata,
message: DataMessage::Vec(sample),
}
.into();
let event = Timestamped {
inner: event,
timestamp: uhlc.new_timestamp(),
};
let _ = daemon_tx_log.send(event).await;
}

let _ = file
.write_all(message.as_bytes())
.await
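The new branch above is what turns each captured stdout line into the same Arrow-encoded sample format used for regular outputs before it is wrapped in a `DoraEvent::Logs`. Below is a condensed, standalone sketch of just that conversion, using the crates already imported in this file and the same allocate-and-copy helpers shown after the `arrow_utils.rs` change; the daemon-side plumbing (the HLC timestamp, `DataMessage::Vec`, and event routing) is deliberately left out, so treat it as an illustration rather than the exact daemon code.

```rust
// Sketch: encode one captured stdout line the way the daemon does before
// emitting DoraEvent::Logs (String -> Arrow array -> aligned byte sample).
use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_node_api::{
    arrow::array::ArrayData,
    arrow_utils::{copy_array_into_sample, required_data_size},
};

fn main() {
    let line = "detected 3 objects".to_string();

    // String -> Arrow array via dora-arrow-convert's IntoArrow impl.
    let array = line.into_arrow();
    let array: ArrayData = array.into();

    // Arrow array -> 128-byte-aligned sample plus its ArrowTypeInfo.
    let total_len = required_data_size(&array);
    let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);
    let type_info = copy_array_into_sample(&mut sample, &array);

    // In spawn.rs, `sample` becomes `DataMessage::Vec(sample)` and `type_info`
    // goes into `Metadata::new(timestamp, type_info)` for the Logs event.
    println!("stdout line encoded into {total_len} byte(s)");
    let _ = type_info;
}
```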
2 changes: 1 addition & 1 deletion binaries/runtime/src/operator/python.rs
@@ -362,7 +362,7 @@ mod callback_impl {
let total_len = required_data_size(&arrow_array);
let mut sample = allocate_sample(total_len)?;

let type_info = copy_array_into_sample(&mut sample, &arrow_array)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array);

(sample, type_info)
} else {
5 changes: 1 addition & 4 deletions binaries/runtime/src/operator/shared_lib.rs
@@ -132,10 +132,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
let total_len = required_data_size(&arrow_array);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

let type_info = match copy_array_into_sample(&mut sample, &arrow_array) {
Ok(t) => t,
Err(err) => return DoraResult::from_error(err.to_string()),
};
let type_info = copy_array_into_sample(&mut sample, &arrow_array);

let event = OperatorEvent::Output {
output_id: DataId::from(String::from(output_id)),
3 changes: 3 additions & 0 deletions examples/python-operator-dataflow/dataflow.yml
@@ -10,14 +10,17 @@ nodes:
- id: object_detection
operator:
python: object_detection.py
send_stdout_as: stdout
inputs:
image: webcam/image
outputs:
- bbox
- stdout

- id: plot
operator:
python: plot.py
inputs:
image: webcam/image
bbox: object_detection/bbox
object_detection_stdout: object_detection/stdout
23 changes: 23 additions & 0 deletions examples/python-operator-dataflow/plot.py
@@ -29,6 +29,7 @@ def __init__(self):
self.bboxs = []
self.bounding_box_messages = 0
self.image_messages = 0
self.object_detection_stdout = []

def on_event(
self,
@@ -69,12 +70,22 @@ def on_input(
self.image_messages += 1
print("received " + str(self.image_messages) + " images")

elif dora_input["id"] == "object_detection_stdout":
stdout = dora_input["value"][0].as_py()
self.object_detection_stdout += [stdout]
## Only keep last 10 stdout
self.object_detection_stdout = self.object_detection_stdout[-10:]
return DoraStatus.CONTINUE

elif dora_input["id"] == "bbox" and len(self.image) != 0:
bboxs = dora_input["value"].to_numpy()
self.bboxs = np.reshape(bboxs, (-1, 6))

self.bounding_box_messages += 1
print("received " + str(self.bounding_box_messages) + " bounding boxes")
return DoraStatus.CONTINUE
else:
return DoraStatus.CONTINUE

for bbox in self.bboxs:
[
@@ -104,6 +115,18 @@ def on_input(
1,
)

for i, log in enumerate(self.object_detection_stdout):
cv2.putText(
self.image,
log,
(10, 10 + 20 * i),
font,
0.5,
(0, 255, 0),
2,
1,
)

if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):