Skip to content

Commit

Permalink
Add support for Iceberg Parquet predicate pushdown by column ID
Browse files Browse the repository at this point in the history
  • Loading branch information
Heltman authored and raunaqmorarka committed Jan 11, 2024
1 parent 289cbca commit 05f81ee
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
Expand Down Expand Up @@ -1471,12 +1472,14 @@ private static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<List<Stri
return TupleDomain.none();
}

Map<Integer, ColumnDescriptor> descriptorsById = descriptorsByPath.values().stream()
.collect(toImmutableMap(descriptor -> descriptor.getPrimitiveType().getId().intValue(), identity()));
ImmutableMap.Builder<ColumnDescriptor, Domain> predicate = ImmutableMap.builder();
effectivePredicate.getDomains().orElseThrow().forEach((columnHandle, domain) -> {
String baseType = columnHandle.getType().getTypeSignature().getBase();
// skip looking up predicates for complex types as Parquet only stores stats for primitives
if (columnHandle.isBaseColumn() && (!baseType.equals(StandardTypes.MAP) && !baseType.equals(StandardTypes.ARRAY) && !baseType.equals(StandardTypes.ROW))) {
ColumnDescriptor descriptor = descriptorsByPath.get(ImmutableList.of(columnHandle.getName()));
ColumnDescriptor descriptor = descriptorsById.get(columnHandle.getId());
if (descriptor != null) {
predicate.put(descriptor, domain);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,36 @@ public void testIgnoreParquetStatistics()
}
}

@Test
public void testPushdownPredicateToParquetAfterColumnRename()
{
    // Checks that a predicate on a column renamed after the data was written
    // still reduces the number of physical input positions read, compared
    // with an unfiltered scan of the same table.
    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_pushdown_predicate_statistics",
            "WITH (sorted_by = ARRAY['custkey']) AS TABLE tpch.tiny.customer WITH NO DATA")) {
        String tableName = table.getName();

        // Load the data first, then rename the sort column so the files on disk
        // were written under the old column name.
        // NOTE(review): withSmallRowGroups presumably forces multiple row groups per file
        // so that statistics-based pruning is observable; confirm against the helper.
        assertUpdate(
                withSmallRowGroups(getSession()),
                "INSERT INTO " + tableName + " TABLE tpch.tiny.customer",
                "VALUES 1500");
        assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN custkey TO custkey1");

        DistributedQueryRunner runner = getDistributedQueryRunner();

        // Baseline: full table scan without any predicate.
        MaterializedResultWithQueryId fullScan = runner.executeWithQueryId(getSession(), "TABLE " + tableName);
        OperatorStats fullScanStats = getOperatorStats(fullScan.getQueryId());
        assertThat(fullScanStats.getPhysicalInputPositions()).isGreaterThan(0);
        assertThat(fullScan.getResult()).hasSize(1500);

        // Selective scan: filter on the renamed column and expect fewer positions read.
        @Language("SQL") String filteredSql = "SELECT * FROM " + tableName + " WHERE custkey1 = 100";
        MaterializedResultWithQueryId filteredScan = runner.executeWithQueryId(getSession(), filteredSql);
        OperatorStats filteredScanStats = getOperatorStats(filteredScan.getQueryId());
        assertThat(filteredScanStats.getPhysicalInputPositions())
                .isGreaterThan(0)
                .isLessThan(fullScanStats.getPhysicalInputPositions());
        assertThat(filteredScan.getResult()).hasSize(1);
    }
}

@Override
protected boolean isFileSorted(String path, String sortColumnName)
{
Expand Down

0 comments on commit 05f81ee

Please sign in to comment.