Skip to content

Commit

Permalink
Add missing column translation during agg pushdown in Pinot
Browse files Browse the repository at this point in the history
The connector was incorrectly relying on the engine-provided
variable name to represent the underlying column name. Variables
are synthetic names generated by the engine for the purpose of
mapping column assignments in and out of the call to the connector's
pushdown API, but they bear no relationship with the name of the
underlying columns.
  • Loading branch information
martint committed Mar 25, 2024
1 parent 7185ccb commit 7e84eea
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,9 @@ private Optional<AggregateExpression> applyCountDistinct(ConnectorSession sessio
// otherwise do not push down the aggregation.
// This is to avoid count(column_name) being pushed into pinot, which is currently unsupported.
// Currently Pinot treats count(column_name) as count(*), i.e. it counts nulls.
PinotColumnHandle columnHandle = (PinotColumnHandle) assignments.get(argument.getName());
if (tableHandle.getQuery().isEmpty() || tableHandle.getQuery().get().getGroupingColumns().stream()
.noneMatch(groupingExpression -> groupingExpression.getColumnName().equals(argument.getName()))) {
.noneMatch(groupingExpression -> groupingExpression.getColumnName().equals(columnHandle.getColumnName()))) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.plugin.base.aggregation.AggregateFunctionRule;
import io.trino.plugin.pinot.PinotColumnHandle;
import io.trino.plugin.pinot.query.AggregateExpression;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.expression.Variable;
Expand Down Expand Up @@ -64,6 +65,7 @@ public Optional<AggregateExpression> rewrite(AggregateFunction aggregateFunction
}
Variable argument = captures.get(ARGUMENT);
verify(aggregateFunction.getOutputType() == BIGINT);
return Optional.of(new AggregateExpression("distinctcount", identifierQuote.apply(argument.getName()), false));
PinotColumnHandle column = (PinotColumnHandle) context.getAssignment(argument.getName());
return Optional.of(new AggregateExpression("distinctcount", identifierQuote.apply(column.getColumnName()), false));
}
}

0 comments on commit 7e84eea

Please sign in to comment.