Read spark generated statistics in hive connector #16120

Merged · 2 commits · Feb 28, 2023

Changes from all commits
docs/src/main/sphinx/connector/hive.rst (2 additions, 0 deletions)
@@ -630,6 +630,8 @@ Property Name Description
       ``KERBEROS``. Default is ``NONE``.
     * - ``hive.metastore.thrift.impersonation.enabled``
       - Enable Hive metastore end user impersonation.
+    * - ``hive.metastore.thrift.use-spark-table-statistics-fallback``
+      - Enable use of table statistics generated by Apache Spark when Hive table statistics are not available. Defaults to ``true``.
     * - ``hive.metastore.thrift.delegation-token.cache-ttl``
       - Time to live delegation token cache for metastore. Default is ``1h``.
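The property is set per Hive catalog. A minimal sketch of a catalog file (for example etc/catalog/hive.properties; the metastore URI is a placeholder) with the fallback explicitly disabled:

    connector.name=hive
    hive.metastore.uri=thrift://example.net:9083
    hive.metastore.thrift.use-spark-table-statistics-fallback=false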
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java
@@ -123,6 +123,7 @@
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromRolePrincipalGrants;
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromTrinoPrincipalType;
+import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getBasicStatisticsWithSparkFallback;
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics;
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet;
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.parsePrivilege;
@@ -156,6 +157,7 @@ public class ThriftHiveMetastore
     private final boolean deleteFilesOnDrop;
     private final boolean translateHiveViews;
     private final boolean assumeCanonicalPartitionKeys;
+    private final boolean useSparkTableStatisticsFallback;
     private final ThriftMetastoreStats stats;
     private final ExecutorService writeStatisticsExecutor;

@@ -172,6 +174,7 @@ public ThriftHiveMetastore(
             boolean deleteFilesOnDrop,
             boolean translateHiveViews,
             boolean assumeCanonicalPartitionKeys,
+            boolean useSparkTableStatisticsFallback,
             ThriftMetastoreStats stats,
             ExecutorService writeStatisticsExecutor)
     {
@@ -187,6 +190,7 @@
         this.deleteFilesOnDrop = deleteFilesOnDrop;
         this.translateHiveViews = translateHiveViews;
         this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
+        this.useSparkTableStatisticsFallback = useSparkTableStatisticsFallback;
         this.stats = requireNonNull(stats, "stats is null");
         this.writeStatisticsExecutor = requireNonNull(writeStatisticsExecutor, "writeStatisticsExecutor is null");
     }
@@ -325,7 +329,16 @@ public PartitionStatistics getTableStatistics(Table table)
         List<String> dataColumns = table.getSd().getCols().stream()
                 .map(FieldSchema::getName)
                 .collect(toImmutableList());
-        HiveBasicStatistics basicStatistics = getHiveBasicStatistics(table.getParameters());
+        Map<String, String> parameters = table.getParameters();
+        HiveBasicStatistics basicStatistics = getHiveBasicStatistics(parameters);
+
+        if (useSparkTableStatisticsFallback && basicStatistics.getRowCount().isEmpty()) {
+            PartitionStatistics sparkTableStatistics = ThriftSparkMetastoreUtil.getTableStatistics(table);
+            if (sparkTableStatistics.getBasicStatistics().getRowCount().isPresent()) {
+                return sparkTableStatistics;
+            }
+        }
+
         Map<String, HiveColumnStatistics> columnStatistics = getTableColumnStatistics(table.getDbName(), table.getTableName(), dataColumns, basicStatistics.getRowCount());
         return new PartitionStatistics(basicStatistics, columnStatistics);
     }
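For orientation: ThriftSparkMetastoreUtil is referenced above but is not part of the hunks shown here, so the exact parameter keys it reads are an assumption in the sketch below; the keys used are the ones Apache Spark itself writes into table properties when it computes statistics. The sketch shows the situation the fallback targets, a table analyzed by Spark but never by Hive:

    import java.util.Map;

    // Sketch only (not part of this PR): a table whose statistics were computed by
    // Spark. Hive's "numRows" parameter is absent, so getHiveBasicStatistics() yields
    // an empty row count and getTableStatistics() falls through to the Spark statistics.
    class SparkAnalyzedTableSketch
    {
        public static void main(String[] args)
        {
            Map<String, String> tableParameters = Map.of(
                    "spark.sql.statistics.numRows", "1000",        // assumed Spark key
                    "spark.sql.statistics.totalSize", "52428800"); // assumed Spark key

            System.out.println("Hive row count present: " + tableParameters.containsKey("numRows")); // false
            System.out.println("Spark row count: " + tableParameters.get("spark.sql.statistics.numRows")); // 1000
        }
    }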
@@ -366,7 +379,12 @@ public Map<String, PartitionStatistics> getPartitionStatistics(Table table, List
         Map<String, HiveBasicStatistics> partitionBasicStatistics = partitions.stream()
                 .collect(toImmutableMap(
                         partition -> makePartName(partitionColumns, partition.getValues()),
-                        partition -> getHiveBasicStatistics(partition.getParameters())));
+                        partition -> {
+                            if (useSparkTableStatisticsFallback) {
+                                return getBasicStatisticsWithSparkFallback(partition.getParameters());
+                            }
+                            return getHiveBasicStatistics(partition.getParameters());
+                        }));
         Map<String, OptionalLong> partitionRowCounts = partitionBasicStatistics.entrySet().stream()
                 .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getRowCount()));
         Map<String, Map<String, HiveColumnStatistics>> partitionColumnStatistics = getPartitionColumnStatistics(
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreFactory.java
@@ -43,6 +43,7 @@ public class ThriftHiveMetastoreFactory
     private final boolean deleteFilesOnDrop;
     private final boolean translateHiveViews;
     private final boolean assumeCanonicalPartitionKeys;
+    private final boolean useSparkTableStatisticsFallback;
     private final ExecutorService writeStatisticsExecutor;
     private final ThriftMetastoreStats stats = new ThriftMetastoreStats();

@@ -69,6 +70,7 @@ public ThriftHiveMetastoreFactory(
         this.maxWaitForLock = thriftConfig.getMaxWaitForTransactionLock();

         this.assumeCanonicalPartitionKeys = thriftConfig.isAssumeCanonicalPartitionKeys();
+        this.useSparkTableStatisticsFallback = thriftConfig.isUseSparkTableStatisticsFallback();
         this.writeStatisticsExecutor = requireNonNull(writeStatisticsExecutor, "writeStatisticsExecutor is null");
     }

@@ -101,6 +103,7 @@ public ThriftMetastore createMetastore(Optional<ConnectorIdentity> identity)
                 deleteFilesOnDrop,
                 translateHiveViews,
                 assumeCanonicalPartitionKeys,
+                useSparkTableStatisticsFallback,
                 stats,
                 writeStatisticsExecutor);
     }
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java
@@ -39,6 +39,7 @@ public class ThriftMetastoreConfig
     private Duration maxBackoffDelay = RetryDriver.DEFAULT_SLEEP_TIME;
     private Duration maxRetryTime = RetryDriver.DEFAULT_MAX_RETRY_TIME;
     private boolean impersonationEnabled;
+    private boolean useSparkTableStatisticsFallback = true;
     private Duration delegationTokenCacheTtl = new Duration(1, TimeUnit.HOURS); // The default lifetime in Hive is 7 days (metastore.cluster.delegation.token.max-lifetime)
     private long delegationTokenCacheMaximumSize = 1000;
     private boolean deleteFilesOnDrop;
@@ -158,6 +159,19 @@ public ThriftMetastoreConfig setImpersonationEnabled(boolean impersonationEnable
         return this;
     }

+    public boolean isUseSparkTableStatisticsFallback()
+    {
+        return useSparkTableStatisticsFallback;
+    }
+
+    @Config("hive.metastore.thrift.use-spark-table-statistics-fallback")
+    @ConfigDescription("Enable usage of table statistics generated by Apache Spark when hive table statistics are not available")
+    public ThriftMetastoreConfig setUseSparkTableStatisticsFallback(boolean useSparkTableStatisticsFallback)
+    {
+        this.useSparkTableStatisticsFallback = useSparkTableStatisticsFallback;
+        return this;
+    }
+
     @NotNull
     @MinDuration("0ms")
     public Duration getDelegationTokenCacheTtl()
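Because the setter returns this in the usual airlift config-bean style, the flag can also be toggled programmatically, for example in a unit test; a trivial sketch:

    ThriftMetastoreConfig config = new ThriftMetastoreConfig()
            .setUseSparkTableStatisticsFallback(false); // opt out of the Spark fallback
    boolean enabled = config.isUseSparkTableStatisticsFallback(); // now false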
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreParameterParserUtils.java (new file)
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.metastore.thrift;

import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;

final class ThriftMetastoreParameterParserUtils
{
    private ThriftMetastoreParameterParserUtils() {}

    static Optional<Boolean> toBoolean(@Nullable String parameterValue)
    {
        if (parameterValue == null) {
            return Optional.empty();
        }
        Boolean value = Boolean.parseBoolean(parameterValue);
        return Optional.of(value);
    }

    static OptionalLong toLong(@Nullable String parameterValue)
    {
        if (parameterValue == null) {
            return OptionalLong.empty();
        }
        Long longValue = Longs.tryParse(parameterValue);
        if (longValue == null || longValue < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(longValue);
    }

    static OptionalDouble toDouble(@Nullable String parameterValue)
    {
        if (parameterValue == null) {
            return OptionalDouble.empty();
        }
        Double doubleValue = Doubles.tryParse(parameterValue);
        if (doubleValue == null || doubleValue < 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(doubleValue);
    }

    static Optional<BigDecimal> toDecimal(@Nullable String parameterValue)
    {
        if (parameterValue == null) {
            return Optional.empty();
        }
        try {
            BigDecimal decimal = new BigDecimal(parameterValue);
            if (decimal.compareTo(BigDecimal.ZERO) < 0) {
                return Optional.empty();
            }
            return Optional.of(decimal);
        }
        catch (NumberFormatException exception) {
            return Optional.empty();
        }
    }

    static Optional<LocalDate> toDate(@Nullable String parameterValue)
    {
        if (parameterValue == null) {
            return Optional.empty();
        }
        try {
            LocalDate date = LocalDate.parse(parameterValue);
            return Optional.of(date);
        }
        catch (DateTimeException exception) {
            return Optional.empty();
        }
    }
}
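These parsers are deliberately lenient: a missing, unparseable, or negative value yields an empty result rather than an exception, since metastore parameters are free-form strings written by several engines. Expected results for a few illustrative inputs (the class is package-private, so real callers sit in the same package):

    // toLong("42")         -> OptionalLong.of(42)
    // toLong("-5")         -> OptionalLong.empty()   (negative statistics are rejected)
    // toLong("abc")        -> OptionalLong.empty()   (not a number)
    // toLong(null)         -> OptionalLong.empty()   (parameter absent)
    // toDecimal("1.5")     -> Optional.of(new BigDecimal("1.5"))
    // toDate("2023-02-28") -> Optional.of(LocalDate.of(2023, 2, 28))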
plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java
@@ -17,7 +17,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Streams;
-import com.google.common.primitives.Longs;
 import com.google.common.primitives.Shorts;
 import io.trino.hive.thrift.metastore.BinaryColumnStatsData;
 import io.trino.hive.thrift.metastore.BooleanColumnStatsData;
@@ -125,6 +124,8 @@
 import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
 import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.SELECT;
 import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.UPDATE;
+import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toLong;
+import static io.trino.plugin.hive.metastore.thrift.ThriftSparkMetastoreUtil.getSparkBasicStatistics;
 import static io.trino.plugin.hive.type.Category.PRIMITIVE;
 import static io.trino.spi.security.PrincipalType.ROLE;
 import static io.trino.spi.security.PrincipalType.USER;
@@ -144,10 +145,10 @@

 public final class ThriftMetastoreUtil
 {
-    public static final String NUM_ROWS = "numRows";
     private static final String PUBLIC_ROLE_NAME = "public";
     private static final String ADMIN_ROLE_NAME = "admin";
     private static final String NUM_FILES = "numFiles";
+    public static final String NUM_ROWS = "numRows";
     private static final String RAW_DATA_SIZE = "rawDataSize";
     private static final String TOTAL_SIZE = "totalSize";
     private static final Set<String> STATS_PROPERTIES = ImmutableSet.of(NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE);
@@ -742,23 +743,25 @@ public static Set<HivePrivilegeInfo> parsePrivilege(PrivilegeGrantInfo userGrant

     public static HiveBasicStatistics getHiveBasicStatistics(Map<String, String> parameters)
     {
-        OptionalLong numFiles = parse(parameters.get(NUM_FILES));
-        OptionalLong numRows = parse(parameters.get(NUM_ROWS));
-        OptionalLong inMemoryDataSizeInBytes = parse(parameters.get(RAW_DATA_SIZE));
-        OptionalLong onDiskDataSizeInBytes = parse(parameters.get(TOTAL_SIZE));
+        OptionalLong numFiles = toLong(parameters.get(NUM_FILES));
+        OptionalLong numRows = toLong(parameters.get(NUM_ROWS));
+        OptionalLong inMemoryDataSizeInBytes = toLong(parameters.get(RAW_DATA_SIZE));
+        OptionalLong onDiskDataSizeInBytes = toLong(parameters.get(TOTAL_SIZE));
         return new HiveBasicStatistics(numFiles, numRows, inMemoryDataSizeInBytes, onDiskDataSizeInBytes);
     }

-    private static OptionalLong parse(@Nullable String parameterValue)
+    public static HiveBasicStatistics getBasicStatisticsWithSparkFallback(Map<String, String> parameters)
     {
-        if (parameterValue == null) {
-            return OptionalLong.empty();
-        }
-        Long longValue = Longs.tryParse(parameterValue);
-        if (longValue == null || longValue < 0) {
-            return OptionalLong.empty();
+        HiveBasicStatistics basicStatistics = getHiveBasicStatistics(parameters);
+        // Partitioned table without statistics
+        if (basicStatistics.getRowCount().isEmpty() || basicStatistics.getRowCount().getAsLong() == 0L) {
+            HiveBasicStatistics sparkBasicStatistics = getSparkBasicStatistics(parameters);
+            if (sparkBasicStatistics.getRowCount().isPresent()) {
+                return sparkBasicStatistics;
+            }
         }
-        return OptionalLong.of(longValue);
+
+        return basicStatistics;
     }

     public static Map<String, String> updateStatisticsParameters(Map<String, String> parameters, HiveBasicStatistics statistics)
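The zero check above matters for partitioned tables: the Hive metastore often records numRows as 0 for partitions whose statistics Hive never actually computed (the "partitioned table without statistics" case in the code comment), so a zero row count is treated like a missing one. A minimal restatement of the rule (not the PR's code), reduced to the row count alone:

    import java.util.OptionalLong;

    // Sketch of the decision in getBasicStatisticsWithSparkFallback: prefer Hive
    // statistics unless the Hive row count is missing or zero and the
    // Spark-generated statistics carry a row count.
    final class FallbackRuleSketch
    {
        static OptionalLong effectiveRowCount(OptionalLong hiveRowCount, OptionalLong sparkRowCount)
        {
            boolean hiveRowCountUnusable = hiveRowCount.isEmpty() || hiveRowCount.getAsLong() == 0L;
            if (hiveRowCountUnusable && sparkRowCount.isPresent()) {
                return sparkRowCount;
            }
            return hiveRowCount;
        }

        public static void main(String[] args)
        {
            System.out.println(effectiveRowCount(OptionalLong.of(0), OptionalLong.of(500)));   // OptionalLong[500]
            System.out.println(effectiveRowCount(OptionalLong.of(7), OptionalLong.of(500)));   // OptionalLong[7]
            System.out.println(effectiveRowCount(OptionalLong.empty(), OptionalLong.empty())); // OptionalLong.empty
        }
    }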