Support creating tables with table comment in Delta Lake
ebyhr committed May 27, 2022
1 parent 2db4430 commit 67e5478
Showing 4 changed files with 57 additions and 9 deletions.
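
In short, the change threads the CREATE TABLE comment through the connector (createTable, beginCreateTable/finishCreateTable, and addColumn) into the Delta transaction log, so the comment is visible to both Trino and Databricks. A minimal sketch of the behavior this enables, using the onTrino()/onDelta() query executors from the product test at the bottom of this commit (the table, bucket, and path names here are hypothetical):

    // Sketch only; mirrors the product test included in this commit.
    onTrino().executeQuery(
            "CREATE TABLE delta.default.commented_table (col INT) " +
            "COMMENT 'test comment' " +
            "WITH (location = 's3://my-bucket/commented-table')"); // hypothetical bucket/path

    // The comment is queryable from Trino ...
    onTrino().executeQuery(
            "SELECT comment FROM system.metadata.table_comments " +
            "WHERE catalog_name = 'delta' AND schema_name = 'default' AND table_name = 'commented_table'");

    // ... and, since it is stored in the Delta transaction log itself,
    // from Databricks/Spark as well:
    onDelta().executeQuery("DESCRIBE EXTENDED default.commented_table");
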
@@ -403,7 +403,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
tableHandle.getSchemaTableName(),
columns,
properties.buildOrThrow(),
- Optional.empty());
+ Optional.ofNullable(tableHandle.getMetadataEntry().getDescription()));
}

@Override
@@ -626,7 +626,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
CREATE_TABLE_OPERATION,
session,
nodeVersion,
- nodeId);
+ nodeId,
+ tableMetadata.getComment());

setRollback(() -> deleteRecursivelyIfExists(new HdfsContext(session), hdfsEnvironment, deltaLogDirectory));
transactionLogWriter.flush();
@@ -735,7 +736,8 @@ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, Con
tableMetadata.getColumns().stream().map(column -> toColumnHandle(column, partitionedBy)).collect(toImmutableList()),
location,
DeltaLakeTableProperties.getCheckpointInterval(tableMetadata.getProperties()),
- external);
+ external,
+ tableMetadata.getComment());
}

private Optional<String> getSchemaLocation(Database database)
@@ -885,7 +887,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
CREATE_TABLE_AS_OPERATION,
session,
nodeVersion,
- nodeId);
+ nodeId,
+ handle.getComment());
appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true);
transactionLogWriter.flush();
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow());
@@ -960,7 +963,8 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
ADD_COLUMN_OPERATION,
session,
nodeVersion,
- nodeId);
+ nodeId,
+ Optional.ofNullable(handle.getMetadataEntry().getDescription()));
transactionLogWriter.flush();
}
catch (Exception e) {
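
Note the addColumn hunk above: ALTER TABLE ... ADD COLUMN rewrites the whole metaData entry in the transaction log, so the existing comment has to be read back via getMetadataEntry().getDescription() and re-appended; otherwise adding a column would silently drop the table comment.
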
@@ -978,7 +982,8 @@ private static void appendTableEntries(
String operation,
ConnectorSession session,
String nodeVersion,
- String nodeId)
+ String nodeId,
+ Optional<String> comment)
{
long createdTime = System.currentTimeMillis();
transactionLogWriter.appendCommitInfoEntry(
@@ -1002,7 +1007,7 @@
new MetadataEntry(
tableId,
null,
- null,
+ comment.orElse(null),
new Format("parquet", ImmutableMap.of()),
serializeSchemaAsJson(columns),
partitionColumnNames,
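
The null replaced in the last hunk is the description field of the Delta metaData action; per the open Delta Lake protocol, that field is where engines, including Databricks, store the table comment (the sibling name field stays null here). A standalone sketch of the mapping with hypothetical identifiers; the real serialization is done by Trino's MetadataEntry:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    public class CommentToDescription
    {
        // Logical content of a Delta "metaData" action; field names follow the
        // Delta protocol, the values here are hypothetical.
        static Map<String, Object> metaDataAction(String tableId, Optional<String> comment)
        {
            Map<String, Object> action = new LinkedHashMap<>();
            action.put("id", tableId);
            action.put("name", null);                         // Trino leaves the table name unset
            action.put("description", comment.orElse(null));  // the table comment goes here
            action.put("format", Map.of("provider", "parquet", "options", Map.of()));
            return action;
        }

        public static void main(String[] args)
        {
            System.out.println(metaDataAction("b6f7a9c1", Optional.of("test comment")));
            System.out.println(metaDataAction("b6f7a9c1", Optional.empty()));
        }
    }
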
@@ -35,6 +35,7 @@ public class DeltaLakeOutputTableHandle
private final String location;
private final Optional<Long> checkpointInterval;
private final boolean external;
+ private final Optional<String> comment;

@JsonCreator
public DeltaLakeOutputTableHandle(
@@ -43,14 +44,16 @@ public DeltaLakeOutputTableHandle(
@JsonProperty("inputColumns") List<DeltaLakeColumnHandle> inputColumns,
@JsonProperty("location") String location,
@JsonProperty("checkpointInterval") Optional<Long> checkpointInterval,
@JsonProperty("external") boolean external)
@JsonProperty("external") boolean external,
@JsonProperty("comment") Optional<String> comment)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.location = requireNonNull(location, "location is null");
this.checkpointInterval = checkpointInterval;
this.external = external;
+ this.comment = requireNonNull(comment, "comment is null");
}

@JsonProperty
@@ -97,4 +100,10 @@ public boolean isExternal()
{
return external;
}
+
+ @JsonProperty
+ public Optional<String> getComment()
+ {
+ return comment;
+ }
}
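
The handle is serialized to JSON when it travels between coordinator and workers, hence the @JsonCreator/@JsonProperty pair and the requireNonNull: an absent comment is represented as Optional.empty(), never as a null field. A standalone round-trip sketch, assuming plain Jackson with its Jdk8Module (Trino's actual JSON wiring goes through Airlift, so this is an approximation):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
    import java.util.Optional;

    public class OptionalCommentRoundTrip
    {
        // Hypothetical stand-in for the handle: one Optional<String> field.
        // (Record deserialization needs Jackson 2.12+.)
        public record Handle(Optional<String> comment) {}

        public static void main(String[] args) throws Exception
        {
            ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());

            String present = mapper.writeValueAsString(new Handle(Optional.of("test comment")));
            System.out.println(present); // {"comment":"test comment"}

            String absent = mapper.writeValueAsString(new Handle(Optional.empty()));
            System.out.println(absent);  // {"comment":null}

            Handle back = mapper.readValue(present, Handle.class);
            System.out.println(back.comment().orElse("<absent>")); // test comment
        }
    }
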
@@ -164,7 +164,8 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter
getColumnHandles(),
outputPath.toString(),
Optional.of(deltaLakeConfig.getDefaultCheckpointWritingInterval()),
- true);
+ true,
+ Optional.empty());

DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider(
new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.trino.tempto.assertions.QueryAssert;
+ import io.trino.tempto.query.QueryResult;
import org.testng.annotations.Test;

import java.util.List;
@@ -27,6 +28,7 @@
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
+ import static org.testng.Assert.assertEquals;

public class TestDeltaLakeDatabricksCreateTableCompatibility
extends BaseTestDeltaLakeS3Storage
@@ -171,4 +173,35 @@ private void testInsert(String tableName, List<QueryAssert.Row> existingRows)
assertThat(onTrino().executeQuery("SELECT integer, string, to_iso8601(timetz) FROM delta.default." + tableName))
.containsOnly(expected.build());
}
+
+ @Test(groups = {DELTA_LAKE_DATABRICKS, PROFILE_SPECIFIC_TESTS})
+ public void testCreateTableWithTableComment()
+ {
+ String tableName = "test_dl_create_table_comment_" + randomTableSuffix();
+ String tableDirectory = "databricks-compatibility-test-" + tableName;
+
+ onTrino().executeQuery(format("CREATE TABLE delta.default.%s (col INT) COMMENT 'test comment' WITH (location = 's3://%s/%s')",
+ tableName,
+ bucketName,
+ tableDirectory));
+
+ try {
+ assertThat(onTrino().executeQuery("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = 'delta' AND schema_name = 'default' AND table_name = '" + tableName + "'"))
+ .containsOnly(row("test comment"));
+
+ assertEquals(getTableCommentOnDelta("default", tableName), "test comment");
+ }
+ finally {
+ onTrino().executeQuery("DROP TABLE delta.default." + tableName);
+ }
+ }
+
+ private static String getTableCommentOnDelta(String schemaName, String tableName)
+ {
+ QueryResult result = onDelta().executeQuery(format("DESCRIBE EXTENDED %s.%s", schemaName, tableName));
+ return (String) result.rows().stream()
+ .filter(row -> row.get(0).equals("Comment"))
+ .map(row -> row.get(1))
+ .findFirst().orElseThrow();
+ }
}
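
getTableCommentOnDelta depends on the shape of DESCRIBE EXTENDED output on Databricks/Spark: rows of (col_name, data_type, comment), with the table comment surfaced in a row whose first column is the literal string Comment. A standalone sketch of that parsing; the sample rows are an assumption modeled on typical Spark output, not captured from Databricks:

    import java.util.List;

    public class DescribeExtendedParsing
    {
        public static void main(String[] args)
        {
            // Hypothetical DESCRIBE EXTENDED rows: (col_name, data_type, comment)
            List<List<Object>> rows = List.of(
                    List.of("col", "int", ""),
                    List.of("", "", ""),
                    List.of("# Detailed Table Information", "", ""),
                    List.of("Comment", "test comment", ""));

            // Same stream pipeline as the helper above
            String comment = rows.stream()
                    .filter(row -> row.get(0).equals("Comment"))
                    .map(row -> (String) row.get(1))
                    .findFirst().orElseThrow();
            System.out.println(comment); // test comment
        }
    }
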
