Update iceberg to 0.13.1 #11032

Merged (2 commits) on Feb 24, 2022
plugin/trino-iceberg/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.iceberg.version>0.12.0</dep.iceberg.version>
<dep.iceberg.version>0.13.1</dep.iceberg.version>
</properties>

<dependencies>
@@ -75,7 +75,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
{
InMemoryRecordSet.Builder table = InMemoryRecordSet.builder(COLUMNS);

Set<Long> ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestors(icebergTable));
Set<Long> ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestorIds(icebergTable));
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
for (HistoryEntry historyEntry : icebergTable.history()) {
long snapshotId = historyEntry.snapshotId();
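For context: with Iceberg 0.13 the ancestor snapshot IDs come from SnapshotUtil.currentAncestorIds, which is why the call above changes (the currentAncestors call used against 0.12 no longer returns plain IDs here). A minimal, self-contained sketch of the pattern this cursor relies on, assuming an already-loaded org.apache.iceberg.Table; the class and method names are only for illustration:

import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

import java.util.Set;

class AncestorCheckSketch
{
    // Marks which history entries are ancestors of the table's current snapshot
    static void printAncestry(Table icebergTable)
    {
        // 0.13.x: currentAncestorIds returns the IDs of snapshots reachable from the current snapshot
        Set<Long> ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestorIds(icebergTable));
        for (HistoryEntry entry : icebergTable.history()) {
            boolean isCurrentAncestor = ancestorIds.contains(entry.snapshotId());
            System.out.printf("snapshot %d at %d ms, ancestor of current: %s%n",
                    entry.snapshotId(), entry.timestampMillis(), isCurrentAncestor);
        }
    }
}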
@@ -104,7 +104,7 @@ public Metrics getMetrics()
private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema icebergSchema, ColumnMetadata<OrcType> orcColumns, long fileRowCount, Optional<ColumnMetadata<ColumnStatistics>> columnStatistics)
{
if (columnStatistics.isEmpty()) {
return new Metrics(fileRowCount, null, null, null, null, null);
return new Metrics(fileRowCount, null, null, null, null, null, null);
}
// Columns that are descendants of LIST or MAP types are excluded because:
// 1. Their stats are not used by Apache Iceberg to filter out data files
@@ -153,6 +153,7 @@ private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema iceber
null, // TODO: Add column size accounting to ORC column writers
valueCounts.isEmpty() ? null : valueCounts,
nullCounts.isEmpty() ? null : nullCounts,
null, // TODO: Add nanValueCounts to ORC writer
lowerBounds.isEmpty() ? null : lowerBounds,
upperBounds.isEmpty() ? null : upperBounds);
}
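Iceberg 0.13 adds a nanValueCounts map to the Metrics constructor, between nullValueCounts and lowerBounds, which is why both call sites above gain an extra argument. A minimal sketch of the new argument order, with made-up field IDs and counts purely for illustration:

import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Metrics;

import java.nio.ByteBuffer;
import java.util.Map;

class MetricsSketch
{
    static Metrics exampleMetrics()
    {
        Map<Integer, Long> valueCounts = ImmutableMap.of(1, 100L, 2, 100L);
        Map<Integer, Long> nullCounts = ImmutableMap.of(1, 3L);
        Map<Integer, Long> nanCounts = ImmutableMap.of(2, 1L); // new in 0.13: NaN counts for floating-point fields
        Map<Integer, ByteBuffer> lowerBounds = ImmutableMap.of(1, ByteBuffer.wrap(new byte[] {0}));
        Map<Integer, ByteBuffer> upperBounds = ImmutableMap.of(1, ByteBuffer.wrap(new byte[] {9}));

        // 0.13.x order: rowCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds
        return new Metrics(100L, null, valueCounts, nullCounts, nanCounts, lowerBounds, upperBounds);
    }
}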
@@ -32,6 +32,7 @@ public MetricsWrapper(
@JsonProperty("columnSizes") Map<Integer, Long> columnSizes,
@JsonProperty("valueCounts") Map<Integer, Long> valueCounts,
@JsonProperty("nullValueCounts") Map<Integer, Long> nullValueCounts,
@JsonProperty("nanValueCounts") Map<Integer, Long> nanValueCounts,
@JsonProperty("lowerBounds") Map<Integer, ByteBuffer> lowerBounds,
@JsonProperty("upperBounds") Map<Integer, ByteBuffer> upperBounds)
{
@@ -40,6 +41,7 @@ public MetricsWrapper(
columnSizes,
valueCounts,
nullValueCounts,
nanValueCounts,
lowerBounds,
upperBounds));
}
@@ -78,6 +80,12 @@ public Map<Integer, Long> nullValueCounts()
return metrics.nullValueCounts();
}

@JsonProperty
public Map<Integer, Long> nanValueCounts()
{
return metrics.nanValueCounts();
}

@JsonProperty
public Map<Integer, ByteBuffer> lowerBounds()
{
@@ -103,6 +103,7 @@
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
import static org.apache.iceberg.Transactions.createTableTransaction;
@@ -281,13 +282,13 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
listNamespaces(session, namespace)
.stream()
.flatMap(schema -> Stream.concat(
// Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because
// Trino uses lowercase value whereas Spark and Flink use uppercase.
// TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)),
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)))
.distinct()) // distinct() to avoid duplicates for case-insensitive HMS backends
.forEach(tablesListBuilder::add);

@@ -302,7 +303,8 @@ public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableNa
Table table = loadTable(session, schemaTableName);
if (table.properties().containsKey(OBJECT_STORE_PATH) ||
table.properties().containsKey(WRITE_NEW_DATA_LOCATION) ||
table.properties().containsKey(WRITE_METADATA_LOCATION)) {
table.properties().containsKey(WRITE_METADATA_LOCATION) ||
table.properties().containsKey(WRITE_DATA_LOCATION)) {
throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino");
}
metastore.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), purgeData);
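The extra condition covers the write.data.path property exercised by the new product test later in this change; tables that override any data or metadata location still cannot be dropped from Trino. A minimal sketch of the guard, assuming a loaded org.apache.iceberg.Table; the helper name is only for illustration:

import org.apache.iceberg.Table;

import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;

class PathOverrideCheckSketch
{
    // True when the table declares any Iceberg location-override property, including the WRITE_DATA_LOCATION check added for 0.13
    static boolean hasPathOverride(Table table)
    {
        return table.properties().containsKey(OBJECT_STORE_PATH)
                || table.properties().containsKey(WRITE_NEW_DATA_LOCATION)
                || table.properties().containsKey(WRITE_METADATA_LOCATION)
                || table.properties().containsKey(WRITE_DATA_LOCATION);
    }
}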
@@ -291,7 +291,7 @@ protected void refreshFromMetadataLocation(String newLocation)
.retry(20)
.exponentialBackoff(100, 5000, 600000, 4.0)
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
TableMetadataParser.read(fileIo, io().newInputFile(metadataLocation))));

String newUUID = newMetadata.get().uuid();
if (currentMetadata != null) {
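With Iceberg 0.13, TableMetadataParser.read is called with a FileIO rather than the operations instance (this), hence the one-line change above. A minimal sketch of the 0.13-style call, assuming a FileIO instance and a metadata file location:

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;

class MetadataReadSketch
{
    // Reads a metadata.json file through the table's FileIO (0.13.x signature: read(FileIO, InputFile))
    static TableMetadata readMetadata(FileIO fileIo, String metadataLocation)
    {
        return TableMetadataParser.read(fileIo, fileIo.newInputFile(metadataLocation));
    }
}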
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -115,13 +116,17 @@ public void testV2TableWithPositionDelete()
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir);

PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
Path path = new Path(metadataDir, deleteFileName);
PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs))
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.rowSchema(icebergTable.schema())
.withSpec(PartitionSpec.unpartitioned())
.buildPositionWriter();

try (Closeable ignored = writer) {
writer.delete(dataFilePath, 0);
writer.delete(dataFilePath, 0, GenericRecord.create(icebergTable.schema()));
}

icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
@@ -43,17 +43,19 @@ public void testRoundTrip()
Map<Integer, Long> columnSizes = ImmutableMap.of(3, 321L, 5, 543L);
Map<Integer, Long> valueCounts = ImmutableMap.of(7, 765L, 9, 987L);
Map<Integer, Long> nullValueCounts = ImmutableMap.of(2, 234L, 4, 456L);
Map<Integer, Long> nanValueCounts = ImmutableMap.of(1, 2L, 3, 4L);
Map<Integer, ByteBuffer> lowerBounds = ImmutableMap.of(13, ByteBuffer.wrap(new byte[] {0, 8, 9}));
Map<Integer, ByteBuffer> upperBounds = ImmutableMap.of(17, ByteBuffer.wrap(new byte[] {5, 4, 0}));

Metrics expected = new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds);
Metrics expected = new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds);

Metrics actual = CODEC.fromJson(CODEC.toJson(new MetricsWrapper(expected))).metrics();

assertEquals(actual.recordCount(), recordCount);
assertEquals(actual.columnSizes(), columnSizes);
assertEquals(actual.valueCounts(), valueCounts);
assertEquals(actual.nullValueCounts(), nullValueCounts);
assertEquals(actual.nanValueCounts(), nanValueCounts);
assertEquals(actual.lowerBounds(), lowerBounds);
assertEquals(actual.upperBounds(), upperBounds);
}
@@ -848,6 +848,34 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion)
{
String baseTableName = "test_writer_data_path_" + storageFormat;
String sparkTableName = sparkTableName(baseTableName);
String trinoTableName = trinoTableName(baseTableName);
String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_writer_data_path_/obj-data";

onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES (" +
"'write.data.path'='%s'," +
"'write.format.default' = '%s'," +
"'format-version' = %s)",
sparkTableName, dataPath, storageFormat, specVersion));
onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName));

Row result = row("a_string", 1000000000000000L);
assertThat(onSpark().executeQuery(format("SELECT _string, _bigint FROM %s", sparkTableName))).containsOnly(result);
assertThat(onTrino().executeQuery(format("SELECT _string, _bigint FROM %s", trinoTableName))).containsOnly(result);

QueryResult queryResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\"")));
assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1);
assertTrue(((String) queryResult.row(0).get(0)).contains(dataPath));

assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName))
.hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

private static final List<String> SPECIAL_CHARACTER_VALUES = ImmutableList.of(
"with-hyphen",
"with.dot",