Skip to content

Commit

Permalink
Add Delta Lake $properties system table
Browse files Browse the repository at this point in the history
  • Loading branch information
jkylling authored and ebyhr committed Jul 19, 2023
1 parent 9e6cc47 commit 1e188a1
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 1 deletion.
20 changes: 20 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,26 @@ The output of the query has the following history columns:
- ``BOOLEAN``
- Whether or not the operation appended data

``$properties`` table
~~~~~~~~~~~~~~~~~~~~~

The ``$properties`` table provides access to Delta Lake table configuration,
table features and table properties. The table rows are key/value pairs.

You can retrieve the properties of the Delta
table ``test_table`` by using the following query::

    SELECT * FROM "test_table$properties"

.. code-block:: text

     key                        | value           |
    ----------------------------+-----------------+
    delta.minReaderVersion      | 1               |
    delta.minWriterVersion      | 4               |
    delta.columnMapping.mode    | name            |
    delta.feature.columnMapping | supported       |

.. _delta-lake-special-columns:

Metadata columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3097,8 +3097,9 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema

String tableLocation = table.get().location();
TableSnapshot tableSnapshot = getSnapshot(new SchemaTableName(systemTableName.getSchemaName(), tableName), tableLocation, session);
MetadataEntry metadataEntry;
try {
transactionLogAccess.getMetadataEntry(tableSnapshot, session);
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
Expand All @@ -3113,6 +3114,10 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
systemTableName,
getCommitInfoEntries(tableLocation, session),
typeManager));
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(
systemTableName,
metadataEntry,
transactionLogAccess.getProtocolEntry(session, tableSnapshot)));
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;

import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

/**
 * System table that exposes the properties of a Delta Lake table as
 * two VARCHAR columns, {@code key} and {@code value}.
 *
 * <p>The rows are produced in this order:
 * <ol>
 * <li>one row per entry of the metadata entry's configuration map;</li>
 * <li>{@code delta.minReaderVersion} and {@code delta.minWriterVersion}
 * taken from the protocol entry;</li>
 * <li>one {@code delta.feature.<name>} row with the value {@code supported}
 * for every distinct reader or writer feature of the protocol entry.</li>
 * </ol>
 */
public class DeltaLakePropertiesTable
        implements SystemTable
{
    private static final String DELTA_FEATURE_PREFIX = "delta.feature.";
    private static final String MIN_READER_VERSION_KEY = "delta.minReaderVersion";
    private static final String MIN_WRITER_VERSION_KEY = "delta.minWriterVersion";

    private static final List<ColumnMetadata> COLUMNS = ImmutableList.of(
            new ColumnMetadata("key", VARCHAR),
            new ColumnMetadata("value", VARCHAR));

    private final ConnectorTableMetadata tableMetadata;
    private final MetadataEntry metadataEntry;
    private final ProtocolEntry protocolEntry;

    /**
     * @param tableName name under which the system table is exposed
     * @param metadataEntry source of the table configuration rows
     * @param protocolEntry source of the protocol version and feature rows
     * @throws NullPointerException if any argument is null
     */
    public DeltaLakePropertiesTable(SchemaTableName tableName, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
    {
        // Null checks run in the same order as the assignments below.
        this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
        this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
        requireNonNull(tableName, "tableName is null");
        this.tableMetadata = new ConnectorTableMetadata(tableName, COLUMNS);
    }

    @Override
    public Distribution getDistribution()
    {
        return Distribution.SINGLE_COORDINATOR;
    }

    @Override
    public ConnectorTableMetadata getTableMetadata()
    {
        return tableMetadata;
    }

    /**
     * Returns a fixed page source over freshly built property rows.
     * The transaction handle, session and constraint are not consulted.
     */
    @Override
    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
    {
        List<Page> pages = buildPages();
        return new FixedPageSource(pages);
    }

    /**
     * Materializes all property rows: configuration entries first, then the
     * protocol versions, then the protocol features.
     */
    private List<Page> buildPages()
    {
        PageListBuilder rows = PageListBuilder.forTable(tableMetadata);

        metadataEntry.getConfiguration().forEach((key, value) -> appendRow(rows, key, value));

        appendRow(rows, MIN_READER_VERSION_KEY, String.valueOf(protocolEntry.getMinReaderVersion()));
        appendRow(rows, MIN_WRITER_VERSION_KEY, String.valueOf(protocolEntry.getMinWriterVersion()));

        // A feature listed as both reader and writer feature yields a single row.
        ImmutableSet<String> features = ImmutableSet.<String>builder()
                .addAll(protocolEntry.getReaderFeatures().orElseGet(ImmutableSet::of))
                .addAll(protocolEntry.getWriterFeatures().orElseGet(ImmutableSet::of))
                .build();
        for (String feature : features) {
            appendRow(rows, DELTA_FEATURE_PREFIX + feature, "supported");
        }

        return rows.build();
    }

    /**
     * Appends a single key/value row to the given builder.
     */
    private static void appendRow(PageListBuilder rows, String key, String value)
    {
        rows.beginRow();
        rows.appendVarchar(key);
        rows.appendVarchar(value);
        rows.endRow();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ public enum DeltaLakeTableType
{
DATA,
HISTORY,
PROPERTIES,
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ public void testShowSchemas()
showCreateDeltaLakeWithRedirectionsSchema,
getExpectedDeltaLakeCreateSchema("delta_with_redirections"));
}

/**
 * Checks that reading {@code delta_table$properties} through the
 * {@code delta_with_redirections} catalog returns the same result as reading
 * it through the {@code hive_with_redirections} catalog.
 */
@Test
public void testPropertiesTable()
{
    String viaDeltaCatalog = "SELECT * FROM delta_with_redirections." + schema + ".\"delta_table$properties\"";
    String viaHiveCatalog = "SELECT * FROM hive_with_redirections." + schema + ".\"delta_table$properties\"";

    assertThat(query(viaDeltaCatalog)).matches(viaHiveCatalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ private void testCorruptedTableLocation(String tableName, Path tableLocation, bo
// Assert queries fail cleanly
assertQueryFails("TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", ".* Table '.*\\$history' does not exist");
assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", ".* Table '.*\\$properties' does not exist");
assertQueryFails("SELECT * FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT 1 FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SHOW CREATE TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,17 @@ public void testHistoryTable()
assertUpdate("DROP TABLE IF EXISTS test_checkpoint_table");
}
}

/**
 * Creates a table with change data feed enabled and a checkpoint interval of 5,
 * then checks that its {@code $properties} table lists both settings together
 * with the minimum reader and writer protocol versions. The table is dropped
 * afterwards whether or not the assertions pass.
 */
@Test
public void testPropertiesTable()
{
    String tableName = "test_simple_properties_table";
    String createTable = "CREATE TABLE " + tableName + " (_bigint BIGINT) WITH (change_data_feed_enabled = true, checkpoint_interval = 5)";
    String selectProperties = "SELECT * FROM \"" + tableName + "$properties\"";
    String expectedProperties = "VALUES ('delta.enableChangeDataFeed', 'true'), ('delta.checkpointInterval', '5'), ('delta.minReaderVersion', '1'), ('delta.minWriterVersion', '4')";

    try {
        assertUpdate(createTable);
        assertQuery(selectProperties, expectedProperties);
    }
    finally {
        assertUpdate("DROP TABLE IF EXISTS " + tableName);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static io.trino.plugin.deltalake.DeltaLakeTableType.DATA;
import static io.trino.plugin.deltalake.DeltaLakeTableType.HISTORY;
import static io.trino.plugin.deltalake.DeltaLakeTableType.PROPERTIES;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -33,6 +34,7 @@ public void testParse()
{
assertParseNameAndType("abc", "abc", DATA);
assertParseNameAndType("abc$history", "abc", DeltaLakeTableType.HISTORY);
assertParseNameAndType("abc$properties", "abc", DeltaLakeTableType.PROPERTIES);

assertNoValidTableType("abc$data");
assertInvalid("abc@123", "Invalid Delta Lake table name: abc@123");
Expand All @@ -58,6 +60,7 @@ public void testTableNameFrom()
assertEquals(DeltaLakeTableName.tableNameFrom("abc"), "abc");
assertEquals(DeltaLakeTableName.tableNameFrom("abc$data"), "abc");
assertEquals(DeltaLakeTableName.tableNameFrom("abc$history"), "abc");
assertEquals(DeltaLakeTableName.tableNameFrom("abc$properties"), "abc");
assertEquals(DeltaLakeTableName.tableNameFrom("abc$invalid"), "abc");
}

Expand All @@ -67,6 +70,7 @@ public void testTableTypeFrom()
assertEquals(DeltaLakeTableName.tableTypeFrom("abc"), Optional.of(DATA));
assertEquals(DeltaLakeTableName.tableTypeFrom("abc$data"), Optional.empty()); // it's invalid
assertEquals(DeltaLakeTableName.tableTypeFrom("abc$history"), Optional.of(HISTORY));
assertEquals(DeltaLakeTableName.tableTypeFrom("abc$properties"), Optional.of(PROPERTIES));

assertEquals(DeltaLakeTableName.tableTypeFrom("abc$invalid"), Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests.product.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.tempto.assertions.QueryAssert.Row;
import io.trino.tempto.query.QueryResult;
import io.trino.testng.services.Flaky;
import org.testng.annotations.Test;

import java.util.List;

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Product tests comparing the rows of Trino's {@code $properties} system table
 * with the output of {@code SHOW TBLPROPERTIES} executed through the Delta
 * query executor.
 */
public class TestDeltaLakeSystemTableCompatibility
        extends BaseTestDeltaLakeS3Storage
{
    /**
     * Property keys that differ only in letter case must be reported as
     * separate rows, with their original casing, by both engines.
     */
    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    public void testTablePropertiesCaseSensitivity()
    {
        String tableName = "test_dl_table_properties_case_sensitivity_" + randomNameSuffix();
        String tableDirectory = "databricks-compatibility-test-" + tableName;

        String createTable = format(
                "CREATE TABLE default.%s (col INT) USING DELTA LOCATION 's3://%s/%s' TBLPROPERTIES ('test_key'='test_value', 'Test_Key'='Test_Mixed_Case')",
                tableName,
                bucketName,
                tableDirectory);
        onDelta().executeQuery(createTable);

        assertPropertiesMatchDeltaThenDrop(
                tableName,
                ImmutableList.of(
                        row("test_key", "test_value"),
                        row("Test_Key", "Test_Mixed_Case")));
    }

    /**
     * A table created with reader version 3, writer version 7 and column
     * mapping mode {@code id} must expose the protocol versions, the column
     * mapping mode and the {@code columnMapping} feature row in both engines.
     */
    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    public void testTablePropertiesWithTableFeatures()
    {
        String tableName = "test_dl_table_properties_with_features_" + randomNameSuffix();
        String tableDirectory = "databricks-compatibility-test-" + tableName;

        String createTable = format(
                "CREATE TABLE default.%s (col INT) USING DELTA LOCATION 's3://%s/%s'" +
                        " TBLPROPERTIES ('delta.minReaderVersion'='3', 'delta.minWriterVersion'='7', 'delta.columnMapping.mode'='id')",
                tableName,
                bucketName,
                tableDirectory);
        onDelta().executeQuery(createTable);

        assertPropertiesMatchDeltaThenDrop(
                tableName,
                ImmutableList.of(
                        row("delta.columnMapping.mode", "id"),
                        row("delta.feature.columnMapping", "supported"),
                        row("delta.minReaderVersion", "3"),
                        row("delta.minWriterVersion", "7")));
    }

    /**
     * Reads the table properties through both engines, checks that each result
     * contains {@code expectedRows} and that the two results hold exactly the
     * same rows (in any order), and finally drops the table regardless of the
     * assertion outcome.
     */
    private static void assertPropertiesMatchDeltaThenDrop(String tableName, List<Row> expectedRows)
    {
        try {
            QueryResult fromDelta = onDelta().executeQuery("SHOW TBLPROPERTIES default." + tableName);
            QueryResult fromTrino = onTrino().executeQuery("SELECT * FROM default.\"" + tableName + "$properties\"");
            assertThat(fromDelta).contains(expectedRows);
            assertThat(fromTrino).contains(expectedRows);
            assertThat(fromTrino.rows()).containsExactlyInAnyOrderElementsOf(fromDelta.rows());
        }
        finally {
            dropDeltaTableWithRetry("default." + tableName);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,30 @@ public void testViewReferencingHiveAndDeltaTable(boolean legacyHiveViewTranslati
}
}

/**
 * Creates a Delta table through the Delta query executor and checks that its
 * {@code $properties} table, read via both the {@code delta} and the
 * {@code hive} catalog in Trino, contains only the minimum reader and writer
 * version rows. The table is dropped afterwards in all cases.
 */
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testHiveToDeltaPropertiesRedirect()
{
    String tableName = "test_redirect_to_delta_properties_" + randomNameSuffix();

    // The double space before AS is intentional: it reproduces the statement text exactly.
    String createTable = format(
            "CREATE TABLE %s USING DELTA LOCATION '%s'  AS SELECT true AS a_boolean",
            tableName,
            locationForTable(tableName));
    onDelta().executeQuery(createTable);

    List<Row> expected = List.of(
            row("delta.minReaderVersion", "1"),
            row("delta.minWriterVersion", "2"));

    try {
        // Query the delta catalog first, then the hive catalog.
        for (String catalog : List.of("delta", "hive")) {
            String selectProperties = format("SELECT * FROM %s.default.\"%s$properties\"", catalog, tableName);
            assertThat(onTrino().executeQuery(selectProperties)).containsOnly(expected);
        }
    }
    finally {
        dropDeltaTableWithRetry("default." + tableName);
    }
}

@DataProvider
public Object[][] trueFalse()
{
Expand Down

0 comments on commit 1e188a1

Please sign in to comment.