Skip to content

Commit

Permalink
Support lazy dynamic filtering in Hive connector
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Sep 8, 2020
1 parent f9a67d1 commit b001728
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package io.prestosql.plugin.hive;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Streams;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.HiveSplit.BucketConversion;
import io.prestosql.plugin.hive.metastore.Column;
Expand All @@ -35,6 +37,7 @@
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -74,14 +77,14 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
Expand Down Expand Up @@ -109,6 +112,7 @@
import static java.lang.String.format;
import static java.util.Collections.max;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;

public class BackgroundHiveSplitLoader
Expand All @@ -129,7 +133,8 @@ public class BackgroundHiveSplitLoader

private final Table table;
private final TupleDomain<? extends ColumnHandle> compactEffectivePredicate;
private final Supplier<TupleDomain<ColumnHandle>> dynamicFilterSupplier;
private final DynamicFilter dynamicFilter;
private final long dynamicFilteringProbeBlockingTimeoutMillis;
private final TypeManager typeManager;
private final Optional<BucketSplitInfo> tableBucketInfo;
private final HdfsEnvironment hdfsEnvironment;
Expand Down Expand Up @@ -163,13 +168,15 @@ public class BackgroundHiveSplitLoader
private final ReadWriteLock taskExecutionLock = new ReentrantReadWriteLock();

private HiveSplitSource hiveSplitSource;
private Stopwatch stopwatch;
private volatile boolean stopped;

public BackgroundHiveSplitLoader(
Table table,
Iterable<HivePartitionMetadata> partitions,
TupleDomain<? extends ColumnHandle> compactEffectivePredicate,
Supplier<TupleDomain<ColumnHandle>> dynamicFilterSupplier,
DynamicFilter dynamicFilter,
Duration dynamicFilteringProbeBlockingTimeout,
TypeManager typeManager,
Optional<BucketSplitInfo> tableBucketInfo,
ConnectorSession session,
Expand All @@ -184,7 +191,8 @@ public BackgroundHiveSplitLoader(
{
this.table = table;
this.compactEffectivePredicate = compactEffectivePredicate;
this.dynamicFilterSupplier = dynamicFilterSupplier;
this.dynamicFilter = dynamicFilter;
this.dynamicFilteringProbeBlockingTimeoutMillis = dynamicFilteringProbeBlockingTimeout.toMillis();
this.typeManager = typeManager;
this.tableBucketInfo = tableBucketInfo;
this.loaderConcurrency = loaderConcurrency;
Expand All @@ -204,6 +212,7 @@ public BackgroundHiveSplitLoader(
public void start(HiveSplitSource splitSource)
{
this.hiveSplitSource = splitSource;
this.stopwatch = Stopwatch.createStarted();
for (int i = 0; i < loaderConcurrency; i++) {
ListenableFuture<?> future = ResumableTasks.submit(executor, new HiveSplitLoaderTask());
addExceptionCallback(future, hiveSplitSource::fail); // best effort; hiveSplitSource could be already completed
Expand All @@ -227,6 +236,14 @@ public TaskStatus process()
return TaskStatus.finished();
}
ListenableFuture<?> future;
// Block until one of below conditions is met:
// 1. Completion of DynamicFilter
// 2. Timeout after waiting for the configured time
long timeLeft = dynamicFilteringProbeBlockingTimeoutMillis - stopwatch.elapsed(MILLISECONDS);
if (timeLeft > 0 && !dynamicFilter.isComplete()) {
future = toListenableFuture(dynamicFilter.isBlocked().orTimeout(timeLeft, MILLISECONDS));
return TaskStatus.continueOn(future);
}
taskExecutionLock.readLock().lock();
try {
future = loadSplits();
Expand Down Expand Up @@ -325,7 +342,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
TupleDomain<HiveColumnHandle> effectivePredicate = compactEffectivePredicate.transform(HiveColumnHandle.class::cast);

List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table, typeManager);
BooleanSupplier partitionMatchSupplier = () -> partitionMatches(partitionColumns, dynamicFilterSupplier.get(), hivePartition);
BooleanSupplier partitionMatchSupplier = () -> partitionMatches(partitionColumns, dynamicFilter.getCurrentPredicate(), hivePartition);
if (!partitionMatchSupplier.getAsBoolean()) {
// Avoid listing files and creating splits from a partition if it has been pruned due to dynamic filters
return COMPLETED_FUTURE;
Expand Down
16 changes: 16 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public class HiveConfig

private boolean projectionPushdownEnabled = true;

private Duration dynamicFilteringProbeBlockingTimeout = new Duration(0, MINUTES);

public int getMaxInitialSplits()
{
return maxInitialSplits;
Expand Down Expand Up @@ -964,4 +966,18 @@ public HiveConfig setProjectionPushdownEnabled(boolean projectionPushdownEnabled
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

/**
 * Maximum time split generation is allowed to block while waiting for
 * dynamic filters to be collected for the probe side of a join.
 * Defaults to 0 (no blocking).
 */
@NotNull
public Duration getDynamicFilteringProbeBlockingTimeout()
{
    return this.dynamicFilteringProbeBlockingTimeout;
}

// Fluent config setter bound to "hive.dynamic-filtering-probe-blocking-timeout";
// controls how long BackgroundHiveSplitLoader waits for dynamic filter completion
// before generating splits for the probe side table.
@Config("hive.dynamic-filtering-probe-blocking-timeout")
@ConfigDescription("Duration to wait for completion of dynamic filters during split generation for probe side table")
public HiveConfig setDynamicFilteringProbeBlockingTimeout(Duration dynamicFilteringProbeBlockingTimeout)
{
    this.dynamicFilteringProbeBlockingTimeout = dynamicFilteringProbeBlockingTimeout;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.orc.OrcWriteValidation.OrcWriteValidationMode;
import io.prestosql.plugin.hive.orc.OrcReaderConfig;
import io.prestosql.plugin.hive.orc.OrcWriterConfig;
Expand All @@ -31,6 +32,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.prestosql.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND;
import static io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR;
import static io.prestosql.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
Expand Down Expand Up @@ -91,6 +93,7 @@ public final class HiveSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";
private static final String DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT = "dynamic_filtering_probe_blocking_timeout";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -369,6 +372,11 @@ public HiveSessionProperties(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
durationProperty(
DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT,
"Duration to wait for completion of dynamic filters during split generation for probe side table",
hiveConfig.getDynamicFilteringProbeBlockingTimeout(),
false));
}

Expand Down Expand Up @@ -635,4 +643,9 @@ public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class);
}

/**
 * Reads the {@code dynamic_filtering_probe_blocking_timeout} session property:
 * how long split generation for the probe side table may wait for dynamic
 * filters to complete.
 */
public static Duration getDynamicFilteringProbeBlockingTimeout(ConnectorSession session)
{
    Duration timeout = session.getProperty(DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT, Duration.class);
    return timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static io.prestosql.plugin.hive.HivePartition.UNPARTITIONED_ID;
import static io.prestosql.plugin.hive.HiveSessionProperties.getDynamicFilteringProbeBlockingTimeout;
import static io.prestosql.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions;
import static io.prestosql.plugin.hive.HiveSessionProperties.isPartitionUseColumnNames;
import static io.prestosql.plugin.hive.TableToPartitionMapping.mapColumnsByIndex;
Expand Down Expand Up @@ -219,7 +220,8 @@ public ConnectorSplitSource getSplits(
table,
hivePartitions,
hiveTable.getCompactEffectivePredicate(),
dynamicFilter::getCurrentPredicate,
dynamicFilter,
getDynamicFilteringProbeBlockingTimeout(session),
typeManager,
createBucketSplitInfo(bucketHandle, bucketFilter),
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -343,6 +346,45 @@ public void testNoHangIfPartitionIsOffline()
assertThrows(RuntimeException.class, hiveSplitSource::isFinished);
}

// Verifies that split loading proceeds once the configured probe blocking
// timeout (1s) elapses, even though the dynamic filter never completes.
@Test(timeOut = 30_000)
public void testIncompleteDynamicFilterTimeout()
        throws Exception
{
    BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
            new DynamicFilter() {
                @Override
                public CompletableFuture<?> isBlocked()
                {
                    // A future that is never completed: the loader only needs it to
                    // still be pending when its 1s timeout fires (via orTimeout).
                    // Unlike runAsync() with a 1-hour sleep, this does not park a
                    // ForkJoinPool.commonPool() thread on every call, and there is
                    // no InterruptedException to swallow without re-interrupting.
                    return new CompletableFuture<>();
                }

                @Override
                public boolean isComplete()
                {
                    // Never completes, forcing the loader to rely on the timeout
                    return false;
                }

                @Override
                public TupleDomain<ColumnHandle> getCurrentPredicate()
                {
                    // No pruning: all partitions/splits remain eligible
                    return TupleDomain.all();
                }
            },
            Duration.valueOf("1s"));
    HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
    backgroundHiveSplitLoader.start(hiveSplitSource);

    // Both test files must still be turned into splits after the timeout expires
    assertEquals(drain(hiveSplitSource).size(), 2);
    assertTrue(hiveSplitSource.isFinished());
}

@Test
public void testCachedDirectoryLister()
throws Exception
Expand Down Expand Up @@ -454,7 +496,8 @@ public HivePartitionMetadata next()
}
},
TupleDomain.all(),
TupleDomain::all,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
TYPE_MANAGER,
createBucketSplitInfo(Optional.empty(), Optional.empty()),
SESSION,
Expand Down Expand Up @@ -703,6 +746,21 @@ private static List<HiveSplit> drainSplits(HiveSplitSource source)
return splits.build();
}

/**
 * Creates a loader over {@code TEST_FILES} with the given dynamic filter and
 * probe blocking timeout; all other settings use the {@code SIMPLE_TABLE} defaults.
 * The parameter was renamed from {@code ...TimeoutMillis}: it is a {@link Duration},
 * not a millisecond count, so the old name was misleading.
 */
private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
        DynamicFilter dynamicFilter,
        Duration dynamicFilteringProbeBlockingTimeout)
{
    return backgroundHiveSplitLoader(
            new TestingHdfsEnvironment(TEST_FILES),
            TupleDomain.all(),
            dynamicFilter,
            dynamicFilteringProbeBlockingTimeout,
            Optional.empty(),
            SIMPLE_TABLE,
            Optional.empty(),
            Optional.empty());
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
List<LocatedFileStatus> files,
TupleDomain<HiveColumnHandle> tupleDomain)
Expand Down Expand Up @@ -755,6 +813,27 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
Table table,
Optional<HiveBucketHandle> bucketHandle,
Optional<ValidWriteIdList> validWriteIds)
{
return backgroundHiveSplitLoader(
hdfsEnvironment,
compactEffectivePredicate,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
hiveBucketFilter,
table,
bucketHandle,
validWriteIds);
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
HdfsEnvironment hdfsEnvironment,
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
DynamicFilter dynamicFilter,
Duration dynamicFilteringProbeBlockingTimeout,
Optional<HiveBucketFilter> hiveBucketFilter,
Table table,
Optional<HiveBucketHandle> bucketHandle,
Optional<ValidWriteIdList> validWriteIds)
{
List<HivePartitionMetadata> hivePartitionMetadatas =
ImmutableList.of(
Expand All @@ -767,7 +846,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
table,
hivePartitionMetadatas,
compactEffectivePredicate,
TupleDomain::all,
dynamicFilter,
dynamicFilteringProbeBlockingTimeout,
TYPE_MANAGER,
createBucketSplitInfo(bucketHandle, hiveBucketFilter),
SESSION,
Expand Down Expand Up @@ -796,7 +876,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(List<LocatedF
SIMPLE_TABLE,
hivePartitionMetadatas,
TupleDomain.none(),
TupleDomain::all,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
TYPE_MANAGER,
Optional.empty(),
connectorSession,
Expand All @@ -819,7 +900,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartiti
SIMPLE_TABLE,
createPartitionMetadataWithOfflinePartitions(),
TupleDomain.all(),
TupleDomain::all,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
TYPE_MANAGER,
createBucketSplitInfo(Optional.empty(), Optional.empty()),
connectorSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void testDefaults()
.setAllowRegisterPartition(false)
.setQueryPartitionFilterRequired(false)
.setPartitionUseColumnNames(false)
.setProjectionPushdownEnabled(true));
.setProjectionPushdownEnabled(true)
.setDynamicFilteringProbeBlockingTimeout(new Duration(0, TimeUnit.MINUTES)));
}

@Test
Expand Down Expand Up @@ -164,6 +165,7 @@ public void testExplicitPropertyMappings()
.put("hive.query-partition-filter-required", "true")
.put("hive.partition-use-column-names", "true")
.put("hive.projection-pushdown-enabled", "false")
.put("hive.dynamic-filtering-probe-blocking-timeout", "10s")
.build();

HiveConfig expected = new HiveConfig()
Expand Down Expand Up @@ -228,7 +230,8 @@ public void testExplicitPropertyMappings()
.setAllowRegisterPartition(true)
.setQueryPartitionFilterRequired(true)
.setPartitionUseColumnNames(true)
.setProjectionPushdownEnabled(false);
.setProjectionPushdownEnabled(false)
.setDynamicFilteringProbeBlockingTimeout(new Duration(10, TimeUnit.SECONDS));

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit b001728

Please sign in to comment.