
Showcase dereference pushdown limited effectiveness in Iceberg #17145

Conversation

@findinpath (Contributor) commented on Apr 20, 2023

Description

As can be seen in the snippets below:

  • the physical input is the same for both queries: Physical input: 1.20kB
  • the query selecting a nested field shows Input: 1 row (5B), which is much less than the query selecting the whole row, which shows Input: 1 row (50B)
trino:tiny> explain analyze select b.c3.d2.e2.f2 from iceberg.tiny.itest2;
                                                                             Query Plan                                       >
------------------------------------------------------------------------------------------------------------------------------>
 Trino version: dev                                                                                                           >
 Queued: 761.02us, Analysis: 27.10ms, Planning: 33.96ms, Execution: 536.67ms                                                  >
 Fragment 1 [SOURCE]                                                                                                          >
     CPU: 303.10ms, Scheduled: 372.58ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1 row (5B); per task: avg.: 1.>
     Output layout: [b.c3.d2.e2.f2]                                                                                           >
     Output partitioning: SINGLE []                                                                                           >
     TableScan[table = iceberg:tiny.itest2$data@700374889882065389]                                                           >
         Layout: [b.c3.d2.e2.f2:integer]                                                                                      >
         Estimates: {rows: 1 (5B), cpu: 5, memory: 0B, network: 0B}                                                           >
         CPU: 303.00ms (100.00%), Scheduled: 372.00ms (100.00%), Blocked: 0.00ns (?%), Output: 1 row (5B)                     >
         Input avg.: 1.00 rows, Input std.dev.: 0.00%                                                                         >
         b.c3.d2.e2.f2 := 11:f2:integer                                                                                       >
         Input: 1 row (5B), Physical input: 1.20kB, Physical input time: 10120000.00ns                                        >
                                                                                                                              >
                                                                                                                              >
(1 row)
trino:tiny> explain analyze select b  from iceberg.tiny.itest2;
                                                                            Query Plan                                        >
------------------------------------------------------------------------------------------------------------------------------>
 Trino version: dev                                                                                                           >
 Queued: 211.84us, Analysis: 25.14ms, Planning: 27.73ms, Execution: 167.99ms                                                  >
 Fragment 1 [SOURCE]                                                                                                          >
     CPU: 7.18ms, Scheduled: 12.43ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1 row (50B); per task: avg.: 1.00>
     Output layout: [b]                                                                                                       >
     Output partitioning: SINGLE []                                                                                           >
     TableScan[table = iceberg:tiny.itest2$data@700374889882065389]                                                           >
         Layout: [b:row(c1 integer, c2 integer, c3 row(d1 integer, d2 row(e1 integer, e2 row(f1 integer, f2 integer))))]      >
         Estimates: {rows: 1 (55B), cpu: 55, memory: 0B, network: 0B}                                                         >
         CPU: 7.00ms (100.00%), Scheduled: 12.00ms (100.00%), Blocked: 0.00ns (?%), Output: 1 row (50B)                       >
         Input avg.: 1.00 rows, Input std.dev.: 0.00%                                                                         >
         b := 2:b:row(c1 integer, c2 integer, c3 row(d1 integer, d2 row(e1 integer, e2 row(f1 integer, f2 integer))))         >
         Input: 1 row (50B), Physical input: 1.20kB, Physical input time: 7950000.00ns                                        >
                                                                                                                              >
                                                                                                                              >
(1 row)

This PR demonstrates that dereference pushdown in the Iceberg connector has limited effectiveness.

The logic for building a stripped file schema (MessageType), present in Hive's ParquetPageSourceFactory:

public static Optional<MessageType> getParquetMessageType(List<HiveColumnHandle> columns, boolean useColumnNames, MessageType fileSchema)
{
    Optional<MessageType> message = projectSufficientColumns(columns)
            .map(projection -> projection.get().stream()
                    .map(HiveColumnHandle.class::cast)
                    .collect(toUnmodifiableList()))
            .orElse(columns).stream()
            .filter(column -> column.getColumnType() == REGULAR)
            .map(column -> getColumnType(column, fileSchema, useColumnNames))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(type -> new MessageType(fileSchema.getName(), type))
            .reduce(MessageType::union);
    return message;
}

public static Optional<org.apache.parquet.schema.Type> getColumnType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
    Optional<org.apache.parquet.schema.Type> columnType = getBaseColumnParquetType(column, messageType, useParquetColumnNames);
    if (columnType.isEmpty() || column.getHiveColumnProjectionInfo().isEmpty()) {
        return columnType;
    }
    GroupType baseType = columnType.get().asGroupType();
    ImmutableList.Builder<org.apache.parquet.schema.Type> typeBuilder = ImmutableList.builder();
    org.apache.parquet.schema.Type parentType = baseType;
    for (String name : column.getHiveColumnProjectionInfo().get().getDereferenceNames()) {
        org.apache.parquet.schema.Type childType = getParquetTypeByName(name, parentType.asGroupType());
        if (childType == null) {
            return Optional.empty();
        }
        typeBuilder.add(childType);
        parentType = childType;
    }
    List<org.apache.parquet.schema.Type> subfieldTypes = typeBuilder.build();
    org.apache.parquet.schema.Type type = subfieldTypes.get(subfieldTypes.size() - 1);
    for (int i = subfieldTypes.size() - 2; i >= 0; --i) {
        GroupType groupType = subfieldTypes.get(i).asGroupType();
        type = new GroupType(groupType.getRepetition(), groupType.getName(), ImmutableList.of(type));
    }
    return Optional.of(new GroupType(baseType.getRepetition(), baseType.getName(), ImmutableList.of(type)));
}

is missing from IcebergPageSourceFactory, which builds the requested schema out of the whole base columns:
Map<Integer, org.apache.parquet.schema.Type> parquetIdToField = fileSchema.getFields().stream()
        .filter(field -> field.getId() != null)
        .collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));

Optional<ReaderColumns> columnProjections = projectColumns(regularColumns);
List<IcebergColumnHandle> readColumns = columnProjections
        .map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
        .orElse(regularColumns);
List<org.apache.parquet.schema.Type> parquetFields = readColumns.stream()
        .map(column -> parquetIdToField.get(column.getId()))
        .collect(toList());
MessageType requestedSchema = new MessageType(fileSchema.getName(), parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList()));

For this reason, the ParquetReader always reads all the primitive fields of the nested columns selected through the Iceberg connector.

tl;dr: In Iceberg, the requested Parquet file schema is built from the whole base columns, while in Hive it is built by adapting the original base columns, returning Optional.empty() for whatever is not being read.
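The tl;dr can be made concrete with a small self-contained sketch. Everything here is a hypothetical model, not the real parquet-mr API: the Field record, prune, and countLeaves are illustrative names, and the intermediate field ids are made up (only 2:b and 11:f2 appear in the plan above). Hive-style pruning keeps only the dereference chain, while an Iceberg-style lookup by base column id returns the whole field.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Minimal model of a nested Parquet schema for the table in the plans above.
public class DereferencePruningSketch {
    record Field(int id, String name, List<Field> children) {
        boolean isPrimitive() { return children.isEmpty(); }
    }

    // Hive-style: rebuild the schema keeping only the dereference chain,
    // mirroring what getColumnType does with its GroupType wrappers.
    static Optional<Field> prune(Field base, List<String> path) {
        if (path.isEmpty()) {
            return Optional.of(base);
        }
        return base.children().stream()
                .filter(child -> child.name().equals(path.get(0)))
                .findFirst()
                .flatMap(child -> prune(child, path.subList(1, path.size())))
                .map(pruned -> new Field(base.id(), base.name(), List.of(pruned)));
    }

    static int countLeaves(Field f) {
        return f.isPrimitive() ? 1 : f.children().stream().mapToInt(DereferencePruningSketch::countLeaves).sum();
    }

    public static void main(String[] args) {
        // b:row(c1, c2, c3:row(d1, d2:row(e1, e2:row(f1, f2)))) -- as in the plan above
        Field b = new Field(2, "b", List.of(
                new Field(3, "c1", List.of()),
                new Field(4, "c2", List.of()),
                new Field(5, "c3", List.of(
                        new Field(6, "d1", List.of()),
                        new Field(7, "d2", List.of(
                                new Field(8, "e1", List.of()),
                                new Field(9, "e2", List.of(
                                        new Field(10, "f1", List.of()),
                                        new Field(11, "f2", List.of())))))))));

        // Hive: only the b.c3.d2.e2.f2 chain survives -> 1 leaf to read
        Field hiveRequested = prune(b, List.of("c3", "d2", "e2", "f2")).orElseThrow();
        System.out.println(countLeaves(hiveRequested));

        // Iceberg: parquetIdToField.get(baseColumnId) returns the whole field -> all 6 leaves
        Map<Integer, Field> parquetIdToField = Map.of(b.id(), b);
        System.out.println(countLeaves(parquetIdToField.get(2)));
    }
}
```

The leaf counts line up with the plan output: the pruned request touches a single primitive column, while the base-column request decodes every leaf under b.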

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

cla-bot added the cla-signed label on Apr 20, 2023
github-actions added the iceberg (Iceberg connector) label on Apr 20, 2023
@findinpath force-pushed the findinpath/iceberg-dereference-pushdown-bug branch from b27672a to 016940c on April 21, 2023
@findinpath self-assigned this on Apr 21, 2023
@findinpath requested review from electrum and findepi on April 21, 2023
Dereference pushdown is effective at filtering
the data being read from the source, so that only relevant
nested data is provided by the connector to the engine.
However, the whole content corresponding to a nested column
is currently read from the data files.
@findinpath force-pushed the findinpath/iceberg-dereference-pushdown-bug branch from 016940c to 23a3f2b on April 21, 2023
@findinpath changed the title from "Showcase dereference pushdown ineffectiveness in Iceberg" to "Showcase dereference pushdown limited effectiveness in Iceberg" on Apr 21, 2023
@raunaqmorarka merged commit 40ab0b3 into trinodb:master on Apr 24, 2023
This was added to the 415 milestone on Apr 24, 2023
DataSize processedDataSizeWithPushdown = statsWithPushdown.getProcessedInputDataSize();
assertQueryStats(
        sessionWithoutPushdown,
        selectQuery,
        statsWithoutPushdown -> assertThat(statsWithoutPushdown.getProcessedInputDataSize()).isGreaterThan(processedDataSizeWithPushdown),
        statsWithoutPushdown -> {
            //TODO (https://github.com/trinodb/trino/issues/17156) add dereference pushdown on the physical layer
A member replied:
We may need more test data to show that data pruning happens on read, and/or to disable Parquet compression on write: 3000 'Z' characters will compress very well, and the coalescing of adjacent reads should kick in.
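The compression point can be checked directly with the JDK's Deflater. This is a minimal sketch (class and method names are mine, and Parquet's actual codecs differ in detail, but the effect is the same): a run of 3000 identical characters deflates to a few dozen bytes, while random bytes barely shrink at all, so repetitive test data can mask the size difference the test asserts on.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.zip.Deflater;

// Compares how well repetitive vs random test data compresses with DEFLATE.
public class CompressionSketch {
    static int deflatedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] out = new byte[input.length + 64];
        int n = 0;
        while (!deflater.finished()) {
            n += deflater.deflate(out, n, out.length - n);
        }
        deflater.end();
        return n;
    }

    public static void main(String[] args) {
        byte[] repetitive = new byte[3000];
        Arrays.fill(repetitive, (byte) 'Z');

        byte[] random = new byte[3000];
        new Random(42).nextBytes(random);

        // 3000 'Z's collapse to a handful of bytes; random bytes stay near full size
        System.out.println(deflatedSize(repetitive) < 50);
        System.out.println(deflatedSize(random) > 2500);
    }
}
```

Generating incompressible (random) column values in the test would make the processed-data-size comparison far more meaningful.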

@atifiu (Contributor) commented on Jun 4, 2023

@findepi @findinpath I wanted to understand the difference between physical input size and input size. My initial understanding was that the physical size is the compressed data and will therefore always be smaller, while the input size is the uncompressed data that Trino acts upon and will always be larger; but after looking at multiple EXPLAIN plans, I don't think that understanding is correct. I have documented some of my reasoning in this discussion: https://trinodb.slack.com/archives/CJ6UC075E/p1685710163200809
I have also referred to this article: https://trino.io/blog/2023/04/11/date-predicates.html
