Respect Iceberg column metrics mode when writing
liqinrae authored and findepi committed Jan 19, 2022
1 parent 55432cf commit 835438a
Showing 5 changed files with 211 additions and 40 deletions.
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.weakref.jmx.Managed;
@@ -110,13 +111,15 @@ public IcebergFileWriter createFileWriter(
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext,
FileFormat fileFormat)
FileFormat fileFormat,
MetricsConfig metricsConfig)
{
switch (fileFormat) {
case PARQUET:
// TODO use metricsConfig
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
case ORC:
return createOrcWriter(outputPath, icebergSchema, jobConf, session);
return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session);
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
@@ -170,6 +173,7 @@ private IcebergFileWriter createParquetWriter(
}

private IcebergFileWriter createOrcWriter(
MetricsConfig metricsConfig,
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
@@ -210,6 +214,7 @@ private IcebergFileWriter createOrcWriter(
}

return new IcebergOrcFileWriter(
metricsConfig,
icebergSchema,
orcDataSink,
rollbackAction,
IcebergOrcFileWriter.java
@@ -37,9 +37,15 @@
import io.trino.plugin.hive.orc.OrcFileWriter;
import io.trino.spi.type.Type;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BinaryUtil;
import org.apache.iceberg.util.UnicodeUtil;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -65,8 +71,10 @@ public class IcebergOrcFileWriter
{
private final Schema icebergSchema;
private final ColumnMetadata<OrcType> orcColumns;
private final MetricsConfig metricsConfig;

public IcebergOrcFileWriter(
MetricsConfig metricsConfig,
Schema icebergSchema,
OrcDataSink orcDataSink,
Callable<Void> rollbackAction,
@@ -83,16 +91,17 @@ public IcebergOrcFileWriter(
{
super(orcDataSink, WriterKind.INSERT, NO_ACID_TRANSACTION, false, OptionalInt.empty(), rollbackAction, columnNames, fileColumnTypes, fileColumnOrcTypes, compression, options, fileInputColumnIndexes, metadata, validationInputFactory, validationMode, stats);
this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
orcColumns = fileColumnOrcTypes;
}

@Override
public Metrics getMetrics()
{
return computeMetrics(icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats());
return computeMetrics(metricsConfig, icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats());
}

private static Metrics computeMetrics(Schema icebergSchema, ColumnMetadata<OrcType> orcColumns, long fileRowCount, Optional<ColumnMetadata<ColumnStatistics>> columnStatistics)
private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema icebergSchema, ColumnMetadata<OrcType> orcColumns, long fileRowCount, Optional<ColumnMetadata<ColumnStatistics>> columnStatistics)
{
if (columnStatistics.isEmpty()) {
return new Metrics(fileRowCount, null, null, null, null, null);
@@ -118,15 +127,22 @@ private static Metrics computeMetrics(Schema icebergSchema, ColumnMetadata<OrcTy
ColumnStatistics orcColumnStats = columnStatistics.get().get(orcColumnId);
int icebergId = getIcebergId(orcColumn);
Types.NestedField icebergField = icebergSchema.findField(icebergId);
MetricsModes.MetricsMode metricsMode = MetricsUtil.metricsMode(icebergSchema, metricsConfig, icebergId);
if (metricsMode.equals(MetricsModes.None.get())) {
continue;
}
verify(icebergField != null, "Cannot find Iceberg column with ID %s in schema %s", icebergId, icebergSchema);
valueCountsBuilder.put(icebergId, fileRowCount);
if (orcColumnStats.hasNumberOfValues()) {
nullCountsBuilder.put(icebergId, fileRowCount - orcColumnStats.getNumberOfValues());
}
toIcebergMinMax(orcColumnStats, icebergField.type()).ifPresent(minMax -> {
lowerBoundsBuilder.put(icebergId, minMax.getMin());
upperBoundsBuilder.put(icebergId, minMax.getMax());
});

if (!metricsMode.equals(MetricsModes.Counts.get())) {
toIcebergMinMax(orcColumnStats, icebergField.type(), metricsMode).ifPresent(minMax -> {
lowerBoundsBuilder.put(icebergId, minMax.getMin());
upperBoundsBuilder.put(icebergId, minMax.getMax());
});
}
}
Map<Integer, Long> valueCounts = valueCountsBuilder.buildOrThrow();
Map<Integer, Long> nullCounts = nullCountsBuilder.buildOrThrow();
@@ -178,13 +194,13 @@ private static int getIcebergId(OrcType orcColumn)
return Integer.parseInt(icebergId);
}

private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColumnStats, org.apache.iceberg.types.Type icebergType)
private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColumnStats, org.apache.iceberg.types.Type icebergType, MetricsModes.MetricsMode metricsModes)
{
BooleanStatistics booleanStatistics = orcColumnStats.getBooleanStatistics();
if (booleanStatistics != null) {
boolean hasTrueValues = booleanStatistics.getTrueValueCount() != 0;
boolean hasFalseValues = orcColumnStats.getNumberOfValues() != booleanStatistics.getTrueValueCount();
return Optional.of(new IcebergMinMax(icebergType, !hasFalseValues, hasTrueValues));
return Optional.of(new IcebergMinMax(icebergType, !hasFalseValues, hasTrueValues, metricsModes));
}

IntegerStatistics integerStatistics = orcColumnStats.getIntegerStatistics();
@@ -198,7 +214,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
min = toIntExact((Long) min);
max = toIntExact((Long) max);
}
return Optional.of(new IcebergMinMax(icebergType, min, max));
return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
DoubleStatistics doubleStatistics = orcColumnStats.getDoubleStatistics();
if (doubleStatistics != null) {
@@ -211,7 +227,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
min = ((Double) min).floatValue();
max = ((Double) max).floatValue();
}
return Optional.of(new IcebergMinMax(icebergType, min, max));
return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
StringStatistics stringStatistics = orcColumnStats.getStringStatistics();
if (stringStatistics != null) {
@@ -220,7 +236,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
if (min == null || max == null) {
return Optional.empty();
}
return Optional.of(new IcebergMinMax(icebergType, min.toStringUtf8(), max.toStringUtf8()));
return Optional.of(new IcebergMinMax(icebergType, min.toStringUtf8(), max.toStringUtf8(), metricsModes));
}
DateStatistics dateStatistics = orcColumnStats.getDateStatistics();
if (dateStatistics != null) {
Expand All @@ -229,7 +245,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
if (min == null || max == null) {
return Optional.empty();
}
return Optional.of(new IcebergMinMax(icebergType, min, max));
return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
DecimalStatistics decimalStatistics = orcColumnStats.getDecimalStatistics();
if (decimalStatistics != null) {
Expand All @@ -240,7 +256,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
}
min = min.setScale(((Types.DecimalType) icebergType).scale());
max = max.setScale(((Types.DecimalType) icebergType).scale());
return Optional.of(new IcebergMinMax(icebergType, min, max));
return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
TimestampStatistics timestampStatistics = orcColumnStats.getTimestampStatistics();
if (timestampStatistics != null) {
Expand All @@ -251,7 +267,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
}
// Since ORC timestamp statistics are truncated to millisecond precision, this can cause some column values to fall outside the stats range.
// We are appending 999 microseconds to account for the fact that Trino ORC writer truncates timestamps.
return Optional.of(new IcebergMinMax(icebergType, min * MICROSECONDS_PER_MILLISECOND, (max * MICROSECONDS_PER_MILLISECOND) + (MICROSECONDS_PER_MILLISECOND - 1)));
return Optional.of(new IcebergMinMax(icebergType, min * MICROSECONDS_PER_MILLISECOND, (max * MICROSECONDS_PER_MILLISECOND) + (MICROSECONDS_PER_MILLISECOND - 1), metricsModes));
}
return Optional.empty();
}
Expand All @@ -261,10 +277,33 @@ private static class IcebergMinMax
private ByteBuffer min;
private ByteBuffer max;

private IcebergMinMax(org.apache.iceberg.types.Type type, Object min, Object max)
private IcebergMinMax(org.apache.iceberg.types.Type type, Object min, Object max, MetricsModes.MetricsMode metricsMode)
{
this.min = Conversions.toByteBuffer(type, min);
this.max = Conversions.toByteBuffer(type, max);
if (metricsMode instanceof MetricsModes.Full) {
this.min = Conversions.toByteBuffer(type, min);
this.max = Conversions.toByteBuffer(type, max);
}
else if (metricsMode instanceof MetricsModes.Truncate) {
MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
int truncateLength = truncateMode.length();
switch (type.typeId()) {
case STRING:
this.min = UnicodeUtil.truncateStringMin(Literal.of((CharSequence) min), truncateLength).toByteBuffer();
this.max = UnicodeUtil.truncateStringMax(Literal.of((CharSequence) max), truncateLength).toByteBuffer();
break;
case FIXED:
case BINARY:
this.min = BinaryUtil.truncateBinaryMin(Literal.of((ByteBuffer) min), truncateLength).toByteBuffer();
this.max = BinaryUtil.truncateBinaryMax(Literal.of((ByteBuffer) max), truncateLength).toByteBuffer();
break;
default:
this.min = Conversions.toByteBuffer(type, min);
this.max = Conversions.toByteBuffer(type, max);
}
}
else {
throw new UnsupportedOperationException("Unsupported metrics mode for Iceberg Max/Min Bound: " + metricsMode);
}
}

public ByteBuffer getMin()
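
A minimal sketch of what the new MetricsModes.Truncate branch in IcebergMinMax does to string bounds, using the same Iceberg UnicodeUtil helpers the writer calls. The class name, sample values, and truncate length below are illustrative only, not part of the commit.

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.util.UnicodeUtil;

public class TruncateBoundsSketch
{
    public static void main(String[] args)
    {
        int truncateLength = 4; // stands in for a truncate(4) metrics mode

        // Lower bound: cut to 4 code points; the result still sorts <= every original value.
        Literal<CharSequence> min = UnicodeUtil.truncateStringMin(Literal.of("apple pie"), truncateLength);

        // Upper bound: cut to 4 code points, then bump the last code point so it still sorts >= every original value.
        Literal<CharSequence> max = UnicodeUtil.truncateStringMax(Literal.of("zucchini"), truncateLength);

        System.out.println(min.value()); // appl
        System.out.println(max.value()); // zucd
    }
}
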
IcebergPageSink.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -95,6 +96,7 @@ public class IcebergPageSink
private final JsonCodec<CommitTaskData> jsonCodec;
private final ConnectorSession session;
private final FileFormat fileFormat;
private final MetricsConfig metricsConfig;
private final PagePartitioner pagePartitioner;

private final List<WriteContext> writers = new ArrayList<>();
@@ -115,6 +117,7 @@ public IcebergPageSink(
JsonCodec<CommitTaskData> jsonCodec,
ConnectorSession session,
FileFormat fileFormat,
Map<String, String> storageProperties,
int maxOpenWriters)
{
requireNonNull(inputColumns, "inputColumns is null");
@@ -128,6 +131,7 @@ public IcebergPageSink(
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.metricsConfig = MetricsConfig.fromProperties(requireNonNull(storageProperties, "storageProperties is null"));
this.maxOpenWriters = maxOpenWriters;
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
}
@@ -312,7 +316,8 @@ private WriteContext createWriter(Optional<PartitionData> partitionData)
jobConf,
session,
hdfsContext,
fileFormat);
fileFormat,
metricsConfig);

return new WriteContext(writer, outputPath, partitionData);
}
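
A minimal sketch of how the storageProperties handed to IcebergPageSink become per-column metrics modes. The schema, column names, and property values are assumed for illustration; MetricsConfig.fromProperties and MetricsUtil.metricsMode are the Iceberg calls this change relies on.

import java.util.Map;

import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class MetricsModeSketch
{
    public static void main(String[] args)
    {
        // Hypothetical table schema with two columns (field IDs 1 and 2).
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "payload", Types.StringType.get()));

        // Hypothetical table storage properties; the keys are Iceberg's
        // write.metadata.metrics.* settings that MetricsConfig.fromProperties reads.
        Map<String, String> storageProperties = Map.of(
                "write.metadata.metrics.default", "truncate(16)",
                "write.metadata.metrics.column.payload", "none");

        MetricsConfig metricsConfig = MetricsConfig.fromProperties(storageProperties);

        // Per-column resolution, as IcebergOrcFileWriter now does for each ORC column.
        MetricsModes.MetricsMode idMode = MetricsUtil.metricsMode(schema, metricsConfig, 1);      // truncate(16)
        MetricsModes.MetricsMode payloadMode = MetricsUtil.metricsMode(schema, metricsConfig, 2); // none, so the column is skipped

        System.out.println("id -> " + idMode);
        System.out.println("payload -> " + payloadMode);
    }
}
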
@@ -94,6 +94,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
jsonCodec,
session,
tableHandle.getFileFormat(),
tableHandle.getStorageProperties(),
maxOpenPartitions);
}

@@ -121,6 +122,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
jsonCodec,
session,
optimizeHandle.getFileFormat(),
optimizeHandle.getTableStorageProperties(),
maxOpenPartitions);
}
