Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support unix_timestamp on GPU for subset of formats #1113

Merged
merged 18 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,26 @@ window functions like `row_number`, `lead`, and `lag` can produce different resu
includes both `-0.0` and `0.0`, or if the ordering is ambiguous. Spark can produce
different results from one run to another if the ordering is ambiguous on a window function too.

## Parsing strings as dates or timestamps

When converting strings to dates or timestamps using functions like `to_date` and `unix_timestamp`,
only a subset of possible formats are supported on GPU with full compatibility with Spark. The
supported formats are:

- `dd/MM/yyyy`
- `yyyy-MM-dd`
- `yyyy-MM-dd HH:mm:ss`

Other formats may result in incorrect results and will not run on the GPU by default. Some
specific issues with other formats are:

- Spark supports partial microseconds but the plugin does not
- The plugin will produce incorrect results for input data that is not in the correct format in
some cases

To enable all formats on GPU, set
[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled) to `true`.

## Casting between types

In general, performing `cast` and `ansi_cast` operations on the GPU is compatible with the same operations on the CPU. However, there are some exceptions. For this reason, certain casts are disabled on the GPU by default and require configuration options to be specified to enable them.
Expand Down
3 changes: 2 additions & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Name | Description | Default Value
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
<a name="sql.improvedTimeOps.enabled"></a>spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false
<a name="sql.incompatibleDateFormats.enabled"></a>spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, setting this to true will force all parsing onto GPU even for formats that can result in incorrect results when parsing invalid inputs.|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
Expand Down Expand Up @@ -224,7 +225,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.UnaryPositive"></a>spark.rapids.sql.expression.UnaryPositive|`positive`|A numeric value with a + in front of it|true|None|
<a name="sql.expression.UnboundedFollowing$"></a>spark.rapids.sql.expression.UnboundedFollowing$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None|
<a name="sql.expression.UnboundedPreceding$"></a>spark.rapids.sql.expression.UnboundedPreceding$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None|
<a name="sql.expression.UnixTimestamp"></a>spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null|
<a name="sql.expression.UnixTimestamp"></a>spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|true|None|
<a name="sql.expression.Upper"></a>spark.rapids.sql.expression.Upper|`upper`, `ucase`|String uppercase operator|false|This is not 100% compatible with the Spark version because in some cases unicode characters change byte width when changing the case. The GPU string conversion does not support these characters. For a full list of unsupported characters see https://github.com/rapidsai/cudf/issues/3132|
<a name="sql.expression.WeekDay"></a>spark.rapids.sql.expression.WeekDay|`weekday`|Returns the day of the week (0 = Monday...6=Sunday)|true|None|
<a name="sql.expression.WindowExpression"></a>spark.rapids.sql.expression.WindowExpression| |Calculates a return value for every input row of a table based on a group (or "window") of rows|true|None|
Expand Down
17 changes: 7 additions & 10 deletions integration_tests/src/main/python/date_time_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,44 +160,41 @@ def test_dayofyear(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.dayofyear(f.col('a'))))

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))))

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"))

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_unix_timestamp_improved(data_gen):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "EXCEPTION"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf)

@incompat #Really only the string is
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
def test_to_unix_timestamp_improved(data_gen):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "EXCEPTION"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), conf)

str_date_and_format_gen = [pytest.param(StringGen('[0-9]{4}/[01][0-9]'),'yyyy/MM', marks=pytest.mark.xfail(reason="cudf does no checks")),
(StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'),
(ConvertGen(DateGen(nullable=False), lambda d: d.strftime('%Y/%m').zfill(7), data_type=StringType()), 'yyyy/MM')]

@incompat
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
def test_string_to_unix_timestamp(data_gen, date_form):
print("date: " + date_form)
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "EXCEPTION"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)))
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), conf)

@incompat
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
def test_string_unix_timestamp(data_gen, date_form):
assert_gpu_and_cpu_are_equal_collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.nvidia.spark.rapids

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._

class TimeOperatorsSuite extends SparkQueryCompareTestSuite {
Expand All @@ -28,7 +29,8 @@ class TimeOperatorsSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual(
"Test from_unixtime with alternative month and two digit year", datesPostEpochDf) {
"Test from_unixtime with alternative month and two digit year", datesPostEpochDf,
conf = new SparkConf().set(RapidsConf.INCOMPATIBLE_DATE_FORMATS.key, "true")) {
frame => frame.select(from_unixtime(col("dates"),"dd/LL/yy HH:mm:ss.SSSSSS"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ object DateUtils {
"MM" -> "%m", "LL" -> "%m", "dd" -> "%d", "mm" -> "%M", "ss" -> "%S", "HH" -> "%H",
"yy" -> "%y", "yyyy" -> "%Y", "SSSSSS" -> "%f")

val ONE_SECOND_MICROSECONDS = 1000000

val ONE_DAY_MICROSECONDS = 86400000000L

case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int)

/**
Expand Down
41 changes: 24 additions & 17 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ object GpuCast {
"\\A\\d{4}\\-\\d{2}\\-\\d{2}[ T]\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z\\Z"
private val TIMESTAMP_REGEX_NO_DATE = "\\A[T]?(\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z)\\Z"

private val ONE_DAY_MICROSECONDS = 86400000000L

/**
* Regex for identifying strings that contain numeric values that can be casted to integral
* types. This includes floating point numbers but not numbers containing exponents.
Expand All @@ -122,6 +120,12 @@ object GpuCast {

val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number"

val EPOCH = "epoch"
val NOW = "now"
val TODAY = "today"
val YESTERDAY = "yesterday"
val TOMORROW = "tomorrow"

/**
* Returns true iff we can cast `from` to `to` using the GPU.
*/
Expand Down Expand Up @@ -182,6 +186,17 @@ object GpuCast {
case _ => false
}
}

def calculateSpecialDates: Map[String, Int] = {
val now = DateTimeUtils.currentDate(ZoneId.of("UTC"))
Map(
EPOCH -> 0,
NOW -> now,
TODAY -> now,
YESTERDAY -> (now - 1),
TOMORROW -> (now + 1)
)
}
}

/**
Expand Down Expand Up @@ -655,16 +670,6 @@ case class GpuCast(
}
}

// special dates
val now = DateTimeUtils.currentDate(ZoneId.of("UTC"))
val specialDates: Map[String, Int] = Map(
"epoch" -> 0,
"now" -> now,
"today" -> now,
"yesterday" -> (now - 1),
"tomorrow" -> (now + 1)
)

var sanitizedInput = input.incRefCount()

// replace partial months
Expand All @@ -677,6 +682,8 @@ case class GpuCast(
cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1")
}

val specialDates = calculateSpecialDates

withResource(sanitizedInput) { sanitizedInput =>

// convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd
Expand Down Expand Up @@ -756,11 +763,11 @@ case class GpuCast(
val today: Long = cal.getTimeInMillis * 1000
val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime)
val specialDates: Map[String, Long] = Map(
"epoch" -> 0,
"now" -> today,
"today" -> today,
"yesterday" -> (today - ONE_DAY_MICROSECONDS),
"tomorrow" -> (today + ONE_DAY_MICROSECONDS)
GpuCast.EPOCH -> 0,
GpuCast.NOW -> today,
GpuCast.TODAY -> today,
GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS),
GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS)
)

var sanitizedInput = input.incRefCount()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution.{GpuBroadcastMeta, GpuBroadcastNestedLoopJoinMeta, GpuCustomShuffleReaderExec, GpuShuffleMeta}
Expand Down Expand Up @@ -1069,28 +1070,24 @@ object GpuOverrides {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
GpuToUnixTimestampImproved(lhs, rhs, strfFormat)
GpuToUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat)
} else {
GpuToUnixTimestamp(lhs, rhs, strfFormat)
GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
}
})
.incompat("Incorrectly formatted strings and bogus dates produce garbage data" +
" instead of null"),
}),
expr[UnixTimestamp](
"Returns the UNIX timestamp of current or specified time",
(a, conf, p, r) => new UnixTimeExprMeta[UnixTimestamp](a, conf, p, r){
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
GpuUnixTimestampImproved(lhs, rhs, strfFormat)
GpuUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat)
} else {
GpuUnixTimestamp(lhs, rhs, strfFormat)
GpuUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
}
})
.incompat("Incorrectly formatted strings and bogus dates produce garbage data" +
" instead of null"),
}),
expr[Hour](
"Returns the hour component of the string/timestamp",
(a, conf, p, r) => new UnaryExprMeta[Hour](a, conf, p, r) {
Expand Down Expand Up @@ -2012,6 +2009,16 @@ object GpuOverrides {
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
commonExecs ++ ShimLoader.getSparkShims.getExecs

def getTimeParserPolicy: TimeParserPolicy = {
val policy = SQLConf.get.getConfString(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "EXCEPTION")
policy match {
case "LEGACY" => LegacyTimeParserPolicy
case "EXCEPTION" => ExceptionTimeParserPolicy
case "CORRECTED" => CorrectedTimeParserPolicy
}
}

}
/** Tag the initial plan when AQE is enabled */
case class GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val INCOMPATIBLE_DATE_FORMATS = conf("spark.rapids.sql.incompatibleDateFormats.enabled")
.doc("When parsing strings as dates and timestamps in functions like unix_timestamp, " +
"setting this to true will force all parsing onto GPU even for formats that can " +
"result in incorrect results when parsing invalid inputs.")
.booleanConf
.createWithDefault(false)

val IMPROVED_FLOAT_OPS = conf("spark.rapids.sql.improvedFloatOps.enabled")
.doc("For some floating point operations spark uses one way to compute the value " +
"and the underlying cudf implementation can use an improved algorithm. " +
Expand Down Expand Up @@ -919,6 +926,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS)

lazy val incompatDateFormats: Boolean = get(INCOMPATIBLE_DATE_FORMATS)

lazy val includeImprovedFloat: Boolean = get(IMPROVED_FLOAT_OPS)

lazy val pinnedPoolSize: Long = get(PINNED_POOL_SIZE)
Expand Down
Loading