From 93643b61e8de78ede59318da9e99a8eda0e8a9e0 Mon Sep 17 00:00:00 2001
From: comphead <comphead@ukr.net>
Date: Thu, 18 Jul 2024 18:33:48 -0700
Subject: [PATCH] Comments

---
 datafusion/core/tests/memory_limit/mod.rs     |  3 +-
 .../src/joins/sort_merge_join.rs              | 44 +++++++++----------
 datafusion/physical-plan/src/spill.rs         | 14 +++---
 3 files changed, 30 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 2494108d9258..bc2c3315da59 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -175,8 +175,9 @@ async fn sort_merge_join_no_spill() {
             "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
         )
         .with_expected_errors(vec![
-            "Resources exhausted: Failed to allocate additional",
+            "Failed to allocate additional",
             "SMJStream",
+            "Disk spilling disabled",
         ])
         .with_memory_limit(1_000)
         .with_config(config)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 555df2613ff7..5fde028c7f48 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -42,7 +42,8 @@ use futures::{Stream, StreamExt};
 use hashbrown::HashSet;
 
 use datafusion_common::{
-    internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType, Result,
+    exec_err, internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType,
+    Result,
 };
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -252,10 +253,10 @@ impl DisplayAs for SortMergeJoinExec {
                     "SortMergeJoin: join_type={:?}, on=[{}]{}",
                     self.join_type,
                     on,
-                    self.filter
-                        .as_ref()
-                        .map(|f| format!(", filter={}", f.expression()))
-                        .unwrap_or("".to_string())
+                    self.filter.as_ref().map_or("".to_string(), |f| format!(
+                        ", filter={}",
+                        f.expression()
+                    ))
                 )
             }
         }
@@ -583,6 +584,7 @@ impl StreamedBatch {
 #[derive(Debug)]
 struct BufferedBatch {
     /// The buffered record batch
+    /// None if the batch spilled to disk th
     pub batch: Option<RecordBatch>,
     /// The range in which the rows share the same join key
     pub range: Range<usize>,
@@ -599,7 +601,9 @@ struct BufferedBatch {
     /// but if batch is spilled to disk this property is preferable
     /// and less expensive
     pub num_rows: usize,
-    /// A temp spill file name on the disk if the batch spilled
+    /// An optional temp spill file name on the disk if the batch spilled
+    /// None by default
+    /// Some(fileName) if the batch spilled to the disk
     pub spill_file: Option<RefCountedTempFile>,
 }
 
@@ -890,7 +894,7 @@ impl SMJStream {
     }
 
     fn free_reservation(&mut self, buffered_batch: BufferedBatch) -> Result<()> {
-        // Shrink memory usage for in memory batches only
+        // Shrink memory usage for in-memory batches only
         if buffered_batch.spill_file.is_none() && buffered_batch.batch.is_some() {
             self.reservation
                 .try_shrink(buffered_batch.size_estimation)?;
@@ -912,7 +916,7 @@ impl SMJStream {
                 let spill_file = self
                     .runtime_env
                     .disk_manager
-                    .create_tmp_file("SortMergeJoinBuffered")?;
+                    .create_tmp_file("sort_merge_join_buffered_spill")?;
 
                 if let Some(batch) = buffered_batch.batch {
                     spill_record_batches(
@@ -929,11 +933,12 @@ impl SMJStream {
                         .spilled_bytes
                         .add(buffered_batch.size_estimation);
                     self.join_metrics.spilled_rows.add(buffered_batch.num_rows);
+                    Ok(())
+                } else {
+                    internal_err!("Buffered batch has empty body")
                 }
-
-                Ok(())
             }
-            Err(e) => Err(e),
+            Err(e) => exec_err!("{}. Disk spilling disabled.", e.message()),
         }?;
 
         self.buffered_data.batches.push_back(buffered_batch);
@@ -1020,7 +1025,7 @@ impl SMJStream {
                                 self.buffered_state = BufferedState::Ready;
                             }
                             Poll::Ready(Some(batch)) => {
-                                // Multi batch
+                                // Polling batches coming concurrently as multiple partitions
                                 self.join_metrics.input_batches.add(1);
                                 self.join_metrics.input_rows.add(batch.num_rows());
                                 if batch.num_rows() > 0 {
@@ -1609,7 +1614,7 @@ fn get_buffered_columns_from_batch(
             Ok(buffered_cols)
         }
         // Invalid combination
-        _ => internal_err!("Buffered batch spill status is in the inconsistent state."),
+        (spill, batch) => internal_err!("Unexpected buffered batch spill status. Spill exists: {}. In-memory exists: {}", spill.is_some(), batch.is_some()),
     }
 }
 
@@ -1646,7 +1651,6 @@ fn get_filtered_join_mask(
         // we don't need to check any others for the same index
         JoinType::LeftSemi => {
             // have we seen a filter match for a streaming index before
-            // have we seen a filter match for are streaming index before
             for i in 0..streamed_indices_length {
                 // LeftSemi respects only first true values for specific streaming index,
                 // others true values for the same index must be false
@@ -2904,11 +2908,9 @@ mod tests {
             let stream = join.execute(0, task_ctx)?;
             let err = common::collect(stream).await.unwrap_err();
 
-            assert_contains!(
-                err.to_string(),
-                "Resources exhausted: Failed to allocate additional"
-            );
+            assert_contains!(err.to_string(), "Failed to allocate additional");
             assert_contains!(err.to_string(), "SMJStream[0]");
+            assert_contains!(err.to_string(), "Disk spilling disabled");
             assert!(join.metrics().is_some());
             assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
             assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0));
@@ -2990,11 +2992,9 @@ mod tests {
             let stream = join.execute(0, task_ctx)?;
             let err = common::collect(stream).await.unwrap_err();
 
-            assert_contains!(
-                err.to_string(),
-                "Resources exhausted: Failed to allocate additional"
-            );
+            assert_contains!(err.to_string(), "Failed to allocate additional");
             assert_contains!(err.to_string(), "SMJStream[0]");
+            assert_contains!(err.to_string(), "Disk spilling disabled");
             assert!(join.metrics().is_some());
             assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
             assert_eq!(join.metrics().unwrap().spilled_bytes(), Some(0));
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index 75797eb25624..21ca58fa0a9f 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -40,7 +40,7 @@ use crate::stream::RecordBatchReceiverStream;
 /// `path` - temp file
 /// `schema` - batches schema, should be the same across batches
 /// `buffer` - internal buffer of capacity batches
-pub fn read_spill_as_stream(
+pub(crate) fn read_spill_as_stream(
     path: RefCountedTempFile,
     schema: SchemaRef,
     buffer: usize,
@@ -56,7 +56,7 @@ pub fn read_spill_as_stream(
 /// Spills in-memory `batches` to disk.
 ///
 /// Returns total number of the rows spilled to disk.
-pub fn spill_record_batches(
+pub(crate) fn spill_record_batches(
     batches: Vec<RecordBatch>,
     path: PathBuf,
     schema: SchemaRef,
@@ -94,7 +94,7 @@ pub fn spill_record_batch_by_size(
     path: PathBuf,
     schema: SchemaRef,
     batch_size_rows: usize,
-) -> Result<usize> {
+) -> Result<()> {
     let mut offset = 0;
     let total_rows = batch.num_rows();
     let mut writer = IPCWriter::new(&path, schema.as_ref())?;
@@ -107,7 +107,7 @@ pub fn spill_record_batch_by_size(
     }
     writer.finish()?;
 
-    Ok(total_rows)
+    Ok(())
 }
 
 #[cfg(test)]
@@ -168,14 +168,12 @@ mod tests {
 
         let spill_file = disk_manager.create_tmp_file("Test Spill")?;
         let schema = batch1.schema();
-        let num_rows = batch1.num_rows();
-        let cnt = spill_record_batch_by_size(
+        spill_record_batch_by_size(
             &batch1,
             spill_file.path().into(),
             Arc::clone(&schema),
             1,
-        );
-        assert_eq!(cnt.unwrap(), num_rows);
+        )?;
 
         let file = BufReader::new(File::open(spill_file.path())?);
         let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;