Load only required projected columns in Iceberg connector
krvikash authored and raunaqmorarka committed Sep 30, 2024
1 parent 7d37167 commit acc73a0
Showing 3 changed files with 85 additions and 29 deletions.
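In brief: IcebergUtil.getProjectedColumns(schema, typeManager) used to build an IcebergColumnHandle for every field in the schema, so call sites that needed only a subset had to build everything and filter. This commit adds an overload that takes the required field ids and builds only those handles, and promotes createColumnHandle and buildPath to reusable helpers. A condensed sketch of the new call pattern, using the names that appear in the diff below:

    // Build handles only for the identity partition source columns;
    // partitionSourceIds is the Set<Integer> of Iceberg field ids used
    // by the getTableProperties hunk below.
    Map<Integer, IcebergColumnHandle> columns =
            getProjectedColumns(icebergTable.schema(), typeManager, partitionSourceIds).stream()
                    .collect(toImmutableMap(IcebergColumnHandle::getId, identity()));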
File 1 of 3

@@ -160,6 +160,7 @@
 import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.types.Types.NestedField;

@@ -252,8 +253,10 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
+import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
 import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
 import static io.trino.plugin.iceberg.IcebergUtil.commit;
+import static io.trino.plugin.iceberg.IcebergUtil.createColumnHandle;
 import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
 import static io.trino.plugin.iceberg.IcebergUtil.fileName;
 import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshot;
@@ -625,8 +628,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
         DiscretePredicates discretePredicates = null;
         if (!partitionSourceIds.isEmpty()) {
             // Extract identity partition columns
-            Map<Integer, IcebergColumnHandle> columns = getProjectedColumns(icebergTable.schema(), typeManager).stream()
-                    .filter(column -> partitionSourceIds.contains(column.getId()))
+            Map<Integer, IcebergColumnHandle> columns = getProjectedColumns(icebergTable.schema(), typeManager, partitionSourceIds).stream()
                     .collect(toImmutableMap(IcebergColumnHandle::getId, identity()));

             Supplier<List<FileScanTask>> lazyFiles = Suppliers.memoize(() -> {
@@ -1069,9 +1071,11 @@ private Optional<ConnectorTableLayout> getWriteLayout(Schema tableSchema, Partit
             return Optional.empty();
         }

-        Map<Integer, Integer> indexParents = indexParents(tableSchema.asStruct());
-        Map<Integer, IcebergColumnHandle> columnById = getProjectedColumns(tableSchema, typeManager).stream()
-                .collect(toImmutableMap(IcebergColumnHandle::getId, identity()));
+        StructType schemaAsStruct = tableSchema.asStruct();
+        Map<Integer, NestedField> indexById = TypeUtil.indexById(schemaAsStruct);
+        Map<Integer, Integer> indexParents = indexParents(schemaAsStruct);
+        Map<Integer, List<Integer>> indexPaths = indexById.entrySet().stream()
+                .collect(toImmutableMap(Map.Entry::getKey, entry -> ImmutableList.copyOf(buildPath(indexParents, entry.getKey()))));

         List<IcebergColumnHandle> partitioningColumns = partitionSpec.fields().stream()
                 .sorted(Comparator.comparing(PartitionField::sourceId))

@@ -1093,7 +1097,8 @@
                     if (sourceType.isListType()) {
                         throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a array");
                     }
-                    return requireNonNull(columnById.get(sourceId), () -> "Cannot find source column for partition field " + field);
+                    verify(indexById.containsKey(sourceId), "Cannot find source column for partition field " + field);
+                    return createColumnHandle(typeManager, sourceId, indexById, indexPaths);
                 })
                 .distinct()
                 .collect(toImmutableList());
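Two things to note in the getWriteLayout hunks above: the indexById/indexPaths lookup maps are now computed once per call, and a missing partition source column trips an eager verify() instead of requireNonNull() with a lazy message supplier. The per-field resolution reduces to the following sketch, using the same names as the diff (field is an org.apache.iceberg.PartitionField):

    // Resolve one partition field's source column without materializing
    // handles for every field in the schema.
    int sourceId = field.sourceId();
    verify(indexById.containsKey(sourceId), "Cannot find source column for partition field " + field);
    IcebergColumnHandle column = createColumnHandle(typeManager, sourceId, indexById, indexPaths);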
File 2 of 3

@@ -98,6 +98,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.builderWithExpectedSize;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;

@@ -239,32 +240,46 @@ public static Table getIcebergTableWithMetadata(
 
     public static List<IcebergColumnHandle> getProjectedColumns(Schema schema, TypeManager typeManager)
     {
-        ImmutableList.Builder<IcebergColumnHandle> projectedColumns = ImmutableList.builder();
-        StructType schemaAsStruct = schema.asStruct();
-        Map<Integer, NestedField> indexById = TypeUtil.indexById(schemaAsStruct);
-        Map<Integer, Integer> indexParents = TypeUtil.indexParents(schemaAsStruct);
+        Map<Integer, NestedField> indexById = TypeUtil.indexById(schema.asStruct());
+        return getProjectedColumns(schema, typeManager, indexById, indexById.keySet() /* project all columns */);
+    }
+
+    public static List<IcebergColumnHandle> getProjectedColumns(Schema schema, TypeManager typeManager, Set<Integer> fieldIds)
+    {
+        Map<Integer, NestedField> indexById = TypeUtil.indexById(schema.asStruct());
+        return getProjectedColumns(schema, typeManager, indexById, fieldIds /* project selected columns */);
+    }
+
+    private static List<IcebergColumnHandle> getProjectedColumns(Schema schema, TypeManager typeManager, Map<Integer, NestedField> indexById, Set<Integer> fieldIds)
+    {
+        ImmutableList.Builder<IcebergColumnHandle> columns = builderWithExpectedSize(fieldIds.size());
+        Map<Integer, Integer> indexParents = TypeUtil.indexParents(schema.asStruct());
         Map<Integer, List<Integer>> indexPaths = indexById.entrySet().stream()
-                .collect(toImmutableMap(Entry::getKey, e -> ImmutableList.copyOf(buildPath(indexParents, e.getKey()))));
+                .collect(toImmutableMap(Entry::getKey, entry -> ImmutableList.copyOf(buildPath(indexParents, entry.getKey()))));
 
-        for (Map.Entry<Integer, NestedField> entry : indexById.entrySet()) {
-            int fieldId = entry.getKey();
-            NestedField childField = entry.getValue();
-            NestedField baseField = childField;
-
-            List<Integer> path = requireNonNull(indexPaths.get(fieldId));
-            if (!path.isEmpty()) {
-                baseField = indexById.get(path.getFirst());
-                path = ImmutableList.<Integer>builder()
-                        .addAll(path.subList(1, path.size())) // Base column id shouldn't exist in IcebergColumnHandle.path
-                        .add(fieldId) // Append the leaf field id
-                        .build();
-            }
-            projectedColumns.add(createColumnHandle(baseField, childField, typeManager, path));
+        for (int fieldId : fieldIds) {
+            columns.add(createColumnHandle(typeManager, fieldId, indexById, indexPaths));
         }
-        return projectedColumns.build();
+        return columns.build();
+    }
+
+    public static IcebergColumnHandle createColumnHandle(TypeManager typeManager, int fieldId, Map<Integer, NestedField> indexById, Map<Integer, List<Integer>> indexPaths)
+    {
+        NestedField childField = indexById.get(fieldId);
+        NestedField baseField = childField;
+
+        List<Integer> path = requireNonNull(indexPaths.get(fieldId));
+        if (!path.isEmpty()) {
+            baseField = indexById.get(path.getFirst());
+            path = ImmutableList.<Integer>builder()
+                    .addAll(path.subList(1, path.size())) // Base column id shouldn't exist in IcebergColumnHandle.path
+                    .add(fieldId) // Append the leaf field id
+                    .build();
+        }
+        return createColumnHandle(baseField, childField, typeManager, path);
     }
 
-    private static List<Integer> buildPath(Map<Integer, Integer> indexParents, int fieldId)
+    public static List<Integer> buildPath(Map<Integer, Integer> indexParents, int fieldId)
     {
         List<Integer> path = new ArrayList<>();
         while (indexParents.containsKey(fieldId)) {
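For intuition about the path handling in createColumnHandle above: buildPath returns a nested field's ancestor field ids, topmost first, and createColumnHandle swaps that leading base id for the handle's base column and appends the leaf id. A hypothetical trace using the field ids from the test schema in the third file (2 is a top-level struct, 6 a struct nested inside it, 7 a field of that inner struct):

    // child field id -> parent field id, as TypeUtil.indexParents would report
    Map<Integer, Integer> indexParents = Map.of(3, 2, 4, 2, 5, 4, 6, 2, 7, 6);
    // buildPath(indexParents, 7) walks 7 -> 6 -> 2 and yields [2, 6]
    // createColumnHandle(typeManager, 7, indexById, indexPaths) then derives:
    //   baseField = indexById.get(2)  // the top-level "nested" struct
    //   path      = [6, 7]            // drop the base id, append the leaf id
    // which matches the expected tuple(7, "value", 2, ImmutableList.of(6, 7)) below.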
@@ -357,7 +372,7 @@ public static List<IcebergColumnHandle> getTopLevelColumns(Schema schema, TypeMa
     public static List<ColumnMetadata> getColumnMetadatas(Schema schema, TypeManager typeManager)
     {
         List<NestedField> icebergColumns = schema.columns();
-        ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builderWithExpectedSize(icebergColumns.size() + 2);
+        ImmutableList.Builder<ColumnMetadata> columns = builderWithExpectedSize(icebergColumns.size() + 2);
 
         icebergColumns.stream()
                 .map(column ->
File 3 of 3

@@ -14,6 +14,7 @@
 package io.trino.plugin.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
@@ -85,5 +86,40 @@ public void testGetProjectedColumns()
                 tuple(5, "element", 2, ImmutableList.of(4, 5)),
                 tuple(6, "nested", 2, ImmutableList.of(6)),
                 tuple(7, "value", 2, ImmutableList.of(6, 7)));
+
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(1)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(1, "id", 1, ImmutableList.of()));
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(2)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(2, "nested", 2, ImmutableList.of()));
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(3)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(3, "value", 2, ImmutableList.of(3)));
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(4)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(4, "list", 2, ImmutableList.of(4)));
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(5)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(5, "element", 2, ImmutableList.of(4, 5)));
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(6)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(6, "nested", 2, ImmutableList.of(6)));
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(7)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(tuple(7, "value", 2, ImmutableList.of(6, 7)));
+
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(3, 7)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(
+                        tuple(3, "value", 2, ImmutableList.of(3)),
+                        tuple(7, "value", 2, ImmutableList.of(6, 7)));
+
+        assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER, ImmutableSet.of(1, 4, 5)))
+                .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath)
+                .containsExactly(
+                        tuple(1, "id", 1, ImmutableList.of()),
+                        tuple(4, "list", 2, ImmutableList.of(4)),
+                        tuple(5, "element", 2, ImmutableList.of(4, 5)));
     }
 }
