Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38094] Enable matching schema column names by field ids #35385

Closed
wants to merge 14 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,15 @@ object QueryExecutionErrors {
""".stripMargin.replaceAll("\n", " "))
}

/**
 * Builds the error raised when field-ID-based column resolution matches more
 * than one Parquet field for a single requested field ID.
 *
 * @param requiredId    the field ID requested by the Spark read schema
 * @param matchedFields rendering of all Parquet fields carrying that ID
 * @return the exception for the caller to throw (not thrown here)
 */
def foundDuplicateFieldInFieldIdLookupModeError(
    requiredId: Int, matchedFields: String): Throwable = {
  // Collapse the multi-line template into a single-line message, matching the
  // style of the other error builders in this object.
  val message =
    s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " ")
  new RuntimeException(message)
}

def failedToMergeIncompatibleSchemasError(
left: StructType, right: StructType, e: Throwable): Throwable = {
new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,33 @@ object SQLConf {
.intConf
.createWithDefault(4096)

// Writer-side switch: when true, field IDs present in the Spark schema's
// metadata are propagated into the written Parquet schema.
val PARQUET_FIELD_ID_WRITE_ENABLED =
  buildConf("spark.sql.parquet.fieldId.write.enabled")
    // "field ID" capitalization aligned with the read-side config docs below.
    .doc("Field ID is a native field of the Parquet schema spec. When enabled, " +
      "Parquet writers will populate the field ID " +
      "metadata (if present) in the Spark schema to the Parquet schema.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(true)

// Reader-side switch: when true, Parquet columns are resolved by field ID
// (when present in the requested Spark schema) rather than by column name.
val PARQUET_FIELD_ID_READ_ENABLED =
  buildConf("spark.sql.parquet.fieldId.read.enabled")
    // Added the missing terminal period for consistency with sibling conf docs.
    .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
      "will use field IDs (if present) in the requested Spark schema to look up Parquet " +
      "fields instead of using column names.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(false)

// Companion to the field-ID read path: decides whether a field-ID-based read
// against a Parquet file that carries no field IDs yields nulls (true) or
// raises an error (false).
val IGNORE_MISSING_PARQUET_FIELD_ID =
  buildConf("spark.sql.parquet.fieldId.read.ignoreMissing")
    .version("3.3.0")
    .doc("When the Parquet file doesn't have any field IDs but the " +
      "Spark read schema is using field IDs to read, we will silently return nulls " +
      "when this flag is enabled, or error otherwise.")
    .booleanConf
    .createWithDefault(false)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
Expand Down Expand Up @@ -4251,6 +4278,12 @@ class SQLConf extends Serializable with Logging {

def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)

// Whether Parquet readers resolve columns by field ID instead of by name
// (spark.sql.parquet.fieldId.read.enabled).
def parquetFieldIdReadEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED)

// Whether Parquet writers populate field IDs from the Spark schema into the
// written Parquet schema (spark.sql.parquet.fieldId.write.enabled).
def parquetFieldIdWriteEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED)

// Whether a field-ID-based read against a file lacking field IDs silently
// returns nulls rather than erroring (spark.sql.parquet.fieldId.read.ignoreMissing).
def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

jackierwzhang marked this conversation as resolved.
Show resolved Hide resolved
def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)

/** ********************** SQLConf functionality methods ************ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class ParquetFileFormat
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
sparkSession.sessionState.conf.parquetOutputTimestampType.toString)

conf.set(
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString)

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

Expand Down
Loading