Skip to content

Commit

Permalink
Reuse the IoDispatcher across DataFusion instances (#1299)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Nov 14, 2024
1 parent 5d72d84 commit 8484445
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 28 deletions.
38 changes: 22 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion bench-vortex/benches/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Builder;

fn benchmark(c: &mut Criterion) {
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
let runtime = Builder::new_current_thread()
.thread_name("benchmark-tpch")
.enable_all()
.build()
.unwrap();

// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
Expand Down
10 changes: 8 additions & 2 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fs::File;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use arrow_array::types::Int64Type;
use arrow_array::{
Expand All @@ -27,11 +27,16 @@ use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
use vortex::file::{
IoDispatcher, LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder,
};
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::{Array, IntoArray, IntoCanonical};

/// IO dispatcher shared by the benchmark readers in this module.
///
/// Built on first access via `LazyLock` and handed out as `Arc` clones, so
/// reads configured with `.with_io_dispatcher(DISPATCHER.clone())` reuse this
/// one dispatcher instead of each builder creating its own.
/// `new_tokio(1)` requests the single-threaded tokio dispatcher.
static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));

pub const BATCH_SIZE: usize = 65_536;

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -51,6 +56,7 @@ pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
LayoutContext::default().into(),
),
)
.with_io_dispatcher(DISPATCHER.clone())
.build()
.await?
.read_all()
Expand Down
13 changes: 10 additions & 3 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
Expand All @@ -9,9 +9,15 @@ use futures::{FutureExt as _, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use vortex_array::Context;
use vortex_expr::datafusion::convert_expr_to_vortex;
use vortex_file::{LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder};
use vortex_file::{
IoDispatcher, LayoutContext, LayoutDeserializer, Projection, RowFilter, VortexReadBuilder,
};
use vortex_io::ObjectStoreReadAt;

/// Share an IO dispatcher across all DataFusion instances.
///
/// The dispatcher is created on first access via `LazyLock` and passed to
/// each `VortexReadBuilder` as an `Arc` clone, so opening a file reuses it
/// rather than constructing a new dispatcher per read.
/// `new_tokio(1)` requests the single-threaded tokio dispatcher.
static IO_DISPATCHER: LazyLock<Arc<IoDispatcher>> =
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));

pub struct VortexFileOpener {
pub ctx: Arc<Context>,
pub object_store: Arc<dyn ObjectStore>,
Expand All @@ -28,7 +34,8 @@ impl FileOpener for VortexFileOpener {
let mut builder = VortexReadBuilder::new(
read_at,
LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
);
)
.with_io_dispatcher(IO_DISPATCHER.clone());

let row_filter = self
.predicate
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/dispatcher/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ impl TokioDispatcher {
let threads: Vec<_> = (0..num_threads)
.map(|tid| {
let worker_thread = std::thread::Builder::new();
let worker_thread = worker_thread.name(format!("tokio-dispatch-{tid}"));
let rx: flume::Receiver<Box<dyn TokioSpawn + Send>> = rx.clone();

worker_thread
.spawn(move || {
// Create a runtime-per-thread
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name(format!("tokio-dispatch-{tid}"))
.enable_all()
.build()
.unwrap_or_else(|e| {
Expand Down
6 changes: 3 additions & 3 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub struct VortexReadBuilder<R> {
size: Option<u64>,
row_mask: Option<Array>,
row_filter: Option<RowFilter>,
io_dispatcher: Option<IoDispatcher>,
io_dispatcher: Option<Arc<IoDispatcher>>,
}

impl<R: VortexReadAt> VortexReadBuilder<R> {
Expand Down Expand Up @@ -112,7 +112,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
self
}

pub fn with_io_dispatcher(mut self, dispatcher: IoDispatcher) -> Self {
pub fn with_io_dispatcher(mut self, dispatcher: Arc<IoDispatcher>) -> Self {
self.io_dispatcher = Some(dispatcher);
self
}
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
// Default: fallback to single-threaded tokio dispatcher.
let io_dispatcher = self
.io_dispatcher
.unwrap_or_else(|| IoDispatcher::new_tokio(1));
.unwrap_or_else(|| Arc::new(IoDispatcher::new_tokio(1)));

Ok(VortexFileArrayStream::new(
self.read_at,
Expand Down
4 changes: 2 additions & 2 deletions vortex-file/src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct VortexFileArrayStream<R> {
messages_cache: Arc<RwLock<LayoutMessageCache>>,
state: Option<StreamingState>,
input: R,
dispatcher: IoDispatcher,
dispatcher: Arc<IoDispatcher>,
}

impl<R: VortexReadAt> VortexFileArrayStream<R> {
Expand All @@ -48,7 +48,7 @@ impl<R: VortexReadAt> VortexFileArrayStream<R> {
dtype: DType,
row_count: u64,
row_mask: Option<RowMask>,
dispatcher: IoDispatcher,
dispatcher: Arc<IoDispatcher>,
) -> Self {
let mask_iterator = if let Some(fr) = filter_reader {
Box::new(FilteringRowSplitIterator::new(fr, row_count, row_mask)) as MaskIteratorRef
Expand Down

0 comments on commit 8484445

Please sign in to comment.