diff --git a/Cargo.lock b/Cargo.lock index a1dfd6061a..a1f80e0376 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -580,9 +580,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aeb932158bd710538c73702db6945cb68a8fb08c519e6e12706b94263b36db8" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" dependencies = [ "jobserver", "libc", @@ -701,9 +701,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" [[package]] name = "colorchoice" @@ -713,13 +713,13 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" [[package]] name = "comfy-table" -version = "7.1.1" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" +checksum = "24f165e7b643266ea80cb858aed492ad9280e3e05ce24d4a99d7d7b889b6a4d9" dependencies = [ "strum", "strum_macros", - "unicode-width 0.1.14", + "unicode-width 0.2.0", ] [[package]] @@ -1666,9 +1666,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.34" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", "miniz_oxide", @@ -3363,9 +3363,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" dependencies = [ "bytes", "pin-project-lite", @@ -3374,26 +3374,29 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror 1.0.69", + "thiserror 2.0.3", "tokio", "tracing", ] [[package]] name = "quinn-proto" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +checksum = "a2fe5ef3495d7d2e377ff17b1a8ce2ee2ec2a18cde8b6ad6619d65d0701c135d" dependencies = [ "bytes", + "getrandom", "rand", "ring", "rustc-hash", "rustls", + "rustls-pki-types", "slab", - "thiserror 1.0.69", + "thiserror 2.0.3", "tinyvec", "tracing", + "web-time", ] [[package]] @@ -3701,6 +3704,9 @@ name = "rustls-pki-types" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -5148,7 +5154,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/bench-vortex/benches/tpch.rs b/bench-vortex/benches/tpch.rs index 1b5ae80219..500dfde381 100644 --- a/bench-vortex/benches/tpch.rs +++ b/bench-vortex/benches/tpch.rs @@ -4,7 +4,11 @@ use criterion::{criterion_group, criterion_main, Criterion}; use tokio::runtime::Builder; fn benchmark(c: &mut Criterion) { - let runtime = Builder::new_current_thread().enable_all().build().unwrap(); + let runtime = Builder::new_current_thread() + .thread_name("benchmark-tpch") + .enable_all() + .build() + .unwrap(); // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index 4d270f6e8a..601e9a5250 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::ops::Range; use std::path::{Path, PathBuf}; use std::process::Command; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use arrow_array::types::Int64Type; use arrow_array::{ @@ -27,11 +27,16 @@ use vortex::arrow::FromArrowType; use vortex::compress::CompressionStrategy; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder}; +use vortex::file::{ + IoDispatcher, LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder, +}; use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite}; use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT}; use vortex::{Array, IntoArray, IntoCanonical}; +static DISPATCHER: LazyLock> = + LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1))); + pub const BATCH_SIZE: usize = 65_536; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -51,6 +56,7 @@ pub async fn open_vortex(path: &Path) -> VortexResult { LayoutContext::default().into(), ), ) + .with_io_dispatcher(DISPATCHER.clone()) .build() .await? .read_all() diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 7cf1e7d6c1..5e8cc1f71a 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; @@ -9,9 +9,15 @@ use futures::{FutureExt as _, StreamExt, TryStreamExt}; use object_store::ObjectStore; use vortex_array::Context; use vortex_expr::datafusion::convert_expr_to_vortex; -use vortex_file::{LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder}; +use vortex_file::{ + IoDispatcher, LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder, +}; use vortex_io::ObjectStoreReadAt; +/// Share an IO dispatcher across all DataFusion instances. +static IO_DISPATCHER: LazyLock> = + LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1))); + pub struct VortexFileOpener { pub ctx: Arc, pub object_store: Arc, @@ -28,7 +34,8 @@ impl FileOpener for VortexFileOpener { let mut builder = VortexReadBuilder::new( read_at, LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())), - ); + ) + .with_io_dispatcher(IO_DISPATCHER.clone()); let row_filter = self .predicate diff --git a/vortex-file/src/dispatcher/tokio.rs b/vortex-file/src/dispatcher/tokio.rs index 1f03f03cb0..aa33fad088 100644 --- a/vortex-file/src/dispatcher/tokio.rs +++ b/vortex-file/src/dispatcher/tokio.rs @@ -26,13 +26,13 @@ impl TokioDispatcher { let threads: Vec<_> = (0..num_threads) .map(|tid| { let worker_thread = std::thread::Builder::new(); - let worker_thread = worker_thread.name(format!("tokio-dispatch-{tid}")); let rx: flume::Receiver> = rx.clone(); worker_thread .spawn(move || { // Create a runtime-per-thread let rt = tokio::runtime::Builder::new_current_thread() + .thread_name(format!("tokio-dispatch-{tid}")) .enable_all() .build() .unwrap_or_else(|e| { diff --git a/vortex-file/src/read/builder/mod.rs b/vortex-file/src/read/builder/mod.rs index 706c1f0dfb..c9f2f12553 100644 --- a/vortex-file/src/read/builder/mod.rs +++ b/vortex-file/src/read/builder/mod.rs @@ -71,7 +71,7 @@ pub struct VortexReadBuilder { size: Option, row_mask: Option, row_filter: Option, - io_dispatcher: Option, + io_dispatcher: Option>, } impl VortexReadBuilder { @@ -112,7 +112,7 @@ impl VortexReadBuilder { self } - pub fn with_io_dispatcher(mut self, dispatcher: IoDispatcher) -> Self { + pub fn with_io_dispatcher(mut self, dispatcher: Arc) -> Self { self.io_dispatcher = Some(dispatcher); self } @@ -176,7 +176,7 @@ impl VortexReadBuilder { // Default: fallback to single-threaded tokio dispatcher. let io_dispatcher = self .io_dispatcher - .unwrap_or_else(|| IoDispatcher::new_tokio(1)); + .unwrap_or_else(|| Arc::new(IoDispatcher::new_tokio(1))); Ok(VortexFileArrayStream::new( self.read_at, diff --git a/vortex-file/src/read/stream.rs b/vortex-file/src/read/stream.rs index feb54a4115..2e3e1ad3e1 100644 --- a/vortex-file/src/read/stream.rs +++ b/vortex-file/src/read/stream.rs @@ -35,7 +35,7 @@ pub struct VortexFileArrayStream { messages_cache: Arc>, state: Option, input: R, - dispatcher: IoDispatcher, + dispatcher: Arc, } impl VortexFileArrayStream { @@ -48,7 +48,7 @@ impl VortexFileArrayStream { dtype: DType, row_count: u64, row_mask: Option, - dispatcher: IoDispatcher, + dispatcher: Arc, ) -> Self { let mask_iterator = if let Some(fr) = filter_reader { Box::new(FilteringRowSplitIterator::new(fr, row_count, row_mask)) as MaskIteratorRef