From acc73a0c42586d49d507e47fac2d24e1121c6d3c Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Thu, 26 Sep 2024 22:08:14 +0530 Subject: [PATCH] Load only required projected columns in Iceberg connector --- .../trino/plugin/iceberg/IcebergMetadata.java | 17 ++++-- .../io/trino/plugin/iceberg/IcebergUtil.java | 61 ++++++++++++------- .../trino/plugin/iceberg/TestIcebergUtil.java | 36 +++++++++++ 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 31bce54dcb11..1e15b67d5099 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -160,6 +160,7 @@ import org.apache.iceberg.expressions.Term; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.NestedField; @@ -252,8 +253,10 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation; +import static io.trino.plugin.iceberg.IcebergUtil.buildPath; import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs; import static io.trino.plugin.iceberg.IcebergUtil.commit; +import static io.trino.plugin.iceberg.IcebergUtil.createColumnHandle; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static io.trino.plugin.iceberg.IcebergUtil.fileName; import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshot; @@ -625,8 +628,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con DiscretePredicates discretePredicates = null; if (!partitionSourceIds.isEmpty()) { // Extract identity partition columns - Map columns = getProjectedColumns(icebergTable.schema(), typeManager).stream() - .filter(column -> partitionSourceIds.contains(column.getId())) + Map columns = getProjectedColumns(icebergTable.schema(), typeManager, partitionSourceIds).stream() .collect(toImmutableMap(IcebergColumnHandle::getId, identity())); Supplier> lazyFiles = Suppliers.memoize(() -> { @@ -1069,9 +1071,11 @@ private Optional getWriteLayout(Schema tableSchema, Partit return Optional.empty(); } - Map indexParents = indexParents(tableSchema.asStruct()); - Map columnById = getProjectedColumns(tableSchema, typeManager).stream() - .collect(toImmutableMap(IcebergColumnHandle::getId, identity())); + StructType schemaAsStruct = tableSchema.asStruct(); + Map indexById = TypeUtil.indexById(schemaAsStruct); + Map indexParents = indexParents(schemaAsStruct); + Map> indexPaths = indexById.entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> ImmutableList.copyOf(buildPath(indexParents, entry.getKey())))); List partitioningColumns = partitionSpec.fields().stream() .sorted(Comparator.comparing(PartitionField::sourceId)) @@ -1093,7 +1097,8 @@ private Optional getWriteLayout(Schema tableSchema, Partit if (sourceType.isListType()) { throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a array"); } - return requireNonNull(columnById.get(sourceId), () -> "Cannot find source column for partition field " + field); + verify(indexById.containsKey(sourceId), "Cannot find source column for partition field " + field); + return createColumnHandle(typeManager, sourceId, indexById, indexPaths); }) .distinct() .collect(toImmutableList()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 473110220764..84820e882ce6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -98,6 +98,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.builderWithExpectedSize; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -239,32 +240,46 @@ public static Table getIcebergTableWithMetadata( public static List getProjectedColumns(Schema schema, TypeManager typeManager) { - ImmutableList.Builder projectedColumns = ImmutableList.builder(); - StructType schemaAsStruct = schema.asStruct(); - Map indexById = TypeUtil.indexById(schemaAsStruct); - Map indexParents = TypeUtil.indexParents(schemaAsStruct); + Map indexById = TypeUtil.indexById(schema.asStruct()); + return getProjectedColumns(schema, typeManager, indexById, indexById.keySet() /* project all columns */); + } + + public static List getProjectedColumns(Schema schema, TypeManager typeManager, Set fieldIds) + { + Map indexById = TypeUtil.indexById(schema.asStruct()); + return getProjectedColumns(schema, typeManager, indexById, fieldIds /* project selected columns */); + } + + private static List getProjectedColumns(Schema schema, TypeManager typeManager, Map indexById, Set fieldIds) + { + ImmutableList.Builder columns = builderWithExpectedSize(fieldIds.size()); + Map indexParents = TypeUtil.indexParents(schema.asStruct()); Map> indexPaths = indexById.entrySet().stream() - .collect(toImmutableMap(Entry::getKey, e -> ImmutableList.copyOf(buildPath(indexParents, e.getKey())))); - - for (Map.Entry entry : indexById.entrySet()) { - int fieldId = entry.getKey(); - NestedField childField = entry.getValue(); - NestedField baseField = childField; - - List path = requireNonNull(indexPaths.get(fieldId)); - if (!path.isEmpty()) { - baseField = indexById.get(path.getFirst()); - path = ImmutableList.builder() - .addAll(path.subList(1, path.size())) // Base column id shouldn't exist in IcebergColumnHandle.path - .add(fieldId) // Append the leaf field id - .build(); - } - projectedColumns.add(createColumnHandle(baseField, childField, typeManager, path)); + .collect(toImmutableMap(Entry::getKey, entry -> ImmutableList.copyOf(buildPath(indexParents, entry.getKey())))); + + for (int fieldId : fieldIds) { + columns.add(createColumnHandle(typeManager, fieldId, indexById, indexPaths)); + } + return columns.build(); + } + + public static IcebergColumnHandle createColumnHandle(TypeManager typeManager, int fieldId, Map indexById, Map> indexPaths) + { + NestedField childField = indexById.get(fieldId); + NestedField baseField = childField; + + List path = requireNonNull(indexPaths.get(fieldId)); + if (!path.isEmpty()) { + baseField = indexById.get(path.getFirst()); + path = ImmutableList.builder() + .addAll(path.subList(1, path.size())) // Base column id shouldn't exist in IcebergColumnHandle.path + .add(fieldId) // Append the leaf field id + .build(); } - return projectedColumns.build(); + return createColumnHandle(baseField, childField, typeManager, path); } - private static List buildPath(Map indexParents, int fieldId) + public static List buildPath(Map indexParents, int fieldId) { List path = new ArrayList<>(); while (indexParents.containsKey(fieldId)) { @@ -357,7 +372,7 @@ public static List getTopLevelColumns(Schema schema, TypeMa public static List getColumnMetadatas(Schema schema, TypeManager typeManager) { List icebergColumns = schema.columns(); - ImmutableList.Builder columns = ImmutableList.builderWithExpectedSize(icebergColumns.size() + 2); + ImmutableList.Builder columns = builderWithExpectedSize(icebergColumns.size() + 2); icebergColumns.stream() .map(column -> diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java index 39db02d94ba9..a5df23fe0812 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; @@ -85,5 +86,40 @@ public void testGetProjectedColumns() tuple(5, "element", 2, ImmutableList.of(4, 5)), tuple(6, "nested", 2, ImmutableList.of(6)), tuple(7, "value", 2, ImmutableList.of(6, 7))); + + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(1))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(1, "id", 1, ImmutableList.of())); + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(2))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(2, "nested", 2, ImmutableList.of())); + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(3))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(3, "value", 2, ImmutableList.of(3))); + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(4))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(4, "list", 2, ImmutableList.of(4))); + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(5))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(5, "element", 2, ImmutableList.of(4, 5))); + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(6))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(6, "nested", 2, ImmutableList.of(6))); + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(7))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly(tuple(7, "value", 2, ImmutableList.of(6, 7))); + + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(3, 7))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly( + tuple(3, "value", 2, ImmutableList.of(3)), + tuple(7, "value", 2, ImmutableList.of(6, 7))); + + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(1, 4, 5))) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly( + tuple(1, "id", 1, ImmutableList.of()), + tuple(4, "list", 2, ImmutableList.of(4)), + tuple(5, "element", 2, ImmutableList.of(4, 5))); } }