Skip to content

Commit

Permalink
fix case
Browse files Browse the repository at this point in the history
  • Loading branch information
starocean999 committed Mar 13, 2024
1 parent 295306b commit bf78e19
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1033,9 +1033,44 @@ public PlanFragment visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends P
// create assertNode
AssertNumRowsNode assertNumRowsNode = new AssertNumRowsNode(context.nextPlanNodeId(),
currentFragment.getPlanRoot(),
ExpressionTranslator.translateAssert(assertNumRows.getAssertNumRowsElement()));
ExpressionTranslator.translateAssert(assertNumRows.getAssertNumRowsElement()), true);
assertNumRowsNode.setChildrenDistributeExprLists(distributeExprLists);
assertNumRowsNode.setNereidsId(assertNumRows.getId());

// we need convert all columns to nullable in AssertNumRows node
// create project exprs same as child's output
List<Expr> projectionExprs = assertNumRows.child().getOutput()
.stream()
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
assertNumRowsNode.setProjectList(projectionExprs);

// create a output tuple
TupleDescriptor outputTupleDescriptor = context.generateTupleDesc();
List<SlotDescriptor> outputSlotDescriptor = Lists.newArrayList();

// collect all child output slots
List<TupleDescriptor> childTuples = context.getTupleDesc(currentFragment.getPlanRoot());
List<SlotDescriptor> childSlotDescriptors = childTuples.stream()
.map(TupleDescriptor::getSlots)
.flatMap(Collection::stream)
.collect(Collectors.toList());

// create output slot based on child output
Map<ExprId, SlotReference> childOutputMap = Maps.newHashMap();
assertNumRows.child().getOutput().stream()
.map(SlotReference.class::cast)
.forEach(s -> childOutputMap.put(s.getExprId(), s));
for (SlotDescriptor slot : childSlotDescriptors) {
SlotReference sf = childOutputMap.get(context.findExprId(slot.getId()));
SlotDescriptor sd = context.createSlotDesc(outputTupleDescriptor, sf, slot.getParent().getTable());
outputSlotDescriptor.add(sd);
}

// set all output slot nullable
outputSlotDescriptor.forEach(sd -> sd.setIsNullable(true));
assertNumRowsNode.setOutputTupleDesc(outputTupleDescriptor);

addPlanRoot(currentFragment, assertNumRowsNode, assertNumRows);
return currentFragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,22 @@ private Plan skipPlan(Plan plan) {

private boolean canEliminate(LogicalAssertNumRows<?> assertNumRows, Plan plan) {
long maxOutputRowcount;
AssertNumRowsElement assertNumRowsElement = assertNumRows.getAssertNumRowsElement();
Assertion assertion = assertNumRowsElement.getAssertion();
long assertNum = assertNumRowsElement.getDesiredNumOfRows();
// Don't need to consider TopN, because it's generated by Sort + Limit.
if (plan instanceof LogicalLimit) {
maxOutputRowcount = ((LogicalLimit<?>) plan).getLimit();
} else if (plan instanceof LogicalAggregate && ((LogicalAggregate<?>) plan).getGroupByExpressions().isEmpty()) {
maxOutputRowcount = 1;
if (assertion == Assertion.EQ && assertNum == 1) {
return true;
} else {
maxOutputRowcount = 1;
}
} else {
return false;
}

AssertNumRowsElement assertNumRowsElement = assertNumRows.getAssertNumRowsElement();
Assertion assertion = assertNumRowsElement.getAssertion();
long assertNum = assertNumRowsElement.getDesiredNumOfRows();

switch (assertion) {
case NE:
case LT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ public class AssertNumRowsNode extends PlanNode {
private String subqueryString;
private AssertNumRowsElement.Assertion assertion;

private boolean shouldConvertOutputToNullable = false;

public AssertNumRowsNode(PlanNodeId id, PlanNode input, AssertNumRowsElement assertNumRowsElement) {
this(id, input, assertNumRowsElement, false);
}

public AssertNumRowsNode(PlanNodeId id, PlanNode input, AssertNumRowsElement assertNumRowsElement,
boolean convertToNullable) {
super(id, "ASSERT NUMBER OF ROWS", StatisticalType.ASSERT_NUM_ROWS_NODE);
this.desiredNumOfRows = assertNumRowsElement.getDesiredNumOfRows();
this.subqueryString = assertNumRowsElement.getSubqueryString();
Expand All @@ -56,6 +63,7 @@ public AssertNumRowsNode(PlanNodeId id, PlanNode input, AssertNumRowsElement ass
}
this.tblRefIds.addAll(input.getTblRefIds());
this.nullableTupleIds.addAll(input.getNullableTupleIds());
this.shouldConvertOutputToNullable = convertToNullable;
}

@Override
Expand Down Expand Up @@ -94,6 +102,7 @@ protected void toThrift(TPlanNode msg) {
msg.assert_num_rows_node.setDesiredNumRows(desiredNumOfRows);
msg.assert_num_rows_node.setSubqueryString(subqueryString);
msg.assert_num_rows_node.setAssertion(assertion.toThrift());
msg.assert_num_rows_node.setShouldConvertOutputToNullable(shouldConvertOutputToNullable);
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ struct TTrinoConnectorFileDesc {
5: optional string trino_connector_table_handle
6: optional string trino_connector_column_handles
7: optional string trino_connector_column_metadata
8: optional string trino_connector_column_names // not used
8: optional string trino_connector_column_names
9: optional string trino_connector_split
10: optional string trino_connector_predicate
11: optional string trino_connector_trascation_handle
Expand Down Expand Up @@ -511,7 +511,7 @@ struct TTasksMetadataParams {

struct TQueriesMetadataParams {
1: optional string cluster_name
2: optional bool relay_to_other_fe
2: optional bool relay_to_other_fe
3: optional TMaterializedViewsMetadataParams materialized_views_params
4: optional TJobsMetadataParams jobs_params
5: optional TTasksMetadataParams tasks_params
Expand Down Expand Up @@ -1134,6 +1134,7 @@ struct TAssertNumRowsNode {
1: optional i64 desired_num_rows;
2: optional string subquery_string;
3: optional TAssertion assertion;
4: optional bool should_convert_output_to_nullable;
}

enum TRuntimeFilterType {
Expand Down

0 comments on commit bf78e19

Please sign in to comment.