Node hub to store and reuse commonly used nodes #569

Merged · 18 commits · Jul 26, 2024
25 changes: 21 additions & 4 deletions .github/workflows/ci.yml
@@ -264,6 +264,22 @@ jobs:
- uses: actions/checkout@v3
- uses: r7kamura/[email protected]
- run: cargo --version --verbose
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
if: runner.os == 'Linux'
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false

# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: false
- uses: Swatinem/rust-cache@v2
with:
cache-provider: buildjet
@@ -318,14 +334,15 @@ jobs:
cd test_python_project
dora up
dora list
dora build dataflow.yml
dora start dataflow.yml --name ci-python-test --detach
sleep 10
dora stop --name ci-python-test --grace-duration 5s
pip install "numpy<2.0.0" opencv-python
dora build ../examples/python-dataflow/dataflow_dynamic.yml
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach
python ../examples/python-dataflow/plot_dynamic.py
opencv-plot --name plot
sleep 5
dora stop --name ci-python-test --grace-duration 5s
dora stop --name ci-python-dynamic --grace-duration 5s
dora destroy

- name: "Test CLI (C)"
@@ -346,7 +363,7 @@
sleep 10
dora stop --name ci-c-test --grace-duration 5s
dora destroy

- name: "Test CLI (C++)"
timeout-minutes: 30
# fail-fast by using bash shell explicitly
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -30,8 +30,8 @@ members = [
"libraries/shared-memory-server",
"libraries/extensions/download",
"libraries/extensions/telemetry/*",
"tool_nodes/dora-record",
"tool_nodes/dora-rerun",
"node-hub/dora-record",
"node-hub/dora-rerun",
"libraries/extensions/ros2-bridge",
"libraries/extensions/ros2-bridge/msg-gen",
"libraries/extensions/ros2-bridge/python",
86 changes: 45 additions & 41 deletions apis/python/operator/src/lib.rs
@@ -1,20 +1,19 @@
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex},
};

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{
merged::{MergeExternalSend, MergedEvent},
DoraNode, Event, EventStream, Metadata, MetadataParameters,
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter,
};
use eyre::{Context, Result};
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge as _;
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
types::{IntoPyDict, PyBool, PyDict, PyInt, PyString},
};

/// Dora Event
@@ -94,7 +93,7 @@
if let Some(value) = self.value(py)? {
pydict.insert("value", value);
}
if let Some(metadata) = Self::metadata(event, py) {
if let Some(metadata) = Self::metadata(event, py)? {
pydict.insert("metadata", metadata);
}
if let Some(error) = Self::error(event) {
@@ -143,10 +142,14 @@
}
}

fn metadata(event: &Event, py: Python<'_>) -> Option<PyObject> {
fn metadata(event: &Event, py: Python<'_>) -> Result<Option<PyObject>> {
match event {
Event::Input { metadata, .. } => Some(metadata_to_pydict(metadata, py).to_object(py)),
_ => None,
Event::Input { metadata, .. } => Ok(Some(
metadata_to_pydict(metadata, py)
.context("Issue deserializing metadata")?
.to_object(py),
)),
_ => Ok(None),
}
}

@@ -159,44 +162,45 @@
}

pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataParameters> {
let mut default_metadata = MetadataParameters::default();
if let Some(metadata) = dict {
for (key, value) in metadata.iter() {
match key
.extract::<PyBackedStr>()
.context("Parsing metadata keys")?
.as_ref()
{
"watermark" => {
default_metadata.watermark =
value.extract().context("parsing watermark failed")?;
}
"deadline" => {
default_metadata.deadline =
value.extract().context("parsing deadline failed")?;
}
"open_telemetry_context" => {
let otel_context: PyBackedStr = value
.extract()
.context("parsing open telemetry context failed")?;
default_metadata.open_telemetry_context = otel_context.to_string();
}
_ => (),
}
let mut parameters = BTreeMap::default();
if let Some(pymetadata) = dict {
for (key, value) in pymetadata.iter() {
let key = key.extract::<String>().context("Parsing metadata keys")?;
if value.is_exact_instance_of::<PyBool>() {
parameters.insert(key, Parameter::Bool(value.extract()?))
} else if value.is_instance_of::<PyInt>() {
parameters.insert(key, Parameter::Integer(value.extract::<i64>()?))
} else if value.is_instance_of::<PyString>() {
parameters.insert(key, Parameter::String(value.extract()?))
} else {
println!("could not convert type {value}");
parameters.insert(key, Parameter::String(value.str()?.to_string()))
};
}
}
Ok(default_metadata)
Ok(parameters)
}

pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> pyo3::Bound<'a, PyDict> {
pub fn metadata_to_pydict<'a>(
metadata: &'a Metadata,
py: Python<'a>,
) -> Result<pyo3::Bound<'a, PyDict>> {
let dict = PyDict::new_bound(py);
dict.set_item(
"open_telemetry_context",
&metadata.parameters.open_telemetry_context,
)
.wrap_err("could not make metadata a python dictionary item")
.unwrap();
dict
for (k, v) in metadata.parameters.iter() {
match v {
Parameter::Bool(bool) => dict
.set_item(k, bool)
.context(format!("Could not insert metadata into python dictionary"))?,

Parameter::Integer(int) => dict
.set_item(k, int)
.context(format!("Could not insert metadata into python dictionary"))?,

Parameter::String(s) => dict
.set_item(k, s)
.context(format!("Could not insert metadata into python dictionary"))?,

}
}

Ok(dict)
}

#[cfg(test)]
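The hunk above is the core of the metadata change: the fixed MetadataParameters struct gives way to a typed key/value map, and each Python dict entry is converted according to its type. Below is a minimal, self-contained sketch of that round trip, using a stand-in Parameter enum with only the Bool/Integer/String variants visible in this diff (the real type lives in dora_core::message; the example keys are made up):

    use std::collections::BTreeMap;

    // Stand-in for dora_core::message::Parameter as used in this PR
    // (assumption: only the three variants that appear in the diff).
    #[derive(Debug, Clone, PartialEq)]
    enum Parameter {
        Bool(bool),
        Integer(i64),
        String(String),
    }

    fn main() {
        // Build metadata the way the new pydict_to_metadata does: one typed entry per key.
        let mut parameters: BTreeMap<String, Parameter> = BTreeMap::new();
        parameters.insert("is_keyframe".to_string(), Parameter::Bool(true));
        parameters.insert("frame_id".to_string(), Parameter::Integer(42));
        parameters.insert(
            "open_telemetry_context".to_string(),
            Parameter::String(String::new()),
        );

        // Read a value back with a type check, mirroring how the runtime looks up
        // "open_telemetry_context" later in this PR.
        let otel = match parameters.get("open_telemetry_context") {
            Some(Parameter::String(s)) => s.clone(),
            _ => String::new(),
        };
        assert!(otel.is_empty());
        println!("{parameters:?}");
    }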
2 changes: 1 addition & 1 deletion apis/rust/node/src/lib.rs
@@ -16,7 +16,7 @@
pub use arrow;
pub use dora_arrow_convert::*;
pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters};
pub use dora_core::message::{uhlc, Metadata, MetadataParameters, Parameter};
pub use event_stream::{merged, Event, EventStream, MappedInputData, RawData};
pub use flume::Receiver;
pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD};
6 changes: 1 addition & 5 deletions apis/rust/node/src/node/mod.rs
@@ -65,7 +65,7 @@
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
#[cfg(feature = "tracing")]
set_up_tracing(&node_config.node_id.to_string())
    .context("failed to set up tracing subscriber")?;
Self::init(node_config)
}
@@ -250,11 +250,7 @@
if !self.node_config.outputs.contains(&output_id) {
eyre::bail!("unknown output");
}
let metadata = Metadata::from_parameters(
self.clock.new_timestamp(),
type_info,
parameters.into_owned(),
);
let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);

let (data, shmem) = match sample {
Some(sample) => sample.finalize(),
21 changes: 12 additions & 9 deletions binaries/daemon/src/lib.rs
@@ -8,7 +8,7 @@ use dora_core::daemon_messages::{
};
use dora_core::descriptor::runtime_node_inputs;
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::message::{ArrowTypeInfo, Metadata};
use dora_core::topics::LOCALHOST;
use dora_core::topics::{
DataflowDaemonResult, DataflowResult, NodeError, NodeErrorCause, NodeExitStatus,
@@ -23,6 +23,7 @@ use dora_core::{
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
};

use dora_node_api::Parameter;
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
@@ -1543,17 +1544,19 @@ impl RunningDataflow {
let span = tracing::span!(tracing::Level::TRACE, "tick");
let _ = span.enter();

let mut parameters = BTreeMap::new();
parameters.insert(
"open_telemetry_context".to_string(),
#[cfg(feature = "telemetry")]
Parameter::String(serialize_context(&span.context())),
#[cfg(not(feature = "telemetry"))]
Parameter::String("".into()),
);

let metadata = dora_core::message::Metadata::from_parameters(
hlc.new_timestamp(),
ArrowTypeInfo::empty(),
MetadataParameters {
watermark: 0,
deadline: 0,
#[cfg(feature = "telemetry")]
open_telemetry_context: serialize_context(&span.context()),
#[cfg(not(feature = "telemetry"))]
open_telemetry_context: "".into(),
},
parameters,
);

let event = Timestamped {
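The daemon now fills the same map, and the #[cfg] attributes sit directly on the second argument of insert, so only one of the two Parameter::String expressions is compiled depending on the telemetry feature. Here is a compilable sketch of that pattern in isolation, with a hypothetical telemetry feature flag and a stub serializer standing in for serialize_context:

    use std::collections::BTreeMap;

    #[derive(Debug)]
    enum Parameter {
        String(String),
    }

    // Stub standing in for the tracing serializer; only compiled when the
    // (hypothetical) feature is enabled, just like in the daemon.
    #[cfg(feature = "telemetry")]
    fn serialize_context() -> String {
        "stub serialized span context".to_string()
    }

    fn main() {
        let mut parameters = BTreeMap::new();
        parameters.insert(
            "open_telemetry_context".to_string(),
            // cfg attributes are allowed on call arguments, so exactly one of
            // these two expressions survives compilation.
            #[cfg(feature = "telemetry")]
            Parameter::String(serialize_context()),
            #[cfg(not(feature = "telemetry"))]
            Parameter::String(String::new()),
        );
        println!("{parameters:?}");
    }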
23 changes: 16 additions & 7 deletions binaries/runtime/src/operator/python.rs
@@ -6,7 +6,7 @@ use dora_core::{
descriptor::{source_is_url, Descriptor, PythonSource},
};
use dora_download::download_file;
use dora_node_api::{merged::MergedEvent, Event};
use dora_node_api::{merged::MergedEvent, Event, Parameter};
use dora_operator_api_python::PyEvent;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context, Result};
@@ -201,11 +201,15 @@ pub fn run(
use tracing_opentelemetry::OpenTelemetrySpanExt;
span.record("input_id", input_id.as_str());

let cx = deserialize_context(&metadata.parameters.open_telemetry_context);
let otel = metadata.open_telemetry_context();
let cx = deserialize_context(&otel);
span.set_parent(cx);
let cx = span.context();
let string_cx = serialize_context(&cx);
metadata.parameters.open_telemetry_context = string_cx;
metadata.parameters.insert(
"open_telemetry_context".to_string(),
Parameter::String(string_cx),
);
}

let py_event = PyEvent {
@@ -317,17 +321,22 @@ mod callback_impl {
metadata: Option<Bound<'_, PyDict>>,
py: Python,
) -> Result<()> {
let parameters = pydict_to_metadata(metadata)
.wrap_err("failed to parse metadata")?
.into_owned();
let parameters = pydict_to_metadata(metadata).wrap_err("failed to parse metadata")?;
let span = span!(
tracing::Level::TRACE,
"send_output",
output_id = field::Empty
);
span.record("output_id", output);
let otel = if let Some(dora_node_api::Parameter::String(otel)) =
parameters.get("open_telemetry_context")
{
otel.to_string()
} else {
"".to_string()
};

let cx = deserialize_context(&parameters.open_telemetry_context);
let cx = deserialize_context(&otel);
span.set_parent(cx);
let _ = span.enter();

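Both the input and send_output paths above now go through the parameter map rather than a dedicated struct field, and metadata.open_telemetry_context() is a convenience accessor added alongside it. A plausible shape for that accessor, sketched against a local stand-in for the Parameter enum (the real definition lives in dora_core::message and may differ):

    use std::collections::BTreeMap;

    #[derive(Debug)]
    enum Parameter {
        Bool(bool),
        Integer(i64),
        String(String),
    }

    struct Metadata {
        parameters: BTreeMap<String, Parameter>,
    }

    impl Metadata {
        // Return the stored OpenTelemetry context, or an empty string when the
        // key is missing or holds a non-string parameter.
        fn open_telemetry_context(&self) -> String {
            match self.parameters.get("open_telemetry_context") {
                Some(Parameter::String(s)) => s.clone(),
                _ => String::new(),
            }
        }
    }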
27 changes: 15 additions & 12 deletions binaries/runtime/src/operator/shared_lib.rs
@@ -8,7 +8,7 @@ use dora_core::{
use dora_download::download_file;
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
Event, MetadataParameters,
Event, Parameter,
};
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent,
@@ -17,6 +17,7 @@ use dora_operator_api_types::{
use eyre::{bail, eyre, Context, Result};
use libloading::Symbol;
use std::{
collections::BTreeMap,
ffi::c_void,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
@@ -119,10 +120,11 @@ impl<'lib> SharedLibraryOperator<'lib> {
open_telemetry_context,
},
} = output;
let parameters = MetadataParameters {
open_telemetry_context: open_telemetry_context.into(),
..Default::default()
};
let mut parameters = BTreeMap::new();
parameters.insert(
"open_telemetry_context".to_string(),
Parameter::String(open_telemetry_context.to_string()),
);

let arrow_array = match unsafe { arrow::ffi::from_ffi(data_array, &schema) } {
Ok(a) => a,
@@ -173,11 +175,15 @@ impl<'lib> SharedLibraryOperator<'lib> {
use tracing_opentelemetry::OpenTelemetrySpanExt;
span.record("input_id", input_id.as_str());

let cx = deserialize_context(&metadata.parameters.open_telemetry_context);
let otel = metadata.open_telemetry_context();
let cx = deserialize_context(&otel);
span.set_parent(cx);
let cx = span.context();
let string_cx = serialize_context(&cx);
metadata.parameters.open_telemetry_context = string_cx;
metadata.parameters.insert(
"open_telemetry_context".to_string(),
Parameter::String(string_cx),
);
}

let mut operator_event = match event {
@@ -193,16 +199,13 @@
data,
} => {
let (data_array, schema) = arrow::ffi::to_ffi(&data.to_data())?;

let otel = metadata.open_telemetry_context();
let operator_input = dora_operator_api_types::Input {
id: String::from(input_id).into(),
data_array: Some(data_array),
schema,
metadata: Metadata {
open_telemetry_context: metadata
.parameters
.open_telemetry_context
.into(),
open_telemetry_context: otel.into(),
},
};
dora_operator_api_types::RawEvent {
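On the C/C++ operator side, the FFI Metadata struct still carries only a single open_telemetry_context string, so this hunk converts between that flat field and the richer parameter map in both directions. A small sketch of the outgoing half, under the same stand-in Parameter assumption (the helper name is hypothetical, not part of the PR):

    use std::collections::BTreeMap;

    #[derive(Debug)]
    enum Parameter {
        String(String),
    }

    // Wrap the single string the C ABI knows about into the parameter map the
    // Rust side now expects.
    fn ffi_metadata_to_parameters(open_telemetry_context: &str) -> BTreeMap<String, Parameter> {
        let mut parameters = BTreeMap::new();
        parameters.insert(
            "open_telemetry_context".to_string(),
            Parameter::String(open_telemetry_context.to_string()),
        );
        parameters
    }

    fn main() {
        let parameters = ffi_metadata_to_parameters("");
        println!("{parameters:?}");
    }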