
[REVIEW] Updated join tests for cache #286

Merged: 17 commits, Jul 21, 2020
65 changes: 64 additions & 1 deletion integration_tests/src/main/python/cache_test.py
@@ -14,9 +14,13 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session
from join_test import create_df
from marks import incompat, allow_non_gpu
from join_test import all_gen_no_nulls

def test_passing_gpuExpr_as_Expr():
    assert_gpu_and_cpu_are_equal_collect(
@@ -28,3 +28,62 @@ def test_passing_gpuExpr_as_Expr():
            .cache()
            .limit(50)
    )

conf={"spark.rapids.sql.explain":"ALL"}
@pytest.mark.xfail(reason="TODO: github issue")
@pytest.mark.parametrize('data_gen', all_gen_no_nulls, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_cache_join(data_gen, join_type):
    from pyspark.sql.functions import asc
    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 500)
        return left.join(right, left.a == right.r_a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join, conf)
    if (join_type == 'LeftAnti' or join_type == 'LeftSemi'):
        sort = [asc("a"), asc("b")]
    else:
        sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    from_cpu = cached_df_cpu.sort(sort).collect()
    print('COLLECTED\n{}'.format(from_cpu))
    cached_df_gpu = with_gpu_session(do_join, conf)
    from_gpu = cached_df_gpu.sort(sort).collect()
    print('COLLECTED\n{}'.format(from_gpu))
    assert_equal(from_cpu, from_gpu)

all_gen_no_nulls_filters = [(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"),
        (ByteGen(nullable=False), "a < 100"),
        (ShortGen(nullable=False), "a < 100"),
        (IntegerGen(nullable=False), "a < 1000"),
        (LongGen(nullable=False), "a < 1000"),
        (BooleanGen(nullable=False), "a == false"),
        (DateGen(nullable=False), "a > '1/21/2012'"),
        (TimestampGen(nullable=False), "a > '1/21/2012'"),
        pytest.param((FloatGen(nullable=False), "a < 1000"), marks=[incompat]),
        pytest.param((DoubleGen(nullable=False), "a < 1000"), marks=[incompat])]

@pytest.mark.xfail(reason="TODO: github issue")
@pytest.mark.parametrize('data_gen', all_gen_no_nulls_filters, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@allow_non_gpu('InMemoryTableScanExec', 'RDDScanExec')
def test_cached_join_filter(data_gen, join_type):
    from pyspark.sql.functions import asc
    data, filter = data_gen
    def do_join(spark):
        left, right = create_df(spark, data, 500, 500)
        return left.join(right, left.a == right.r_a, join_type).cache()
    cached_df_cpu = with_cpu_session(do_join, conf)
    if (join_type == 'LeftAnti' or join_type == 'LeftSemi'):
        sort_columns = [asc("a"), asc("b")]
    else:
        sort_columns = [asc("a"), asc("b"), asc("r_a"), asc("r_b")]

    join_from_cpu = cached_df_cpu.sort(sort_columns).collect()
    filter_from_cpu = cached_df_cpu.filter(filter).sort(sort_columns).collect()

    cached_df_gpu = with_gpu_session(do_join, conf)
    join_from_gpu = cached_df_gpu.sort(sort_columns).collect()
    filter_from_gpu = cached_df_gpu.filter(filter).sort(sort_columns).collect()

    assert_equal(join_from_cpu, join_from_gpu)
    assert_equal(filter_from_cpu, filter_from_gpu)
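
Note for reviewers: create_df and all_gen_no_nulls are imported from join_test.py and are not part of this diff. Judging from how the new tests use them (left columns a/b joined and sorted against right columns r_a/r_b), the helper is assumed to look roughly like the sketch below; the actual implementation in join_test.py may differ.

# Rough sketch (assumption, not part of this PR) of the create_df helper from
# join_test.py. binary_op_df is assumed to build a two-column DataFrame (a, b)
# from the given generator; the right side is renamed to r_a/r_b so the join
# condition and sort columns used in the tests above line up.
def create_df(spark, data_gen, left_length, right_length):
    left = binary_op_df(spark, data_gen, length=left_length)
    right = binary_op_df(spark, data_gen, length=right_length) \
        .withColumnRenamed('a', 'r_a') \
        .withColumnRenamed('b', 'r_b')
    return left, right
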
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -48,6 +48,8 @@ def _set_all_confs(conf):
def reset_spark_session_conf():
    """Reset all of the configs for a given spark session."""
    _set_all_confs(_orig_conf)
    # We should clear the cache
    spark.catalog.clearCache()
    # Have to reach into a private member to get access to the API we need
    current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys()
    for key in current_keys:
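
The added clearCache() call matters because the CPU and GPU passes of a test share one SparkSession whose confs are simply reset between runs; without clearing the catalog cache, a DataFrame cached during the first pass could be served from the existing in-memory relation during the second pass instead of being re-planned under the new configuration. A hypothetical, simplified illustration of that leak (the conf name spark.rapids.sql.enabled is real; the surrounding scenario is only a sketch):

# Hypothetical illustration only: cached data survives a conf change on a shared
# SparkSession, which is why reset_spark_session_conf() now clears the cache.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).cache()
df.count()                                            # materializes the cache under the current (e.g. CPU) config
spark.conf.set('spark.rapids.sql.enabled', 'true')    # switch configuration for the next pass
df.count()                                            # still answered from the already-cached data
spark.catalog.clearCache()                            # the new call drops it so the next pass starts clean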