Skip to content

Commit

Permalink
Iceberg View Rest Catalog support
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed May 22, 2024
1 parent ed231b6 commit 406ed5b
Show file tree
Hide file tree
Showing 21 changed files with 320 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ public void testGetTablesMetadataCalls()
list(list(COUNTING_CATALOG, "test_schema1", "test_table1", "TABLE")),
ImmutableMultiset.<String>builder()
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema1, table=test_table1)", 4)
.add("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.isView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getMaterializedView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.redirectTable(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getTableHandle(schema=test_schema1, table=test_table1)")
Expand Down Expand Up @@ -1394,12 +1394,14 @@ public void testGetColumnsMetadataCalls()
.add("ConnectorMetadata.listTables(schema=test_schema4_empty)")
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema1, table=test_table1)", 20)
.addCopies("ConnectorMetadata.getMaterializedView(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)", 1)
.addCopies("ConnectorMetadata.isView(schema=test_schema1, table=test_table1)", 4)
.addCopies("ConnectorMetadata.redirectTable(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getTableHandle(schema=test_schema1, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema2, table=test_table1)", 20)
.addCopies("ConnectorMetadata.getMaterializedView(schema=test_schema2, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema2, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getView(schema=test_schema2, table=test_table1)", 1)
.addCopies("ConnectorMetadata.isView(schema=test_schema2, table=test_table1)", 4)
.addCopies("ConnectorMetadata.redirectTable(schema=test_schema2, table=test_table1)", 5)
.addCopies("ConnectorMetadata.getTableHandle(schema=test_schema2, table=test_table1)", 5)
.add("ConnectorMetadata.getTableMetadata(handle=test_schema1.test_table1)")
Expand Down Expand Up @@ -1535,7 +1537,7 @@ private void testAssumeLiteralMetadataCalls(String escapeLiteralParameter)
list(list(COUNTING_CATALOG, "test_schema1", "test_table1", "TABLE")),
ImmutableMultiset.<String>builder()
.addCopies("ConnectorMetadata.getSystemTable(schema=test_schema1, table=test_table1)", 4)
.add("ConnectorMetadata.getView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.isView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getMaterializedView(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.redirectTable(schema=test_schema1, table=test_table1)")
.add("ConnectorMetadata.getTableHandle(schema=test_schema1, table=test_table1)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,7 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
*/
Map<QualifiedObjectName, ViewInfo> getViews(Session session, QualifiedTablePrefix prefix);

/**
* Is the specified table a view.
*/
default boolean isView(Session session, QualifiedObjectName viewName)
{
return getView(session, viewName).isPresent();
}
boolean isView(Session session, QualifiedObjectName viewName);

/**
* Returns the view definition for the specified view name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,18 @@ public Optional<TrinoPrincipal> getSchemaOwner(Session session, CatalogSchemaNam
@Override
public boolean isView(Session session, QualifiedObjectName viewName)
{
return getViewInternal(session, viewName).isPresent();
if (cannotExist(viewName)) {
return false;
}

return getOptionalCatalogMetadata(session, viewName.catalogName())
.map(catalog -> {
CatalogHandle catalogHandle = catalog.getCatalogHandle(session, viewName, Optional.empty(), Optional.empty());
ConnectorMetadata metadata = catalog.getMetadataFor(session, catalogHandle);
ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
return metadata.isView(connectorSession, viewName.asSchemaTableName());
})
.orElse(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,15 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
}
}

@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
Span span = startSpan("isView", viewName);
try (var _ = scopedSpan(span)) {
return delegate.isView(session, viewName);
}
}

@Override
public Map<String, Object> getViewProperties(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ public Optional<ViewDefinition> getView(Session session, QualifiedObjectName vie
return Optional.ofNullable(views.get(viewName.asSchemaTableName()));
}

@Override
public boolean isView(Session session, QualifiedObjectName viewName)
{
return getView(session, viewName).isPresent();
}

@Override
public void createView(Session session, QualifiedObjectName viewName, ViewDefinition definition, Map<String, Object> viewProperties, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,12 @@ public Optional<ViewDefinition> getView(Session session, QualifiedObjectName vie
throw new UnsupportedOperationException();
}

@Override
public boolean isView(Session session, QualifiedObjectName viewName)
{
throw new UnsupportedOperationException();
}

@Override
public Map<String, Object> getViewProperties(Session session, QualifiedObjectName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,14 @@ default Optional<ConnectorViewDefinition> getView(ConnectorSession session, Sche
return Optional.empty();
}

/**
* Is the specified table a view.
*/
default boolean isView(ConnectorSession session, SchemaTableName viewName)
{
return getView(session, viewName).isPresent();
}

/**
* Gets the view properties for the specified view.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,14 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
}
}

@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.isView(session, viewName);
}
}

@Override
public Map<String, Object> getViewProperties(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum IcebergErrorCode
ICEBERG_WRITER_CLOSE_ERROR(14, EXTERNAL),
ICEBERG_MISSING_METADATA(15, EXTERNAL),
ICEBERG_WRITER_DATA_ERROR(16, EXTERNAL),
ICEBERG_UNSUPPORTED_VIEW_DIALECT(17, EXTERNAL)
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_UNSUPPORTED_VIEW_DIALECT;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
Expand Down Expand Up @@ -2504,6 +2505,20 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
return catalog.getViews(session, schemaName);
}

@Override
public boolean isView(ConnectorSession session, SchemaTableName viewName)
{
try {
return catalog.getView(session, viewName).isPresent();
}
catch (TrinoException e) {
if (e.getErrorCode() == ICEBERG_UNSUPPORTED_VIEW_DIALECT.toErrorCode()) {
return true;
}
throw e;
}
}

@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.ConnectorViewDefinition.ViewColumn;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
Expand Down Expand Up @@ -314,6 +315,18 @@ public static List<ColumnMetadata> getColumnMetadatas(Schema schema, TypeManager
return columns.build();
}

public static Schema updateColumnComment(Schema schema, String columnName, String comment)
{
NestedField fieldToUpdate = schema.findField(columnName);
checkArgument(fieldToUpdate != null, "Field %s does not exist", columnName);
NestedField updatedField = NestedField.of(fieldToUpdate.fieldId(), fieldToUpdate.isOptional(), fieldToUpdate.name(), fieldToUpdate.type(), comment);
List<NestedField> newFields = schema.columns().stream()
.map(field -> (field.fieldId() == updatedField.fieldId()) ? updatedField : field)
.toList();

return new Schema(newFields, schema.getAliases(), schema.identifierFieldIds());
}

public static IcebergColumnHandle getColumnHandle(NestedField column, TypeManager typeManager)
{
Type type = toTrinoType(column.type(), typeManager);
Expand Down Expand Up @@ -678,6 +691,27 @@ public static Schema schemaFromMetadata(List<ColumnMetadata> columns)
return new Schema(icebergSchema.asStructType().fields());
}

public static Schema schemaFromViewColumns(TypeManager typeManager, List<ViewColumn> columns)
{
List<NestedField> icebergColumns = new ArrayList<>();
AtomicInteger nextFieldId = new AtomicInteger(1);
for (ViewColumn column : columns) {
Type trinoType = typeManager.getType(column.getType());
org.apache.iceberg.types.Type type = toIcebergTypeForNewColumn(trinoType, nextFieldId);
NestedField field = NestedField.of(nextFieldId.getAndIncrement(), false, column.getName(), type, column.getComment().orElse(null));
icebergColumns.add(field);
}
org.apache.iceberg.types.Type icebergSchema = StructType.of(icebergColumns);
return new Schema(icebergSchema.asStructType().fields());
}

public static List<ViewColumn> viewColumnsFromSchema(TypeManager typeManager, Schema schema)
{
return IcebergUtil.getColumns(schema, typeManager).stream()
.map(column -> new ViewColumn(column.getName(), column.getType().getTypeId(), column.getComment()))
.toList();
}

public static Transaction newCreateTableTransaction(TrinoCatalog catalog, ConnectorTableMetadata tableMetadata, ConnectorSession session, boolean replace, String tableLocation)
{
SchemaTableName schemaTableName = tableMetadata.getTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public abstract class AbstractTrinoCatalog
implements TrinoCatalog
{
public static final String TRINO_CREATED_BY_VALUE = "Trino Iceberg connector";
public static final String ICEBERG_VIEW_RUN_AS_OWNER = "trino.run-as-owner";

protected static final String TRINO_CREATED_BY = HiveMetadata.TRINO_CREATED_BY;
protected static final String TRINO_QUERY_ID_NAME = HiveMetadata.TRINO_QUERY_ID_NAME;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTSessionCatalog;
Expand All @@ -46,6 +47,7 @@ public class TrinoIcebergRestCatalogFactory
private final boolean vendedCredentialsEnabled;
private final SecurityProperties securityProperties;
private final boolean uniqueTableLocation;
private final TypeManager typeManager;

@GuardedBy("this")
private RESTSessionCatalog icebergCatalog;
Expand All @@ -57,6 +59,7 @@ public TrinoIcebergRestCatalogFactory(
IcebergRestCatalogConfig restConfig,
SecurityProperties securityProperties,
IcebergConfig icebergConfig,
TypeManager typeManager,
NodeVersion nodeVersion)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
Expand All @@ -70,6 +73,7 @@ public TrinoIcebergRestCatalogFactory(
this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
requireNonNull(icebergConfig, "icebergConfig is null");
this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

@Override
Expand Down Expand Up @@ -101,6 +105,6 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
icebergCatalog = icebergCatalogInstance;
}

return new TrinoRestCatalog(icebergCatalog, catalogName, sessionType, trinoVersion, uniqueTableLocation);
return new TrinoRestCatalog(icebergCatalog, catalogName, sessionType, trinoVersion, typeManager, uniqueTableLocation);
}
}
Loading

0 comments on commit 406ed5b

Please sign in to comment.