Enable column name mapping in Delta
Add a column name mapping mode in Delta, which allows Delta to use different names in the table schema and in the underlying Parquet files. This is a building block for issues delta-io#957 and delta-io#958.
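For context, name mapping is surfaced to users through a table property. A hedged sketch of enabling it, assuming an active SparkSession `spark`, a Delta table `events`, and the property names and protocol versions used by the column mapping feature (these are assumptions, not part of this commit):

// Enable name mapping on an existing table. Column mapping requires a
// protocol upgrade, assumed here to be reader version 2 / writer version 5.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5'
  )
""")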

New unit tests.

Closes delta-io#962

GitOrigin-RevId: 7a64a33fd60781c17236bff6168044e702c8413a
jackierwzhang authored and jbguerraz committed Jul 6, 2022
1 parent 85b35c2 commit 5fa760f
Showing 8 changed files with 1,322 additions and 18 deletions.
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}

import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, Metadata => SparkMetadata, MetadataBuilder, StructField, StructType}

@@ -52,6 +53,9 @@ trait DeltaColumnMappingBase {
*/
protected val DELTA_INTERNAL_COLUMNS: Set[String] = Set.empty

val supportedModes: Set[DeltaColumnMappingMode] =
Set(NoMapping, NameMapping)

def isInternalField(field: StructField): Boolean = DELTA_INTERNAL_COLUMNS
.contains(field.name.toLowerCase(Locale.ROOT))

@@ -92,6 +96,10 @@ trait DeltaColumnMappingBase {
val oldMappingMode = oldMetadata.columnMappingMode
val newMappingMode = newMetadata.columnMappingMode

if (!supportedModes.contains(newMappingMode)) {
throw DeltaErrors.unsupportedColumnMappingMode(newMappingMode.name)
}

val isChangingModeOnExistingTable = oldMappingMode != newMappingMode && !isCreatingNewTable
if (isChangingModeOnExistingTable) {
if (!allowMappingModeChange(oldMappingMode, newMappingMode)) {
@@ -162,7 +170,7 @@ trait DeltaColumnMappingBase {
.build()

case mode =>
throw DeltaErrors.unknownColumnMappingMode(mode.name)
throw DeltaErrors.unsupportedColumnMappingMode(mode.name)
}
}

@@ -226,7 +234,7 @@ trait DeltaColumnMappingBase {
case NoMapping =>
newMetadata
case mode =>
throw DeltaErrors.unknownColumnMappingMode(mode.name)
throw DeltaErrors.unsupportedColumnMappingMode(mode.name)
}
}

@@ -250,7 +258,7 @@

// use id mapping to keep all column mapping metadata
// this method checks for missing physical name & column id already
val physicalSchema = createPhysicalSchema(schema, schema, IdMapping)
val physicalSchema = createPhysicalSchema(schema, schema, IdMapping, checkSupportedMode = false)

SchemaMergingUtils.transformColumns(physicalSchema) ((parentPhysicalPath, field, _) => {
// field.name is now physical name
@@ -349,14 +357,23 @@ trait DeltaColumnMappingBase {
* @param referenceSchema the schema from the delta log, which has all the metadata
* @param columnMappingMode column mapping mode of the delta table, which determines which
* metadata to fill in
* @param checkSupportedMode whether we should check if the column mapping mode is supported
*/
def createPhysicalSchema(
schema: StructType,
referenceSchema: StructType,
columnMappingMode: DeltaColumnMappingMode): StructType = {
columnMappingMode: DeltaColumnMappingMode,
checkSupportedMode: Boolean = true): StructType = {
if (columnMappingMode == NoMapping) {
return schema
}

// createPhysicalSchema is the narrow waist for both the read and write code paths,
// so we can check for mode support here
if (checkSupportedMode && !supportedModes.contains(columnMappingMode)) {
throw DeltaErrors.unsupportedColumnMappingMode(columnMappingMode.name)
}

SchemaMergingUtils.transformColumns(schema) { (path, field, _) =>
val fullName = path :+ field.name
val inSchema = SchemaUtils
@@ -374,6 +391,28 @@
}
}

/**
* Create a list of physical attributes for the given attributes using the table schema as a
* reference.
*
* @param output the list of attributes (potentially without any metadata)
* @param referenceSchema the table schema with all the metadata
* @param columnMappingMode column mapping mode of the delta table, which determines which
* metadata to fill in
*/
def createPhysicalAttributes(
output: Seq[Attribute],
referenceSchema: StructType,
columnMappingMode: DeltaColumnMappingMode): Seq[Attribute] = {
// Assign correct column mapping info to columns according to the schema
val struct = createPhysicalSchema(output.toStructType, referenceSchema, columnMappingMode)
output.zip(struct).map { case (attr, field) =>
attr.withDataType(field.dataType) // nested columns carry physical names and metadata in the data type
.withMetadata(field.metadata)
.withName(field.name)
}
}

}

object DeltaColumnMapping extends DeltaColumnMappingBase
@@ -423,7 +462,7 @@ object DeltaColumnMappingMode {
case NoMapping.name => NoMapping
case IdMapping.name => IdMapping
case NameMapping.name => NameMapping
case mode => throw DeltaErrors.unknownColumnMappingMode(mode)
case mode => throw DeltaErrors.unsupportedColumnMappingMode(mode)
}
}
}
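To illustrate the physical-schema rewrite that createPhysicalSchema (and, for attributes, createPhysicalAttributes) performs under name mapping, here is a minimal, self-contained sketch. It assumes the `delta.columnMapping.physicalName` metadata key and a made-up physical name, and handles only top-level columns; the real implementation also resolves nested struct, map, and array fields and validates the mapping mode:

import org.apache.spark.sql.types._

object NameMappingSketch {
  // Metadata key assumed from Delta's column mapping metadata; the
  // "col-ab12" physical name below is hypothetical.
  val PhysicalNameKey = "delta.columnMapping.physicalName"

  // Rewrite each top-level logical field name to the physical name recorded
  // in the reference (table) schema, falling back to the logical name.
  def toPhysicalSchema(logical: StructType, reference: StructType): StructType =
    StructType(logical.map { field =>
      val physicalName = reference
        .find(_.name.equalsIgnoreCase(field.name))
        .filter(_.metadata.contains(PhysicalNameKey))
        .map(_.metadata.getString(PhysicalNameKey))
        .getOrElse(field.name)
      field.copy(name = physicalName)
    })

  def main(args: Array[String]): Unit = {
    val reference = StructType(Seq(
      StructField("user id", IntegerType, nullable = true,
        new MetadataBuilder().putString(PhysicalNameKey, "col-ab12").build())))
    val logical = StructType(Seq(StructField("user id", IntegerType)))
    // The rewritten field is named "col-ab12", so the Parquet file never
    // sees the space in "user id".
    println(toPhysicalSchema(logical, reference))
  }
}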
12 changes: 3 additions & 9 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1192,10 +1192,9 @@ object DeltaErrors
* We have plans to support more column mapping modes, but they are not implemented yet,
* so we error for now to be forward compatible with tables created in the future.
*/
def unknownColumnMappingMode(mode: String): Throwable =
new ColumnMappingUnsupportedException(s"The column mapping mode `$mode` is not" +
s" supported. Supported modes in this version are: `none` and `id`." +
s" Please upgrade Delta to access this table.")
def unsupportedColumnMappingMode(mode: String): Throwable =
new ColumnMappingUnsupportedException(s"The column mapping mode `$mode` is " +
s"not supported for this Delta version. Please upgrade if you want to use this mode.")

def missingColumnId(mode: DeltaColumnMappingMode, field: String): Throwable = {
ColumnMappingException(s"Missing column ID in column mapping mode `${mode.name}`" +
@@ -1231,11 +1230,6 @@ object DeltaErrors
s" '$oldMode' to '$newMode' is not supported.")
}

def writesWithColumnMappingNotSupported: Throwable = {
new ColumnMappingUnsupportedException("Writing data with column mapping mode is not " +
"supported.")
}

def generateManifestWithColumnMappingNotSupported: Throwable = {
new ColumnMappingUnsupportedException("Manifest generation is not supported for tables that " +
"leverage column mapping, as external readers cannot read these Delta tables. " +
@@ -20,7 +20,6 @@ import org.apache.spark.sql.delta.actions.Metadata

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

trait DeltaFileFormat {
// TODO: Add support for column mapping
@@ -35,5 +34,6 @@ trait DeltaFileFormat {
* transaction, so if possible, we should always pass in the latest transaction's metadata
* instead of one from a past snapshot.
*/
def fileFormat(metadata: Metadata = metadata): FileFormat = new ParquetFileFormat()
def fileFormat(metadata: Metadata = metadata): FileFormat =
new DeltaParquetFileFormat(metadata.columnMappingMode, metadata.schema)
}
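One detail worth noting above: the parameter default `metadata = metadata` refers to the trait's own `metadata` member, so callers get the current snapshot's metadata unless they explicitly pass a newer one (e.g., from an in-flight transaction). A minimal sketch of the pattern, with hypothetical names:

// A default argument may reference an enclosing member: render() picks up
// the freshest value, while render(oldVersion) pins an explicit one.
trait Versioned {
  def current: Int
  def render(v: Int = current): String = s"v$v"
}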
@@ -0,0 +1,73 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

/** A thin wrapper over the Parquet file format to support column names without restrictions. */
class DeltaParquetFileFormat(
val columnMappingMode: DeltaColumnMappingMode,
val referenceSchema: StructType)
extends ParquetFileFormat {

private def prepareSchema(inputSchema: StructType): StructType = {
DeltaColumnMapping.createPhysicalSchema(inputSchema, referenceSchema, columnMappingMode)
}

/**
* We sometimes need to replace FileFormat within LogicalPlans, so we have to override
* `equals` to ensure file format changes are captured
*/
override def equals(other: Any): Boolean = {
other match {
case ff: DeltaParquetFileFormat =>
ff.columnMappingMode == columnMappingMode && ff.referenceSchema == referenceSchema
case _ => false
}
}

override def hashCode(): Int = getClass.hashCode()

override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
super.buildReaderWithPartitionValues(
sparkSession,
prepareSchema(dataSchema),
prepareSchema(partitionSchema),
prepareSchema(requiredSchema),
filters,
options,
hadoopConf)
}

override def supportFieldName(name: String): Boolean = {
if (columnMappingMode != NoMapping) true else super.supportFieldName(name)
}
}
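The `equals` override above matters because plan rewrites swap the FileFormat inside a LogicalRelation and then compare formats to detect the change — MergeIntoCommand below does exactly this via DeltaTableUtils.replaceFileFormat. A hedged sketch of what such a helper could look like (an assumed implementation, not the one in the Delta codebase):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}

// Swap the FileFormat of every file-based relation in the plan. Structural
// equality on DeltaParquetFileFormat lets callers verify that the rewritten
// plan really carries the new format.
def replaceFileFormat(plan: LogicalPlan, newFormat: FileFormat): LogicalPlan =
  plan.transform {
    case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
      l.copy(relation = r.copy(fileFormat = newFormat)(r.sparkSession))
  }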
@@ -622,7 +622,7 @@ case class MergeIntoCommand(
// In cases of schema evolution, they may not be the same type as the original attributes.
val original =
deltaTxn.deltaLog.createDataFrame(deltaTxn.snapshot, files).queryExecution.analyzed
original.transform {
val transformed = original.transform {
case LogicalRelation(base, output, catalogTbl, isStreaming) =>
LogicalRelation(
base,
@@ -631,6 +631,16 @@
catalogTbl,
isStreaming)
}

// With schema evolution and column mapping, we also need to rebuild the file format,
// because the reference schema inside DeltaParquetFileFormat that is used to
// populate column mapping metadata must be updated
if (deltaTxn.metadata.columnMappingMode != NoMapping) {
val updatedFileFormat = deltaTxn.deltaLog.fileFormat(deltaTxn.metadata)
DeltaTableUtils.replaceFileFormat(transformed, updatedFileFormat)
} else {
transformed
}
}

// For each plan output column, find the corresponding target output column (by name) and
@@ -74,7 +74,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
protected def mapColumnAttributes(
output: Seq[Attribute],
mappingMode: DeltaColumnMappingMode): Seq[Attribute] = {
throw DeltaErrors.writesWithColumnMappingNotSupported
DeltaColumnMapping.createPhysicalAttributes(output, metadata.schema, mappingMode)
}
