Skip to content

Commit

Permalink
Add support to redirect table operations from Iceberg to Hive
Browse files Browse the repository at this point in the history
The Iceberg connector can make use of the `iceberg.hive-catalog-name`
configuration property to enable table redirects to Hive tables.

When table redirection to the Hive connector is not enabled,
querying a metadata table of a Hive connector table
through the Iceberg connector
results in a table not found exception.
  • Loading branch information
findinpath authored and findepi committed Apr 6, 2022
1 parent df3d232 commit 7562be6
Show file tree
Hide file tree
Showing 13 changed files with 601 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.Optional;

import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
Expand All @@ -37,6 +39,7 @@ public class IcebergConfig
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
private Optional<String> hiveCatalogName = Optional.empty();

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -166,4 +169,17 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

/**
 * Returns the configured name of the catalog that Hive tables are redirected to, if one was set.
 */
public Optional<String> getHiveCatalogName()
{
    return this.hiveCatalogName;
}

/**
 * Sets the catalog to redirect to when a Hive table is referenced.
 * A {@code null} value is stored as an empty {@link Optional}.
 *
 * @param hiveCatalogName name of the target catalog, may be {@code null}
 * @return this config object, to allow call chaining
 */
@Config("iceberg.hive-catalog-name")
@ConfigDescription("Catalog to redirect to when a Hive table is referenced")
public IcebergConfig setHiveCatalogName(String hiveCatalogName)
{
    Optional<String> catalogName = Optional.ofNullable(hiveCatalogName);
    this.hiveCatalogName = catalogName;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Variable;
Expand Down Expand Up @@ -115,6 +116,7 @@
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -151,7 +153,6 @@
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -206,7 +207,10 @@ public Optional<TrinoPrincipal> getSchemaOwner(ConnectorSession session, Catalog
public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableNameWithType());
if (name.getTableType() != DATA) {
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
return null;
}

Table table;
try {
Expand Down Expand Up @@ -236,6 +240,7 @@ public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTabl
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

@SuppressWarnings("TryWithIdenticalCatches")
private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
Expand All @@ -251,6 +256,10 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
catch (TableNotFoundException e) {
return Optional.empty();
}
catch (UnknownTableTypeException e) {
// avoid dealing with non Iceberg tables
return Optional.empty();
}

SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
switch (name.getTableType()) {
Expand Down Expand Up @@ -395,27 +404,43 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
List<SchemaTableName> tables = prefix.getTable()
.map(ignored -> singletonList(prefix.toSchemaTableName()))
.orElseGet(() -> listTables(session, prefix.getSchema()));
throw new UnsupportedOperationException("The deprecated listTableColumns is not supported because streamTableColumns is implemented instead");
}

ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
for (SchemaTableName table : tables) {
try {
columns.put(table, getTableMetadata(session, table).getColumns());
}
catch (TableNotFoundException e) {
// table disappeared during listing operation
}
catch (UnknownTableTypeException e) {
// ignore table of unknown type
}
catch (RuntimeException e) {
// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during column listing for %s", table, prefix);
}
/**
 * Streams column metadata for the tables matching {@code prefix}.
 *
 * When the prefix names no table, every table the catalog lists for the prefix's schema is considered;
 * otherwise only the single table named by the prefix is considered.
 * A table for which {@code redirectTable} returns a target is reported through
 * {@code TableColumnsMetadata.forRedirectedTable} without loading its columns.
 * Tables whose metadata cannot be read are left out of the stream instead of failing the listing.
 */
@Override
@SuppressWarnings("TryWithIdenticalCatches")
public Stream<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
requireNonNull(prefix, "prefix is null");
List<SchemaTableName> schemaTableNames;
if (prefix.getTable().isEmpty()) {
// No table in the prefix: enumerate the tables of the schema through the catalog
schemaTableNames = catalog.listTables(session, prefix.getSchema());
}
else {
schemaTableNames = ImmutableList.of(prefix.toSchemaTableName());
}
// NOTE(review): `columns` is not declared in this method; the next line looks like a removed-line
// remnant of the old listTableColumns body shown in the diff view. Confirm against the real file.
return columns.buildOrThrow();
return schemaTableNames.stream()
.flatMap(tableName -> {
try {
// Check for redirection first so that the metadata of a redirected table is not loaded here
if (redirectTable(session, tableName).isPresent()) {
return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName));
}
return Stream.of(TableColumnsMetadata.forTable(tableName, getTableMetadata(session, tableName).getColumns()));
}
catch (TableNotFoundException e) {
// The table disappeared during the listing operation
return Stream.empty();
}
catch (UnknownTableTypeException e) {
// Skip tables of an unsupported type when table redirects are not enabled
return Stream.empty();
}
catch (RuntimeException e) {
// The table may be in the process of being removed, which can cause all sorts of exceptions.
// Log, because we are catching broadly.
log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
return Stream.empty();
}
});
}

@Override
Expand Down Expand Up @@ -1394,6 +1419,12 @@ private Map<String, Optional<TableToken>> getMaterializedViewToken(ConnectorSess
return viewToken;
}

/**
 * Delegates the redirection decision for {@code tableName} to the underlying catalog.
 */
@Override
public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
{
    Optional<CatalogSchemaTableName> redirectionTarget = catalog.redirectTable(session, tableName);
    return redirectionTarget;
}

private static class TableToken
{
// Current Snapshot ID of the table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.inject.Inject;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -41,6 +42,7 @@
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static java.lang.String.format;

public final class IcebergSessionProperties
Expand Down Expand Up @@ -71,6 +73,7 @@ public final class IcebergSessionProperties
private static final String STATISTICS_ENABLED = "statistics_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -219,6 +222,13 @@ public IcebergSessionProperties(
"Target maximum size of written files; the actual size may be larger",
hiveConfig.getTargetMaxFileSize(),
false))
.add(stringProperty(
HIVE_CATALOG_NAME,
"Catalog to redirect to when a Hive table is referenced",
icebergConfig.getHiveCatalogName().orElse(null),
// Session-level redirections configuration does not work well with views, as view body is analyzed in context
// of a session with properties stripped off. Thus, this property is more of a test-only, or at most POC usefulness.
true))
.build();
}

Expand Down Expand Up @@ -359,4 +369,9 @@ public static long getTargetMaxFileSize(ConnectorSession session)
{
return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class).toBytes();
}

/**
 * Reads the {@code hive_catalog_name} session property.
 *
 * @return the property value, or an empty {@link Optional} when the property value is {@code null}
 */
public static Optional<String> getHiveCatalogName(ConnectorSession session)
{
    String hiveCatalogName = session.getProperty(HIVE_CATALOG_NAME, String.class);
    return Optional.ofNullable(hiveCatalogName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.UnknownTableTypeException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
Expand Down Expand Up @@ -119,4 +120,6 @@ void createMaterializedView(
void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target);

void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment);

/**
 * Returns the table in another catalog that operations on {@code tableName} should be redirected to,
 * or an empty {@link Optional} when no redirection applies.
 */
Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
Expand Down Expand Up @@ -74,16 +75,22 @@
import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData;
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getViewTableInput;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.CatalogUtil.dropTableData;

public class TrinoGlueCatalog
Expand Down Expand Up @@ -575,4 +582,40 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou
{
throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg Glue catalogs");
}

/**
 * Decides whether {@code tableName} should be redirected to the Hive catalog named in the session.
 *
 * No redirection happens when the session names no Hive catalog, when the schema is a Hive system schema,
 * when the table does not exist in Glue, or when the table is a view or an Iceberg table.
 *
 * @param session session providing the target Hive catalog name
 * @param tableName requested table, possibly carrying a "$partitions"-like suffix
 * @return the redirection target in the Hive catalog, keeping the original table name, or empty
 */
@Override
public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
{
    requireNonNull(session, "session is null");
    requireNonNull(tableName, "tableName is null");

    Optional<String> targetCatalogName = getHiveCatalogName(session);
    if (targetCatalogName.isEmpty() || isHiveSystemSchema(tableName.getSchemaName())) {
        return Optional.empty();
    }

    // we need to chop off any "$partitions" and similar suffixes from table name while querying the metastore for the Table object
    int metadataMarkerIndex = tableName.getTableName().lastIndexOf('$');
    SchemaTableName tableNameBase = (metadataMarkerIndex == -1) ? tableName : schemaTableName(
            tableName.getSchemaName(),
            tableName.getTableName().substring(0, metadataMarkerIndex));

    // tableNameBase is already a SchemaTableName, so it is passed as-is instead of being copied into a new instance
    return getTable(tableNameBase)
            .filter(glueTable -> !VIRTUAL_VIEW.name().equals(glueTable.getTableType()))
            .filter(glueTable -> !isIcebergTable(glueTable))
            // After redirecting, use the original table name, with "$partitions" and similar suffixes
            .flatMap(glueTable -> targetCatalogName.map(catalogName -> new CatalogSchemaTableName(catalogName, tableName)));
}

/**
 * Tells whether a Glue table is an Iceberg table by comparing its table-type parameter,
 * case-insensitively, with the Iceberg table type value.
 *
 * A table without a parameter map is treated as non-Iceberg: the AWS SDK model getter
 * may return {@code null} when no parameters were set on the table.
 */
private static boolean isIcebergTable(com.amazonaws.services.glue.model.Table table)
{
    java.util.Map<String, String> parameters = table.getParameters();
    if (parameters == null) {
        return false;
    }
    return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(parameters.get(TABLE_TYPE_PROP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,19 @@
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition;
import static io.trino.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW;
Expand Down Expand Up @@ -622,6 +625,37 @@ private List<String> listNamespaces(ConnectorSession session, Optional<String> n
return listNamespaces(session);
}

/**
 * Decides whether {@code tableName} should be redirected to the Hive catalog named in the session.
 *
 * No redirection happens when the session names no Hive catalog, when the schema is a Hive system schema,
 * when the metastore has no such table, or when the table is a view or an Iceberg table.
 * Otherwise the result points at the Hive catalog and keeps the original table name.
 */
@Override
public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
{
    requireNonNull(session, "session is null");
    requireNonNull(tableName, "tableName is null");

    Optional<String> hiveCatalog = getHiveCatalogName(session);
    if (hiveCatalog.isEmpty() || isHiveSystemSchema(tableName.getSchemaName())) {
        return Optional.empty();
    }

    // Strip "$partitions" and similar suffixes before asking the metastore for the Table object
    int suffixStart = tableName.getTableName().lastIndexOf('$');
    SchemaTableName baseTable = tableName;
    if (suffixStart != -1) {
        baseTable = schemaTableName(tableName.getSchemaName(), tableName.getTableName().substring(0, suffixStart));
    }

    return metastore.getTable(baseTable.getSchemaName(), baseTable.getTableName())
            .filter(hiveTable -> !isHiveOrPrestoView(hiveTable.getTableType()))
            .filter(hiveTable -> !isIcebergTable(hiveTable))
            // The redirect keeps the original table name, including any "$partitions"-like suffix
            .flatMap(hiveTable -> hiveCatalog.map(catalogName -> new CatalogSchemaTableName(catalogName, tableName)));
}

private static class MaterializedViewMayBeBeingRemovedException
extends RuntimeException
{
Expand Down
Loading

0 comments on commit 7562be6

Please sign in to comment.