
Commit 93643b6

comphead committed Jul 19, 2024
1 parent 1d9c7d4 commit 93643b6
Showing 3 changed files with 30 additions and 31 deletions.
datafusion/core/tests/memory_limit/mod.rs: 3 changes (2 additions, 1 deletion)
@@ -175,8 +175,9 @@ async fn sort_merge_join_no_spill() {
         "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
     )
     .with_expected_errors(vec![
-        "Resources exhausted: Failed to allocate additional",
+        "Failed to allocate additional",
         "SMJStream",
+        "Disk spilling disabled",
     ])
     .with_memory_limit(1_000)
     .with_config(config)
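
The expected errors above only appear when the join runs under a tight memory budget with no spill path available. A minimal sketch of a runtime configured that way, assuming the RuntimeConfig and DiskManagerConfig APIs from the DataFusion release this commit targets (the with_memory_limit / with_expected_errors helpers above belong to the memory_limit test harness and are not shown here):

use std::sync::Arc;

use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn main() -> datafusion_common::Result<()> {
    // Mirrors .with_memory_limit(1_000): a 1000-byte pool, all of it usable.
    let config = RuntimeConfig::new()
        .with_memory_limit(1_000, 1.0)
        // No disk manager, so the join cannot spill; allocation failures
        // must then surface as "... Disk spilling disabled" errors.
        .with_disk_manager(DiskManagerConfig::Disabled);
    let _runtime = Arc::new(RuntimeEnv::new(config)?);
    Ok(())
}
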
datafusion/physical-plan/src/joins/sort_merge_join.rs: 44 changes (22 additions, 22 deletions)
@@ -42,7 +42,8 @@
 use futures::{Stream, StreamExt};
 use hashbrown::HashSet;
 
 use datafusion_common::{
-    internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType, Result,
+    exec_err, internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType,
+    Result,
 };
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -252,10 +253,10 @@ impl DisplayAs for SortMergeJoinExec {
                     "SortMergeJoin: join_type={:?}, on=[{}]{}",
                     self.join_type,
                     on,
-                    self.filter
-                        .as_ref()
-                        .map(|f| format!(", filter={}", f.expression()))
-                        .unwrap_or("".to_string())
+                    self.filter.as_ref().map_or("".to_string(), |f| format!(
+                        ", filter={}",
+                        f.expression()
+                    ))
                 )
             }
         }
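
The Display change above is purely stylistic: Option::map_or produces the same string as the map/unwrap_or chain it replaces, and is the form clippy's map_unwrap_or lint suggests. A standalone check of the equivalence (the filter text is illustrative only):

fn main() {
    let filter: Option<&str> = Some("a@0 > b@1"); // hypothetical filter expression
    let chained = filter.map(|f| format!(", filter={f}")).unwrap_or("".to_string());
    let map_or = filter.map_or("".to_string(), |f| format!(", filter={f}"));
    assert_eq!(chained, map_or);
}
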
@@ -583,6 +584,7 @@ impl StreamedBatch {
 #[derive(Debug)]
 struct BufferedBatch {
     /// The buffered record batch
+    /// None if the batch spilled to disk
     pub batch: Option<RecordBatch>,
     /// The range in which the rows share the same join key
     pub range: Range<usize>,
@@ -599,7 +601,9 @@ struct BufferedBatch {
     /// but if batch is spilled to disk this property is preferable
     /// and less expensive
     pub num_rows: usize,
-    /// A temp spill file name on the disk if the batch spilled
+    /// An optional temp spill file name on the disk if the batch spilled
+    /// None by default
+    /// Some(fileName) if the batch spilled to the disk
     pub spill_file: Option<RefCountedTempFile>,
 }
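
Read together, the new doc comments describe an either/or state: a BufferedBatch keeps its rows in batch until it spills, after which only spill_file is set. A minimal sketch of that invariant with hypothetical stand-in types (the real check lives in get_buffered_columns_from_batch, shown further down):

// Stand-in for BufferedBatch's two storage fields; illustration only.
struct SpillState<B, F> {
    batch: Option<B>,      // Some until the batch spills, then None
    spill_file: Option<F>, // None until the batch spills, then Some
}

impl<B, F> SpillState<B, F> {
    // Exactly one side may be populated; any other combination is the
    // "invalid combination" the join reports as an internal error.
    fn is_consistent(&self) -> bool {
        self.batch.is_some() ^ self.spill_file.is_some()
    }
}

fn main() {
    let in_memory = SpillState::<(), ()> { batch: Some(()), spill_file: None };
    let spilled = SpillState::<(), ()> { batch: None, spill_file: Some(()) };
    assert!(in_memory.is_consistent() && spilled.is_consistent());
}
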

@@ -890,7 +894,7 @@ impl SMJStream {
     }
 
     fn free_reservation(&mut self, buffered_batch: BufferedBatch) -> Result<()> {
-        // Shrink memory usage for in memory batches only
+        // Shrink memory usage for in-memory batches only
        if buffered_batch.spill_file.is_none() && buffered_batch.batch.is_some() {
            self.reservation
                .try_shrink(buffered_batch.size_estimation)?;
@@ -912,7 +916,7 @@ impl SMJStream {
                let spill_file = self
                    .runtime_env
                    .disk_manager
-                    .create_tmp_file("SortMergeJoinBuffered")?;
+                    .create_tmp_file("sort_merge_join_buffered_spill")?;
 
                if let Some(batch) = buffered_batch.batch {
                    spill_record_batches(
@@ -929,11 +933,12 @@
                        .spilled_bytes
                        .add(buffered_batch.size_estimation);
                    self.join_metrics.spilled_rows.add(buffered_batch.num_rows);
+                    Ok(())
                } else {
                    internal_err!("Buffered batch has empty body")
                }
-
-                Ok(())
            }
-            Err(e) => Err(e),
+            Err(e) => exec_err!("{}. Disk spilling disabled.", e.message()),
        }?;
 
        self.buffered_data.batches.push_back(buffered_batch);
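
The new Err arm is where the test strings above come from: when the join cannot reserve memory and cannot fall back to spilling, the reservation error's message() is rewrapped by exec_err!, which appends ". Disk spilling disabled." and swaps the "Resources exhausted:" prefix for an execution-error one. A small sketch of that rewrapping, using only APIs the diff itself imports (the byte count is illustrative):

use datafusion_common::{exec_err, DataFusionError, Result};

fn main() {
    // Stand-in for the failed reservation; hypothetical message text.
    let e = DataFusionError::ResourcesExhausted(
        "Failed to allocate additional 176 bytes for SMJStream[0]".to_string(),
    );
    // message() drops the "Resources exhausted: " prefix, which is why the
    // updated test expectations no longer look for it.
    let wrapped: Result<()> = exec_err!("{}. Disk spilling disabled.", e.message());
    let msg = wrapped.unwrap_err().to_string();
    assert!(msg.contains("Failed to allocate additional"));
    assert!(msg.contains("Disk spilling disabled"));
}
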
@@ -1020,7 +1025,7 @@ impl SMJStream {
                    self.buffered_state = BufferedState::Ready;
                }
                Poll::Ready(Some(batch)) => {
-                    // Multi batch
+                    // Polling batches coming concurrently as multiple partitions
                    self.join_metrics.input_batches.add(1);
                    self.join_metrics.input_rows.add(batch.num_rows());
                    if batch.num_rows() > 0 {
@@ -1609,7 +1614,7 @@ fn get_buffered_columns_from_batch(
            Ok(buffered_cols)
        }
        // Invalid combination
-        _ => internal_err!("Buffered batch spill status is in the inconsistent state."),
+        (spill, batch) => internal_err!("Unexpected buffered batch spill status. Spill exists: {}. In-memory exists: {}", spill.is_some(), batch.is_some()),
    }
 }

@@ -1646,7 +1651,6 @@ fn get_filtered_join_mask(
        // we don't need to check any others for the same index
        JoinType::LeftSemi => {
            // have we seen a filter match for a streaming index before
-            // have we seen a filter match for are streaming index before
            for i in 0..streamed_indices_length {
                // LeftSemi respects only first true values for specific streaming index,
                // others true values for the same index must be false
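
The surviving comment states the LeftSemi rule: only the first filter match for a given streamed row may stay true, so the row is emitted at most once. A simplified sketch of that rule with plain vectors in place of the Arrow arrays the real get_filtered_join_mask operates on:

use std::collections::HashSet;

// LeftSemi masking, simplified: keep the first `true` per streamed index
// and force every later `true` for the same index to `false`.
fn left_semi_mask(streamed_indices: &[u64], filter_mask: &[bool]) -> Vec<bool> {
    let mut matched = HashSet::new();
    streamed_indices
        .iter()
        .zip(filter_mask)
        // `insert` returns false once the index has already matched;
        // a false filter value short-circuits and records nothing.
        .map(|(idx, &keep)| keep && matched.insert(*idx))
        .collect()
}

fn main() {
    let mask = left_semi_mask(&[0, 0, 1, 1], &[true, true, false, true]);
    assert_eq!(mask, vec![true, false, false, true]);
}
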
@@ -2904,11 +2908,9 @@ mod tests {
        let stream = join.execute(0, task_ctx)?;
        let err = common::collect(stream).await.unwrap_err();
 
-        assert_contains!(
-            err.to_string(),
-            "Resources exhausted: Failed to allocate additional"
-        );
+        assert_contains!(err.to_string(), "Failed to allocate additional");
        assert_contains!(err.to_string(), "SMJStream[0]");
+        assert_contains!(err.to_string(), "Disk spilling disabled");
        assert!(join.metrics().is_some());
        assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
        assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0));
@@ -2990,11 +2992,9 @@
        let stream = join.execute(0, task_ctx)?;
        let err = common::collect(stream).await.unwrap_err();
 
-        assert_contains!(
-            err.to_string(),
-            "Resources exhausted: Failed to allocate additional"
-        );
+        assert_contains!(err.to_string(), "Failed to allocate additional");
        assert_contains!(err.to_string(), "SMJStream[0]");
+        assert_contains!(err.to_string(), "Disk spilling disabled");
        assert!(join.metrics().is_some());
        assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
        assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0));
datafusion/physical-plan/src/spill.rs: 14 changes (6 additions, 8 deletions)
@@ -40,7 +40,7 @@ use crate::stream::RecordBatchReceiverStream;
 /// `path` - temp file
 /// `schema` - batches schema, should be the same across batches
 /// `buffer` - internal buffer of capacity batches
-pub fn read_spill_as_stream(
+pub(crate) fn read_spill_as_stream(
     path: RefCountedTempFile,
     schema: SchemaRef,
     buffer: usize,
@@ -56,7 +56,7 @@ pub fn read_spill_as_stream(
 /// Spills in-memory `batches` to disk.
 ///
 /// Returns total number of the rows spilled to disk.
-pub fn spill_record_batches(
+pub(crate) fn spill_record_batches(
     batches: Vec<RecordBatch>,
     path: PathBuf,
     schema: SchemaRef,
@@ -94,7 +94,7 @@ pub fn spill_record_batch_by_size(
     path: PathBuf,
     schema: SchemaRef,
     batch_size_rows: usize,
-) -> Result<usize> {
+) -> Result<()> {
     let mut offset = 0;
     let total_rows = batch.num_rows();
     let mut writer = IPCWriter::new(&path, schema.as_ref())?;
@@ -107,7 +107,7 @@ pub fn spill_record_batch_by_size(
     }
     writer.finish()?;
 
-    Ok(total_rows)
+    Ok(())
 }

#[cfg(test)]
@@ -168,14 +168,12 @@ mod tests {
 
        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
-        let num_rows = batch1.num_rows();
-        let cnt = spill_record_batch_by_size(
+        spill_record_batch_by_size(
            &batch1,
            spill_file.path().into(),
            Arc::clone(&schema),
            1,
-        );
-        assert_eq!(cnt.unwrap(), num_rows);
+        )?;
 
        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
