From b8957bc4b523521a9c685492dc4f4d24bccdc1b3 Mon Sep 17 00:00:00 2001
From: comphead
Date: Thu, 27 Jun 2024 17:28:38 -0700
Subject: [PATCH] Minor: move batch spilling methods to `lib.rs` to make it
 reusable

---
 .../physical-plan/src/aggregates/row_hash.rs |  6 +-
 datafusion/physical-plan/src/lib.rs          | 64 ++++++++++++-
 datafusion/physical-plan/src/sorts/sort.rs   | 95 +++----------------
 3 files changed, 78 insertions(+), 87 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index ad0860b93a3a..27577e6c8bf8 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -29,10 +29,10 @@ use crate::aggregates::{
 };
 use crate::common::IPCWriter;
 use crate::metrics::{BaselineMetrics, RecordOutput};
-use crate::sorts::sort::{read_spill_as_stream, sort_batch};
+use crate::sorts::sort::sort_batch;
 use crate::sorts::streaming_merge;
 use crate::stream::RecordBatchStreamAdapter;
-use crate::{aggregates, ExecutionPlan, PhysicalExpr};
+use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr};
 use crate::{RecordBatchStream, SendableRecordBatchStream};
 
 use arrow::array::*;
@@ -752,7 +752,7 @@ impl GroupedHashAggregateStream {
             })),
         )));
         for spill in self.spill_state.spills.drain(..) {
-            let stream = read_spill_as_stream(spill, schema.clone())?;
+            let stream = read_spill_as_stream(spill, schema.clone(), 2)?;
             streams.push(stream);
         }
         self.spill_state.is_stream_merging = true;
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index c648547c98b1..aef5b307968c 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -19,6 +19,9 @@
 
 use std::any::Any;
 use std::fmt::Debug;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -28,15 +31,18 @@ use crate::repartition::RepartitionExec;
 use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
 
 use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::Result;
+use datafusion_common::{exec_datafusion_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{
     EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
 use futures::stream::TryStreamExt;
+use log::debug;
+use tokio::sync::mpsc::Sender;
 use tokio::task::JoinSet;
 
 mod ordering;
@@ -87,8 +93,13 @@ pub use datafusion_physical_expr::{
 };
 
 // Backwards compatibility
+use crate::common::IPCWriter;
 pub use crate::stream::EmptyRecordBatchStream;
+use crate::stream::RecordBatchReceiverStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::human_readable_size;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
 pub mod udaf {
     pub use datafusion_physical_expr_common::aggregate::{
         create_aggregate_expr, AggregateFunctionExpr,
@@ -799,6 +810,57 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
     actual.iter().map(|elem| elem.to_string()).collect()
 }
 
+/// Reads spilled batches back from disk as a stream
+///
+/// `path` - temp file holding the spilled batches
+/// `schema` - schema of the batches, which must be the same for every batch
+/// `buffer` - internal buffer capacity, in number of batches
+pub fn read_spill_as_stream(
+    path: RefCountedTempFile,
+    schema: SchemaRef,
+    buffer: usize,
+) -> Result<SendableRecordBatchStream> {
+    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
+    let sender = builder.tx();
+
+    builder.spawn_blocking(move || read_spill(sender, path.path()));
+
+    Ok(builder.build())
+}
+
+/// Spills in-memory `batches` to disk.
+///
+/// Returns the total number of rows spilled to disk.
+pub fn spill_record_batches(
+    batches: Vec<RecordBatch>,
+    path: PathBuf,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    for batch in batches {
+        writer.write(&batch)?;
+    }
+    writer.finish()?;
+    debug!(
+        "Spilled {} batches of total {} rows to disk, memory released {}",
+        writer.num_batches,
+        writer.num_rows,
+        human_readable_size(writer.num_bytes),
+    );
+    Ok(writer.num_rows)
+}
+
+fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+    let file = BufReader::new(File::open(path)?);
+    let reader = FileReader::try_new(file, None)?;
+    for batch in reader {
+        sender
+            .blocking_send(batch.map_err(Into::into))
+            .map_err(|e| exec_datafusion_err!("{e}"))?;
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 2a4862534590..47901591c8c9 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -22,45 +22,38 @@
 use std::any::Any;
 use std::fmt;
 use std::fmt::{Debug, Formatter};
-use std::fs::File;
-use std::io::BufReader;
-use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::sorts::streaming_merge::streaming_merge;
-use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
+use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
-    DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode,
-    ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType,
+    Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan,
+    ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
+    Statistics,
 };
 
 use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
 use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use arrow_array::{Array, RecordBatchOptions, UInt32Array};
 use arrow_schema::DataType;
-use datafusion_common::{exec_err, DataFusionError, Result};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
-use datafusion_execution::memory_pool::{
-    human_readable_size, MemoryConsumer, MemoryReservation,
-};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
 
 use futures::{StreamExt, TryStreamExt};
-use log::{debug, error, trace};
-use tokio::sync::mpsc::Sender;
+use log::{debug, trace};
 
 struct ExternalSorterMetrics {
     /// metrics
@@ -345,7 +338,7 @@ impl ExternalSorter {
                     spill.path()
                 )));
             }
-            let stream = read_spill_as_stream(spill, self.schema.clone())?;
+            let stream = read_spill_as_stream(spill, self.schema.clone(), 2)?;
             streams.push(stream);
         }
 
@@ -402,7 +395,7 @@ impl ExternalSorter {
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
         let spilled_rows =
-            spill_sorted_batches(batches, spill_file.path(), self.schema.clone()).await?;
+            spill_record_batches(batches, spill_file.path().into(), self.schema.clone())?;
         let used = self.reservation.free();
         self.metrics.spill_count.add(1);
         self.metrics.spilled_bytes.add(used);
@@ -667,70 +660,6 @@ pub(crate) fn lexsort_to_indices_multi_columns(
     Ok(indices)
 }
 
-/// Spills sorted `in_memory_batches` to disk.
-///
-/// Returns number of the rows spilled to disk.
-async fn spill_sorted_batches(
-    batches: Vec<RecordBatch>,
-    path: &Path,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let path: PathBuf = path.into();
-    let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema));
-    match task.join().await {
-        Ok(r) => r,
-        Err(e) => exec_err!("Error occurred while spilling {e}"),
-    }
-}
-
-pub(crate) fn read_spill_as_stream(
-    path: RefCountedTempFile,
-    schema: SchemaRef,
-) -> Result<SendableRecordBatchStream> {
-    let mut builder = RecordBatchReceiverStream::builder(schema, 2);
-    let sender = builder.tx();
-
-    builder.spawn_blocking(move || {
-        let result = read_spill(sender, path.path());
-        if let Err(e) = &result {
-            error!("Failure while reading spill file: {:?}. Error: {}", path, e);
-        }
-        result
-    });
-
-    Ok(builder.build())
-}
-
-fn write_sorted(
-    batches: Vec<RecordBatch>,
-    path: PathBuf,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    for batch in batches {
-        writer.write(&batch)?;
-    }
-    writer.finish()?;
-    debug!(
-        "Spilled {} batches of total {} rows to disk, memory released {}",
-        writer.num_batches,
-        writer.num_rows,
-        human_readable_size(writer.num_bytes),
-    );
-    Ok(writer.num_rows)
-}
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    let reader = FileReader::try_new(file, None)?;
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| DataFusionError::Execution(format!("{e}")))?;
-    }
-    Ok(())
-}
-
 /// Sort execution plan.
 ///
 /// Support sorting datasets that are larger than the memory allotted
@@ -776,7 +705,7 @@ impl SortExec {
     /// Specify the partitioning behavior of this sort exec
     ///
     /// If `preserve_partitioning` is true, sorts each partition
-    /// individually, producing one sorted strema for each input partition.
+    /// individually, producing one sorted stream for each input partition.
     ///
     /// If `preserve_partitioning` is false, sorts and merges all
     /// input partitions producing a single, sorted partition.
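
Usage note (editor's sketch, not part of the patch): with the spill helpers now exported from the crate root, code outside `sort.rs` can spill batches to disk and read them back without going through `ExternalSorter`. The snippet below is a minimal illustration under the patch's signatures, assuming a `RuntimeEnv` with a working `DiskManager`; the helper name `spill_and_restore` and the "Example" temp-file label are hypothetical, and the buffer value `2` simply mirrors what the sorter and aggregator pass in this patch.

    use arrow::datatypes::SchemaRef;
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_physical_plan::{
        read_spill_as_stream, spill_record_batches, SendableRecordBatchStream,
    };

    /// Hypothetical helper: spill `batches` to a temp file, then stream them back.
    fn spill_and_restore(
        runtime: &RuntimeEnv,
        batches: Vec<RecordBatch>,
        schema: SchemaRef,
    ) -> Result<SendableRecordBatchStream> {
        // Ref-counted temp file: removed automatically once the last handle drops.
        let spill_file = runtime.disk_manager.create_tmp_file("Example")?;
        // Write every batch in Arrow IPC format; returns the number of rows spilled.
        let _rows = spill_record_batches(batches, spill_file.path().into(), schema.clone())?;
        // Read the batches back lazily through a channel buffered at 2 batches.
        read_spill_as_stream(spill_file, schema, 2)
    }

Passing `spill_file` by value into `read_spill_as_stream` keeps the temp file alive for the lifetime of the returned stream, which is why the function takes `RefCountedTempFile` rather than a plain path.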