From cef27c15b7d1375da7e352da39efafe53f862b52 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 21 Jul 2020 08:17:04 -0700
Subject: [PATCH] Updated join tests for cache (#286)

---
 .../src/main/python/cache_test.py             | 152 +++++++++++++++++-
 integration_tests/src/main/python/data_gen.py |  14 +-
 .../src/main/python/spark_session.py          |   2 +
 3 files changed, 161 insertions(+), 7 deletions(-)

diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py
index 5ecd9b3b792..96d17d21d54 100644
--- a/integration_tests/src/main/python/cache_test.py
+++ b/integration_tests/src/main/python/cache_test.py
@@ -14,9 +14,14 @@
 
 import pytest
 
-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
 from data_gen import *
+from datetime import date
 import pyspark.sql.functions as f
+from spark_session import with_cpu_session, with_gpu_session
+from join_test import create_df
+from generate_expr_test import four_op_df
+from marks import incompat, allow_non_gpu, ignore_order
 
 def test_passing_gpuExpr_as_Expr():
     assert_gpu_and_cpu_are_equal_collect(
@@ -28,3 +33,148 @@ def test_passing_gpuExpr_as_Expr():
             .cache()
             .limit(50)
         )
+# creating special cases to just remove -0.0 because of https://github.com/NVIDIA/spark-rapids/issues/84
+double_special_cases = [
+    DoubleGen.make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
+    DoubleGen.make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
+    DoubleGen.make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
+    DoubleGen.make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
+    0.0, 1.0, -1.0, float('inf'), float('-inf'), float('nan'),
+    NEG_DOUBLE_NAN_MAX_VALUE
+]
+
+all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
+           pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), BooleanGen(), DateGen(), TimestampGen()]
+
+all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
+                   (ByteGen(), "a < 100"),
+                   (ShortGen(), "a < 100"),
+                   (IntegerGen(), "a < 1000"),
+                   (LongGen(), "a < 1000"),
+                   (BooleanGen(), "a == false"),
+                   (DateGen(), "a > '1/21/2012'"),
+                   (TimestampGen(), "a > '1/21/2012'"),
+                   pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
+                   pytest.param((DoubleGen(special_cases=double_special_cases), "a < 1000"), marks=[incompat])]
+
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+@ignore_order
+def test_cache_join(data_gen, join_type):
+    if data_gen.data_type == BooleanType():
+        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
+
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 500, 500)
+        cached = left.join(right, left.a == right.r_a, join_type).cache()
+        cached.count() # populates cache
+        return cached
+
+    assert_gpu_and_cpu_are_equal_collect(do_join)
+
+@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+# We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360'
+# because we have an explicit check in our code that disallows InMemoryTableScan to have anything other than
+# AttributeReference
+@allow_non_gpu(any=True)
+@ignore_order
+def test_cached_join_filter(data_gen, join_type):
+    data, filter = data_gen
+    if data.data_type == BooleanType():
+        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
+
+    def do_join(spark):
+        left, right = create_df(spark, data, 500, 500)
+        cached = left.join(right, left.a == right.r_a, join_type).cache()
+        cached.count() #populates the cache
+        return cached.filter(filter)
+
+    assert_gpu_and_cpu_are_equal_collect(do_join)
+
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+@ignore_order
+def test_cache_broadcast_hash_join(data_gen, join_type):
+    if data_gen.data_type == BooleanType():
+        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
+
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 500, 500)
+        cached = left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache()
+        cached.count()
+        return cached
+
+    assert_gpu_and_cpu_are_equal_collect(do_join)
+
+shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160",
+                 "spark.sql.join.preferSortMergeJoin": "false",
+                 "spark.sql.shuffle.partitions": "2",
+                 "spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"}
+
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+@ignore_order
+def test_cache_shuffled_hash_join(data_gen, join_type):
+    if data_gen.data_type == BooleanType():
+        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
+
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 50, 500)
+        cached = left.join(right, left.a == right.r_a, join_type).cache()
+        cached.count()
+        return cached
+    assert_gpu_and_cpu_are_equal_collect(do_join)
+
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
+@ignore_order
+def test_cache_broadcast_nested_loop_join(data_gen, join_type):
+    def do_join(spark):
+        left, right = create_df(spark, data_gen, 50, 25)
+        cached = left.crossJoin(right.hint("broadcast")).cache()
+        cached.count()
+        return cached
+
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+
+all_gen_restricting_dates = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
+                             pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]),
+                             pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]),
+                             BooleanGen(),
+                             # due to backward compatibility we are avoiding writing dates prior to 1582-10-15
+                             # For more detail please look at SPARK-31404
+                             # This issue is tracked by https://github.com/NVIDIA/spark-rapids/issues/133 in the plugin
+                             DateGen(start=date(1582, 10, 15)),
+                             TimestampGen()]
+
+@pytest.mark.parametrize('data_gen', all_gen_restricting_dates, ids=idfn)
+@allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec')
+def test_cache_posexplode_makearray(spark_tmp_path, data_gen):
+    if data_gen.data_type == BooleanType():
+        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
+    data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU'
+    data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU'
+    def write_posExplode(data_path):
+        def posExplode(spark):
+            cached = four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache()
+            cached.count()
+            cached.write.parquet(data_path)
+            spark.read.parquet(data_path)
+        return posExplode
+    from_cpu = with_cpu_session(write_posExplode(data_path_cpu))
+    from_gpu = with_gpu_session(write_posExplode(data_path_gpu))
+    assert_equal(from_cpu, from_gpu)
+
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@ignore_order
+def test_cache_expand_exec(data_gen):
+    def op_df(spark, length=2048, seed=0):
+        cached = gen_df(spark, StructGen([
+            ('a', data_gen),
+            ('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
+        cached.count() # populate the cache
+        return cached.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))
+
+    assert_gpu_and_cpu_are_equal_collect(op_df)
+
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index 0710d7f943e..cdd637ca122 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -293,10 +293,10 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
         self._use_full_range = (self._min_exp == DOUBLE_MIN_EXP) and (self._max_exp == DOUBLE_MAX_EXP)
         if special_cases is None:
             special_cases = [
-                self._make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
-                self._make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
-                self._make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
-                self._make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
+                self.make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
+                self.make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
+                self.make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
+                self.make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
             ]
             if self._min_exp <= 0 and self._max_exp >= 0:
                 special_cases.append(0.0)
@@ -312,7 +312,7 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
         super().__init__(DoubleType(), nullable=nullable, special_cases=special_cases)
 
     @staticmethod
-    def _make_from(sign, exp, fraction):
+    def make_from(sign, exp, fraction):
         sign = sign & 1 # 1 bit
         exp = (exp + 1023) & 0x7FF # add bias and 11 bits
         fraction = fraction & DOUBLE_MAX_FRACTION
@@ -338,7 +338,7 @@ def gen_part_double():
             sign = rand.getrandbits(1)
             exp = rand.randint(self._min_exp, self._max_exp)
             fraction = rand.getrandbits(52)
-            return self._fixup_nans(self._make_from(sign, exp, fraction))
+            return self._fixup_nans(self.make_from(sign, exp, fraction))
         self._start(rand, gen_part_double)
 
 class BooleanGen(DataGen):
@@ -416,6 +416,8 @@ def _guess_leap_year(t):
     y = int(math.ceil(t/4.0)) * 4
     if ((y % 100) == 0) and ((y % 400) != 0):
         y = y + 4
+    if (y == 10000):
+        y = y - 4
     return y
 
 _epoch = date(1970, 1, 1)
diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py
index 5dd91cba61a..cbaa74fa01c 100644
--- a/integration_tests/src/main/python/spark_session.py
+++ b/integration_tests/src/main/python/spark_session.py
@@ -48,6 +48,8 @@ def _set_all_confs(conf):
 def reset_spark_session_conf():
     """Reset all of the configs for a given spark session."""
     _set_all_confs(_orig_conf)
+    #We should clear the cache
+    spark.catalog.clearCache()
     # Have to reach into a private member to get access to the API we need
     current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys()
     for key in current_keys:
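
Note on how the tests above exercise the plugin: each test builds a DataFrame, calls .cache() on it, forces materialization with .count(), and then hands the DataFrame-producing function to assert_gpu_and_cpu_are_equal_collect, which runs it once with the RAPIDS plugin disabled and once enabled and compares the collected rows. The sketch below is illustrative only, it is not part of the commit and does not use the project's test harness; it assumes pyspark is installed and that the session was launched with the spark-rapids plugin on the classpath (e.g. --conf spark.plugins=com.nvidia.spark.SQLPlugin), and the helper names in it are made up for the example.

    # Illustrative sketch -- mimics what with_cpu_session/with_gpu_session and
    # assert_gpu_and_cpu_are_equal_collect do: toggle the RAPIDS plugin on a
    # shared session, clear cached data between runs, and compare the rows.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("cache-join-sketch").getOrCreate()

    def run_collect(enable_gpu, build_df):
        # Clear cached relations between runs, mirroring the clearCache() call
        # the patch adds to reset_spark_session_conf(), then toggle the plugin.
        spark.catalog.clearCache()
        spark.conf.set("spark.rapids.sql.enabled", str(enable_gpu).lower())
        df = build_df(spark).cache()
        df.count()                     # materialize the cache, as the tests above do
        return sorted(df.collect())

    def build_joined_df(spark):
        # Tiny stand-in for create_df(): two DataFrames joined on a key column.
        left = spark.createDataFrame([(i, i * 10) for i in range(100)], ["a", "b"])
        right = spark.createDataFrame([(i, i * 100) for i in range(0, 100, 2)], ["r_a", "r_b"])
        return left.join(right, left.a == right.r_a, "Inner")

    cpu_rows = run_collect(False, build_joined_df)
    gpu_rows = run_collect(True, build_joined_df)
    assert cpu_rows == gpu_rows

Clearing the cache between the two runs matters because a relation cached while the plugin was enabled would otherwise be reused for the CPU run; that is exactly why the patch adds spark.catalog.clearCache() to reset_spark_session_conf() in spark_session.py.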