Commit

Updated join tests for cache (NVIDIA#286)
razajafri authored Jul 21, 2020
1 parent 0be6d29 commit cef27c1
Showing 3 changed files with 161 additions and 7 deletions.
152 changes: 151 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -14,9 +14,14 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
from datetime import date
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from join_test import create_df
from generate_expr_test import four_op_df
from marks import incompat, allow_non_gpu, ignore_order

def test_passing_gpuExpr_as_Expr():
assert_gpu_and_cpu_are_equal_collect(
@@ -28,3 +33,148 @@ def test_passing_gpuExpr_as_Expr():
.cache()
.limit(50)
)

# special cases that exclude -0.0, because of https://github.com/NVIDIA/spark-rapids/issues/84
double_special_cases = [
DoubleGen.make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
DoubleGen.make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
DoubleGen.make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
DoubleGen.make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
0.0, 1.0, -1.0, float('inf'), float('-inf'), float('nan'),
NEG_DOUBLE_NAN_MAX_VALUE
]

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
           pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]),
           pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]),
           BooleanGen(), DateGen(), TimestampGen()]

all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
(ByteGen(), "a < 100"),
(ShortGen(), "a < 100"),
(IntegerGen(), "a < 1000"),
(LongGen(), "a < 1000"),
(BooleanGen(), "a == false"),
(DateGen(), "a > '1/21/2012'"),
(TimestampGen(), "a > '1/21/2012'"),
pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])]

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_join(data_gen, join_type):
if data_gen.data_type == BooleanType():
pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() # populates cache
return cached

assert_gpu_and_cpu_are_equal_collect(do_join)
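
For readers new to the harness, here is a deliberately simplified, hypothetical sketch of the collect-and-compare pattern these tests rely on; the real assert_gpu_and_cpu_are_equal_collect in asserts.py does more (conf plumbing, result sorting under @ignore_order, float tolerances), and the sketch assumes the session helpers accept a conf dict:

# Hypothetical simplification of the harness pattern, not the real asserts.py:
def sketch_assert_gpu_and_cpu_are_equal_collect(build_df, conf={}):
    # Run the same DataFrame builder once on the CPU and once on the GPU ...
    from_cpu = with_cpu_session(lambda spark: build_df(spark).collect(), conf=conf)
    from_gpu = with_gpu_session(lambda spark: build_df(spark).collect(), conf=conf)
    # ... and require the collected results to match.
    assert from_cpu == from_gpu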

@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
# We are OK running everything on the CPU until we complete https://github.com/NVIDIA/spark-rapids/issues/360,
# because we have an explicit check in our code that disallows InMemoryTableScan from having anything
# other than AttributeReference
@allow_non_gpu(any=True)
@ignore_order
def test_cached_join_filter(data_gen, join_type):
data, filter = data_gen
if data.data_type == BooleanType():
pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

def do_join(spark):
left, right = create_df(spark, data, 500, 500)
cached = left.join(right, left.a == right.r_a, join_type).cache()
        cached.count()  # populates the cache
return cached.filter(filter)

assert_gpu_and_cpu_are_equal_collect(do_join)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_broadcast_hash_join(data_gen, join_type):
if data_gen.data_type == BooleanType():
pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

def do_join(spark):
left, right = create_df(spark, data_gen, 500, 500)
cached = left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache()
cached.count()
return cached

assert_gpu_and_cpu_are_equal_collect(do_join)

# Configuration intended to steer Spark toward a shuffled hash join: the tiny
# broadcast threshold rules out broadcast joins, and disabling
# preferSortMergeJoin lets the planner pick ShuffledHashJoinExec.
shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160",
                 "spark.sql.join.preferSortMergeJoin": "false",
                 "spark.sql.shuffle.partitions": "2",
                 "spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"}

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_shuffled_hash_join(data_gen, join_type):
if data_gen.data_type == BooleanType():
pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

def do_join(spark):
left, right = create_df(spark, data_gen, 50, 500)
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count()
        return cached

    assert_gpu_and_cpu_are_equal_collect(do_join, conf=shuffled_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_broadcast_nested_loop_join(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
cached = left.crossJoin(right.hint("broadcast")).cache()
cached.count()
return cached

assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})

all_gen_restricting_dates = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]),
pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]),
BooleanGen(),
# due to backward compatibility we are avoiding writing dates prior to 1582-10-15
# For more detail please look at SPARK-31404
# This issue is tracked by https://github.com/NVIDIA/spark-rapids/issues/133 in the plugin
DateGen(start=date(1582, 10, 15)),
TimestampGen()]

@pytest.mark.parametrize('data_gen', all_gen_restricting_dates, ids=idfn)
@allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec')
def test_cache_posexplode_makearray(spark_tmp_path, data_gen):
if data_gen.data_type == BooleanType():
pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU'
data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU'
def write_posExplode(data_path):
def posExplode(spark):
cached = four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache()
cached.count()
cached.write.parquet(data_path)
            return spark.read.parquet(data_path).collect()  # collect so CPU and GPU results can be compared
return posExplode
from_cpu = with_cpu_session(write_posExplode(data_path_cpu))
from_gpu = with_gpu_session(write_posExplode(data_path_gpu))
assert_equal(from_cpu, from_gpu)
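
As context for the query above, a small standalone illustration (not part of this commit) of what posexplode(array(b, c, d)) produces; the local-mode session exists only for this demo:

# Standalone demo: posexplode emits one (pos, col) row per array element.
from pyspark.sql import SparkSession

demo_spark = SparkSession.builder.master("local[1]").appName("posexplode-demo").getOrCreate()
demo_df = demo_spark.createDataFrame([(1, 10, 20, 30)], ["a", "b", "c", "d"])
demo_df.selectExpr("posexplode(array(b, c, d))", "a").show()
# +---+---+---+
# |pos|col|  a|
# +---+---+---+
# |  0| 10|  1|
# |  1| 20|  1|
# |  2| 30|  1|
# +---+---+---+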

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@ignore_order
def test_cache_expand_exec(data_gen):
def op_df(spark, length=2048, seed=0):
cached = gen_df(spark, StructGen([
('a', data_gen),
('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
cached.count() # populate the cache
return cached.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))

assert_gpu_and_cpu_are_equal_collect(op_df)
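
The rollup above is what puts an ExpandExec into the plan. A standalone illustration (not part of this commit) of the grouping sets that rollup(a, b) generates:

# Standalone demo: rollup(a, b) aggregates over the grouping sets
# (a, b), (a) and (), which Spark implements with ExpandExec.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

demo_spark = SparkSession.builder.master("local[1]").appName("rollup-demo").getOrCreate()
demo_df = demo_spark.createDataFrame([("x", 1), ("x", 2), ("y", 1)], ["a", "b"])
demo_df.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))).show()
# NULLs in "a"/"b" mark the subtotal and grand-total grouping sets.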

14 changes: 8 additions & 6 deletions integration_tests/src/main/python/data_gen.py
@@ -293,10 +293,10 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
self._use_full_range = (self._min_exp == DOUBLE_MIN_EXP) and (self._max_exp == DOUBLE_MAX_EXP)
if special_cases is None:
special_cases = [
-            self._make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
-            self._make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
-            self._make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
-            self._make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
+            self.make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
+            self.make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
+            self.make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
+            self.make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
]
if self._min_exp <= 0 and self._max_exp >= 0:
special_cases.append(0.0)
@@ -312,7 +312,7 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
super().__init__(DoubleType(), nullable=nullable, special_cases=special_cases)

@staticmethod
-    def _make_from(sign, exp, fraction):
+    def make_from(sign, exp, fraction):
sign = sign & 1 # 1 bit
exp = (exp + 1023) & 0x7FF # add bias and 11 bits
fraction = fraction & DOUBLE_MAX_FRACTION
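
The diff collapses the rest of make_from; below is a hedged reconstruction of the complete helper, assuming the standard IEEE-754 binary64 layout and that DOUBLE_MAX_FRACTION is the 52-bit mantissa mask used elsewhere in data_gen.py:

# Hedged reconstruction of the truncated body: pack sign, biased exponent and
# fraction into the 64-bit layout of an IEEE-754 double.
import struct

DOUBLE_MAX_FRACTION = (1 << 52) - 1  # assumption: 52-bit mantissa mask

def make_from(sign, exp, fraction):
    sign = sign & 1                    # 1 bit
    exp = (exp + 1023) & 0x7FF         # add bias, keep 11 bits
    fraction = fraction & DOUBLE_MAX_FRACTION
    bits = (sign << 63) | (exp << 52) | fraction
    return struct.unpack('<d', struct.pack('<Q', bits))[0]

# e.g. make_from(0, 1023, DOUBLE_MAX_FRACTION) == 1.7976931348623157e+308 (DBL_MAX)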
@@ -338,7 +338,7 @@ def gen_part_double():
sign = rand.getrandbits(1)
exp = rand.randint(self._min_exp, self._max_exp)
fraction = rand.getrandbits(52)
-            return self._fixup_nans(self._make_from(sign, exp, fraction))
+            return self._fixup_nans(self.make_from(sign, exp, fraction))
self._start(rand, gen_part_double)

class BooleanGen(DataGen):
@@ -416,6 +416,8 @@ def _guess_leap_year(t):
y = int(math.ceil(t/4.0)) * 4
if ((y % 100) == 0) and ((y % 400) != 0):
y = y + 4
+    if (y == 10000):
+        y = y - 4
return y
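
The new clamp exists because Python's datetime stops at year 9999. A standalone illustration (not part of this commit) of the case it guards:

# For t near the calendar's end the round-up guesses year 10000, which is
# divisible by 400 (so the "+ 4" branch above does not fire) yet cannot be
# represented by datetime; 9996 is the nearest leap year at or below it.
import datetime
import math

t = 9998
y = int(math.ceil(t / 4.0)) * 4                  # -> 10000
assert y == 10000 and datetime.MAXYEAR == 9999
assert datetime.date(9996, 2, 29).year == 9996   # 9996 is a valid leap year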

_epoch = date(1970, 1, 1)
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -48,6 +48,8 @@ def _set_all_confs(conf):
def reset_spark_session_conf():
"""Reset all of the configs for a given spark session."""
_set_all_confs(_orig_conf)
+    # We should clear the cache
+    spark.catalog.clearCache()
# Have to reach into a private member to get access to the API we need
current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys()
for key in current_keys:
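
The loop body is collapsed above (presumably it unsets each remaining key). The new clearCache() call matters because cached data survives conf resets: without it, a table cached by one test could be served, still materialized, to the next. A standalone illustration (not part of this commit):

# Standalone demo: clearCache() evicts every cached table/DataFrame in the session.
from pyspark.sql import SparkSession

demo_spark = SparkSession.builder.master("local[1]").appName("clearcache-demo").getOrCreate()
cached = demo_spark.range(10).cache()
cached.count()                            # materializes the cache
assert cached.storageLevel.useMemory      # default level keeps it in memory
demo_spark.catalog.clearCache()           # evict everything
assert not cached.storageLevel.useMemory  # the plan is no longer cached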
