Accelerate Iceberg when reading partition columns only
Manifests contain trustworthy record counts, so they can be used to answer count(*) queries.
findepi committed Oct 16, 2023
1 parent acc5844 commit b756564
Showing 8 changed files with 192 additions and 5 deletions.
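
To make the diff easier to follow, here is a minimal, dependency-free sketch of the idea, with invented names (MAX_PAGE_ROWS, the List-of-List "page" shape); the real change below works against the Trino SPI (Page, RunLengthEncodedBlock, FixedPageSource) instead. When every selected column is an identity partition column, the per-file record count recorded in the manifest plus the constant partition values are enough to produce result pages, so the data file is never opened.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CountFromManifestSketch
{
    // Cap rows per page, in the spirit of MAX_RLE_PAGE_SIZE in the diff below
    private static final int MAX_PAGE_ROWS = 8192;

    // Emit constant-value "pages" covering fileRecordCount rows without reading the data file
    static List<List<String>> generatePages(long fileRecordCount, String partitionValue)
    {
        List<List<String>> pages = new ArrayList<>();
        long remaining = fileRecordCount;
        while (remaining > 0) {
            int pageRows = (int) Math.min(MAX_PAGE_ROWS, remaining);
            // nCopies stores one element plus a count, i.e. run-length encoding in spirit
            pages.add(Collections.nCopies(pageRows, partitionValue));
            remaining -= pageRows;
        }
        return pages;
    }

    public static void main(String[] args)
    {
        // Per-file record counts as recorded in Iceberg manifests, keyed here by partition value
        Map<String, Long> manifestRecordCounts = Map.of(
                "2023-10-01", 1_000_000L,
                "2023-10-02", 750_000L);

        long totalRows = 0;
        for (Map.Entry<String, Long> file : manifestRecordCounts.entrySet()) {
            for (List<String> page : generatePages(file.getValue(), file.getKey())) {
                totalRows += page.size();
            }
        }
        System.out.println(totalRows); // 1750000: count(*) answered from metadata alone
    }
}

Summing page sizes reproduces count(*), and count(*) per partition, without touching Parquet or ORC files; the actual implementation bounds the rows per page the same way through MAX_RLE_PAGE_SIZE.
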
@@ -65,7 +65,10 @@
import io.trino.plugin.iceberg.delete.PositionDeleteFilter;
import io.trino.plugin.iceberg.delete.RowPredicate;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
@@ -75,6 +78,7 @@
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.Range;
@@ -136,6 +140,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.uniqueIndex;
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
@@ -177,6 +182,7 @@
import static io.trino.plugin.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes;
import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
@@ -185,6 +191,8 @@
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.UuidType.UUID;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
@@ -205,6 +213,12 @@ public class IcebergPageSourceProvider
{
private static final String AVRO_FIELD_ID = "field-id";

// This is used whenever a query doesn't reference any data columns.
// We need to limit the number of rows per page in case there are projections
// in the query that can cause page sizes to explode. For example: SELECT rand() FROM some_table
// TODO (https://github.com/trinodb/trino/issues/16824) allow connector to return pages of arbitrary row count and handle this gracefully in engine
private static final int MAX_RLE_PAGE_SIZE = DEFAULT_MAX_PAGE_SIZE_IN_BYTES / SIZE_OF_LONG;

private final TrinoFileSystemFactory fileSystemFactory;
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final OrcReaderOptions orcReaderOptions;
@@ -259,6 +273,7 @@ public ConnectorPageSource createPageSource(
split.getStart(),
split.getLength(),
split.getFileSize(),
split.getFileRecordCount(),
split.getPartitionDataJson(),
split.getFileFormat(),
tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));
@@ -277,6 +292,7 @@
long start,
long length,
long fileSize,
long fileRecordCount,
String partitionDataJson,
IcebergFileFormat fileFormat,
Optional<NameMapping> nameMapping)
@@ -330,6 +346,21 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
? fileSystem.newInputFile(Location.of(path), fileSize)
: fileSystem.newInputFile(Location.of(path));

try {
if (effectivePredicate.isAll() &&
start == 0 && length == inputfile.length() &&
deletes.isEmpty() &&
icebergColumns.stream().allMatch(column -> partitionKeys.containsKey(column.getId()))) {
return generatePages(
fileRecordCount,
icebergColumns,
partitionKeys);
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}

ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions = createDataPageSource(
session,
inputfile,
@@ -575,6 +606,41 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
}
}

private static ConnectorPageSource generatePages(
long totalRowCount,
List<IcebergColumnHandle> icebergColumns,
Map<Integer, Optional<String>> partitionKeys)
{
int maxPageSize = MAX_RLE_PAGE_SIZE;
Block[] pageBlocks = new Block[icebergColumns.size()];
for (int i = 0; i < icebergColumns.size(); i++) {
IcebergColumnHandle column = icebergColumns.get(i);
Type trinoType = column.getType();
Object partitionValue = deserializePartitionValue(trinoType, partitionKeys.get(column.getId()).orElse(null), column.getName());
pageBlocks[i] = RunLengthEncodedBlock.create(nativeValueToBlock(trinoType, partitionValue), maxPageSize);
}
Page maxPage = new Page(maxPageSize, pageBlocks);

return new FixedPageSource(
new AbstractIterator<>()
{
private long rowIndex;

@Override
protected Page computeNext()
{
if (rowIndex == totalRowCount) {
return endOfData();
}
int pageSize = toIntExact(min(maxPageSize, totalRowCount - rowIndex));
Page page = maxPage.getRegion(0, pageSize);
rowIndex += pageSize;
return page;
}
},
maxPage.getRetainedSizeInBytes());
}

private static ReaderPageSourceWithRowPositions createOrcPageSource(
TrinoInputFile inputFile,
long start,
@@ -54,6 +54,7 @@
public final class IcebergSessionProperties
implements SessionPropertiesProvider
{
public static final String SPLIT_SIZE = "experimental_split_size";
private static final String COMPRESSION_CODEC = "compression_codec";
private static final String USE_FILE_SIZE_FROM_METADATA = "use_file_size_from_metadata";
private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
@@ -103,6 +104,13 @@ public IcebergSessionProperties(
ParquetWriterConfig parquetWriterConfig)
{
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(dataSizeProperty(
SPLIT_SIZE,
"Target split size",
// Note: this is null by default & hidden, currently mainly for tests.
// See https://github.com/trinodb/trino/issues/9018#issuecomment-1752929193 for further discussion.
null,
true))
.add(enumProperty(
COMPRESSION_CODEC,
"Compression codec to use when writing files",
@@ -404,6 +412,11 @@ public static DataSize getOrcWriterMaxDictionaryMemory(ConnectorSession session)
return session.getProperty(ORC_WRITER_MAX_DICTIONARY_MEMORY, DataSize.class);
}

public static Optional<DataSize> getSplitSize(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty(SPLIT_SIZE, DataSize.class));
}

public static HiveCompressionCodec getCompressionCodec(ConnectorSession session)
{
return session.getProperty(COMPRESSION_CODEC, HiveCompressionCodec.class);
@@ -40,6 +40,7 @@ public class IcebergSplit
private final long start;
private final long length;
private final long fileSize;
private final long fileRecordCount;
private final IcebergFileFormat fileFormat;
private final String partitionSpecJson;
private final String partitionDataJson;
@@ -52,6 +53,7 @@ public IcebergSplit(
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileRecordCount") long fileRecordCount,
@JsonProperty("fileFormat") IcebergFileFormat fileFormat,
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") String partitionDataJson,
@@ -62,6 +64,7 @@ public IcebergSplit(
this.start = start;
this.length = length;
this.fileSize = fileSize;
this.fileRecordCount = fileRecordCount;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
@@ -106,6 +109,12 @@ public long getFileSize()
return fileSize;
}

@JsonProperty
public long getFileRecordCount()
{
return fileRecordCount;
}

@JsonProperty
public IcebergFileFormat getFileFormat()
{
@@ -41,6 +41,7 @@
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableScan;
@@ -76,6 +77,7 @@
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getSplitSize;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
@@ -110,6 +112,7 @@ public class IcebergSplitSource
private final TypeManager typeManager;
private final Closer closer = Closer.create();
private final double minimumAssignedSplitWeight;
private final Set<Integer> projectedBaseColumns;
private final TupleDomain<IcebergColumnHandle> dataColumnPredicate;
private final Domain pathDomain;
private final Domain fileModifiedTimeDomain;
@@ -152,6 +155,9 @@ public IcebergSplitSource(
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.recordScannedFiles = recordScannedFiles;
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
this.projectedBaseColumns = tableHandle.getProjectedColumns().stream()
.map(column -> column.getBaseColumnIdentity().getId())
.collect(toImmutableSet());
this.dataColumnPredicate = tableHandle.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
this.pathDomain = getPathDomain(tableHandle.getEnforcedPredicate());
checkArgument(
@@ -203,7 +209,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
scan = scan.includeColumnStats();
}
this.fileScanIterable = closer.register(scan.planFiles());
this.targetSplitSize = tableScan.targetSplitSize();
this.targetSplitSize = getSplitSize(session)
.map(DataSize::toBytes)
.orElseGet(tableScan::targetSplitSize);
this.fileScanIterator = closer.register(fileScanIterable.iterator());
this.fileTasksIterator = emptyIterator();
}
@@ -219,7 +227,12 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
while (splits.size() < maxSize && (fileTasksIterator.hasNext() || fileScanIterator.hasNext())) {
if (!fileTasksIterator.hasNext()) {
FileScanTask wholeFileTask = fileScanIterator.next();
fileTasksIterator = wholeFileTask.split(targetSplitSize).iterator();
if (wholeFileTask.deletes().isEmpty() && noDataColumnsProjected(wholeFileTask)) {
fileTasksIterator = List.of(wholeFileTask).iterator();
}
else {
fileTasksIterator = wholeFileTask.split(targetSplitSize).iterator();
}
fileHasAnyDeletions = false;
// In theory, .split() could produce empty iterator, so let's evaluate the outer loop condition again.
continue;
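
The branch above is what makes the page-source shortcut reachable: that shortcut requires start == 0 and length equal to the file length, and the manifest record count describes the whole file, so a file with no deletes and only partition columns projected is deliberately left unsplit. A rough standalone sketch of the decision follows; the names and the simple fixed-size chunking are assumptions of the sketch, not the connector's actual planning code.

import java.util.ArrayList;
import java.util.List;

public class SplitPlanningSketch
{
    record Split(long start, long length) {}

    // Illustrative only: keep the whole file in one split when the record-count shortcut can apply,
    // otherwise chunk the file by the target split size
    static List<Split> planSplits(long fileSize, long targetSplitSize, boolean hasDeletes, boolean onlyPartitionColumnsProjected)
    {
        if (!hasDeletes && onlyPartitionColumnsProjected) {
            // start == 0 and length == fileSize, so the page source can answer from the manifest record count
            return List.of(new Split(0, fileSize));
        }
        List<Split> splits = new ArrayList<>();
        for (long start = 0; start < fileSize; start += targetSplitSize) {
            splits.add(new Split(start, Math.min(targetSplitSize, fileSize - start)));
        }
        return splits;
    }

    public static void main(String[] args)
    {
        System.out.println(planSplits(10_000, 4_000, false, true));  // [Split[start=0, length=10000]]
        System.out.println(planSplits(10_000, 4_000, false, false)); // splits of 4000, 4000, 2000 bytes
    }
}
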
@@ -292,6 +305,15 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
}

private boolean noDataColumnsProjected(FileScanTask fileScanTask)
{
return fileScanTask.spec().fields().stream()
.filter(partitionField -> partitionField.transform().isIdentity())
.map(PartitionField::sourceId)
.collect(toImmutableSet())
.containsAll(projectedBaseColumns);
}

private long getModificationTime(String path)
{
try {
@@ -451,6 +473,7 @@ private IcebergSplit toIcebergSplit(FileScanTask task)
task.start(),
task.length(),
task.file().fileSizeInBytes(),
task.file().recordCount(),
IcebergFileFormat.fromIceberg(task.file().format()),
PartitionSpecParser.toJson(task.spec()),
PartitionData.toJson(task.file().partition()),
@@ -124,6 +124,7 @@ else if (column.getId() == DATA_CHANGE_ORDINAL_ID) {
split.start(),
split.length(),
split.fileSize(),
split.fileRecordCount(),
split.partitionDataJson(),
split.fileFormat(),
functionHandle.nameMappingJson().map(NameMappingParser::fromJson));
@@ -18,6 +18,7 @@
import com.google.common.collect.Multiset;
import com.google.inject.Key;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.filesystem.TrackingFileSystemFactory;
import io.trino.filesystem.TrackingFileSystemFactory.OperationType;
import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
@@ -251,7 +252,6 @@ public void testReadWholePartition()
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
.addCopies(new FileOperation(DATA, INPUT_FILE_NEW_STREAM), 4)
.build());

// Read partition column only, one partition only
@@ -263,7 +263,6 @@
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
.addCopies(new FileOperation(DATA, INPUT_FILE_NEW_STREAM), 2)
.build());

// Read partition and synthetic columns
@@ -275,12 +274,72 @@
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
// TODO return synthetic columns without opening the data files
.addCopies(new FileOperation(DATA, INPUT_FILE_NEW_STREAM), 4)
.build());

// Read only row count
assertFileSystemAccesses(
"SELECT count(*) FROM test_read_part_key",
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 2)
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
.build());

assertUpdate("DROP TABLE test_read_part_key");
}

@Test
public void testReadWholePartitionSplittableFile()
{
String catalog = getSession().getCatalog().orElseThrow();

assertUpdate("DROP TABLE IF EXISTS test_read_whole_splittable_file");
assertUpdate("CREATE TABLE test_read_whole_splittable_file(key varchar, data varchar) WITH (partitioning=ARRAY['key'])");

assertUpdate(
Session.builder(getSession())
.setSystemProperty(SystemSessionProperties.WRITER_SCALING_MIN_DATA_PROCESSED, "1PB")
.setCatalogSessionProperty(catalog, "parquet_writer_block_size", "1kB")
.setCatalogSessionProperty(catalog, "orc_writer_max_stripe_size", "1kB")
.setCatalogSessionProperty(catalog, "orc_writer_max_stripe_rows", "1000")
.build(),
"INSERT INTO test_read_whole_splittable_file SELECT 'single partition', comment FROM tpch.tiny.orders", 15000);

Session session = Session.builder(getSession())
.setCatalogSessionProperty(catalog, IcebergSessionProperties.SPLIT_SIZE, "1kB")
.build();

// Read partition column only
assertFileSystemAccesses(
session,
"SELECT key, count(*) FROM test_read_whole_splittable_file GROUP BY key",
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
.build());

// Read only row count
assertFileSystemAccesses(
session,
"SELECT count(*) FROM test_read_whole_splittable_file",
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
.add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
.build());

assertUpdate("DROP TABLE test_read_whole_splittable_file");
}

@Test
public void testSelectFromVersionedTable()
{