From 04ac9ba2ad5ffa2a162eec7b307d14c9fa574c8f Mon Sep 17 00:00:00 2001 From: Heltman Date: Tue, 18 Apr 2023 20:40:24 +0800 Subject: [PATCH] Improve performance of reading iceberg table with many delete files --- .../iceberg/IcebergPageSourceProvider.java | 47 ++++- .../iceberg/delete/EqualityDeleteFilter.java | 28 +-- .../trino/plugin/iceberg/TestIcebergV2.java | 164 +++++++++++++++++- 3 files changed, 209 insertions(+), 30 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 3ec75b3f5b42..a3ac28bef570 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -24,6 +24,7 @@ import com.google.common.graph.Traverser; import com.google.inject.Inject; import io.airlift.slice.Slice; +import io.trino.annotation.NotThreadSafe; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -61,6 +62,7 @@ import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext; import io.trino.plugin.iceberg.delete.DeleteFile; import io.trino.plugin.iceberg.delete.DeleteFilter; +import io.trino.plugin.iceberg.delete.EqualityDeleteFilter; import io.trino.plugin.iceberg.delete.PositionDeleteFilter; import io.trino.plugin.iceberg.delete.RowPredicate; import io.trino.plugin.iceberg.fileio.ForwardingInputFile; @@ -92,6 +94,7 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.MappedField; @@ -100,6 +103,9 @@ import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.StructProjection; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; @@ -353,6 +359,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) { List deleteFilters = readDeletes( session, tableSchema, + readColumns, path, deletes, readerPageSourceWithRowPositions.getStartRowPosition(), @@ -390,6 +397,7 @@ else if (deleteFile.content() == EQUALITY_DELETES) { private List readDeletes( ConnectorSession session, Schema schema, + List readColumns, String dataFilePath, List deleteFiles, Optional startRowPosition, @@ -400,6 +408,7 @@ private List readDeletes( Slice targetPath = utf8Slice(dataFilePath); List filters = new ArrayList<>(); LongBitmapDataProvider deletedRows = new Roaring64Bitmap(); + Map, EqualityDeleteSet> deletesSetByFieldIds = new HashMap<>(); IcebergColumnHandle deleteFilePath = getColumnHandle(DELETE_FILE_PATH, typeManager); IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager); @@ -436,14 +445,17 @@ private List readDeletes( } } else if (delete.content() == EQUALITY_DELETES) { - List fieldIds = delete.equalityFieldIds(); + Set fieldIds = ImmutableSet.copyOf(delete.equalityFieldIds()); verify(!fieldIds.isEmpty(), "equality field IDs are missing"); - List columns = fieldIds.stream() - .map(id -> getColumnHandle(schema.findField(id), typeManager)) + Schema deleteSchema = TypeUtil.select(schema, fieldIds); + List columns = deleteSchema.columns().stream() + .map(column -> getColumnHandle(column, typeManager)) .collect(toImmutableList()); + EqualityDeleteSet equalityDeleteSet = deletesSetByFieldIds.computeIfAbsent(fieldIds, key -> new EqualityDeleteSet(deleteSchema, schemaFromHandles(readColumns))); + try (ConnectorPageSource pageSource = openDeletes(session, delete, columns, TupleDomain.all())) { - filters.add(readEqualityDeletes(pageSource, columns, schema)); + readEqualityDeletes(pageSource, columns, equalityDeleteSet::add); } catch (IOException e) { throw new UncheckedIOException(e); @@ -458,6 +470,10 @@ else if (delete.content() == EQUALITY_DELETES) { filters.add(new PositionDeleteFilter(deletedRows)); } + for (EqualityDeleteSet equalityDeleteSet : deletesSetByFieldIds.values()) { + filters.add(new EqualityDeleteFilter(equalityDeleteSet::contains)); + } + return filters; } @@ -1537,4 +1553,27 @@ public int hashCode() return Objects.hash(baseColumnIdentity, path); } } + + @NotThreadSafe + private static class EqualityDeleteSet + { + private final StructLikeSet deleteSet; + private final StructProjection projection; + + public EqualityDeleteSet(Schema deleteSchema, Schema dataSchema) + { + this.deleteSet = StructLikeSet.create(deleteSchema.asStruct()); + this.projection = StructProjection.create(dataSchema, deleteSchema); + } + + public void add(StructLike row) + { + deleteSet.add(row); + } + + public boolean contains(StructLike row) + { + return deleteSet.contains(projection.wrap(row)); + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java index a41fbc679479..fbf5334de2b1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java @@ -17,26 +17,22 @@ import io.trino.spi.Page; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.type.Type; -import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.util.StructLikeSet; -import org.apache.iceberg.util.StructProjection; import java.util.List; +import java.util.function.Consumer; +import java.util.function.Predicate; -import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; import static java.util.Objects.requireNonNull; public final class EqualityDeleteFilter implements DeleteFilter { - private final Schema schema; - private final StructLikeSet deleteSet; + private final Predicate deletedRows; - private EqualityDeleteFilter(Schema schema, StructLikeSet deleteSet) + public EqualityDeleteFilter(Predicate deletedRows) { - this.schema = requireNonNull(schema, "schema is null"); - this.deleteSet = requireNonNull(deleteSet, "deleteSet is null"); + this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); } @Override @@ -46,24 +42,18 @@ public RowPredicate createPredicate(List columns) .map(IcebergColumnHandle::getType) .toArray(Type[]::new); - Schema fileSchema = schemaFromHandles(columns); - StructProjection projection = StructProjection.create(fileSchema, schema); - return (page, position) -> { StructLike row = new LazyTrinoRow(types, page, position); - return !deleteSet.contains(projection.wrap(row)); + return !deletedRows.test(row); }; } - public static DeleteFilter readEqualityDeletes(ConnectorPageSource pageSource, List columns, Schema tableSchema) + public static void readEqualityDeletes(ConnectorPageSource pageSource, List columns, Consumer deletedRows) { Type[] types = columns.stream() .map(IcebergColumnHandle::getType) .toArray(Type[]::new); - Schema deleteSchema = schemaFromHandles(columns); - StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct()); - while (!pageSource.isFinished()) { Page page = pageSource.getNextPage(); if (page == null) { @@ -71,10 +61,8 @@ public static DeleteFilter readEqualityDeletes(ConnectorPageSource pageSource, L } for (int position = 0; position < page.getPositionCount(); position++) { - deleteSet.add(new TrinoRow(types, page, position)); + deletedRows.accept(new TrinoRow(types, page, position)); } } - - return new EqualityDeleteFilter(deleteSchema, deleteSet); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 7f3577a1f11e..d7d5ad927337 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -61,6 +61,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -307,6 +308,129 @@ public void testSelectivelyOptimizingLeavesEqualityDeletes() assertThat(loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("1"); } + @Test + public void testMultipleEqualityDeletes() + throws Exception + { + String tableName = "test_multiple_equality_deletes_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + Table icebergTable = loadTable(tableName); + assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0"); + + for (int i = 1; i < 3; i++) { + writeEqualityDeleteToNationTable( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("regionkey", Integer.toUnsignedLong(i))); + } + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE (regionkey != 1L AND regionkey != 2L)"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testMultipleEqualityDeletesWithEquivalentSchemas() + throws Exception + { + String tableName = "test_multiple_equality_deletes_equivalent_schemas_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + Table icebergTable = loadTable(tableName); + assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0"); + Schema deleteRowSchema = new Schema(ImmutableList.of("regionkey", "name").stream() + .map(name -> icebergTable.schema().findField(name)) + .collect(toImmutableList())); + List equalityFieldIds = ImmutableList.of("regionkey", "name").stream() + .map(name -> deleteRowSchema.findField(name).fieldId()) + .collect(toImmutableList()); + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("regionkey", 1L, "name", "BRAZIL"), + deleteRowSchema, + equalityFieldIds); + Schema equivalentDeleteRowSchema = new Schema(ImmutableList.of("name", "regionkey").stream() + .map(name -> icebergTable.schema().findField(name)) + .collect(toImmutableList())); + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("name", "INDIA", "regionkey", 2L), + equivalentDeleteRowSchema, + equalityFieldIds); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT ((regionkey = 1 AND name = 'BRAZIL') OR (regionkey = 2 AND name = 'INDIA'))"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testMultipleEqualityDeletesWithDifferentSchemas() + throws Exception + { + String tableName = "test_multiple_equality_deletes_different_schemas_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + Table icebergTable = loadTable(tableName); + assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0"); + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("regionkey", 1L, "name", "BRAZIL"), + Optional.of(ImmutableList.of("regionkey", "name"))); + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("name", "ALGERIA"), + Optional.of(ImmutableList.of("name"))); + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("regionkey", 2L), + Optional.of(ImmutableList.of("regionkey"))); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT ((regionkey = 1 AND name = 'BRAZIL') OR regionkey = 2 OR name = 'ALGERIA')"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testMultipleEqualityDeletesWithNestedFields() + throws Exception + { + String tableName = "test_multiple_equality_deletes_nested_fields_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " ( id BIGINT, root ROW(nested BIGINT, nested_other BIGINT))"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, row(10, 100))", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, row(20, 200))", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, row(20, 200))", 1); + Table icebergTable = loadTable(tableName); + assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0"); + + List deleteFileColumns = ImmutableList.of("root.nested"); + Schema deleteRowSchema = icebergTable.schema().select(deleteFileColumns); + List equalityFieldIds = ImmutableList.of("root.nested").stream() + .map(name -> deleteRowSchema.findField(name).fieldId()) + .collect(toImmutableList()); + Types.StructType nestedStructType = (Types.StructType) deleteRowSchema.findField("root").type(); + Record nestedStruct = GenericRecord.create(nestedStructType); + nestedStruct.setField("nested", 20L); + for (int i = 1; i < 3; i++) { + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("root", nestedStruct), + deleteRowSchema, + equalityFieldIds); + } + + // TODO: support read equality deletes with nested fields(https://github.com/trinodb/trino/issues/18625) + assertThatThrownBy(() -> query("SELECT * FROM " + tableName)).hasMessageContaining("Multiple entries with same key"); + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testOptimizingWholeTableRemovesEqualityDeletes() throws Exception @@ -826,22 +950,50 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Optional partitionSpec, Optional partitionData, Map overwriteValues) + private void writeEqualityDeleteToNationTable( + Table icebergTable, + Optional partitionSpec, + Optional partitionData, + Map overwriteValues) + throws Exception + { + writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, Optional.empty()); + } + + private void writeEqualityDeleteToNationTableWithDeleteColumns( + Table icebergTable, + Optional partitionSpec, + Optional partitionData, + Map overwriteValues, + Optional> deleteFileColumns) + throws Exception + { + List deleteColumns = deleteFileColumns.orElse(new ArrayList<>(overwriteValues.keySet())); + Schema deleteRowSchema = icebergTable.schema().select(deleteColumns); + List equalityDeleteFieldIds = deleteColumns.stream() + .map(name -> deleteRowSchema.findField(name).fieldId()) + .collect(toImmutableList()); + writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, deleteRowSchema, equalityDeleteFieldIds); + } + + private void writeEqualityDeleteToNationTableWithDeleteColumns( + Table icebergTable, + Optional partitionSpec, + Optional partitionData, + Map overwriteValues, + Schema deleteRowSchema, + List equalityDeleteFieldIds) throws Exception { Path metadataDir = new Path(metastoreDir.toURI()); String deleteFileName = "delete_file_" + UUID.randomUUID(); FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION)); - Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet()); - List equalityFieldIds = overwriteValues.keySet().stream() - .map(name -> deleteRowSchema.findField(name).fieldId()) - .collect(toImmutableList()); Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(fileIo.newOutputFile(new Path(metadataDir, deleteFileName).toString())) .forTable(icebergTable) .rowSchema(deleteRowSchema) .createWriterFunc(GenericParquetWriter::buildWriter) - .equalityFieldIds(equalityFieldIds) + .equalityFieldIds(equalityDeleteFieldIds) .overwrite(); if (partitionSpec.isPresent() && partitionData.isPresent()) { writerBuilder = writerBuilder