From 3ae07203cc1f1522b69b58e09490a0ea5acb0ebe Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 19 Oct 2023 10:06:13 +0200 Subject: [PATCH 1/9] replacing `zenoh-logger` with `dora-record` --- .gitignore | 3 + Cargo.lock | 17 ++- Cargo.toml | 2 +- .../python-operator-dataflow/dataflow.yml | 7 + examples/rust-dataflow/dataflow.yml | 8 ++ .../{zenoh-logger => dora-record}/Cargo.toml | 7 +- libraries/extensions/dora-record/src/main.rs | 120 ++++++++++++++++++ .../telemetry/tracing/src/telemetry.rs | 11 +- libraries/extensions/zenoh-logger/src/main.rs | 15 --- 9 files changed, 162 insertions(+), 28 deletions(-) rename libraries/extensions/{zenoh-logger => dora-record}/Cargo.toml (62%) create mode 100644 libraries/extensions/dora-record/src/main.rs delete mode 100644 libraries/extensions/zenoh-logger/src/main.rs diff --git a/.gitignore b/.gitignore index c9121e8d8..557fa2e1d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,9 @@ # These are backup files generated by rustfmt **/*.rs.bk +# Remove arrow file from dora-record +**/*.arrow + # Removing images. *.jpg *.png diff --git a/Cargo.lock b/Cargo.lock index 885ac2d1f..0901276f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1619,6 +1619,16 @@ dependencies = [ "safer-ffi", ] +[[package]] +name = "dora-record" +version = "0.2.6" +dependencies = [ + "chrono", + "dora-node-api", + "dora-tracing", + "eyre", +] + [[package]] name = "dora-ros2-bridge" version = "0.1.0" @@ -6569,13 +6579,6 @@ dependencies = [ "zenoh-sync", ] -[[package]] -name = "zenoh-logger" -version = "0.2.6" -dependencies = [ - "zenoh", -] - [[package]] name = "zenoh-macros" version = "0.7.0-rc" diff --git a/Cargo.toml b/Cargo.toml index cd8a6fa59..283c412f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ members = [ "libraries/shared-memory-server", "libraries/extensions/download", "libraries/extensions/telemetry/*", - "libraries/extensions/zenoh-logger", + "libraries/extensions/dora-record", "libraries/extensions/ros2-bridge", "libraries/extensions/ros2-bridge/msg-gen", "libraries/extensions/ros2-bridge/msg-gen-macro", diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 92bf5f2b3..fb08e5eba 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -21,3 +21,10 @@ nodes: inputs: image: webcam/image bbox: object_detection/bbox + - id: dora-record + custom: + build: cargo build -p dora-record + source: ../../target/release/dora-record + inputs: + image: webcam/image + bbox: object_detection/bbox diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index 84dd44040..f19ed5169 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -23,3 +23,11 @@ nodes: source: ../../target/debug/rust-dataflow-example-sink inputs: message: runtime-node/rust-operator/status + - id: dora-record + custom: + build: cargo build -p dora-record + source: ../../target/debug/dora-record + inputs: + message: runtime-node/rust-operator/status + tick: dora/timer/millis/100 + random: rust-node/random \ No newline at end of file diff --git a/libraries/extensions/zenoh-logger/Cargo.toml b/libraries/extensions/dora-record/Cargo.toml similarity index 62% rename from libraries/extensions/zenoh-logger/Cargo.toml rename to libraries/extensions/dora-record/Cargo.toml index 15666b605..1d8dbe730 100644 --- a/libraries/extensions/zenoh-logger/Cargo.toml +++ b/libraries/extensions/dora-record/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "zenoh-logger" +name = "dora-record" version.workspace = true edition = "2021" documentation.workspace = true @@ -9,4 +9,7 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -zenoh = "0.7.0-rc" +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +chrono = "0.4.31" +dora-tracing = { workspace = true } diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs new file mode 100644 index 000000000..14d6ccba5 --- /dev/null +++ b/libraries/extensions/dora-record/src/main.rs @@ -0,0 +1,120 @@ +use chrono::{DateTime, Utc}; +use dora_node_api::{ + self, + arrow::{ + array::{make_array, Array, Int64Array, ListArray, StringArray}, + buffer::{OffsetBuffer, ScalarBuffer}, + datatypes::{DataType, Field, Schema}, + ipc::writer::FileWriter, + record_batch::RecordBatch, + }, + DoraNode, Event, Metadata, +}; +use dora_tracing::telemetry::deserialize_to_hashmap; +use eyre::{Context, ContextCompat}; +use std::{collections::HashMap, fs::File, sync::Arc}; + +// Remove once arrow-rs 48.0 is published with access to writer schema. +// See: https://github.com/apache/arrow-rs/pull/4940 +struct WriterContext { + writer: FileWriter, + schema: Arc, +} + +fn main() -> eyre::Result<()> { + let (_node, mut events) = DoraNode::init_from_env()?; + + let mut writers = HashMap::new(); + while let Some(event) = events.recv() { + match event { + Event::Input { id, data, metadata } => { + match writers.get_mut(&id) { + None => { + let field_timestamp = Field::new("timestamp", DataType::Int64, true); + let field_otel_context = Field::new("otel_context", DataType::Utf8, true); + let field_values = + Arc::new(Field::new("item", data.data_type().clone(), true)); + let field_data = Field::new(id.clone(), DataType::List(field_values), true); + + let schema = Arc::new(Schema::new(vec![ + field_otel_context, + field_timestamp, + field_data, + ])); + let file = std::fs::File::create(format!("{id}.arrow")).unwrap(); + + let writer = FileWriter::try_new(file, &schema).unwrap(); + let mut writer_context = WriterContext { writer, schema }; + write_event(&mut writer_context, data.into(), &metadata) + .context("could not write first record data")?; + writers.insert(id.clone(), writer_context); + } + Some(writer_context) => { + write_event(writer_context, data.into(), &metadata) + .context("could not write first record data")?; + } + }; + } + Event::InputClosed { id } => match writers.remove(&id) { + None => {} + Some(mut writer) => writer + .writer + .finish() + .context("Could not finish arrow file")?, + }, + _ => {} + } + } + + let result: eyre::Result> = writers + .iter_mut() + .map(|(_, wc)| -> eyre::Result<()> { + wc.writer + .finish() + .context("Could not finish writing arrow file")?; + Ok(()) + }) + .collect(); + result.context("One of the input recorder file writer failed to finish")?; + + Ok(()) +} + +fn write_event( + writer_context: &mut WriterContext, + data: Arc, + metadata: &Metadata, +) -> eyre::Result<()> { + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32])); + let field = Arc::new(Field::new("item", data.data_type().clone(), true)); + let list = ListArray::new(field.clone(), offsets, data.clone(), None); + + let timestamp = metadata.timestamp(); + let timestamp = timestamp.get_time().to_system_time(); + + let dt: DateTime = timestamp.into(); + let timestamp_array = Int64Array::from(vec![dt.timestamp_millis()]); + let timestamp_array = make_array(timestamp_array.into()); + + let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); + let otel_context = deserialize_to_hashmap(&string_otel_context); + let traceparent = otel_context.get("traceparent").clone(); + let traceparent = match traceparent { + None => "", + Some(trace) => trace.split("-").nth(2).context("Trace is malformatted")?, + }; + let otel_context_array = StringArray::from(vec![traceparent]); + let otel_context_array = make_array(otel_context_array.into()); + + let record = RecordBatch::try_new( + writer_context.schema.clone(), + vec![otel_context_array, timestamp_array, make_array(list.into())], + ) + .context("Could not create record batch with the given data")?; + writer_context + .writer + .write(&record) + .context("Could not write recordbatch to file")?; + + Ok(()) +} diff --git a/libraries/extensions/telemetry/tracing/src/telemetry.rs b/libraries/extensions/telemetry/tracing/src/telemetry.rs index c24eb957a..526fe970b 100644 --- a/libraries/extensions/telemetry/tracing/src/telemetry.rs +++ b/libraries/extensions/telemetry/tracing/src/telemetry.rs @@ -54,12 +54,17 @@ pub fn serialize_context(context: &Context) -> String { } pub fn deserialize_context(string_context: &str) -> Context { - let mut map = MetadataMap(HashMap::new()); + let map = MetadataMap(deserialize_to_hashmap(string_context)); + global::get_text_map_propagator(|prop| prop.extract(&map)) +} + +pub fn deserialize_to_hashmap(string_context: &str) -> HashMap<&str, &str> { + let mut map = HashMap::new(); for s in string_context.split(';') { let mut values = s.split(':'); let key = values.next().unwrap(); let value = values.next().unwrap_or(""); - map.0.insert(key, value); + map.insert(key, value); } - global::get_text_map_propagator(|prop| prop.extract(&map)) + map } diff --git a/libraries/extensions/zenoh-logger/src/main.rs b/libraries/extensions/zenoh-logger/src/main.rs deleted file mode 100644 index 782cdfed0..000000000 --- a/libraries/extensions/zenoh-logger/src/main.rs +++ /dev/null @@ -1,15 +0,0 @@ -use zenoh::prelude::{sync::SyncResolve, Config}; - -fn main() { - let zenoh = zenoh::open(Config::default()).res_sync().unwrap(); - let sub = zenoh - .declare_subscriber("/**") - .reliable() - .res_sync() - .unwrap(); - - loop { - let msg = sub.recv().unwrap(); - println!("{}", msg.key_expr); - } -} From 319f1c93b18268030ee5e27622162627abca8fcf Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 19 Oct 2023 12:04:27 +0200 Subject: [PATCH 2/9] Use `trace_id` within the logs --- libraries/extensions/dora-record/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index 14d6ccba5..74cb7a674 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -99,11 +99,11 @@ fn write_event( let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); let otel_context = deserialize_to_hashmap(&string_otel_context); let traceparent = otel_context.get("traceparent").clone(); - let traceparent = match traceparent { + let trace_id = match traceparent { None => "", - Some(trace) => trace.split("-").nth(2).context("Trace is malformatted")?, + Some(trace) => trace.split("-").nth(1).context("Trace is malformatted")?, }; - let otel_context_array = StringArray::from(vec![traceparent]); + let otel_context_array = StringArray::from(vec![trace_id]); let otel_context_array = make_array(otel_context_array.into()); let record = RecordBatch::try_new( From cd23f87b08406c87770db575ae62e6abb258f5fc Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 19 Oct 2023 14:00:01 +0200 Subject: [PATCH 3/9] add dora-record into release page --- .github/workflows/release.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8268fe528..2e45aab79 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -62,6 +62,9 @@ jobs: cargo publish -p dora-runtime --token ${{ secrets.CARGO_REGISTRY_TOKEN }} cargo publish -p dora-daemon --token ${{ secrets.CARGO_REGISTRY_TOKEN }} + # Publish extension crates + cargo publish -p dora-record --token ${{ secrets.CARGO_REGISTRY_TOKEN }} + windows-release: From 4bf3f7e48f252ee8efb5410e276ea0cbe7a47191 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 19 Oct 2023 14:05:21 +0200 Subject: [PATCH 4/9] Split otel context into `traceid` and `spanid` --- libraries/extensions/dora-record/src/main.rs | 24 ++++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index 74cb7a674..6d3ccb072 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -31,13 +31,15 @@ fn main() -> eyre::Result<()> { match writers.get_mut(&id) { None => { let field_timestamp = Field::new("timestamp", DataType::Int64, true); - let field_otel_context = Field::new("otel_context", DataType::Utf8, true); + let field_trace_id = Field::new("trace_id", DataType::Utf8, true); + let field_span_id = Field::new("span_id", DataType::Utf8, true); let field_values = Arc::new(Field::new("item", data.data_type().clone(), true)); let field_data = Field::new(id.clone(), DataType::List(field_values), true); let schema = Arc::new(Schema::new(vec![ - field_otel_context, + field_trace_id, + field_span_id, field_timestamp, field_data, ])); @@ -80,6 +82,7 @@ fn main() -> eyre::Result<()> { Ok(()) } +/// Write a row of data into the writer fn write_event( writer_context: &mut WriterContext, data: Arc, @@ -103,12 +106,23 @@ fn write_event( None => "", Some(trace) => trace.split("-").nth(1).context("Trace is malformatted")?, }; - let otel_context_array = StringArray::from(vec![trace_id]); - let otel_context_array = make_array(otel_context_array.into()); + let span_id = match traceparent { + None => "", + Some(trace) => trace.split("-").nth(2).context("Trace is malformatted")?, + }; + let trace_id_array = StringArray::from(vec![trace_id]); + let trace_id_array = make_array(trace_id_array.into()); + let span_id_array = StringArray::from(vec![span_id]); + let span_id_array = make_array(span_id_array.into()); let record = RecordBatch::try_new( writer_context.schema.clone(), - vec![otel_context_array, timestamp_array, make_array(list.into())], + vec![ + trace_id_array, + span_id_array, + timestamp_array, + make_array(list.into()), + ], ) .context("Could not create record batch with the given data")?; writer_context From 2561533b6383b1258ca32ec92b5b9d43b796e4a3 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 27 Oct 2023 11:22:54 +0200 Subject: [PATCH 5/9] Removing `Writer_Context` as `arrow 0.48` access the underlying schema file --- libraries/extensions/dora-record/src/main.rs | 37 +++++++------------- 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index 6d3ccb072..34b911359 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -14,13 +14,6 @@ use dora_tracing::telemetry::deserialize_to_hashmap; use eyre::{Context, ContextCompat}; use std::{collections::HashMap, fs::File, sync::Arc}; -// Remove once arrow-rs 48.0 is published with access to writer schema. -// See: https://github.com/apache/arrow-rs/pull/4940 -struct WriterContext { - writer: FileWriter, - schema: Arc, -} - fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; @@ -46,23 +39,20 @@ fn main() -> eyre::Result<()> { let file = std::fs::File::create(format!("{id}.arrow")).unwrap(); let writer = FileWriter::try_new(file, &schema).unwrap(); - let mut writer_context = WriterContext { writer, schema }; - write_event(&mut writer_context, data.into(), &metadata) + let mut writer = writer; + write_event(&mut writer, data.into(), &metadata) .context("could not write first record data")?; - writers.insert(id.clone(), writer_context); + writers.insert(id.clone(), writer); } - Some(writer_context) => { - write_event(writer_context, data.into(), &metadata) - .context("could not write first record data")?; + Some(writer) => { + write_event(writer, data.into(), &metadata) + .context("could not write record data")?; } }; } Event::InputClosed { id } => match writers.remove(&id) { None => {} - Some(mut writer) => writer - .writer - .finish() - .context("Could not finish arrow file")?, + Some(mut writer) => writer.finish().context("Could not finish arrow file")?, }, _ => {} } @@ -70,21 +60,21 @@ fn main() -> eyre::Result<()> { let result: eyre::Result> = writers .iter_mut() - .map(|(_, wc)| -> eyre::Result<()> { - wc.writer + .map(|(_, writer)| -> eyre::Result<()> { + writer .finish() .context("Could not finish writing arrow file")?; Ok(()) }) .collect(); - result.context("One of the input recorder file writer failed to finish")?; + result.context("At least one of the input recorder file writer failed to finish")?; Ok(()) } /// Write a row of data into the writer fn write_event( - writer_context: &mut WriterContext, + writer: &mut FileWriter, data: Arc, metadata: &Metadata, ) -> eyre::Result<()> { @@ -116,7 +106,7 @@ fn write_event( let span_id_array = make_array(span_id_array.into()); let record = RecordBatch::try_new( - writer_context.schema.clone(), + writer.schema().clone(), vec![ trace_id_array, span_id_array, @@ -125,8 +115,7 @@ fn write_event( ], ) .context("Could not create record batch with the given data")?; - writer_context - .writer + writer .write(&record) .context("Could not write recordbatch to file")?; From de3d07acd8f2410317b4b67219a77741acb27214 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 27 Oct 2023 11:23:31 +0200 Subject: [PATCH 6/9] Remove recording ticker and use debug instead of release --- examples/python-operator-dataflow/dataflow.yml | 2 +- examples/rust-dataflow/dataflow.yml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index fb08e5eba..72630d27b 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -24,7 +24,7 @@ nodes: - id: dora-record custom: build: cargo build -p dora-record - source: ../../target/release/dora-record + source: ../../target/debug/dora-record inputs: image: webcam/image bbox: object_detection/bbox diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index f19ed5169..92d607349 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -29,5 +29,4 @@ nodes: source: ../../target/debug/dora-record inputs: message: runtime-node/rust-operator/status - tick: dora/timer/millis/100 random: rust-node/random \ No newline at end of file From 1413d540cbfa38bd49bece03fefb8b6e8b0452b2 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 27 Oct 2023 12:27:57 +0200 Subject: [PATCH 7/9] Adding comment on Datetime format and adding uhlc for log & replay in future uhlc is necessary to keep the ordering of messages. --- libraries/extensions/dora-record/src/main.rs | 24 +++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index 34b911359..d9a5766cc 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc}; use dora_node_api::{ self, arrow::{ - array::{make_array, Array, Int64Array, ListArray, StringArray}, + array::{make_array, Array, Int64Array, ListArray, StringArray, UInt64Array}, buffer::{OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field, Schema}, ipc::writer::FileWriter, @@ -23,7 +23,11 @@ fn main() -> eyre::Result<()> { Event::Input { id, data, metadata } => { match writers.get_mut(&id) { None => { - let field_timestamp = Field::new("timestamp", DataType::Int64, true); + let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, true); + // DateTime are kept as there as Int64 because there is an issue in `pyarrow` when + // reading pyarrow Date64 format. See: https://github.com/apache/arrow/issues/38488 + let field_utc_epoch = + Field::new("timestamp_utc_epoch", DataType::Int64, true); let field_trace_id = Field::new("trace_id", DataType::Utf8, true); let field_span_id = Field::new("span_id", DataType::Utf8, true); let field_values = @@ -33,7 +37,8 @@ fn main() -> eyre::Result<()> { let schema = Arc::new(Schema::new(vec![ field_trace_id, field_span_id, - field_timestamp, + field_uhlc, + field_utc_epoch, field_data, ])); let file = std::fs::File::create(format!("{id}.arrow")).unwrap(); @@ -83,11 +88,13 @@ fn write_event( let list = ListArray::new(field.clone(), offsets, data.clone(), None); let timestamp = metadata.timestamp(); - let timestamp = timestamp.get_time().to_system_time(); + let timestamp_uhlc = UInt64Array::from(vec![timestamp.get_time().0]); + let timestamp_uhlc = make_array(timestamp_uhlc.into()); + let system_time = timestamp.get_time().to_system_time(); - let dt: DateTime = timestamp.into(); - let timestamp_array = Int64Array::from(vec![dt.timestamp_millis()]); - let timestamp_array = make_array(timestamp_array.into()); + let dt: DateTime = system_time.into(); + let timestamp_utc = Int64Array::from(vec![dt.timestamp_millis()]); + let timestamp_utc = make_array(timestamp_utc.into()); let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); let otel_context = deserialize_to_hashmap(&string_otel_context); @@ -110,7 +117,8 @@ fn write_event( vec![ trace_id_array, span_id_array, - timestamp_array, + timestamp_uhlc, + timestamp_utc, make_array(list.into()), ], ) From 659a3e5aaf3a89473669f6993654d99e87b92e92 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 27 Oct 2023 15:42:53 +0200 Subject: [PATCH 8/9] Use `timestamp[ms]` to represent time --- libraries/extensions/dora-record/src/main.rs | 21 +++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index d9a5766cc..dde2f2048 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -2,7 +2,10 @@ use chrono::{DateTime, Utc}; use dora_node_api::{ self, arrow::{ - array::{make_array, Array, Int64Array, ListArray, StringArray, UInt64Array}, + array::{ + make_array, Array, Int64Array, ListArray, StringArray, Time32MillisecondArray, + TimestampMillisecondArray, TimestampSecondArray, UInt64Array, + }, buffer::{OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field, Schema}, ipc::writer::FileWriter, @@ -23,11 +26,15 @@ fn main() -> eyre::Result<()> { Event::Input { id, data, metadata } => { match writers.get_mut(&id) { None => { - let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, true); - // DateTime are kept as there as Int64 because there is an issue in `pyarrow` when - // reading pyarrow Date64 format. See: https://github.com/apache/arrow/issues/38488 - let field_utc_epoch = - Field::new("timestamp_utc_epoch", DataType::Int64, true); + let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); + let field_utc_epoch = Field::new( + "timestamp_utc", + DataType::Timestamp( + dora_node_api::arrow::datatypes::TimeUnit::Millisecond, + None, + ), + false, + ); let field_trace_id = Field::new("trace_id", DataType::Utf8, true); let field_span_id = Field::new("span_id", DataType::Utf8, true); let field_values = @@ -93,7 +100,7 @@ fn write_event( let system_time = timestamp.get_time().to_system_time(); let dt: DateTime = system_time.into(); - let timestamp_utc = Int64Array::from(vec![dt.timestamp_millis()]); + let timestamp_utc = TimestampMillisecondArray::from(vec![dt.timestamp_millis()]); let timestamp_utc = make_array(timestamp_utc.into()); let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); From e8f658f9cf3d8bbc25226dc80971c2520beb4392 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 31 Oct 2023 11:01:55 +0100 Subject: [PATCH 9/9] Fixing clippy warnings --- libraries/extensions/dora-record/src/main.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index dde2f2048..2924fc4d0 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -3,8 +3,7 @@ use dora_node_api::{ self, arrow::{ array::{ - make_array, Array, Int64Array, ListArray, StringArray, Time32MillisecondArray, - TimestampMillisecondArray, TimestampSecondArray, UInt64Array, + make_array, Array, ListArray, StringArray, TimestampMillisecondArray, UInt64Array, }, buffer::{OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field, Schema}, @@ -92,7 +91,7 @@ fn write_event( ) -> eyre::Result<()> { let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32])); let field = Arc::new(Field::new("item", data.data_type().clone(), true)); - let list = ListArray::new(field.clone(), offsets, data.clone(), None); + let list = ListArray::new(field, offsets, data.clone(), None); let timestamp = metadata.timestamp(); let timestamp_uhlc = UInt64Array::from(vec![timestamp.get_time().0]); @@ -105,14 +104,14 @@ fn write_event( let string_otel_context = metadata.parameters.open_telemetry_context.to_string(); let otel_context = deserialize_to_hashmap(&string_otel_context); - let traceparent = otel_context.get("traceparent").clone(); + let traceparent = otel_context.get("traceparent"); let trace_id = match traceparent { None => "", - Some(trace) => trace.split("-").nth(1).context("Trace is malformatted")?, + Some(trace) => trace.split('-').nth(1).context("Trace is malformatted")?, }; let span_id = match traceparent { None => "", - Some(trace) => trace.split("-").nth(2).context("Trace is malformatted")?, + Some(trace) => trace.split('-').nth(2).context("Trace is malformatted")?, }; let trace_id_array = StringArray::from(vec![trace_id]); let trace_id_array = make_array(trace_id_array.into());