Support row level deletes in the Iceberg connector
Support removing individual rows by writing positional delete files
compatible with v2 of the Iceberg specification.

Co-authored-by: Jack Ye <[email protected]>
2 people authored and findepi committed Apr 25, 2022
1 parent 061e5b5 commit d9fddb0
Showing 18 changed files with 796 additions and 146 deletions.
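
For orientation, here is a minimal sketch of how a position delete file is registered against an Iceberg v2 table through the Iceberg core API that this change builds on. It is illustrative only: the class and method below are hypothetical, the table is assumed to be unpartitioned, and the snapshot validation and conflict detection that the new finishDelete implementation performs are omitted.

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

public class PositionDeleteSketch
{
    // Registers an already-written position delete file (rows of file_path, pos pairs)
    // against an unpartitioned Iceberg v2 table and commits it as a new snapshot.
    static void commitPositionDeletes(Table table, String deleteFilePath, long fileSizeInBytes, long deletedRowCount)
    {
        DeleteFile deleteFile = FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
                .ofPositionDeletes()
                .withPath(deleteFilePath)
                .withFormat(FileFormat.PARQUET)
                .withFileSizeInBytes(fileSizeInBytes)
                .withRecordCount(deletedRowCount)
                .build();

        RowDelta rowDelta = table.newRowDelta();
        rowDelta.addDeletes(deleteFile);
        rowDelta.commit();
    }
}

In the commit itself, IcebergFileWriterFactory.createPositionDeleteWriter produces the delete-file writers, and IcebergMetadata.finishDelete drives the same RowDelta API while additionally validating that the referenced data files still exist.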
io/trino/plugin/iceberg/CommitTaskData.java
@@ -15,6 +15,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.iceberg.FileContent;

import java.util.Optional;

@@ -24,21 +25,33 @@
public class CommitTaskData
{
private final String path;
private final IcebergFileFormat fileFormat;
private final long fileSizeInBytes;
private final MetricsWrapper metrics;
private final String partitionSpecJson;
private final Optional<String> partitionDataJson;
private final FileContent content;
private final Optional<String> referencedDataFile;

@JsonCreator
public CommitTaskData(
@JsonProperty("path") String path,
@JsonProperty("fileFormat") IcebergFileFormat fileFormat,
@JsonProperty("fileSizeInBytes") long fileSizeInBytes,
@JsonProperty("metrics") MetricsWrapper metrics,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson)
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("content") FileContent content,
@JsonProperty("referencedDataFile") Optional<String> referencedDataFile)
{
this.path = requireNonNull(path, "path is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.fileSizeInBytes = fileSizeInBytes;
this.metrics = requireNonNull(metrics, "metrics is null");
this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.content = requireNonNull(content, "content is null");
this.referencedDataFile = requireNonNull(referencedDataFile, "referencedDataFile is null");
checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
}

@@ -48,6 +61,12 @@ public String getPath()
return path;
}

@JsonProperty
public IcebergFileFormat getFileFormat()
{
return fileFormat;
}

@JsonProperty
public long getFileSizeInBytes()
{
@@ -60,9 +79,27 @@ public MetricsWrapper getMetrics()
return metrics;
}

@JsonProperty
public String getPartitionSpecJson()
{
return partitionSpecJson;
}

@JsonProperty
public Optional<String> getPartitionDataJson()
{
return partitionDataJson;
}

@JsonProperty
public FileContent getContent()
{
return content;
}

@JsonProperty
public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}
}
io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -71,10 +71,13 @@
import static io.trino.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;

public class IcebergFileWriterFactory
{
private static final MetricsConfig FULL_METRICS_CONFIG = MetricsConfig.fromProperties(ImmutableMap.of(DEFAULT_WRITE_METRICS_MODE, "full"));

private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final NodeVersion nodeVersion;
@@ -104,7 +107,7 @@ public OrcWriterStats getOrcWriterStats()
return orcWriterStats;
}

public IcebergFileWriter createFileWriter(
public IcebergFileWriter createDataFileWriter(
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
@@ -120,7 +123,25 @@ public IcebergFileWriter createFileWriter(
case ORC:
return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session);
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
}
}

public IcebergFileWriter createPositionDeleteWriter(
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext,
IcebergFileFormat fileFormat)
{
switch (fileFormat) {
case PARQUET:
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
case ORC:
return createOrcWriter(FULL_METRICS_CONFIG, outputPath, icebergSchema, jobConf, session);
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
}
}

io/trino/plugin/iceberg/IcebergMetadata.java
@@ -86,13 +86,17 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
@@ -139,7 +143,6 @@
import static io.trino.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.util.HiveUtil.isStructuralType;
import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
@@ -172,15 +175,18 @@
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.DELETE_ORPHAN_FILES;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.OPTIMIZE;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.versionHintLocation;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;

public class IcebergMetadata
@@ -240,26 +246,31 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
return null;
}

Table table;
BaseTable table;
try {
table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
table = (BaseTable) catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), name.getTableName()));
}
catch (TableNotFoundException e) {
return null;
}
Optional<Long> snapshotId = getSnapshotId(table, name.getSnapshotId());

String nameMappingJson = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
Map<String, String> tableProperties = table.properties();
String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING);
return new IcebergTableHandle(
tableName.getSchemaName(),
name.getTableName(),
name.getTableType(),
snapshotId,
SchemaParser.toJson(table.schema()),
table.operations().current().formatVersion(),
TupleDomain.all(),
TupleDomain.all(),
ImmutableSet.of(),
Optional.ofNullable(nameMappingJson));
Optional.ofNullable(nameMappingJson),
table.location(),
table.properties(),
NO_RETRIES);
}

@Override
@@ -714,12 +725,6 @@ private static String getLocation(String path)
return matcher.group(1);
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return new IcebergColumnHandle(primitiveColumnIdentity(0, "$row_id"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty());
}

@Override
public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
ConnectorSession session,
@@ -1208,9 +1213,83 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
if (table.getFormatVersion() < 2) {
throw new TrinoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
}
verify(transaction == null, "transaction already set");
transaction = catalog.loadTable(session, table.getSchemaTableName()).newTransaction();
return table.withRetryMode(retryMode);
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = transaction.table();

List<CommitTaskData> commitTasks = fragments.stream()
.map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
.collect(toImmutableList());

Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());

RowDelta rowDelta = transaction.newRowDelta();
table.getSnapshotId().map(icebergTable::snapshot).ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
if (!table.getEnforcedPredicate().isAll()) {
rowDelta.conflictDetectionFilter(toIcebergExpression(table.getEnforcedPredicate()));
}

IsolationLevel isolationLevel = IsolationLevel.fromName(icebergTable.properties().getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
if (isolationLevel == IsolationLevel.SERIALIZABLE) {
rowDelta.validateNoConflictingDataFiles();
}

ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
for (CommitTaskData task : commitTasks) {
if (task.getContent() != FileContent.POSITION_DELETES) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Iceberg finishDelete called with commit task that was not a position delete file");
}
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
Type[] partitionColumnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(icebergTable.schema().findType(field.sourceId())))
.toArray(Type[]::new);
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.getPath())
.withFormat(task.getFileFormat().toIceberg())
.ofPositionDeletes()
.withFileSizeInBytes(task.getFileSizeInBytes())
.withMetrics(task.getMetrics().metrics());

if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
.orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
}

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
}

rowDelta.validateDataFilesExist(referencedDataFiles.build());
rowDelta.commit();
transaction.commitTransaction();
transaction = null;
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
return IcebergUtil.getColumnHandle(ROW_POSITION, typeManager);
}

@Override
@@ -1266,7 +1345,7 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
.deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate()))
.commit();

// TODO: it should be possible to return number of deleted records
// TODO: it should be possible to return number of deleted records. https://github.com/trinodb/trino/issues/12055
return OptionalLong.empty();
}

@@ -1313,10 +1392,14 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
table.getTableType(),
table.getSnapshotId(),
table.getTableSchemaJson(),
table.getFormatVersion(),
newUnenforcedConstraint,
newEnforcedConstraint,
table.getProjectedColumns(),
table.getNameMappingJson()),
table.getNameMappingJson(),
table.getTableLocation(),
table.getStorageProperties(),
table.getRetryMode()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
false));
}
io/trino/plugin/iceberg/IcebergPageSink.java
@@ -45,6 +45,7 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.transforms.Transform;
@@ -79,6 +80,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.iceberg.FileContent.DATA;

public class IcebergPageSink
implements ConnectorPageSink
@@ -312,9 +314,13 @@ private void closeWriter(WriteContext writeContext)

CommitTaskData task = new CommitTaskData(
writeContext.getPath().toString(),
fileFormat,
writeContext.getWriter().getWrittenBytes(),
new MetricsWrapper(writeContext.getWriter().getMetrics()),
writeContext.getPartitionData().map(PartitionData::toJson));
PartitionSpecParser.toJson(partitionSpec),
writeContext.getPartitionData().map(PartitionData::toJson),
DATA,
Optional.empty());

commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));

@@ -329,7 +335,7 @@ private WriteContext createWriter(Optional<PartitionData> partitionData)
Path outputPath = partitionData.map(partition -> new Path(locationProvider.newDataLocation(partitionSpec, partition, fileName)))
.orElse(new Path(locationProvider.newDataLocation(fileName)));

IcebergFileWriter writer = fileWriterFactory.createFileWriter(
IcebergFileWriter writer = fileWriterFactory.createDataFileWriter(
outputPath,
outputSchema,
jobConf,
(diff for the remaining changed files not shown)
