Support unix_timestamp on GPU for subset of formats (NVIDIA#1113)
* Support unix_timestamp on GPU for subset of formats

Signed-off-by: Andy Grove <[email protected]>

* close scalar value

Signed-off-by: Andy Grove <[email protected]>

* compatible formats will now run on GPU without requiring incompatibleOps to be set

Signed-off-by: Andy Grove <[email protected]>

* code cleanup and address more review comments

Signed-off-by: Andy Grove <[email protected]>

* add specific config option for enabling incompatible date formats on GPU

* update documentation

Signed-off-by: Andy Grove <[email protected]>

* improve docs

Signed-off-by: Andy Grove <[email protected]>

* use constants for special dates

Signed-off-by: Andy Grove <[email protected]>

* Add support for more date formats and remove incompat from to_unix_timestamp

Signed-off-by: Andy Grove <[email protected]>

* remove debug print

Signed-off-by: Andy Grove <[email protected]>

* Revert unnecessary change

Signed-off-by: Andy Grove <[email protected]>

* Make ToUnixTimestamp consistent with UnixTimestamp

Signed-off-by: Andy Grove <[email protected]>

* refactor to remove duplicate code

Signed-off-by: Andy Grove <[email protected]>

* fix resource leaks and fix regressions in python tests

Signed-off-by: Andy Grove <[email protected]>

* scalastyle

Signed-off-by: Andy Grove <[email protected]>

* update docs

Signed-off-by: Andy Grove <[email protected]>

* fix error in handling of legacyTimeParserPolicy=EXCEPTION

Signed-off-by: Andy Grove <[email protected]>

* fix test failures against Spark 3.1.0

Signed-off-by: Andy Grove <[email protected]>
andygrove authored and sperlingxx committed Nov 20, 2020
1 parent e6a7cff commit b58dc2a
Showing 10 changed files with 433 additions and 60 deletions.
23 changes: 23 additions & 0 deletions docs/compatibility.md
@@ -212,6 +212,29 @@ window functions like `row_number`, `lead`, and `lag` can produce different results
includes both `-0.0` and `0.0`, or if the ordering is ambiguous. Spark can produce
different results from one run to another if the ordering is ambiguous on a window function too.

## Parsing strings as dates or timestamps

When converting strings to dates or timestamps using functions like `to_date` and `unix_timestamp`,
only a subset of the possible formats is supported on the GPU with full compatibility with Spark. The
supported formats are:

- `dd/MM/yyyy`
- `yyyy/MM`
- `yyyy/MM/dd`
- `yyyy-MM`
- `yyyy-MM-dd`
- `yyyy-MM-dd HH:mm:ss`

Other formats may produce incorrect results and therefore do not run on the GPU by default. Some
specific issues with other formats are:

- Spark supports partial microseconds but the plugin does not
- In some cases the plugin produces incorrect results for input data that does not match the
  specified format

To enable all formats on the GPU, set
[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled) to `true`.
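
For example, a supported format runs on the GPU out of the box, while other formats fall back to the
CPU unless the config is set. A minimal sketch (it assumes an active `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq("2020-11-20", "not a date").toDF("s")

// Supported format: runs on the GPU and matches Spark exactly.
df.select(unix_timestamp($"s", "yyyy-MM-dd")).show()

// Other formats stay on the CPU unless explicitly enabled:
spark.conf.set("spark.rapids.sql.incompatibleDateFormats.enabled", "true")
df.select(unix_timestamp($"s", "dd/MM/yyyy HH:mm")).show()
```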

## Casting between types

In general, performing `cast` and `ansi_cast` operations on the GPU is compatible with the same operations on the CPU. However, there are some exceptions. For this reason, certain casts are disabled on the GPU by default and require configuration options to be specified to enable them.
5 changes: 3 additions & 2 deletions docs/configs.md
@@ -71,6 +71,7 @@ Name | Description | Default Value
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
<a name="sql.improvedTimeOps.enabled"></a>spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false
<a name="sql.incompatibleDateFormats.enabled"></a>spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, setting this to true will force all parsing onto GPU even for formats that can result in incorrect results when parsing invalid inputs.|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
@@ -219,12 +220,12 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.TimeSub"></a>spark.rapids.sql.expression.TimeSub| |Subtracts interval from timestamp|true|None|
<a name="sql.expression.ToDegrees"></a>spark.rapids.sql.expression.ToDegrees|`degrees`|Converts radians to degrees|true|None|
<a name="sql.expression.ToRadians"></a>spark.rapids.sql.expression.ToRadians|`radians`|Converts degrees to radians|true|None|
<a name="sql.expression.ToUnixTimestamp"></a>spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null|
<a name="sql.expression.ToUnixTimestamp"></a>spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|true|None|
<a name="sql.expression.UnaryMinus"></a>spark.rapids.sql.expression.UnaryMinus|`negative`|Negate a numeric value|true|None|
<a name="sql.expression.UnaryPositive"></a>spark.rapids.sql.expression.UnaryPositive|`positive`|A numeric value with a + in front of it|true|None|
<a name="sql.expression.UnboundedFollowing$"></a>spark.rapids.sql.expression.UnboundedFollowing$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None|
<a name="sql.expression.UnboundedPreceding$"></a>spark.rapids.sql.expression.UnboundedPreceding$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None|
<a name="sql.expression.UnixTimestamp"></a>spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null|
<a name="sql.expression.UnixTimestamp"></a>spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|true|None|
<a name="sql.expression.Upper"></a>spark.rapids.sql.expression.Upper|`upper`, `ucase`|String uppercase operator|false|This is not 100% compatible with the Spark version because in some cases unicode characters change byte width when changing the case. The GPU string conversion does not support these characters. For a full list of unsupported characters see https://github.com/rapidsai/cudf/issues/3132|
<a name="sql.expression.WeekDay"></a>spark.rapids.sql.expression.WeekDay|`weekday`|Returns the day of the week (0 = Monday...6=Sunday)|true|None|
<a name="sql.expression.WindowExpression"></a>spark.rapids.sql.expression.WindowExpression| |Calculates a return value for every input row of a table based on a group (or "window") of rows|true|None|
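Each expression listed above is itself a config key, so a single expression can be toggled without
touching the broader incompatibility settings. A minimal sketch (assuming a live `spark` session):

```scala
// Force unix_timestamp back onto the CPU even though this commit enables
// it on the GPU by default.
spark.conf.set("spark.rapids.sql.expression.UnixTimestamp", "false")
```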
10 changes: 2 additions & 8 deletions integration_tests/src/main/python/date_time_test.py
@@ -160,26 +160,23 @@ def test_dayofyear(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.dayofyear(f.col('a'))))

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))))

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"))

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp_improved(data_gen):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "CORRECTED"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf)

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp_improved(data_gen):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
@@ -190,14 +187,11 @@ def test_to_unix_timestamp_improved(data_gen):
(StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'),
(ConvertGen(DateGen(nullable=False), lambda d: d.strftime('%Y/%m').zfill(7), data_type=StringType()), 'yyyy/MM')]

@incompat
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
def test_string_to_unix_timestamp(data_gen, date_form):
print("date: " + date_form)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)))

@incompat
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
def test_string_unix_timestamp(data_gen, date_form):
assert_gpu_and_cpu_are_equal_collect(
TimeOperatorsSuite.scala
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._

class TimeOperatorsSuite extends SparkQueryCompareTestSuite {
Expand All @@ -28,7 +29,8 @@ class TimeOperatorsSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual(
"Test from_unixtime with alternative month and two digit year", datesPostEpochDf) {
"Test from_unixtime with alternative month and two digit year", datesPostEpochDf,
conf = new SparkConf().set(RapidsConf.INCOMPATIBLE_DATE_FORMATS.key, "true")) {
frame => frame.select(from_unixtime(col("dates"),"dd/LL/yy HH:mm:ss.SSSSSS"))
}

DateUtils.scala
@@ -36,6 +36,12 @@ object DateUtils {
"MM" -> "%m", "LL" -> "%m", "dd" -> "%d", "mm" -> "%M", "ss" -> "%S", "HH" -> "%H",
"yy" -> "%y", "yyyy" -> "%Y", "SSSSSS" -> "%f")

val ONE_SECOND_MICROSECONDS = 1000000

val ONE_DAY_SECONDS = 86400L

val ONE_DAY_MICROSECONDS = 86400000000L

case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int)

/**
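The three constants added to `DateUtils` are plain unit conversions. A standalone sanity check
(illustrative only; the epoch value below is 2020-11-20 00:00:00 UTC in microseconds):

```scala
object DateConstantsCheck extends App {
  // Mirrors the constants added to DateUtils above.
  val ONE_SECOND_MICROSECONDS = 1000000
  val ONE_DAY_SECONDS = 86400L
  val ONE_DAY_MICROSECONDS = 86400000000L

  // 86,400 seconds per day times 1,000,000 microseconds per second.
  assert(ONE_DAY_MICROSECONDS == ONE_DAY_SECONDS * ONE_SECOND_MICROSECONDS)

  // Shifting an epoch timestamp by one day, as GpuCast does for
  // "yesterday" and "tomorrow".
  val now = 1605830400000000L
  assert(now + ONE_DAY_MICROSECONDS - (now - ONE_DAY_MICROSECONDS) ==
    2 * ONE_DAY_MICROSECONDS)
}
```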
41 changes: 24 additions & 17 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -96,8 +96,6 @@ object GpuCast {
"\\A\\d{4}\\-\\d{2}\\-\\d{2}[ T]\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z\\Z"
private val TIMESTAMP_REGEX_NO_DATE = "\\A[T]?(\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z)\\Z"

private val ONE_DAY_MICROSECONDS = 86400000000L

/**
* Regex for identifying strings that contain numeric values that can be casted to integral
* types. This includes floating point numbers but not numbers containing exponents.
@@ -122,6 +120,12 @@

val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number"

val EPOCH = "epoch"
val NOW = "now"
val TODAY = "today"
val YESTERDAY = "yesterday"
val TOMORROW = "tomorrow"

/**
* Returns true iff we can cast `from` to `to` using the GPU.
*/
@@ -182,6 +186,17 @@
case _ => false
}
}

def calculateSpecialDates: Map[String, Int] = {
val now = DateTimeUtils.currentDate(ZoneId.of("UTC"))
Map(
EPOCH -> 0,
NOW -> now,
TODAY -> now,
YESTERDAY -> (now - 1),
TOMORROW -> (now + 1)
)
}
}

/**
@@ -655,16 +670,6 @@ case class GpuCast(
}
}

// special dates
val now = DateTimeUtils.currentDate(ZoneId.of("UTC"))
val specialDates: Map[String, Int] = Map(
"epoch" -> 0,
"now" -> now,
"today" -> now,
"yesterday" -> (now - 1),
"tomorrow" -> (now + 1)
)

var sanitizedInput = input.incRefCount()

// replace partial months
@@ -677,6 +682,8 @@
cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1")
}

val specialDates = calculateSpecialDates

withResource(sanitizedInput) { sanitizedInput =>

// convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd
@@ -756,11 +763,11 @@
val today: Long = cal.getTimeInMillis * 1000
val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime)
val specialDates: Map[String, Long] = Map(
"epoch" -> 0,
"now" -> today,
"today" -> today,
"yesterday" -> (today - ONE_DAY_MICROSECONDS),
"tomorrow" -> (today + ONE_DAY_MICROSECONDS)
GpuCast.EPOCH -> 0,
GpuCast.NOW -> today,
GpuCast.TODAY -> today,
GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS),
GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS)
)

var sanitizedInput = input.incRefCount()
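The refactored special-date handling mirrors Spark's own support for symbolic date strings. A short
sketch of the observable behavior (assumes a Spark 3.0.x/3.1.x session named `spark`; `now`,
`yesterday`, and `tomorrow` resolve relative to the current date):

```scala
// Matches the keys in GpuCast.calculateSpecialDates above.
spark.sql(
  "SELECT CAST('epoch' AS DATE), CAST('today' AS DATE), CAST('tomorrow' AS DATE)"
).show()
// 'epoch'    -> 1970-01-01
// 'today'    -> the current date
// 'tomorrow' -> the current date plus one day
```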
GpuOverrides.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution.{GpuBroadcastMeta, GpuBroadcastNestedLoopJoinMeta, GpuCustomShuffleReaderExec, GpuShuffleMeta}
@@ -1142,28 +1143,24 @@
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
GpuToUnixTimestampImproved(lhs, rhs, strfFormat)
GpuToUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat)
} else {
GpuToUnixTimestamp(lhs, rhs, strfFormat)
GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
}
})
.incompat("Incorrectly formatted strings and bogus dates produce garbage data" +
" instead of null"),
}),
expr[UnixTimestamp](
"Returns the UNIX timestamp of current or specified time",
(a, conf, p, r) => new UnixTimeExprMeta[UnixTimestamp](a, conf, p, r){
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
GpuUnixTimestampImproved(lhs, rhs, strfFormat)
GpuUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat)
} else {
GpuUnixTimestamp(lhs, rhs, strfFormat)
GpuUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
}
})
.incompat("Incorrectly formatted strings and bogus dates produce garbage data" +
" instead of null"),
}),
expr[Hour](
"Returns the hour component of the string/timestamp",
(a, conf, p, r) => new UnaryExprMeta[Hour](a, conf, p, r) {
@@ -2094,6 +2091,16 @@
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
commonExecs ++ ShimLoader.getSparkShims.getExecs

def getTimeParserPolicy: TimeParserPolicy = {
val policy = SQLConf.get.getConfString(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "EXCEPTION")
policy match {
case "LEGACY" => LegacyTimeParserPolicy
case "EXCEPTION" => ExceptionTimeParserPolicy
case "CORRECTED" => CorrectedTimeParserPolicy
}
}

}
/** Tag the initial plan when AQE is enabled */
case class GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging {
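`getTimeParserPolicy` simply mirrors Spark's `spark.sql.legacy.timeParserPolicy` setting, defaulting
to `EXCEPTION` when unset. A minimal sketch of how it tracks the session conf (assumes the plugin
classes are on the classpath and an active SQL conf):

```scala
import org.apache.spark.sql.internal.SQLConf

// The plugin reads the policy straight from Spark's SQL conf; the three
// valid values map to the plugin's TimeParserPolicy case objects.
SQLConf.get.setConfString(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")
assert(GpuOverrides.getTimeParserPolicy == CorrectedTimeParserPolicy)
```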
RapidsConf.scala
@@ -427,6 +427,13 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val INCOMPATIBLE_DATE_FORMATS = conf("spark.rapids.sql.incompatibleDateFormats.enabled")
.doc("When parsing strings as dates and timestamps in functions like unix_timestamp, " +
"setting this to true will force all parsing onto GPU even for formats that can " +
"result in incorrect results when parsing invalid inputs.")
.booleanConf
.createWithDefault(false)

val IMPROVED_FLOAT_OPS = conf("spark.rapids.sql.improvedFloatOps.enabled")
.doc("For some floating point operations spark uses one way to compute the value " +
"and the underlying cudf implementation can use an improved algorithm. " +
@@ -922,6 +929,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS)

lazy val incompatDateFormats: Boolean = get(INCOMPATIBLE_DATE_FORMATS)

lazy val includeImprovedFloat: Boolean = get(IMPROVED_FLOAT_OPS)

lazy val pinnedPoolSize: Long = get(PINNED_POOL_SIZE)
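The new flag feeds the format checks in the expression metas. A simplified, hypothetical sketch of
that gating logic (`DateFormatSupport` and `canRunOnGpu` are illustrative names, not the plugin's
actual API):

```scala
object DateFormatSupport {
  // The formats documented as fully Spark-compatible in compatibility.md.
  private val compatibleFormats = Set(
    "dd/MM/yyyy", "yyyy/MM", "yyyy/MM/dd", "yyyy-MM", "yyyy-MM-dd",
    "yyyy-MM-dd HH:mm:ss")

  def canRunOnGpu(sparkFormat: String, incompatDateFormats: Boolean): Boolean =
    incompatDateFormats || compatibleFormats.contains(sparkFormat)
}
```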