Support Metrics mode when creating/writing Iceberg tables #9938

Merged · 1 commit · Jan 19, 2022
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileFormat;
+ import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.weakref.jmx.Managed;
@@ -110,13 +111,15 @@ public IcebergFileWriter createFileWriter(
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext,
- FileFormat fileFormat)
+ FileFormat fileFormat,
+ MetricsConfig metricsConfig)
{
switch (fileFormat) {
case PARQUET:
+ // TODO use metricsConfig
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
case ORC:
- return createOrcWriter(outputPath, icebergSchema, jobConf, session);
+ return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session);
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
@@ -170,6 +173,7 @@ private IcebergFileWriter createParquetWriter(
}

private IcebergFileWriter createOrcWriter(
+ MetricsConfig metricsConfig,
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
@@ -210,6 +214,7 @@ private IcebergFileWriter createOrcWriter(
}

return new IcebergOrcFileWriter(
+ metricsConfig,
icebergSchema,
orcDataSink,
rollbackAction,
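
The factory change above only threads a MetricsConfig through to the ORC writer (Parquet is left as a TODO). As background, and not part of this diff, Iceberg describes how much column-level metadata to keep with org.apache.iceberg.MetricsModes; a minimal sketch of the four modes the new writer code branches on:

import org.apache.iceberg.MetricsModes;

public class MetricsModesSketch
{
    public static void main(String[] args)
    {
        // none          -> keep no column-level metrics at all
        // counts        -> keep value/null counts, but no lower/upper bounds
        // truncate(16)  -> keep counts plus bounds truncated to 16 characters/bytes
        // full          -> keep counts plus exact lower/upper bounds
        MetricsModes.MetricsMode none = MetricsModes.fromString("none");
        MetricsModes.MetricsMode counts = MetricsModes.fromString("counts");
        MetricsModes.MetricsMode truncate = MetricsModes.fromString("truncate(16)");
        MetricsModes.MetricsMode full = MetricsModes.fromString("full");

        // None and Counts are singletons, which is why the writer can compare with equals()
        // against MetricsModes.None.get() and MetricsModes.Counts.get() to decide what to skip.
        System.out.println(none.equals(MetricsModes.None.get()));     // true
        System.out.println(counts.equals(MetricsModes.Counts.get())); // true
        System.out.println(truncate);
        System.out.println(full);
    }
}
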
@@ -37,9 +37,15 @@
import io.trino.plugin.hive.orc.OrcFileWriter;
import io.trino.spi.type.Type;
import org.apache.iceberg.Metrics;
+ import org.apache.iceberg.MetricsConfig;
+ import org.apache.iceberg.MetricsModes;
+ import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
+ import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
+ import org.apache.iceberg.util.BinaryUtil;
+ import org.apache.iceberg.util.UnicodeUtil;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -65,8 +71,10 @@ public class IcebergOrcFileWriter
{
private final Schema icebergSchema;
private final ColumnMetadata<OrcType> orcColumns;
+ private final MetricsConfig metricsConfig;

public IcebergOrcFileWriter(
+ MetricsConfig metricsConfig,
Schema icebergSchema,
OrcDataSink orcDataSink,
Callable<Void> rollbackAction,
@@ -83,16 +91,17 @@ public IcebergOrcFileWriter(
{
super(orcDataSink, WriterKind.INSERT, NO_ACID_TRANSACTION, false, OptionalInt.empty(), rollbackAction, columnNames, fileColumnTypes, fileColumnOrcTypes, compression, options, fileInputColumnIndexes, metadata, validationInputFactory, validationMode, stats);
this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
+ this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
orcColumns = fileColumnOrcTypes;
}

@Override
public Metrics getMetrics()
{
- return computeMetrics(icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats());
+ return computeMetrics(metricsConfig, icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats());
}

- private static Metrics computeMetrics(Schema icebergSchema, ColumnMetadata<OrcType> orcColumns, long fileRowCount, Optional<ColumnMetadata<ColumnStatistics>> columnStatistics)
+ private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema icebergSchema, ColumnMetadata<OrcType> orcColumns, long fileRowCount, Optional<ColumnMetadata<ColumnStatistics>> columnStatistics)
{
if (columnStatistics.isEmpty()) {
return new Metrics(fileRowCount, null, null, null, null, null);
@@ -118,15 +127,22 @@ private static Metrics computeMetrics(Schema icebergSchema, ColumnMetadata<OrcTy
ColumnStatistics orcColumnStats = columnStatistics.get().get(orcColumnId);
int icebergId = getIcebergId(orcColumn);
Types.NestedField icebergField = icebergSchema.findField(icebergId);
+ MetricsModes.MetricsMode metricsMode = MetricsUtil.metricsMode(icebergSchema, metricsConfig, icebergId);
+ if (metricsMode.equals(MetricsModes.None.get())) {
+ continue;
+ }
verify(icebergField != null, "Cannot find Iceberg column with ID %s in schema %s", icebergId, icebergSchema);
valueCountsBuilder.put(icebergId, fileRowCount);
if (orcColumnStats.hasNumberOfValues()) {
nullCountsBuilder.put(icebergId, fileRowCount - orcColumnStats.getNumberOfValues());
}
- toIcebergMinMax(orcColumnStats, icebergField.type()).ifPresent(minMax -> {
- lowerBoundsBuilder.put(icebergId, minMax.getMin());
- upperBoundsBuilder.put(icebergId, minMax.getMax());
- });
+
+ if (!metricsMode.equals(MetricsModes.Counts.get())) {
+ toIcebergMinMax(orcColumnStats, icebergField.type(), metricsMode).ifPresent(minMax -> {
+ lowerBoundsBuilder.put(icebergId, minMax.getMin());
+ upperBoundsBuilder.put(icebergId, minMax.getMax());
+ });
+ }
}
Map<Integer, Long> valueCounts = valueCountsBuilder.buildOrThrow();
Map<Integer, Long> nullCounts = nullCountsBuilder.buildOrThrow();
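
The per-column mode used by the loop above comes from MetricsUtil.metricsMode, which resolves a field id against the table's MetricsConfig. A small self-contained sketch; the schema, field ids, and property values are made up for illustration:

import java.util.Map;

import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class MetricsModeResolutionSketch
{
    public static void main(String[] args)
    {
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()));

        // Counts everywhere by default, full metrics for the "name" column only
        MetricsConfig metricsConfig = MetricsConfig.fromProperties(Map.of(
                "write.metadata.metrics.default", "counts",
                "write.metadata.metrics.column.name", "full"));

        MetricsModes.MetricsMode idMode = MetricsUtil.metricsMode(schema, metricsConfig, 1);
        MetricsModes.MetricsMode nameMode = MetricsUtil.metricsMode(schema, metricsConfig, 2);

        // Mirrors the loop above: "counts" keeps value/null counts but skips bounds,
        // while "full" keeps counts and exact bounds.
        System.out.println(idMode.equals(MetricsModes.Counts.get())); // true
        System.out.println(nameMode.equals(MetricsModes.Full.get())); // true
    }
}
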
@@ -178,13 +194,13 @@ private static int getIcebergId(OrcType orcColumn)
return Integer.parseInt(icebergId);
}

- private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColumnStats, org.apache.iceberg.types.Type icebergType)
+ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColumnStats, org.apache.iceberg.types.Type icebergType, MetricsModes.MetricsMode metricsModes)
{
BooleanStatistics booleanStatistics = orcColumnStats.getBooleanStatistics();
if (booleanStatistics != null) {
boolean hasTrueValues = booleanStatistics.getTrueValueCount() != 0;
boolean hasFalseValues = orcColumnStats.getNumberOfValues() != booleanStatistics.getTrueValueCount();
- return Optional.of(new IcebergMinMax(icebergType, !hasFalseValues, hasTrueValues));
+ return Optional.of(new IcebergMinMax(icebergType, !hasFalseValues, hasTrueValues, metricsModes));
}

IntegerStatistics integerStatistics = orcColumnStats.getIntegerStatistics();
@@ -198,7 +214,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
min = toIntExact((Long) min);
max = toIntExact((Long) max);
}
- return Optional.of(new IcebergMinMax(icebergType, min, max));
+ return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
DoubleStatistics doubleStatistics = orcColumnStats.getDoubleStatistics();
if (doubleStatistics != null) {
@@ -211,7 +227,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
min = ((Double) min).floatValue();
max = ((Double) max).floatValue();
}
- return Optional.of(new IcebergMinMax(icebergType, min, max));
+ return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
StringStatistics stringStatistics = orcColumnStats.getStringStatistics();
if (stringStatistics != null) {
@@ -220,7 +236,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
if (min == null || max == null) {
return Optional.empty();
}
- return Optional.of(new IcebergMinMax(icebergType, min.toStringUtf8(), max.toStringUtf8()));
+ return Optional.of(new IcebergMinMax(icebergType, min.toStringUtf8(), max.toStringUtf8(), metricsModes));
}
DateStatistics dateStatistics = orcColumnStats.getDateStatistics();
if (dateStatistics != null) {
@@ -229,7 +245,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
if (min == null || max == null) {
return Optional.empty();
}
- return Optional.of(new IcebergMinMax(icebergType, min, max));
+ return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
DecimalStatistics decimalStatistics = orcColumnStats.getDecimalStatistics();
if (decimalStatistics != null) {
@@ -240,7 +256,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
}
min = min.setScale(((Types.DecimalType) icebergType).scale());
max = max.setScale(((Types.DecimalType) icebergType).scale());
- return Optional.of(new IcebergMinMax(icebergType, min, max));
+ return Optional.of(new IcebergMinMax(icebergType, min, max, metricsModes));
}
TimestampStatistics timestampStatistics = orcColumnStats.getTimestampStatistics();
if (timestampStatistics != null) {
@@ -251,7 +267,7 @@ private static Optional<IcebergMinMax> toIcebergMinMax(ColumnStatistics orcColum
}
// Since ORC timestamp statistics are truncated to millisecond precision, this can cause some column values to fall outside the stats range.
// We are appending 999 microseconds to account for the fact that Trino ORC writer truncates timestamps.
- return Optional.of(new IcebergMinMax(icebergType, min * MICROSECONDS_PER_MILLISECOND, (max * MICROSECONDS_PER_MILLISECOND) + (MICROSECONDS_PER_MILLISECOND - 1)));
+ return Optional.of(new IcebergMinMax(icebergType, min * MICROSECONDS_PER_MILLISECOND, (max * MICROSECONDS_PER_MILLISECOND) + (MICROSECONDS_PER_MILLISECOND - 1), metricsModes));
}
return Optional.empty();
}
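
To make the 999-microsecond padding above concrete, a worked example with made-up numbers:

public class TimestampBoundsSketch
{
    private static final long MICROSECONDS_PER_MILLISECOND = 1_000;

    public static void main(String[] args)
    {
        // Suppose the real column value is 123_456 microseconds since the epoch (123.456 ms).
        long valueMicros = 123_456;

        // ORC statistics only keep millisecond precision, so min and max are both reported as 123 ms.
        long orcMinMillis = 123;
        long orcMaxMillis = 123;

        // The lower bound rounds down, so a plain conversion is still a valid lower bound.
        long lowerBoundMicros = orcMinMillis * MICROSECONDS_PER_MILLISECOND; // 123_000 <= 123_456

        // The upper bound would otherwise round down too, so 999 microseconds are added back.
        long upperBoundMicros = orcMaxMillis * MICROSECONDS_PER_MILLISECOND + (MICROSECONDS_PER_MILLISECOND - 1); // 123_999 >= 123_456

        System.out.println(lowerBoundMicros <= valueMicros && valueMicros <= upperBoundMicros); // true
    }
}
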
@@ -261,10 +277,33 @@ private static class IcebergMinMax
private ByteBuffer min;
private ByteBuffer max;

- private IcebergMinMax(org.apache.iceberg.types.Type type, Object min, Object max)
+ private IcebergMinMax(org.apache.iceberg.types.Type type, Object min, Object max, MetricsModes.MetricsMode metricsMode)
{
- this.min = Conversions.toByteBuffer(type, min);
- this.max = Conversions.toByteBuffer(type, max);
+ if (metricsMode instanceof MetricsModes.Full) {
+ this.min = Conversions.toByteBuffer(type, min);
+ this.max = Conversions.toByteBuffer(type, max);
+ }
+ else if (metricsMode instanceof MetricsModes.Truncate) {
+ MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+ int truncateLength = truncateMode.length();
+ switch (type.typeId()) {
+ case STRING:
+ this.min = UnicodeUtil.truncateStringMin(Literal.of((CharSequence) min), truncateLength).toByteBuffer();
+ this.max = UnicodeUtil.truncateStringMax(Literal.of((CharSequence) max), truncateLength).toByteBuffer();
+ break;
+ case FIXED:
+ case BINARY:
+ this.min = BinaryUtil.truncateBinaryMin(Literal.of((ByteBuffer) min), truncateLength).toByteBuffer();
+ this.max = BinaryUtil.truncateBinaryMax(Literal.of((ByteBuffer) max), truncateLength).toByteBuffer();
+ break;
+ default:
+ this.min = Conversions.toByteBuffer(type, min);
+ this.max = Conversions.toByteBuffer(type, max);
+ }
+ }
+ else {
+ throw new UnsupportedOperationException("Unsupported metrics mode for Iceberg Max/Min Bound: " + metricsMode);
+ }
}

public ByteBuffer getMin()
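
The Truncate branch of the new IcebergMinMax constructor relies on Iceberg's bound-truncation helpers: truncateStringMin simply cuts the value to the configured length, while truncateStringMax also bumps the last kept code point so the result remains an upper bound for every value sharing that prefix. A standalone sketch; the expected outputs in the comments are indicative:

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.util.UnicodeUtil;

public class TruncatedBoundsSketch
{
    public static void main(String[] args)
    {
        int truncateLength = 4; // e.g. from a truncate(4) metrics mode

        Literal<CharSequence> min = UnicodeUtil.truncateStringMin(Literal.of("iceberg"), truncateLength);
        Literal<CharSequence> max = UnicodeUtil.truncateStringMax(Literal.of("iceberg"), truncateLength);

        // The lower bound is a plain prefix, so it is still <= "iceberg".
        System.out.println(min.value()); // iceb

        // The upper bound has its last character incremented, so it is still >= "iceberg".
        System.out.println(max.value()); // a 4-character string ordered after "iceb", e.g. icec

        // The writer stores the truncated bounds as byte buffers, same as full bounds.
        System.out.println(min.toByteBuffer().remaining()); // 4 UTF-8 bytes for "iceb"
    }
}
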
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileFormat;
+ import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -95,6 +96,7 @@ public class IcebergPageSink
private final JsonCodec<CommitTaskData> jsonCodec;
private final ConnectorSession session;
private final FileFormat fileFormat;
+ private final MetricsConfig metricsConfig;
private final PagePartitioner pagePartitioner;

private final List<WriteContext> writers = new ArrayList<>();
@@ -115,6 +117,7 @@ public IcebergPageSink(
JsonCodec<CommitTaskData> jsonCodec,
ConnectorSession session,
FileFormat fileFormat,
+ Map<String, String> storageProperties,
int maxOpenWriters)
{
requireNonNull(inputColumns, "inputColumns is null");
@@ -128,6 +131,7 @@ public IcebergPageSink(
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+ this.metricsConfig = MetricsConfig.fromProperties(requireNonNull(storageProperties, "storageProperties is null"));
this.maxOpenWriters = maxOpenWriters;
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
}
@@ -312,7 +316,8 @@ private WriteContext createWriter(Optional<PartitionData> partitionData)
jobConf,
session,
hdfsContext,
- fileFormat);
+ fileFormat,
+ metricsConfig);

return new WriteContext(writer, outputPath, partitionData);
}
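
The page sink derives its MetricsConfig from the table's storage properties rather than from session or catalog configuration. A rough sketch of that derivation, assuming the usual Iceberg property names and the MetricsConfig.columnMode lookup; the property values are made up:

import java.util.Map;

import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;

public class StoragePropertiesSketch
{
    public static void main(String[] args)
    {
        // Storage properties as they might appear in Iceberg table metadata; the page sink gets
        // them from the table handle, and MetricsConfig.fromProperties only reads the
        // write.metadata.metrics.* keys.
        Map<String, String> storageProperties = Map.of(
                "write.format.default", "ORC",
                "write.metadata.metrics.default", "counts",
                "write.metadata.metrics.column.comment", "none");

        MetricsConfig metricsConfig = MetricsConfig.fromProperties(storageProperties);

        // Columns without an explicit override fall back to the configured default,
        // and Iceberg's built-in default (truncate(16)) applies when nothing is set at all.
        MetricsModes.MetricsMode commentMode = metricsConfig.columnMode("comment");
        MetricsModes.MetricsMode otherMode = metricsConfig.columnMode("other_column");
        System.out.println(commentMode + " / " + otherMode); // none / counts
    }
}
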
@@ -94,6 +94,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
jsonCodec,
session,
tableHandle.getFileFormat(),
+ tableHandle.getStorageProperties(),
maxOpenPartitions);
}

@@ -121,6 +122,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
jsonCodec,
session,
optimizeHandle.getFileFormat(),
+ optimizeHandle.getTableStorageProperties(),
maxOpenPartitions);
}
