diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index bff6cf13350c..2b538f476f27 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -15,7 +15,7 @@ ${project.parent.basedir} - 0.12.0 + 0.13.1 diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HistoryTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HistoryTable.java index a07be09532ca..25cb9a81588b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HistoryTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HistoryTable.java @@ -75,7 +75,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect { InMemoryRecordSet.Builder table = InMemoryRecordSet.builder(COLUMNS); - Set ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestors(icebergTable)); + Set ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestorIds(icebergTable)); TimeZoneKey timeZoneKey = session.getTimeZoneKey(); for (HistoryEntry historyEntry : icebergTable.history()) { long snapshotId = historyEntry.snapshotId(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java index 5097f5af7a4c..75e743da3b0f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java @@ -104,7 +104,7 @@ public Metrics getMetrics() private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema icebergSchema, ColumnMetadata orcColumns, long fileRowCount, Optional> columnStatistics) { if (columnStatistics.isEmpty()) { - return new Metrics(fileRowCount, null, null, null, null, null); + return new Metrics(fileRowCount, null, null, null, null, null, null); } // Columns that are descendants of LIST or MAP types are excluded because: // 1. 
Their stats are not used by Apache Iceberg to filter out data files @@ -153,6 +153,7 @@ private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema iceber null, // TODO: Add column size accounting to ORC column writers valueCounts.isEmpty() ? null : valueCounts, nullCounts.isEmpty() ? null : nullCounts, + null, // TODO: Add nanValueCounts to ORC writer lowerBounds.isEmpty() ? null : lowerBounds, upperBounds.isEmpty() ? null : upperBounds); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetricsWrapper.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetricsWrapper.java index 1cc2fd2cb253..4ba6a41cefbb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetricsWrapper.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/MetricsWrapper.java @@ -32,6 +32,7 @@ public MetricsWrapper( @JsonProperty("columnSizes") Map columnSizes, @JsonProperty("valueCounts") Map valueCounts, @JsonProperty("nullValueCounts") Map nullValueCounts, + @JsonProperty("nanValueCounts") Map nanValueCounts, @JsonProperty("lowerBounds") Map lowerBounds, @JsonProperty("upperBounds") Map upperBounds) { @@ -40,6 +41,7 @@ public MetricsWrapper( columnSizes, valueCounts, nullValueCounts, + nanValueCounts, lowerBounds, upperBounds)); } @@ -78,6 +80,12 @@ public Map nullValueCounts() return metrics.nullValueCounts(); } + /** Exposes the nanValueCounts map of the wrapped metrics object as a JSON property, mirroring the "nanValueCounts" constructor argument so the value is carried through serialization. */ @JsonProperty + public Map nanValueCounts() + { + return metrics.nanValueCounts(); + } + @JsonProperty public Map lowerBounds() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java index 5a9376388581..7b76ee1bccf1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java @@ -103,6 +103,7 @@ import static 
org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; +import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; import static org.apache.iceberg.Transactions.createTableTransaction; @@ -302,7 +303,8 @@ public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableNa Table table = loadTable(session, schemaTableName); if (table.properties().containsKey(OBJECT_STORE_PATH) || table.properties().containsKey(WRITE_NEW_DATA_LOCATION) || - table.properties().containsKey(WRITE_METADATA_LOCATION)) { + table.properties().containsKey(WRITE_METADATA_LOCATION) || + table.properties().containsKey(WRITE_DATA_LOCATION)) { throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino"); } metastore.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), purgeData); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java index 061de97c205a..5f9522668cc0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java @@ -291,7 +291,7 @@ protected void refreshFromMetadataLocation(String newLocation) .retry(20) .exponentialBackoff(100, 5000, 600000, 4.0) .run(metadataLocation -> newMetadata.set( - TableMetadataParser.read(this, io().newInputFile(metadataLocation)))); + TableMetadataParser.read(fileIo, io().newInputFile(metadataLocation)))); 
String newUUID = newMetadata.get().uuid(); if (currentMetadata != null) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index de93d56af7f7..8f8047296c84 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -115,13 +116,17 @@ public void testV2TableWithPositionDelete() String deleteFileName = "delete_file_" + UUID.randomUUID(); FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir); - PositionDeleteWriter writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs)) + Path path = new Path(metadataDir, deleteFileName); + PositionDeleteWriter writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs)) + .createWriterFunc(GenericParquetWriter::buildWriter) .forTable(icebergTable) .overwrite() + .rowSchema(icebergTable.schema()) + .withSpec(PartitionSpec.unpartitioned()) .buildPositionWriter(); try (Closeable ignored = writer) { - writer.delete(dataFilePath, 0); + writer.delete(dataFilePath, 0, GenericRecord.create(icebergTable.schema())); } icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestMetricsWrapper.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestMetricsWrapper.java index 3bc82cf4ab75..aef802b37d3b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestMetricsWrapper.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestMetricsWrapper.java @@ -43,10 +43,11 @@ public void testRoundTrip() Map columnSizes = ImmutableMap.of(3, 321L, 5, 543L); Map valueCounts = ImmutableMap.of(7, 765L, 9, 987L); Map nullValueCounts = ImmutableMap.of(2, 234L, 4, 456L); + Map nanValueCounts = ImmutableMap.of(1, 2L, 3, 4L); Map lowerBounds = ImmutableMap.of(13, ByteBuffer.wrap(new byte[] {0, 8, 9})); Map upperBounds = ImmutableMap.of(17, ByteBuffer.wrap(new byte[] {5, 4, 0})); - Metrics expected = new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds); + Metrics expected = new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds); Metrics actual = CODEC.fromJson(CODEC.toJson(new MetricsWrapper(expected))).metrics(); @@ -54,6 +55,7 @@ public void testRoundTrip() assertEquals(actual.columnSizes(), columnSizes); assertEquals(actual.valueCounts(), valueCounts); assertEquals(actual.nullValueCounts(), nullValueCounts); + assertEquals(actual.nanValueCounts(), nanValueCounts); assertEquals(actual.lowerBounds(), lowerBounds); assertEquals(actual.upperBounds(), upperBounds); } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 544a0c2d6e2e..dbdc3ca19137 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -848,6 +848,34 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat onSpark().executeQuery("DROP TABLE " + sparkTableName); } + /** Creates an Iceberg table through Spark with the 'write.data.path' table property set, inserts one row through Trino, and checks that both engines read that row back. It then verifies that the files metadata table lists exactly one file whose file_path contains the configured data path, and that DROP TABLE issued from Trino fails with the path-override error. Runs once per (storageFormat, specVersion) pair supplied by the "storageFormatsWithSpecVersion" data provider. */ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + public void 
testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion) + { + String baseTableName = "test_writer_data_path_" + storageFormat; + String sparkTableName = sparkTableName(baseTableName); + String trinoTableName = trinoTableName(baseTableName); + String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_writer_data_path_/obj-data"; + + onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES (" + + "'write.data.path'='%s'," + + "'write.format.default' = '%s'," + + "'format-version' = %s)", + sparkTableName, dataPath, storageFormat, specVersion)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName)); + + Row result = row("a_string", 1000000000000000L); + assertThat(onSpark().executeQuery(format("SELECT _string, _bigint FROM %s", sparkTableName))).containsOnly(result); + assertThat(onTrino().executeQuery(format("SELECT _string, _bigint FROM %s", trinoTableName))).containsOnly(result); + + QueryResult queryResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\""))); + assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1); + assertTrue(((String) queryResult.row(0).get(0)).contains(dataPath)); + + /* The drop from Trino is expected to be rejected because the table carries a path override property, so the actual cleanup is done through Spark on the next statement. */ assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName)) + .hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino"); + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + private static final List SPECIAL_CHARACTER_VALUES = ImmutableList.of( "with-hyphen", "with.dot",