Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test utility to compare SQL query results between CPU and GPU #383

Merged
merged 2 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,18 @@ def assert_gpu_and_cpu_are_equal_iterator(func, conf={}):
so any amount of data can work, just be careful about how long it might take.
"""
_assert_gpu_and_cpu_are_equal(func, False, conf=conf)


def assert_gpu_and_cpu_are_equal_sql(df, tableName, sql, conf=None):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df: Input dataframe
:param tableName: Name of table to be created with the dataframe
:param sql: SQL query to be run on the specified table
:param conf: Any user-specified confs. Empty by default.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
conf = {}
df.createOrReplaceTempView(tableName)
assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql), conf)
120 changes: 57 additions & 63 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from pyspark.sql.types import *
from marks import *
Expand Down Expand Up @@ -47,23 +47,21 @@
_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps], ids=idfn)
def test_window_aggs_for_rows(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' sum(c) over '
' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, '
' max(c) over '
' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, '
' min(c) over '
' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, '
' row_number() over '
' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num '
'from window_agg_table '))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark : gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, '
' max(c) over '
' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, '
' min(c) over '
' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, '
' row_number() over '
' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num '
'from window_agg_table ')


# Test for RANGE queries, with timestamp order-by expressions.
Expand All @@ -73,62 +71,58 @@ def test_window_aggs_for_rows(data_gen):
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps], ids=idfn)
def test_window_aggs_for_ranges(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
' max(c) over '
' (partition by a order by cast(b as timestamp) desc '
' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
' min(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 2 days preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by cast(b as timestamp) asc '
' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
' max(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
'from window_agg_table'))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
' max(c) over '
' (partition by a order by cast(b as timestamp) desc '
' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
' min(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 2 days preceding and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by cast(b as timestamp) asc '
' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
' max(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
'from window_agg_table')


@pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns "
"(https://github.com/NVIDIA/spark-rapids/issues/216)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps], ids=idfn)
def test_window_aggs_for_ranges_of_dates(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' sum(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 1 following) as sum_c_asc '
'from window_agg_table'))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 1 following) as sum_c_asc '
'from window_agg_table'
)


@pytest.mark.xfail(reason="[BUG] `COUNT(x)` should not count null values of `x` "
"(https://github.com/NVIDIA/spark-rapids/issues/218)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls], ids=idfn)
def test_window_aggs_for_rows_count_non_null(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=2048))
df.createOrReplaceTempView("window_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select '
' count(c) over '
' (partition by a order by b,c '
' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null '
'from window_agg_table '))
assert_gpu_and_cpu_are_equal_sql(
with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
"window_agg_table",
'select '
' count(c) over '
' (partition by a order by b,c '
' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null '
'from window_agg_table '
)