Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to redirect table reads from Hive to Iceberg #8340

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public ListenableFuture<Void> execute(
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fail modifications for redirected tables in engine

I am not convinced we should to that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here "modifications" is bad wording, meant DDLs. we already added support for DMLs in #8683

The last discussion that we had on this was #7606 (comment)

DDL operations are problematic since users need to be aware of which connector they are using, since they have different data types, partitioning, bucketing, etc. Our thought was that these are relatively rare operations by more advanced users and that trying to have hidden redirections would end up causing more confusion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you also feel otherwise for DDLs?

Copy link
Member

@findepi findepi Nov 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users sooner or later will ask why ALTER ... ADD COLUMN (something varchar) does not work, and I won't be able to explain to them why not. Yes, from engineer perspective, we may have harder time supporting things like column properties (maybe, or maybe not), but i would assume there are not always used, so users who do not intend to use column properties, will not accept this as a rational explanation for the limitation.
TL;DR yes, DDLs like ALTER .. ADD/DROP COLUMN should be routed as well.

cc @losipiuk @alexjo2144 @claudiusli

if (tableHandle.isEmpty()) {
if (!statement.isTableExists()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ListenableFuture<Void> execute(

if (statement.getType() == Comment.Type.TABLE) {
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table does not exist: %s", tableName);
}
Expand All @@ -77,7 +77,7 @@ else if (statement.getType() == Comment.Type.COLUMN) {
}

QualifiedObjectName tableName = createQualifiedObjectName(session, statement, prefix.get());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table does not exist: " + tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ ListenableFuture<Void> internalExecute(CreateTable statement, Metadata metadata,

Map<NodeRef<Parameter>, Expression> parameterLookup = parameterExtractor(statement, parameters);
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isPresent()) {
if (!statement.isNotExists()) {
throw semanticException(TABLE_ALREADY_EXISTS, statement, "Table '%s' already exists", tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ListenableFuture<Void> execute(
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getTable());
Optional<TableHandle> tableHandleOptional = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandleOptional = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));

if (tableHandleOptional.isEmpty()) {
if (!statement.isTableExists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ListenableFuture<Void> execute(
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getTableName());

Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isEmpty()) {
if (!statement.isExists()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void executeGrantOnSchema(Session session, Grant statement, Metadata met
private void executeGrantOnTable(Session session, Grant statement, Metadata metadata, AccessControl accessControl)
{
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ListenableFuture<Void> execute(
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getTable());
Optional<TableHandle> tableHandleOptional = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandleOptional = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandleOptional.isEmpty()) {
if (!statement.isTableExists()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ListenableFuture<Void> execute(
{
Session session = stateMachine.getSession();
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getSource());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isEmpty()) {
if (!statement.isExists()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
Expand All @@ -68,7 +68,7 @@ public ListenableFuture<Void> execute(
if (metadata.getCatalogHandle(session, target.getCatalogName()).isEmpty()) {
throw semanticException(CATALOG_NOT_FOUND, statement, "Target catalog '%s' does not exist", target.getCatalogName());
}
if (metadata.getTableHandle(session, target).isPresent()) {
if (metadata.getOriginalTableHandle(session, target, Optional.of(getName())).isPresent()) {
throw semanticException(TABLE_ALREADY_EXISTS, statement, "Target table '%s' already exists", target);
}
if (!tableName.getCatalogName().equals(target.getCatalogName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void executeRevokeOnSchema(Session session, Revoke statement, Metadata m
private void executeRevokeOnTable(Session session, Revoke statement, Metadata metadata, AccessControl accessControl)
{
QualifiedObjectName tableName = createQualifiedObjectName(session, statement, statement.getName());
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, tableName);
Optional<TableHandle> tableHandle = metadata.getOriginalTableHandle(session, tableName, Optional.of(getName()));
if (tableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.transaction.TransactionManager;

import java.util.List;
import java.util.Optional;

import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static io.trino.metadata.MetadataUtil.createPrincipal;
Expand Down Expand Up @@ -61,7 +62,7 @@ public ListenableFuture<Void> execute(

CatalogName catalogName = metadata.getCatalogHandle(session, tableName.getCatalogName())
.orElseThrow(() -> new TrinoException(NOT_FOUND, "Catalog does not exist: " + tableName.getCatalogName()));
if (metadata.getTableHandle(session, tableName).isEmpty()) {
if (metadata.getOriginalTableHandle(session, tableName, Optional.of(getName())).isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, statement, "Table '%s' does not exist", tableName);
}

Expand Down
10 changes: 5 additions & 5 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ public interface Metadata

List<String> listSchemaNames(Session session, String catalogName);

/**
* Returns a table handle for the specified table name.
*/
Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName tableName);

Optional<SystemTable> getSystemTable(Session session, QualifiedObjectName tableName);

Optional<TableHandle> getTableHandleForStatisticsCollection(Session session, QualifiedObjectName tableName, Map<String, Object> analyzeProperties);
Expand Down Expand Up @@ -669,4 +664,9 @@ default ResolvedFunction getCoercion(Type fromType, Type toType)
* Get the target table handle after performing redirection.
*/
RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session session, QualifiedObjectName tableName);

/**
* Get the table handle for {@param tableName} if it is not redirected. Throws an exception otherwise.
*/
Optional<TableHandle> getOriginalTableHandle(Session session, QualifiedObjectName tableName, Optional<String> invokerDescription);
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ public List<String> listSchemaNames(Session session, String catalogName)
return ImmutableList.copyOf(schemaNames.build());
}

@Override
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table)
private Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table)
{
requireNonNull(table, "table is null");

Expand Down Expand Up @@ -410,6 +409,10 @@ public Optional<TableHandle> getTableHandleForStatisticsCollection(Session sessi
CatalogName catalogName = catalogMetadata.getConnectorId(session, table);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);

QualifiedObjectName targetTableName = getRedirectedTableName(session, table);
if (!targetTableName.equals(table)) {
throw new TrinoException(NOT_SUPPORTED, format("Cannot collect statistics for table '%s', because it is redirected to '%s'", table, targetTableName));
}
ConnectorTableHandle tableHandle = metadata.getTableHandleForStatisticsCollection(session.toConnectorSession(catalogName), table.asSchemaTableName(), analyzeProperties);
if (tableHandle != null) {
return Optional.of(new TableHandle(
Expand Down Expand Up @@ -646,7 +649,12 @@ private boolean isExistingRelation(Session session, QualifiedObjectName name)
return true;
}

return getTableHandle(session, name).isPresent();
if (!getRedirectedTableName(session, name).equals(name)) {
// If the table is redirected, we do not check for existence of the target table
return true;
}

return getOriginalTableHandle(session, name, Optional.empty()).isPresent();
}

@Override
Expand Down Expand Up @@ -1412,6 +1420,11 @@ private QualifiedObjectName getRedirectedTableName(Session session, QualifiedObj
requireNonNull(session, "session is null");
requireNonNull(originalTableName, "originalTableName is null");

if (originalTableName.getCatalogName().isEmpty() || originalTableName.getSchemaName().isEmpty() || originalTableName.getObjectName().isEmpty()) {
// table cannot exist
return originalTableName;
}

QualifiedObjectName tableName = originalTableName;
Set<QualifiedObjectName> visitedTableNames = new LinkedHashSet<>();
visitedTableNames.add(tableName);
Expand Down Expand Up @@ -1472,6 +1485,22 @@ public RedirectionAwareTableHandle getRedirectionAwareTableHandle(Session sessio
throw new TrinoException(TABLE_REDIRECTION_ERROR, format("Table '%s' redirected to '%s', but the target table '%s' does not exist", tableName, targetTableName, targetTableName));
}

@Override
public Optional<TableHandle> getOriginalTableHandle(Session session, QualifiedObjectName tableName, Optional<String> invokerDescription)
{
RedirectionAwareTableHandle redirectionAwareTableHandle = getRedirectionAwareTableHandle(session, tableName);
Optional<QualifiedObjectName> redirected = redirectionAwareTableHandle.getRedirectedTableName();
if (redirected.isPresent()) {
throw new TrinoException(NOT_SUPPORTED,
format("Failed to %s. It is not supported on table '%s' because the table is redirected to '%s'",
invokerDescription.map(value -> format("perform operation '%s'", value)).orElse("get the table handle"),
tableName,
redirected.get()));
}

return redirectionAwareTableHandle.getTableHandle();
}

@Override
public Optional<ResolvedIndex> resolveIndex(Session session, TableHandle tableHandle, Set<ColumnHandle> indexableColumns, Set<ColumnHandle> outputColumns, TupleDomain<ColumnHandle> tupleDomain)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,14 @@ public static PrincipalSpecification createPrincipal(TrinoPrincipal principal)
throw new IllegalArgumentException("Unsupported type: " + type);
}

// TODO: move this method to test
public static boolean tableExists(Metadata metadata, Session session, String table)
{
if (session.getCatalog().isEmpty() || session.getSchema().isEmpty()) {
return false;
}
QualifiedObjectName name = new QualifiedObjectName(session.getCatalog().get(), session.getSchema().get(), table);
return metadata.getTableHandle(session, name).isPresent();
return metadata.getOriginalTableHandle(session, name, Optional.empty()).isPresent();
}

public static class TableMetadataBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ protected Scope visitInsert(Insert insert, Optional<Scope> scope)
Scope queryScope = analyze(insert.getQuery(), createScope(scope));

// verify the insert destination columns match the query
Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
Optional<TableHandle> targetTableHandle = metadata.getOriginalTableHandle(session, targetTable, Optional.of("INSERT"));
if (targetTableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, insert, "Table '%s' does not exist", targetTable);
}
Expand Down Expand Up @@ -543,14 +543,13 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
}

QualifiedObjectName targetTable = createQualifiedObjectName(session, refreshMaterializedView, storageName.get());
checkStorageTableNotRedirected(targetTable);

// analyze the query that creates the data
Query query = parseView(optionalView.get().getOriginalSql(), name, refreshMaterializedView);
Scope queryScope = process(query, scope);

// verify the insert destination columns match the query
Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
Optional<TableHandle> targetTableHandle = getStorageTableHandle(targetTable, name);
if (targetTableHandle.isEmpty()) {
throw semanticException(TABLE_NOT_FOUND, refreshMaterializedView, "Table '%s' does not exist", targetTable);
}
Expand Down Expand Up @@ -665,7 +664,7 @@ protected Scope visitDelete(Delete node, Optional<Scope> scope)
throw semanticException(NOT_SUPPORTED, node, "Deleting from views is not supported");
}

TableHandle handle = metadata.getTableHandle(session, tableName)
TableHandle handle = metadata.getOriginalTableHandle(session, tableName, Optional.of("DELETE"))
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName));

accessControl.checkCanDeleteFromTable(session.toSecurityContext(), tableName);
Expand Down Expand Up @@ -751,7 +750,7 @@ protected Scope visitCreateTableAsSelect(CreateTableAsSelect node, Optional<Scop
// turn this into a query that has a new table writer node on top.
QualifiedObjectName targetTable = createQualifiedObjectName(session, node, node.getName());

Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
Optional<TableHandle> targetTableHandle = metadata.getOriginalTableHandle(session, targetTable, Optional.of("CREATE TABLE AS SELECT"));
if (targetTableHandle.isPresent()) {
if (node.isNotExists()) {
analysis.setCreate(new Analysis.Create(
Expand Down Expand Up @@ -1303,8 +1302,7 @@ protected Scope visitTable(Table table, Optional<Scope> scope)
throw semanticException(INVALID_VIEW, table, "Materialized view '%s' is fresh but does not have storage table name", name);
}
QualifiedObjectName storageTableName = createQualifiedObjectName(session, table, storageName.get());
checkStorageTableNotRedirected(storageTableName);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, storageTableName);
Optional<TableHandle> tableHandle = getStorageTableHandle(storageTableName, name);
if (tableHandle.isEmpty()) {
throw semanticException(INVALID_VIEW, table, "Storage table '%s' does not exist", storageTableName);
}
Expand Down Expand Up @@ -1387,11 +1385,9 @@ protected Scope visitTable(Table table, Optional<Scope> scope)
return tableScope;
}

private void checkStorageTableNotRedirected(QualifiedObjectName source)
private Optional<TableHandle> getStorageTableHandle(QualifiedObjectName storageTableName, QualifiedObjectName viewName)
{
metadata.getRedirectionAwareTableHandle(session, source).getRedirectedTableName().ifPresent(name -> {
throw new TrinoException(NOT_SUPPORTED, format("Redirection of materialized view storage table '%s' to '%s' is not supported", source, name));
});
return metadata.getOriginalTableHandle(session, storageTableName, Optional.of(format("Scan of storage table '%s' for materialized view '%s'", storageTableName, viewName)));
}

private void analyzeFiltersAndMasks(Table table, QualifiedObjectName name, Optional<TableHandle> tableHandle, List<Field> fields, String authorization)
Expand Down Expand Up @@ -1580,7 +1576,6 @@ private List<Field> analyzeStorageTable(Table table, List<Field> viewFields, Tab
TableSchema tableSchema = metadata.getTableSchema(session, storageTable);
Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, storageTable);
QualifiedObjectName tableName = createQualifiedObjectName(session, table, table.getName());
checkStorageTableNotRedirected(tableName);
List<Field> tableFields = analyzeTableOutputFields(table, tableName, tableSchema, columnHandles)
.stream()
.filter(field -> !field.isHidden())
Expand Down Expand Up @@ -2211,7 +2206,7 @@ protected Scope visitUpdate(Update update, Optional<Scope> scope)
throw semanticException(NOT_SUPPORTED, update, "Updating through views is not supported");
}

TableHandle handle = metadata.getTableHandle(session, tableName)
TableHandle handle = metadata.getOriginalTableHandle(session, tableName, Optional.of("UPDATE"))
.orElseThrow(() -> semanticException(TABLE_NOT_FOUND, table, "Table '%s' does not exist", tableName));

TableMetadata tableMetadata = metadata.getTableMetadata(session, handle);
Expand Down
Loading