diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8268fe528..2e45aab79 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -62,6 +62,9 @@ jobs: cargo publish -p dora-runtime --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p dora-daemon --token ${{ secrets.CARGO_REGISTRY_TOKEN }} + # Publish extension crates + cargo publish -p dora-record --token ${{ secrets.CARGO_REGISTRY_TOKEN }} + windows-release: diff --git a/.gitignore b/.gitignore index c9121e8d8..557fa2e1d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,9 @@ # These are backup files generated by rustfmt **/*.rs.bk +# Remove arrow file from dora-record +**/*.arrow + # Removing images. *.jpg *.png diff --git a/Cargo.lock b/Cargo.lock index c2ff128af..553bb0888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1619,6 +1619,16 @@ dependencies = [ "safer-ffi", ] +[[package]] +name = "dora-record" +version = "0.2.6" +dependencies = [ + "chrono", + "dora-node-api", + "dora-tracing", + "eyre", +] + [[package]] name = "dora-ros2-bridge" version = "0.1.0" @@ -6569,13 +6579,6 @@ dependencies = [ "zenoh-sync", ] -[[package]] -name = "zenoh-logger" -version = "0.2.6" -dependencies = [ - "zenoh", -] - [[package]] name = "zenoh-macros" version = "0.7.0-rc" diff --git a/Cargo.toml b/Cargo.toml index cd8a6fa59..283c412f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ members = [ "libraries/shared-memory-server", "libraries/extensions/download", "libraries/extensions/telemetry/*", - "libraries/extensions/zenoh-logger", + "libraries/extensions/dora-record", "libraries/extensions/ros2-bridge", "libraries/extensions/ros2-bridge/msg-gen", "libraries/extensions/ros2-bridge/msg-gen-macro", diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 92bf5f2b3..72630d27b 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -21,3 +21,10 
@@ nodes: inputs: image: webcam/image bbox: object_detection/bbox + - id: dora-record + custom: + build: cargo build -p dora-record + source: ../../target/debug/dora-record + inputs: + image: webcam/image + bbox: object_detection/bbox diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index 84dd44040..92d607349 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -23,3 +23,10 @@ nodes: source: ../../target/debug/rust-dataflow-example-sink inputs: message: runtime-node/rust-operator/status + - id: dora-record + custom: + build: cargo build -p dora-record + source: ../../target/debug/dora-record + inputs: + message: runtime-node/rust-operator/status + random: rust-node/random \ No newline at end of file diff --git a/libraries/extensions/zenoh-logger/Cargo.toml b/libraries/extensions/dora-record/Cargo.toml similarity index 62% rename from libraries/extensions/zenoh-logger/Cargo.toml rename to libraries/extensions/dora-record/Cargo.toml index 15666b605..1d8dbe730 100644 --- a/libraries/extensions/zenoh-logger/Cargo.toml +++ b/libraries/extensions/dora-record/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "zenoh-logger" +name = "dora-record" version.workspace = true edition = "2021" documentation.workspace = true @@ -9,4 +9,7 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -zenoh = "0.7.0-rc" +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +chrono = "0.4.31" +dora-tracing = { workspace = true } diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs new file mode 100644 index 000000000..2924fc4d0 --- /dev/null +++ b/libraries/extensions/dora-record/src/main.rs @@ -0,0 +1,137 @@ +use chrono::{DateTime, Utc}; +use dora_node_api::{ + self, + arrow::{ + array::{ + make_array, Array, ListArray, StringArray, TimestampMillisecondArray, 
UInt64Array, + }, + buffer::{OffsetBuffer, ScalarBuffer}, + datatypes::{DataType, Field, Schema}, + ipc::writer::FileWriter, + record_batch::RecordBatch, + }, + DoraNode, Event, Metadata, +}; +use dora_tracing::telemetry::deserialize_to_hashmap; +use eyre::{Context, ContextCompat}; +use std::{collections::HashMap, fs::File, sync::Arc}; + +fn main() -> eyre::Result<()> { + let (_node, mut events) = DoraNode::init_from_env()?; + + let mut writers = HashMap::new(); + while let Some(event) = events.recv() { + match event { + Event::Input { id, data, metadata } => { + match writers.get_mut(&id) { + None => { + let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); + let field_utc_epoch = Field::new( + "timestamp_utc", + DataType::Timestamp( + dora_node_api::arrow::datatypes::TimeUnit::Millisecond, + None, + ), + false, + ); + let field_trace_id = Field::new("trace_id", DataType::Utf8, true); + let field_span_id = Field::new("span_id", DataType::Utf8, true); + let field_values = + Arc::new(Field::new("item", data.data_type().clone(), true)); + let field_data = Field::new(id.clone(), DataType::List(field_values), true); + + let schema = Arc::new(Schema::new(vec![ + field_trace_id, + field_span_id, + field_uhlc, + field_utc_epoch, + field_data, + ])); + let file = std::fs::File::create(format!("{id}.arrow")).unwrap(); + + let writer = FileWriter::try_new(file, &schema).unwrap(); + let mut writer = writer; + write_event(&mut writer, data.into(), &metadata) + .context("could not write first record data")?; + writers.insert(id.clone(), writer); + } + Some(writer) => { + write_event(writer, data.into(), &metadata) + .context("could not write record data")?; + } + }; + } + Event::InputClosed { id } => match writers.remove(&id) { + None => {} + Some(mut writer) => writer.finish().context("Could not finish arrow file")?, + }, + _ => {} + } + } + + let result: eyre::Result<Vec<()>> = writers + .iter_mut() + .map(|(_, writer)| -> eyre::Result<()> { + writer + .finish() + 
.context("Could not finish writing arrow file")?; + Ok(()) + }) + .collect(); + result.context("At least one of the input recorder file writer failed to finish")?; + + Ok(()) +} + +/// Write a row of data into the writer +fn write_event( + writer: &mut FileWriter<File>, + data: Arc<dyn Array>, + metadata: &Metadata, +) -> eyre::Result<()> { + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32])); + let field = Arc::new(Field::new("item", data.data_type().clone(), true)); + let list = ListArray::new(field, offsets, data.clone(), None); + + let timestamp = metadata.timestamp(); + let timestamp_uhlc = UInt64Array::from(vec![timestamp.get_time().0]); + let timestamp_uhlc = make_array(timestamp_uhlc.into()); + let system_time = timestamp.get_time().to_system_time(); + + let dt: DateTime<Utc> = system_time.into(); + let timestamp_utc = TimestampMillisecondArray::from(vec![dt.timestamp_millis()]); + let timestamp_utc = make_array(timestamp_utc.into()); + + let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); + let otel_context = deserialize_to_hashmap(&string_otel_context); + let traceparent = otel_context.get("traceparent"); + let trace_id = match traceparent { + None => "", + Some(trace) => trace.split('-').nth(1).context("Trace is malformatted")?, + }; + let span_id = match traceparent { + None => "", + Some(trace) => trace.split('-').nth(2).context("Trace is malformatted")?, + }; + let trace_id_array = StringArray::from(vec![trace_id]); + let trace_id_array = make_array(trace_id_array.into()); + let span_id_array = StringArray::from(vec![span_id]); + let span_id_array = make_array(span_id_array.into()); + + let record = RecordBatch::try_new( + writer.schema().clone(), + vec![ + trace_id_array, + span_id_array, + timestamp_uhlc, + timestamp_utc, + make_array(list.into()), + ], + ) + .context("Could not create record batch with the given data")?; + writer + .write(&record) + .context("Could not write recordbatch to file")?; + + Ok(()) +} 
diff --git a/libraries/extensions/telemetry/tracing/src/telemetry.rs b/libraries/extensions/telemetry/tracing/src/telemetry.rs index c24eb957a..526fe970b 100644 --- a/libraries/extensions/telemetry/tracing/src/telemetry.rs +++ b/libraries/extensions/telemetry/tracing/src/telemetry.rs @@ -54,12 +54,17 @@ pub fn serialize_context(context: &Context) -> String { } pub fn deserialize_context(string_context: &str) -> Context { - let mut map = MetadataMap(HashMap::new()); + let map = MetadataMap(deserialize_to_hashmap(string_context)); + global::get_text_map_propagator(|prop| prop.extract(&map)) +} + +pub fn deserialize_to_hashmap(string_context: &str) -> HashMap<&str, &str> { + let mut map = HashMap::new(); for s in string_context.split(';') { let mut values = s.split(':'); let key = values.next().unwrap(); let value = values.next().unwrap_or(""); - map.0.insert(key, value); + map.insert(key, value); } - global::get_text_map_propagator(|prop| prop.extract(&map)) + map } diff --git a/libraries/extensions/zenoh-logger/src/main.rs b/libraries/extensions/zenoh-logger/src/main.rs deleted file mode 100644 index 782cdfed0..000000000 --- a/libraries/extensions/zenoh-logger/src/main.rs +++ /dev/null @@ -1,15 +0,0 @@ -use zenoh::prelude::{sync::SyncResolve, Config}; - -fn main() { - let zenoh = zenoh::open(Config::default()).res_sync().unwrap(); - let sub = zenoh - .declare_subscriber("/**") - .reliable() - .res_sync() - .unwrap(); - - loop { - let msg = sub.recv().unwrap(); - println!("{}", msg.key_expr); - } -}