Skip to content

Commit

Permalink
Merge pull request #365 from dora-rs/dora-record
Browse files Browse the repository at this point in the history
Adding Dora record
  • Loading branch information
haixuanTao authored Oct 31, 2023
2 parents 0ef994e + e8f658f commit 95fcf46
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ jobs:
cargo publish -p dora-runtime --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-daemon --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
# Publish extension crates
cargo publish -p dora-record --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
windows-release:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
# These are backup files generated by rustfmt
**/*.rs.bk

# Remove arrow file from dora-record
**/*.arrow

# Removing images.
*.jpg
*.png
Expand Down
17 changes: 10 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ members = [
"libraries/shared-memory-server",
"libraries/extensions/download",
"libraries/extensions/telemetry/*",
"libraries/extensions/zenoh-logger",
"libraries/extensions/dora-record",
"libraries/extensions/ros2-bridge",
"libraries/extensions/ros2-bridge/msg-gen",
"libraries/extensions/ros2-bridge/msg-gen-macro",
Expand Down
7 changes: 7 additions & 0 deletions examples/python-operator-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,10 @@ nodes:
inputs:
image: webcam/image
bbox: object_detection/bbox
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
image: webcam/image
bbox: object_detection/bbox
7 changes: 7 additions & 0 deletions examples/rust-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ nodes:
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: runtime-node/rust-operator/status
- id: dora-record
custom:
build: cargo build -p dora-record
source: ../../target/debug/dora-record
inputs:
message: runtime-node/rust-operator/status
random: rust-node/random
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "zenoh-logger"
name = "dora-record"
version.workspace = true
edition = "2021"
documentation.workspace = true
Expand All @@ -9,4 +9,7 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
zenoh = "0.7.0-rc"
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
chrono = "0.4.31"
dora-tracing = { workspace = true }
137 changes: 137 additions & 0 deletions libraries/extensions/dora-record/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use chrono::{DateTime, Utc};
use dora_node_api::{
self,
arrow::{
array::{
make_array, Array, ListArray, StringArray, TimestampMillisecondArray, UInt64Array,
},
buffer::{OffsetBuffer, ScalarBuffer},
datatypes::{DataType, Field, Schema},
ipc::writer::FileWriter,
record_batch::RecordBatch,
},
DoraNode, Event, Metadata,
};
use dora_tracing::telemetry::deserialize_to_hashmap;
use eyre::{Context, ContextCompat};
use std::{collections::HashMap, fs::File, sync::Arc};

fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;

let mut writers = HashMap::new();
while let Some(event) = events.recv() {
match event {
Event::Input { id, data, metadata } => {
match writers.get_mut(&id) {
None => {
let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false);
let field_utc_epoch = Field::new(
"timestamp_utc",
DataType::Timestamp(
dora_node_api::arrow::datatypes::TimeUnit::Millisecond,
None,
),
false,
);
let field_trace_id = Field::new("trace_id", DataType::Utf8, true);
let field_span_id = Field::new("span_id", DataType::Utf8, true);
let field_values =
Arc::new(Field::new("item", data.data_type().clone(), true));
let field_data = Field::new(id.clone(), DataType::List(field_values), true);

let schema = Arc::new(Schema::new(vec![
field_trace_id,
field_span_id,
field_uhlc,
field_utc_epoch,
field_data,
]));
let file = std::fs::File::create(format!("{id}.arrow")).unwrap();

let writer = FileWriter::try_new(file, &schema).unwrap();
let mut writer = writer;
write_event(&mut writer, data.into(), &metadata)
.context("could not write first record data")?;
writers.insert(id.clone(), writer);
}
Some(writer) => {
write_event(writer, data.into(), &metadata)
.context("could not write record data")?;
}
};
}
Event::InputClosed { id } => match writers.remove(&id) {
None => {}
Some(mut writer) => writer.finish().context("Could not finish arrow file")?,
},
_ => {}
}
}

let result: eyre::Result<Vec<_>> = writers
.iter_mut()
.map(|(_, writer)| -> eyre::Result<()> {
writer
.finish()
.context("Could not finish writing arrow file")?;
Ok(())
})
.collect();
result.context("At least one of the input recorder file writer failed to finish")?;

Ok(())
}

/// Write a row of data into the writer
fn write_event(
writer: &mut FileWriter<File>,
data: Arc<dyn Array>,
metadata: &Metadata,
) -> eyre::Result<()> {
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32]));
let field = Arc::new(Field::new("item", data.data_type().clone(), true));
let list = ListArray::new(field, offsets, data.clone(), None);

let timestamp = metadata.timestamp();
let timestamp_uhlc = UInt64Array::from(vec![timestamp.get_time().0]);
let timestamp_uhlc = make_array(timestamp_uhlc.into());
let system_time = timestamp.get_time().to_system_time();

let dt: DateTime<Utc> = system_time.into();
let timestamp_utc = TimestampMillisecondArray::from(vec![dt.timestamp_millis()]);
let timestamp_utc = make_array(timestamp_utc.into());

let string_otel_context = metadata.parameters.open_telemetry_context.to_string();
let otel_context = deserialize_to_hashmap(&string_otel_context);
let traceparent = otel_context.get("traceparent");
let trace_id = match traceparent {
None => "",
Some(trace) => trace.split('-').nth(1).context("Trace is malformatted")?,
};
let span_id = match traceparent {
None => "",
Some(trace) => trace.split('-').nth(2).context("Trace is malformatted")?,
};
let trace_id_array = StringArray::from(vec![trace_id]);
let trace_id_array = make_array(trace_id_array.into());
let span_id_array = StringArray::from(vec![span_id]);
let span_id_array = make_array(span_id_array.into());

let record = RecordBatch::try_new(
writer.schema().clone(),
vec![
trace_id_array,
span_id_array,
timestamp_uhlc,
timestamp_utc,
make_array(list.into()),
],
)
.context("Could not create record batch with the given data")?;
writer
.write(&record)
.context("Could not write recordbatch to file")?;

Ok(())
}
11 changes: 8 additions & 3 deletions libraries/extensions/telemetry/tracing/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,17 @@ pub fn serialize_context(context: &Context) -> String {
}

pub fn deserialize_context(string_context: &str) -> Context {
let mut map = MetadataMap(HashMap::new());
let map = MetadataMap(deserialize_to_hashmap(string_context));
global::get_text_map_propagator(|prop| prop.extract(&map))
}

pub fn deserialize_to_hashmap(string_context: &str) -> HashMap<&str, &str> {
let mut map = HashMap::new();
for s in string_context.split(';') {
let mut values = s.split(':');
let key = values.next().unwrap();
let value = values.next().unwrap_or("");
map.0.insert(key, value);
map.insert(key, value);
}
global::get_text_map_propagator(|prop| prop.extract(&map))
map
}
15 changes: 0 additions & 15 deletions libraries/extensions/zenoh-logger/src/main.rs

This file was deleted.

0 comments on commit 95fcf46

Please sign in to comment.