Skip to content

Commit

Permalink
Improve performance of reading Iceberg tables with many delete files
Browse files Browse the repository at this point in the history
  • Loading branch information
Heltman authored and findepi committed Sep 26, 2023
1 parent e1dc146 commit 04ac9ba
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.graph.Traverser;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.annotation.NotThreadSafe;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
Expand Down Expand Up @@ -61,6 +62,7 @@
import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext;
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.plugin.iceberg.delete.DeleteFilter;
import io.trino.plugin.iceberg.delete.EqualityDeleteFilter;
import io.trino.plugin.iceberg.delete.PositionDeleteFilter;
import io.trino.plugin.iceberg.delete.RowPredicate;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappedField;
Expand All @@ -100,6 +103,9 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
Expand Down Expand Up @@ -353,6 +359,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
List<DeleteFilter> deleteFilters = readDeletes(
session,
tableSchema,
readColumns,
path,
deletes,
readerPageSourceWithRowPositions.getStartRowPosition(),
Expand Down Expand Up @@ -390,6 +397,7 @@ else if (deleteFile.content() == EQUALITY_DELETES) {
private List<DeleteFilter> readDeletes(
ConnectorSession session,
Schema schema,
List<IcebergColumnHandle> readColumns,
String dataFilePath,
List<DeleteFile> deleteFiles,
Optional<Long> startRowPosition,
Expand All @@ -400,6 +408,7 @@ private List<DeleteFilter> readDeletes(
Slice targetPath = utf8Slice(dataFilePath);
List<DeleteFilter> filters = new ArrayList<>();
LongBitmapDataProvider deletedRows = new Roaring64Bitmap();
Map<Set<Integer>, EqualityDeleteSet> deletesSetByFieldIds = new HashMap<>();

IcebergColumnHandle deleteFilePath = getColumnHandle(DELETE_FILE_PATH, typeManager);
IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager);
Expand Down Expand Up @@ -436,14 +445,17 @@ private List<DeleteFilter> readDeletes(
}
}
else if (delete.content() == EQUALITY_DELETES) {
List<Integer> fieldIds = delete.equalityFieldIds();
Set<Integer> fieldIds = ImmutableSet.copyOf(delete.equalityFieldIds());
verify(!fieldIds.isEmpty(), "equality field IDs are missing");
List<IcebergColumnHandle> columns = fieldIds.stream()
.map(id -> getColumnHandle(schema.findField(id), typeManager))
Schema deleteSchema = TypeUtil.select(schema, fieldIds);
List<IcebergColumnHandle> columns = deleteSchema.columns().stream()
.map(column -> getColumnHandle(column, typeManager))
.collect(toImmutableList());

EqualityDeleteSet equalityDeleteSet = deletesSetByFieldIds.computeIfAbsent(fieldIds, key -> new EqualityDeleteSet(deleteSchema, schemaFromHandles(readColumns)));

try (ConnectorPageSource pageSource = openDeletes(session, delete, columns, TupleDomain.all())) {
filters.add(readEqualityDeletes(pageSource, columns, schema));
readEqualityDeletes(pageSource, columns, equalityDeleteSet::add);
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -458,6 +470,10 @@ else if (delete.content() == EQUALITY_DELETES) {
filters.add(new PositionDeleteFilter(deletedRows));
}

for (EqualityDeleteSet equalityDeleteSet : deletesSetByFieldIds.values()) {
filters.add(new EqualityDeleteFilter(equalityDeleteSet::contains));
}

return filters;
}

Expand Down Expand Up @@ -1537,4 +1553,27 @@ public int hashCode()
return Objects.hash(baseColumnIdentity, path);
}
}

@NotThreadSafe
private static class EqualityDeleteSet
{
// Accumulates rows from all equality delete files that share the same set of equality field IDs,
// so each distinct delete schema is materialized into a single lookup set.
// Set of deleted rows, shaped like the delete schema's struct type
private final StructLikeSet deleteSet;
// Projects a full data-file row down to only the delete schema's columns before lookup
private final StructProjection projection;

// deleteSchema: schema of the equality delete columns; dataSchema: schema of the rows being read
public EqualityDeleteSet(Schema deleteSchema, Schema dataSchema)
{
this.deleteSet = StructLikeSet.create(deleteSchema.asStruct());
this.projection = StructProjection.create(dataSchema, deleteSchema);
}

// Adds a row read from an equality delete file; the row must match the delete schema
public void add(StructLike row)
{
deleteSet.add(row);
}

// Returns true if the given data row matches any delete row on the equality columns
public boolean contains(StructLike row)
{
return deleteSet.contains(projection.wrap(row));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,22 @@
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.Type;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
import static java.util.Objects.requireNonNull;

public final class EqualityDeleteFilter
implements DeleteFilter
{
private final Schema schema;
private final StructLikeSet deleteSet;
private final Predicate<StructLike> deletedRows;

private EqualityDeleteFilter(Schema schema, StructLikeSet deleteSet)
public EqualityDeleteFilter(Predicate<StructLike> deletedRows)
{
this.schema = requireNonNull(schema, "schema is null");
this.deleteSet = requireNonNull(deleteSet, "deleteSet is null");
this.deletedRows = requireNonNull(deletedRows, "deletedRows is null");
}

@Override
Expand All @@ -46,35 +42,27 @@ public RowPredicate createPredicate(List<IcebergColumnHandle> columns)
.map(IcebergColumnHandle::getType)
.toArray(Type[]::new);

Schema fileSchema = schemaFromHandles(columns);
StructProjection projection = StructProjection.create(fileSchema, schema);

return (page, position) -> {
StructLike row = new LazyTrinoRow(types, page, position);
return !deleteSet.contains(projection.wrap(row));
return !deletedRows.test(row);
};
}

public static DeleteFilter readEqualityDeletes(ConnectorPageSource pageSource, List<IcebergColumnHandle> columns, Schema tableSchema)
public static void readEqualityDeletes(ConnectorPageSource pageSource, List<IcebergColumnHandle> columns, Consumer<StructLike> deletedRows)
{
Type[] types = columns.stream()
.map(IcebergColumnHandle::getType)
.toArray(Type[]::new);

Schema deleteSchema = schemaFromHandles(columns);
StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());

while (!pageSource.isFinished()) {
Page page = pageSource.getNextPage();
if (page == null) {
continue;
}

for (int position = 0; position < page.getPositionCount(); position++) {
deleteSet.add(new TrinoRow(types, page, position));
deletedRows.accept(new TrinoRow(types, page, position));
}
}

return new EqualityDeleteFilter(deleteSchema, deleteSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -307,6 +308,129 @@ public void testSelectivelyOptimizingLeavesEqualityDeletes()
assertThat(loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("1");
}

@Test
public void testMultipleEqualityDeletes()
        throws Exception
{
    // Start from a fresh copy of tpch.tiny.nation that has no equality deletes yet
    String tableName = "test_multiple_equality_deletes_" + randomNameSuffix();
    assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
    Table icebergTable = loadTable(tableName);
    assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");

    // Write two separate equality delete files, one deleting each of region keys 1 and 2
    for (int regionKey = 1; regionKey <= 2; regionKey++) {
        writeEqualityDeleteToNationTable(
                icebergTable,
                Optional.empty(),
                Optional.empty(),
                ImmutableMap.of("regionkey", Integer.toUnsignedLong(regionKey)));
    }

    // Both delete files must be applied when reading the table
    assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE (regionkey != 1L AND regionkey != 2L)");
    assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testMultipleEqualityDeletesWithEquivalentSchemas()
throws Exception
{
// Verifies that two delete files whose schemas list the same columns in different order
// are treated as equivalent and both applied when reading.
String tableName = "test_multiple_equality_deletes_equivalent_schemas_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");
// First delete file: columns ordered (regionkey, name)
Schema deleteRowSchema = new Schema(ImmutableList.of("regionkey", "name").stream()
.map(name -> icebergTable.schema().findField(name))
.collect(toImmutableList()));
List<Integer> equalityFieldIds = ImmutableList.of("regionkey", "name").stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
.collect(toImmutableList());
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", 1L, "name", "BRAZIL"),
deleteRowSchema,
equalityFieldIds);
// Second delete file: same columns but ordered (name, regionkey) — same field IDs
Schema equivalentDeleteRowSchema = new Schema(ImmutableList.of("name", "regionkey").stream()
.map(name -> icebergTable.schema().findField(name))
.collect(toImmutableList()));
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("name", "INDIA", "regionkey", 2L),
equivalentDeleteRowSchema,
equalityFieldIds);

// Both deletes must take effect despite the differing column order in their schemas
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT ((regionkey = 1 AND name = 'BRAZIL') OR (regionkey = 2 AND name = 'INDIA'))");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testMultipleEqualityDeletesWithDifferentSchemas()
throws Exception
{
// Verifies that delete files with genuinely different equality column sets
// ((regionkey, name), (name), (regionkey)) are all applied together.
String tableName = "test_multiple_equality_deletes_different_schemas_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");
// Delete on the composite key (regionkey, name)
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", 1L, "name", "BRAZIL"),
Optional.of(ImmutableList.of("regionkey", "name")));
// Delete on name only
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("name", "ALGERIA"),
Optional.of(ImmutableList.of("name")));
// Delete on regionkey only
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", 2L),
Optional.of(ImmutableList.of("regionkey")));

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT ((regionkey = 1 AND name = 'BRAZIL') OR regionkey = 2 OR name = 'ALGERIA')");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testMultipleEqualityDeletesWithNestedFields()
throws Exception
{
// Documents the current limitation: equality deletes keyed on a nested field fail at read time.
String tableName = "test_multiple_equality_deletes_nested_fields_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " ( id BIGINT, root ROW(nested BIGINT, nested_other BIGINT))");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, row(10, 100))", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, row(20, 200))", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, row(20, 200))", 1);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");

// Build a delete schema selecting only the nested field root.nested
List<String> deleteFileColumns = ImmutableList.of("root.nested");
Schema deleteRowSchema = icebergTable.schema().select(deleteFileColumns);
List<Integer> equalityFieldIds = ImmutableList.of("root.nested").stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
.collect(toImmutableList());
Types.StructType nestedStructType = (Types.StructType) deleteRowSchema.findField("root").type();
Record nestedStruct = GenericRecord.create(nestedStructType);
nestedStruct.setField("nested", 20L);
// Write two identical nested-field delete files
for (int i = 1; i < 3; i++) {
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("root", nestedStruct),
deleteRowSchema,
equalityFieldIds);
}

// TODO: support reading equality deletes with nested fields (https://github.com/trinodb/trino/issues/18625)
assertThatThrownBy(() -> query("SELECT * FROM " + tableName)).hasMessageContaining("Multiple entries with same key");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testOptimizingWholeTableRemovesEqualityDeletes()
throws Exception
Expand Down Expand Up @@ -826,22 +950,50 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Optional<Parti
writeEqualityDeleteToNationTable(icebergTable, partitionSpec, partitionData, ImmutableMap.of("regionkey", 1L));
}

private void writeEqualityDeleteToNationTable(Table icebergTable, Optional<PartitionSpec> partitionSpec, Optional<PartitionData> partitionData, Map<String, Object> overwriteValues)
// Writes an equality delete file whose delete columns default to the keys of overwriteValues.
// Delegates to the overload that accepts an explicit delete-column list.
private void writeEqualityDeleteToNationTable(
Table icebergTable,
Optional<PartitionSpec> partitionSpec,
Optional<PartitionData> partitionData,
Map<String, Object> overwriteValues)
throws Exception
{
writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, Optional.empty());
}

/**
 * Writes an equality delete file against the given table, deleting rows matching
 * {@code overwriteValues} on the given delete columns (defaulting to the keys of
 * {@code overwriteValues} when {@code deleteFileColumns} is empty).
 */
private void writeEqualityDeleteToNationTableWithDeleteColumns(
        Table icebergTable,
        Optional<PartitionSpec> partitionSpec,
        Optional<PartitionData> partitionData,
        Map<String, Object> overwriteValues,
        Optional<List<String>> deleteFileColumns)
        throws Exception
{
    // orElseGet avoids eagerly copying the key set when an explicit column list is provided;
    // the copy is immutable since the list is only read from here on
    List<String> deleteColumns = deleteFileColumns.orElseGet(() -> ImmutableList.copyOf(overwriteValues.keySet()));
    Schema deleteRowSchema = icebergTable.schema().select(deleteColumns);
    // Resolve the field IDs of the equality delete columns from the projected schema
    List<Integer> equalityDeleteFieldIds = deleteColumns.stream()
            .map(name -> deleteRowSchema.findField(name).fieldId())
            .collect(toImmutableList());
    writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, deleteRowSchema, equalityDeleteFieldIds);
}

private void writeEqualityDeleteToNationTableWithDeleteColumns(
Table icebergTable,
Optional<PartitionSpec> partitionSpec,
Optional<PartitionData> partitionData,
Map<String, Object> overwriteValues,
Schema deleteRowSchema,
List<Integer> equalityDeleteFieldIds)
throws Exception
{
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION));

Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet());
List<Integer> equalityFieldIds = overwriteValues.keySet().stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
.collect(toImmutableList());
Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(fileIo.newOutputFile(new Path(metadataDir, deleteFileName).toString()))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.equalityFieldIds(equalityFieldIds)
.equalityFieldIds(equalityDeleteFieldIds)
.overwrite();
if (partitionSpec.isPresent() && partitionData.isPresent()) {
writerBuilder = writerBuilder
Expand Down

0 comments on commit 04ac9ba

Please sign in to comment.