[REVIEW] Updated join tests for cache #286

Merged · 17 commits · Jul 21, 2020
Changes from 13 commits
167 changes: 166 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -14,9 +14,14 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from join_test import create_df
from generate_expr_test import four_op_df
from marks import incompat, allow_non_gpu, ignore_order
from pyspark.sql.functions import asc

def test_passing_gpuExpr_as_Expr():
    assert_gpu_and_cpu_are_equal_collect(
@@ -28,3 +33,163 @@ def test_passing_gpuExpr_as_Expr():
            .cache()
            .limit(50)
    )
# Creating special cases just to exclude -0.0
double_special_cases = [
    DoubleGen.make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen.make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen.make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen.make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
    0.0, 1.0, -1.0, float('inf'), float('-inf'), float('nan'),
    NEG_DOUBLE_NAN_MAX_VALUE
]
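For context, DoubleGen.make_from builds a double straight from IEEE-754 bit fields; its implementation appears in the data_gen.py diff below. A minimal standalone sketch of that bit assembly (make_double is a hypothetical name used only for illustration):

import struct

# Sketch of the sign/exponent/fraction assembly that DoubleGen.make_from performs:
# 1 sign bit, 11 biased-exponent bits, 52 fraction bits.
def make_double(sign, exp, fraction):
    bits = ((sign & 1) << 63) | (((exp + 1023) & 0x7FF) << 52) | (fraction & ((1 << 52) - 1))
    return struct.unpack('<d', struct.pack('<Q', bits))[0]  # reinterpret the raw bits as a float64

# e.g. make_double(0, 1023, (1 << 52) - 1) yields the largest finite double, ~1.7976931348623157e308.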

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
           pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]),
           pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]),
           BooleanGen(), DateGen(), TimestampGen()]

all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
(ByteGen(), "a < 100"),
(ShortGen(), "a < 100"),
(IntegerGen(), "a < 1000"),
(LongGen(), "a < 1000"),
(BooleanGen(), "a == false"),
(DateGen(), "a > '1/21/2012'"),
(TimestampGen(), "a > '1/21/2012'"),
pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])]

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_join(data_gen, join_type):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 500)
        cached = left.join(right, left.a == right.r_a, join_type).cache()
        cached.count() # populates the cache
        return cached.sort(sort)

    assert_gpu_and_cpu_are_equal_collect(do_join)
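For reference, a rough sketch of what assert_gpu_and_cpu_are_equal_collect does, paraphrased from asserts.py (simplified; the real implementation also handles per-test configs and approximate float comparison): it runs the same builder function under a CPU session and a GPU session and compares the collected rows.

# Simplified sketch, not the real implementation in asserts.py:
def _assert_collect_sketch(func, conf={}):
    from_cpu = with_cpu_session(lambda spark: func(spark).collect(), conf=conf)
    from_gpu = with_gpu_session(lambda spark: func(spark).collect(), conf=conf)
    assert_equal(from_cpu, from_gpu)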

@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
# We are OK running everything on the CPU until we complete 'git issue',
# because we have an explicit check in our code that disallows InMemoryTableScan
# from having anything other than AttributeReference
@allow_non_gpu(any=True)
def test_cached_join_filter(data_gen, join_type):
    data, filter = data_gen
    if data.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort_columns = [asc("a"), asc("b")]
    else:
        sort_columns = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    def do_join(spark):
        left, right = create_df(spark, data, 500, 500)
        cached = left.join(right, left.a == right.r_a, join_type).cache()
        cached.count() # populates the cache
        # apply the parametrized filter to the cached result
        return cached.filter(filter).sort(sort_columns)

    assert_gpu_and_cpu_are_equal_collect(do_join)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_broadcast_hash_join(data_gen, join_type):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 500)
        cached = left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache()
        cached.count() # populates the cache
        return cached.sort(sort)

    assert_gpu_and_cpu_are_equal_collect(do_join)

shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160",
"spark.sql.join.preferSortMergeJoin": "false",
"spark.sql.shuffle.partitions": "2",
"spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"}

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_shuffled_hash_join(data_gen, join_type):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 500)
        cached = left.join(right, left.a == right.r_a, join_type).cache()
        cached.count() # populates the cache
        return cached.sort(sort)

    assert_gpu_and_cpu_are_equal_collect(do_join, conf=shuffled_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.skip(reason="this isn't calling the broadcastNestedLoopJoin, come back to it")
def test_cache_broadcast_nested_loop_join(data_gen, join_type):
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 50)
        cached = left.join(right, left.a == left.a, join_type).cache()
        cached.count() # populates the cache
        return cached.sort(sort)

    assert_gpu_and_cpu_are_equal_collect(do_join)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84

Review comment (Collaborator): This isn't actually sorting locally. Instead you removed -0.0 to avoid this issue.
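For context on the reviewer's point: -0.0 and 0.0 compare equal, so a sort is free to emit them in either order, and the CPU and GPU runs can both return correctly "sorted" output that nevertheless differs row-for-row. A small, self-contained illustration:

import math

# -0.0 equals 0.0 under ==, so sorting gives them no unique relative order.
print(-0.0 == 0.0)               # True
print(math.copysign(1.0, -0.0))  # -1.0: the sign bit still distinguishes them
# A stable sort preserves input order for equal keys, so both results below
# are valid sorted output yet differ; excluding -0.0 from the generators
# sidesteps the mismatch.
print(sorted([0.0, -0.0]), sorted([-0.0, 0.0]))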

# This is a copy of a test from generate_expr_test.py, except that we cache the DataFrame
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec')
def test_cache_posexplode_makearray(spark_tmp_path, data_gen):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU'
    def posExplode(spark):
        return four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache()

    cached_df_cpu = with_cpu_session(posExplode)
    cached_df_cpu.write.parquet(data_path_cpu)
    from_cpu = with_cpu_session(lambda spark: spark.read.parquet(data_path_cpu))

    data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU'
    cached_df_gpu = with_gpu_session(posExplode)
    cached_df_gpu.write.parquet(data_path_gpu)
    from_gpu = with_gpu_session(lambda spark: spark.read.parquet(data_path_gpu))

    sort_col = [asc("pos"), asc("col"), asc("a")]
    assert_equal(cached_df_cpu.sort(sort_col).collect(), cached_df_gpu.sort(sort_col).collect())
    assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect())

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_cache_expand_exec(data_gen):
    sort_col = [asc("a"), asc("b"), asc("count(b)")]
    def op_df(spark, length=2048, seed=0):
        cached = gen_df(spark, StructGen([
            ('a', data_gen),
            ('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
        cached.count() # populate the cache
        return cached.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))).sort(sort_col)

    assert_gpu_and_cpu_are_equal_collect(op_df)
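Why rollup here: rollup(a, b) aggregates over the grouping sets (a, b), (a) and (), and Spark implements that by expanding each input row once per grouping set, which is the ExpandExec this test wants to see reading from the cache. A small standalone illustration (not part of the test suite):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["a", "b"])
# rollup(a, b) aggregates over grouping sets (a, b), (a) and (); the physical
# plan contains an Expand node that replicates each row once per grouping set.
df.rollup("a", "b").agg(f.count("b")).explain()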

12 changes: 6 additions & 6 deletions integration_tests/src/main/python/data_gen.py
@@ -293,10 +293,10 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
        self._use_full_range = (self._min_exp == DOUBLE_MIN_EXP) and (self._max_exp == DOUBLE_MAX_EXP)
        if special_cases is None:
            special_cases = [
-               self._make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
-               self._make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
-               self._make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
-               self._make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
+               self.make_from(1, self._max_exp, DOUBLE_MAX_FRACTION),
+               self.make_from(0, self._max_exp, DOUBLE_MAX_FRACTION),
+               self.make_from(1, self._min_exp, DOUBLE_MAX_FRACTION),
+               self.make_from(0, self._min_exp, DOUBLE_MAX_FRACTION)
            ]
        if self._min_exp <= 0 and self._max_exp >= 0:
            special_cases.append(0.0)
@@ -312,7 +312,7 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False
        super().__init__(DoubleType(), nullable=nullable, special_cases=special_cases)

    @staticmethod
-   def _make_from(sign, exp, fraction):
+   def make_from(sign, exp, fraction):
        sign = sign & 1 # 1 bit
        exp = (exp + 1023) & 0x7FF # add bias and 11 bits
        fraction = fraction & DOUBLE_MAX_FRACTION
@@ -338,7 +338,7 @@ def gen_part_double():
            sign = rand.getrandbits(1)
            exp = rand.randint(self._min_exp, self._max_exp)
            fraction = rand.getrandbits(52)
-           return self._fixup_nans(self._make_from(sign, exp, fraction))
+           return self._fixup_nans(self.make_from(sign, exp, fraction))
        self._start(rand, gen_part_double)

class BooleanGen(DataGen):
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -48,6 +48,8 @@ def _set_all_confs(conf):
def reset_spark_session_conf():
    """Reset all of the configs for a given spark session."""
    _set_all_confs(_orig_conf)
+   # We should also clear the cache so cached data does not leak between tests
+   spark.catalog.clearCache()
    # Have to reach into a private member to get access to the API we need
    current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys()
    for key in current_keys: