diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 2494108d9258..bc2c3315da59 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -175,8 +175,9 @@ async fn sort_merge_join_no_spill() { "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time", ) .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", + "Failed to allocate additional", "SMJStream", + "Disk spilling disabled", ]) .with_memory_limit(1_000) .with_config(config) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 555df2613ff7..5fde028c7f48 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -42,7 +42,8 @@ use futures::{Stream, StreamExt}; use hashbrown::HashSet; use datafusion_common::{ - internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType, Result, + exec_err, internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType, + Result, }; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -252,10 +253,10 @@ impl DisplayAs for SortMergeJoinExec { "SortMergeJoin: join_type={:?}, on=[{}]{}", self.join_type, on, - self.filter - .as_ref() - .map(|f| format!(", filter={}", f.expression())) - .unwrap_or("".to_string()) + self.filter.as_ref().map_or("".to_string(), |f| format!( + ", filter={}", + f.expression() + )) ) } } @@ -583,6 +584,7 @@ impl StreamedBatch { #[derive(Debug)] struct BufferedBatch { /// The buffered record batch + /// `None` if the batch has been spilled to disk pub batch: Option, /// The range in which the rows share the same join key pub range: Range, @@ -599,7 +601,9 @@ struct BufferedBatch { /// but if batch is spilled to disk this property is preferable /// and less expensive 
pub num_rows: usize, - /// A temp spill file name on the disk if the batch spilled + /// An optional temp spill file name on the disk if the batch spilled + /// None by default + /// Some(fileName) if the batch spilled to the disk pub spill_file: Option, } @@ -890,7 +894,7 @@ impl SMJStream { } fn free_reservation(&mut self, buffered_batch: BufferedBatch) -> Result<()> { - // Shrink memory usage for in memory batches only + // Shrink memory usage for in-memory batches only if buffered_batch.spill_file.is_none() && buffered_batch.batch.is_some() { self.reservation .try_shrink(buffered_batch.size_estimation)?; @@ -912,7 +916,7 @@ impl SMJStream { let spill_file = self .runtime_env .disk_manager - .create_tmp_file("SortMergeJoinBuffered")?; + .create_tmp_file("sort_merge_join_buffered_spill")?; if let Some(batch) = buffered_batch.batch { spill_record_batches( @@ -929,11 +933,12 @@ impl SMJStream { .spilled_bytes .add(buffered_batch.size_estimation); self.join_metrics.spilled_rows.add(buffered_batch.num_rows); + Ok(()) + } else { + internal_err!("Buffered batch has empty body") } - - Ok(()) } - Err(e) => Err(e), + Err(e) => exec_err!("{}. Disk spilling disabled.", e.message()), }?; self.buffered_data.batches.push_back(buffered_batch); @@ -1020,7 +1025,7 @@ impl SMJStream { self.buffered_state = BufferedState::Ready; } Poll::Ready(Some(batch)) => { - // Multi batch + // Polling batches coming concurrently as multiple partitions self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(batch.num_rows()); if batch.num_rows() > 0 { @@ -1609,7 +1614,7 @@ fn get_buffered_columns_from_batch( Ok(buffered_cols) } // Invalid combination - _ => internal_err!("Buffered batch spill status is in the inconsistent state."), + (spill, batch) => internal_err!("Unexpected buffered batch spill status. Spill exists: {}. 
In-memory exists: {}", spill.is_some(), batch.is_some()), } } @@ -1646,7 +1651,6 @@ fn get_filtered_join_mask( // we don't need to check any others for the same index JoinType::LeftSemi => { // have we seen a filter match for a streaming index before - // have we seen a filter match for are streaming index before for i in 0..streamed_indices_length { // LeftSemi respects only first true values for specific streaming index, // others true values for the same index must be false @@ -2904,11 +2908,9 @@ mod tests { let stream = join.execute(0, task_ctx)?; let err = common::collect(stream).await.unwrap_err(); - assert_contains!( - err.to_string(), - "Resources exhausted: Failed to allocate additional" - ); + assert_contains!(err.to_string(), "Failed to allocate additional"); assert_contains!(err.to_string(), "SMJStream[0]"); + assert_contains!(err.to_string(), "Disk spilling disabled"); assert!(join.metrics().is_some()); assert_eq!(join.metrics().unwrap().spill_count(), Some(0)); assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0)); @@ -2990,11 +2992,9 @@ mod tests { let stream = join.execute(0, task_ctx)?; let err = common::collect(stream).await.unwrap_err(); - assert_contains!( - err.to_string(), - "Resources exhausted: Failed to allocate additional" - ); + assert_contains!(err.to_string(), "Failed to allocate additional"); assert_contains!(err.to_string(), "SMJStream[0]"); + assert_contains!(err.to_string(), "Disk spilling disabled"); assert!(join.metrics().is_some()); assert_eq!(join.metrics().unwrap().spill_count(), Some(0)); assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0)); diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs index 75797eb25624..21ca58fa0a9f 100644 --- a/datafusion/physical-plan/src/spill.rs +++ b/datafusion/physical-plan/src/spill.rs @@ -40,7 +40,7 @@ use crate::stream::RecordBatchReceiverStream; /// `path` - temp file /// `schema` - batches schema, should be the same across batches /// 
`buffer` - internal buffer of capacity batches -pub fn read_spill_as_stream( +pub(crate) fn read_spill_as_stream( path: RefCountedTempFile, schema: SchemaRef, buffer: usize, @@ -56,7 +56,7 @@ pub fn read_spill_as_stream( /// Spills in-memory `batches` to disk. /// /// Returns total number of the rows spilled to disk. -pub fn spill_record_batches( +pub(crate) fn spill_record_batches( batches: Vec, path: PathBuf, schema: SchemaRef, @@ -94,7 +94,7 @@ pub fn spill_record_batch_by_size( path: PathBuf, schema: SchemaRef, batch_size_rows: usize, -) -> Result { +) -> Result<()> { let mut offset = 0; let total_rows = batch.num_rows(); let mut writer = IPCWriter::new(&path, schema.as_ref())?; @@ -107,7 +107,7 @@ pub fn spill_record_batch_by_size( } writer.finish()?; - Ok(total_rows) + Ok(()) } #[cfg(test)] @@ -168,14 +168,12 @@ mod tests { let spill_file = disk_manager.create_tmp_file("Test Spill")?; let schema = batch1.schema(); - let num_rows = batch1.num_rows(); - let cnt = spill_record_batch_by_size( + spill_record_batch_by_size( &batch1, spill_file.path().into(), Arc::clone(&schema), 1, - ); - assert_eq!(cnt.unwrap(), num_rows); + )?; let file = BufReader::new(File::open(spill_file.path())?); let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;