Add config to enable mixed types as string in GpuJsonToStruct & GpuJsonScan #10268

Merged: 8 commits, Jan 27, 2024
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -128,6 +128,7 @@ Name | Description | Default Value | Applicable at
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|JSON reading is not 100% compatible when reading decimals.|false|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
<a name="sql.json.read.mixedTypesAsString.enabled"></a>spark.rapids.sql.json.read.mixedTypesAsString.enabled|JSON reading is not 100% compatible when reading mixed types as string.|false|Runtime
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup
<a name="sql.optimizer.joinReorder.enabled"></a>spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime
8 changes: 8 additions & 0 deletions docs/compatibility.md
@@ -378,6 +378,14 @@ In particular, the output map is not resulted from a regular JSON parsing but in
* If the input JSON is given as multiple rows, any row containing invalid JSON format will be parsed as an empty
struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)).

+When a JSON attribute contains mixed types (different types in different rows), such as a mix of dictionaries
+and lists, Spark will return a string representation of the JSON, but when running on GPU, the default
+behavior is to throw an exception. There is an experimental setting
+`spark.rapids.sql.json.read.mixedTypesAsString.enabled` that can be set to true to support reading
+mixed types as string, but there are known issues where it could also read structs as string in some cases. There
+can also be minor formatting differences. Spark will return a parsed and formatted representation, but the
+GPU implementation returns the unparsed JSON string.
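
As a concrete illustration of the behavior documented above, here is a minimal spark-shell sketch (not part of this change). It assumes the RAPIDS Accelerator is installed, and it enables the beta `JsonToStructs` override, which is off by default:

```scala
// Opt in to the beta JsonToStructs override and the experimental
// mixed-types-as-string behavior added by this PR.
spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")
spark.conf.set("spark.rapids.sql.json.read.mixedTypesAsString.enabled", "true")

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

// Attribute "a" is a JSON array in one row and a JSON object in the other,
// so its type is mixed across rows.
val df = Seq("""{"a": [1,2,3]}""", """{"a": {"b": "xy"}}""").toDF("json")
val schema = StructType(Seq(StructField("a", StringType)))

// The GPU returns "a" as the raw, unparsed JSON text of each value, while
// Spark's CPU path returns a parsed and re-formatted string, so minor
// formatting differences between the two are possible.
df.select(from_json($"json", schema).getField("a")).show(false)
```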

### `to_json` function

The `to_json` function is disabled by default because it is experimental and has some known incompatibilities
16 changes: 15 additions & 1 deletion integration_tests/src/main/python/json_test.py
@@ -380,7 +380,9 @@ def test_read_invalid_json(spark_tmp_table_factory, std_input_path, read_func, f
@pytest.mark.parametrize('schema', [_int_schema])
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_read_valid_json(spark_tmp_table_factory, std_input_path, read_func, filename, schema, v1_enabled_list):
-    conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+    conf = copy_and_update(_enable_all_types_conf,
+        {'spark.sql.sources.useV1SourceList': v1_enabled_list,
+         'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True})
    assert_gpu_and_cpu_are_equal_collect(
        read_func(std_input_path + '/' + filename,
                  schema,
@@ -822,6 +824,18 @@ def test_from_json_struct_of_list(schema):
        .select(f.from_json('a', schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

+@pytest.mark.parametrize('schema', [
+    'struct<a:string>'
+])
+@allow_non_gpu(*non_utc_allow)
+def test_from_json_mixed_types_list_struct(schema):
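+    # 'a' is generated as either a JSON array or a JSON object, so its type
+    # is mixed across rows, which is the case the new config enables.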
+    json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b":"[a-z]{2}"}) }')
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, json_string_gen) \
+            .select('a', f.from_json('a', schema)),
+        conf={"spark.rapids.sql.expression.JsonToStructs": True,
+              'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True})

@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
@allow_non_gpu(*non_utc_allow)
def test_from_json_struct_all_empty_string_input(schema):
GpuOverrides.scala

@@ -3662,7 +3662,8 @@ object GpuOverrides extends Logging {

override def convertToGpu(child: Expression): GpuExpression =
// GPU implementation currently does not support duplicated json key names in input
-GpuJsonToStructs(a.schema, a.options, child, a.timeZoneId)
+GpuJsonToStructs(a.schema, a.options, child, conf.isJsonMixedTypesAsStringEnabled,
+a.timeZoneId)
}).disabledByDefault("parsing JSON from a column has a large number of issues and " +
"should be considered beta quality right now."),
expr[StructsToJson](
@@ -3850,7 +3851,8 @@
a.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes,
-conf.maxGpuColumnSizeBytes)
+conf.maxGpuColumnSizeBytes,
+conf.isJsonMixedTypesAsStringEnabled)
})).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

val scans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] =
RapidsConf.scala

@@ -1197,6 +1197,12 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(false)

+val ENABLE_READ_JSON_MIXED_TYPES_AS_STRING =
+conf("spark.rapids.sql.json.read.mixedTypesAsString.enabled")
+.doc("JSON reading is not 100% compatible when reading mixed types as string.")
+.booleanConf
+.createWithDefault(false)

val ENABLE_AVRO = conf("spark.rapids.sql.format.avro.enabled")
.doc("When set to true enables all avro input and output acceleration. " +
"(only input is currently supported anyways)")
@@ -2621,6 +2627,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isJsonDecimalReadEnabled: Boolean = get(ENABLE_READ_JSON_DECIMALS)

+lazy val isJsonMixedTypesAsStringEnabled: Boolean = get(ENABLE_READ_JSON_MIXED_TYPES_AS_STRING)

lazy val isAvroEnabled: Boolean = get(ENABLE_AVRO)

lazy val isAvroReadEnabled: Boolean = get(ENABLE_AVRO_READ)
GpuJsonScan.scala

@@ -249,7 +249,8 @@ case class GpuJsonScan(
dataFilters: Seq[Expression],
maxReaderBatchSizeRows: Integer,
maxReaderBatchSizeBytes: Long,
-maxGpuColumnSizeBytes: Long)
+maxGpuColumnSizeBytes: Long,
+mixedTypesAsStringEnabled: Boolean)
extends TextBasedFileScan(sparkSession, options) with GpuScan {

private lazy val parsedOptions: JSONOptions = new JSONOptions(
@@ -272,7 +273,8 @@

GpuJsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, parsedOptions, maxReaderBatchSizeRows,
-maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap)
+maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap,
+mixedTypesAsStringEnabled)
}

override def withInputFile(): GpuScan = this
@@ -290,7 +292,8 @@ case class GpuJsonPartitionReaderFactory(
maxReaderBatchSizeBytes: Long,
maxGpuColumnSizeBytes: Long,
metrics: Map[String, GpuMetric],
-@transient params: Map[String, String]) extends ShimFilePartitionReaderFactory(params) {
+@transient params: Map[String, String],
+mixedTypesAsStringEnabled: Boolean) extends ShimFilePartitionReaderFactory(params) {

override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
throw new IllegalStateException("ROW BASED PARSING IS NOT SUPPORTED ON THE GPU...")
@@ -300,7 +303,7 @@ case class GpuJsonPartitionReaderFactory(
val conf = broadcastedConf.value.value
val reader = new PartitionReaderWithBytesRead(new JsonPartitionReader(conf, partFile,
dataSchema, readDataSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes,
-metrics))
+metrics, mixedTypesAsStringEnabled))
ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema,
maxGpuColumnSizeBytes)
}
@@ -346,15 +349,16 @@ class JsonPartitionReader(
parsedOptions: JSONOptions,
maxRowsPerChunk: Integer,
maxBytesPerChunk: Long,
-execMetrics: Map[String, GpuMetric])
+execMetrics: Map[String, GpuMetric],
+enableMixedTypesAsString: Boolean)
extends GpuTextBasedPartitionReader[HostLineBufferer, HostLineBuffererFactory.type](conf,
partFile, dataSchema, readDataSchema, parsedOptions.lineSeparatorInRead, maxRowsPerChunk,
maxBytesPerChunk, execMetrics, HostLineBuffererFactory) {

def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions = {
cudf.JSONOptions.builder()
.withRecoverWithNull(true)
-.withMixedTypesAsStrings(true)
+.withMixedTypesAsStrings(enableMixedTypesAsString)
.build
}

GpuReadJsonFileFormat.scala

@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022-2023, NVIDIA CORPORATION.
+* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -67,7 +67,8 @@ class GpuReadJsonFileFormat extends JsonFileFormat with GpuReadFileFormatWithMet
rapidsConf.maxReadBatchSizeBytes,
rapidsConf.maxGpuColumnSizeBytes,
metrics,
-options)
+options,
+rapidsConf.isJsonMixedTypesAsStringEnabled)
PartitionReaderIterator.buildReader(factory)
}

GpuJsonToStructs.scala

@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023, NVIDIA CORPORATION.
+* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@ case class GpuJsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
+enableMixedTypesAsString: Boolean,
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {
@@ -177,9 +178,7 @@ case class GpuJsonToStructs(

val jsonOptions = cudf.JSONOptions.builder()
.withRecoverWithNull(true)
-// tracking issue for enabling mixed type as string
-// https://github.com/NVIDIA/spark-rapids/issues/10253
-.withMixedTypesAsStrings(false)
+.withMixedTypesAsStrings(enableMixedTypesAsString)
.build()
withResource(cudf.Table.readJSON(jsonOptions, data, start, length)) { tableWithMeta =>
val names = tableWithMeta.getColumnNames