Support lazy dynamic filtering in hive connector #4991

Merged: 2 commits, Sep 9, 2020
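This PR swaps the hive split loader's `Supplier<TupleDomain<ColumnHandle>>` callback for the richer `io.prestosql.spi.connector.DynamicFilter` interface, which also exposes completion and blocking state, so split generation can wait (up to a configurable timeout) for the filter to narrow before listing files. A minimal sketch of the surface the loader consumes, reconstructed from the anonymous implementation in the test further below; this is illustrative, not the full SPI definition:

import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.predicate.TupleDomain;
import java.util.concurrent.CompletableFuture;

// Sketch of the DynamicFilter surface relied on in this PR; names match
// the SPI, but the interface itself may carry more members than shown.
public interface DynamicFilterSketch
{
    // Completes when the filter narrows further (or is abandoned); the
    // loader waits on this, bounded by a timeout, while incomplete.
    CompletableFuture<?> isBlocked();

    // True once the filter can narrow no further, so waiting is pointless.
    boolean isComplete();

    // Snapshot of the predicate collected so far; used to prune partitions.
    TupleDomain<ColumnHandle> getCurrentPredicate();
}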
@@ -14,13 +14,15 @@
package io.prestosql.plugin.hive;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Streams;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.HiveSplit.BucketConversion;
import io.prestosql.plugin.hive.metastore.Column;
@@ -35,6 +37,7 @@
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
@@ -74,14 +77,14 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
@@ -109,6 +112,7 @@
import static java.lang.String.format;
import static java.util.Collections.max;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;

public class BackgroundHiveSplitLoader
@@ -129,7 +133,8 @@ public class BackgroundHiveSplitLoader

private final Table table;
private final TupleDomain<? extends ColumnHandle> compactEffectivePredicate;
private final Supplier<TupleDomain<ColumnHandle>> dynamicFilterSupplier;
private final DynamicFilter dynamicFilter;
private final long dynamicFilteringProbeBlockingTimeoutMillis;
private final TypeManager typeManager;
private final Optional<BucketSplitInfo> tableBucketInfo;
private final HdfsEnvironment hdfsEnvironment;
@@ -163,13 +168,15 @@ public class BackgroundHiveSplitLoader
private final ReadWriteLock taskExecutionLock = new ReentrantReadWriteLock();

private HiveSplitSource hiveSplitSource;
private Stopwatch stopwatch;
private volatile boolean stopped;

public BackgroundHiveSplitLoader(
Table table,
Iterable<HivePartitionMetadata> partitions,
TupleDomain<? extends ColumnHandle> compactEffectivePredicate,
Supplier<TupleDomain<ColumnHandle>> dynamicFilterSupplier,
DynamicFilter dynamicFilter,
Duration dynamicFilteringProbeBlockingTimeout,
TypeManager typeManager,
Optional<BucketSplitInfo> tableBucketInfo,
ConnectorSession session,
@@ -184,7 +191,8 @@ public BackgroundHiveSplitLoader(
{
this.table = table;
this.compactEffectivePredicate = compactEffectivePredicate;
this.dynamicFilterSupplier = dynamicFilterSupplier;
this.dynamicFilter = dynamicFilter;
this.dynamicFilteringProbeBlockingTimeoutMillis = dynamicFilteringProbeBlockingTimeout.toMillis();
this.typeManager = typeManager;
this.tableBucketInfo = tableBucketInfo;
this.loaderConcurrency = loaderConcurrency;
@@ -204,6 +212,7 @@ public BackgroundHiveSplitLoader(
public void start(HiveSplitSource splitSource)
{
this.hiveSplitSource = splitSource;
this.stopwatch = Stopwatch.createStarted();
for (int i = 0; i < loaderConcurrency; i++) {
ListenableFuture<?> future = ResumableTasks.submit(executor, new HiveSplitLoaderTask());
addExceptionCallback(future, hiveSplitSource::fail); // best effort; hiveSplitSource could be already completed
@@ -227,6 +236,14 @@ public TaskStatus process()
return TaskStatus.finished();
}
ListenableFuture<?> future;
// Block until one of the following conditions is met:
// 1. Completion of DynamicFilter
// 2. Timeout after waiting for the configured time
long timeLeft = dynamicFilteringProbeBlockingTimeoutMillis - stopwatch.elapsed(MILLISECONDS);
if (timeLeft > 0 && !dynamicFilter.isComplete()) {
future = toListenableFuture(dynamicFilter.isBlocked().orTimeout(timeLeft, MILLISECONDS));
return TaskStatus.continueOn(future);
}
taskExecutionLock.readLock().lock();
try {
future = loadSplits();
@@ -325,7 +342,7 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
TupleDomain<HiveColumnHandle> effectivePredicate = compactEffectivePredicate.transform(HiveColumnHandle.class::cast);

List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table, typeManager);
BooleanSupplier partitionMatchSupplier = () -> partitionMatches(partitionColumns, dynamicFilterSupplier.get(), hivePartition);
BooleanSupplier partitionMatchSupplier = () -> partitionMatches(partitionColumns, dynamicFilter.getCurrentPredicate(), hivePartition);
if (!partitionMatchSupplier.getAsBoolean()) {
// Avoid listing files and creating splits from a partition if it has been pruned due to dynamic filters
return COMPLETED_FUTURE;
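The core of the change is the bounded wait in process() above: each loader task computes the remaining budget from a stopwatch started in start() and, while the filter is incomplete, parks on isBlocked() capped by orTimeout. A self-contained sketch of that pattern, assuming only the JDK (9+) and Guava; the awaitNarrowing helper and the 10-second budget are illustrative, not from the PR:

import com.google.common.base.Stopwatch;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class BoundedWaitSketch
{
    private final Stopwatch stopwatch = Stopwatch.createStarted();
    private final long budgetMillis = 10_000; // hypothetical timeout budget

    // Returns a future that completes when the filter narrows or the
    // remaining budget runs out, whichever happens first.
    CompletableFuture<?> awaitNarrowing(CompletableFuture<?> isBlocked, boolean isComplete)
    {
        long timeLeft = budgetMillis - stopwatch.elapsed(MILLISECONDS);
        if (timeLeft <= 0 || isComplete) {
            // Budget exhausted or filter final: proceed to load splits now.
            return CompletableFuture.completedFuture(null);
        }
        // orTimeout (JDK 9+) completes the future exceptionally with a
        // TimeoutException after timeLeft; this sketch converts that into
        // a normal wakeup, since the caller only needs a signal to proceed.
        return isBlocked
                .orTimeout(timeLeft, MILLISECONDS)
                .exceptionally(ignored -> null);
    }
}

Because all loader tasks read the same stopwatch, the timeout appears to bound total wall-clock waiting per split source rather than per task.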
16 changes: 16 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java
@@ -133,6 +133,8 @@ public class HiveConfig

private boolean projectionPushdownEnabled = true;

private Duration dynamicFilteringProbeBlockingTimeout = new Duration(0, MINUTES);

public int getMaxInitialSplits()
{
return maxInitialSplits;
@@ -964,4 +966,18 @@ public HiveConfig setProjectionPushdownEnabled(boolean projectionPushdownEnabled
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

@NotNull
public Duration getDynamicFilteringProbeBlockingTimeout()
{
return dynamicFilteringProbeBlockingTimeout;
}

@Config("hive.dynamic-filtering-probe-blocking-timeout")
@ConfigDescription("Duration to wait for completion of dynamic filters during split generation for probe side table")
public HiveConfig setDynamicFilteringProbeBlockingTimeout(Duration dynamicFilteringProbeBlockingTimeout)
{
this.dynamicFilteringProbeBlockingTimeout = dynamicFilteringProbeBlockingTimeout;
return this;
}
}
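The timeout defaults to 0 minutes, so the loader does not wait unless configured to. In a catalog properties file that would be hive.dynamic-filtering-probe-blocking-timeout=10s, per the @Config annotation above; a programmatic equivalent using the setter from this diff (the 10s value is illustrative, mirroring the test mapping further below, and withProbeWait is a hypothetical helper):

import io.airlift.units.Duration;

final class ConfigSketch
{
    static HiveConfig withProbeWait()
    {
        // Same effect as hive.dynamic-filtering-probe-blocking-timeout=10s.
        return new HiveConfig()
                .setDynamicFilteringProbeBlockingTimeout(Duration.valueOf("10s"));
    }
}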
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.prestosql.orc.OrcWriteValidation.OrcWriteValidationMode;
import io.prestosql.plugin.hive.orc.OrcReaderConfig;
import io.prestosql.plugin.hive.orc.OrcWriterConfig;
@@ -31,6 +32,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.prestosql.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND;
import static io.prestosql.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR;
import static io.prestosql.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
@@ -91,6 +93,7 @@ public final class HiveSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";
private static final String DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT = "dynamic_filtering_probe_blocking_timeout";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -369,6 +372,11 @@ public HiveSessionProperties(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
durationProperty(
DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT,
"Duration to wait for completion of dynamic filters during split generation for probe side table",
hiveConfig.getDynamicFilteringProbeBlockingTimeout(),
false));
}

@@ -635,4 +643,9 @@ public static boolean isParquetOptimizedWriterEnabled(ConnectorSession session)
{
return session.getProperty(PARQUET_OPTIMIZED_WRITER_ENABLED, Boolean.class);
}

public static Duration getDynamicFilteringProbeBlockingTimeout(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_PROBE_BLOCKING_TIMEOUT, Duration.class);
}
}
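At query time the same knob is exposed as the dynamic_filtering_probe_blocking_timeout session property and read back through the static getter above. A small hedged sketch of resolving it inside connector code (the probeBlockingTimeout helper is hypothetical; the session is assumed to come from the connector context):

import io.airlift.units.Duration;
import io.prestosql.spi.connector.ConnectorSession;

final class SessionTimeoutSketch
{
    // Per-query value of dynamic_filtering_probe_blocking_timeout, which
    // defaults to the hive.dynamic-filtering-probe-blocking-timeout config.
    static Duration probeBlockingTimeout(ConnectorSession session)
    {
        return HiveSessionProperties.getDynamicFilteringProbeBlockingTimeout(session);
    }
}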
@@ -65,6 +65,7 @@
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static io.prestosql.plugin.hive.HivePartition.UNPARTITIONED_ID;
import static io.prestosql.plugin.hive.HiveSessionProperties.getDynamicFilteringProbeBlockingTimeout;
import static io.prestosql.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions;
import static io.prestosql.plugin.hive.HiveSessionProperties.isPartitionUseColumnNames;
import static io.prestosql.plugin.hive.TableToPartitionMapping.mapColumnsByIndex;
@@ -219,7 +220,8 @@ public ConnectorSplitSource getSplits(
table,
hivePartitions,
hiveTable.getCompactEffectivePredicate(),
dynamicFilter::getCurrentPredicate,
dynamicFilter,
getDynamicFilteringProbeBlockingTimeout(session),
typeManager,
createBucketSplitInfo(bucketHandle, bucketFilter),
session,
@@ -30,7 +30,9 @@
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
@@ -68,6 +70,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -343,6 +346,45 @@ public void testNoHangIfPartitionIsOffline()
assertThrows(RuntimeException.class, hiveSplitSource::isFinished);
}

@Test(timeOut = 30_000)
public void testIncompleteDynamicFilterTimeout()
throws Exception
{
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
new DynamicFilter() {
@Override
public CompletableFuture<?> isBlocked()
{
return CompletableFuture.runAsync(() -> {
try {
TimeUnit.HOURS.sleep(1);
}
catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
}

@Override
public boolean isComplete()
{
return false;
}

@Override
public TupleDomain<ColumnHandle> getCurrentPredicate()
{
return TupleDomain.all();
}
},
Duration.valueOf("1s"));
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
backgroundHiveSplitLoader.start(hiveSplitSource);

assertEquals(drain(hiveSplitSource).size(), 2);
assertTrue(hiveSplitSource.isFinished());
}

@Test
public void testCachedDirectoryLister()
throws Exception
@@ -454,7 +496,8 @@ public HivePartitionMetadata next()
}
},
TupleDomain.all(),
TupleDomain::all,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
TYPE_MANAGER,
createBucketSplitInfo(Optional.empty(), Optional.empty()),
SESSION,
@@ -703,6 +746,21 @@ private static List<HiveSplit> drainSplits(HiveSplitSource source)
return splits.build();
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
DynamicFilter dynamicFilter,
Duration dynamicFilteringProbeBlockingTimeoutMillis)
{
return backgroundHiveSplitLoader(
new TestingHdfsEnvironment(TEST_FILES),
TupleDomain.all(),
dynamicFilter,
dynamicFilteringProbeBlockingTimeoutMillis,
Optional.empty(),
SIMPLE_TABLE,
Optional.empty(),
Optional.empty());
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
List<LocatedFileStatus> files,
TupleDomain<HiveColumnHandle> tupleDomain)
@@ -755,6 +813,27 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
Table table,
Optional<HiveBucketHandle> bucketHandle,
Optional<ValidWriteIdList> validWriteIds)
{
return backgroundHiveSplitLoader(
hdfsEnvironment,
compactEffectivePredicate,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
hiveBucketFilter,
table,
bucketHandle,
validWriteIds);
}

private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
HdfsEnvironment hdfsEnvironment,
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
DynamicFilter dynamicFilter,
Duration dynamicFilteringProbeBlockingTimeout,
Optional<HiveBucketFilter> hiveBucketFilter,
Table table,
Optional<HiveBucketHandle> bucketHandle,
Optional<ValidWriteIdList> validWriteIds)
{
List<HivePartitionMetadata> hivePartitionMetadatas =
ImmutableList.of(
@@ -767,7 +846,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
table,
hivePartitionMetadatas,
compactEffectivePredicate,
TupleDomain::all,
dynamicFilter,
dynamicFilteringProbeBlockingTimeout,
TYPE_MANAGER,
createBucketSplitInfo(bucketHandle, hiveBucketFilter),
SESSION,
@@ -796,7 +876,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(List<LocatedF
SIMPLE_TABLE,
hivePartitionMetadatas,
TupleDomain.none(),
TupleDomain::all,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
TYPE_MANAGER,
Optional.empty(),
connectorSession,
@@ -819,7 +900,8 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartiti
SIMPLE_TABLE,
createPartitionMetadataWithOfflinePartitions(),
TupleDomain.all(),
TupleDomain::all,
DynamicFilter.EMPTY,
Duration.valueOf("0s"),
TYPE_MANAGER,
createBucketSplitInfo(Optional.empty(), Optional.empty()),
connectorSession,
@@ -95,7 +95,8 @@ public void testDefaults()
.setAllowRegisterPartition(false)
.setQueryPartitionFilterRequired(false)
.setPartitionUseColumnNames(false)
.setProjectionPushdownEnabled(true));
.setProjectionPushdownEnabled(true)
.setDynamicFilteringProbeBlockingTimeout(new Duration(0, TimeUnit.MINUTES)));
}

@Test
@@ -164,6 +165,7 @@ public void testExplicitPropertyMappings()
.put("hive.query-partition-filter-required", "true")
.put("hive.partition-use-column-names", "true")
.put("hive.projection-pushdown-enabled", "false")
.put("hive.dynamic-filtering-probe-blocking-timeout", "10s")
.build();

HiveConfig expected = new HiveConfig()
@@ -228,7 +230,8 @@ public void testExplicitPropertyMappings()
.setAllowRegisterPartition(true)
.setQueryPartitionFilterRequired(true)
.setPartitionUseColumnNames(true)
.setProjectionPushdownEnabled(false);
.setProjectionPushdownEnabled(false)
.setDynamicFilteringProbeBlockingTimeout(new Duration(10, TimeUnit.SECONDS));

assertFullMapping(properties, expected);
}