Skip to content

Commit

Permalink
Support updating Iceberg table partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 authored and findepi committed May 23, 2022
1 parent dacd302 commit 86db89c
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 20 deletions.
9 changes: 9 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,22 @@ The following table properties can be updated after a table is created:

* ``format``
* ``format_version``
* ``partitioning``

For example, to update a table from v1 of the Iceberg specification to v2:

.. code-block:: sql

    ALTER TABLE table_name SET PROPERTIES format_version = 2;
Or to set the column ``my_new_partition_column`` as a partition column on a table:

.. code-block:: sql

    ALTER TABLE table_name SET PROPERTIES partitioning = ARRAY[<existing partition columns>, 'my_new_partition_column'];
The current values of a table's properties can be shown using :doc:`SHOW CREATE TABLE </sql/show-create-table>`.

.. _iceberg-type-mapping:

Type mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -207,6 +210,7 @@ public class IcebergMetadata
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 1;
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final String RETENTION_THRESHOLD = "retention_threshold";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY);

private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
Expand Down Expand Up @@ -1161,44 +1165,78 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
// NOTE(review): this span contained interleaved pre-change diff residue — the old
// for-loop/switch dispatch over property entries (previously L69-L89) — alongside the
// new per-property handling. The residue is removed; the intended post-change body kept.
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
BaseTable icebergTable = (BaseTable) catalog.loadTable(session, table.getSchemaTableName());

// Fail fast on any property outside the updatable whitelist, naming all offenders at once.
Set<String> unsupportedProperties = Sets.difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES);
if (!unsupportedProperties.isEmpty()) {
    throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties));
}

transaction = icebergTable.newTransaction();
UpdateProperties updateProperties = transaction.updateProperties();

if (properties.containsKey(FILE_FORMAT_PROPERTY)) {
    IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
            .orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty"));
    updateProperties.defaultFormat(fileFormat.toIceberg());
}

if (properties.containsKey(FORMAT_VERSION_PROPERTY)) {
    // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version
    int formatVersion = (int) properties.get(FORMAT_VERSION_PROPERTY)
            .orElseThrow(() -> new IllegalArgumentException("The format_version property cannot be empty"));
    // formatVersion is already an int; the original's second "(int)" cast was redundant
    updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
}

try {
    updateProperties.commit();
}
catch (RuntimeException e) {
    throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set new property values", e);
}

// Partition-spec evolution is a separate update applied on the same transaction,
// after the plain property updates have been committed above.
if (properties.containsKey(PARTITIONING_PROPERTY)) {
    @SuppressWarnings("unchecked")
    List<String> partitionColumns = (List<String>) properties.get(PARTITIONING_PROPERTY)
            .orElseThrow(() -> new IllegalArgumentException("The partitioning property cannot be empty"));
    updatePartitioning(icebergTable, transaction, partitionColumns);
}

try {
    transaction.commitTransaction();
}
catch (RuntimeException e) {
    throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e);
}
}

private static void updateProperty(UpdateProperties updateProperties, String icebergPropertyName, Optional<Object> value, Function<Object, String> toIcebergString)
private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns)
{
if (value.isPresent()) {
updateProperties.set(icebergPropertyName, toIcebergString.apply(value.get()));
UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec();
Set<PartitionField> existingPartitionFields = icebergTable.spec().fields().stream().collect(toImmutableSet());
Schema schema = icebergTable.schema();
if (partitionColumns.isEmpty()) {
existingPartitionFields.stream()
.map(partitionField -> toIcebergTerm(schema, partitionField))
.forEach(updatePartitionSpec::removeField);
}
else {
updateProperties.remove(icebergPropertyName);
Set<PartitionField> partitionFields = ImmutableSet.copyOf(parsePartitionFields(schema, partitionColumns).fields());
Sets.difference(existingPartitionFields, partitionFields).forEach(partitionField -> updatePartitionSpec.removeField(partitionField.name()));
Sets.difference(partitionFields, existingPartitionFields).stream()
.map(partitionField -> toIcebergTerm(schema, partitionField))
.forEach(updatePartitionSpec::addField);
}

try {
updatePartitionSpec.commit();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set new partitioning value", e);
}
}

/**
 * Builds the Iceberg partition term for the given partition field: the field's
 * transform applied to the name of its source column, resolved via the schema.
 */
private static Term toIcebergTerm(Schema schema, PartitionField partitionField)
{
    String sourceColumnName = schema.findColumnName(partitionField.sourceId());
    return Expressions.transform(sourceColumnName, partitionField.transform());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
import org.testng.annotations.Test;

import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.Math.toIntExact;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

// Tests for ALTER TABLE ... SET PROPERTIES partitioning = ... on Iceberg tables:
// removing all partitioning, adding a partition column, and changing a transform.
// Each test inspects the hidden "$files" / "$partitions" metadata tables to verify
// which data files were written under the old vs. new partition spec.
public class TestIcebergPartitionEvolution
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
// Only the TPC-H nation table is needed by these tests.
return IcebergQueryRunner.builder()
.setInitialTables(ImmutableList.of(TpchTable.NATION))
.build();
}

@Test
public void testRemovePartitioning()
{
String tableName = "test_remove_partition_" + randomTableSuffix();
// 10 rows written under the (regionkey, truncate(name, 1)) spec, then the spec is
// cleared and 15 more rows are written unpartitioned.
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey', 'truncate(name, 1)']) AS SELECT * FROM nation WHERE nationkey < 10", 10);
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY[]");
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15);

// Partition values appear in the file path (e.g. ".../regionkey=1/..."), so path
// inspection distinguishes files written before vs. after the spec change.
List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows();
List<MaterializedRow> unpartitionedFiles = files.stream()
.filter(file -> !((String) file.getField(0)).contains("regionkey="))
.collect(toImmutableList());

List<MaterializedRow> partitionedFiles = files.stream()
.filter(file -> ((String) file.getField(0)).contains("regionkey="))
.collect(toImmutableList());

// One file per distinct (regionkey, first letter of name) combination in the first insert.
int expectedFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey < 10").getRowCount();
assertThat(partitionedFiles).hasSize(expectedFileCount);
assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L);

// The second insert is unpartitioned, so it lands in a single file holding all 15 rows.
assertThat(unpartitionedFiles).hasSize(1);
assertEquals((long) unpartitionedFiles.get(0).getField(1), 15);

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
// Most partitions have one record each. regionkey=2, trunc_name=I has two records, and 15 records are unpartitioned
assertQuery("SELECT record_count, count(*) FROM \"" + tableName + "$partitions\" GROUP BY record_count", "VALUES (1, 8), (2, 1), (15, 1)");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testAddPartitionColumn()
{
String tableName = "test_add_partition_column_" + randomTableSuffix();
// Case 1: append a new partition column after the existing one.
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM nation WHERE nationkey < 10", 10);
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['regionkey', 'truncate(name, 1)']");
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15);
assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['regionkey','truncate(name, 1)']");

// "name_trunc" appears only in paths of files written under the evolved spec.
List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows();
List<MaterializedRow> initialFiles = files.stream()
.filter(file -> !((String) file.getField(0)).contains("name_trunc"))
.collect(toImmutableList());

List<MaterializedRow> partitionedFiles = files.stream()
.filter(file -> ((String) file.getField(0)).contains("name_trunc"))
.collect(toImmutableList());

// Initial files: one per distinct regionkey among the first 10 rows.
int expectedInitialFiles = toIntExact((long) computeActual("SELECT count(distinct regionkey) FROM nation WHERE nationkey < 10").getOnlyValue());
assertThat(initialFiles).hasSize(expectedInitialFiles);
assertEquals(initialFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L);

// New files: one per distinct (regionkey, first letter) among the remaining 15 rows.
int expectedFinalFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey >= 10").getRowCount();
assertThat(partitionedFiles).hasSize(expectedFinalFileCount);
assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 15L);

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
assertUpdate("DROP TABLE " + tableName);

// Case 2: same scenario with the column order reversed, to verify ordering is preserved.
assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['truncate(name, 1)']) AS SELECT * FROM nation WHERE nationkey < 10", 10);
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['truncate(name, 1)', 'regionkey']");
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15);
assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['truncate(name, 1)','regionkey']");

// Here "regionkey=" only appears in paths written after the spec change.
files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows();
initialFiles = files.stream()
.filter(file -> !((String) file.getField(0)).contains("regionkey="))
.collect(toImmutableList());

partitionedFiles = files.stream()
.filter(file -> ((String) file.getField(0)).contains("regionkey="))
.collect(toImmutableList());

expectedInitialFiles = toIntExact((long) computeActual("SELECT DISTINCT substring(name, 1, 1) FROM nation WHERE nationkey < 10").getRowCount());
assertThat(initialFiles).hasSize(expectedInitialFiles);
assertEquals(initialFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L);

expectedFinalFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey >= 10").getRowCount();
assertThat(partitionedFiles).hasSize(expectedFinalFileCount);
assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 15L);

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testChangePartitionTransform()
{
String tableName = "test_change_partition_transform_" + randomTableSuffix();
// 3 rows in distinct years under year(ts); then the transform changes to month(ts)
// and 2 more rows are inserted.
assertUpdate("CREATE TABLE " + tableName + " (ts, a) WITH (partitioning = ARRAY['year(ts)']) " +
"AS VALUES (TIMESTAMP '2021-01-01 01:01:01.111111', 1), (TIMESTAMP '2022-02-02 02:02:02.222222', 2), (TIMESTAMP '2023-03-03 03:03:03.333333', 3)", 3);
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['month(ts)']");
assertUpdate("INSERT INTO " + tableName + " VALUES (TIMESTAMP '2024-04-04 04:04:04.444444', 4), (TIMESTAMP '2025-05-05 05:05:05.555555', 5)", 2);
assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['month(ts)']");

// Files written under the old spec have ts_year in the path; under the new spec, ts_month.
// "ts_month" contains "ts_" too, so both substrings are checked to classify unambiguously.
List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows();
List<MaterializedRow> yearPartitionedFiles = files.stream()
.filter(file -> {
String filePath = ((String) file.getField(0));
return filePath.contains("ts_year") && !filePath.contains("ts_month");
})
.collect(toImmutableList());

List<MaterializedRow> monthPartitionedFiles = files.stream()
.filter(file -> {
String filePath = ((String) file.getField(0));
return !filePath.contains("ts_year") && filePath.contains("ts_month");
})
.collect(toImmutableList());

// One file per distinct year (3) before the change; one per distinct month (2) after.
assertThat(yearPartitionedFiles).hasSize(3);
assertThat(monthPartitionedFiles).hasSize(2);
assertUpdate("DROP TABLE " + tableName);
}
}
Loading

0 comments on commit 86db89c

Please sign in to comment.