[REVIEW] Updated join tests for cache #286

Merged · 17 commits · Jul 21, 2020
Changes from 11 commits
188 changes: 187 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -14,9 +14,14 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from join_test import create_df
from generate_expr_test import four_op_df
from marks import incompat, allow_non_gpu, ignore_order
from pyspark.sql.functions import asc

def test_passing_gpuExpr_as_Expr():
    assert_gpu_and_cpu_are_equal_collect(
@@ -28,3 +33,184 @@
            .cache()
            .limit(50)
    )

# Creating special cases to just remove -0.0
double_special_cases = [
    DoubleGen._make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen._make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen._make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
    DoubleGen._make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION),
    0.0, 1.0, -1.0, float('inf'), float('-inf'), float('nan'),
    NEG_DOUBLE_NAN_MAX_VALUE
]

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
           pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]),
           pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]),
           BooleanGen(), DateGen(), TimestampGen()]

all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"),
                   (ByteGen(), "a < 100"),
                   (ShortGen(), "a < 100"),
                   (IntegerGen(), "a < 1000"),
                   (LongGen(), "a < 1000"),
                   (BooleanGen(), "a == false"),
                   (DateGen(), "a > '1/21/2012'"),
                   (TimestampGen(), "a > '1/21/2012'"),
                   pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]),
                   pytest.param((DoubleGen(special_cases=double_special_cases), "a < 1000"), marks=[incompat])]

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_join(data_gen, join_type):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 500)
        return left.join(right, left.a == right.r_a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join)
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    from_cpu = cached_df_cpu.sort(sort).collect()
    cached_df_gpu = with_gpu_session(do_join)
    from_gpu = cached_df_gpu.sort(sort).collect()
    assert_equal(from_cpu, from_gpu)


@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
# We are OK running everything on the CPU until we complete 'git issue',
# because we have an explicit check in our code that disallows an
# InMemoryTableScan from having anything other than AttributeReference
@allow_non_gpu(any=True)
def test_cached_join_filter(data_gen, join_type):
    data, filter = data_gen
    if data.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    def do_join(spark):
        left, right = create_df(spark, data, 500, 500)
        return left.join(right, left.a == right.r_a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join)
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort_columns = [asc("a"), asc("b")]
    else:
        sort_columns = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    join_from_cpu = cached_df_cpu.sort(sort_columns).collect()
    filter_from_cpu = cached_df_cpu.filter(filter).sort(sort_columns).collect()

    cached_df_gpu = with_gpu_session(do_join)
    join_from_gpu = cached_df_gpu.sort(sort_columns).collect()
    filter_from_gpu = cached_df_gpu.filter(filter).sort(sort_columns).collect()

    assert_equal(join_from_cpu, join_from_gpu)
    assert_equal(filter_from_cpu, filter_from_gpu)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_broadcast_hash_join(data_gen, join_type):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 500)
        return left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join)
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    from_cpu = cached_df_cpu.sort(sort).collect()
    cached_df_gpu = with_gpu_session(do_join)
    from_gpu = cached_df_gpu.sort(sort).collect()
    assert_equal(from_cpu, from_gpu)


# Confs that steer small joins away from broadcast (tiny autoBroadcastJoinThreshold)
# and away from sort-merge (preferSortMergeJoin=false), so a shuffled hash join is chosen
shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160",
                 "spark.sql.join.preferSortMergeJoin": "false",
                 "spark.sql.shuffle.partitions": "2",
                 "spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"}

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_shuffled_hash_join(data_gen, join_type):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 500)
        return left.join(right, left.a == right.r_a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join, shuffled_conf)
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    from_cpu = cached_df_cpu.sort(sort).collect()
    cached_df_gpu = with_gpu_session(do_join, shuffled_conf)
    from_gpu = cached_df_gpu.sort(sort).collect()
    assert_equal(from_cpu, from_gpu)


@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.skip(reason="this isn't calling the broadcastNestedLoopJoin, come back to it")
def test_cache_broadcast_nested_loop_join(data_gen, join_type):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 50)
        return left.join(right, left.a == left.a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join, shuffled_conf)
    if join_type == 'LeftAnti' or join_type == 'LeftSemi':
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    from_cpu = cached_df_cpu.sort(sort).collect()
    cached_df_gpu = with_gpu_session(do_join, shuffled_conf)
    from_gpu = cached_df_gpu.sort(sort).collect()
    assert_equal(from_cpu, from_gpu)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
Collaborator review comment: This isn't actually sorting locally. Instead you removed -0.0 to avoid this issue.
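
As the reviewer notes, the change sidesteps the -0.0 problem rather than sorting locally. A minimal sketch (not part of the diff) of why -0.0 is worth removing from the generated doubles:

import math
import struct

# -0.0 and 0.0 compare equal under IEEE 754, so a sort may legally place
# them in either order, yet their bit patterns and string forms differ.
print(-0.0 == 0.0)                    # True
print(math.copysign(1.0, -0.0))       # -1.0: the sign bit survives
print(struct.pack('>d', -0.0).hex())  # 8000000000000000
print(struct.pack('>d', 0.0).hex())   # 0000000000000000
# If the CPU plan produces 0.0 where the GPU plan produces -0.0, a row-by-row
# comparison of the collected results can fail even though the values are
# numerically equal, hence the special cases above drop -0.0 from the generator.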

# This is a copy of a test from generate_expr_test.py, except that here we cache the df
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec')
def test_cache_posexplode_makearray(spark_tmp_path, data_gen):
    if data_gen.data_type == BooleanType():
        pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350")
    data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU'
    def posExplode(spark):
        return four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache()
    cached_df_cpu = with_cpu_session(posExplode)
    cached_df_cpu.write.parquet(data_path_cpu)
    from_cpu = with_cpu_session(lambda spark: spark.read.parquet(data_path_cpu))

    data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU'
    cached_df_gpu = with_gpu_session(posExplode)
    cached_df_gpu.write.parquet(data_path_gpu)
    from_gpu = with_gpu_session(lambda spark: spark.read.parquet(data_path_gpu))

    sort_col = [asc("pos"), asc("col"), asc("a")]
    assert_equal(cached_df_cpu.sort(sort_col).collect(), cached_df_gpu.sort(sort_col).collect())
    assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect())

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_cache_expand_exec(data_gen):
    def op_df(spark, length=2048, seed=0):
        return gen_df(spark, StructGen([
            ('a', data_gen),
            ('b', IntegerGen())], nullable=False), length=length, seed=seed)

    cached_df_cpu = with_cpu_session(op_df).cache()
    from_cpu = with_cpu_session(lambda spark: cached_df_cpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))))

    cached_df_gpu = with_gpu_session(op_df).cache()
    from_gpu = with_gpu_session(lambda spark: cached_df_gpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))))

    sort_col = [asc("a"), asc("b"), asc("count(b)")]
    assert_equal(cached_df_cpu.sort(asc("a"), asc("b")).collect(), cached_df_gpu.sort(asc("a"), asc("b")).collect())
    assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect())


2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -48,6 +48,8 @@ def _set_all_confs(conf):
def reset_spark_session_conf():
    """Reset all of the configs for a given spark session."""
    _set_all_confs(_orig_conf)
    # Clear the cache so cached DataFrames from a previous test do not leak into the next one
    spark.catalog.clearCache()
    # Have to reach into a private member to get access to the API we need
    current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys()
    for key in current_keys:
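
For context on the clearCache() addition: these integration tests appear to reuse one Spark session across the CPU and GPU runs, so a DataFrame cached by one test would otherwise stay materialized and could be served to a later test's plan. A small sketch (illustrative, not from the PR) of the behavior being reset:

df = spark.range(10).cache()
df.count()                    # materializes the cached data
spark.catalog.clearCache()    # drops every cached table/DataFrame in the session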