Refactor HiveMetastore API and Implementations #4748

Merged: 1 commit, Aug 27, 2020
5 changes: 5 additions & 0 deletions presto-hive/pom.xml
@@ -127,6 +127,11 @@
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
HiveConfig.java
@@ -46,6 +46,7 @@
"hive.optimized-reader.enabled",
"hive.rcfile-optimized-writer.enabled",
"hive.time-zone",
"hive.assume-canonical-partition-keys",
})
public class HiveConfig
{
@@ -91,8 +92,6 @@ public class HiveConfig
private String parquetTimeZone = TimeZone.getDefault().getID();
private boolean useParquetColumnNames;

private boolean assumeCanonicalPartitionKeys;

private String rcfileTimeZone = TimeZone.getDefault().getID();
private boolean rcfileWriterValidate;

@@ -538,18 +537,6 @@ public HiveConfig setRcfileWriterValidate(boolean rcfileWriterValidate)
return this;
}

public boolean isAssumeCanonicalPartitionKeys()
{
return assumeCanonicalPartitionKeys;
}

@Config("hive.assume-canonical-partition-keys")
public HiveConfig setAssumeCanonicalPartitionKeys(boolean assumeCanonicalPartitionKeys)
{
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
return this;
}

@MinDataSize("1B")
@MaxDataSize("1GB")
@NotNull
HiveMetastoreClosure.java
@@ -25,6 +25,7 @@
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.security.RoleGrant;
import io.prestosql.spi.statistics.ColumnStatisticType;
import io.prestosql.spi.type.Type;
@@ -185,14 +186,14 @@ public Optional<Partition> getPartition(HiveIdentity identity, String databaseNa
.flatMap(table -> delegate.getPartition(identity, table, partitionValues));
}

public Optional<List<String>> getPartitionNames(HiveIdentity identity, String databaseName, String tableName)
public Optional<List<String>> getPartitionNamesByFilter(
HiveIdentity identity,
String databaseName,
String tableName,
List<String> columnNames,
TupleDomain<String> partitionKeysFilter)
{
return delegate.getPartitionNames(identity, databaseName, tableName);
}

public Optional<List<String>> getPartitionNamesByParts(HiveIdentity identity, String databaseName, String tableName, List<String> parts)
{
return delegate.getPartitionNamesByParts(identity, databaseName, tableName, parts);
return delegate.getPartitionNamesByFilter(identity, databaseName, tableName, columnNames, partitionKeysFilter);
}

private List<Partition> getExistingPartitionsByNames(HiveIdentity identity, Table table, List<String> partitionNames)
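
With this refactoring, the two partition-name entry points collapse into one. A caller that previously used getPartitionNames(identity, databaseName, tableName) can pass an unconstrained TupleDomain instead. A minimal sketch under that assumption; the schema, table, and partition column names are invented for illustration:

import com.google.common.collect.ImmutableList;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Optional;

private static Optional<List<String>> listAllPartitions(HiveMetastore metastore, HiveIdentity identity)
{
    // TupleDomain.all() constrains no partition key, so this is equivalent
    // to the removed getPartitionNames(identity, databaseName, tableName).
    return metastore.getPartitionNamesByFilter(
            identity,
            "web",                             // example schema (assumption)
            "page_views",                      // example table (assumption)
            ImmutableList.of("ds", "country"), // its partition columns (assumption)
            TupleDomain.all());
}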
HivePartitionManager.java
@@ -17,7 +17,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.airlift.slice.Slice;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.metastore.Table;
@@ -32,72 +31,48 @@
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.NullableValue;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
import io.prestosql.spi.type.CharType;
import io.prestosql.spi.type.DateType;
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
import io.prestosql.spi.type.SmallintType;
import io.prestosql.spi.type.TimestampType;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarcharType;
import org.apache.hadoop.hive.common.FileUtils;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
import static io.prestosql.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
import static io.prestosql.plugin.hive.metastore.MetastoreUtil.toPartitionName;
import static io.prestosql.plugin.hive.util.HiveBucketing.getHiveBucketFilter;
import static io.prestosql.plugin.hive.util.HiveUtil.parsePartitionValue;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.predicate.TupleDomain.none;
import static io.prestosql.spi.type.Chars.padSpaces;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

public class HivePartitionManager
{
private static final String PARTITION_VALUE_WILDCARD = "";

private final int maxPartitions;
private final boolean assumeCanonicalPartitionKeys;
private final int domainCompactionThreshold;

@Inject
public HivePartitionManager(HiveConfig hiveConfig)
{
this(
hiveConfig.getMaxPartitionsPerScan(),
hiveConfig.isAssumeCanonicalPartitionKeys(),
hiveConfig.getDomainCompactionThreshold());
}

public HivePartitionManager(
int maxPartitions,
boolean assumeCanonicalPartitionKeys,
int domainCompactionThreshold)
{
checkArgument(maxPartitions >= 1, "maxPartitions must be at least 1");
this.maxPartitions = maxPartitions;
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
checkArgument(domainCompactionThreshold >= 1, "domainCompactionThreshold must be at least 1");
this.domainCompactionThreshold = domainCompactionThreshold;
}
@@ -268,68 +243,12 @@ public static boolean partitionMatches(List<HiveColumnHandle> partitionColumns,

private List<String> getFilteredPartitionNames(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<ColumnHandle> effectivePredicate)
{
checkArgument(effectivePredicate.getDomains().isPresent());

List<String> filter = new ArrayList<>();
for (HiveColumnHandle partitionKey : partitionKeys) {
Domain domain = effectivePredicate.getDomains().get().get(partitionKey);
if (domain != null && domain.isNullableSingleValue()) {
Object value = domain.getNullableSingleValue();
Type type = domain.getType();
if (value == null) {
filter.add(HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION);
}
else if (type instanceof CharType) {
Slice slice = (Slice) value;
filter.add(padSpaces(slice, (CharType) type).toStringUtf8());
}
else if (type instanceof VarcharType) {
Slice slice = (Slice) value;
filter.add(slice.toStringUtf8());
}
// Types above this have only a single possible representation for each value.
// Types below this may have multiple representations for a single value. For
// example, a boolean column may represent the false value as "0", "false" or "False".
// The metastore distinguishes between these representations, so we cannot prune partitions
// unless we know that all partition values use the canonical Java representation.
else if (!assumeCanonicalPartitionKeys) {
filter.add(PARTITION_VALUE_WILDCARD);
}
else if (type instanceof DecimalType && !((DecimalType) type).isShort()) {
Slice slice = (Slice) value;
filter.add(Decimals.toString(slice, ((DecimalType) type).getScale()));
}
else if (type instanceof DecimalType && ((DecimalType) type).isShort()) {
filter.add(Decimals.toString((long) value, ((DecimalType) type).getScale()));
}
else if (type instanceof DateType) {
DateTimeFormatter dateTimeFormatter = ISODateTimeFormat.date().withZoneUTC();
filter.add(dateTimeFormatter.print(TimeUnit.DAYS.toMillis((long) value)));
}
else if (type instanceof TimestampType) {
// we don't have time zone info, so just add a wildcard
filter.add(PARTITION_VALUE_WILDCARD);
}
else if (type instanceof TinyintType
|| type instanceof SmallintType
|| type instanceof IntegerType
|| type instanceof BigintType
|| type instanceof DoubleType
|| type instanceof RealType
|| type instanceof BooleanType) {
filter.add(value.toString());
}
else {
throw new PrestoException(NOT_SUPPORTED, format("Unsupported partition key type: %s", type.getDisplayName()));
}
}
else {
filter.add(PARTITION_VALUE_WILDCARD);
}
}

List<String> columnNames = partitionKeys.stream()
.map(HiveColumnHandle::getName)
.collect(toImmutableList());
TupleDomain<String> partitionKeysFilter = computePartitionKeyFilter(partitionKeys, effectivePredicate);
// fetch the partition names
return metastore.getPartitionNamesByParts(identity, tableName.getSchemaName(), tableName.getTableName(), filter)
return metastore.getPartitionNamesByFilter(identity, tableName.getSchemaName(), tableName.getTableName(), columnNames, partitionKeysFilter)
.orElseThrow(() -> new TableNotFoundException(tableName));
}

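The type-by-type canonicalization that used to live here now sits behind MetastoreUtil.computePartitionKeyFilter, which packages the partition-key domains of the effective predicate into a TupleDomain<String> keyed by column name. The diff does not include that helper, so the following is only a plausible sketch of its shape, not the actual implementation:

import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Map;

// Sketch only: re-key the effective predicate's domains by partition column
// name, dropping columns that carry no constraint.
public static TupleDomain<String> computePartitionKeyFilter(
        List<HiveColumnHandle> partitionKeys,
        TupleDomain<ColumnHandle> effectivePredicate)
{
    Map<ColumnHandle, Domain> domains = effectivePredicate.getDomains()
            .orElseThrow(() -> new IllegalArgumentException("effectivePredicate is none"));
    ImmutableMap.Builder<String, Domain> filter = ImmutableMap.builder();
    for (HiveColumnHandle partitionKey : partitionKeys) {
        Domain domain = domains.get(partitionKey);
        if (domain != null) {
            filter.put(partitionKey.getName(), domain);
        }
    }
    return TupleDomain.withColumnDomains(filter.build());
}

Pushing the raw domains down lets each metastore implementation decide how much of the filter it can evaluate, instead of forcing the lossy string-wildcard encoding that the removed code built here.
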
HiveMetastore.java
@@ -18,6 +18,7 @@
import io.prestosql.plugin.hive.PartitionStatistics;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.security.RoleGrant;
import io.prestosql.spi.statistics.ColumnStatisticType;
import io.prestosql.spi.type.Type;
@@ -85,9 +86,17 @@ public interface HiveMetastore

Optional<Partition> getPartition(HiveIdentity identity, Table table, List<String> partitionValues);

Optional<List<String>> getPartitionNames(HiveIdentity identity, String databaseName, String tableName);

Optional<List<String>> getPartitionNamesByParts(HiveIdentity identity, String databaseName, String tableName, List<String> parts);
/**
 * Returns a list of partition names, where {@code partitionKeysFilter} is used as a hint to each implementation.
 *
 * @param databaseName the name of the database
 * @param tableName the name of the table
 * @param columnNames the list of partition column names
 * @param partitionKeysFilter map of filters (Domain) for each partition column
 * @return optionally, a list of strings where each entry is in the form {key}={value}
 * @see TupleDomain
 */
Optional<List<String>> getPartitionNamesByFilter(HiveIdentity identity, String databaseName, String tableName, List<String> columnNames, TupleDomain<String> partitionKeysFilter);

Map<String, Optional<Partition>> getPartitionsByNames(HiveIdentity identity, Table table, List<String> partitionNames);

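
To make the new contract concrete, here is a hedged usage sketch with a single-value constraint on one partition key; the schema, table, and values are invented, and only the method itself comes from this PR:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Optional;

import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.spi.type.VarcharType.VARCHAR;

private static Optional<List<String>> partitionsForDay(HiveMetastore metastore, HiveIdentity identity)
{
    // Constrain the "ds" key to a single day; "country" is left unconstrained.
    TupleDomain<String> filter = TupleDomain.withColumnDomains(ImmutableMap.of(
            "ds", Domain.singleValue(VARCHAR, utf8Slice("2020-08-27"))));
    // Per the Javadoc, each returned entry is of the form {key}={value},
    // e.g. "ds=2020-08-27/country=US" for a two-key partition.
    return metastore.getPartitionNamesByFilter(identity, "web", "page_views",
            ImmutableList.of("ds", "country"), filter);
}

Because the filter is only a hint, an implementation may return a superset of the matching partitions; callers still prune the returned names against the full predicate.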