Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: experiment with SMJ last buffered batch #12082

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 48 additions & 53 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::SortOptions;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::Schema;

use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::PhysicalExprRef;

use rand::Rng;
use std::sync::Arc;
use std::time::SystemTime;

use datafusion::common::JoinSide;
use datafusion::logical_expr::{JoinType, Operator};
Expand All @@ -39,6 +36,7 @@ use datafusion::physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
};
use datafusion::physical_plan::memory::MemoryExec;
use rand::Rng;

use datafusion::prelude::{SessionConfig, SessionContext};
use test_utils::stagger_batch_with_seed;
Expand Down Expand Up @@ -250,20 +248,22 @@ async fn test_anti_join_1k() {
}

#[tokio::test]
#[ignore]
//#[ignore]
// flaky test sometimes giving a 1-row difference
// https://github.com/apache/datafusion/issues/11555
async fn test_anti_join_1k_filtered() {
// NLJ vs HJ gives wrong result
// Tracked in https://github.com/apache/datafusion/issues/11537
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::LeftAnti,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[JoinTestType::HjSmj], false)
.await
for i in 0..1000 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I run this test 1000 times, there is a possibility that the test is going to fail.

JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::LeftAnti,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[JoinTestType::HjSmj], false)
.await
}
}

type JoinFilterBuilder = Box<dyn Fn(Arc<Schema>, Arc<Schema>) -> JoinFilter>;
Expand Down Expand Up @@ -450,11 +450,15 @@ impl JoinFuzzTestCase {
let smj_rows = smj_collected.iter().fold(0, |acc, b| acc + b.num_rows());
let nlj_rows = nlj_collected.iter().fold(0, |acc, b| acc + b.num_rows());

if debug {
if debug
&& ((join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows)
|| (join_tests.contains(&JoinTestType::HjSmj) && smj_rows != hj_rows))
{
let fuzz_debug = "fuzz_test_debug";
std::fs::remove_dir_all(fuzz_debug).unwrap_or(());
std::fs::create_dir_all(fuzz_debug).unwrap();
let out_dir_name = &format!("{fuzz_debug}/batch_size_{batch_size}");
println!("Test result data mismatch found. HJ rows {}, SMJ rows {}, NLJ rows {}", hj_rows, smj_rows, nlj_rows);
println!("The debug is ON. Input data will be saved to {out_dir_name}");

Self::save_partitioned_batches_as_parquet(
Expand All @@ -468,7 +472,7 @@ impl JoinFuzzTestCase {
"input2",
);

if join_tests.contains(&JoinTestType::NljHj) {
if join_tests.contains(&JoinTestType::NljHj) && nlj_rows != hj_rows {
Self::save_partitioned_batches_as_parquet(
&nlj_collected,
out_dir_name,
Expand All @@ -481,7 +485,7 @@ impl JoinFuzzTestCase {
);
}

if join_tests.contains(&JoinTestType::HjSmj) {
if join_tests.contains(&JoinTestType::HjSmj) && smj_rows != hj_rows {
Self::save_partitioned_batches_as_parquet(
&hj_collected,
out_dir_name,
Expand Down Expand Up @@ -537,6 +541,11 @@ impl JoinFuzzTestCase {

if join_tests.contains(&JoinTestType::HjSmj) {
let err_msg_row_cnt = format!("HashJoinExec and SortMergeJoinExec produced different row counts, batch_size: {}", &batch_size);
// println!("=============== HashJoinExec ==================");
// hj_formatted_sorted.iter().for_each(|s| println!("{}", s));
// println!("=============== SortMergeJoinExec ==================");
// smj_formatted_sorted.iter().for_each(|s| println!("{}", s));

assert_eq!(hj_rows, smj_rows, "{}", err_msg_row_cnt.as_str());

let err_msg_contents = format!("SortMergeJoinExec and HashJoinExec produced different results, batch_size: {}", &batch_size);
Expand Down Expand Up @@ -578,34 +587,6 @@ impl JoinFuzzTestCase {
/// )
/// .run_test(&[JoinTestType::HjSmj], false)
/// .await;
///
/// let ctx: SessionContext = SessionContext::new();
/// let df = ctx
/// .read_parquet(
/// "/tmp/input1/*.parquet",
/// datafusion::prelude::ParquetReadOptions::default(),
/// )
/// .await
/// .unwrap();
/// let left = df.collect().await.unwrap();
///
/// let df = ctx
/// .read_parquet(
/// "/tmp/input2/*.parquet",
/// datafusion::prelude::ParquetReadOptions::default(),
/// )
/// .await
/// .unwrap();
///
/// let right = df.collect().await.unwrap();
/// JoinFuzzTestCase::new(
/// left,
/// right,
/// JoinType::LeftSemi,
/// Some(Box::new(less_than_100_join_filter)),
/// )
/// .run_test()
/// .await
/// }
fn save_partitioned_batches_as_parquet(
input: &[RecordBatch],
Expand All @@ -617,9 +598,15 @@ impl JoinFuzzTestCase {
std::fs::create_dir_all(out_path).unwrap();

input.iter().enumerate().for_each(|(idx, batch)| {
let mut file =
std::fs::File::create(format!("{out_path}/file_{}.parquet", idx))
.unwrap();
let file_path = format!("{out_path}/file_{}.parquet", idx);
let mut file = std::fs::File::create(&file_path).unwrap();
println!(
"{}: Saving batch idx {} rows {} to parquet {}",
&out_name,
idx,
batch.num_rows(),
&file_path
);
let mut writer = parquet::arrow::ArrowWriter::try_new(
&mut file,
input.first().unwrap().schema(),
Expand All @@ -629,8 +616,6 @@ impl JoinFuzzTestCase {
writer.write(batch).unwrap();
writer.close().unwrap();
});

println!("The data {out_name} saved as parquet into {out_path}");
}

/// Read parquet files preserving partitions, i.e. 1 file -> 1 partition
Expand All @@ -643,10 +628,20 @@ impl JoinFuzzTestCase {
) -> std::io::Result<Vec<RecordBatch>> {
let ctx: SessionContext = SessionContext::new();
let mut batches: Vec<RecordBatch> = vec![];
let mut entries = std::fs::read_dir(dir)?
.map(|res| res.map(|e| e.path()))
.collect::<Result<Vec<_>, std::io::Error>>()?;

// important to read files using the same order as they have been written
// sort by modification time
entries.sort_by_key(|path| {
std::fs::metadata(path)
.and_then(|metadata| metadata.modified())
.unwrap_or(SystemTime::UNIX_EPOCH)
});

for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
for entry in entries {
let path = entry.as_path();

if path.is_file() {
let mut batch = ctx
Expand Down
Loading
Loading