Skip to content

Commit

Permalink
Move spill related functions to spill.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jul 17, 2024
1 parent de0765a commit aa226ca
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 52 deletions.
53 changes: 1 addition & 52 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod sorts;
pub mod spill;
pub mod stream;
pub mod streaming;
pub mod tree_node;
Expand Down Expand Up @@ -98,8 +99,6 @@ pub use datafusion_physical_expr::{
use crate::common::IPCWriter;
pub use crate::stream::EmptyRecordBatchStream;
use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};

pub mod udaf {
Expand Down Expand Up @@ -903,56 +902,6 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
actual.iter().map(|elem| elem.to_string()).collect()
}

/// Read spilled batches from the disk and expose them as a record batch stream.
///
/// The file is read by [`read_spill`] inside a blocking task spawned on the
/// stream builder; the `path` handle is moved into that task.
///
/// `path` - temp file
/// `schema` - batches schema, should be the same across batches
/// `buffer` - capacity handed to the `RecordBatchReceiverStream` builder
///
/// Returns the stream produced by the builder. This function itself has no
/// fallible step; read errors are produced inside the spawned task.
pub fn read_spill_as_stream(
path: RefCountedTempFile,
schema: SchemaRef,
buffer: usize,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
// Sending half handed to the reader task.
let sender = builder.tx();

// `move` transfers ownership of `path` (and `sender`) into the task.
// NOTE(review): presumably this keeps the temp file alive while it is read - confirm.
builder.spawn_blocking(move || read_spill(sender, path.path()));

Ok(builder.build())
}

/// Spills in-memory `batches` to disk.
///
/// An `IPCWriter` is created for `path` with `schema`, every batch is written
/// in order, and the writer is finished before the summary is logged.
///
/// Returns total number of the rows spilled to disk (the writer's `num_rows`).
///
/// # Errors
///
/// Propagates any error from creating the writer, writing a batch, or
/// finishing the writer.
pub fn spill_record_batches(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
// `batches` is consumed; each batch goes out of scope after it is written.
for batch in batches {
writer.write(&batch)?;
}
writer.finish()?;
// Log the counters kept by the writer.
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}

/// Reads every batch from the file at `path` and forwards it through `sender`.
///
/// Each item yielded by the reader, success or error, is sent as-is after its
/// error type is converted. Reading stops at the first failed send.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, if the `FileReader` cannot
/// be created, or if a `blocking_send` fails (reported as an execution error
/// carrying the send error's message).
fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
// NOTE(review): the `None` argument presumably means "no column projection" - confirm against arrow docs.
let reader = FileReader::try_new(file, None)?;
for batch in reader {
// A failed send aborts the loop via `?`.
// NOTE(review): presumably this happens when the receiving side is gone - confirm.
sender
.blocking_send(batch.map_err(Into::into))
.map_err(|e| exec_datafusion_err!("{e}"))?;
}
Ok(())
}

#[cfg(test)]
mod tests {
Expand Down
116 changes: 116 additions & 0 deletions datafusion/physical-plan/src/spill.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines functions for spilling record batches to disk and reading them back
use std::any::Any;
use std::fmt::Debug;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use futures::stream::{StreamExt, TryStreamExt};
use log::debug;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;

use datafusion_common::{exec_datafusion_err, exec_err, Result};
use datafusion_common::{ColumnStatistics, internal_err, Statistics};
use datafusion_common::config::ConfigOptions;
use datafusion_common::hash_utils;
use datafusion_common::utils::project_schema;
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::TaskContext;
use datafusion_expr::{Accumulator, ColumnarValue};
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_expr::{
AggregateExpr, Distribution, expressions, functions, Partitioning, PhysicalExpr, udf,
};
use datafusion_physical_expr::window::WindowExpr;

use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::common::IPCWriter;
use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use crate::display::DisplayableExecutionPlan;
use crate::metrics::Metric;
use crate::metrics::MetricsSet;
use crate::ordering::InputOrderMode;
use crate::repartition::RepartitionExec;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::stream::EmptyRecordBatchStream;
use crate::topk::TopK;
use crate::visitor::{accept, ExecutionPlanVisitor, visit_execution_plan};

/// Streams the record batches stored in a spill file.
///
/// The actual reading is done by [`read_spill`] in a blocking task spawned on
/// the stream builder; ownership of `path` is transferred to that task.
///
/// * `path` - temp file holding the spilled batches
/// * `schema` - schema of the batches, should be the same across batches
/// * `buffer` - capacity handed to the `RecordBatchReceiverStream` builder
///
/// Returns the stream produced by the builder.
pub fn read_spill_as_stream(
    path: RefCountedTempFile,
    schema: SchemaRef,
    buffer: usize,
) -> Result<SendableRecordBatchStream> {
    let mut stream_builder = RecordBatchReceiverStream::builder(schema, buffer);
    let tx = stream_builder.tx();

    // The closure owns both the sending half and the temp file handle.
    let reader_task = move || read_spill(tx, path.path());
    stream_builder.spawn_blocking(reader_task);

    let stream = stream_builder.build();
    Ok(stream)
}

/// Writes the in-memory `batches` to a spill file at `path`.
///
/// The batches are written in order through an `IPCWriter` created with
/// `schema`; once the writer is finished a debug summary is logged.
///
/// Returns the total number of rows spilled to disk.
///
/// # Errors
///
/// Propagates any failure from creating the writer, writing a batch, or
/// finishing the writer.
pub fn spill_record_batches(
    batches: Vec<RecordBatch>,
    path: PathBuf,
    schema: SchemaRef,
) -> Result<usize> {
    let mut ipc_writer = IPCWriter::new(path.as_path(), schema.as_ref())?;

    // Consume the vector so each batch is dropped right after it is written.
    batches
        .into_iter()
        .try_for_each(|batch| ipc_writer.write(&batch))?;
    ipc_writer.finish()?;

    let rows_spilled = ipc_writer.num_rows;
    debug!(
        "Spilled {} batches of total {} rows to disk, memory released {}",
        ipc_writer.num_batches,
        rows_spilled,
        human_readable_size(ipc_writer.num_bytes),
    );
    Ok(rows_spilled)
}

/// Reads the spill file at `path` and pushes every batch into `sender`.
///
/// Items yielded by the reader are forwarded one by one, errors included
/// (after conversion of the error type). The first failed send ends the read.
///
/// # Errors
///
/// Fails if the file cannot be opened, if the `FileReader` cannot be created,
/// or if a `blocking_send` fails; the latter is reported as an execution error
/// carrying the send error's message.
fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    let spill_file = File::open(path)?;
    let batches = FileReader::try_new(BufReader::new(spill_file), None)?;

    // `try_for_each` stops at the first send that fails, like an early `?`.
    batches.into_iter().try_for_each(|maybe_batch| {
        sender
            .blocking_send(maybe_batch.map_err(Into::into))
            .map_err(|e| exec_datafusion_err!("{e}"))
    })
}

0 comments on commit aa226ca

Please sign in to comment.