Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Complex pushdown #402

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.Split;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayoutHandle;
import io.prestosql.metadata.TableLayoutResult;
import io.prestosql.operator.Driver;
import io.prestosql.operator.DriverContext;
import io.prestosql.operator.FilterAndProjectOperator;
Expand All @@ -41,23 +39,25 @@
import io.prestosql.operator.TaskContext;
import io.prestosql.operator.TaskStats;
import io.prestosql.operator.project.InputPageProjection;
import io.prestosql.operator.project.InterpretedPageProjection;
import io.prestosql.operator.project.PageProcessor;
import io.prestosql.operator.project.PageProjection;
import io.prestosql.security.AllowAllAccessControl;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.memory.MemoryPoolId;
import io.prestosql.spi.type.Type;
import io.prestosql.spiller.SpillSpaceTracker;
import io.prestosql.split.SplitSource;
import io.prestosql.sql.gen.PageFunctionCompiler;
import io.prestosql.sql.planner.Symbol;
import io.prestosql.sql.planner.TypeAnalyzer;
import io.prestosql.sql.planner.TypeProvider;
import io.prestosql.sql.planner.optimizations.HashGenerationOptimizer;
import io.prestosql.sql.planner.plan.PlanNodeId;
import io.prestosql.sql.relational.RowExpression;
import io.prestosql.sql.tree.Expression;
import io.prestosql.sql.tree.NodeRef;
import io.prestosql.testing.LocalQueryRunner;
import io.prestosql.transaction.TransactionId;

Expand All @@ -79,9 +79,11 @@
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.prestosql.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
import static io.prestosql.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
import static io.prestosql.metadata.FunctionKind.SCALAR;
import static io.prestosql.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static io.prestosql.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.sql.relational.SqlToRowExpressionTranslator.translate;
import static io.prestosql.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -170,8 +172,7 @@ protected final OperatorFactory createTableScanOperator(int operatorId, PlanNode
List<ColumnHandle> columnHandles = columnHandlesBuilder.build();

// get the split for this table
List<TableLayoutResult> layouts = metadata.getLayouts(session, tableHandle, Constraint.alwaysTrue(), Optional.empty());
Split split = getLocalQuerySplit(session, layouts.get(0).getLayout().getHandle());
Split split = getLocalQuerySplit(session, tableHandle);

return new OperatorFactory()
{
Expand All @@ -196,7 +197,7 @@ public OperatorFactory duplicate()
};
}

private Split getLocalQuerySplit(Session session, TableLayoutHandle handle)
private Split getLocalQuerySplit(Session session, TableHandle handle)
{
SplitSource splitSource = localQueryRunner.getSplitManager().getSplits(session, handle, UNGROUPED_SCHEDULING);
List<Split> splits = new ArrayList<>();
Expand Down Expand Up @@ -226,13 +227,14 @@ protected final OperatorFactory createHashProjectOperator(int operatorId, PlanNo

Optional<Expression> hashExpression = HashGenerationOptimizer.getHashExpression(ImmutableList.copyOf(symbolTypes.build().keySet()));
verify(hashExpression.isPresent());
projections.add(new InterpretedPageProjection(
hashExpression.get(),
TypeProvider.copyOf(symbolTypes.build()),
symbolToInputMapping.build(),
localQueryRunner.getMetadata(),
localQueryRunner.getSqlParser(),
session));

Map<NodeRef<Expression>, Type> expressionTypes = new TypeAnalyzer(localQueryRunner.getSqlParser(), localQueryRunner.getMetadata())
.getTypes(session, TypeProvider.copyOf(symbolTypes.build()), hashExpression.get());

RowExpression translated = translate(hashExpression.get(), SCALAR, expressionTypes, symbolToInputMapping.build(), localQueryRunner.getMetadata().getFunctionRegistry(), localQueryRunner.getTypeManager(), session, false);

PageFunctionCompiler functionCompiler = new PageFunctionCompiler(localQueryRunner.getMetadata(), 0);
projections.add(functionCompiler.compileProjection(translated, Optional.empty()).get());

return new FilterAndProjectOperator.FilterAndProjectOperatorFactory(
operatorId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,6 @@ public void testPushDownAnd()
private RuleAssert assertRuleApplication()
{
RuleTester tester = tester();
return tester.assertThat(new ExtractSpatialInnerJoin(tester.getMetadata(), tester.getSplitManager(), tester.getPageSourceManager(), tester.getSqlParser()));
return tester.assertThat(new ExtractSpatialInnerJoin(tester.getMetadata(), tester.getSplitManager(), tester.getPageSourceManager(), tester.getTypeAnalyzer()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,6 @@ public void testPushDownAnd()
private RuleAssert assertRuleApplication()
{
RuleTester tester = tester();
return tester().assertThat(new ExtractSpatialLeftJoin(tester.getMetadata(), tester.getSplitManager(), tester.getPageSourceManager(), tester.getSqlParser()));
return tester().assertThat(new ExtractSpatialLeftJoin(tester.getMetadata(), tester.getSplitManager(), tester.getPageSourceManager(), tester.getTypeAnalyzer()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.TableHandle;
import io.prestosql.metadata.TableLayout;
import io.prestosql.metadata.TableLayoutResult;
import io.prestosql.metadata.TableMetadata;
import io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior;
import io.prestosql.spi.connector.CatalogSchemaTableName;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.security.Identity;
import io.prestosql.spi.security.SelectedRole;
Expand Down Expand Up @@ -1673,9 +1672,13 @@ private Object getHiveTableProperty(String tableName, Function<HiveTableLayoutHa
Optional<TableHandle> tableHandle = metadata.getTableHandle(transactionSession, new QualifiedObjectName(catalog, TPCH_SCHEMA, tableName));
assertTrue(tableHandle.isPresent());

List<TableLayoutResult> layouts = metadata.getLayouts(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty());
TableLayout layout = getOnlyElement(layouts).getLayout();
return propertyGetter.apply((HiveTableLayoutHandle) layout.getHandle().getConnectorHandle());
ConnectorTableLayoutHandle connectorLayout = metadata.getLayout(transactionSession, tableHandle.get(), Constraint.alwaysTrue(), Optional.empty())
.get()
.getNewTableHandle()
.getLayout()
.get();

return propertyGetter.apply((HiveTableLayoutHandle) connectorLayout);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.prestosql.sql.planner.PlanOptimizers;
import io.prestosql.sql.planner.StageExecutionPlan;
import io.prestosql.sql.planner.SubPlan;
import io.prestosql.sql.planner.TypeAnalyzer;
import io.prestosql.sql.planner.optimizations.PlanOptimizer;
import io.prestosql.sql.tree.Explain;
import io.prestosql.transaction.TransactionManager;
Expand Down Expand Up @@ -414,7 +415,7 @@ private PlanRoot doAnalyzeQuery()

// plan query
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, statsCalculator, costCalculator, stateMachine.getWarningCollector());
LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, new TypeAnalyzer(sqlParser, metadata), statsCalculator, costCalculator, stateMachine.getWarningCollector());
Plan plan = logicalPlanner.plan(analysis);
queryPlan.set(plan);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.metadata;

import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.expression.ConnectorExpression;

import java.util.List;

public class FilterApplicationResult
{
private final TableHandle table;
private final ConnectorExpression remainingFilter;
private final List<Column> newProjections;

public FilterApplicationResult(TableHandle table, ConnectorExpression remainingFilter, List<Column> newProjections)
{
this.table = table;
this.remainingFilter = remainingFilter;
this.newProjections = newProjections;
}

public TableHandle getTable()
{
return table;
}

public ConnectorExpression getRemainingFilter()
{
return remainingFilter;
}

public List<Column> getNewProjections()
{
return newProjections;
}

public static class Column
{
private final ColumnHandle column;
private final Type type;

public Column(ColumnHandle column, Type type)
{
this.column = column;
this.type = type;
}

public ColumnHandle getColumn()
{
return column;
}

public Type getType()
{
return type;
}
}
}
24 changes: 14 additions & 10 deletions presto-main/src/main/java/io/prestosql/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.prestosql.spi.type.TypeManager;
import io.prestosql.spi.type.TypeSignature;
import io.prestosql.sql.planner.PartitioningHandle;
import io.prestosql.spi.expression.ConnectorExpression;
import io.prestosql.sql.tree.QualifiedName;

import java.util.Collection;
Expand Down Expand Up @@ -73,25 +74,25 @@ public interface Metadata

Optional<TableHandle> getTableHandleForStatisticsCollection(Session session, QualifiedObjectName tableName, Map<String, Object> analyzeProperties);

List<TableLayoutResult> getLayouts(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);
Optional<TableLayoutResult> getLayout(Session session, TableHandle tableHandle, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns);

TableLayout getLayout(Session session, TableLayoutHandle handle);
TableLayout getLayout(Session session, TableHandle handle);

/**
* Return a table layout handle whose partitioning is converted to the provided partitioning handle,
* but otherwise identical to the provided table layout handle.
* The provided table layout handle must be one that the connector can transparently convert to from
* the original partitioning handle associated with the provided table layout handle,
* Return a table handle whose partitioning is converted to the provided partitioning handle,
* but otherwise identical to the provided table handle.
* The provided table handle must be one that the connector can transparently convert to from
* the original partitioning handle associated with the provided table handle,
* as promised by {@link #getCommonPartitioning}.
*/
TableLayoutHandle makeCompatiblePartitioning(Session session, TableLayoutHandle tableLayoutHandle, PartitioningHandle partitioningHandle);
TableHandle makeCompatiblePartitioning(Session session, TableHandle table, PartitioningHandle partitioningHandle);

/**
* Return a partitioning handle which the connector can transparently convert both {@code left} and {@code right} into.
*/
Optional<PartitioningHandle> getCommonPartitioning(Session session, PartitioningHandle left, PartitioningHandle right);

Optional<Object> getInfo(Session session, TableLayoutHandle handle);
Optional<Object> getInfo(Session session, TableHandle handle);

/**
* Return the metadata for the specified table handle.
Expand Down Expand Up @@ -241,14 +242,14 @@ public interface Metadata
/**
* @return whether delete without table scan is supported
*/
boolean supportsMetadataDelete(Session session, TableHandle tableHandle, TableLayoutHandle tableLayoutHandle);
boolean supportsMetadataDelete(Session session, TableHandle tableHandle);

/**
* Delete the provide table layout
*
* @return number of rows deleted, or empty for unknown
*/
OptionalLong metadataDelete(Session session, TableHandle tableHandle, TableLayoutHandle tableLayoutHandle);
OptionalLong metadataDelete(Session session, TableHandle tableHandle);

/**
* Begin delete query
Expand Down Expand Up @@ -380,4 +381,7 @@ public interface Metadata
ColumnPropertyManager getColumnPropertyManager();

AnalyzePropertyManager getAnalyzePropertyManager();

// => TableHandle + remaining filter + new projections
Optional<FilterApplicationResult> applyFilter(TableHandle table, ConnectorExpression expression);
}
Loading