Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hide table layouts from engine #363

Merged
merged 15 commits into from
Mar 9, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import io.prestosql.execution.Lifespan;
import io.prestosql.execution.TaskId;
import io.prestosql.execution.TaskStateMachine;
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.memory.MemoryPool;
import io.prestosql.memory.QueryContext;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.Split;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayoutHandle;
import io.prestosql.metadata.TableLayoutResult;
import io.prestosql.operator.Driver;
import io.prestosql.operator.DriverContext;
import io.prestosql.operator.FilterAndProjectOperator;
Expand All @@ -41,23 +40,24 @@
import io.prestosql.operator.TaskContext;
import io.prestosql.operator.TaskStats;
import io.prestosql.operator.project.InputPageProjection;
import io.prestosql.operator.project.InterpretedPageProjection;
import io.prestosql.operator.project.PageProcessor;
import io.prestosql.operator.project.PageProjection;
import io.prestosql.security.AllowAllAccessControl;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.memory.MemoryPoolId;
import io.prestosql.spi.type.Type;
import io.prestosql.spiller.SpillSpaceTracker;
import io.prestosql.split.SplitSource;
import io.prestosql.sql.gen.PageFunctionCompiler;
import io.prestosql.sql.planner.Symbol;
import io.prestosql.sql.planner.TypeProvider;
import io.prestosql.sql.planner.optimizations.HashGenerationOptimizer;
import io.prestosql.sql.planner.plan.PlanNodeId;
import io.prestosql.sql.relational.RowExpression;
import io.prestosql.sql.tree.Expression;
import io.prestosql.sql.tree.NodeRef;
import io.prestosql.testing.LocalQueryRunner;
import io.prestosql.transaction.TransactionId;

Expand All @@ -79,9 +79,12 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.prestosql.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
import static io.prestosql.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
import static io.prestosql.metadata.FunctionKind.SCALAR;
import static io.prestosql.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static io.prestosql.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.sql.analyzer.ExpressionAnalyzer.getExpressionTypes;
import static io.prestosql.sql.relational.SqlToRowExpressionTranslator.translate;
import static io.prestosql.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -170,8 +173,7 @@ protected final OperatorFactory createTableScanOperator(int operatorId, PlanNode
List<ColumnHandle> columnHandles = columnHandlesBuilder.build();

// get the split for this table
List<TableLayoutResult> layouts = metadata.getLayouts(session, tableHandle, Constraint.alwaysTrue(), Optional.empty());
Split split = getLocalQuerySplit(session, layouts.get(0).getLayout().getHandle());
Split split = getLocalQuerySplit(session, tableHandle);

return new OperatorFactory()
{
Expand All @@ -196,7 +198,7 @@ public OperatorFactory duplicate()
};
}

private Split getLocalQuerySplit(Session session, TableLayoutHandle handle)
private Split getLocalQuerySplit(Session session, TableHandle handle)
{
SplitSource splitSource = localQueryRunner.getSplitManager().getSplits(session, handle, UNGROUPED_SCHEDULING);
List<Split> splits = new ArrayList<>();
Expand Down Expand Up @@ -226,13 +228,19 @@ protected final OperatorFactory createHashProjectOperator(int operatorId, PlanNo

Optional<Expression> hashExpression = HashGenerationOptimizer.getHashExpression(ImmutableList.copyOf(symbolTypes.build().keySet()));
verify(hashExpression.isPresent());
projections.add(new InterpretedPageProjection(
hashExpression.get(),
TypeProvider.copyOf(symbolTypes.build()),
symbolToInputMapping.build(),

Map<NodeRef<Expression>, Type> expressionTypes = getExpressionTypes(
session,
localQueryRunner.getMetadata(),
localQueryRunner.getSqlParser(),
session));
TypeProvider.copyOf(symbolTypes.build()),
hashExpression.get(),
ImmutableList.of(),
WarningCollector.NOOP);
RowExpression translated = translate(hashExpression.get(), SCALAR, expressionTypes, symbolToInputMapping.build(), localQueryRunner.getMetadata().getFunctionRegistry(), localQueryRunner.getTypeManager(), session, false);

PageFunctionCompiler functionCompiler = new PageFunctionCompiler(localQueryRunner.getMetadata(), 0);
projections.add(functionCompiler.compileProjection(translated, Optional.empty()).get());

return new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
operatorId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayout;
import io.prestosql.metadata.TableLayoutResult;
import io.prestosql.metadata.TableMetadata;
import io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior;
import io.prestosql.spi.connector.CatalogSchemaTableName;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.security.Identity;
import io.prestosql.spi.security.SelectedRole;
Expand Down Expand Up @@ -1673,9 +1672,13 @@ private Object getHiveTableProperty(String tableName, Function<HiveTableLayoutHa
Optional<TableHandle> tableHandle = metadata.getTableHandle(transactionSession, new QualifiedObjectName(catalog, TPCH_SCHEMA, tableName));
assertTrue(tableHandle.isPresent());

List<TableLayoutResult> layouts = metadata.getLayouts(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty());
TableLayout layout = getOnlyElement(layouts).getLayout();
return propertyGetter.apply((HiveTableLayoutHandle) layout.getHandle().getConnectorHandle());
ConnectorTableLayoutHandle connectorLayout = metadata.getLayout(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout()
.get();

return propertyGetter.apply((HiveTableLayoutHandle) connectorLayout);
});
}

Expand Down
20 changes: 10 additions & 10 deletions presto-main/src/main/java/io/prestosql/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,25 @@ public interface Metadata

Optional<TableHandle> getTableHandleForStatisticsCollection(Session session, QualifiedObjectName tableName, Map<String, Object> analyzeProperties);

List<TableLayoutResult> getLayouts(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);
Optional<TableLayoutResult> getLayout(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);

TableLayout getLayout(Session session, TableLayoutHandle handle);
TableLayout getLayout(Session session, TableHandle handle);

/**
* Return a table layout handle whose partitioning is converted to the provided partitioning handle,
* but otherwise identical to the provided table layout handle.
* The provided table layout handle must be one that the connector can transparently convert to from
* the original partitioning handle associated with the provided table layout handle,
* Return a table handle whose partitioning is converted to the provided partitioning handle,
* but otherwise identical to the provided table handle.
* The provided table handle must be one that the connector can transparently convert to from
* the original partitioning handle associated with the provided table handle,
* as promised by {@link #getCommonPartitioning}.
*/
TableLayoutHandle makeCompatiblePartitioning(Session session, TableLayoutHandle tableLayoutHandle, PartitioningHandle partitioningHandle);
martint marked this conversation as resolved.
Show resolved Hide resolved
TableHandle makeCompatiblePartitioning(Session session, TableHandle table, PartitioningHandle partitioningHandle);

/**
* Return a partitioning handle which the connector can transparently convert both {@code left} and {@code right} into.
*/
Optional<PartitioningHandle> getCommonPartitioning(Session session, PartitioningHandle left, PartitioningHandle right);

Optional<Object> getInfo(Session session, TableLayoutHandle handle);
Optional<Object> getInfo(Session session, TableHandle handle);

/**
* Return the metadata for the specified table handle.
Expand Down Expand Up @@ -241,14 +241,14 @@ public interface Metadata
/**
* @return whether delete without table scan is supported
*/
boolean supportsMetadataDelete(Session session, TableHandle tableHandle, TableLayoutHandle tableLayoutHandle);
boolean supportsMetadataDelete(Session session, TableHandle tableHandle);

/**
* Delete the provide table layout
*
* @return number of rows deleted, or empty for unknown
*/
OptionalLong metadataDelete(Session session, TableHandle tableHandle, TableLayoutHandle tableLayoutHandle);
OptionalLong metadataDelete(Session session, TableHandle tableHandle);

/**
* Begin delete query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.prestosql.metadata.QualifiedObjectName.convertFromSchemaTableName;
import static io.prestosql.metadata.TableLayout.fromConnectorLayout;
import static io.prestosql.metadata.ViewDefinition.ViewColumn;
import static io.prestosql.spi.StandardErrorCode.INVALID_VIEW;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -326,9 +324,14 @@ public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName
ConnectorId connectorId = catalogMetadata.getConnectorId(session, table);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);

ConnectorTableHandle tableHandle = metadata.getTableHandle(session.toConnectorSession(connectorId), table.asSchemaTableName());
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
ConnectorTableHandle tableHandle = metadata.getTableHandle(connectorSession, table.asSchemaTableName());
if (tableHandle != null) {
return Optional.of(new TableHandle(connectorId, tableHandle));
return Optional.of(new TableHandle(
connectorId,
tableHandle,
catalogMetadata.getTransactionHandleFor(connectorId),
Optional.empty()));
}
}
return Optional.empty();
Expand All @@ -347,7 +350,11 @@ public Optional<TableHandle> getTableHandleForStatisticsCollection(Session sessi

ConnectorTableHandle tableHandle = metadata.getTableHandleForStatisticsCollection(session.toConnectorSession(connectorId), table.asSchemaTableName(), analyzeProperties);
if (tableHandle != null) {
return Optional.of(new TableHandle(connectorId, tableHandle));
return Optional.of(new TableHandle(
connectorId,
tableHandle,
catalogMetadata.getTransactionHandleFor(connectorId),
Optional.empty()));
}
}
return Optional.empty();
Expand All @@ -373,10 +380,10 @@ public Optional<SystemTable> getSystemTable(Session session, QualifiedObjectName
}

@Override
public List<TableLayoutResult> getLayouts(Session session, TableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
public Optional<TableLayoutResult> getLayout(Session session, TableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
if (constraint.getSummary().isNone()) {
return ImmutableList.of();
return Optional.empty();
}

ConnectorId connectorId = table.getConnectorId();
Expand All @@ -387,33 +394,47 @@ public List<TableLayoutResult> getLayouts(Session session, TableHandle table, Co
ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId);
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
List<ConnectorTableLayoutResult> layouts = metadata.getTableLayouts(connectorSession, connectorTable, constraint, desiredColumns);
if (layouts.isEmpty()) {
return Optional.empty();
}

if (layouts.size() > 1) {
throw new PrestoException(NOT_SUPPORTED, format("Connector returned multiple layouts for table %s", table));
}

return layouts.stream()
.map(layout -> new TableLayoutResult(fromConnectorLayout(connectorId, transaction, layout.getTableLayout()), layout.getUnenforcedConstraint()))
.collect(toImmutableList());
ConnectorTableLayout tableLayout = layouts.get(0).getTableLayout();
martint marked this conversation as resolved.
Show resolved Hide resolved
return Optional.of(new TableLayoutResult(
new TableHandle(connectorId, connectorTable, transaction, Optional.of(tableLayout.getHandle())),
new TableLayout(connectorId, transaction, tableLayout),
layouts.get(0).getUnenforcedConstraint()));
}

@Override
public TableLayout getLayout(Session session, TableLayoutHandle handle)
public TableLayout getLayout(Session session, TableHandle handle)
{
ConnectorId connectorId = handle.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);
ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId);
return fromConnectorLayout(connectorId, transaction, metadata.getTableLayout(session.toConnectorSession(connectorId), handle.getConnectorHandle()));
ConnectorSession connectorSession = session.toConnectorSession(connectorId);

return handle.getLayout()
.map(layout -> new TableLayout(connectorId, handle.getTransaction(), metadata.getTableLayout(connectorSession, layout)))
.orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty())
martint marked this conversation as resolved.
Show resolved Hide resolved
.get()
.getLayout());
}

@Override
public TableLayoutHandle makeCompatiblePartitioning(Session session, TableLayoutHandle tableLayoutHandle, PartitioningHandle partitioningHandle)
public TableHandle makeCompatiblePartitioning(Session session, TableHandle tableHandle, PartitioningHandle partitioningHandle)
{
checkArgument(partitioningHandle.getConnectorId().isPresent(), "Expect partitioning handle from connector, got system partitioning handle");
ConnectorId connectorId = partitioningHandle.getConnectorId().get();
checkArgument(connectorId.equals(tableLayoutHandle.getConnectorId()), "ConnectorId of tableLayoutHandle and partitioningHandle does not match");
checkArgument(connectorId.equals(tableHandle.getConnectorId()), "ConnectorId of tableHandle and partitioningHandle does not match");
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);
ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(connectorId);
ConnectorTableLayoutHandle newTableLayoutHandle = metadata.makeCompatiblePartitioning(session.toConnectorSession(connectorId), tableLayoutHandle.getConnectorHandle(), partitioningHandle.getConnectorHandle());
return new TableLayoutHandle(connectorId, transaction, newTableLayoutHandle);
ConnectorTableLayoutHandle newTableLayoutHandle = metadata.makeCompatiblePartitioning(session.toConnectorSession(connectorId), tableHandle.getLayout().get(), partitioningHandle.getConnectorHandle());
return new TableHandle(connectorId, tableHandle.getConnectorHandle(), transaction, Optional.of(newTableLayoutHandle));
}

@Override
Expand All @@ -435,12 +456,19 @@ public Optional<PartitioningHandle> getCommonPartitioning(Session session, Parti
}

@Override
public Optional<Object> getInfo(Session session, TableLayoutHandle handle)
public Optional<Object> getInfo(Session session, TableHandle handle)
{
ConnectorId connectorId = handle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
ConnectorTableLayout tableLayout = metadata.getTableLayout(session.toConnectorSession(connectorId), handle.getConnectorHandle());
return metadata.getInfo(tableLayout.getHandle());

ConnectorTableLayoutHandle layoutHandle = handle.getLayout()
.orElseGet(() -> getLayout(session, handle, Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout()
.get());

return metadata.getInfo(layoutHandle);
}

@Override
Expand Down Expand Up @@ -781,22 +809,22 @@ public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tabl
}

@Override
public boolean supportsMetadataDelete(Session session, TableHandle tableHandle, TableLayoutHandle tableLayoutHandle)
public boolean supportsMetadataDelete(Session session, TableHandle tableHandle)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
return metadata.supportsMetadataDelete(
session.toConnectorSession(connectorId),
tableHandle.getConnectorHandle(),
tableLayoutHandle.getConnectorHandle());
tableHandle.getLayout().get());
}

@Override
public OptionalLong metadataDelete(Session session, TableHandle tableHandle, TableLayoutHandle tableLayoutHandle)
public OptionalLong metadataDelete(Session session, TableHandle tableHandle)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadataForWrite(session, connectorId);
return metadata.metadataDelete(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), tableLayoutHandle.getConnectorHandle());
return metadata.metadataDelete(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), tableHandle.getLayout().get());
}

@Override
Expand All @@ -805,7 +833,7 @@ public TableHandle beginDelete(Session session, TableHandle tableHandle)
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadataForWrite(session, connectorId);
ConnectorTableHandle newHandle = metadata.beginDelete(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle());
return new TableHandle(tableHandle.getConnectorId(), newHandle);
return new TableHandle(tableHandle.getConnectorId(), newHandle, tableHandle.getTransaction(), tableHandle.getLayout());
}

@Override
Expand Down
Loading