Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iceberg View Rest Catalog support #19818

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ public void testGetTablesMetadataCalls()
list(list(COUNTING_CATALOG, "test_schema1", "test_table1", "TABLE")),
ImmutableMultiset.<String>builder()
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema1, table=test_table1)", 4)
.add("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.isView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getMaterializedView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.redirectTable(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getTableHandle(schema=test_schema1, table=test_table1)")
Expand Down Expand Up @@ -1394,12 +1394,14 @@ public void testGetColumnsMetadataCalls()
.add("ConnectorMetadata.listTables(schema=test_schema4_empty)")
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema1, table=test_table1)", 20)
.addCopies("ConnectorMetadata.getMaterializedView(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)", 1)
.addCopies("ConnectorMetadata.isView(schema=test_schema1, table=test_table1)", 4)
.addCopies("ConnectorMetadata.redirectTable(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getTableHandle(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema2, table=test_table1)", 20)
.addCopies("ConnectorMetadata.getMaterializedView(schema=test_schema2, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema2, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema2, table=test_table1)", 1)
.addCopies("ConnectorMetadata.isView(schema=test_schema2, table=test_table1)", 4)
.addCopies("ConnectorMetadata.redirectTable(schema=test_schema2, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getTableHandle(schema=test_schema2, table=test_table1)", 5)
.add("ConnectorMetadata.getTableMetadata(handle=test_schema1.test_table1)")
Expand Down Expand Up @@ -1535,7 +1537,7 @@ private void testAssumeLiteralMetadataCalls(String escapeLiteralParameter)
list(list(COUNTING_CATALOG, "test_schema1", "test_table1", "TABLE")),
ImmutableMultiset.<String>builder()
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema1, table=test_table1)", 4)
.add("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.isView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getMaterializedView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.redirectTable(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getTableHandle(schema=test_schema1, table=test_table1)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,7 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
*/
Map<QualifiedObjectName, ViewInfo> getViews(Session session, QualifiedTablePrefix prefix);

/**
* Is the specified table a view.
*/
default boolean isView(Session session, QualifiedObjectName viewName)
{
return getView(session, viewName).isPresent();
}
boolean isView(Session session, QualifiedObjectName viewName);

/**
* Returns the view definition for the specified view name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,18 @@ public Optional<TrinoPrincipal> getSchemaOwner(Session session, CatalogSchemaNam
@Override
public boolean isView(Session session, QualifiedObjectName viewName)
{
return getViewInternal(session, viewName).isPresent();
if (cannotExist(viewName)) {
return false;
}

return getOptionalCatalogMetadata(session, viewName.catalogName())
.map(catalog -> {
CatalogHandle catalogHandle = catalog.getCatalogHandle(session, viewName, Optional.empty(), Optional.empty());
ConnectorMetadata metadata = catalog.getMetadataFor(session, catalogHandle);
ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
return metadata.isView(connectorSession, viewName.asSchemaTableName());
})
.orElse(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,15 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
}
}

@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
Span span = startSpan("isView", viewName);
try (var _ = scopedSpan(span)) {
return delegate.isView(session, viewName);
}
}

@Override
public Map<String, Object> getViewProperties(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ public Optional<ViewDefinition> getView(Session session, QualifiedObjectName vie
return Optional.ofNullable(views.get(viewName.asSchemaTableName()));
}

@Override
public boolean isView(Session session, QualifiedObjectName viewName)
{
return getView(session, viewName).isPresent();
}

@Override
public void createView(Session session, QualifiedObjectName viewName, ViewDefinition definition, Map<String, Object> viewProperties, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,12 @@ public Optional<ViewDefinition> getView(Session session, QualifiedObjectName vie
throw new UnsupportedOperationException();
}

@Override
public boolean isView(Session session, QualifiedObjectName viewName)
{
throw new UnsupportedOperationException();
}

@Override
public Map<String, Object> getViewProperties(Session session, QualifiedObjectName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,14 @@ default Optional<ConnectorViewDefinition> getView(ConnectorSession session, Sche
return Optional.empty();
}

/**
* Is the specified table a view.
*/
default boolean isView(ConnectorSession session, SchemaTableName viewName)
{
return getView(session, viewName).isPresent();
}
Comment on lines +1028 to +1031
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default implementation of connector metadata#getView will preserve the existing behavior for other connectors by just doing getView().isPresent. IcebergMetadata overrides this implementation so we can still return true in the case of multiple dialects.


/**
* Gets the view properties for the specified view.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,14 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
}
}

@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.isView(session, viewName);
}
}

@Override
public Map<String, Object> getViewProperties(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum IcebergErrorCode
ICEBERG_WRITER_CLOSE_ERROR(14, EXTERNAL),
ICEBERG_MISSING_METADATA(15, EXTERNAL),
ICEBERG_WRITER_DATA_ERROR(16, EXTERNAL),
ICEBERG_UNSUPPORTED_VIEW_DIALECT(17, EXTERNAL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I like making this Iceberg specific

/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_UNSUPPORTED_VIEW_DIALECT;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
Expand Down Expand Up @@ -2504,6 +2505,20 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
return catalog.getViews(session, schemaName);
}

@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
try {
return catalog.getView(session, viewName).isPresent();
}
catch (TrinoException e) {
if (e.getErrorCode() == ICEBERG_UNSUPPORTED_VIEW_DIALECT.toErrorCode()) {
return true;
}
Comment on lines +2515 to +2517
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for the Iceberg specific error code is so that we can handle multiple dialects properly in the isView logic and distinguish it from other errors. We can just catch this particular code and return true, and other catalogs in the future can just do what the rest catalog does and it should just work. Another benefit of this is that we don't need to plumb isView all the way down into TrinoCatalog and add to another interface.

throw e;
}
}

@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.ConnectorViewDefinition.ViewColumn;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
Expand Down Expand Up @@ -314,6 +315,18 @@ public static List<ColumnMetadata> getColumnMetadatas(Schema schema, TypeManager
return columns.build();
}

public static Schema updateColumnComment(Schema schema, String columnName, String comment)
{
NestedField fieldToUpdate = schema.findField(columnName);
checkArgument(fieldToUpdate != null, "Field %s does not exist", columnName);
NestedField updatedField = NestedField.of(fieldToUpdate.fieldId(), fieldToUpdate.isOptional(), fieldToUpdate.name(), fieldToUpdate.type(), comment);
List<NestedField> newFields = schema.columns().stream()
.map(field -> (field.fieldId() == updatedField.fieldId()) ? updatedField : field)
.toList();

return new Schema(newFields, schema.getAliases(), schema.identifierFieldIds());
}

public static IcebergColumnHandle getColumnHandle(NestedField column, TypeManager typeManager)
{
Type type = toTrinoType(column.type(), typeManager);
Expand Down Expand Up @@ -678,6 +691,27 @@ public static Schema schemaFromMetadata(List<ColumnMetadata> columns)
return new Schema(icebergSchema.asStructType().fields());
}

public static Schema schemaFromViewColumns(TypeManager typeManager, List<ViewColumn> columns)
{
List<NestedField> icebergColumns = new ArrayList<>();
AtomicInteger nextFieldId = new AtomicInteger(1);
for (ViewColumn column : columns) {
Type trinoType = typeManager.getType(column.getType());
org.apache.iceberg.types.Type type = toIcebergTypeForNewColumn(trinoType, nextFieldId);
NestedField field = NestedField.of(nextFieldId.getAndIncrement(), false, column.getName(), type, column.getComment().orElse(null));
icebergColumns.add(field);
}
org.apache.iceberg.types.Type icebergSchema = StructType.of(icebergColumns);
return new Schema(icebergSchema.asStructType().fields());
}

public static List<ViewColumn> viewColumnsFromSchema(TypeManager typeManager, Schema schema)
{
return IcebergUtil.getColumns(schema, typeManager).stream()
.map(column -> new ViewColumn(column.getName(), column.getType().getTypeId(), column.getComment()))
.toList();
}

public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session, boolean replace, String tableLocation)
{
SchemaTableName schemaTableName = tableMetadata.getTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public abstract class AbstractTrinoCatalog
implements TrinoCatalog
{
public static final String TRINO_CREATED_BY_VALUE = "Trino Iceberg connector";
public static final String ICEBERG_VIEW_RUN_AS_OWNER = "trino.run-as-owner";

protected static final String TRINO_CREATED_BY = HiveMetadata.TRINO_CREATED_BY;
protected static final String TRINO_QUERY_ID_NAME = HiveMetadata.TRINO_QUERY_ID_NAME;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTSessionCatalog;
Expand All @@ -46,6 +47,7 @@ public class TrinoIcebergRestCatalogFactory
private final boolean vendedCredentialsEnabled;
private final SecurityProperties securityProperties;
private final boolean uniqueTableLocation;
private final TypeManager typeManager;

@GuardedBy("this")
private RESTSessionCatalog icebergCatalog;
Expand All @@ -57,6 +59,7 @@ public TrinoIcebergRestCatalogFactory(
IcebergRestCatalogConfig restConfig,
SecurityProperties securityProperties,
IcebergConfig icebergConfig,
TypeManager typeManager,
NodeVersion nodeVersion)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -70,6 +73,7 @@ public TrinoIcebergRestCatalogFactory(
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
requireNonNull(icebergConfig, "icebergConfig is null");
this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
Expand Down Expand Up @@ -101,6 +105,6 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
icebergCatalog = icebergCatalogInstance;
}

return new TrinoRestCatalog(icebergCatalog, catalogName, sessionType, trinoVersion, uniqueTableLocation);
return new TrinoRestCatalog(icebergCatalog, catalogName, sessionType, trinoVersion, typeManager, uniqueTableLocation);
}
}
Loading
Loading