Refactor HiveMetastore API and Implementations
This commit changes the HiveMetastore API to remove the methods
getPartitionNames() and getPartitionNamesByParts() and replace them with
getPartitionNamesByFilter(). Each implementation is updated, as are all
call sites. The previous behavior is maintained insofar as any
non-equality Domain value is still translated to the wildcard value.
A subsequent commit will implement translation of a Domain into a
proper Glue expression for the GlueHiveMetastore GetPartitions API in
the Glue Data Catalog.
rash67 committed Aug 27, 2020
1 parent 1f3f481 commit 43843b2
Showing 44 changed files with 766 additions and 395 deletions.
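
For orientation before the file-by-file diffs, here is a sketch of the change from a caller's point of view. The database ("web"), table ("clicks"), and partition columns ("ds", "country") are invented for illustration; the exact signatures appear in the HiveMetastore.java hunk below.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.spi.type.VarcharType.VARCHAR;

class PartitionLookupExample
{
    // Hypothetical helper; names are invented for illustration.
    static Optional<List<String>> lookup(HiveMetastore metastore, HiveIdentity identity)
    {
        // Old API (removed): equality-only, one string per partition column,
        // with "" acting as a per-column wildcard:
        //   metastore.getPartitionNamesByParts(identity, "web", "clicks",
        //           ImmutableList.of("2020-08-27", ""));

        // New API: a single TupleDomain keyed by partition column name.
        TupleDomain<String> filter = TupleDomain.withColumnDomains(ImmutableMap.of(
                "ds", Domain.singleValue(VARCHAR, utf8Slice("2020-08-27"))));
        return metastore.getPartitionNamesByFilter(
                identity, "web", "clicks", ImmutableList.of("ds", "country"), filter);
    }
}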
5 changes: 5 additions & 0 deletions presto-hive/pom.xml
@@ -127,6 +127,11 @@
             <artifactId>aws-java-sdk-sts</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
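
The commit message does not say why presto-hive now depends on jackson-core directly; it is plausibly groundwork for the Glue expression translation mentioned above. For reference, a minimal, self-contained use of the streaming API this artifact provides (illustrative only; not code from this commit):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import java.io.IOException;
import java.io.StringWriter;

final class JacksonCoreExample
{
    public static void main(String[] args) throws IOException
    {
        StringWriter out = new StringWriter();
        try (JsonGenerator generator = new JsonFactory().createGenerator(out)) {
            generator.writeStartObject();
            generator.writeStringField("ds", "2020-08-27");
            generator.writeEndObject();
        }
        System.out.println(out); // prints {"ds":"2020-08-27"}
    }
}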

presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java
@@ -46,6 +46,7 @@
         "hive.optimized-reader.enabled",
         "hive.rcfile-optimized-writer.enabled",
         "hive.time-zone",
+        "hive.assume-canonical-partition-keys",
 })
 public class HiveConfig
 {
@@ -91,8 +92,6 @@ public class HiveConfig
     private String parquetTimeZone = TimeZone.getDefault().getID();
     private boolean useParquetColumnNames;
 
-    private boolean assumeCanonicalPartitionKeys;
-
     private String rcfileTimeZone = TimeZone.getDefault().getID();
     private boolean rcfileWriterValidate;
 
@@ -538,18 +537,6 @@ public HiveConfig setRcfileWriterValidate(boolean rcfileWriterValidate)
         return this;
     }
 
-    public boolean isAssumeCanonicalPartitionKeys()
-    {
-        return assumeCanonicalPartitionKeys;
-    }
-
-    @Config("hive.assume-canonical-partition-keys")
-    public HiveConfig setAssumeCanonicalPartitionKeys(boolean assumeCanonicalPartitionKeys)
-    {
-        this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
-        return this;
-    }
-
     @MinDataSize("1B")
     @MaxDataSize("1GB")
     @NotNull
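
The first HiveConfig hunk registers the removed property in what appears to be the class's defunct-property list, while the later hunks delete its field and accessors. A sketch of the pattern, assuming airlift's @DefunctConfig annotation (the annotation itself sits just above the excerpt); ExampleConfig is hypothetical, and hive.max-partitions-per-scan is a real HiveConfig property:

import io.airlift.configuration.Config;
import io.airlift.configuration.DefunctConfig;

// Listing a removed property under @DefunctConfig makes server startup fail
// with a clear message if a deployment still sets it, instead of the setting
// being silently ignored.
@DefunctConfig({
        "hive.assume-canonical-partition-keys",
})
public class ExampleConfig
{
    private int maxPartitions = 100_000;

    public int getMaxPartitionsPerScan()
    {
        return maxPartitions;
    }

    @Config("hive.max-partitions-per-scan")
    public ExampleConfig setMaxPartitionsPerScan(int maxPartitions)
    {
        this.maxPartitions = maxPartitions;
        return this;
    }
}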

presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetastoreClosure.java
@@ -25,6 +25,7 @@
 import io.prestosql.plugin.hive.metastore.Table;
 import io.prestosql.spi.connector.SchemaTableName;
 import io.prestosql.spi.connector.TableNotFoundException;
+import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.spi.security.RoleGrant;
 import io.prestosql.spi.statistics.ColumnStatisticType;
 import io.prestosql.spi.type.Type;
@@ -185,14 +186,14 @@ public Optional<Partition> getPartition(HiveIdentity identity, String databaseName
                 .flatMap(table -> delegate.getPartition(identity, table, partitionValues));
     }
 
-    public Optional<List<String>> getPartitionNames(HiveIdentity identity, String databaseName, String tableName)
+    public Optional<List<String>> getPartitionNamesByFilter(
+            HiveIdentity identity,
+            String databaseName,
+            String tableName,
+            List<String> columnNames,
+            TupleDomain<String> partitionKeysFilter)
     {
-        return delegate.getPartitionNames(identity, databaseName, tableName);
-    }
-
-    public Optional<List<String>> getPartitionNamesByParts(HiveIdentity identity, String databaseName, String tableName, List<String> parts)
-    {
-        return delegate.getPartitionNamesByParts(identity, databaseName, tableName, parts);
+        return delegate.getPartitionNamesByFilter(identity, databaseName, tableName, columnNames, partitionKeysFilter);
     }
 
     private List<Partition> getExistingPartitionsByNames(HiveIdentity identity, Table table, List<String> partitionNames)
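
Because this getPartitionNamesByFilter forwards straight to the delegate, the filter is only a hint whose strength depends on the underlying implementation. A sketch of a caller passing a non-equality domain (all names here are invented; the HiveMetastore interface signature is shown in the last file below):

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.Range;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.predicate.ValueSet;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.spi.type.VarcharType.VARCHAR;

final class PartitionFilterHintExample
{
    // Hypothetical caller: a non-equality Domain (here, ds >= '2020-08-01')
    // is only a hint. With the wildcard translation this commit preserves,
    // an implementation may return every partition name, so callers must
    // still filter the result (as HivePartitionManager does). Passing
    // TupleDomain.all() lists all partitions unconditionally.
    static Optional<List<String>> lookupRange(HiveMetastore metastore, HiveIdentity identity)
    {
        TupleDomain<String> hint = TupleDomain.withColumnDomains(ImmutableMap.of(
                "ds", Domain.create(
                        ValueSet.ofRanges(Range.greaterThanOrEqual(VARCHAR, utf8Slice("2020-08-01"))),
                        false)));
        return metastore.getPartitionNamesByFilter(identity, "web", "clicks", ImmutableList.of("ds"), hint);
    }
}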

presto-hive/src/main/java/io/prestosql/plugin/hive/HivePartitionManager.java
@@ -17,7 +17,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import io.airlift.slice.Slice;
 import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
 import io.prestosql.plugin.hive.metastore.Table;
@@ -32,72 +31,48 @@
 import io.prestosql.spi.predicate.Domain;
 import io.prestosql.spi.predicate.NullableValue;
 import io.prestosql.spi.predicate.TupleDomain;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.CharType;
-import io.prestosql.spi.type.DateType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.Decimals;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.SmallintType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TinyintType;
 import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarcharType;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
 
 import javax.inject.Inject;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
+import static io.prestosql.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
 import static io.prestosql.plugin.hive.metastore.MetastoreUtil.toPartitionName;
 import static io.prestosql.plugin.hive.util.HiveBucketing.getHiveBucketFilter;
 import static io.prestosql.plugin.hive.util.HiveUtil.parsePartitionValue;
-import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.prestosql.spi.predicate.TupleDomain.none;
-import static io.prestosql.spi.type.Chars.padSpaces;
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 
 public class HivePartitionManager
 {
-    private static final String PARTITION_VALUE_WILDCARD = "";
-
     private final int maxPartitions;
-    private final boolean assumeCanonicalPartitionKeys;
     private final int domainCompactionThreshold;
 
     @Inject
     public HivePartitionManager(HiveConfig hiveConfig)
     {
         this(
                 hiveConfig.getMaxPartitionsPerScan(),
-                hiveConfig.isAssumeCanonicalPartitionKeys(),
                 hiveConfig.getDomainCompactionThreshold());
     }
 
     public HivePartitionManager(
             int maxPartitions,
-            boolean assumeCanonicalPartitionKeys,
             int domainCompactionThreshold)
     {
         checkArgument(maxPartitions >= 1, "maxPartitions must be at least 1");
         this.maxPartitions = maxPartitions;
-        this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
         checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1");
         this.domainCompactionThreshold = domainCompactionThreshold;
     }
@@ -268,68 +243,12 @@ public static boolean partitionMatches(List<HiveColumnHandle> partitionColumns,
 
     private List<String> getFilteredPartitionNames(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<ColumnHandle> effectivePredicate)
     {
-        checkArgument(effectivePredicate.getDomains().isPresent());
-
-        List<String> filter = new ArrayList<>();
-        for (HiveColumnHandle partitionKey : partitionKeys) {
-            Domain domain = effectivePredicate.getDomains().get().get(partitionKey);
-            if (domain != null && domain.isNullableSingleValue()) {
-                Object value = domain.getNullableSingleValue();
-                Type type = domain.getType();
-                if (value == null) {
-                    filter.add(HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION);
-                }
-                else if (type instanceof CharType) {
-                    Slice slice = (Slice) value;
-                    filter.add(padSpaces(slice, (CharType) type).toStringUtf8());
-                }
-                else if (type instanceof VarcharType) {
-                    Slice slice = (Slice) value;
-                    filter.add(slice.toStringUtf8());
-                }
-                // Types above this have only a single possible representation for each value.
-                // Types below this may have multiple representations for a single value. For
-                // example, a boolean column may represent the false value as "0", "false" or "False".
-                // The metastore distinguishes between these representations, so we cannot prune partitions
-                // unless we know that all partition values use the canonical Java representation.
-                else if (!assumeCanonicalPartitionKeys) {
-                    filter.add(PARTITION_VALUE_WILDCARD);
-                }
-                else if (type instanceof DecimalType && !((DecimalType) type).isShort()) {
-                    Slice slice = (Slice) value;
-                    filter.add(Decimals.toString(slice, ((DecimalType) type).getScale()));
-                }
-                else if (type instanceof DecimalType && ((DecimalType) type).isShort()) {
-                    filter.add(Decimals.toString((long) value, ((DecimalType) type).getScale()));
-                }
-                else if (type instanceof DateType) {
-                    DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.date().withZoneUTC();
-                    filter.add(dateTimeFormatter.print(TimeUnit.DAYS.toMillis((long) value)));
-                }
-                else if (type instanceof TimestampType) {
-                    // we don't have time zone info, so just add a wildcard
-                    filter.add(PARTITION_VALUE_WILDCARD);
-                }
-                else if (type instanceof TinyintType
-                        || type instanceof SmallintType
-                        || type instanceof IntegerType
-                        || type instanceof BigintType
-                        || type instanceof DoubleType
-                        || type instanceof RealType
-                        || type instanceof BooleanType) {
-                    filter.add(value.toString());
-                }
-                else {
-                    throw new PrestoException(NOT_SUPPORTED, format("Unsupported partition key type: %s", type.getDisplayName()));
-                }
-            }
-            else {
-                filter.add(PARTITION_VALUE_WILDCARD);
-            }
-        }
-
+        List<String> columnNames = partitionKeys.stream()
+                .map(HiveColumnHandle::getName)
+                .collect(toImmutableList());
+        TupleDomain<String> partitionKeysFilter = computePartitionKeyFilter(partitionKeys, effectivePredicate);
         // fetch the partition names
-        return metastore.getPartitionNamesByParts(identity, tableName.getSchemaName(), tableName.getTableName(), filter)
+        return metastore.getPartitionNamesByFilter(identity, tableName.getSchemaName(), tableName.getTableName(), columnNames, partitionKeysFilter)
                 .orElseThrow(() -> new TableNotFoundException(tableName));
     }
 

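
The per-type value rendering deleted above moves out of HivePartitionManager: the planner-side code now only re-keys the predicate by partition column name, and per the commit message each implementation performs the value-or-wildcard translation. This excerpt does not show MetastoreUtil.computePartitionKeyFilter itself, but a sketch of what such a projection plausibly looks like:

import com.google.common.collect.ImmutableMap;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;

final class PartitionKeyFilterSketch
{
    // Sketch only, not the commit's implementation: re-key the effective
    // predicate from ColumnHandle to partition column name, dropping
    // columns that carry no constraint.
    static TupleDomain<String> computePartitionKeyFilter(
            List<HiveColumnHandle> partitionKeys,
            TupleDomain<ColumnHandle> effectivePredicate)
    {
        checkArgument(effectivePredicate.getDomains().isPresent(), "effectivePredicate is none");
        Map<ColumnHandle, Domain> domains = effectivePredicate.getDomains().get();
        ImmutableMap.Builder<String, Domain> byName = ImmutableMap.builder();
        for (HiveColumnHandle partitionKey : partitionKeys) {
            Domain domain = domains.get(partitionKey);
            if (domain != null) {
                byName.put(partitionKey.getName(), domain);
            }
        }
        return TupleDomain.withColumnDomains(byName.build());
    }
}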

presto-hive/src/main/java/io/prestosql/plugin/hive/metastore/HiveMetastore.java
@@ -18,6 +18,7 @@
 import io.prestosql.plugin.hive.PartitionStatistics;
 import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.spi.security.RoleGrant;
 import io.prestosql.spi.statistics.ColumnStatisticType;
 import io.prestosql.spi.type.Type;
@@ -85,9 +86,17 @@ public interface HiveMetastore
 
     Optional<Partition> getPartition(HiveIdentity identity, Table table, List<String> partitionValues);
 
-    Optional<List<String>> getPartitionNames(HiveIdentity identity, String databaseName, String tableName);
-
-    Optional<List<String>> getPartitionNamesByParts(HiveIdentity identity, String databaseName, String tableName, List<String> parts);
+    /**
+     * Return a list of partition names, where partitionKeysFilter is used as a hint to each implementation.
+     *
+     * @param databaseName the name of the database
+     * @param tableName the name of the table
+     * @param columnNames the list of partition column names
+     * @param partitionKeysFilter map of filters (Domain) for each partition column
+     * @return optionally, a list of strings where each entry is of the form {key}={value}
+     * @see TupleDomain
+     */
+    Optional<List<String>> getPartitionNamesByFilter(HiveIdentity identity, String databaseName, String tableName, List<String> columnNames, TupleDomain<String> partitionKeysFilter);
 
     Map<String, Optional<Partition>> getPartitionsByNames(HiveIdentity identity, Table table, List<String> partitionNames);
 
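
Per the commit message, implementations currently preserve the old behavior by degrading any non-equality Domain to the wildcard value. A sketch of that translation for an implementation that still speaks the {key}={value} "parts" protocol; this helper and its simplifications (notably the toString rendering) are assumptions, with the real per-type logic visible in the code removed from HivePartitionManager above:

import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class WildcardTranslationSketch
{
    private static final String PARTITION_VALUE_WILDCARD = "";

    // Sketch only: one entry per partition column, in column order; any
    // column whose Domain is not a single non-null value degrades to the
    // wildcard, matching the behavior the commit message says is maintained.
    static List<String> toParts(List<String> columnNames, TupleDomain<String> partitionKeysFilter)
    {
        Map<String, Domain> domains = partitionKeysFilter.getDomains()
                .orElseThrow(() -> new IllegalArgumentException("partitionKeysFilter is none"));
        List<String> parts = new ArrayList<>();
        for (String columnName : columnNames) {
            Domain domain = domains.get(columnName);
            if (domain != null && domain.isNullableSingleValue() && domain.getNullableSingleValue() != null) {
                // Real code must render each value canonically per type (see the
                // removed HivePartitionManager logic); toString is a simplification.
                parts.add(domain.getNullableSingleValue().toString());
            }
            else {
                parts.add(PARTITION_VALUE_WILDCARD);
            }
        }
        return parts;
    }
}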