Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Updated join tests for cache #286

Merged
merged 17 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 151 additions & 1 deletion integration_tests/src/main/python/cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
from datetime import date
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from join_test import create_df
from generate_expr_test import four_op_df
from marks import incompat, allow_non_gpu, ignore_order

def test_passing_gpuExpr_as_Expr():
assert_gpu_and_cpu_are_equal_collect(
Expand All @@ -28,3 +33,148 @@ def test_passing_gpuExpr_as_Expr():
.cache()
.limit(50)
)
# Explicit special-case values for DoubleGen. The list is spelled out by hand
# so that -0.0 can be left out because of
# https://github.com/NVIDIA/spark-rapids/issues/84
double_special_cases = [
    # extreme exponents, with the sign bit set and then clear
    DoubleGen.make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen.make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen.make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen.make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
    # ordinary values
    0.0,
    1.0,
    -1.0,
    # infinities and NaNs
    float('inf'),
    float('-inf'),
    float('nan'),
    NEG_DOUBLE_NAN_MAX_VALUE
]

# One generator per column type exercised by the cache tests. The float and
# double generators use restricted special cases and carry the `incompat` mark.
all_gen = [
    StringGen(),
    ByteGen(),
    ShortGen(),
    IntegerGen(),
    LongGen(),
    pytest.param(
        FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]),
        marks=[incompat]),
    pytest.param(
        DoubleGen(special_cases=double_special_cases),
        marks=[incompat]),
    BooleanGen(),
    DateGen(),
    TimestampGen()]

# (generator, SQL predicate) pairs: the generator produces column `a` and the
# predicate is applied to the cached join result in test_cached_join_filter.
#
# The date/timestamp predicates use ISO-8601 literals. The previous literal
# '1/21/2012' is not a format Spark can cast to DATE/TIMESTAMP, so the string
# side of the comparison became NULL and the filter matched no rows, which made
# those two cases compare empty results.
all_gen_filters = [
    (StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
    (ByteGen(), "a < 100"),
    (ShortGen(), "a < 100"),
    (IntegerGen(), "a < 1000"),
    (LongGen(), "a < 1000"),
    (BooleanGen(), "a == false"),
    (DateGen(), "a > '2012-01-21'"),
    (TimestampGen(), "a > '2012-01-21'"),
    pytest.param(
        (FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"),
        marks=[incompat]),
    pytest.param(
        (DoubleGen(special_cases=double_special_cases), "a < 1000"),
        marks=[incompat])]

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_join(data_gen, join_type):
    """Cache an equi-join result and compare what the CPU and GPU collect."""
    # Boolean data is xfailed; see the linked issue.
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

    def cached_join(spark):
        lhs, rhs = create_df(spark, data_gen, 500, 500)
        joined = lhs.join(rhs, on=lhs.a == rhs.r_a, how=join_type)
        result = joined.cache()
        # count() is an action, so running it populates the cache
        result.count()
        return result

    assert_gpu_and_cpu_are_equal_collect(cached_join)
revans2 marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
# Running everything on the CPU is acceptable until
# 'https://github.com/NVIDIA/spark-rapids/issues/360' is completed, because the
# plugin has an explicit check that disallows InMemoryTableScan from having
# anything other than AttributeReference.
@allow_non_gpu(any=True)
@ignore_order
def test_cached_join_filter(data_gen, join_type):
    """Cache an equi-join result, filter it, and compare CPU and GPU output.

    `data_gen` is a (generator, SQL predicate) pair taken from all_gen_filters.
    """
    gen, predicate = data_gen
    # Boolean data is xfailed; see the linked issue.
    if gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

    def filtered_cached_join(spark):
        lhs, rhs = create_df(spark, gen, 500, 500)
        result = lhs.join(rhs, on=lhs.a == rhs.r_a, how=join_type).cache()
        # count() is an action, so running it populates the cache
        result.count()
        return result.filter(predicate)

    assert_gpu_and_cpu_are_equal_collect(filtered_cached_join)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_broadcast_hash_join(data_gen, join_type):
    """Cache an equi-join whose right side carries a broadcast hint."""
    # Boolean data is xfailed; see the linked issue.
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

    def cached_broadcast_join(spark):
        lhs, rhs = create_df(spark, data_gen, 500, 500)
        hinted_rhs = rhs.hint("broadcast")
        result = lhs.join(hinted_rhs, on=lhs.a == rhs.r_a, how=join_type).cache()
        # run an action so the cache is populated before the result is compared
        result.count()
        return result

    assert_gpu_and_cpu_are_equal_collect(cached_broadcast_join)

# Session settings intended for the shuffled-hash-join cache test.
# NOTE(review): presumably the small broadcast threshold together with
# preferSortMergeJoin=false is meant to steer the planner to a shuffled hash
# join - confirm against the Spark SQL configuration docs.
shuffled_conf = {
    "spark.sql.autoBroadcastJoinThreshold": "160",
    "spark.sql.join.preferSortMergeJoin": "false",
    "spark.sql.shuffle.partitions": "2",
    "spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"}

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_shuffled_hash_join(data_gen, join_type):
    """Cache an equi-join run under `shuffled_conf` and compare CPU and GPU output.

    The two sides are generated with different lengths (50 and 500 rows).
    """
    # Boolean data is xfailed; see the linked issue.
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")

    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 500)
        cached = left.join(right, left.a == right.r_a, join_type).cache()
        cached.count()  # populates the cache
        return cached

    # shuffled_conf was defined for this test but never applied, so the join
    # ran with the default session settings; pass it explicitly.
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=shuffled_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_broadcast_nested_loop_join(data_gen, join_type):
    """Cache a cross join against a broadcast-hinted right side.

    NOTE(review): `join_type` is accepted but never used by the cross join, so
    every parametrized join type runs the same query - confirm whether the
    parametrization is intended.
    """
    def cached_cross_join(spark):
        lhs, rhs = create_df(spark, data_gen, 50, 25)
        hinted_rhs = rhs.hint("broadcast")
        result = lhs.crossJoin(hinted_rhs).cache()
        # run an action so the cache is populated before the result is compared
        result.count()
        return result

    nested_loop_conf = {'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'}
    assert_gpu_and_cpu_are_equal_collect(cached_cross_join, conf=nested_loop_conf)

# Same generator set as the other cache tests, except that dates start at
# 1582-10-15.
all_gen_restricting_dates = [
    StringGen(),
    ByteGen(),
    ShortGen(),
    IntegerGen(),
    LongGen(),
    pytest.param(
        FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]),
        marks=[incompat]),
    pytest.param(
        DoubleGen(special_cases=double_special_cases),
        marks=[incompat]),
    BooleanGen(),
    # Because of backward compatibility, dates prior to 1582-10-15 are not
    # written. See SPARK-31404 for details; the plugin tracks this as
    # https://github.com/NVIDIA/spark-rapids/issues/133
    DateGen(start=date(1582, 10, 15)),
    TimestampGen()]

@pytest.mark.parametrize('data_gen', all_gen_restricting_dates, ids=idfn)
@allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec')
def test_cache_posexplode_makearray(spark_tmp_path, data_gen):
    """Cache a posexplode(array(...)) projection, write it to parquet on the
    CPU and on the GPU, and compare the rows read back from each output.
    """
    # Boolean data is xfailed; see the linked issue.
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU'
    data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU'

    def write_posExplode(data_path):
        """Build the per-session callable that writes to, then reads, data_path."""
        def posExplode(spark):
            cached = four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache()
            cached.count()  # populates the cache
            cached.write.parquet(data_path)
            # Return the materialized rows. Previously nothing was returned, so
            # the final assert_equal had no data to compare.
            # NOTE(review): assumes with_cpu_session/with_gpu_session hand back
            # the callable's return value, and that the read-back row order is
            # the same for both outputs - confirm.
            return spark.read.parquet(data_path).collect()
        return posExplode

    from_cpu = with_cpu_session(write_posExplode(data_path_cpu))
    from_gpu = with_gpu_session(write_posExplode(data_path_gpu))
    assert_equal(from_cpu, from_gpu)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@ignore_order
def test_cache_expand_exec(data_gen):
    """Cache a generated two-column frame, then roll it up and count column b."""
    def op_df(spark, length=2048, seed=0):
        schema = StructGen(
            [('a', data_gen),
             ('b', IntegerGen())],
            nullable=False)
        source = gen_df(spark, schema, length=length, seed=seed).cache()
        # run an action so the cache is populated before the aggregation
        source.count()
        grouped = source.rollup(f.col("a"), f.col("b"))
        return grouped.agg(f.count(f.col("b")))

    assert_gpu_and_cpu_are_equal_collect(op_df)

14 changes: 8 additions & 6 deletions integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
self._use_full_range = (self._min_exp == DOUBLE_MIN_EXP) and (self._max_exp == DOUBLE_MAX_EXP)
if special_cases is None:
special_cases = [
self._make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
self._make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
self._make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
self._make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
self.make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
self.make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
self.make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
self.make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
]
if self._min_exp <= 0 and self._max_exp >= 0:
special_cases.append(0.0)
Expand All @@ -312,7 +312,7 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
super().__init__(DoubleType(), nullable=nullable, special_cases=special_cases)

@staticmethod
def _make_from(sign, exp, fraction):
def make_from(sign, exp, fraction):
sign = sign & 1 # 1 bit
exp = (exp + 1023) & 0x7FF # add bias and 11 bits
fraction = fraction & DOUBLE_MAX_FRACTION
Expand All @@ -338,7 +338,7 @@ def gen_part_double():
sign = rand.getrandbits(1)
exp = rand.randint(self._min_exp, self._max_exp)
fraction = rand.getrandbits(52)
return self._fixup_nans(self._make_from(sign, exp, fraction))
return self._fixup_nans(self.make_from(sign, exp, fraction))
self._start(rand, gen_part_double)

class BooleanGen(DataGen):
Expand Down Expand Up @@ -416,6 +416,8 @@ def _guess_leap_year(t):
y = int(math.ceil(t/4.0)) * 4
if ((y % 100) == 0) and ((y % 400) != 0):
y = y + 4
if (y == 10000):
y = y - 4
return y

_epoch = date(1970, 1, 1)
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def _set_all_confs(conf):
def reset_spark_session_conf():
"""Reset all of the configs for a given spark session."""
_set_all_confs(_orig_conf)
#We should clear the cache
spark.catalog.clearCache()
revans2 marked this conversation as resolved.
Show resolved Hide resolved
# Have to reach into a private member to get access to the API we need
current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys()
for key in current_keys:
Expand Down