diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 94c55e086283..8566f700f0a0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -291,6 +291,7 @@ import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.plugin.iceberg.SortFieldUtils.parseSortFields; +import static io.trino.plugin.iceberg.TableStatisticsReader.readNdvs; import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.INCREMENTAL_UPDATE; import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.REPLACE; import static io.trino.plugin.iceberg.TableType.DATA; @@ -346,6 +347,7 @@ import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; @@ -2461,21 +2463,34 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector ConnectorTableHandle tableHandle = getTableHandle(session, tableMetadata.getTable(), Optional.empty(), Optional.empty()); if (tableHandle == null) { - // Assume new table (CTAS), collect all stats possible + // Assume new table (CTAS), collect NDV stats on all columns return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {}); } IcebergTableHandle table = checkValidTableHandle(tableHandle); - Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); - TableStatistics tableStatistics = getTableStatistics( - session, - table.withProjectedColumns(ImmutableSet.copyOf(getTopLevelColumns(schema, typeManager)))); - if (tableStatistics.getRowCount().getValue() == 0.0) { - // Table has no data (empty, or wiped out). Collect all stats possible + if (table.getSnapshotId().isEmpty()) { + // Table has no data (empty, or wiped out). Collect NDV stats on all columns + return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {}); + } + + Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); + long snapshotId = table.getSnapshotId().orElseThrow(); + Snapshot snapshot = icebergTable.snapshot(snapshotId); + String totalRecords = snapshot.summary().get(TOTAL_RECORDS_PROP); + if (totalRecords != null && Long.parseLong(totalRecords) == 0) { + // Table has no data (empty, or wiped out). Collect NDV stats on all columns return getStatisticsCollectionMetadata(tableMetadata, Optional.empty(), availableColumnNames -> {}); } - Set columnsWithExtendedStatistics = tableStatistics.getColumnStatistics().entrySet().stream() - .filter(entry -> !entry.getValue().getDistinctValuesCount().isUnknown()) - .map(entry -> ((IcebergColumnHandle) entry.getKey()).getName()) + + Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); + List columns = getTopLevelColumns(schema, typeManager); + Set columnIds = columns.stream() + .map(IcebergColumnHandle::getId) + .collect(toImmutableSet()); + Map ndvs = readNdvs(icebergTable, snapshotId, columnIds, true); + // Avoid collecting NDV stats on columns where we don't know the existing NDV count + Set columnsWithExtendedStatistics = columns.stream() + .filter(column -> ndvs.containsKey(column.getId())) + .map(IcebergColumnHandle::getName) .collect(toImmutableSet()); return getStatisticsCollectionMetadata(tableMetadata, Optional.of(columnsWithExtendedStatistics), availableColumnNames -> {}); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java index 03b7b9f74dae..560689a1024e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java @@ -240,7 +240,7 @@ else if (columnHandle.getBaseType() == VARBINARY) { return new TableStatistics(Estimate.of(recordCount), columnHandleBuilder.buildOrThrow()); } - private static Map readNdvs(Table icebergTable, long snapshotId, Set columnIds, boolean extendedStatisticsEnabled) + public static Map readNdvs(Table icebergTable, long snapshotId, Set columnIds, boolean extendedStatisticsEnabled) { if (!extendedStatisticsEnabled) { return ImmutableMap.of(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java index 3425eefbdbc6..68c2d5688c46 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java @@ -97,9 +97,9 @@ public void testCacheFileOperations() .add(new CacheOperation("Alluxio.writeCache", METADATA_JSON)) .addCopies(new CacheOperation("Alluxio.readCached", SNAPSHOT), 2) .add(new CacheOperation("InputFile.length", SNAPSHOT)) - .add(new CacheOperation("Alluxio.readExternalStream", MANIFEST)) + .addCopies(new CacheOperation("Alluxio.readExternalStream", MANIFEST), 2) .addCopies(new CacheOperation("Alluxio.readCached", MANIFEST), 4) - .add(new CacheOperation("Alluxio.writeCache", MANIFEST)) + .addCopies(new CacheOperation("Alluxio.writeCache", MANIFEST), 2) .build()); assertFileSystemAccesses( @@ -129,9 +129,9 @@ public void testCacheFileOperations() .add(new CacheOperation("Alluxio.writeCache", METADATA_JSON)) .addCopies(new CacheOperation("Alluxio.readCached", SNAPSHOT), 2) .add(new CacheOperation("InputFile.length", SNAPSHOT)) - .add(new CacheOperation("Alluxio.readExternalStream", MANIFEST)) + .addCopies(new CacheOperation("Alluxio.readExternalStream", MANIFEST), 3) .addCopies(new CacheOperation("Alluxio.readCached", MANIFEST), 10) - .add(new CacheOperation("Alluxio.writeCache", MANIFEST)) + .addCopies(new CacheOperation("Alluxio.writeCache", MANIFEST), 3) .build()); assertFileSystemAccesses( "SELECT * FROM test_cache_file_operations", diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java index 1fa2431c60ee..e5eb5d5868b8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java @@ -198,11 +198,41 @@ public void testCreateOrReplaceTableAsSelect() ImmutableMultiset.builder() .addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2) .addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 2) - .addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 2) - .addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 2) + .add(new FileOperation(SNAPSHOT, "InputFile.newStream")) + .add(new FileOperation(SNAPSHOT, "InputFile.length")) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) + .add(new FileOperation(MANIFEST, "OutputFile.create")) + .add(new FileOperation(STATS, "OutputFile.create")) + .build()); + } + + @Test + public void testInsert() + { + assertUpdate("CREATE TABLE test_insert (id VARCHAR, age INT)"); + + assertFileSystemAccesses( + "INSERT INTO test_insert VALUES('a', 1)", + ImmutableMultiset.builder() + .addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2) + .addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 3) + .addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 3) + .addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 3) + .add(new FileOperation(SNAPSHOT, "OutputFile.create")) + .add(new FileOperation(MANIFEST, "OutputFile.create")) + .add(new FileOperation(STATS, "OutputFile.create")) + .build()); + + assertFileSystemAccesses( + "INSERT INTO test_insert VALUES('b', 2)", + ImmutableMultiset.builder() + .addCopies(new FileOperation(METADATA_JSON, "OutputFile.create"), 2) + .addCopies(new FileOperation(METADATA_JSON, "InputFile.newStream"), 3) + .addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 3) + .addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 3) + .add(new FileOperation(STATS, "InputFile.newStream")) .add(new FileOperation(SNAPSHOT, "OutputFile.create")) .add(new FileOperation(MANIFEST, "OutputFile.create")) - .add(new FileOperation(MANIFEST, "InputFile.newStream")) .add(new FileOperation(STATS, "OutputFile.create")) .build()); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java index ceea916446e4..4492fee1c965 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java @@ -88,7 +88,7 @@ public void testCacheFileOperations() .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON)) .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT)) .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT)) - .add(new CacheOperation("Input.readTail", MANIFEST)) + .addCopies(new CacheOperation("Input.readTail", MANIFEST), 2) .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 2) .build()); @@ -116,7 +116,7 @@ public void testCacheFileOperations() .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON)) .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT)) .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT)) - .add(new CacheOperation("Input.readTail", MANIFEST)) + .addCopies(new CacheOperation("Input.readTail", MANIFEST), 3) .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 5) .build());