diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index dc736993a4533..62b3143eb1560 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -70,6 +70,7 @@ pub mod projection;
 pub mod recursive_query;
 pub mod repartition;
 pub mod sorts;
+pub mod spill;
 pub mod stream;
 pub mod streaming;
 pub mod tree_node;
@@ -98,8 +99,6 @@ pub use datafusion_physical_expr::{
 use crate::common::IPCWriter;
 pub use crate::stream::EmptyRecordBatchStream;
 use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
-use datafusion_execution::disk_manager::RefCountedTempFile;
-use datafusion_execution::memory_pool::human_readable_size;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
 
 pub mod udaf {
@@ -903,56 +902,6 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
     actual.iter().map(|elem| elem.to_string()).collect()
 }
 
-/// Read spilled batches from the disk
-///
-/// `path` - temp file
-/// `schema` - batches schema, should be the same across batches
-/// `buffer` - internal buffer of capacity batches
-pub fn read_spill_as_stream(
-    path: RefCountedTempFile,
-    schema: SchemaRef,
-    buffer: usize,
-) -> Result<SendableRecordBatchStream> {
-    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
-    let sender = builder.tx();
-
-    builder.spawn_blocking(move || read_spill(sender, path.path()));
-
-    Ok(builder.build())
-}
-
-/// Spills in-memory `batches` to disk.
-///
-/// Returns total number of the rows spilled to disk.
-pub fn spill_record_batches(
-    batches: Vec<RecordBatch>,
-    path: PathBuf,
-    schema: SchemaRef,
-) -> Result<usize> {
-    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
-    for batch in batches {
-        writer.write(&batch)?;
-    }
-    writer.finish()?;
-    debug!(
-        "Spilled {} batches of total {} rows to disk, memory released {}",
-        writer.num_batches,
-        writer.num_rows,
-        human_readable_size(writer.num_bytes),
-    );
-    Ok(writer.num_rows)
-}
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    let reader = FileReader::try_new(file, None)?;
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| exec_datafusion_err!("{e}"))?;
-    }
-    Ok(())
-}
 
 #[cfg(test)]
 mod tests {
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
new file mode 100644
index 0000000000000..7b1b9ae351243
--- /dev/null
+++ b/datafusion/physical-plan/src/spill.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the spilling functions
+
+use std::fs::File;
+use std::io::BufReader;
+use std::path::{Path, PathBuf};
+
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::FileReader;
+use arrow::record_batch::RecordBatch;
+use log::debug;
+use tokio::sync::mpsc::Sender;
+
+use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::human_readable_size;
+use datafusion_execution::SendableRecordBatchStream;
+
+use crate::common::IPCWriter;
+use crate::stream::RecordBatchReceiverStream;
+
+/// Read spilled batches from the disk
+///
+/// `path` - temp file
+/// `schema` - batches schema, should be the same across batches
+/// `buffer` - internal buffer of capacity batches
+pub fn read_spill_as_stream(
+    path: RefCountedTempFile,
+    schema: SchemaRef,
+    buffer: usize,
+) -> Result<SendableRecordBatchStream> {
+    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
+    let sender = builder.tx();
+
+    builder.spawn_blocking(move || read_spill(sender, path.path()));
+
+    Ok(builder.build())
+}
+
+/// Spills in-memory `batches` to disk.
+///
+/// Returns total number of the rows spilled to disk.
+pub fn spill_record_batches(
+    batches: Vec<RecordBatch>,
+    path: PathBuf,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    for batch in batches {
+        writer.write(&batch)?;
+    }
+    writer.finish()?;
+    debug!(
+        "Spilled {} batches of total {} rows to disk, memory released {}",
+        writer.num_batches,
+        writer.num_rows,
+        human_readable_size(writer.num_bytes),
+    );
+    Ok(writer.num_rows)
+}
+
+fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+    let file = BufReader::new(File::open(path)?);
+    let reader = FileReader::try_new(file, None)?;
+    for batch in reader {
+        sender
+            .blocking_send(batch.map_err(Into::into))
+            .map_err(|e| exec_datafusion_err!("{e}"))?;
+    }
+    Ok(())
+}
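
For context on how the relocated helpers are meant to be used together, here is a minimal round-trip sketch (not part of this diff). It assumes the post-change module path `datafusion_physical_plan::spill`, the existing `DiskManager`/`RefCountedTempFile` API from `datafusion-execution`, the `common::collect` helper, and a tokio runtime; the example schema and the `"spill-example"` description string are illustrative only.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::disk_manager::{DiskManager, DiskManagerConfig};
use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::{read_spill_as_stream, spill_record_batches};

#[tokio::main]
async fn main() -> Result<()> {
    // A small batch to round-trip through a spill file (illustrative schema).
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // The DiskManager hands out reference-counted temp files; the file is
    // removed once the last handle (including the spill stream) is dropped.
    // The description string is arbitrary and only used for diagnostics.
    let disk_manager = DiskManager::try_new(DiskManagerConfig::new())?;
    let spill_file = disk_manager.create_tmp_file("spill-example")?;

    // Write the batches to disk in Arrow IPC format; returns the row count.
    let rows = spill_record_batches(
        vec![batch],
        spill_file.path().to_path_buf(),
        Arc::clone(&schema),
    )?;
    assert_eq!(rows, 3);

    // Read the spill back as a RecordBatch stream (channel buffer of 2 batches).
    let stream = read_spill_as_stream(spill_file, schema, 2)?;
    let batches = collect(stream).await?;
    assert_eq!(batches[0].num_rows(), 3);
    Ok(())
}
```

Because `read_spill_as_stream` runs the IPC reader on a blocking task and forwards batches through a bounded channel, the spilled data is streamed back incrementally rather than materialized in memory all at once.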