Basic functionality of Delta Table Features
This PR implements Table Features proposed in the feature request (#1408) and the PROTOCOL doc (#1450).

This PR implements the basic functionality, including:
- The protocol structure and the necessary APIs
- Protocol upgrade logic and the upgrade path
- The append-only feature, ported to Table Features
- User-facing APIs, such as referencing features manually
- Partial test coverage

Not covered by this PR:
- Adapting all existing features to Table Features
- Full test coverage
- Making `DESCRIBE TABLE` show referenced features
- Enabling the table clone and time travel paths

Table Features support starts from reader protocol version `3` and writer version `7`. When supported, features can be **referenced** by a protocol by placing a `DeltaFeatureDescriptor` into the protocol's `readerFeatures` and/or `writerFeatures`.

A feature is one of two types: writer-only or reader-writer. A writer-only feature affects only how a table is written, so only writers must be aware of it; a reader-writer feature additionally affects how a table is read, so readers must also be aware of it to read the data correctly. We now have the following features released:

- `appendOnly`: legacy, writer-only
- `invariants`: legacy, writer-only
- `checkConstraints`: legacy, writer-only
- `changeDataFeed`: legacy, writer-only
- `generatedColumns`: legacy, writer-only
- `columnMapping`: legacy, reader-writer
- `identityColumn`: legacy, writer-only
- `deletionVector`: native, reader-writer
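
As an illustration of the support rule above, a reader-side check can be sketched as follows. This is a simplified, hypothetical model: the object, class, and constant names here do not match the real Delta Lake API.

```scala
// Hypothetical, simplified model of the reader-side support check.
object TableFeaturesSketch {
  // Highest reader protocol version this (imaginary) client understands.
  val SupportedReaderVersion = 3

  case class Protocol(
      minReaderVersion: Int,
      minWriterVersion: Int,
      readerFeatures: Option[Set[String]] = None,
      writerFeatures: Option[Set[String]] = None)

  // A client can read a table only if it understands the reader version
  // and every feature referenced in `readerFeatures`.
  def canRead(p: Protocol, supportedFeatures: Set[String]): Boolean =
    p.minReaderVersion <= SupportedReaderVersion &&
      p.readerFeatures.getOrElse(Set.empty[String]).subsetOf(supportedFeatures)

  def main(args: Array[String]): Unit = {
    val p = Protocol(3, 7, readerFeatures = Some(Set("columnMapping")))
    println(canRead(p, Set("columnMapping", "deletionVector"))) // true
    println(canRead(p, Set.empty[String]))                      // false
  }
}
```

An analogous check applies on the write path against `writerFeatures`.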

Some examples of the `protocol` action:

```scala
// Valid protocol. Both reader and writer versions are capable.
Protocol(
  minReaderVersion = 3,
  minWriterVersion = 7,
  readerFeatures = {(columnMapping,enabled)},
  writerFeatures = {(appendOnly,enabled), (columnMapping,enabled), (changeDataFeed,enabled)})

// Valid protocol. Only the writer version is Table Features-capable;
// "columnMapping" is implicitly supported by reader version 2.
Protocol(
  minReaderVersion = 2,
  minWriterVersion = 7,
  readerFeatures = None,
  writerFeatures = {(columnMapping,enabled)})

// Invalid protocol. Reader version 1 does not enable "columnMapping" implicitly.
Protocol(
  minReaderVersion = 1,
  minWriterVersion = 7,
  readerFeatures = None,
  writerFeatures = {(columnMapping,enabled)})
```

When reading or writing a table, clients MUST respect all enabled features.

Upon table creation, the system assigns the table the minimum protocol that satisfies all features **automatically enabled** by the table's metadata. As a result, the table can be on a "legacy" protocol with both `readerFeatures` and `writerFeatures` set to `None` (when all active features are legacy, matching the current behavior), or on a Table Features-capable protocol with all active features explicitly referenced in `readerFeatures` and/or `writerFeatures` (when at least one active feature is native to Table Features, or the user has specified a Table Features-capable protocol version).
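
The minimum-protocol assignment described above can be sketched like this. The sketch is illustrative, not the actual implementation: `Feature` and `minimumProtocol` are invented names, and the per-feature version thresholds are assumptions mirroring the feature table earlier in this description.

```scala
// Illustrative sketch only: compute the smallest protocol versions that
// satisfy a set of automatically enabled features.
object MinProtocolSketch {
  case class Feature(name: String, minReader: Int, minWriter: Int, isNative: Boolean)

  def minimumProtocol(features: Seq[Feature]): (Int, Int) = {
    val reader = (1 +: features.map(_.minReader)).max
    val writer = (1 +: features.map(_.minWriter)).max
    // A single Table Features-native feature forces a capable protocol
    // (reader 3 / writer 7); otherwise a legacy protocol suffices.
    if (features.exists(_.isNative)) (reader.max(3), writer.max(7))
    else (reader, writer)
  }

  def main(args: Array[String]): Unit = {
    val legacyOnly = Seq(Feature("appendOnly", 1, 2, isNative = false))
    println(minimumProtocol(legacyOnly)) // (1,2) -- stays on a legacy protocol
    val withNative = legacyOnly :+ Feature("deletionVector", 3, 7, isNative = true)
    println(minimumProtocol(withNative)) // (3,7) -- forced to Table Features
  }
}
```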

It is possible to upgrade an existing table to support Table Features. The upgrade can target writers only, or both readers and writers. During the upgrade, the system explicitly references all legacy features that the old protocol supported implicitly.
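
The explicit-referencing step of that upgrade might be sketched as follows. The helper is hypothetical; the legacy version thresholds are taken from the feature list above and are illustrative, not authoritative.

```scala
// Hypothetical helper: list the legacy features a pre-Table-Features
// protocol implicitly supports, so an upgrade can reference them explicitly.
object UpgradeSketch {
  // (feature name, min reader version, min writer version) -- illustrative.
  private val legacyFeatures = Seq(
    ("appendOnly", 1, 2),
    ("invariants", 1, 2),
    ("checkConstraints", 1, 3),
    ("changeDataFeed", 1, 4),
    ("generatedColumns", 1, 4),
    ("columnMapping", 2, 5),
    ("identityColumn", 1, 6))

  def implicitlySupported(minReader: Int, minWriter: Int): Set[String] =
    legacyFeatures.collect {
      case (name, r, w) if r <= minReader && w <= minWriter => name
    }.toSet

  def main(args: Array[String]): Unit = {
    // A (1, 2) protocol implicitly supports only the earliest features.
    println(implicitlySupported(1, 2)) // appendOnly and invariants
  }
}
```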

Users can mark a feature as required by a table using the following commands:
```sql
-- for an existing table
ALTER TABLE table_name SET TBLPROPERTIES (delta.feature.featureName = 'enabled')
-- for a new table
CREATE TABLE table_name ... TBLPROPERTIES (delta.feature.featureName = 'enabled')
-- for all new tables
SET spark.databricks.delta.properties.defaults.feature.featureName = 'enabled'
```
When some features are set to `enabled` in table properties and others in the Spark session, the resulting table enables all features defined in both places:
```sql
SET spark.databricks.delta.properties.defaults.feature.featureA = 'enabled';
CREATE TABLE table_name ... TBLPROPERTIES (delta.feature.featureB = 'enabled')
-- 'table_name' will have 'featureA' and 'featureB' enabled.
```
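
The union semantics shown above can be sketched as follows. The prefix constants are copied from the commands in this description; the merging helper itself is hypothetical and not the real Delta Lake code path.

```scala
// Hypothetical sketch of merging feature flags from session defaults and
// table properties: the final set is the union of both sources.
object FeatureMergeSketch {
  val SessionPrefix = "spark.databricks.delta.properties.defaults.feature."
  val TablePrefix = "delta.feature."

  private def pick(props: Map[String, String], prefix: String): Set[String] =
    props.collect {
      case (k, v) if k.startsWith(prefix) && v == "enabled" => k.stripPrefix(prefix)
    }.toSet

  def enabledFeatures(
      sessionConf: Map[String, String],
      tblProps: Map[String, String]): Set[String] =
    pick(sessionConf, SessionPrefix) ++ pick(tblProps, TablePrefix)

  def main(args: Array[String]): Unit = {
    val session = Map(SessionPrefix + "featureA" -> "enabled")
    val table = Map(TablePrefix + "featureB" -> "enabled")
    println(enabledFeatures(session, table)) // both featureA and featureB
  }
}
```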
Closes #1520

GitOrigin-RevId: 2b05f397b24e57f1804761b3242a0f29098a209c
xupefei authored and allisonport-db committed Dec 15, 2022
1 parent 9fc7da8 commit 4a8786b
Showing 24 changed files with 1,794 additions and 209 deletions.
48 changes: 48 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -590,6 +590,18 @@
],
"sqlState" : "42000"
},
"DELTA_FEATURE_REQUIRES_HIGHER_READER_VERSION" : {
"message" : [
"Unable to enable table feature <feature> because it requires a higher reader protocol version (current <current>). Consider upgrading the table's reader protocol version to <required>, or to a version which supports reader table features. Refer to <docLink> for more information on table protocol versions."
],
"sqlState" : "42000"
},
"DELTA_FEATURE_REQUIRES_HIGHER_WRITER_VERSION" : {
"message" : [
"Unable to enable table feature <feature> because it requires a higher writer protocol version (current <current>). Consider upgrading the table's writer protocol version to <required>, or to a version which supports writer table features. Refer to <docLink> for more information on table protocol versions."
],
"sqlState" : "42000"
},
"DELTA_FILE_ALREADY_EXISTS" : {
"message" : [
"Existing file path <path>"
@@ -852,6 +864,12 @@
],
"sqlState" : "42000"
},
"DELTA_INVALID_PROTOCOL_VERSION" : {
"message" : [
"Delta protocol version is too new for this version of Delta Lake: table requires <required>, client supports up to <supported>. Please upgrade to a newer release."
],
"sqlState" : "42000"
},
"DELTA_INVALID_SOURCE_VERSION" : {
"message" : [
"sourceVersion(<version>) is invalid"
@@ -1273,6 +1291,12 @@
],
"sqlState" : "22000"
},
"DELTA_READ_FEATURE_PROTOCOL_REQUIRES_WRITE" : {
"message" : [
"Unable to upgrade only the reader protocol version to use table features. Writer protocol version must be at least <writerVersion> to proceed. Refer to <docLink> for more information on table protocol versions."
],
"sqlState" : "42000"
},
"DELTA_READ_TABLE_WITHOUT_COLUMNS" : {
"message" : [
"You are trying to read a table <tableName> without columns using Delta.",
@@ -1786,6 +1810,30 @@
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_FEATURES_FOR_READ" : {
"message" : [
"Unable to read this table because it requires reader table features that are unsupported by this version of Delta Lake: <unsupported>."
],
"sqlState" : "42000"
},
"DELTA_UNSUPPORTED_FEATURES_FOR_WRITE" : {
"message" : [
"Unable to write this table because it requires writer table features that are unsupported by this version of Delta Lake: <unsupported>."
],
"sqlState" : "42000"
},
"DELTA_UNSUPPORTED_FEATURES_IN_CONFIG" : {
"message" : [
"Table features configured in the following Spark configs or Delta table properties are not recognized by this version of Delta Lake: <configs>."
],
"sqlState" : "42000"
},
"DELTA_UNSUPPORTED_FEATURE_STATUS" : {
"message" : [
"Expecting the status for table feature <feature> to be \"enabled\", but got \"<status>\"."
],
"sqlState" : "42000"
},
"DELTA_UNSUPPORTED_FIELD_UPDATE_NON_STRUCT" : {
"message" : [
"Updating nested fields is only supported for StructType, but you are trying to update a field of <columnName>, which is of type: <dataType>."
@@ -161,10 +161,8 @@ private[delta] class ConflictChecker(
deltaLog.protocolRead(p)
deltaLog.protocolWrite(p)
}
-    currentTransactionInfo.actions.foreach {
-      case Protocol(_, _) =>
-        throw DeltaErrors.protocolChangedException(winningCommitSummary.commitInfo)
-      case _ =>
+    if (currentTransactionInfo.actions.exists(_.isInstanceOf[Protocol])) {
+      throw DeltaErrors.protocolChangedException(winningCommitSummary.commitInfo)
     }
   }
 }
@@ -121,7 +121,7 @@ trait DeltaColumnMappingBase extends DeltaLogging {
} else {
// legal mode change, now check if protocol is upgraded before or part of this txn
val caseInsensitiveMap = CaseInsensitiveMap(newMetadata.configuration)
-      val newProtocol = new Protocol(
+      val newProtocol = Protocol(
minReaderVersion = caseInsensitiveMap
.get(Protocol.MIN_READER_VERSION_PROP).map(_.toInt)
.getOrElse(oldProtocol.minReaderVersion),
51 changes: 28 additions & 23 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import java.util.{HashMap, Locale}

-import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
+import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DataSkippingReader
@@ -147,31 +147,36 @@ trait DeltaConfigsBase extends DeltaLogging {
* Validates specified configurations and returns the normalized key -> value map.
*/
def validateConfigurations(configurations: Map[String, String]): Map[String, String] = {
-    configurations.map {
-      case kv @ (key, value) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
-        // This is a CHECK constraint, we should allow it.
-        kv
-      case kv @ (key, value) if key.toLowerCase(Locale.ROOT).startsWith("delta.") =>
-        Option(entries.get(key.toLowerCase(Locale.ROOT).stripPrefix("delta."))) match {
-          case Some(deltaConfig) => deltaConfig(value) // validate the value
-          case None if
-            SparkSession.active.sessionState.conf
-              .getConf(DeltaSQLConf.ALLOW_ARBITRARY_TABLE_PROPERTIES)
-            =>
-            logConsole(s"You are setting a property: $key that is not recognized by this " +
-              s"version of Delta")
-            kv
-          case None => throw DeltaErrors.unknownConfigurationKeyException(key)
-        }
-      case keyvalue @ (key, _) =>
-        if (entries.containsKey(key.toLowerCase(Locale.ROOT))) {
-          logConsole(
-            s"""
+    val allowArbitraryProperties = SparkSession.active.sessionState.conf
+      .getConf(DeltaSQLConf.ALLOW_ARBITRARY_TABLE_PROPERTIES)
+
+    configurations.map { case kv @ (key, value) =>
+      key.toLowerCase(Locale.ROOT) match {
+        case lKey if lKey.startsWith("delta.constraints.") =>
+          // This is a CHECK constraint, we should allow it.
+          kv
+        case lKey if lKey.startsWith(TableFeatureProtocolUtils.FEATURE_PROP_PREFIX) =>
+          // This is a table feature, we should allow it.
+          lKey -> value
+        case lKey if lKey.startsWith("delta.") =>
+          Option(entries.get(lKey.stripPrefix("delta."))) match {
+            case Some(deltaConfig) => deltaConfig(value) // validate the value
+            case None if allowArbitraryProperties =>
+              logConsole(
+                s"You are setting a property: $key that is not recognized by this " +
+                  "version of Delta")
+              kv
+            case None => throw DeltaErrors.unknownConfigurationKeyException(key)
+          }
+        case _ =>
+          if (entries.containsKey(key)) {
+            logConsole(s"""
               |You are trying to set a property the key of which is the same as Delta config: $key.
               |If you are trying to set a Delta config, prefix it with "delta.", e.g. 'delta.$key'.
             """.stripMargin)
           }
-        keyvalue
-      }
+          kv
+      }
     }
   }

Expand Down
91 changes: 90 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -99,7 +99,10 @@ trait DocsPath {
"incorrectLogStoreImplementationException",
"sourceNotDeterministicInMergeException",
"columnMappingAdviceMessage",
-    "icebergClassMissing"
+    "icebergClassMissing",
+    "tableFeatureReadRequiresWriteException",
+    "tableFeatureRequiresHigherReaderProtocolVersion",
+    "tableFeatureRequiresHigherWriterProtocolVersion"
)
}

@@ -2078,6 +2081,70 @@ trait DeltaErrorsBase
new io.delta.exceptions.ProtocolChangedException(message)
}

def unsupportedReaderTableFeaturesInTableException(
unsupported: Iterable[String]): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_UNSUPPORTED_FEATURES_FOR_READ",
messageParameters = Array(unsupported.mkString(", ")))
}

def unsupportedWriterTableFeaturesInTableException(
unsupported: Iterable[String]): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_UNSUPPORTED_FEATURES_FOR_WRITE",
messageParameters = Array(unsupported.mkString(", ")))
}

def unsupportedTableFeatureConfigsException(
configs: Iterable[String]): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG",
messageParameters = Array(configs.mkString(", ")))
}

def unsupportedTableFeatureStatusException(
feature: String,
status: String): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_UNSUPPORTED_FEATURE_STATUS",
messageParameters = Array(feature, status))
}

def tableFeatureReadRequiresWriteException(
requiredWriterVersion: Int): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_READ_FEATURE_PROTOCOL_REQUIRES_WRITE",
messageParameters = Array(
requiredWriterVersion.toString,
generateDocsLink(SparkSession.active.sparkContext.getConf, "/index.html")))
}

def tableFeatureRequiresHigherReaderProtocolVersion(
feature: String,
currentVersion: Int,
requiredVersion: Int): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_FEATURE_REQUIRES_HIGHER_READER_VERSION",
messageParameters = Array(
feature,
currentVersion.toString,
requiredVersion.toString,
generateDocsLink(SparkSession.active.sparkContext.getConf, "/index.html")))
}

def tableFeatureRequiresHigherWriterProtocolVersion(
feature: String,
currentVersion: Int,
requiredVersion: Int): DeltaTableFeatureException = {
new DeltaTableFeatureException(
errorClass = "DELTA_FEATURE_REQUIRES_HIGHER_WRITER_VERSION",
messageParameters = Array(
feature,
currentVersion.toString,
requiredVersion.toString,
generateDocsLink(SparkSession.active.sparkContext.getConf, "/index.html")))
}

def concurrentAppendException(
conflictingCommit: Option[CommitInfo],
partition: String,
@@ -2746,6 +2813,28 @@ class DeltaIndexOutOfBoundsException(
override def getErrorClass: String = errorClass
}

/** Thrown when the protocol version of a table is greater than supported by this client. */
class InvalidProtocolVersionException(requiredVersion: Int, supportedVersion: Int)
extends RuntimeException(DeltaThrowableHelper.getMessage(
errorClass = "DELTA_INVALID_PROTOCOL_VERSION",
messageParameters = Array(requiredVersion.toString, supportedVersion.toString)))
with DeltaThrowable {
override def getErrorClass: String = "DELTA_INVALID_PROTOCOL_VERSION"
}

class ProtocolDowngradeException(oldProtocol: Protocol, newProtocol: Protocol)
extends RuntimeException(DeltaThrowableHelper.getMessage(
errorClass = "DELTA_INVALID_PROTOCOL_DOWNGRADE",
messageParameters = Array(s"(${oldProtocol.simpleString})", s"(${newProtocol.simpleString})")
)) with DeltaThrowable {
override def getErrorClass: String = "DELTA_INVALID_PROTOCOL_DOWNGRADE"
}

class DeltaTableFeatureException(
errorClass: String,
messageParameters: Array[String] = Array.empty)
extends DeltaRuntimeException(errorClass, messageParameters)

class DeltaRuntimeException(
errorClass: String,
messageParameters: Array[String] = Array.empty)
