Enable reading Parquet's bloom filter statistics for the Iceberg connector
leetcode-1533 committed Apr 24, 2023
1 parent c4f0e7e commit b1d730e
Showing 4 changed files with 95 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
@@ -1521,3 +1521,9 @@ with Parquet files performed by the Iceberg connector.
for structural data types. The equivalent catalog session property is
``parquet_optimized_nested_reader_enabled``.
- ``true``
* - ``parquet.use-bloom-filter``
- Whether bloom filters are used for predicate pushdown when reading
Parquet files. Set this property to ``false`` to disable the use of
bloom filters by default. The equivalent catalog session property is
``parquet_use_bloom_filter``.
- ``true``
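For example, bloom filter usage can be turned off for a whole catalog in its properties file, or overridden per session with the equivalent session property. A minimal sketch, assuming the catalog is named iceberg and configured in etc/catalog/iceberg.properties:

# etc/catalog/iceberg.properties
parquet.use-bloom-filter=false

-- or, for a single session only
SET SESSION iceberg.parquet_use_bloom_filter = false;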
@@ -34,6 +34,7 @@
import io.trino.orc.TupleDomainOrcPredicate;
import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
import io.trino.orc.metadata.OrcType;
import io.trino.parquet.BloomFilterStore;
import io.trino.parquet.Field;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
@@ -131,6 +132,7 @@
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.trino.orc.OrcReader.ProjectedLayout;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
@@ -156,6 +158,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetOptimizedNestedReaderEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isParquetOptimizedReaderEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
import static io.trino.plugin.iceberg.IcebergSessionProperties.useParquetBloomFilter;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
@@ -510,6 +513,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
.withBatchColumnReaders(isParquetOptimizedReaderEnabled(session))
.withBloomFilter(useParquetBloomFilter(session))
.withBatchNestedColumnReaders(isParquetOptimizedNestedReaderEnabled(session)),
predicate,
fileFormatDataSourceStats,
@@ -927,8 +931,10 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
List<BlockMetaData> blocks = new ArrayList<>();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
Optional<BloomFilterStore> bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options);

if (start <= firstDataPage && firstDataPage < start + length &&
predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), Optional.empty(), UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty(), bloomFilterStore, UTC, ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
blocks.add(block);
blockStarts.add(nextStart);
if (startRowPosition.isEmpty()) {
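The extra bloomFilterStore argument to predicateMatches lets the reader consult a row group's bloom filter before deciding to scan it. A minimal standalone sketch of that mechanism using parquet-mr's BlockSplitBloomFilter follows; the filter size, column values, and probe value are made up for illustration and this is not the Trino code path itself:

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;

public class BloomFilterPruningSketch
{
    public static void main(String[] args)
    {
        // Stand-in for a bloom filter read from a Parquet row group's column chunk
        BloomFilter columnBloomFilter = new BlockSplitBloomFilter(1024);
        for (int value : new int[] {1, 3, 5, 7, 9}) {
            columnBloomFilter.insertHash(columnBloomFilter.hash(value));
        }

        // For a point predicate such as "col = 4" a negative answer is definite,
        // so the whole row group can be skipped; a positive answer only means "maybe present"
        boolean mightContain = columnBloomFilter.findHash(columnBloomFilter.hash(4));
        System.out.println("row group can be skipped: " + !mightContain);
    }
}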
@@ -67,6 +67,7 @@ public final class IcebergSessionProperties
private static final String ORC_WRITER_MAX_STRIPE_ROWS = "orc_writer_max_stripe_rows";
private static final String ORC_WRITER_MAX_DICTIONARY_MEMORY = "orc_writer_max_dictionary_memory";
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_USE_BLOOM_FILTER = "parquet_use_bloom_filter";
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled";
private static final String PARQUET_OPTIMIZED_NESTED_READER_ENABLED = "parquet_optimized_nested_reader_enabled";
@@ -197,6 +198,11 @@ public IcebergSessionProperties(
"Parquet: Maximum size of a block to read",
parquetReaderConfig.getMaxReadBlockSize(),
false))
.add(booleanProperty(
PARQUET_USE_BLOOM_FILTER,
"Parquet: Enable using Parquet bloom filter",
parquetReaderConfig.isUseBloomFilter(),
false))
.add(integerProperty(
PARQUET_MAX_READ_BLOCK_ROW_COUNT,
"Parquet: Maximum number of rows read in a batch",
@@ -432,6 +438,11 @@ public static int getParquetWriterBatchSize(ConnectorSession session)
return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
}

public static boolean useParquetBloomFilter(ConnectorSession session)
{
return session.getProperty(PARQUET_USE_BLOOM_FILTER, Boolean.class);
}

public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
{
return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.BaseTestParquetWithBloomFilters;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;

import java.nio.file.Path;
import java.util.List;

import static io.trino.plugin.hive.parquet.TestHiveParquetWithBloomFilters.writeParquetFileWithBloomFilter;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;

public class TestIcebergParquetWithBloomFilters
extends BaseTestParquetWithBloomFilters
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
DistributedQueryRunner queryRunner = IcebergQueryRunner.builder().build();
dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");

// create hive catalog
queryRunner.installPlugin(new TestingHivePlugin());
queryRunner.createCatalog("hive", "hive", ImmutableMap.<String, String>builder()
.put("hive.metastore", "file")
.put("hive.metastore.catalog.dir", dataDirectory.toString())
.put("hive.security", "allow-all")
.buildOrThrow());

return queryRunner;
}

@Override
protected CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues)
{
// create the managed table
String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
CatalogSchemaTableName hiveCatalogSchemaTableName = new CatalogSchemaTableName("hive", new SchemaTableName("tpch", tableName));
CatalogSchemaTableName icebergCatalogSchemaTableName = new CatalogSchemaTableName("iceberg", new SchemaTableName("tpch", tableName));
assertUpdate(format("CREATE TABLE %s (%s INT) WITH (format = 'PARQUET')", hiveCatalogSchemaTableName, columnName));

// directly write data to the managed table
Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName));
Path fileLocation = tableLocation.resolve("bloomFilterFile.parquet");
writeParquetFileWithBloomFilter(fileLocation.toFile(), columnName, testValues);

// migrate the hive table to the iceberg table
assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "', 'false')");

return icebergCatalogSchemaTableName;
}
}
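The writeParquetFileWithBloomFilter helper is reused from the Hive connector tests and is not part of this commit. A rough sketch of how such a file could be produced with parquet-mr's ExampleParquetWriter, assuming a single INT32 column; the class name and schema below are illustrative, not the helper's actual implementation:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.File;
import java.util.List;

public class ParquetBloomFilterWriterSketch
{
    static void writeFileWithBloomFilter(File target, String columnName, List<Integer> values)
            throws Exception
    {
        // Single INT32 column schema, matching the INT column created by the test
        MessageType schema = MessageTypeParser.parseMessageType(
                "message test { required int32 " + columnName + "; }");
        SimpleGroupFactory groups = new SimpleGroupFactory(schema);

        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(target.toURI()))
                .withType(schema)
                // ask parquet-mr to compute and store a bloom filter for this column
                .withBloomFilterEnabled(columnName, true)
                .build()) {
            for (int value : values) {
                writer.write(groups.newGroup().append(columnName, value));
            }
        }
    }
}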
