Add support for Trino views in Iceberg connector
losipiuk authored and findepi committed Jul 19, 2021
1 parent adbc8bb commit 4b4e4f1
Showing 6 changed files with 224 additions and 21 deletions.
@@ -26,9 +26,13 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.HiveMetadata;
import io.trino.plugin.hive.HiveSchemaProperties;
import io.trino.plugin.hive.HiveViewNotSupportedException;
import io.trino.plugin.hive.HiveWrittenPartitions;
import io.trino.plugin.hive.TableAlreadyExistsException;
import io.trino.plugin.hive.ViewAlreadyExistsException;
import io.trino.plugin.hive.ViewReaderUtil;
import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
@@ -52,6 +56,7 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.DiscretePredicates;
@@ -62,6 +67,7 @@
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.connector.ViewNotFoundException;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
@@ -116,11 +122,12 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
-import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.HiveType.HIVE_STRING;
import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG;
import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData;
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
@@ -155,6 +162,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
@@ -175,15 +183,23 @@ public class IcebergMetadata
private static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View";
public static final String DEPENDS_ON_TABLES = "dependsOnTables";

// Be compatible with views defined by the Hive connector, which can be useful under certain conditions.
private static final String PRESTO_VIEW_COMMENT = HiveMetadata.PRESTO_VIEW_COMMENT;
private static final String PRESTO_VERSION_NAME = HiveMetadata.PRESTO_VERSION_NAME;
private static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME;
private static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER;
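// For reference, the HiveMetadata values these aliases resolve to (quoted here as an aid; the
// authoritative definitions live in HiveMetadata): PRESTO_VIEW_COMMENT = "Presto View",
// PRESTO_VERSION_NAME = "presto_version", PRESTO_QUERY_ID_NAME = "presto_query_id",
// PRESTO_VIEW_EXPANDED_TEXT_MARKER = "/* Presto View */".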

private final CatalogName catalogName;
private final HiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final HiveTableOperationsProvider tableOperationsProvider;
private final String trinoVersion;

private final Map<String, Optional<Long>> snapshotIds = new ConcurrentHashMap<>();
private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
private final ViewReaderUtil.PrestoViewReader viewReader = new ViewReaderUtil.PrestoViewReader();

private Transaction transaction;

@@ -193,14 +209,16 @@ public IcebergMetadata(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
-HiveTableOperationsProvider tableOperationsProvider)
+HiveTableOperationsProvider tableOperationsProvider,
+String trinoVersion)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
}

@Override
@@ -770,6 +788,143 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
throw new TrinoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
HiveIdentity identity = new HiveIdentity(session);
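// PRESTO_VIEW_FLAG ("presto_view") is the same marker the Hive connector checks, so either
// connector recognizes this object as a view; the TABLE_COMMENT value is what later distinguishes
// plain views from Iceberg materialized views when listing.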
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put(PRESTO_VIEW_FLAG, "true")
.put(PRESTO_VERSION_NAME, trinoVersion)
.put(PRESTO_QUERY_ID_NAME, session.getQueryId())
.put(TABLE_COMMENT, PRESTO_VIEW_COMMENT)
.build();

Table.Builder tableBuilder = Table.builder()
.setDatabaseName(viewName.getSchemaName())
.setTableName(viewName.getTableName())
.setOwner(session.getUser())
.setTableType(org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW.name())
.setDataColumns(ImmutableList.of(new Column("dummy", HIVE_STRING, Optional.empty())))
.setPartitionColumns(ImmutableList.of())
.setParameters(properties)
.setViewOriginalText(Optional.of(encodeViewData(definition)))
.setViewExpandedText(Optional.of(PRESTO_VIEW_EXPANDED_TEXT_MARKER));

tableBuilder.getStorageBuilder()
.setStorageFormat(VIEW_STORAGE_FORMAT)
.setLocation("");
Table table = tableBuilder.build();
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser());

Optional<Table> existing = metastore.getTable(identity, viewName.getSchemaName(), viewName.getTableName());
if (existing.isPresent()) {
if (!replace || !isPrestoView(existing.get())) {
throw new ViewAlreadyExistsException(viewName);
}

metastore.replaceTable(identity, viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges);
return;
}

try {
metastore.createTable(identity, table, principalPrivileges);
}
catch (TableAlreadyExistsException e) {
throw new ViewAlreadyExistsException(e.getTableName());
}
}
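// Usage sketch of what this enables (catalog/schema/view names assumed, not part of this commit):
//   CREATE VIEW iceberg.tpch.nation_name_view AS SELECT name FROM nation;
//   CREATE OR REPLACE VIEW iceberg.tpch.nation_name_view AS SELECT name, regionkey FROM nation;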

@Override
public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
{
// Not checking if source view exists as this is already done in RenameViewTask
metastore.renameTable(new HiveIdentity(session), source.getSchemaName(), source.getTableName(), target.getSchemaName(), target.getTableName());
}

@Override
public void setViewAuthorization(ConnectorSession session, SchemaTableName viewName, TrinoPrincipal principal)
{
// Not checking if view exists as this is already done in SetViewAuthorizationTask
setTableAuthorization(session, viewName, principal);
}

@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
if (getView(session, viewName).isEmpty()) {
throw new ViewNotFoundException(viewName);
}

try {
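// The trailing boolean is the metastore's deleteData flag; a VIRTUAL_VIEW has no backing
// data files, so it is presumably a no-op for views.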
metastore.dropTable(new HiveIdentity(session), viewName.getSchemaName(), viewName.getTableName(), true);
}
catch (TableNotFoundException e) {
throw new ViewNotFoundException(e.getTableName());
}
}

@Override
public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
{
// Filter on PRESTO_VIEW_COMMENT to distinguish from materialized views
return listSchemas(session, schemaName).stream()
.flatMap(schema ->
metastore.getTablesWithParameter(schema, TABLE_COMMENT, PRESTO_VIEW_COMMENT).stream()
.map(table -> new SchemaTableName(schema, table)))
.collect(toImmutableList());
}

@Override
public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> schemaName)
{
ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> views = ImmutableMap.builder();
for (SchemaTableName name : listViews(session, schemaName)) {
try {
getView(session, name).ifPresent(view -> views.put(name, view));
}
catch (TrinoException e) {
if (e.getErrorCode().equals(TABLE_NOT_FOUND.toErrorCode())) {
// Ignore view that was dropped during query execution (race condition)
}
else {
throw e;
}
}
}
return views.build();
}

@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
if (isHiveSystemSchema(viewName.getSchemaName())) {
return Optional.empty();
}
return metastore.getTable(new HiveIdentity(session), viewName.getSchemaName(), viewName.getTableName())
.filter(table -> HiveMetadata.PRESTO_VIEW_COMMENT.equals(table.getParameters().get(TABLE_COMMENT))) // filter out materialized views
.filter(ViewReaderUtil::canDecodeView)
.map(view -> {
if (!isPrestoView(view)) {
throw new HiveViewNotSupportedException(viewName);
}

ConnectorViewDefinition definition = viewReader
.decodeViewData(view.getViewOriginalText().get(), view, catalogName);
// use owner from table metadata if it exists
if (view.getOwner() != null && !definition.isRunAsInvoker()) {
definition = new ConnectorViewDefinition(
definition.getOriginalSql(),
definition.getCatalog(),
definition.getSchema(),
definition.getColumns(),
definition.getComment(),
Optional.of(view.getOwner()),
false);
}
return definition;
});
}
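// For reference: encodeViewData/decodeViewData (ViewReaderUtil) store the ConnectorViewDefinition
// as JSON wrapped in a marker comment, roughly "/* Presto View: <base64(JSON)> */". Reusing the
// Hive connector's framing is what makes these views readable from either connector over a shared
// metastore.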

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
@@ -1046,8 +1201,7 @@ private boolean isMaterializedView(Table table)
@Override
public List<SchemaTableName> listMaterializedViews(ConnectorSession session, Optional<String> schemaName)
{
-// Iceberg does not support VIEWs
-// Filter on ICEBERG_MATERIALIZED_VIEW_COMMENT is used to avoid listing hive views in case of a shared HMS
+// Filter on ICEBERG_MATERIALIZED_VIEW_COMMENT is used to avoid listing hive views in case of a shared HMS and to distinguish from standard views
return listSchemas(session, schemaName).stream()
.flatMap(schema -> metastore.getTablesWithParameter(schema, TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT).stream()
.map(table -> new SchemaTableName(schema, table)))
@@ -16,6 +16,7 @@
import io.airlift.json.JsonCodec;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.type.TypeManager;

@@ -31,6 +32,7 @@ public class IcebergMetadataFactory
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final HiveTableOperationsProvider tableOperationsProvider;
private final String trinoVersion;

@Inject
public IcebergMetadataFactory(
@@ -40,9 +42,10 @@ public IcebergMetadataFactory(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskDataJsonCodec,
-HiveTableOperationsProvider tableOperationsProvider)
+HiveTableOperationsProvider tableOperationsProvider,
+NodeVersion nodeVersion)
{
-this(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, tableOperationsProvider);
+this(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, tableOperationsProvider, nodeVersion);
}

public IcebergMetadataFactory(
@@ -51,18 +54,20 @@ public IcebergMetadataFactory(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
-HiveTableOperationsProvider tableOperationsProvider)
+HiveTableOperationsProvider tableOperationsProvider,
+NodeVersion nodeVersion)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}

public IcebergMetadata create()
{
-return new IcebergMetadata(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskCodec, tableOperationsProvider);
+return new IcebergMetadata(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskCodec, tableOperationsProvider, trinoVersion);
}
}
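For illustration, constructing the factory by hand (for example in a test) might look like the sketch below; metastore, hdfsEnvironment, typeManager, and tableOperationsProvider are assumed test fixtures, not code from this commit:

    IcebergMetadataFactory factory = new IcebergMetadataFactory(
            new CatalogName("iceberg"),
            metastore,
            hdfsEnvironment,
            typeManager,
            jsonCodec(CommitTaskData.class),   // io.airlift.json.JsonCodec.jsonCodec
            tableOperationsProvider,
            new NodeVersion("test-version"));
    IcebergMetadata metadata = factory.create();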
@@ -121,6 +121,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_TOPN_PUSHDOWN:
return false;

case SUPPORTS_CREATE_VIEW:
return true;

case SUPPORTS_CREATE_MATERIALIZED_VIEW:
return true;

@@ -45,6 +45,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_TOPN_PUSHDOWN:
return false;

case SUPPORTS_CREATE_VIEW:
return true;

case SUPPORTS_CREATE_MATERIALIZED_VIEW:
return true;

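Both test classes above now advertise SUPPORTS_CREATE_VIEW, which switches the shared connector test suite from asserting failure to exercising the full view lifecycle. A sketch of the assumed pattern in the shared suite (not code from this commit):

    if (!hasBehavior(TestingConnectorBehavior.SUPPORTS_CREATE_VIEW)) {
        assertQueryFails("CREATE VIEW test_view AS SELECT 1 x", "This connector does not support creating views");
        return;
    }
    // otherwise: create the view, query it, check listings, then drop it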
@@ -96,6 +96,7 @@ public void setUp()
assertQuerySucceeds("CREATE MATERIALIZED VIEW iceberg.test_schema.iceberg_materialized_view AS " +
"SELECT * FROM iceberg.test_schema.iceberg_table1");
storageTable = getStorageTable("iceberg", "test_schema", "iceberg_materialized_view");
assertQuerySucceeds("CREATE VIEW iceberg.test_schema.iceberg_view AS SELECT * FROM iceberg.test_schema.iceberg_table1");

assertQuerySucceeds("CREATE TABLE hive.test_schema.hive_table (_double DOUBLE)");
assertQuerySucceeds("CREATE VIEW hive.test_schema.hive_view AS SELECT * FROM hive.test_schema.hive_table");
@@ -106,6 +107,7 @@ public void tearDown()
{
assertQuerySucceeds("DROP TABLE IF EXISTS hive.test_schema.hive_table");
assertQuerySucceeds("DROP VIEW IF EXISTS hive.test_schema.hive_view");
assertQuerySucceeds("DROP VIEW IF EXISTS iceberg.test_schema.iceberg_view");
assertQuerySucceeds("DROP MATERIALIZED VIEW IF EXISTS iceberg.test_schema.iceberg_materialized_view");
assertQuerySucceeds("DROP TABLE IF EXISTS iceberg.test_schema.iceberg_table2");
assertQuerySucceeds("DROP TABLE IF EXISTS iceberg.test_schema.iceberg_table1");
@@ -121,6 +123,7 @@ public void testTableListing()
"iceberg_table2",
"iceberg_materialized_view",
storageTable.getTableName(),
"iceberg_view",
"hive_table",
"hive_view");

@@ -130,7 +133,9 @@
"'iceberg_table1', " +
"'iceberg_table2', " +
"'iceberg_materialized_view', " +
"'" + storageTable.getTableName() + "'");
"'" + storageTable.getTableName() + "', " +
"'iceberg_view', " +
"'hive_view'");
}

@Test
@@ -146,7 +151,10 @@ public void testTableColumnListing()
"('iceberg_materialized_view', '_string'), " +
"('iceberg_materialized_view', '_integer'), " +
"('" + storageTable.getTableName() + "', '_string'), " +
"('" + storageTable.getTableName() + "', '_integer')");
"('" + storageTable.getTableName() + "', '_integer'), " +
"('iceberg_view', '_string'), " +
"('iceberg_view', '_integer'), " +
"('hive_view', '_double')");
}
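The listing assertions above presumably correspond to queries of roughly this shape (catalog and schema names are the fixture's; the exact queries may differ):

    SELECT table_name FROM iceberg.information_schema.tables WHERE table_schema = 'test_schema';
    SELECT table_name, column_name FROM iceberg.information_schema.columns WHERE table_schema = 'test_schema';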
