Skip to content

Commit

Permalink
[Enhancement] pushdown or predicate (8): support unused_columns_in_sc…
Browse files Browse the repository at this point in the history
…an_stage (#46140)

Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu authored May 24, 2024
1 parent 0e1816f commit bac2d92
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 64 deletions.
10 changes: 6 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,16 @@ public void updateAppliedDictStringColumns(Set<Integer> appliedColumnIds) {
}
}

public void setUnUsedOutputStringColumns(Set<Integer> unUsedOutputColumnIds,
Set<String> aggOrPrimaryKeyTableValueColumnNames) {
public List<SlotDescriptor> getSlots() {
return desc.getSlots();
}

public void setUnUsedOutputStringColumns(Set<Integer> unUsedOutputColumnIds) {
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
if (unUsedOutputColumnIds.contains(slot.getId().asInt()) &&
!aggOrPrimaryKeyTableValueColumnNames.contains(slot.getColumn().getName())) {
if (unUsedOutputColumnIds.contains(slot.getId().asInt())) {
unUsedOutputStringColumns.add(slot.getColumn().getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ public static boolean cannotApplyDictOptimize(ScalarOperator operator, Set<Integ
return !couldApplyCtx.canDictOptBeApplied && couldApplyCtx.stopOptPropagateUpward;
}

public static boolean isSimpleStrictPredicate(ScalarOperator operator) {
return operator.accept(new IsSimpleStrictPredicateVisitor(), null);
public static boolean isSimpleStrictPredicate(ScalarOperator operator, boolean enablePushdownOrPredicate) {
return operator.accept(new IsSimpleStrictPredicateVisitor(enablePushdownOrPredicate), null);
}

private void visitProjectionBefore(OptExpression optExpression, DecodeContext context) {
Expand Down Expand Up @@ -1199,14 +1199,30 @@ public Void visitCompoundPredicate(CompoundPredicateOperator predicate, CouldApp
// The predicate contains no function call; this implementation is consistent with the BE olap scan node
private static class IsSimpleStrictPredicateVisitor extends ScalarOperatorVisitor<Boolean, Void> {

public IsSimpleStrictPredicateVisitor() {
private final boolean enablePushDownOrPredicate;

public IsSimpleStrictPredicateVisitor(boolean enablePushDownOrPredicate) {
this.enablePushDownOrPredicate = enablePushDownOrPredicate;
}

@Override
public Boolean visit(ScalarOperator scalarOperator, Void context) {
return false;
}

@Override
public Boolean visitCompoundPredicate(CompoundPredicateOperator predicate, Void context) {
if (!enablePushDownOrPredicate) {
return false;
}

if (!predicate.isAnd() && !predicate.isOr()) {
return false;
}

return predicate.getChildren().stream().allMatch(child -> child.accept(this, context));
}

@Override
public Boolean visitBinaryPredicate(BinaryPredicateOperator predicate, Void context) {
if (predicate.getBinaryType() == EQ_FOR_NULL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,71 +470,102 @@ public PlanFragment visit(OptExpression optExpression, ExecPlan context) {
return fragment;
}

/**
* Set the columns which do not need to be output by ScanNode. These are the columns used only in pushdownable
* predicates, i.e. columns that are neither used in non-pushdownable predicates nor required as output columns.
*
* <p> A column can be pushed down only if it is one of the following:
* <ul>
* <li> All the columns of duplicate-key model.
* <li> Keys of primary-key model.
* <li> Keys of agg-key model (aggregation/unique_key model) in the skip-aggr scan stage.
* </ul>
*
* <p> Whether a predicate can be pushed down is determined as follows:
* <ul>
* <li> Simple single-column predicates, and the column can be pushed down, such as a = v1.
* <li> AND and OR predicates, and the sub-predicates are all pushable, such as a = v1 OR (b = v2 AND c = v3).
* <li> The other predicates are not pushable, such as a + b = v1, a = v1 OR a + b = v2.
* </ul>
*
* <p> The columns in the predicates that cannot be pushed down need to be retained, because these columns are
* used in the stage after scan.
*/
private void setUnUsedOutputColumns(PhysicalOlapScanOperator node, OlapScanNode scanNode,
List<ScalarOperator> predicates, OlapTable referenceTable) {
if (!ConnectContext.get().getSessionVariable().isEnableFilterUnusedColumnsInScanStage()) {
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
if (!sessionVariable.isEnableFilterUnusedColumnsInScanStage()) {
return;
}

// Key columns and value columns cannot be pruned in the non-skip-aggr scan stage.
// - All the keys columns must be retained to merge and aggregate rows.
// - Value columns can only be used after merging and aggregating.
MaterializedIndexMeta materializedIndexMeta =
referenceTable.getIndexMetaByIndexId(node.getSelectedIndexId());
MaterializedIndexMeta materializedIndexMeta = referenceTable.getIndexMetaByIndexId(node.getSelectedIndexId());
if (materializedIndexMeta.getKeysType().isAggregationFamily() && !node.isPreAggregation()) {
return;
}

List<ColumnRefOperator> outputColumns = node.getOutputColumns();
// if outputColumns is empty, skip this optimization
if (outputColumns.isEmpty()) {
return;
// ------------------------------------------------------------------------------------
// Get outputColumnIds.
// ------------------------------------------------------------------------------------
Set<Integer> outputColumnIds = node.getOutputColumns().stream()
.map(ColumnRefOperator::getId)
.collect(Collectors.toSet());
// Empty outputColumnIds means that the expression after ScanNode does not need any column from ScanNode.
// However, at least one column needs to be output, so choose any column as the output column.
if (outputColumnIds.isEmpty()) {
if (!scanNode.getSlots().isEmpty()) {
outputColumnIds.add(scanNode.getSlots().get(0).getId().asInt());
}
}
Set<Integer> outputColumnIds = new HashSet<Integer>();
for (ColumnRefOperator colref : outputColumns) {
outputColumnIds.add(colref.getId());
}

// NOTE:
// - only support push down single predicate(eg, a = xx) to scan node.
// - only keys in agg-key model (aggregation/unique_key model) and primary-key model can be included in
// the unused
// columns.
// - complex pred(eg, a + b = xx) can not be pushed down to scan node yet.
// so the columns in complex predicate are useful for the stage after scan.
Set<Integer> singlePredColumnIds = new HashSet<Integer>();
Set<Integer> complexPredColumnIds = new HashSet<Integer>();
Set<String> aggOrPrimaryKeyTableValueColumnNames = new HashSet<String>();

// ------------------------------------------------------------------------------------
// Get nonPushdownColumnIds.
// ------------------------------------------------------------------------------------
Set<Integer> nonPushdownColumnIds = Collections.emptySet();
if (materializedIndexMeta.getKeysType().isAggregationFamily() ||
materializedIndexMeta.getKeysType() == KeysType.PRIMARY_KEYS) {
aggOrPrimaryKeyTableValueColumnNames =
materializedIndexMeta.getSchema().stream()
.filter(col -> !col.isKey())
.map(Column::getName)
.collect(Collectors.toSet());
}

Map<String, Integer> columnNameToId = scanNode.getSlots().stream().collect(Collectors.toMap(
slot -> slot.getColumn().getName(),
slot -> slot.getId().asInt()
));
nonPushdownColumnIds = materializedIndexMeta.getSchema().stream()
.filter(col -> !col.isKey())
.map(Column::getName)
.map(columnNameToId::get)
.collect(Collectors.toSet());
}

// ------------------------------------------------------------------------------------
// Get pushdownPredUsedColumnIds and nonPushdownPredUsedColumnIds.
// ------------------------------------------------------------------------------------
Set<Integer> pushdownPredUsedColumnIds = new HashSet<>();
Set<Integer> nonPushdownPredUsedColumnIds = new HashSet<>();
for (ScalarOperator predicate : predicates) {
ColumnRefSet usedColumns = predicate.getUsedColumns();
if (DecodeVisitor.isSimpleStrictPredicate(predicate)) {
boolean isPushdown =
DecodeVisitor.isSimpleStrictPredicate(predicate, sessionVariable.isEnablePushdownOrPredicate())
&& Arrays.stream(usedColumns.getColumnIds()).noneMatch(nonPushdownColumnIds::contains);
if (isPushdown) {
for (int cid : usedColumns.getColumnIds()) {
singlePredColumnIds.add(cid);
pushdownPredUsedColumnIds.add(cid);
}
} else {
for (int cid : usedColumns.getColumnIds()) {
complexPredColumnIds.add(cid);
nonPushdownPredUsedColumnIds.add(cid);
}
}
}

Set<Integer> unUsedOutputColumnIds = new HashSet<>();
for (Integer newCid : singlePredColumnIds) {
if (!complexPredColumnIds.contains(newCid) && !outputColumnIds.contains(newCid)) {
unUsedOutputColumnIds.add(newCid);
}
}

scanNode.setUnUsedOutputStringColumns(unUsedOutputColumnIds, aggOrPrimaryKeyTableValueColumnNames);
// ------------------------------------------------------------------------------------
// Get unUsedOutputColumnIds which are in pushdownPredUsedColumnIds
// but not in nonPushdownPredUsedColumnIds and outputColumnIds.
// ------------------------------------------------------------------------------------
Set<Integer> unUsedOutputColumnIds = pushdownPredUsedColumnIds.stream()
.filter(cid -> !nonPushdownPredUsedColumnIds.contains(cid) && !outputColumnIds.contains(cid))
.collect(Collectors.toSet());
scanNode.setUnUsedOutputStringColumns(unUsedOutputColumnIds);
}

@Override
Expand Down
Loading

0 comments on commit bac2d92

Please sign in to comment.