Skip to content

Commit

Permalink
Push down predicates for Iceberg connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen2112 authored and electrum committed Oct 22, 2019
1 parent 6192023 commit 014c1d5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTableProperties;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.ConstraintApplicationResult;
import io.prestosql.spi.connector.SchemaNotFoundException;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.connector.TableNotFoundException;
import io.prestosql.spi.connector.classloader.ClassLoaderSafeSystemTable;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.statistics.ComputedStatistics;
import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.TimestampType;
Expand Down Expand Up @@ -79,6 +82,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.updateRowIdHandle;
import static io.prestosql.plugin.hive.HiveSchemaProperties.getLocation;
import static io.prestosql.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation;
import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes;
import static io.prestosql.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.prestosql.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.prestosql.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
Expand Down Expand Up @@ -535,4 +539,23 @@ public void rollback()
{
// TODO: cleanup open transaction
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
TupleDomain<HiveColumnHandle> newDomain = convertTupleDomainTypes(constraint.getSummary().transform(HiveColumnHandle.class::cast));

if (newDomain.equals(table.getPredicate())) {
return Optional.empty();
}

return Optional.of(new ConstraintApplicationResult<>(
new IcebergTableHandle(table.getSchemaName(),
table.getTableName(),
table.getTableType(),
table.getSnapshotId(),
table.getPredicate().intersect(newDomain)),
constraint.getSummary()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterators.limit;
import static io.prestosql.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes;
import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions;
import static java.lang.Math.toIntExact;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -90,7 +89,6 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
{
// TODO: move this to a background thread
List<ConnectorSplit> splits = new ArrayList<>();
TupleDomain<HiveColumnHandle> predicate = convertTupleDomainTypes(this.predicate);
Iterator<FileScanTask> iterator = limit(fileScanIterator, maxSize);
while (iterator.hasNext()) {
FileScanTask task = iterator.next();
Expand Down

0 comments on commit 014c1d5

Please sign in to comment.