date_format should not suggest enabling incompatibleDateFormats for formats we cannot support #2532

Merged 2 commits on Jun 21, 2021
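
For context, the behaviour under test can be reproduced with a few lines of Spark code. The sketch below is not part of the PR (the object name DateFormatSketch is illustrative) and assumes a local Spark session with the RAPIDS plugin configured. The 'F' (day-of-week-in-month) pattern has no cuDF equivalent, so the plugin must always fall back to the CPU for it, while 'dd-MM-yyyy' translates to a cuDF format but is untested, so it may run on the GPU once spark.rapids.sql.incompatibleDateFormats.enabled is set.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format, to_date}

object DateFormatSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq("2021-06-21").toDF("c0").withColumn("d", to_date(col("c0")))

    // 'F' cannot be translated to a cuDF strftime format: always CPU
    df.select(date_format(col("d"), "F")).show()

    // 'dd-MM-yyyy' translates but is untested: GPU only with the incompat conf
    df.select(date_format(col("d"), "dd-MM-yyyy")).show()
  }
}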
integration_tests/src/main/python/date_time_test.py (35 additions, 2 deletions)
@@ -13,10 +13,10 @@
 # limitations under the License.
 
 import pytest
-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
 from data_gen import *
 from datetime import date, datetime, timezone
-from marks import incompat
+from marks import incompat, allow_non_gpu
 from pyspark.sql.types import *
 from spark_session import with_spark_session
 import pyspark.sql.functions as f
@@ -214,3 +214,36 @@ def test_string_unix_timestamp(data_gen, date_form):
 def test_date_format(data_gen, date_format):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)))
+
+unsupported_date_formats = ['F']
+@pytest.mark.parametrize('date_format', unsupported_date_formats, ids=idfn)
+@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
+@allow_non_gpu('ProjectExec,Alias,DateFormatClass,Literal,Cast')
+def test_date_format_f(data_gen, date_format):
+    assert_gpu_fallback_collect(
+        lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)), 'ProjectExec')
+
+@pytest.mark.parametrize('date_format', unsupported_date_formats, ids=idfn)
+@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
+@allow_non_gpu('ProjectExec,Alias,DateFormatClass,Literal,Cast')
+def test_date_format_f_incompat(data_gen, date_format):
+    # note that we can't support it even with incompatibleDateFormats enabled
+    conf = {"spark.rapids.sql.incompatibleDateFormats.enabled": "true"}
+    assert_gpu_fallback_collect(
+        lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)), 'ProjectExec', conf)
+
+maybe_supported_date_formats = ['dd-MM-yyyy']
+@pytest.mark.parametrize('date_format', maybe_supported_date_formats, ids=idfn)
+@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
+@allow_non_gpu('ProjectExec,Alias,DateFormatClass,Literal,Cast')
+def test_date_format_maybe(data_gen, date_format):
+    assert_gpu_fallback_collect(
+        lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)),
+        'ProjectExec')
+
+@pytest.mark.parametrize('date_format', maybe_supported_date_formats, ids=idfn)
+@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
+def test_date_format_maybe_incompat(data_gen, date_format):
+    conf = {"spark.rapids.sql.incompatibleDateFormats.enabled": "true"}
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)), conf)
@@ -394,26 +394,31 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpression]

     // Date and Timestamp work too
     if (expr.right.dataType == StringType) {
-      try {
-        extractStringLit(expr.right) match {
-          case Some(rightLit) =>
-            sparkFormat = rightLit
-            if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) {
-              willNotWorkOnGpu("legacyTimeParserPolicy LEGACY is not supported")
-            } else if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) ||
-                conf.incompatDateFormats) {
-              strfFormat = DateUtils.toStrf(sparkFormat,
-                expr.left.dataType == DataTypes.StringType)
-            } else {
-              willNotWorkOnGpu(s"incompatible format '$sparkFormat'. Set " +
-                s"spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.")
-            }
-          case None =>
-            willNotWorkOnGpu("format has to be a string literal")
-        }
-      } catch {
-        case x: TimestampFormatConversionException =>
-          willNotWorkOnGpu(s"Failed to convert ${x.reason} ${x.getMessage()}")
-      }
+      extractStringLit(expr.right) match {
+        case Some(rightLit) =>
+          sparkFormat = rightLit
+          if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) {
+            willNotWorkOnGpu("legacyTimeParserPolicy LEGACY is not supported")
+          } else {
+            try {
+              // try and convert the format to cuDF format - this will throw an exception if
+              // the format contains unsupported characters or words
+              strfFormat = DateUtils.toStrf(sparkFormat,
+                expr.left.dataType == DataTypes.StringType)
+              // format parsed ok, so it is either compatible (tested/certified) or incompatible
+              if (!GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) &&
+                  !conf.incompatDateFormats) {
+                willNotWorkOnGpu(s"format '$sparkFormat' on the GPU is not guaranteed " +
+                  s"to produce the same results as Spark on CPU. Set " +
+                  s"spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.")
+              }
+            } catch {
+              case e: TimestampFormatConversionException =>
+                willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}")
+            }
+          }
+        case None =>
+          willNotWorkOnGpu("format has to be a string literal")
+      }
     }
   }
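
Condensed, the reordered check works roughly as in the sketch below. This is not the plugin source: FormatCheckSketch, toStrfSketch and fallbackReason are illustrative stand-ins for the tagging logic, DateUtils.toStrf and willNotWorkOnGpu. The key point it demonstrates is that translation to a cuDF format is attempted first, so an untranslatable format short-circuits with a "Failed to convert" reason, and the incompatibleDateFormats suggestion is only reachable for formats that actually translated.

object FormatCheckSketch {
  // stand-in for the plugin's TimestampFormatConversionException
  case class FormatConversionSketchException(reason: String) extends Exception(reason)

  // stand-in for DateUtils.toStrf: throws for specifiers cuDF cannot express
  def toStrfSketch(sparkFormat: String): String =
    if (sparkFormat.contains("F")) {
      throw FormatConversionSketchException("Unsupported character: F")
    } else {
      sparkFormat // the real code maps Spark pattern letters to strftime here
    }

  // formats that have been tested to match Spark's CPU output
  val compatibleFormats = Set("yyyy-MM-dd", "yyyy/MM/dd", "dd/MM/yyyy")

  // returns the fallback reason, or None if the expression can stay on the GPU
  def fallbackReason(sparkFormat: String, incompatEnabled: Boolean): Option[String] =
    try {
      toStrfSketch(sparkFormat)
      // the format translated, so it is either tested-compatible or merely untested
      if (!compatibleFormats.contains(sparkFormat) && !incompatEnabled) {
        Some(s"format '$sparkFormat' is not guaranteed to match the CPU; set " +
          "spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU")
      } else {
        None
      }
    } catch {
      // untranslatable format: fall back without ever suggesting the conf
      case e: FormatConversionSketchException =>
        Some(s"Failed to convert ${e.reason}")
    }

  def main(args: Array[String]): Unit = {
    println(fallbackReason("F", incompatEnabled = true))           // Some(Failed to convert ...)
    println(fallbackReason("dd-MM-yyyy", incompatEnabled = false)) // Some(... force onto GPU)
    println(fallbackReason("dd-MM-yyyy", incompatEnabled = true))  // None: runs on the GPU
  }
}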
@@ -18,12 +18,25 @@ package com.nvidia.spark.rapids

 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.{SparkConf, SparkException}
+import scala.collection.mutable.ListBuffer
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.functions.{col, to_date, to_timestamp, unix_timestamp}
 import org.apache.spark.sql.internal.SQLConf
 
-class ParseDateTimeSuite extends SparkQueryCompareTestSuite {
+class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
+
+  override def beforeEach() {
+    GpuOverrides.removeAllListeners()
+  }
+
+  override def afterEach() {
+    GpuOverrides.removeAllListeners()
+  }
 
   testSparkResultsAreEqual("to_date dd/MM/yy (fall back)",
     datesAsStrings,
@@ -142,6 +155,33 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite {
"Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec"))
}

test("unsupported format") {

// capture plans
val plans = new ListBuffer[SparkPlanMeta[SparkPlan]]()
GpuOverrides.addListener(
(plan: SparkPlanMeta[SparkPlan], _: SparkPlan, _: Seq[Optimization]) => {
plans.append(plan)
})

val e = intercept[IllegalArgumentException] {
val df = withGpuSparkSession(spark => {
datesAsStrings(spark)
.repartition(2)
.withColumn("c1", to_date(col("c0"), "F"))
}, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED"))
df.collect()
}
assert(e.getMessage.contains(
"Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec"))

val planStr = plans.last.toString
assert(planStr.contains("Failed to convert Unsupported character: F"))
// make sure we aren't suggesting enabling INCOMPATIBLE_DATE_FORMATS for something we
// can never support
assert(!planStr.contains(RapidsConf.INCOMPATIBLE_DATE_FORMATS.key))
}

test("parse now") {
def now(spark: SparkSession) = {
import spark.implicits._
Expand Down
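
As an aside, the beforeEach/afterEach calls to GpuOverrides.removeAllListeners() matter because a listener registered by one test would otherwise keep capturing plans in later tests. A self-contained sketch of that listener-registry pattern follows; ListenerRegistrySketch is illustrative, not the plugin's implementation.

import scala.collection.mutable.ListBuffer

object ListenerRegistrySketch {
  private val listeners = new ListBuffer[String => Unit]()

  def addListener(l: String => Unit): Unit = listeners += l
  def removeAllListeners(): Unit = listeners.clear()

  // fired by the planner after each plan is tagged
  def fire(planSummary: String): Unit = listeners.foreach(_(planSummary))

  def main(args: Array[String]): Unit = {
    val captured = new ListBuffer[String]()
    addListener(s => captured.append(s))
    fire("ProjectExec: Failed to convert Unsupported character: F")
    println(captured.mkString("\n"))
    removeAllListeners() // what beforeEach/afterEach do, to avoid cross-test leakage
  }
}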