Skip to content

Commit

Permalink
Improve condition for ignoring bucketing for hive
Browse files Browse the repository at this point in the history
Don't ignore bucketing for queries that use the bucket column and consider the
bucket filter when calculating the total number of buckets
  • Loading branch information
rschlussel committed Sep 4, 2019
1 parent 3cdf680 commit d0d8850
Show file tree
Hide file tree
Showing 3 changed files with 541 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import static com.facebook.presto.hive.HiveBucketing.getHiveBucketFilter;
import static com.facebook.presto.hive.HiveBucketing.getHiveBucketHandle;
import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxBucketsForGroupedExecution;
import static com.facebook.presto.hive.HiveSessionProperties.shouldIgnoreTableBucketing;
Expand Down Expand Up @@ -159,7 +160,9 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor
Optional<HiveBucketHandle> hiveBucketHandle = shouldIgnoreTableBucketing ? Optional.empty() : getHiveBucketHandle(table);
Optional<HiveBucketFilter> bucketFilter = shouldIgnoreTableBucketing ? Optional.empty() : getHiveBucketFilter(table, effectivePredicate);

if (hiveBucketHandle.isPresent() && hiveBucketHandle.get().getReadBucketCount() * partitions.size() > getMaxBucketsForGroupedExecution(session)) {
if (!queryUsesHiveBucketColumn(effectivePredicate)
&& hiveBucketHandle.isPresent()
&& queryAccessesTooManyBuckets(hiveBucketHandle.get(), bucketFilter, partitions, session)) {
hiveBucketHandle = Optional.empty();
bucketFilter = Optional.empty();
}
Expand Down Expand Up @@ -187,6 +190,21 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastor
return new HivePartitionResult(partitionColumns, partitions, compactEffectivePredicate, remainingTupleDomain, enforcedTupleDomain, hiveBucketHandle, bucketFilter);
}

private boolean queryUsesHiveBucketColumn(TupleDomain<ColumnHandle> effectivePredicate)
{
if (!effectivePredicate.getDomains().isPresent()) {
return false;
}
return effectivePredicate.getDomains().get().keySet().stream().anyMatch(key -> ((HiveColumnHandle) key).getName().equals(BUCKET_COLUMN_NAME));
}

private boolean queryAccessesTooManyBuckets(HiveBucketHandle handle, Optional<HiveBucketFilter> filter, List<HivePartition> partitions, ConnectorSession session)
{
int bucketsPerPartition = filter.map(hiveBucketFilter -> hiveBucketFilter.getBucketsToKeep().size())
.orElseGet(handle::getReadBucketCount);
return bucketsPerPartition * partitions.size() > getMaxBucketsForGroupedExecution(session);
}

private List<HivePartition> getPartitionsAsList(Iterator<HivePartition> partitionsIterator)
{
ImmutableList.Builder<HivePartition> partitionList = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.TestingTypeManager;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveColumnHandle.bucketColumnHandle;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static io.airlift.slice.Slices.utf8Slice;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestHivePartitionManager
{
private static final String SCHEMA_NAME = "schema";
private static final String TABLE_NAME = "table";
private static final String USER_NAME = "user";
private static final String LOCATION = "somewhere/over/the/rainbow";
private static final Column PARTITION_COLUMN = new Column("ds", HIVE_STRING, Optional.empty());
private static final Column BUCKET_COLUMN = new Column("c1", HIVE_INT, Optional.empty());
private static final Table TABLE = new Table(
SCHEMA_NAME,
TABLE_NAME,
USER_NAME,
PrestoTableType.MANAGED_TABLE,
new Storage(fromHiveStorageFormat(ORC), LOCATION, Optional.of(new HiveBucketProperty(ImmutableList.of(BUCKET_COLUMN.getName()), 100, ImmutableList.of())), false, ImmutableMap.of()),
ImmutableList.of(BUCKET_COLUMN),
ImmutableList.of(PARTITION_COLUMN),
ImmutableMap.of(),
Optional.empty(),
Optional.empty());

private static final List<String> PARTITIONS = ImmutableList.of("ds=2019-07-23", "ds=2019-08-23");

private HivePartitionManager hivePartitionManager = new HivePartitionManager(new TestingTypeManager(), new HiveClientConfig());
private final TestingSemiTransactionalHiveMetastore metastore = TestingSemiTransactionalHiveMetastore.create();

@BeforeClass
public void setUp()
{
metastore.addTable(SCHEMA_NAME, TABLE_NAME, TABLE, PARTITIONS);
}

@Test
public void testUsesBucketingIfSmallEnough()
{
HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
HivePartitionResult result = hivePartitionManager.getPartitions(
metastore,
tableHandle,
Constraint.alwaysTrue(),
new TestingConnectorSession(new HiveSessionProperties(new HiveClientConfig(), new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties()));
assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
}

@Test
public void testIgnoresBucketingWhenTooManyBuckets()
{
ConnectorSession session = new TestingConnectorSession(
new HiveSessionProperties(
new HiveClientConfig().setMaxBucketsForGroupedExecution(100),
new OrcFileWriterConfig(),
new ParquetFileWriterConfig())
.getSessionProperties());
HivePartitionResult result = hivePartitionManager.getPartitions(metastore, new HiveTableHandle(SCHEMA_NAME, TABLE_NAME), Constraint.alwaysTrue(), session);
assertFalse(result.getBucketHandle().isPresent(), "bucketHandle is present");
assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
}

@Test
public void testUsesBucketingWithPartitionFilters()
{
ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(new HiveClientConfig().setMaxBucketsForGroupedExecution(100), new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());
HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
HivePartitionResult result = hivePartitionManager.getPartitions(
metastore,
tableHandle,
new Constraint<>(TupleDomain.withColumnDomains(
ImmutableMap.of(
new HiveColumnHandle(
PARTITION_COLUMN.getName(),
PARTITION_COLUMN.getType(),
parseTypeSignature(StandardTypes.VARCHAR),
-1,
PARTITION_KEY,
Optional.empty()),
Domain.singleValue(VARCHAR, utf8Slice("2019-07-23"))))),
session);
assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
}

@Test
public void testUsesBucketingWithBucketFilters()
{
ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(new HiveClientConfig().setMaxBucketsForGroupedExecution(100), new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());
HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
HivePartitionResult result = hivePartitionManager.getPartitions(
metastore,
tableHandle,
new Constraint<>(TupleDomain.withColumnDomains(
ImmutableMap.of(
new HiveColumnHandle(
BUCKET_COLUMN.getName(),
BUCKET_COLUMN.getType(),
parseTypeSignature(StandardTypes.VARCHAR),
0,
REGULAR,
Optional.empty()),
Domain.singleValue(INTEGER, 1L)))),
session);
assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
assertTrue(result.getBucketFilter().isPresent(), "bucketFilter is present");
}

@Test
public void testUsesBucketingWithBucketColumn()
{
ConnectorSession session = new TestingConnectorSession(new HiveSessionProperties(new HiveClientConfig().setMaxBucketsForGroupedExecution(1), new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());
HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
HivePartitionResult result = hivePartitionManager.getPartitions(
metastore,
tableHandle,
new Constraint<>(TupleDomain.withColumnDomains(
ImmutableMap.of(
bucketColumnHandle(),
Domain.singleValue(INTEGER, 1L)))),
session);
assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
assertTrue(result.getBucketFilter().isPresent(), "bucketFilter is present");
}

@Test
public void testIgnoresBucketingWhenConfigured()
{
ConnectorSession session = new TestingConnectorSession(
new HiveSessionProperties(
new HiveClientConfig().setIgnoreTableBucketing(true),
new OrcFileWriterConfig(),
new ParquetFileWriterConfig())
.getSessionProperties());
HivePartitionResult result = hivePartitionManager.getPartitions(metastore, new HiveTableHandle(SCHEMA_NAME, TABLE_NAME), Constraint.alwaysTrue(), session);
assertFalse(result.getBucketHandle().isPresent(), "bucketHandle is present");
assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
}
}
Loading

0 comments on commit d0d8850

Please sign in to comment.