Skip to content

Commit

Permalink
Extract function Iceberg::getPartitionValues
Browse files Browse the repository at this point in the history
  • Loading branch information
Dith3r authored and sopel39 committed Oct 4, 2023
1 parent 823de45 commit dcc2b73
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -79,9 +78,9 @@
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
Expand Down Expand Up @@ -251,18 +250,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
.map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager))
.collect(toImmutableSet());

Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> {
Map<ColumnHandle, NullableValue> bindings = new HashMap<>();
for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
Object partitionValue = deserializePartitionValue(
partitionColumn.getType(),
partitionKeys.get(partitionColumn.getId()).orElse(null),
partitionColumn.getName());
NullableValue bindingValue = new NullableValue(partitionColumn.getType(), partitionValue);
bindings.put(partitionColumn, bindingValue);
}
return bindings;
});
Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> getPartitionValues(identityPartitionColumns, partitionKeys));

if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) {
if (!partitionMatchesPredicate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.function.InvocationConvention;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DecimalType;
Expand Down Expand Up @@ -585,6 +587,22 @@ public static Map<Integer, Optional<String>> getPartitionKeys(StructLike partiti
return partitionKeys.buildOrThrow();
}

public static Map<ColumnHandle, NullableValue> getPartitionValues(
Set<IcebergColumnHandle> identityPartitionColumns,
Map<Integer, Optional<String>> partitionKeys)
{
ImmutableMap.Builder<ColumnHandle, NullableValue> bindings = ImmutableMap.builder();
for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
Object partitionValue = deserializePartitionValue(
partitionColumn.getType(),
partitionKeys.get(partitionColumn.getId()).orElse(null),
partitionColumn.getName());
NullableValue bindingValue = new NullableValue(partitionColumn.getType(), partitionValue);
bindings.put(partitionColumn, bindingValue);
}
return bindings.buildOrThrow();
}

public static LocationProvider getLocationProvider(SchemaTableName schemaTableName, String tableLocation, Map<String, String> storageProperties)
{
if (storageProperties.containsKey(WRITE_LOCATION_PROVIDER_IMPL)) {
Expand Down

0 comments on commit dcc2b73

Please sign in to comment.