diff --git a/docs/configs.md b/docs/configs.md
index 78887199b41..41af8b4fdc7 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -65,6 +65,7 @@ Name | Description | Default Value
spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" or "-1.7976931348623158E308" >= x > "-1.7976931348623159E308"; in both of these cases the GPU returns Double.MaxValue while the CPU returns "+Infinity" and "-Infinity" respectively|false
spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
+spark.rapids.sql.createMap.enabled|The GPU-enabled version of the `CreateMap` expression (`map` SQL function) does not detect duplicate keys in all cases and does not guarantee which key wins if there are duplicates. When this config is set to true, `CreateMap` will be enabled to run on the GPU even when there might be duplicate keys.|false
spark.rapids.sql.csv.read.bool.enabled|Parsing an invalid CSV boolean value produces true instead of null|false
spark.rapids.sql.csv.read.byte.enabled|Parsing CSV bytes is much more lenient and will return 0 for some malformed values instead of null|false
spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 688f74fbd1d..64210a9c463 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -3855,7 +3855,7 @@ are limited.
|
|
PS max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP |
- |
+PS max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP |
PS max nested DECIMAL precision of 18; UTC is only supported TZ for nested TIMESTAMP |
|
diff --git a/integration_tests/src/main/python/map_test.py b/integration_tests/src/main/python/map_test.py
index 44b8a0ccf4c..5127f62cacb 100644
--- a/integration_tests/src/main/python/map_test.py
+++ b/integration_tests/src/main/python/map_test.py
@@ -39,7 +39,7 @@ def test_simple_get_map_value(data_gen):
'a["key_5"]'))
@pytest.mark.parametrize('key_gen', [StringGen(nullable=False), IntegerGen(nullable=False), basic_struct_gen], ids=idfn)
-@pytest.mark.parametrize('value_gen', [StringGen(nullable=False), IntegerGen(nullable=False), basic_struct_gen], ids=idfn)
+@pytest.mark.parametrize('value_gen', [StringGen(nullable=True), IntegerGen(nullable=True), basic_struct_gen], ids=idfn)
def test_single_entry_map(key_gen, value_gen):
data_gen = [('a', key_gen), ('b', value_gen)]
assert_gpu_and_cpu_are_equal_collect(
@@ -47,13 +47,39 @@ def test_single_entry_map(key_gen, value_gen):
'map("literal_key", b) as map1',
'map(a, b) as map2'))
-@allow_non_gpu('ProjectExec,Alias,CreateMap')
-# until https://github.com/NVIDIA/spark-rapids/issues/3229 is implemented
+def test_map_expr_no_pairs():
+ data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : gen_df(spark, data_gen).selectExpr(
+ 'map() as m1'))
+
def test_map_expr_multiple_pairs():
+ # we don't hit duplicate keys in this test due to the high cardinality of the generated strings
+ data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : gen_df(spark, data_gen).selectExpr(
+ 'map("key1", b, "key2", a) as m1',
+ 'map(a, b, b, a) as m2'),
+ conf={'spark.rapids.sql.createMap.enabled':True})
+
+@allow_non_gpu('ProjectExec,Alias,CreateMap,Literal')
+def test_map_expr_dupe_keys_fallback():
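+    # the keys are duplicate literals, so the plugin must fall back to the CPU even though
+    # spark.rapids.sql.createMap.enabled is set; LAST_WIN is needed so that Spark itself
+    # does not raise an error for the duplicate keys (the default policy is EXCEPTION)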
+ data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
+ assert_gpu_fallback_collect(
+ lambda spark : gen_df(spark, data_gen).selectExpr(
+ 'map("key1", b, "key1", a) as m1'),
+ 'ProjectExec',
+ conf={'spark.rapids.sql.createMap.enabled':True,
+ 'spark.sql.mapKeyDedupPolicy':'LAST_WIN'})
+
+@allow_non_gpu('ProjectExec,Alias,CreateMap,Literal')
+def test_map_expr_multi_non_literal_keys_fallback():
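+    # multiple pairs with non-literal keys cannot be checked for duplicates at planning time,
+    # so with spark.rapids.sql.createMap.enabled off this must fall back to the CPU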
data_gen = [('a', StringGen(nullable=False)), ('b', StringGen(nullable=False))]
assert_gpu_fallback_collect(
lambda spark : gen_df(spark, data_gen).selectExpr(
- "map(a, b, b, a) as m1"), 'ProjectExec')
+ 'map(a, b, b, a) as m1'),
+ 'ProjectExec',
+ conf={'spark.rapids.sql.createMap.enabled':False})

def test_map_scalar_project():
assert_gpu_and_cpu_are_equal_collect(
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index aa08c9c1c93..9d563843050 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -637,6 +637,14 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)
+ val ENABLE_CREATE_MAP = conf("spark.rapids.sql.createMap.enabled")
+ .doc("The GPU-enabled version of the `CreateMap` expression (`map` SQL function) does not " +
+ "detect duplicate keys in all cases and does not guarantee which key wins if there are " +
+ "duplicates. When this config is set to true, `CreateMap` will be enabled to run on the " +
+ "GPU even when there might be duplicate keys.")
+ .booleanConf
+ .createWithDefault(false)
+
val ENABLE_INNER_JOIN = conf("spark.rapids.sql.join.inner.enabled")
.doc("When set to true inner joins are enabled on the GPU")
.booleanConf
@@ -1543,6 +1551,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val isProjectAstEnabled: Boolean = get(ENABLE_PROJECT_AST)
+ lazy val isCreateMapEnabled: Boolean = get(ENABLE_CREATE_MAP)
+
lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)
lazy val isParquetInt96WriteEnabled: Boolean = get(ENABLE_PARQUET_INT96_WRITE)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
index 610cc2e1fb4..8b695e486e2 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -20,10 +20,12 @@ import java.io.{File, FileOutputStream}
import java.time.ZoneId
+import scala.collection.mutable
+
import ai.rapids.cudf.DType
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnaryExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, UnaryExpression, WindowSpecDefinition}
import org.apache.spark.sql.types._
/**
* The level of support that the plugin has for a given type. Used for documentation generation.
*/
@@ -1088,11 +1090,14 @@ object WindowSpecCheck extends ExprChecks {
object CreateMapCheck extends ExprChecks {
- // Spark supports all types except for Map for key and value (Map is not supported
+  // For map keys, Spark supports all types except for Map (Map is not supported
// even in nested types)
- val keyValueSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
+ private val keySig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT).nested()
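+  // Unlike keys, map values may themselves contain maps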
+ private val valueSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
+ TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested()
+
override def tagAst(meta: BaseExprMeta[_]): Unit = {
meta.willNotWorkInAst("CreateMap is not supported by AST")
}
@@ -1103,20 +1108,35 @@ object CreateMapCheck extends ExprChecks {
if (context != ProjectExprContext) {
meta.willNotWorkOnGpu(s"this is not supported in the $context context")
} else {
- if (meta.childExprs.length != 2) {
-      // See https://github.com/NVIDIA/spark-rapids/issues/3229
- meta.willNotWorkOnGpu("CreateMap only supports two expressions on GPU")
+        // if there is more than one key-value pair then there is the possibility of duplicate keys
+ if (meta.childExprs.length > 2) {
+ // check for duplicate keys if the keys are literal values
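+        // the children alternate key1, value1, key2, value2, ... so the even indices are the keys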
+ val keyExprs = meta.childExprs.indices.filter(_ % 2 == 0).map(meta.childExprs)
+ val litKeys = keyExprs.map(e => GpuOverrides.extractLit(e.wrapped.asInstanceOf[Expression]))
+ if (litKeys.forall(_.isDefined)) {
+ val keys = litKeys.map(_.get.value)
+ val uniqueKeys = new mutable.HashSet[Any]()
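+            // mutable.HashSet.add returns false if the element was already present, i.e. a duplicate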
+ for (key <- keys) {
+ if (!uniqueKeys.add(key)) {
+ meta.willNotWorkOnGpu("CreateMap with duplicate literal keys is not supported")
+ }
+ }
+ } else if (!meta.conf.isCreateMapEnabled) {
+          meta.willNotWorkOnGpu("CreateMap is not enabled by default when there are " +
+            "multiple key-value pairs and the keys are not literal values, because handling " +
+            "of duplicate keys is not compatible with Spark. " +
+ s"Set ${RapidsConf.ENABLE_CREATE_MAP}=true to enable it anyway.")
+ }
}
}
}
override def support(
dataType: TypeEnum.Value): Map[ExpressionContext, Map[String, SupportLevel]] = {
- val support = keyValueSig.getSupportLevel(dataType, keyValueSig)
Map((ProjectExprContext,
Map(
- ("key", support),
- ("value", support))))
+ ("key", keySig.getSupportLevel(dataType, keySig)),
+ ("value", valueSig.getSupportLevel(dataType, valueSig)))))
}
}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeCreator.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeCreator.scala
index e71d621ac42..71eb49c6b22 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeCreator.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeCreator.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.rapids
import ai.rapids.cudf.{ColumnVector, DType}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuExpressionsUtils}
+import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS
@@ -84,11 +85,11 @@ case class GpuCreateArray(children: Seq[Expression], useStringTypeWhenEmpty: Boo
case class GpuCreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boolean)
extends GpuExpression {
-  // See https://github.com/NVIDIA/spark-rapids/issues/3229
- require(children.length == 2)
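+  // the children are interleaved as key1, value1, key2, value2, ...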
+ private val valueIndices: Seq[Int] = children.indices.filter(_ % 2 != 0)
+ private val keyIndices: Seq[Int] = children.indices.filter(_ % 2 == 0)
- lazy val keys = children.indices.filter(_ % 2 == 0).map(children)
- lazy val values = children.indices.filter(_ % 2 != 0).map(children)
+ lazy val keys: Seq[Expression] = keyIndices.map(children)
+ lazy val values: Seq[Expression] = valueIndices.map(children)
private val defaultElementType: DataType = {
if (useStringTypeWhenEmpty) {
@@ -104,8 +105,10 @@ case class GpuCreateMap(children: Seq[Expression], useStringTypeWhenEmpty: Boole
children.indices.foreach { index =>
columns(index) = GpuExpressionsUtils.columnarEvalToColumn(children(index), batch).getBase
}
- withResource(ColumnVector.makeStruct(columns: _*)) { struct =>
- GpuColumnVector.from(ColumnVector.makeList(numRows, DType.STRUCT, struct), dataType)
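+      // build one struct column per adjacent (key, value) pair of columns, then assemble the
+      // per-pair structs into a list column so that each row's list holds its map entries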
+ val structs = Range(0, columns.length, 2)
+ .safeMap(i => ColumnVector.makeStruct(columns(i), columns(i + 1)))
+ withResource(structs) { _ =>
+ GpuColumnVector.from(ColumnVector.makeList(numRows, DType.STRUCT, structs: _*), dataType)
}
}
}