Avoid loading partitions eagerly in HiveSplitManager
arhimondr committed Oct 7, 2022
1 parent dd8dee5 commit 911d7ca
Showing 11 changed files with 259 additions and 111 deletions.
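
In short: HivePartitionManager previously materialized every matching partition into a list (getPartitionsAsList) and enforced the partition limit up front; with this commit, BackgroundHiveSplitLoader consumes an Iterator<HivePartitionMetadata> lazily, counts partitions as it drains them (partitionCount), and raises HIVE_EXCEEDED_PARTITION_LIMIT only when the configured maximum (getMaxPartitionsPerScan) is actually crossed. Below is a minimal sketch of that consumption pattern, for illustration only; the class and method names in it (LazyPartitionLimiter, drain) are invented and are not part of the change.

    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    // Illustrative sketch only: enforce a partition cap while lazily draining an iterator,
    // instead of copying every partition into a list before any splits are produced.
    final class LazyPartitionLimiter<P>
    {
        private final AtomicInteger partitionCount = new AtomicInteger();
        private final int maxPartitions;

        LazyPartitionLimiter(int maxPartitions)
        {
            this.maxPartitions = maxPartitions;
        }

        void drain(Iterator<P> partitions, Consumer<P> loadPartition)
        {
            while (partitions.hasNext()) {
                P partition = partitions.next();
                // the limit is checked as partitions are consumed, so a query that finishes early
                // never pays for enumerating the partitions it did not reach
                if (partitionCount.incrementAndGet() > maxPartitions) {
                    throw new IllegalStateException(
                            "query can potentially read more than " + maxPartitions + " partitions");
                }
                loadPartition.accept(partition);
            }
        }
    }

In the diff below, the counter lives in BackgroundHiveSplitLoader and the failure is a TrinoException, mirroring the check that is deleted from HivePartitionManager further down.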
@@ -179,7 +179,8 @@ public S3SelectTestHelper(String host,
this.hiveConfig.getSplitLoaderConcurrency(),
this.hiveConfig.getMaxSplitsPerSecond(),
this.hiveConfig.getRecursiveDirWalkerEnabled(),
TESTING_TYPE_MANAGER);
TESTING_TYPE_MANAGER,
this.hiveConfig.getMaxPartitionsPerScan());

pageSourceProvider = new HivePageSourceProvider(
TESTING_TYPE_MANAGER,

@@ -83,7 +83,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
@@ -102,6 +104,7 @@
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.trino.hdfs.ConfigurationUtils.toJobConf;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
@@ -167,6 +170,7 @@ public class BackgroundHiveSplitLoader
private final Deque<Iterator<InternalHiveSplit>> fileIterators = new ConcurrentLinkedDeque<>();
private final Optional<ValidWriteIdList> validWriteIds;
private final Optional<Long> maxSplitFileSize;
private final int maxPartitions;

// Purpose of this lock:
// * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource.
@@ -188,10 +192,12 @@ public class BackgroundHiveSplitLoader
private HiveSplitSource hiveSplitSource;
private Stopwatch stopwatch;
private volatile boolean stopped;
private final AtomicInteger activeLoaderCount = new AtomicInteger();
private final AtomicInteger partitionCount = new AtomicInteger();

public BackgroundHiveSplitLoader(
Table table,
Iterable<HivePartitionMetadata> partitions,
Iterator<HivePartitionMetadata> partitions,
TupleDomain<? extends ColumnHandle> compactEffectivePredicate,
DynamicFilter dynamicFilter,
Duration dynamicFilteringWaitTimeout,
@@ -207,7 +213,8 @@ public BackgroundHiveSplitLoader(
boolean ignoreAbsentPartitions,
boolean optimizeSymlinkListing,
Optional<ValidWriteIdList> validWriteIds,
Optional<Long> maxSplitFileSize)
Optional<Long> maxSplitFileSize,
int maxPartitions)
{
this.table = table;
this.compactEffectivePredicate = compactEffectivePredicate;
@@ -224,22 +231,37 @@ public BackgroundHiveSplitLoader(
this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled;
this.ignoreAbsentPartitions = ignoreAbsentPartitions;
this.optimizeSymlinkListing = optimizeSymlinkListing;
requireNonNull(executor, "executor is null");
// direct executor is not supported in this implementation due to locking specifics
checkExecutorIsNotDirectExecutor(executor);
this.executor = executor;
this.partitions = new ConcurrentLazyQueue<>(partitions);
this.hdfsContext = new HdfsContext(session);
this.validWriteIds = requireNonNull(validWriteIds, "validWriteIds is null");
this.maxSplitFileSize = requireNonNull(maxSplitFileSize, "maxSplitFileSize is null");
this.maxPartitions = maxPartitions;
}

@Override
public void start(HiveSplitSource splitSource)
{
this.hiveSplitSource = splitSource;
this.stopwatch = Stopwatch.createStarted();
for (int i = 0; i < loaderConcurrency; i++) {
ListenableFuture<Void> future = ResumableTasks.submit(executor, new HiveSplitLoaderTask());
addExceptionCallback(future, hiveSplitSource::fail); // best effort; hiveSplitSource could be already completed
addLoaderIfNecessary();
}

private void addLoaderIfNecessary()
{
// opportunistic check to avoid incrementing indefinitely
if (activeLoaderCount.get() >= loaderConcurrency) {
return;
}
if (activeLoaderCount.incrementAndGet() > loaderConcurrency) {
return;
}
ListenableFuture<Void> future = ResumableTasks.submit(executor, new HiveSplitLoaderTask());
// best effort; hiveSplitSource could be already completed
addExceptionCallback(future, hiveSplitSource::fail);
}

@Override
@@ -348,9 +370,24 @@ private ListenableFuture<Void> loadSplits()
if (partition == null) {
return COMPLETED_FUTURE;
}
if (partitionCount.incrementAndGet() > maxPartitions) {
throw new TrinoException(HIVE_EXCEEDED_PARTITION_LIMIT, format(
"Query over table '%s' can potentially read more than %s partitions",
partition.getHivePartition().getTableName(),
maxPartitions));
}
// this is racy and sometimes more loaders can be added than necessary, but this is fine
if (!partitions.isEmpty()) {
addLoaderIfNecessary();
}
return loadPartition(partition);
}

// this is racy and sometimes more loaders can be added than necessary, but this is fine
if (!fileIterators.isEmpty()) {
addLoaderIfNecessary();
}

while (splits.hasNext() && !stopped) {
ListenableFuture<Void> future = hiveSplitSource.addToQueue(splits.next());
if (!future.isDone()) {
@@ -1040,4 +1077,16 @@ public boolean isTableBucketEnabled(int tableBucketNumber)
return bucketFilter.test(tableBucketNumber);
}
}

private static void checkExecutorIsNotDirectExecutor(Executor executor)
{
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
executor.execute(() -> checkState(!lock.isHeldByCurrentThread(), "executor is a direct executor"));
}
finally {
lock.unlock();
}
}
}
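
The constructor now rejects direct executors because split-loader tasks spawn further loader tasks (addLoaderIfNecessary) while the caller may hold locks; running those tasks inline on the submitting thread would break the locking assumptions. The probe works by locking a ReentrantLock and submitting a task that asserts the lock is not held by the executing thread. A standalone demo of that trick follows; it assumes Guava is on the classpath, and DirectExecutorCheckDemo / checkNotDirect are made-up names used only here.

    import static com.google.common.base.Preconditions.checkState;

    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.ReentrantLock;

    public final class DirectExecutorCheckDemo
    {
        // Same trick as checkExecutorIsNotDirectExecutor above: a direct executor runs the probe
        // inline on the submitting thread, which still holds the lock, so checkState fails and the
        // exception propagates out of execute(); a real thread pool runs it elsewhere and passes.
        static void checkNotDirect(Executor executor)
        {
            ReentrantLock lock = new ReentrantLock();
            lock.lock();
            try {
                executor.execute(() -> checkState(!lock.isHeldByCurrentThread(), "executor is a direct executor"));
            }
            finally {
                lock.unlock();
            }
        }

        public static void main(String[] args)
        {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            checkNotDirect(pool); // passes: the probe runs on a pool thread that never acquired the lock
            pool.shutdown();

            checkNotDirect(MoreExecutors.directExecutor()); // throws IllegalStateException
        }
    }

For a non-direct executor the probe runs asynchronously, so the check is best-effort by design; it only has to catch the inline case, which is the one the loader cannot tolerate.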

@@ -17,14 +17,16 @@

import java.util.Iterator;

import static java.util.Objects.requireNonNull;

public class ConcurrentLazyQueue<E>
{
@GuardedBy("this")
private final Iterator<E> iterator;

public ConcurrentLazyQueue(Iterable<E> iterable)
public ConcurrentLazyQueue(Iterator<E> iterator)
{
this.iterator = iterable.iterator();
this.iterator = requireNonNull(iterator, "iterator is null");
}

public synchronized boolean isEmpty()
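
ConcurrentLazyQueue is the piece that lets several loader threads share the single partition iterator: every access to the wrapped iterator is synchronized on the queue. The diff only shows the constructor switching from Iterable to Iterator, so the sketch below fills in the overall shape for context; the poll() body is reconstructed here, not taken from the commit, and the exact package of the GuardedBy annotation is an assumption.

    import java.util.Iterator;
    import javax.annotation.concurrent.GuardedBy; // assumed package; Trino may use a different GuardedBy

    import static java.util.Objects.requireNonNull;

    // Sketch of the class shape: the iterator is only touched inside synchronized methods,
    // so multiple split-loader threads can poll partitions without concurrent iteration.
    public class ConcurrentLazyQueue<E>
    {
        @GuardedBy("this")
        private final Iterator<E> iterator;

        public ConcurrentLazyQueue(Iterator<E> iterator)
        {
            this.iterator = requireNonNull(iterator, "iterator is null");
        }

        public synchronized boolean isEmpty()
        {
            return !iterator.hasNext();
        }

        // reconstructed for illustration: returns null once the iterator is exhausted,
        // which matches the "partition == null" check in BackgroundHiveSplitLoader.loadSplits()
        public synchronized E poll()
        {
            if (!iterator.hasNext()) {
                return null;
            }
            return iterator.next();
        }
    }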

@@ -807,7 +807,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
// Note that the computation is not persisted in the table handle, so can be redone many times
// TODO: https://github.com/trinodb/trino/issues/10980.
if (partitionManager.canPartitionsBeLoaded(partitionResult)) {
List<HivePartition> partitions = partitionManager.getPartitionsAsList(partitionResult);
List<HivePartition> partitions = ImmutableList.copyOf(partitionResult.getPartitions());
return hiveStatisticsProvider.getTableStatistics(session, hiveTableHandle.getSchemaTableName(), columns, columnTypes, partitions);
}
return TableStatistics.empty();
@@ -2724,17 +2724,20 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
metastore.truncateUnpartitionedTable(session, handle.getSchemaName(), handle.getTableName());
}
else {
List<HivePartition> partitionsToBeDropped = partitionManager.getOrLoadPartitions(metastore, handle);
if (partitionsToBeDropped.size() > maxPartitionDropsPerQuery) {
throw new TrinoException(
NOT_SUPPORTED,
format(
"Failed to drop partitions. The number of partitions to be dropped (%s) is greater than the maximum allowed partitions (%s).",
partitionsToBeDropped.size(),
maxPartitionDropsPerQuery));
Iterator<HivePartition> partitions = partitionManager.getPartitions(metastore, handle);
List<String> partitionIds = new ArrayList<>();
while (partitions.hasNext()) {
partitionIds.add(partitions.next().getPartitionId());
if (partitionIds.size() > maxPartitionDropsPerQuery) {
throw new TrinoException(
NOT_SUPPORTED,
format(
"Failed to drop partitions. The number of partitions to be dropped is greater than the maximum allowed partitions (%s).",
maxPartitionDropsPerQuery));
}
}
for (HivePartition hivePartition : partitionsToBeDropped) {
metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), toPartitionValues(hivePartition.getPartitionId()), true);
for (String partitionId : partitionIds) {
metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), toPartitionValues(partitionId), true);
}
}
// it is too expensive to determine the exact number of deleted rows
@@ -2761,7 +2764,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
// TODO: https://github.com/trinodb/trino/issues/10980.
HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, table, new Constraint(hiveTable.getEnforcedConstraint()));
if (partitionManager.canPartitionsBeLoaded(partitionResult)) {
return Optional.of(partitionManager.getPartitionsAsList(partitionResult));
return Optional.of(ImmutableList.copyOf(partitionResult.getPartitions()));
}
return Optional.empty();
});

@@ -20,7 +20,6 @@
import com.google.common.collect.Sets;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.trino.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.Constraint;
@@ -42,12 +41,10 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
import static io.trino.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
import static io.trino.plugin.hive.metastore.MetastoreUtil.toPartitionName;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketFilter;
import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

public class HivePartitionManager
@@ -154,25 +151,6 @@ public HivePartitionResult getPartitions(ConnectorTableHandle tableHandle, List<
return new HivePartitionResult(partitionColumns, Optional.empty(), partitionList, TupleDomain.all(), TupleDomain.all(), bucketHandle, Optional.empty());
}

public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResult)
{
ImmutableList.Builder<HivePartition> partitionList = ImmutableList.builder();
int count = 0;
Iterator<HivePartition> iterator = partitionResult.getPartitions();
while (iterator.hasNext()) {
HivePartition partition = iterator.next();
if (count == maxPartitions) {
throw new TrinoException(HIVE_EXCEEDED_PARTITION_LIMIT, format(
"Query over table '%s' can potentially read more than %s partitions",
partition.getTableName(),
maxPartitions));
}
partitionList.add(partition);
count++;
}
return partitionList.build();
}

public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Constraint constraint)
{
Optional<List<String>> partitionNames = partitions.getPartitionNames();
@@ -184,9 +162,9 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio
// that can be applied on other join side (if the join is based on partition column),
// 2. If additional predicate is passed as a part of Constraint. (specified via loadPartition). This delays the partition checks
// until we have additional filtering based on Constraint
if (canPartitionsBeLoaded(partitions) || constraint.predicate().isPresent()) {
if (canPartitionsBeLoaded(partitions)) {
partitionNames = Optional.empty();
partitionList = Optional.of(getPartitionsAsList(partitions));
partitionList = Optional.of(ImmutableList.copyOf(partitions.getPartitions()));
List<HiveColumnHandle> partitionColumns = partitions.getPartitionColumns();
enforcedConstraint = partitions.getEffectivePredicate().filter((column, domain) -> partitionColumns.contains(column));
}
@@ -210,15 +188,14 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio
handle.getMaxScannedFileSize());
}

public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
public Iterator<HivePartition> getPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
{
// In case of partitions not being loaded, their permissible values are specified in `HiveTableHandle#getCompactEffectivePredicate,
// so we do an intersection of getCompactEffectivePredicate and HiveTable's enforced constraint
TupleDomain<ColumnHandle> summary = table.getEnforcedConstraint().intersect(
table.getCompactEffectivePredicate()
.transformKeys(ColumnHandle.class::cast));
return table.getPartitions().orElseGet(() ->
getPartitionsAsList(getPartitions(metastore, table, new Constraint(summary))));
return table.getPartitions().map(List::iterator).orElseGet(() -> getPartitions(metastore, table, new Constraint(summary)).getPartitions());
}

public boolean canPartitionsBeLoaded(HivePartitionResult partitionResult)

(diffs for the remaining changed files were not loaded)