Skip to content

Commit

Permalink
[trinodb#36] Expose columns in unenforced predicate to QueryInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjinde committed Apr 24, 2023
1 parent 0ab4ee3 commit 027234a
Show file tree
Hide file tree
Showing 14 changed files with 489 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ private static QueryInfo immediateFailureQueryInfo(
ImmutableList.of(),
ImmutableList.of(),
true,
ImmutableSet.of(),
resourceGroupId,
Optional.empty(),
RetryPolicy.NONE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;

import javax.annotation.concurrent.Immutable;

import java.util.Objects;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

@Immutable
public final class ColumnsInPredicate
{
private final String catalog;
private final String schema;
private final String table;
private final Set<Column> colsInDiscretePredicate;
private final Set<Column> colsInRangePredicate;

@JsonCreator
public ColumnsInPredicate(
@JsonProperty("catalog") String catalog,
@JsonProperty("schema") String schema,
@JsonProperty("table") String table,
@JsonProperty("colsInDiscretePredicate") Set<Column> colsInDiscretePredicate,
@JsonProperty("colsInRangePredicate") Set<Column> colsInRangePredicate)
{
this.catalog = requireNonNull(catalog, "catalog is null");
this.schema = requireNonNull(schema, "schema is null");
this.table = requireNonNull(table, "table is null");
requireNonNull(colsInDiscretePredicate, "colsInDiscretePredicate is null");
this.colsInDiscretePredicate = ImmutableSet.copyOf(colsInDiscretePredicate);
requireNonNull(colsInRangePredicate, "colsInRangePredicate is null");
this.colsInRangePredicate = ImmutableSet.copyOf(colsInRangePredicate);
}

@JsonProperty
public String getCatalog()
{
return catalog;
}

@JsonProperty
public String getSchema()
{
return schema;
}

@JsonProperty
public String getTable()
{
return table;
}

@JsonProperty
public Set<Column> getColsInDiscretePredicate()
{
return colsInDiscretePredicate;
}

@JsonProperty
public Set<Column> getColsInRangePredicate()
{
return colsInRangePredicate;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ColumnsInPredicate other = (ColumnsInPredicate) o;
return Objects.equals(catalog, other.catalog) &&
Objects.equals(schema, other.schema) &&
Objects.equals(table, other.table) &&
Objects.equals(colsInDiscretePredicate, other.colsInDiscretePredicate) &&
Objects.equals(colsInRangePredicate, other.colsInRangePredicate);
}

@Override
public int hashCode()
{
return Objects.hash(catalog, schema, table, colsInDiscretePredicate, colsInRangePredicate);
}

@Override
public String toString()
{
return toStringHelper(this)
.addValue(catalog)
.addValue(schema)
.addValue(table)
.addValue(colsInDiscretePredicate)
.addValue(colsInRangePredicate)
.toString();
}
}
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/QueryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class QueryInfo
private final Optional<StageInfo> outputStage;
private final List<TableInfo> referencedTables;
private final List<RoutineInfo> routines;
private final Set<ColumnsInPredicate> columnsInUnenforcedPredicate;
private final ExecutionFailureInfo failureInfo;
private final ErrorType errorType;
private final ErrorCode errorCode;
Expand Down Expand Up @@ -111,6 +112,7 @@ public QueryInfo(
@JsonProperty("referencedTables") List<TableInfo> referencedTables,
@JsonProperty("routines") List<RoutineInfo> routines,
@JsonProperty("finalQueryInfo") boolean finalQueryInfo,
@JsonProperty("columnsInUnenforcedPredicate") Set<ColumnsInPredicate> columnsInUnenforcedPredicate,
@JsonProperty("resourceGroupId") Optional<ResourceGroupId> resourceGroupId,
@JsonProperty("queryType") Optional<QueryType> queryType,
@JsonProperty("retryPolicy") RetryPolicy retryPolicy)
Expand All @@ -136,6 +138,7 @@ public QueryInfo(
requireNonNull(output, "output is null");
requireNonNull(referencedTables, "referencedTables is null");
requireNonNull(routines, "routines is null");
requireNonNull(columnsInUnenforcedPredicate, "columnsInTableScanPredicate is null");
requireNonNull(resourceGroupId, "resourceGroupId is null");
requireNonNull(warnings, "warnings is null");
requireNonNull(queryType, "queryType is null");
Expand Down Expand Up @@ -171,6 +174,7 @@ public QueryInfo(
this.routines = ImmutableList.copyOf(routines);
this.finalQueryInfo = finalQueryInfo;
checkArgument(!finalQueryInfo || state.isDone(), "finalQueryInfo without a terminal query state");
this.columnsInUnenforcedPredicate = ImmutableSet.copyOf(columnsInUnenforcedPredicate);
this.resourceGroupId = resourceGroupId;
this.queryType = queryType;
this.retryPolicy = retryPolicy;
Expand Down Expand Up @@ -360,6 +364,12 @@ public List<RoutineInfo> getRoutines()
return routines;
}

@JsonProperty
public Set<ColumnsInPredicate> getColumnsInUnenforcedPredicate()
{
return columnsInUnenforcedPredicate;
}

@JsonProperty
public Optional<ResourceGroupId> getResourceGroupId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public class QueryStateMachine
private final AtomicReference<Optional<Output>> output = new AtomicReference<>(Optional.empty());
private final AtomicReference<List<TableInfo>> referencedTables = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<List<RoutineInfo>> routines = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<Set<ColumnsInPredicate>> columnsInUnenforcedPredicate = new AtomicReference<>(ImmutableSet.of());
private final StateMachine<Optional<QueryInfo>> finalQueryInfo;

private final WarningCollector warningCollector;
Expand Down Expand Up @@ -482,6 +483,7 @@ QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
referencedTables.get(),
routines.get(),
finalInfo,
columnsInUnenforcedPredicate.get(),
Optional.of(resourceGroup),
queryType,
getRetryPolicy(session));
Expand Down Expand Up @@ -775,6 +777,12 @@ public void setRoutines(List<RoutineInfo> routines)
this.routines.set(ImmutableList.copyOf(routines));
}

public void setColumnsInUnenforcedPredicate(Set<ColumnsInPredicate> columnsInUnenforcedPredicate)
{
requireNonNull(columnsInUnenforcedPredicate, "columnsInUnenforcedPredicate is null");
this.columnsInUnenforcedPredicate.set(ImmutableSet.copyOf(columnsInUnenforcedPredicate));
}

private DynamicFiltersStats getDynamicFiltersStats()
{
synchronized (dynamicFiltersStatsSupplierLock) {
Expand Down Expand Up @@ -1235,6 +1243,7 @@ public void pruneQueryInfo()
queryInfo.getReferencedTables(),
queryInfo.getRoutines(),
queryInfo.isFinalQueryInfo(),
queryInfo.getColumnsInUnenforcedPredicate(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getRetryPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
Expand Down Expand Up @@ -49,6 +50,7 @@
import io.trino.sql.analyzer.Analysis;
import io.trino.sql.analyzer.Analyzer;
import io.trino.sql.analyzer.AnalyzerFactory;
import io.trino.sql.planner.ColumnsInUnenforcedPredicateExtractor;
import io.trino.sql.planner.InputExtractor;
import io.trino.sql.planner.LogicalPlanner;
import io.trino.sql.planner.NodePartitioningManager;
Expand All @@ -74,6 +76,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,6 +105,8 @@
public class SqlQueryExecution
implements QueryExecution
{
private static final Logger LOG = Logger.get(SqlQueryExecution.class);

private final QueryStateMachine stateMachine;
private final Slug slug;
private final PlannerContext plannerContext;
Expand Down Expand Up @@ -469,6 +474,11 @@ private PlanRoot doPlanQuery()
Plan plan = logicalPlanner.plan(analysis);
queryPlan.set(plan);

// extract columns in table scan predicates
Set<ColumnsInPredicate> columnsInUnenforcedPredicate =
new ColumnsInUnenforcedPredicateExtractor(plannerContext.getMetadata(), stateMachine.getSession()).extract(plan);
stateMachine.setColumnsInUnenforcedPredicate(columnsInUnenforcedPredicate);

// fragment the plan
SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, stateMachine.getWarningCollector());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public Optional<DiscretePredicates> getDiscretePredicates()
return tableProperties.getDiscretePredicates();
}

public Optional<TupleDomain<ColumnHandle>> getUnenforcedPredicates()
{
return tableProperties.getUnenforcedPredicates();
}

public static class TablePartitioning
{
private final PartitioningHandle partitioningHandle;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner;

import io.trino.Session;
import io.trino.execution.Column;
import io.trino.execution.ColumnsInPredicate;
import io.trino.metadata.Metadata;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableSchema;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.plan.TableScanNode;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;

public class ColumnsInUnenforcedPredicateExtractor
{
private final Metadata metadata;
private final Session session;

public ColumnsInUnenforcedPredicateExtractor(Metadata metadata, Session session)
{
this.metadata = metadata;
this.session = session;
}

public Set<ColumnsInPredicate> extract(Plan plan)
{
Visitor visitor = new Visitor();
plan.getRoot().accept(visitor, null);
return visitor.getColumnsInPredicateSet();
}

private class Visitor
extends PlanVisitor<Void, Void>
{
private final Map<QualifiedObjectName, ColumnsInPredicate> columnsInPredicateMap = new HashMap<>();

public Set<ColumnsInPredicate> getColumnsInPredicateSet()
{
return columnsInPredicateMap.values().stream().collect(toImmutableSet());
}

@Override
public Void visitTableScan(TableScanNode node, Void context)
{
TableHandle tableHandle = node.getTable();

Set<Column> colsInDiscretePredicate = new HashSet<>();
Set<Column> colsInRangePredicate = new HashSet<>();
metadata.getTableProperties(session, tableHandle)
.getUnenforcedPredicates()
.flatMap(TupleDomain::getDomains)
.ifPresent(domains ->
domains.forEach(((columnHandle, domain) -> {
ValueSet valueSet = domain.getValues();
if (!valueSet.isAll() && !valueSet.isNone()) {
ColumnMetadata columnMetadata = metadata.getColumnMetadata(session, tableHandle, columnHandle);
Column column = new Column(columnMetadata.getName(), columnMetadata.getType().toString());
if (valueSet.isDiscreteSet()) {
colsInDiscretePredicate.add(column);
}
else {
colsInRangePredicate.add(column);
}
}
})));

// merge result from same source table
TableSchema tableSchema = metadata.getTableSchema(session, tableHandle);
QualifiedObjectName qualifiedTableName = tableSchema.getQualifiedName();
ColumnsInPredicate previous = columnsInPredicateMap.get(qualifiedTableName);
if (previous != null) {
colsInDiscretePredicate.addAll(previous.getColsInDiscretePredicate());
colsInRangePredicate.addAll(previous.getColsInRangePredicate());
}

// put into map even if this node has no unenforced predicates
// so that we can obtain all source tables from the result
columnsInPredicateMap.put(
qualifiedTableName,
new ColumnsInPredicate(
qualifiedTableName.getCatalogName(),
qualifiedTableName.getSchemaName(),
qualifiedTableName.getObjectName(),
colsInDiscretePredicate,
colsInRangePredicate));
return null;
}

@Override
protected Void visitPlan(PlanNode node, Void context)
{
for (PlanNode child : node.getSources()) {
child.accept(this, context);
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public QueryInfo getFullQueryInfo()
ImmutableList.of(),
ImmutableList.of(),
state.isDone(),
ImmutableSet.of(),
Optional.empty(),
Optional.empty(),
RetryPolicy.NONE);
Expand Down
Loading

0 comments on commit 027234a

Please sign in to comment.