diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index b18e65f219ad..beac48fc875a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -65,7 +65,10 @@
 import io.trino.plugin.iceberg.delete.PositionDeleteFilter;
 import io.trino.plugin.iceberg.delete.RowPredicate;
 import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
+import io.trino.spi.Page;
 import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RunLengthEncodedBlock;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPageSource;
 import io.trino.spi.connector.ConnectorPageSourceProvider;
@@ -75,6 +78,7 @@
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.EmptyPageSource;
+import io.trino.spi.connector.FixedPageSource;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.NullableValue;
 import io.trino.spi.predicate.Range;
@@ -136,6 +140,7 @@
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Maps.uniqueIndex;
+import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
@@ -177,6 +182,7 @@
 import static io.trino.plugin.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes;
 import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
 import static io.trino.spi.predicate.Utils.nativeValueToBlock;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
@@ -185,6 +191,8 @@
 import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
 import static io.trino.spi.type.UuidType.UUID;
 import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.Math.min;
+import static java.lang.Math.toIntExact;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
@@ -205,6 +213,12 @@ public class IcebergPageSourceProvider
 {
     private static final String AVRO_FIELD_ID = "field-id";
 
+    // This is used whenever a query doesn't reference any data columns.
+    // We need to limit the number of rows per page in case there are projections
+    // in the query that can cause page sizes to explode. For example: SELECT rand() FROM some_table
+    // TODO (https://github.com/trinodb/trino/issues/16824) allow connector to return pages of arbitrary row count and handle this gracefully in engine
+    private static final int MAX_RLE_PAGE_SIZE = DEFAULT_MAX_PAGE_SIZE_IN_BYTES / SIZE_OF_LONG;
+
     private final TrinoFileSystemFactory fileSystemFactory;
     private final FileFormatDataSourceStats fileFormatDataSourceStats;
     private final OrcReaderOptions orcReaderOptions;
@@ -259,6 +273,7 @@ public ConnectorPageSource createPageSource(
                 split.getStart(),
                 split.getLength(),
                 split.getFileSize(),
+                split.getFileRecordCount(),
                 split.getPartitionDataJson(),
                 split.getFileFormat(),
                 tableHandle.getNameMappingJson().map(NameMappingParser::fromJson));
@@ -277,6 +292,7 @@ public ConnectorPageSource createPageSource(
             long start,
             long length,
             long fileSize,
+            long fileRecordCount,
             String partitionDataJson,
             IcebergFileFormat fileFormat,
             Optional<NameMapping> nameMapping)
@@ -330,6 +346,21 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
                 ? fileSystem.newInputFile(Location.of(path), fileSize)
                 : fileSystem.newInputFile(Location.of(path));
 
+        try {
+            if (effectivePredicate.isAll() &&
+                    start == 0 && length == inputfile.length() &&
+                    deletes.isEmpty() &&
+                    icebergColumns.stream().allMatch(column -> partitionKeys.containsKey(column.getId()))) {
+                return generatePages(
+                        fileRecordCount,
+                        icebergColumns,
+                        partitionKeys);
+            }
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
         ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions = createDataPageSource(
                 session,
                 inputfile,
@@ -575,6 +606,41 @@ public ReaderPageSourceWithRowPositions createDataPageSource(
         }
     }
 
+    private static ConnectorPageSource generatePages(
+            long totalRowCount,
+            List<IcebergColumnHandle> icebergColumns,
+            Map<Integer, Optional<String>> partitionKeys)
+    {
+        int maxPageSize = MAX_RLE_PAGE_SIZE;
+        Block[] pageBlocks = new Block[icebergColumns.size()];
+        for (int i = 0; i < icebergColumns.size(); i++) {
+            IcebergColumnHandle column = icebergColumns.get(i);
+            Type trinoType = column.getType();
+            Object partitionValue = deserializePartitionValue(trinoType, partitionKeys.get(column.getId()).orElse(null), column.getName());
+            pageBlocks[i] = RunLengthEncodedBlock.create(nativeValueToBlock(trinoType, partitionValue), maxPageSize);
+        }
+        Page maxPage = new Page(maxPageSize, pageBlocks);
+
+        return new FixedPageSource(
+                new AbstractIterator<>()
+                {
+                    private long rowIndex;
+
+                    @Override
+                    protected Page computeNext()
+                    {
+                        if (rowIndex == totalRowCount) {
+                            return endOfData();
+                        }
+                        int pageSize = toIntExact(min(maxPageSize, totalRowCount - rowIndex));
+                        Page page = maxPage.getRegion(0, pageSize);
+                        rowIndex += pageSize;
+                        return page;
+                    }
+                },
+                maxPage.getRetainedSizeInBytes());
+    }
+
     private static ReaderPageSourceWithRowPositions createOrcPageSource(
             TrinoInputFile inputFile,
             long start,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
index e92877bbb247..b79dbd43596f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -54,6 +54,7 @@
 public final class IcebergSessionProperties
         implements SessionPropertiesProvider
 {
+    public static final String SPLIT_SIZE = "experimental_split_size";
     private static final String COMPRESSION_CODEC = "compression_codec";
     private static final String USE_FILE_SIZE_FROM_METADATA = "use_file_size_from_metadata";
     private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
@@ -103,6 +104,13 @@ public IcebergSessionProperties(
             ParquetWriterConfig parquetWriterConfig)
     {
         sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
+                .add(dataSizeProperty(
+                        SPLIT_SIZE,
+                        "Target split size",
+                        // Note: this is null by default & hidden, currently mainly for tests.
+                        // See https://github.com/trinodb/trino/issues/9018#issuecomment-1752929193 for further discussion.
+                        null,
+                        true))
                 .add(enumProperty(
                         COMPRESSION_CODEC,
                         "Compression codec to use when writing files",
@@ -404,6 +412,11 @@ public static DataSize getOrcWriterMaxDictionaryMemory(ConnectorSession session)
         return session.getProperty(ORC_WRITER_MAX_DICTIONARY_MEMORY, DataSize.class);
     }
 
+    public static Optional<DataSize> getSplitSize(ConnectorSession session)
+    {
+        return Optional.ofNullable(session.getProperty(SPLIT_SIZE, DataSize.class));
+    }
+
     public static HiveCompressionCodec getCompressionCodec(ConnectorSession session)
     {
         return session.getProperty(COMPRESSION_CODEC, HiveCompressionCodec.class);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
index 20719a9d8dfb..3bc609c03bce 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@@ -40,6 +40,7 @@ public class IcebergSplit
     private final long start;
     private final long length;
     private final long fileSize;
+    private final long fileRecordCount;
     private final IcebergFileFormat fileFormat;
     private final String partitionSpecJson;
     private final String partitionDataJson;
@@ -52,6 +53,7 @@ public IcebergSplit(
             @JsonProperty("start") long start,
             @JsonProperty("length") long length,
             @JsonProperty("fileSize") long fileSize,
+            @JsonProperty("fileRecordCount") long fileRecordCount,
             @JsonProperty("fileFormat") IcebergFileFormat fileFormat,
             @JsonProperty("partitionSpecJson") String partitionSpecJson,
             @JsonProperty("partitionDataJson") String partitionDataJson,
@@ -62,6 +64,7 @@ public IcebergSplit(
         this.start = start;
         this.length = length;
         this.fileSize = fileSize;
+        this.fileRecordCount = fileRecordCount;
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
         this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
         this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
@@ -106,6 +109,12 @@ public long getFileSize()
         return fileSize;
     }
 
+    @JsonProperty
+    public long getFileRecordCount()
+    {
+        return fileRecordCount;
+    }
+
     @JsonProperty
     public IcebergFileFormat getFileFormat()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index c93f04629cf7..8d65fa52748e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -41,6 +41,7 @@
 import io.trino.spi.type.TypeManager;
 import jakarta.annotation.Nullable;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableScan;
@@ -76,6 +77,7 @@
 import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.getSplitSize;
 import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
 import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
@@ -110,6 +112,7 @@ public class IcebergSplitSource
     private final TypeManager typeManager;
     private final Closer closer = Closer.create();
     private final double minimumAssignedSplitWeight;
+    private final Set<Integer> projectedBaseColumns;
     private final TupleDomain<IcebergColumnHandle> dataColumnPredicate;
     private final Domain pathDomain;
     private final Domain fileModifiedTimeDomain;
@@ -152,6 +155,9 @@ public IcebergSplitSource(
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.recordScannedFiles = recordScannedFiles;
         this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
+        this.projectedBaseColumns = tableHandle.getProjectedColumns().stream()
+                .map(column -> column.getBaseColumnIdentity().getId())
+                .collect(toImmutableSet());
         this.dataColumnPredicate = tableHandle.getEnforcedPredicate().filter((column, domain) -> !isMetadataColumnId(column.getId()));
         this.pathDomain = getPathDomain(tableHandle.getEnforcedPredicate());
         checkArgument(
@@ -203,7 +209,9 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
                 scan = scan.includeColumnStats();
             }
             this.fileScanIterable = closer.register(scan.planFiles());
-            this.targetSplitSize = tableScan.targetSplitSize();
+            this.targetSplitSize = getSplitSize(session)
+                    .map(DataSize::toBytes)
+                    .orElseGet(tableScan::targetSplitSize);
             this.fileScanIterator = closer.register(fileScanIterable.iterator());
             this.fileTasksIterator = emptyIterator();
         }
@@ -219,7 +227,12 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
         while (splits.size() < maxSize && (fileTasksIterator.hasNext() || fileScanIterator.hasNext())) {
             if (!fileTasksIterator.hasNext()) {
                 FileScanTask wholeFileTask = fileScanIterator.next();
-                fileTasksIterator = wholeFileTask.split(targetSplitSize).iterator();
+                if (wholeFileTask.deletes().isEmpty() && noDataColumnsProjected(wholeFileTask)) {
+                    fileTasksIterator = List.of(wholeFileTask).iterator();
+                }
+                else {
+                    fileTasksIterator = wholeFileTask.split(targetSplitSize).iterator();
+                }
                 fileHasAnyDeletions = false;
                 // In theory, .split() could produce empty iterator, so let's evaluate the outer loop condition again.
                 continue;
@@ -292,6 +305,15 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
         return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
     }
 
+    private boolean noDataColumnsProjected(FileScanTask fileScanTask)
+    {
+        return fileScanTask.spec().fields().stream()
+                .filter(partitionField -> partitionField.transform().isIdentity())
+                .map(PartitionField::sourceId)
+                .collect(toImmutableSet())
+                .containsAll(projectedBaseColumns);
+    }
+
     private long getModificationTime(String path)
     {
         try {
@@ -451,6 +473,7 @@ private IcebergSplit toIcebergSplit(FileScanTask task)
                 task.start(),
                 task.length(),
                 task.file().fileSizeInBytes(),
+                task.file().recordCount(),
                 IcebergFileFormat.fromIceberg(task.file().format()),
                 PartitionSpecParser.toJson(task.spec()),
                 PartitionData.toJson(task.file().partition()),
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java
index 2227a07fbd97..7271a025ace5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java
@@ -124,6 +124,7 @@ else if (column.getId() == DATA_CHANGE_ORDINAL_ID) {
                 split.start(),
                 split.length(),
                 split.fileSize(),
+                split.fileRecordCount(),
                 split.partitionDataJson(),
                 split.fileFormat(),
                 functionHandle.nameMappingJson().map(NameMappingParser::fromJson));
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java
index 1aba3b8ed814..d60e5e906bb2 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.Multiset;
 import com.google.inject.Key;
 import io.trino.Session;
+import io.trino.SystemSessionProperties;
 import io.trino.filesystem.TrackingFileSystemFactory;
 import io.trino.filesystem.TrackingFileSystemFactory.OperationType;
 import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
@@ -251,7 +252,6 @@ public void testReadWholePartition()
                         .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
                         .add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
                         .add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
-                        .addCopies(new FileOperation(DATA, INPUT_FILE_NEW_STREAM), 4)
                         .build());
 
         // Read partition column only, one partition only
@@ -263,7 +263,6 @@ public void testReadWholePartition()
                         .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
                         .add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
                         .add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
-                        .addCopies(new FileOperation(DATA, INPUT_FILE_NEW_STREAM), 2)
                        .build());
 
         // Read partition and synthetic columns
@@ -275,12 +274,72 @@ public void testReadWholePartition()
                         .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
                         .add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
                         .add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
+                        // TODO return synthetic columns without opening the data files
                         .addCopies(new FileOperation(DATA, INPUT_FILE_NEW_STREAM), 4)
                         .build());
 
+        // Read only row count
+        assertFileSystemAccesses(
+                "SELECT count(*) FROM test_read_part_key",
+                ALL_FILES,
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 2)
+                        .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
+                        .add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
+                        .build());
+
         assertUpdate("DROP TABLE test_read_part_key");
     }
 
+    @Test
+    public void testReadWholePartitionSplittableFile()
+    {
+        String catalog = getSession().getCatalog().orElseThrow();
+
+        assertUpdate("DROP TABLE IF EXISTS test_read_whole_splittable_file");
+        assertUpdate("CREATE TABLE test_read_whole_splittable_file(key varchar, data varchar) WITH (partitioning=ARRAY['key'])");
+
+        assertUpdate(
+                Session.builder(getSession())
+                        .setSystemProperty(SystemSessionProperties.WRITER_SCALING_MIN_DATA_PROCESSED, "1PB")
+                        .setCatalogSessionProperty(catalog, "parquet_writer_block_size", "1kB")
+                        .setCatalogSessionProperty(catalog, "orc_writer_max_stripe_size", "1kB")
+                        .setCatalogSessionProperty(catalog, "orc_writer_max_stripe_rows", "1000")
+                        .build(),
+                "INSERT INTO test_read_whole_splittable_file SELECT 'single partition', comment FROM tpch.tiny.orders", 15000);
+
+        Session session = Session.builder(getSession())
+                .setCatalogSessionProperty(catalog, IcebergSessionProperties.SPLIT_SIZE, "1kB")
+                .build();
+
+        // Read partition column only
+        assertFileSystemAccesses(
+                session,
+                "SELECT key, count(*) FROM test_read_whole_splittable_file GROUP BY key",
+                ALL_FILES,
+                ImmutableMultiset.<FileOperation>builder()
+                        .add(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
+                        .add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
+                        .build());
+
+        // Read only row count
+        assertFileSystemAccesses(
+                session,
+                "SELECT count(*) FROM test_read_whole_splittable_file",
+                ALL_FILES,
+                ImmutableMultiset.<FileOperation>builder()
+                        .add(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH))
+                        .add(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM))
+                        .build());
+
+        assertUpdate("DROP TABLE test_read_whole_splittable_file");
+    }
+
     @Test
     public void testSelectFromVersionedTable()
     {
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
index ad0d299ed73b..4e44345bf34e 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
@@ -160,6 +160,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
                 0,
                 inputFile.length(),
                 inputFile.length(),
+                -1, // invalid; normally known
                 ORC,
                 PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
                 PartitionData.toJson(new PartitionData(new Object[] {})),
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
index 5cd773aa27fa..fee9595614f9 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
@@ -22,11 +22,16 @@
 import io.trino.plugin.hive.TrinoViewHiveMetastore;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
+import io.trino.plugin.hive.orc.OrcReaderConfig;
+import io.trino.plugin.hive.orc.OrcWriterConfig;
+import io.trino.plugin.hive.parquet.ParquetReaderConfig;
+import io.trino.plugin.hive.parquet.ParquetWriterConfig;
 import io.trino.plugin.iceberg.catalog.TrinoCatalog;
 import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
 import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
 import io.trino.spi.connector.CatalogHandle;
 import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.Domain;
@@ -38,6 +43,7 @@
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorSession;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
@@ -67,7 +73,6 @@
 import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
 import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.type.BigintType.BIGINT;
-import static io.trino.testing.TestingConnectorSession.SESSION;
 import static io.trino.tpch.TpchTable.NATION;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -79,6 +84,16 @@
 public class TestIcebergSplitSource
         extends AbstractTestQueryFramework
 {
+    private static final ConnectorSession SESSION = TestingConnectorSession.builder()
+            .setPropertyMetadata(new IcebergSessionProperties(
+                    new IcebergConfig(),
+                    new OrcReaderConfig(),
+                    new OrcWriterConfig(),
+                    new ParquetReaderConfig(),
+                    new ParquetWriterConfig())
+                    .getSessionProperties())
+            .build();
+
     private File metastoreDir;
     private TrinoFileSystemFactory fileSystemFactory;
     private TrinoCatalog catalog;
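
Note on the change above (not part of the patch): generatePages() serves a whole-partition, delete-free file entirely from Iceberg metadata. It builds a single Page whose blocks are run-length-encoded partition values and then repeatedly emits regions of at most MAX_RLE_PAGE_SIZE rows until fileRecordCount rows have been produced. The standalone sketch below mirrors only that chunking arithmetic; the class name RlePageChunking, the hard-coded 1 MB budget standing in for DEFAULT_MAX_PAGE_SIZE_IN_BYTES, and the main() driver are illustrative assumptions rather than code from this PR.

import java.util.ArrayList;
import java.util.List;

public class RlePageChunking
{
    // Mirrors MAX_RLE_PAGE_SIZE in the patch, assuming DEFAULT_MAX_PAGE_SIZE_IN_BYTES is 1 MB
    // and each position is budgeted at the size of a long (8 bytes): 131072 rows per page.
    private static final int MAX_RLE_PAGE_SIZE = (1024 * 1024) / Long.BYTES;

    // Returns the row counts of the pages emitted for a file with the given record count —
    // the same arithmetic computeNext() performs with toIntExact(min(maxPageSize, totalRowCount - rowIndex)).
    static List<Integer> pageSizes(long fileRecordCount)
    {
        List<Integer> sizes = new ArrayList<>();
        long rowIndex = 0;
        while (rowIndex < fileRecordCount) {
            int pageSize = Math.toIntExact(Math.min(MAX_RLE_PAGE_SIZE, fileRecordCount - rowIndex));
            sizes.add(pageSize);
            rowIndex += pageSize;
        }
        return sizes;
    }

    public static void main(String[] args)
    {
        // A 300000-row file yields [131072, 131072, 37856]; every page carries only
        // run-length-encoded partition values, so no data file is ever opened.
        System.out.println(pageSizes(300_000));
    }
}

Capping RLE pages at 131072 rows under the assumed 1 MB budget is what keeps projections such as SELECT rand() FROM some_table from inflating a single page, per the comment on MAX_RLE_PAGE_SIZE.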
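A second note (also not part of the patch): IcebergSplitSource skips splitting a file, and IcebergPageSourceProvider takes the metadata-only path, only when every projected base column is an identity partition column of the file's spec and the file carries no deletes. The dependency-free sketch below restates the set check from noDataColumnsProjected(); the plain Integer sets and the example field ids are hypothetical stand-ins for the Iceberg PartitionField/FileScanTask objects used in the real code.

import java.util.Set;

public class PartitionOnlyProjectionCheck
{
    // Stand-in for fileScanTask.spec().fields() filtered to identity transforms and
    // mapped to their source column ids, compared against the projected base column ids.
    static boolean noDataColumnsProjected(Set<Integer> identityPartitionSourceIds, Set<Integer> projectedBaseColumnIds)
    {
        // The file can be answered from partition metadata alone only if every projected
        // base column is identity-partitioned in this file's partition spec.
        return identityPartitionSourceIds.containsAll(projectedBaseColumnIds);
    }

    public static void main(String[] args)
    {
        // Table partitioned by identity(key) with field id 1; query projects only column 1.
        System.out.println(noDataColumnsProjected(Set.of(1), Set.of(1)));     // true: file is not split, data file never opened
        // Query also projects a data column with field id 2.
        System.out.println(noDataColumnsProjected(Set.of(1), Set.of(1, 2)));  // false: normal split and read path
    }
}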