Skip to content

Commit

Permalink
Test utility to compare SQL query results between CPU and GPU (NVIDIA…
Browse files Browse the repository at this point in the history
…#383)

* Test utility to compare SQL query results between CPU and GPU
  • Loading branch information
mythrocks authored Jul 17, 2020
1 parent 2c8f12d commit f4abe2e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 106 deletions.
15 changes: 15 additions & 0 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,18 @@ def assert_gpu_and_cpu_are_equal_iterator(func, conf={}):
so any amount of data can work, just be careful about how long it might take.
"""
_assert_gpu_and_cpu_are_equal(func, False, conf=conf)


def assert_gpu_and_cpu_are_equal_sql(df, tableName, sql, conf=None):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df: Input dataframe
:param tableName: Name of table to be created with the dataframe
:param sql: SQL query to be run on the specified table
:param conf: Any user-specified confs. Empty by default.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
conf = {}
df.createOrReplaceTempView(tableName)
assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql), conf)
76 changes: 33 additions & 43 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from pyspark.sql.types import *
from marks import *
Expand Down Expand Up @@ -275,17 +275,15 @@ def test_hash_count_with_filter(data_gen, conf):
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_multiple_filters(data_gen, conf):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=100))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select count(a) filter (where c > 50),' +
'count(b) filter (where c > 100),' +
# Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed
# 'avg(b) filter (where b > 20),' +
'min(a), max(b) filter (where c > 250) from hash_agg_table group by a'),
conf=conf)
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark : gen_df(spark, data_gen, length=100)),
"hash_agg_table",
'select count(a) filter (where c > 50),' +
'count(b) filter (where c > 100),' +
# Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed
# 'avg(b) filter (where b > 20),' +
'min(a), max(b) filter (where c > 250) from hash_agg_table group by a',
conf)


@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/155')
Expand All @@ -297,13 +295,11 @@ def test_hash_multiple_filters(data_gen, conf):
'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')
@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
def test_hash_multiple_filters_fail(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=100))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select avg(b) filter (where b > 20) from hash_agg_table group by a'),
conf=_no_nans_float_conf_partial)
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark : gen_df(spark, data_gen, length=100)),
"hash_agg_table",
'select avg(b) filter (where b > 20) from hash_agg_table group by a',
_no_nans_float_conf_partial)


@ignore_order
Expand All @@ -321,21 +317,19 @@ def test_hash_query_max_bug(data_gen):
@pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys,
_grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn)
def test_hash_agg_with_nan_keys(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=1024))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select a, '
'count(*) as count_stars, '
'count(b) as count_bees, '
'sum(b) as sum_of_bees, '
'max(c) as max_seas, '
'min(c) as min_seas, '
'count(distinct c) as count_distinct_cees, '
'avg(c) as average_seas '
'from hash_agg_table group by a'),
conf=_no_nans_float_conf)
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark : gen_df(spark, data_gen, length=1024)),
"hash_agg_table",
'select a, '
'count(*) as count_stars, '
'count(b) as count_bees, '
'sum(b) as sum_of_bees, '
'max(c) as max_seas, '
'min(c) as min_seas, '
'count(distinct c) as count_distinct_cees, '
'avg(c) as average_seas '
'from hash_agg_table group by a',
_no_nans_float_conf)


@pytest.mark.xfail(
Expand All @@ -347,15 +341,11 @@ def test_hash_agg_with_nan_keys(data_gen):
@ignore_order
@pytest.mark.parametrize('data_gen', [ _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn)
def test_count_distinct_with_nan_floats(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=1024))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select a, '
'count(distinct b) as count_distinct_bees '
'from hash_agg_table group by a'),
conf=_no_nans_float_conf)
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark : gen_df(spark, data_gen, length=1024)),
"hash_agg_table",
'select a, count(distinct b) as count_distinct_bees from hash_agg_table group by a',
_no_nans_float_conf)

# TODO: Literal tests
# TODO: First and Last tests
120 changes: 57 additions & 63 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from pyspark.sql.types import *
from marks import *
Expand Down Expand Up @@ -47,23 +47,21 @@
_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps], ids=idfn)
def test_window_aggs_for_rows(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' sum(c) over '
' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, '
' max(c) over '
' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, '
' min(c) over '
' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, '
' row_number() over '
' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num '
'from window_agg_table '))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark : gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, '
' max(c) over '
' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, '
' min(c) over '
' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, '
' row_number() over '
' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num '
'from window_agg_table ')


# Test for RANGE queries, with timestamp order-by expressions.
Expand All @@ -73,62 +71,58 @@ def test_window_aggs_for_rows(data_gen):
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps], ids=idfn)
def test_window_aggs_for_ranges(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
' max(c) over '
' (partition by a order by cast(b as timestamp) desc '
' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
' min(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 2 days preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by cast(b as timestamp) asc '
' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
' max(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
'from window_agg_table'))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
' max(c) over '
' (partition by a order by cast(b as timestamp) desc '
' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
' min(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 2 days preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by cast(b as timestamp) asc '
' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
' max(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
'from window_agg_table')


@pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns "
"(https://github.com/NVIDIA/spark-rapids/issues/216)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps], ids=idfn)
def test_window_aggs_for_ranges_of_dates(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' sum(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 1 following) as sum_c_asc '
'from window_agg_table'))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 1 following) as sum_c_asc '
'from window_agg_table'
)


@pytest.mark.xfail(reason="[BUG] `COUNT(x)` should not count null values of `x` "
"(https://github.com/NVIDIA/spark-rapids/issues/218)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls], ids=idfn)
def test_window_aggs_for_rows_count_non_null(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' count(c) over '
' (partition by a order by b,c '
' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null '
'from window_agg_table '))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' count(c) over '
' (partition by a order by b,c '
' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null '
'from window_agg_table '
)

0 comments on commit f4abe2e

Please sign in to comment.