
Commit

Update iceberg to 0.13.1
homar committed Feb 24, 2022
1 parent 3b6e89c commit d032b67
Showing 9 changed files with 54 additions and 8 deletions.
2 changes: 1 addition & 1 deletion plugin/trino-iceberg/pom.xml
@@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
-<dep.iceberg.version>0.12.0</dep.iceberg.version>
+<dep.iceberg.version>0.13.1</dep.iceberg.version>
</properties>

<dependencies>
@@ -75,7 +75,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
{
InMemoryRecordSet.Builder table = InMemoryRecordSet.builder(COLUMNS);

-Set<Long> ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestors(icebergTable));
+Set<Long> ancestorIds = ImmutableSet.copyOf(SnapshotUtil.currentAncestorIds(icebergTable));
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
for (HistoryEntry historyEntry : icebergTable.history()) {
long snapshotId = historyEntry.snapshotId();
@@ -104,7 +104,7 @@ public Metrics getMetrics()
private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema icebergSchema, ColumnMetadata<OrcType> orcColumns, long fileRowCount, Optional<ColumnMetadata<ColumnStatistics>> columnStatistics)
{
if (columnStatistics.isEmpty()) {
-return new Metrics(fileRowCount, null, null, null, null, null);
+return new Metrics(fileRowCount, null, null, null, null, null, null);
}
// Columns that are descendants of LIST or MAP types are excluded because:
// 1. Their stats are not used by Apache Iceberg to filter out data files
@@ -153,6 +153,7 @@ private static Metrics computeMetrics(MetricsConfig metricsConfig, Schema iceber
null, // TODO: Add column size accounting to ORC column writers
valueCounts.isEmpty() ? null : valueCounts,
nullCounts.isEmpty() ? null : nullCounts,
+null, // TODO: Add nanValueCounts to ORC writer
lowerBounds.isEmpty() ? null : lowerBounds,
upperBounds.isEmpty() ? null : upperBounds);
}
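The extra null above fills the nanValueCounts slot that Iceberg 0.13 adds to the Metrics constructor, between nullValueCounts and lowerBounds. A minimal sketch of the new argument order (not part of this commit; the class and method names are illustrative):

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.iceberg.Metrics;

final class MetricsSketch
{
    // Builds a Metrics instance with the Iceberg 0.13 constructor; the fifth map is the new
    // nanValueCounts, and null is passed for any statistic that is not tracked (here: columnSizes).
    static Metrics metricsWithNanCounts(
            long rowCount,
            Map<Integer, Long> valueCounts,
            Map<Integer, Long> nullValueCounts,
            Map<Integer, Long> nanValueCounts,
            Map<Integer, ByteBuffer> lowerBounds,
            Map<Integer, ByteBuffer> upperBounds)
    {
        return new Metrics(rowCount, null, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds);
    }
}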
@@ -32,6 +32,7 @@ public MetricsWrapper(
@JsonProperty("columnSizes") Map<Integer, Long> columnSizes,
@JsonProperty("valueCounts") Map<Integer, Long> valueCounts,
@JsonProperty("nullValueCounts") Map<Integer, Long> nullValueCounts,
@JsonProperty("nanValueCounts") Map<Integer, Long> nanValueCounts,
@JsonProperty("lowerBounds") Map<Integer, ByteBuffer> lowerBounds,
@JsonProperty("upperBounds") Map<Integer, ByteBuffer> upperBounds)
{
@@ -40,6 +41,7 @@ public MetricsWrapper(
columnSizes,
valueCounts,
nullValueCounts,
+nanValueCounts,
lowerBounds,
upperBounds));
}
@@ -78,6 +80,12 @@ public Map<Integer, Long> nullValueCounts()
return metrics.nullValueCounts();
}

+@JsonProperty
+public Map<Integer, Long> nanValueCounts()
+{
+return metrics.nanValueCounts();
+}

@JsonProperty
public Map<Integer, ByteBuffer> lowerBounds()
{
@@ -103,6 +103,7 @@
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
+import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
import static org.apache.iceberg.Transactions.createTableTransaction;
@@ -302,7 +303,8 @@ public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableNa
Table table = loadTable(session, schemaTableName);
if (table.properties().containsKey(OBJECT_STORE_PATH) ||
table.properties().containsKey(WRITE_NEW_DATA_LOCATION) ||
-table.properties().containsKey(WRITE_METADATA_LOCATION)) {
+table.properties().containsKey(WRITE_METADATA_LOCATION) ||
+table.properties().containsKey(WRITE_DATA_LOCATION)) {
throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino");
}
metastore.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), purgeData);
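The added WRITE_DATA_LOCATION check covers the write.data.path table property, which the product test added later in this commit sets explicitly. A minimal, hypothetical helper (not part of the commit) expressing the same guard:

import java.util.Map;
import java.util.Set;

import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;

final class PathOverrideCheck
{
    // A table carrying any of these properties uses custom data or metadata locations,
    // so Trino refuses to drop it (see the exception above).
    private static final Set<String> PATH_OVERRIDE_PROPERTIES =
            Set.of(OBJECT_STORE_PATH, WRITE_NEW_DATA_LOCATION, WRITE_METADATA_LOCATION, WRITE_DATA_LOCATION);

    static boolean hasPathOverride(Map<String, String> tableProperties)
    {
        return PATH_OVERRIDE_PROPERTIES.stream().anyMatch(tableProperties::containsKey);
    }
}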
@@ -291,7 +291,7 @@ protected void refreshFromMetadataLocation(String newLocation)
.retry(20)
.exponentialBackoff(100, 5000, 600000, 4.0)
.run(metadataLocation -> newMetadata.set(
-TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
+TableMetadataParser.read(fileIo, io().newInputFile(metadataLocation))));

String newUUID = newMetadata.get().uuid();
if (currentMetadata != null) {
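The change from this to fileIo above switches the call to the FileIO-based overload of TableMetadataParser.read. A minimal sketch of that overload in isolation (the class and method names are illustrative, not from the commit):

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;

final class MetadataReadSketch
{
    // Reads table metadata from a metadata file location through a FileIO instance,
    // mirroring the TableMetadataParser.read(fileIo, ...) call above.
    static TableMetadata readMetadata(FileIO io, String metadataLocation)
    {
        return TableMetadataParser.read(io, io.newInputFile(metadataLocation));
    }
}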
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -115,13 +116,17 @@ public void testV2TableWithPositionDelete()
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileSystem fs = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), metadataDir);

-PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
+Path path = new Path(metadataDir, deleteFileName);
+PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs))
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.rowSchema(icebergTable.schema())
+.withSpec(PartitionSpec.unpartitioned())
.buildPositionWriter();

try (Closeable ignored = writer) {
-writer.delete(dataFilePath, 0);
+writer.delete(dataFilePath, 0, GenericRecord.create(icebergTable.schema()));
}

icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
@@ -43,17 +43,19 @@ public void testRoundTrip()
Map<Integer, Long> columnSizes = ImmutableMap.of(3, 321L, 5, 543L);
Map<Integer, Long> valueCounts = ImmutableMap.of(7, 765L, 9, 987L);
Map<Integer, Long> nullValueCounts = ImmutableMap.of(2, 234L, 4, 456L);
+Map<Integer, Long> nanValueCounts = ImmutableMap.of(1, 2L, 3, 4L);
Map<Integer, ByteBuffer> lowerBounds = ImmutableMap.of(13, ByteBuffer.wrap(new byte[] {0, 8, 9}));
Map<Integer, ByteBuffer> upperBounds = ImmutableMap.of(17, ByteBuffer.wrap(new byte[] {5, 4, 0}));

-Metrics expected = new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds);
+Metrics expected = new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds);

Metrics actual = CODEC.fromJson(CODEC.toJson(new MetricsWrapper(expected))).metrics();

assertEquals(actual.recordCount(), recordCount);
assertEquals(actual.columnSizes(), columnSizes);
assertEquals(actual.valueCounts(), valueCounts);
assertEquals(actual.nullValueCounts(), nullValueCounts);
+assertEquals(actual.nanValueCounts(), nanValueCounts);
assertEquals(actual.lowerBounds(), lowerBounds);
assertEquals(actual.upperBounds(), upperBounds);
}
@@ -848,6 +848,34 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}

+@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
+public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion)
+{
+String baseTableName = "test_writer_data_path_" + storageFormat;
+String sparkTableName = sparkTableName(baseTableName);
+String trinoTableName = trinoTableName(baseTableName);
+String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_writer_data_path_/obj-data";
+
+onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES (" +
+"'write.data.path'='%s'," +
+"'write.format.default' = '%s'," +
+"'format-version' = %s)",
+sparkTableName, dataPath, storageFormat, specVersion));
+onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName));
+
+Row result = row("a_string", 1000000000000000L);
+assertThat(onSpark().executeQuery(format("SELECT _string, _bigint FROM %s", sparkTableName))).containsOnly(result);
+assertThat(onTrino().executeQuery(format("SELECT _string, _bigint FROM %s", trinoTableName))).containsOnly(result);
+
+QueryResult queryResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\"")));
+assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1);
+assertTrue(((String) queryResult.row(0).get(0)).contains(dataPath));
+
+assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName))
+.hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
+onSpark().executeQuery("DROP TABLE " + sparkTableName);
+}

private static final List<String> SPECIAL_CHARACTER_VALUES = ImmutableList.of(
"with-hyphen",
"with.dot",
