Skip to content

Commit

Permalink
[BEAM-13099] Update all rels to use BeamRelMetadataQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
TheNeuralBit committed Oct 28, 2021
1 parent 0e5451f commit 60eaa82
Show file tree
Hide file tree
Showing 31 changed files with 138 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptPlanner;
Expand Down Expand Up @@ -55,7 +56,7 @@ public int getLimitCountOfSortRel() {
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
NodeStats inputStat = BeamSqlRelUtils.getNodeStats(input, mq);
double selectivity = estimateFilterSelectivity(getInput(), program, mq);

Expand All @@ -78,7 +79,7 @@ private static double estimateFilterSelectivity(
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
return BeamCostModel.FACTORY.makeCost(inputStat.getRowCount(), inputStat.getRate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
Expand All @@ -58,7 +58,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelWriter;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.AggregateCall;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.util.ImmutableBitSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -91,7 +90,7 @@ public BeamAggregationRel(
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {

NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
inputStat = computeWindowingCostEffect(inputStat);
Expand All @@ -111,7 +110,7 @@ public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {

NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(this.input, mq);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOSinkRule;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
Expand All @@ -39,7 +40,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.prepare.Prepare;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.TableModify;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.sql2rel.RelStructuredTypeFlattener;

Expand Down Expand Up @@ -77,12 +77,12 @@ public BeamIOSinkRel(
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
return BeamSqlRelUtils.getNodeStats(this.input, mq);
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(this.input, mq);
return BeamCostModel.FACTORY.makeCost(inputEstimates.getRowCount(), inputEstimates.getRate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
Expand Down Expand Up @@ -88,7 +89,7 @@ public double estimateRowCount(RelMetadataQuery mq) {
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
BeamTableStatistics rowCountStatistics = calciteTable.getStatistic();
double window =
(beamTable.isBounded() == PCollection.IsBounded.BOUNDED)
Expand Down Expand Up @@ -130,7 +131,7 @@ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats estimates = BeamSqlRelUtils.getNodeStats(this, mq);
return BeamCostModel.FACTORY.makeCost(estimates.getRowCount(), estimates.getRate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -30,7 +31,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Intersect;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.SetOp;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;

/**
* {@code BeamRelNode} to replace a {@code Intersect} node.
Expand All @@ -55,7 +55,7 @@ public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
// This takes the minimum of the inputs for all the estimate factors.
double minimumRows = Double.POSITIVE_INFINITY;
double minimumWindowSize = Double.POSITIVE_INFINITY;
Expand All @@ -72,7 +72,7 @@ public NodeStats estimateNodeStats(RelMetadataQuery mq) {
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {

NodeStats inputsStatSummation =
inputs.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -35,7 +36,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.CorrelationId;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Join;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexFieldAccess;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexInputRef;
Expand Down Expand Up @@ -111,7 +111,7 @@ public static boolean seekable(BeamRelNode relNode) {
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats leftEstimates = BeamSqlRelUtils.getNodeStats(this.left, mq);
NodeStats rightEstimates = BeamSqlRelUtils.getNodeStats(this.right, mq);
NodeStats selfEstimates = BeamSqlRelUtils.getNodeStats(this, mq);
Expand All @@ -120,7 +120,7 @@ public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
double selectivity = mq.getSelectivity(this, getCondition());
NodeStats leftEstimates = BeamSqlRelUtils.getNodeStats(this.left, mq);
NodeStats rightEstimates = BeamSqlRelUtils.getNodeStats(this.right, mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.sdk.extensions.sql.impl.cep.OrderKey;
import org.apache.beam.sdk.extensions.sql.impl.nfa.NFA;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -54,7 +55,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelCollation;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Match;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexNode;
Expand Down Expand Up @@ -110,12 +110,12 @@ public BeamMatchRel(
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
return BeamCostModel.FACTORY.makeTinyCost(); // return constant costModel for now
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
// a simple way of getting some estimate data
// to be examined further
NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(input, mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -30,7 +31,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Minus;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.SetOp;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;

/**
* {@code BeamRelNode} to replace a {@code Minus} node.
Expand All @@ -45,7 +45,7 @@ public BeamMinusRel(
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats inputsEstimatesSummation =
inputs.stream()
.map(input -> BeamSqlRelUtils.getNodeStats(input, mq))
Expand All @@ -66,7 +66,7 @@ public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
NodeStats firstInputEstimates = BeamSqlRelUtils.getNodeStats(inputs.get(0), mq);
// The first input minus half of the others. (We are assuming half of them have intersection)
for (int i = 1; i < inputs.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
Expand All @@ -39,7 +40,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptTable;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelWriter;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;

public class BeamPushDownIOSourceRel extends BeamIOSourceRel {
private final List<String> usedFields;
Expand Down Expand Up @@ -105,7 +105,7 @@ public PCollection<Row> expand(PCollectionList<Row> input) {
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
BeamCostModel parentCost = super.beamComputeSelfCost(planner, mq);
Preconditions.checkArgument(parentCost.getCpu() >= 0, "Cpu cost must be zero or positive.");
// Table schema will always contain all fields, while usedFields may contain less fields due to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptPlanner;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;

/** A {@link RelNode} that can also give a {@link PTransform} that implements the expression. */
@SuppressWarnings({
Expand Down Expand Up @@ -80,7 +80,7 @@ default Map<String, String> getPipelineOptions() {
* org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery)}
* instead.
*/
NodeStats estimateNodeStats(RelMetadataQuery mq);
NodeStats estimateNodeStats(BeamRelMetadataQuery mq);

/**
* This method is called by {@code
Expand All @@ -91,5 +91,5 @@ default Map<String, String> getPipelineOptions() {
* will call this method instead of ComputeSelfCost if the handler is set correctly (see {@code
* org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner#convertToBeamRel(String)})
*/
BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq);
BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelFieldCollation;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Sort;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexNode;
Expand Down Expand Up @@ -143,13 +143,13 @@ public BeamSortRel(
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
// Sorting does not change rate or row count of the input.
return BeamSqlRelUtils.getNodeStats(this.input, mq);
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(this.input, mq);

final double rowSize = getRowType().getFieldCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.extensions.sql.impl.TVFSlidingWindowFn;
import org.apache.beam.sdk.extensions.sql.impl.ZetaSqlUserDefinedSQLNativeTableValuedFunction;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils;
Expand Down Expand Up @@ -57,7 +58,6 @@
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelColumnMapping;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexInputRef;
Expand Down Expand Up @@ -349,12 +349,12 @@ public void processElement(
}

@Override
public NodeStats estimateNodeStats(RelMetadataQuery mq) {
public NodeStats estimateNodeStats(BeamRelMetadataQuery mq) {
return BeamSqlRelUtils.getNodeStats(getInput(0), mq);
}

@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, BeamRelMetadataQuery mq) {
NodeStats inputEstimates = BeamSqlRelUtils.getNodeStats(getInput(0), mq);

final double rowSize = getRowType().getFieldCount();
Expand Down
Loading

0 comments on commit 60eaa82

Please sign in to comment.