diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 5fdf02079496..2b3afbf75022 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -215,10 +215,6 @@ async fn test_semi_join_1k() { .await } -// The test is flaky -// https://github.com/apache/datafusion/issues/10886 -// SMJ produces 1 more row in the output -#[ignore] #[tokio::test] async fn test_semi_join_1k_filtered() { JoinFuzzTestCase::new( @@ -275,7 +271,7 @@ impl JoinFuzzTestCase { join_filter_builder: Option, ) -> Self { Self { - batch_sizes: &[1, 2, 7, 49, 50, 51, 100], + batch_sizes: &[2], input1, input2, join_type, @@ -442,18 +438,45 @@ impl JoinFuzzTestCase { if debug { println!("The debug is ON. Input data will be saved"); - let out_dir_name = &format!("fuzz_test_debug_batch_size_{batch_size}"); - Self::save_as_parquet(&self.input1, out_dir_name, "input1"); - Self::save_as_parquet(&self.input2, out_dir_name, "input2"); + let fuzz_debug = "fuzz_test_debug"; + std::fs::remove_dir_all(fuzz_debug).unwrap_or(()); + std::fs::create_dir_all(fuzz_debug).unwrap(); + let out_dir_name = &format!("{fuzz_debug}/batch_size_{batch_size}"); + Self::save_partitioned_batches_as_parquet( + &self.input1, + out_dir_name, + "input1", + ); + Self::save_partitioned_batches_as_parquet( + &self.input2, + out_dir_name, + "input2", + ); if join_tests.contains(&JoinTestType::NljHj) { - Self::save_as_parquet(&nlj_collected, out_dir_name, "nlj"); - Self::save_as_parquet(&hj_collected, out_dir_name, "hj"); + Self::save_partitioned_batches_as_parquet( + &nlj_collected, + out_dir_name, + "nlj", + ); + Self::save_partitioned_batches_as_parquet( + &hj_collected, + out_dir_name, + "hj", + ); } if join_tests.contains(&JoinTestType::HjSmj) { - Self::save_as_parquet(&hj_collected, out_dir_name, "hj"); - Self::save_as_parquet(&smj_collected, out_dir_name, "smj"); + Self::save_partitioned_batches_as_parquet( + &hj_collected, 
+ out_dir_name, + "hj", + ); + Self::save_partitioned_batches_as_parquet( + &smj_collected, + out_dir_name, + "smj", + ); } } @@ -527,11 +550,26 @@ impl JoinFuzzTestCase { /// as a parquet files preserving partitioning. /// Once the data is saved it is possible to run a custom test on top of the saved data and debug /// + /// #[tokio::test] + /// async fn test1() { + /// let left: Vec = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input1").await.unwrap(); + /// let right: Vec = JoinFuzzTestCase::load_partitioned_batches_from_parquet("fuzz_test_debug/batch_size_2/input2").await.unwrap(); + /// + /// JoinFuzzTestCase::new( + /// left, + /// right, + /// JoinType::LeftSemi, + /// Some(Box::new(col_lt_col_filter)), + /// ) + /// .run_test(&[JoinTestType::HjSmj], false) + /// .await + /// } + /// /// let ctx: SessionContext = SessionContext::new(); /// let df = ctx /// .read_parquet( /// "/tmp/input1/*.parquet", - /// ParquetReadOptions::default(), + /// datafusion::prelude::ParquetReadOptions::default(), /// ) /// .await /// .unwrap(); @@ -540,7 +578,7 @@ impl JoinFuzzTestCase { /// let df = ctx /// .read_parquet( /// "/tmp/input2/*.parquet", - /// ParquetReadOptions::default(), + /// datafusion::prelude::ParquetReadOptions::default(), /// ) /// .await /// .unwrap(); @@ -554,8 +592,11 @@ impl JoinFuzzTestCase { /// ) /// .run_test() /// .await - /// } - fn save_as_parquet(input: &[RecordBatch], output_dir: &str, out_name: &str) { + fn save_partitioned_batches_as_parquet( + input: &[RecordBatch], + output_dir: &str, + out_name: &str, + ) { let out_path = &format!("{output_dir}/{out_name}"); std::fs::remove_dir_all(out_path).unwrap_or(()); std::fs::create_dir_all(out_path).unwrap(); @@ -576,6 +617,39 @@ impl JoinFuzzTestCase { println!("The data {out_name} saved as parquet into {out_path}"); } + + /// Read parquet files preserving partitions, i.e. 
1 file -> 1 partition + /// Files can be of different sizes + /// The method can be useful to read partitions that have been saved by `save_partitioned_batches_as_parquet` + /// for test debugging purposes. Files are visited in `std::fs::read_dir` order, which is platform dependent; a failure to read or collect a parquet file panics via `unwrap` + #[allow(dead_code)] + async fn load_partitioned_batches_from_parquet( + dir: &str, + ) -> std::io::Result> { + let ctx: SessionContext = SessionContext::new(); + let mut batches: Vec = vec![]; + + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + + if path.is_file() { + let mut batch = ctx + .read_parquet( + path.to_str().unwrap(), + datafusion::prelude::ParquetReadOptions::default(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + batches.append(&mut batch); + } + } + Ok(batches) + } } /// Return randomly sized record batches with: diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 420fab51da39..9f7a38aa2e29 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1532,17 +1532,21 @@ fn get_filtered_join_mask( for i in 0..streamed_indices_length { // LeftSemi respects only first true values for specific streaming index, // others true values for the same index must be false - if mask.value(i) && !seen_as_true { + let streamed_idx = streamed_indices.value(i); + if mask.value(i) + && !seen_as_true + && !matched_indices.contains(&streamed_idx) + { seen_as_true = true; corrected_mask.append_value(true); - filter_matched_indices.push(streamed_indices.value(i)); + filter_matched_indices.push(streamed_idx); } else { corrected_mask.append_value(false); } // if switched to next streaming index(e.g. 
from 0 to 1, or from 1 to 2), we reset seen_as_true flag if i < streamed_indices_length - 1 - && streamed_indices.value(i) != streamed_indices.value(i + 1) + && streamed_idx != streamed_indices.value(i + 1) { seen_as_true = false; } @@ -2940,6 +2944,20 @@ mod tests { )) ); + assert_eq!( + get_filtered_join_mask( + LeftSemi, + UInt64Array::from(vec![0, 0, 0, 1, 1, 1]), + &BooleanArray::from(vec![true, false, false, false, false, true]), + &HashSet::from_iter(vec![1]), + &0, + ), + Some(( + BooleanArray::from(vec![true, false, false, false, false, false]), + vec![0] + )) + ); + Ok(()) }