From 303687178dc1f6aef86f366d3b14167b4a8ad29d Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 2 Mar 2024 09:12:31 +0100 Subject: [PATCH 1/3] Use Async Parquet Writer for `dora-record` This commit makes it possible to: - Concurrently write records of data - Use Apache Parquet Compression to reduce storage size to avoid slow transfer speed. --- Cargo.lock | 184 +++++++++++++++++-- libraries/extensions/dora-record/Cargo.toml | 2 + libraries/extensions/dora-record/src/main.rs | 86 ++++++--- 3 files changed, 228 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19f5f4d78..191d16e9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,21 @@ dependencies = [ "serde", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -759,6 +774,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "brotli" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bstr" version = "1.6.2" @@ -1166,7 +1202,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio 0.8.8", + "mio 0.8.10", "parking_lot", "signal-hook", "signal-hook-mio", @@ -1682,6 +1718,8 @@ dependencies = [ "dora-node-api", "dora-tracing", "eyre", + "parquet", + "tokio", ] [[package]] @@ -3034,6 +3072,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lz4_flex" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "912b45c753ff5f7f5208307e8ace7d2a2e30d024e26d3509f3dce546c044ce15" +dependencies = [ + "twox-hash", +] + [[package]] name = "macro_rules_attribute" version = "0.1.3" @@ -3164,9 +3211,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "log", @@ -3360,7 +3407,7 @@ dependencies = [ "inotify", "kqueue", "libc", - "mio 0.8.8", + "mio 0.8.10", "walkdir", "windows-sys 0.45.0", ] @@ -3622,7 +3669,7 @@ dependencies = [ "opentelemetry 0.18.0", "opentelemetry-semantic-conventions 0.10.0", "thiserror", - "thrift", + "thrift 0.16.0", "tokio", ] @@ -3764,6 +3811,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "3.9.1" @@ -3823,6 +3879,39 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "parquet" +version = "48.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239229e6a668ab50c61de3dce61cf0fa1069345f7aa0f4c934491f92205a4945" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.21.4", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "hashbrown 0.14.3", + "lz4_flex", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift 0.17.0", + "tokio", + "twox-hash", + "zstd", +] + [[package]] name = "paste" version = "1.0.14" @@ -4706,7 +4795,7 @@ dependencies = [ "log", "md5", "mio 0.6.23", - "mio 0.8.8", + "mio 0.8.10", "mio-extras", "num-derive", "num-traits", @@ -4714,7 +4803,7 @@ dependencies = [ "rand", "serde", "serde_repr", - "socket2 0.5.4", + "socket2 0.5.6", "socketpair", "speedy", "static_assertions", @@ -4921,6 +5010,12 @@ version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" +[[package]] +name = "seq-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" + [[package]] name = "serde" version = "1.0.195" @@ -5130,7 +5225,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" dependencies = [ "libc", - "mio 0.8.8", + "mio 0.8.10", "signal-hook", ] @@ -5168,6 +5263,12 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.4.9" @@ -5180,12 +5281,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -5473,6 +5574,17 @@ dependencies = [ "threadpool", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "time" version = "0.3.29" @@ -5527,19 +5639,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", "libc", - "mio 0.8.8", + "mio 0.8.10", "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.6", "tokio-macros", "windows-sys 0.48.0", ] @@ -5556,9 +5668,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -5741,6 +5853,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if 1.0.0", + "static_assertions", +] + [[package]] name = "typenum" version = "1.17.0" @@ -6930,3 +7052,31 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" + +[[package]] +name = "zstd" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.9+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/libraries/extensions/dora-record/Cargo.toml b/libraries/extensions/dora-record/Cargo.toml index 1d8dbe730..9d7077169 100644 --- a/libraries/extensions/dora-record/Cargo.toml +++ b/libraries/extensions/dora-record/Cargo.toml @@ -9,7 +9,9 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tokio = { version = "1.36.0", features = ["fs", "rt", "rt-multi-thread"] } dora-node-api = { workspace = true, features = ["tracing"] } eyre = "0.6.8" chrono = "0.4.31" dora-tracing = { workspace = true } +parquet = { version = "48.0.0", features = ["async"] } diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index a67b8f28e..bb52dc350 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -7,23 +7,26 @@ use dora_node_api::{ }, buffer::{OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field, Schema}, - ipc::writer::StreamWriter, record_batch::RecordBatch, }, DoraNode, Event, Metadata, }; use dora_tracing::telemetry::deserialize_to_hashmap; use eyre::{Context, ContextCompat}; -use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; +use parquet::{arrow::AsyncArrowWriter, basic::BrotliLevel, file::properties::WriterProperties}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use tokio::sync::mpsc; -fn main() -> eyre::Result<()> { +#[tokio::main] +async fn main() -> eyre::Result<()> { let (node, mut events) = DoraNode::init_from_env()?; let dataflow_id = node.dataflow_id(); let mut writers = HashMap::new(); + while let Some(event) = events.recv() { match event { Event::Input { id, data, metadata } => { - match writers.get_mut(&id) { + match writers.get(&id) { None => { let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false); let field_utc_epoch = Field::new( @@ -52,46 +55,74 @@ fn main() -> eyre::Result<()> { std::fs::create_dir_all(&dataflow_dir) .context("could not create dataflow_dir")?; } - let file = std::fs::File::create(dataflow_dir.join(format!("{id}.arrow"))) - .context("Couldn't create write file")?; + let file = + tokio::fs::File::create(dataflow_dir.join(format!("{id}.parquet"))) + .await + .context("Couldn't create write file")?; + let mut writer = AsyncArrowWriter::try_new( + file, + schema.clone(), + 0, + Some( + WriterProperties::builder() + .set_compression(parquet::basic::Compression::BROTLI( + BrotliLevel::default(), + )) + .build(), + ), + ) + .context("Could not create parquet writer")?; + let (tx, mut rx) = mpsc::channel(10); - let writer = StreamWriter::try_new(file, &schema).unwrap(); - let mut writer = writer; - write_event(&mut writer, data.into(), &metadata, schema.clone()) - .context("could not write first record data")?; - writers.insert(id.clone(), (writer, schema)); + // Per Input thread + let join_handle = tokio::spawn(async move { + while let Some((data, metadata)) = rx.recv().await { + match write_event(&mut writer, data, &metadata, schema.clone()) + .await + { + Err(e) => println!( + "Error writing event data into parquet file: {:?}", + e + ), + _ => (), + }; + } + writer.close() + }); + writers.insert(id, (tx, join_handle)); } - Some((writer, schema)) => { - write_event(writer, data.into(), &metadata, schema.clone()) - .context("could not write record data")?; + Some((tx, _)) => { + tx.send((data.into(), metadata)) + .await + .context("Could not send event data into writer loop")?; } }; } Event::InputClosed { id } => match writers.remove(&id) { None => {} - Some((mut writer, _)) => writer.finish().context("Could not finish arrow file")?, + Some(tx) => drop(tx), }, _ => {} } } - let result: eyre::Result> = writers - .iter_mut() - .map(|(_, (writer, _))| -> eyre::Result<()> { - writer - .finish() - .context("Could not finish writing arrow file")?; - Ok(()) - }) - .collect(); - result.context("At least one of the input recorder file writer failed to finish")?; + for (id, (tx, join_handle)) in writers { + drop(tx); + join_handle + .await + .context("Writer thread failed")? + .await + .context(format!( + "Could not close the Parquet writer for {id} parquet writer" + ))?; + } Ok(()) } /// Write a row of data into the writer -fn write_event( - writer: &mut StreamWriter, +async fn write_event( + writer: &mut AsyncArrowWriter, data: Arc, metadata: &Metadata, schema: Arc, @@ -138,6 +169,7 @@ fn write_event( .context("Could not create record batch with the given data")?; writer .write(&record) + .await .context("Could not write recordbatch to file")?; Ok(()) From eb12eb1c8fb28e38b3c4fe2887c2a07e11ef3a9f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 2 Mar 2024 17:15:13 +0100 Subject: [PATCH 2/3] Fix `clippy` error within `dora-record` within the writing loop --- libraries/extensions/dora-record/src/main.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index bb52dc350..17ff83453 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -77,17 +77,13 @@ async fn main() -> eyre::Result<()> { // Per Input thread let join_handle = tokio::spawn(async move { while let Some((data, metadata)) = rx.recv().await { - match write_event(&mut writer, data, &metadata, schema.clone()) - .await + if let Err(e) = + write_event(&mut writer, data, &metadata, schema.clone()).await { - Err(e) => println!( - "Error writing event data into parquet file: {:?}", - e - ), - _ => (), + println!("Error writing event data into parquet file: {:?}", e) }; } - writer.close() + writer.close().await }); writers.insert(id, (tx, join_handle)); } @@ -111,7 +107,6 @@ async fn main() -> eyre::Result<()> { join_handle .await .context("Writer thread failed")? - .await .context(format!( "Could not close the Parquet writer for {id} parquet writer" ))?; From f641f2cdc8fdbf6b706829d6f44b18396b5e27df Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 11 Mar 2024 20:36:21 +0100 Subject: [PATCH 3/3] Adding first row of data --- libraries/extensions/dora-record/src/main.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libraries/extensions/dora-record/src/main.rs b/libraries/extensions/dora-record/src/main.rs index 17ff83453..fb10bf18e 100644 --- a/libraries/extensions/dora-record/src/main.rs +++ b/libraries/extensions/dora-record/src/main.rs @@ -85,6 +85,9 @@ async fn main() -> eyre::Result<()> { } writer.close().await }); + tx.send((data.into(), metadata)) + .await + .context("Could not send event data into writer loop")?; writers.insert(id, (tx, join_handle)); } Some((tx, _)) => {