Skip to content

Commit

Permalink
Add in basic support for running tpcds like queries (#506)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored Aug 4, 2020
1 parent 0d585ab commit b8da990
Show file tree
Hide file tree
Showing 6 changed files with 4,804 additions and 0 deletions.
6 changes: 6 additions & 0 deletions integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ def pytest_addoption(parser):
parser.addoption(
"--tpcxbb_path", action="store", default=None, help="path to TPCXbb data"
)
parser.addoption(
"--tpcds_format", action="store", default="parquet", help="format of TPC-DS data"
)
parser.addoption(
"--tpcds_path", action="store", default=None, help="path to TPC-DS data"
)
parser.addoption(
"--tpch_format", action="store", default="parquet", help="format of TPCH data"
)
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from conftest import is_incompat, should_sort_on_spark, should_sort_locally, get_float_check, get_limit, spark_jvm
from datetime import date, datetime
from decimal import Decimal
import math
from pyspark.sql import Row
import pytest
Expand Down Expand Up @@ -73,6 +74,8 @@ def _assert_equal(cpu, gpu, float_check, path):
assert cpu == gpu, "GPU and CPU date values are different at {}".format(path)
elif isinstance(cpu, bool):
assert cpu == gpu, "GPU and CPU boolean values are different at {}".format(path)
elif isinstance(cpu, Decimal):
assert cpu == gpu, "GPU and CPU decimal values are different at {}".format(path)
elif (cpu == None):
assert cpu == gpu, "GPU and CPU are not both null at {}".format(path)
else:
Expand Down
37 changes: 37 additions & 0 deletions integration_tests/src/main/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ def setup(self, spark):
"parquet": jvm.com.nvidia.spark.rapids.tests.tpch.TpchLikeSpark.setupAllParquet,
"orc": jvm.com.nvidia.spark.rapids.tests.tpch.TpchLikeSpark.setupAllOrc
}
if not self.tpch_format in formats:
raise RuntimeError("{} is not a supported tpch input type".format(self.tpch_format))
formats.get(self.tpch_format)(jvm_session, self.tpch_path)

def do_test_query(self, query):
Expand Down Expand Up @@ -267,6 +269,8 @@ def setup(self, spark):
"parquet": jvm.com.nvidia.spark.rapids.tests.tpcxbb.TpcxbbLikeSpark.setupAllParquet,
"orc": jvm.com.nvidia.spark.rapids.tests.tpcxbb.TpcxbbLikeSpark.setupAllOrc
}
if not self.tpcxbb_format in formats:
raise RuntimeError("{} is not a supported tpcxbb input type".format(self.tpcxbb_format))
formats.get(self.tpcxbb_format)(jvm_session,self.tpcxbb_path)

def do_test_query(self, query):
Expand Down Expand Up @@ -327,4 +331,37 @@ def mortgage(request):
else:
yield MortgageRunner(mortgage_format, mortgage_path + '/acq', mortgage_path + '/perf')

class TpcdsRunner:
    """Loads TPC-DS-like input data via the JVM-side helpers and runs queries.

    Mirrors TpchRunner/TpcxbbRunner: the constructor eagerly registers all
    TPC-DS tables for the given format/path so that individual tests only
    need to execute queries.
    """
    def __init__(self, tpcds_format, tpcds_path):
        self.tpcds_format = tpcds_format
        self.tpcds_path = tpcds_path
        # Register the tables up front, outside of any GPU/CPU comparison run.
        self.setup(get_spark_i_know_what_i_am_doing())

    def setup(self, spark):
        """Register all TPC-DS tables with the JVM session for self.tpcds_format.

        Raises RuntimeError for an unsupported input format.
        """
        jvm_session = _get_jvm_session(spark)
        jvm = _get_jvm(spark)
        # Map of supported input formats to the JVM-side setup entry points.
        formats = {
            "csv": jvm.com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark.setupAllCSV,
            "parquet": jvm.com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark.setupAllParquet,
            "orc": jvm.com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark.setupAllOrc
        }
        if self.tpcds_format not in formats:
            raise RuntimeError("{} is not a supported tpcds input type".format(self.tpcds_format))
        # Membership was checked above, so direct indexing cannot KeyError.
        formats[self.tpcds_format](jvm_session, self.tpcds_path)

    def do_test_query(self, query):
        """Run a single named TPC-DS query on the JVM and wrap it as a PySpark DataFrame."""
        spark = get_spark_i_know_what_i_am_doing()
        jvm_session = _get_jvm_session(spark)
        jvm = _get_jvm(spark)
        df = jvm.com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark.run(jvm_session, query)
        return DataFrame(df, spark.getActiveSession())

@pytest.fixture(scope="session")
def tpcds(request):
    """Session-scoped fixture providing a TpcdsRunner, or skipping when no data path is set."""
    tpcds_path = request.config.getoption("tpcds_path")
    if tpcds_path is None:
        # pytest.skip raises, so nothing below runs when TPC-DS is unconfigured.
        pytest.skip("TPC-DS not configured to run")
    tpcds_format = request.config.getoption("tpcds_format")
    yield TpcdsRunner(tpcds_format, tpcds_path)

39 changes: 39 additions & 0 deletions integration_tests/src/main/python/tpcds_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from marks import incompat, ignore_order, allow_non_gpu, approximate_float

# All TPC-DS-like query names: q1..q99, except that queries 14, 23, 24 and 39
# each come in two variants (suffixed 'a'/'b'), plus the two ss_max queries.
_SPLIT_QUERIES = {14: ('q14a', 'q14b'), 23: ('q23a', 'q23b'),
                  24: ('q24a', 'q24b'), 39: ('q39a', 'q39b')}
queries = [q for i in range(1, 100)
           for q in _SPLIT_QUERIES.get(i, ('q{}'.format(i),))] + ['ss_max', 'ss_maxb']

@incompat
@ignore_order
@approximate_float
@allow_non_gpu(any=True)
@pytest.mark.parametrize('query', queries)
def test_tpcds(tpcds, query):
    """Run one TPC-DS-like query on CPU and GPU and compare the collected results."""
    def run_query(spark):
        return tpcds.do_test_query(query)
    assert_gpu_and_cpu_are_equal_collect(run_query)
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tests.tpcds

import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.mutable.ListBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object TpcdsLikeBench extends Logging {

  /**
   * Run a query `numRuns` times, timing each run and printing progress.
   *
   * @param spark The Spark session hosting the registered TPC-DS tables.
   * @param query The query name (e.g. "q5") to run.
   * @param numRuns Number of times to run the query.
   * @param label Label ("cold" or "hot") used in the progress output.
   * @return Elapsed wall-clock time of each run, in milliseconds, in run order.
   */
  private def timeRuns(
      spark: SparkSession,
      query: String,
      numRuns: Int,
      label: String): Seq[Long] = {
    val elapsedTimes = new ListBuffer[Long]()
    for (i <- 0 until numRuns) {
      println(s"*** Start $label run $i:")
      val start = System.nanoTime()
      // collect() forces full execution of the query plan.
      TpcdsLikeSpark.run(spark, query).collect()
      val elapsed = NANOSECONDS.toMillis(System.nanoTime() - start)
      elapsedTimes.append(elapsed)
      println(s"*** ${label.capitalize} run $i took $elapsed msec.")
    }
    elapsedTimes.toList
  }

  /** Print the per-run times and the average for one group of runs. */
  private def printSummary(label: String, elapsedTimes: Seq[Long]): Unit = {
    elapsedTimes.zipWithIndex.foreach { case (elapsed, i) =>
      println(s"${label.capitalize} run $i took $elapsed msec.")
    }
    println(s"Average $label run took ${elapsedTimes.sum.toDouble / elapsedTimes.size} msec.")
  }

  /**
   * Run a TPC-DS-like query several times and report per-run and average times.
   *
   * This method can be called from Spark shell using the following syntax:
   *
   * TpcdsLikeBench.runBench(spark, "q5")
   *
   * @param spark The Spark session with the TPC-DS tables already registered.
   * @param query The query name to benchmark.
   * @param numColdRuns Number of initial (cache-cold) runs. Defaults to 1.
   * @param numHotRuns Number of subsequent (cache-hot) runs. Defaults to 3.
   */
  def runBench(
      spark: SparkSession,
      query: String,
      numColdRuns: Int = 1,
      numHotRuns: Int = 3): Unit = {
    val coldRunElapsed = timeRuns(spark, query, numColdRuns, "cold")
    val hotRunElapsed = timeRuns(spark, query, numHotRuns, "hot")
    printSummary("cold", coldRunElapsed)
    printSummary("hot", hotRunElapsed)
  }

  /**
   * Entry point: args(0) is the path to the parquet input data,
   * args(1) is the query name to benchmark.
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2,
      "Usage: TpcdsLikeBench <path-to-parquet-data> <query-name>")
    val input = args(0)
    val query = args(1)

    val spark = SparkSession.builder.appName("TPC-DS Like Bench").getOrCreate()
    TpcdsLikeSpark.setupAllParquet(spark, input)

    println(s"*** RUNNING TPC-DS QUERY $query")
    runBench(spark, query)
  }
}
Loading

0 comments on commit b8da990

Please sign in to comment.