Add support for deletion vectors in Iceberg #24882

Draft: wants to merge 2 commits into master
6 changes: 6 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -233,6 +233,12 @@
<artifactId>iceberg-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-nessie</artifactId>
io/trino/plugin/iceberg/CommitTaskData.java
@@ -13,23 +13,29 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;

import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

public record CommitTaskData(
String path,
IcebergFileFormat fileFormat,
FileFormat fileFormat,
long fileSizeInBytes,
MetricsWrapper metrics,
String partitionSpecJson,
Optional<String> partitionDataJson,
FileContent content,
Optional<String> referencedDataFile,
Optional<List<Long>> fileSplitOffsets)
List<String> rewrittenDeleteFiles,
Optional<List<Long>> fileSplitOffsets,
OptionalLong contentOffset,
OptionalLong contentSize)
{
public CommitTaskData
{
@@ -40,6 +46,8 @@ public record CommitTaskData(
requireNonNull(partitionDataJson, "partitionDataJson is null");
requireNonNull(content, "content is null");
requireNonNull(referencedDataFile, "referencedDataFile is null");
rewrittenDeleteFiles = ImmutableList.copyOf(rewrittenDeleteFiles);
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
requireNonNull(contentOffset, "contentOffset is null");
}
}
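
Illustrative sketch (not part of this change): one way a deletion-vector commit task could be assembled from the new record components, assuming the component order shown above. The helper class, its name, and its parameters are hypothetical, and the choice of FileContent.POSITION_DELETES plus empty split offsets is an assumption about how such a task would be described; metrics and partition JSON are passed in because their construction is outside this excerpt.

package io.trino.plugin.iceberg;

import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;

import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical helper: builds a CommitTaskData describing a deletion vector stored in a Puffin file.
final class DeletionVectorCommitTasks
{
    private DeletionVectorCommitTasks() {}

    static CommitTaskData forDeletionVector(
            String puffinPath,
            long puffinFileSizeInBytes,
            MetricsWrapper metrics,
            String partitionSpecJson,
            Optional<String> partitionDataJson,
            String targetDataFile,
            List<String> rewrittenDeleteFiles,
            long blobOffset,
            long blobLength)
    {
        return new CommitTaskData(
                puffinPath,
                FileFormat.PUFFIN,            // deletion vectors live in Puffin files
                puffinFileSizeInBytes,
                metrics,
                partitionSpecJson,
                partitionDataJson,
                FileContent.POSITION_DELETES, // a deletion vector is a positional delete representation
                Optional.of(targetDataFile),  // the single data file this vector applies to
                rewrittenDeleteFiles,         // older delete files replaced by this vector
                Optional.empty(),             // split offsets are not used for Puffin blobs
                OptionalLong.of(blobOffset),  // offset of the vector blob within the Puffin file
                OptionalLong.of(blobLength)); // length of the vector blob
    }
}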
io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -18,6 +18,7 @@
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.type.Type;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
@@ -46,6 +47,7 @@ public final class IcebergAvroFileWriter
// Use static table name instead of the actual name because it becomes outdated once the table is renamed
public static final String AVRO_TABLE_NAME = "table";

private final String location;
private final Schema icebergSchema;
private final List<Type> types;
private final FileAppender<Record> avroWriter;
@@ -58,6 +60,7 @@ public IcebergAvroFileWriter(
List<Type> types,
HiveCompressionCodec hiveCompressionCodec)
{
this.location = file.location();
this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction null");
this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
@@ -71,10 +74,22 @@
.build();
}
catch (IOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + file.location(), e);
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + location, e);
}
}

@Override
public FileFormat getFileFormat()
{
return FileFormat.AVRO;
}

@Override
public String location()
{
return location;
}

@Override
public long getWrittenBytes()
{
io/trino/plugin/iceberg/IcebergConfig.java
@@ -53,7 +53,8 @@
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
private static final int FORMAT_VERSION_DEFAULT = 2;
public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
@@ -75,7 +76,7 @@ public class IcebergConfig
private boolean registerTableProcedureEnabled;
private boolean addFilesProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private int formatVersion = FORMAT_VERSION_DEFAULT;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
io/trino/plugin/iceberg/IcebergFileFormat.java
@@ -39,6 +39,7 @@ public static IcebergFileFormat fromIceberg(FileFormat format)
case AVRO -> AVRO;
// Not used as a data file format
case METADATA -> throw new IllegalArgumentException("Unexpected METADATA file format");
case PUFFIN -> throw new IllegalArgumentException("Unexpected PUFFIN file format");
};
}

io/trino/plugin/iceberg/IcebergFileWriter.java
@@ -13,7 +13,9 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.FileWriter;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;

import java.util.List;
@@ -22,6 +24,15 @@
public interface IcebergFileWriter
extends FileWriter
{
FileFormat getFileFormat();

String location();

default List<String> rewrittenDeleteFiles()
{
return ImmutableList.of();
}

FileMetrics getFileMetrics();

record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}
io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -33,21 +33,31 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DeleteFileSet;
import org.weakref.jmx.Managed;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

@@ -81,6 +91,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
@@ -135,19 +146,47 @@ public IcebergFileWriter createDataFileWriter(
}

public IcebergFileWriter createPositionDeleteWriter(
TrinoCatalog catalog,
SchemaTableName tableName,
TrinoFileSystem fileSystem,
Location outputPath,
ConnectorSession session,
IcebergFileFormat fileFormat,
String dataFilePath,
FileFormat fileFormat,
PartitionSpec partitionSpec,
Optional<PartitionData> partition,
Map<String, String> storageProperties)
{
return switch (fileFormat) {
case PUFFIN -> createDeletionVectorWriter(catalog, tableName, fileSystem, outputPath, session, dataFilePath, partitionSpec, partition);
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
default -> throw new IllegalArgumentException("Unsupported file format: " + fileFormat);
};
}

private static DeletionVectorWriter createDeletionVectorWriter(
TrinoCatalog catalog,
SchemaTableName tableName,
TrinoFileSystem fileSystem,
Location outputPath,
ConnectorSession session,
String dataFilePath,
PartitionSpec partitionSpec,
Optional<PartitionData> partition)
{
Table table = catalog.loadTable(session, tableName);
ImmutableMap.Builder<String, DeleteFileSet> rewritableDeletes = ImmutableMap.builder();
// TODO Specify snapshot ID
for (FileScanTask next : table.newScan().planFiles()) {
rewritableDeletes.put(next.file().location(), DeleteFileSet.of(next.deletes()));
}
Function<CharSequence, PositionDeleteIndex> previousDeleteLoader = DeletionVectorWriter.create(table, rewritableDeletes.buildOrThrow());
int positionChannel = POSITION_DELETE_SCHEMA.columns().indexOf(DELETE_FILE_POS);
return new DeletionVectorWriter(fileSystem, outputPath, dataFilePath, catalog.loadTable(session, tableName), partitionSpec, partition, previousDeleteLoader::apply, positionChannel);
}

private IcebergFileWriter createParquetWriter(
MetricsConfig metricsConfig,
TrinoFileSystem fileSystem,
@@ -233,6 +272,7 @@ private IcebergFileWriter createOrcWriter(
}

return new IcebergOrcFileWriter(
outputPath,
metricsConfig,
icebergSchema,
orcDataSink,
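
Illustrative sketch (not part of this change): how a caller could route a position delete through the new PUFFIN branch of createPositionDeleteWriter, based only on the signature above. The wrapper class and its parameters are hypothetical; in the change itself the call appears to go through PositionDeleteWriter created by IcebergMergeSink rather than a direct call like this.

package io.trino.plugin.iceberg;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

import java.util.Map;
import java.util.Optional;

// Hypothetical caller: with format version 3, position deletes for a data file are written
// as a deletion vector by selecting FileFormat.PUFFIN; earlier versions keep using
// Parquet/ORC/Avro position delete files.
final class DeletionVectorWriterUsage
{
    private DeletionVectorWriterUsage() {}

    static IcebergFileWriter openDeletionVectorWriter(
            IcebergFileWriterFactory fileWriterFactory,
            TrinoCatalog catalog,
            SchemaTableName tableName,
            TrinoFileSystem fileSystem,
            Location outputPath,
            ConnectorSession session,
            String dataFilePath,
            PartitionSpec partitionSpec,
            Optional<PartitionData> partition,
            Map<String, String> storageProperties)
    {
        return fileWriterFactory.createPositionDeleteWriter(
                catalog,
                tableName,
                fileSystem,
                outputPath,
                session,
                dataFilePath,      // the data file the deletion vector targets
                FileFormat.PUFFIN, // selects the deletion vector branch added above
                partitionSpec,
                partition,
                storageProperties);
    }
}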
io/trino/plugin/iceberg/IcebergMergeSink.java
@@ -17,6 +17,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.delete.PositionDeleteWriter;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
@@ -25,6 +26,7 @@
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.MergePage;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.VarcharType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -55,8 +57,11 @@ public class IcebergMergeSink
private final LocationProvider locationProvider;
private final IcebergFileWriterFactory fileWriterFactory;
private final TrinoFileSystem fileSystem;
private final TrinoCatalog catalog;
private final SchemaTableName tableName;
private final JsonCodec<CommitTaskData> jsonCodec;
private final ConnectorSession session;
private final int formatVersion;
private final IcebergFileFormat fileFormat;
private final Map<String, String> storageProperties;
private final Schema schema;
@@ -69,8 +74,11 @@ public IcebergMergeSink(
LocationProvider locationProvider,
IcebergFileWriterFactory fileWriterFactory,
TrinoFileSystem fileSystem,
TrinoCatalog catalog,
SchemaTableName tableName,
JsonCodec<CommitTaskData> jsonCodec,
ConnectorSession session,
int formatVersion,
IcebergFileFormat fileFormat,
Map<String, String> storageProperties,
Schema schema,
@@ -81,8 +89,11 @@
this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.catalog = requireNonNull(catalog, "catalog is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
this.formatVersion = formatVersion;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.schema = requireNonNull(schema, "schema is null");
@@ -125,6 +136,7 @@ public CompletableFuture<Collection<Slice>> finish()
{
List<Slice> fragments = new ArrayList<>(insertPageSink.finish().join());

// TODO V3 doesn't allow more than one DV file per data file
fileDeletions.forEach((dataFilePath, deletion) -> {
PositionDeleteWriter writer = createPositionDeleteWriter(
dataFilePath.toStringUtf8(),
@@ -154,6 +166,8 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
}

return new PositionDeleteWriter(
catalog,
tableName,
dataFilePath,
partitionSpec,
partitionData,
Expand All @@ -162,6 +176,7 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
fileSystem,
jsonCodec,
session,
formatVersion,
fileFormat,
storageProperties);
}