Allow executing optimize procedure for Iceberg v2 table
ebyhr committed May 27, 2022
1 parent 7e566b9 commit 11a148f
Showing 4 changed files with 72 additions and 19 deletions.
IcebergMetadata.java
@@ -215,7 +215,7 @@ public class IcebergMetadata
 {
     private static final Logger log = Logger.get(IcebergMetadata.class);
     private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");
-    private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 1;
+    private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
     private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
     private static final String RETENTION_THRESHOLD = "retention_threshold";
     public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY);
@@ -791,6 +791,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
                 tableHandle.getSchemaTableName(),
                 OPTIMIZE,
                 new IcebergOptimizeHandle(
+                        tableHandle.getSnapshotId().orElseThrow(),
                         SchemaParser.toJson(icebergTable.schema()),
                         PartitionSpecParser.toJson(icebergTable.spec()),
                         getColumns(icebergTable.schema(), typeManager),
@@ -875,7 +876,6 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
 
         int tableFormatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
         if (tableFormatVersion > OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) {
-            // Currently, Optimize would fail when position deletes files are present in Iceberg table
             throw new TrinoException(NOT_SUPPORTED, format(
                     "%s is not supported for Iceberg table format version > %d. Table %s format version is %s.",
                     OPTIMIZE.name(),
@@ -959,6 +959,9 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
         }
         RewriteFiles rewriteFiles = transaction.newRewrite();
         rewriteFiles.rewriteFiles(scannedFiles, newFiles);
+        // Table.snapshot method returns null if there is no matching snapshot
+        Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.getSnapshotId()), "snapshot is null");
+        rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
        rewriteFiles.commit();
         transaction.commitTransaction();
         transaction = null;
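The substance of the change is in the hunks above: the version guard now admits v2 tables, and finishOptimize pins its commit to the snapshot that was scanned, captured in IcebergOptimizeHandle below. Validating from that snapshot makes a concurrent commit that conflicts with the rewrite, such as one adding delete files against the compacted data files, fail the OPTIMIZE instead of being silently lost. A minimal standalone sketch of the same commit pattern against the Iceberg API follows; the helper class and method names are illustrative, not part of this commit:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.util.Set;

import static java.util.Objects.requireNonNull;

public final class ValidatedRewriteExample
{
    private ValidatedRewriteExample() {}

    // Hypothetical helper mirroring the finishOptimize commit sequence above
    public static void commitRewrite(Table table, long scannedSnapshotId, Set<DataFile> scannedFiles, Set<DataFile> newFiles)
    {
        // Table.snapshot returns null when no snapshot has the given id (e.g. it was expired)
        Snapshot snapshot = requireNonNull(table.snapshot(scannedSnapshotId), "snapshot is null");
        RewriteFiles rewriteFiles = table.newRewrite();
        // Replace the scanned data files with the compacted ones
        rewriteFiles.rewriteFiles(scannedFiles, newFiles);
        // Validate every snapshot committed after the scanned one; a concurrent
        // delete against scannedFiles then fails the commit rather than being dropped
        rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
        rewriteFiles.commit();
    }
}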
IcebergOptimizeHandle.java
@@ -30,6 +30,7 @@
 public class IcebergOptimizeHandle
         extends IcebergProcedureHandle
 {
+    private final long snapshotId;
     private final String schemaAsJson;
     private final String partitionSpecAsJson;
     private final List<IcebergColumnHandle> tableColumns;
@@ -40,6 +41,7 @@ public class IcebergOptimizeHandle
 
     @JsonCreator
     public IcebergOptimizeHandle(
+            long snapshotId,
             String schemaAsJson,
             String partitionSpecAsJson,
             List<IcebergColumnHandle> tableColumns,
@@ -48,6 +50,7 @@ public IcebergOptimizeHandle(
             DataSize maxScannedFileSize,
             boolean retriesEnabled)
     {
+        this.snapshotId = snapshotId;
         this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
         this.partitionSpecAsJson = requireNonNull(partitionSpecAsJson, "partitionSpecAsJson is null");
         this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
@@ -57,6 +60,12 @@ public IcebergOptimizeHandle(
         this.retriesEnabled = retriesEnabled;
     }
 
+    @JsonProperty
+    public long getSnapshotId()
+    {
+        return snapshotId;
+    }
+
     @JsonProperty
     public String getSchemaAsJson()
     {
@@ -103,6 +112,7 @@ public boolean isRetriesEnabled()
     public String toString()
     {
         return toStringHelper(this)
+                .add("snapshotId", snapshotId)
                 .add("schemaAsJson", schemaAsJson)
                 .add("partitionSpecAsJson", partitionSpecAsJson)
                 .add("tableColumns", tableColumns)
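The handle is serialized to JSON when it travels from the coordinator to the workers, so the new snapshotId field must flow through the @JsonCreator constructor and be exposed by a @JsonProperty getter, as the hunks above do. A stripped-down sketch of that round trip; the one-field SnapshotHandleExample class is hypothetical, and the explicit @JsonProperty on the constructor parameter is only for self-containment (Trino's JSON codec binds constructor parameters by name):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SnapshotHandleExample
{
    private final long snapshotId;

    @JsonCreator
    public SnapshotHandleExample(@JsonProperty("snapshotId") long snapshotId)
    {
        this.snapshotId = snapshotId;
    }

    @JsonProperty
    public long getSnapshotId()
    {
        return snapshotId;
    }

    public static void main(String[] args)
            throws Exception
    {
        // Serialize and deserialize, as happens between coordinator and workers
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new SnapshotHandleExample(42L));
        SnapshotHandleExample copy = mapper.readValue(json, SnapshotHandleExample.class);
        System.out.println(copy.getSnapshotId()); // prints 42
    }
}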
BaseIcebergConnectorTest.java
@@ -3038,12 +3038,12 @@ public void testProjectionPushdownOnPartitionedTableWithComments()
         assertUpdate("DROP TABLE IF EXISTS test_projection_pushdown_comments");
     }
 
-    @Test
-    public void testOptimize()
+    @Test(dataProvider = "tableFormatVersion")
+    public void testOptimize(int formatVersion)
             throws Exception
     {
         String tableName = "test_optimize_" + randomTableSuffix();
-        assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (format_version = 1)");
+        assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (format_version = " + formatVersion + ")");
 
         // DistributedQueryRunner sets node-scheduler.include-coordinator by default, so include coordinator
         int workerCount = getQueryRunner().getNodeCount();
@@ -3092,8 +3092,8 @@ public void testOptimize()
         assertUpdate("DROP TABLE " + tableName);
     }
 
-    @Test
-    public void testOptimizeForPartitionedTable()
+    @Test(dataProvider = "tableFormatVersion")
+    public void testOptimizeForPartitionedTable(int formatVersion)
             throws IOException
     {
         // This test will have its own session to make sure partitioning is indeed forced and is not a result
@@ -3105,7 +3105,7 @@ public void testOptimizeForPartitionedTable()
                 .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100")
                 .build();
         String tableName = "test_repartitiong_during_optimize_" + randomTableSuffix();
-        assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = 1, partitioning = ARRAY['key'])");
+        assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = " + formatVersion + ", partitioning = ARRAY['key'])");
         // optimize an empty table
         assertQuerySucceeds(session, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
 
@@ -3136,6 +3136,41 @@ public void testOptimizeForPartitionedTable()
         assertUpdate("DROP TABLE " + tableName);
     }
 
+    @DataProvider
+    public Object[][] tableFormatVersion()
+    {
+        return IntStream.rangeClosed(IcebergConfig.FORMAT_VERSION_SUPPORT_MIN, IcebergConfig.FORMAT_VERSION_SUPPORT_MAX).boxed()
+                .collect(DataProviders.toDataProvider());
+    }
+
+    @Test
+    public void testOptimizeTableAfterDeleteWithFormatVersion2()
+    {
+        String tableName = "test_optimize_" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", 25);
+
+        List<String> initialFiles = getActiveFiles(tableName);
+
+        assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 7", 1);
+
+        // Verify that delete files exists
+        assertQuery(
+                "SELECT summary['total-delete-files'] FROM \"" + tableName + "$snapshots\" WHERE snapshot_id = " + getCurrentSnapshotId(tableName),
+                "VALUES '1'");
+
+        computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+
+        List<String> updatedFiles = getActiveFiles(tableName);
+        assertThat(updatedFiles)
+                .hasSize(1)
+                .isNotEqualTo(initialFiles);
+
+        assertThat(query("SELECT * FROM " + tableName))
+                .matches("SELECT * FROM nation WHERE nationkey != 7");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
     private List<String> getActiveFiles(String tableName)
     {
         return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn()
@@ -3531,6 +3566,11 @@ private List<Long> getSnapshotIds(String tableName)
                 .collect(toUnmodifiableList());
     }
 
+    private long getCurrentSnapshotId(String tableName)
+    {
+        return (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC LIMIT 1");
+    }
+
     private Path getIcebergTableDataPath(String tableName)
     {
         return getIcebergTablePath(tableName, "data");
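The new tableFormatVersion data provider reruns testOptimize and testOptimizeForPartitionedTable once per supported format version, which at the time of this commit presumably spans versions 1 and 2 (FORMAT_VERSION_SUPPORT_MIN to FORMAT_VERSION_SUPPORT_MAX). A sketch of what the DataProviders.toDataProvider() collector plausibly looks like; this is an assumed implementation, and the real helper lives in Trino's test utilities:

import java.util.stream.Collector;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

public final class DataProviderExample
{
    private DataProviderExample() {}

    // Collects a stream of values into TestNG's Object[][] shape,
    // one single-element row per value
    public static Collector<Object, ?, Object[][]> toDataProvider()
    {
        return collectingAndThen(
                mapping(
                        value -> new Object[] {value},
                        toList()),
                list -> list.toArray(new Object[][] {}));
    }

    public static void main(String[] args)
    {
        // Produces {{1}, {2}}; TestNG invokes the test method once per row
        Object[][] versions = IntStream.rangeClosed(1, 2).boxed()
                .collect(toDataProvider());
        System.out.println(versions.length); // prints 2
    }
}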
TestIcebergSparkCompatibility.java
@@ -1587,19 +1587,19 @@ public void testMissingMetrics()
     }
 
     @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
-    public void testOptimizeFailsOnV2IcebergTable()
+    public void testOptimizeOnV2IcebergTable()
     {
-        String tableName = format("test_optimize_fails_on_v2_iceberg_table_%s", randomTableSuffix());
+        String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomTableSuffix());
         String sparkTableName = sparkTableName(tableName);
         String trinoTableName = trinoTableName(tableName);
 
         onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) " +
                 "USING ICEBERG PARTITIONED BY (b) " +
                 "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')");
         onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)");
+        onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName));
 
-        assertQueryFailure(() -> onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName)))
-                .hasMessageContaining("is not supported for Iceberg table format version > 1");
+        assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName))
+                .containsOnly(row(1, 2), row(2, 2), row(3, 2), row(11, 12), row(12, 12), row(13, 12));
     }
 
     private static String escapeSparkString(String value)
@@ -1742,15 +1742,15 @@ public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat)
         onTrino().executeQuery("DROP TABLE " + trinoTableName);
     }
 
-    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
-    public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat)
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
+    public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat, int specVersion)
     {
         String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat;
         String trinoTableName = trinoTableName(baseTableName);
         String sparkTableName = sparkTableName(baseTableName);
         onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);
 
-        onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = 1)", trinoTableName, storageFormat));
+        onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = %s)", trinoTableName, storageFormat, specVersion));
         // separate inserts give us snapshot per insert
         onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName));
         onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));
@@ -1786,15 +1786,15 @@ public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat sto
         onTrino().executeQuery("DROP TABLE " + trinoTableName);
     }
 
-    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
-    public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat)
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
+    public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat, int specVersion)
     {
         String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat;
         String trinoTableName = trinoTableName(baseTableName);
         String sparkTableName = sparkTableName(baseTableName);
         onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);
 
-        onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = 1)", trinoTableName, storageFormat));
+        onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s', format_version = %s)", trinoTableName, storageFormat, specVersion));
         // separate inserts give us snapshot per insert
         onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName));
         onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));
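The last two tests switch from the storageFormats provider to storageFormatsWithSpecVersion, which is not part of this excerpt; given the new (StorageFormat, int specVersion) signatures, it presumably yields the cross product of storage formats and spec versions 1 and 2. A hypothetical sketch of such a provider; the method body, the String stand-in for StorageFormat, and the format list are all assumptions:

import org.testng.annotations.DataProvider;

import java.util.ArrayList;
import java.util.List;

public final class SpecVersionProviderExample
{
    // Hypothetical cross-product provider; the real storageFormatsWithSpecVersion
    // in the product tests may be built differently
    @DataProvider
    public static Object[][] storageFormatsWithSpecVersion()
    {
        List<Object[]> rows = new ArrayList<>();
        for (String format : List.of("PARQUET", "ORC", "AVRO")) {
            for (int specVersion : new int[] {1, 2}) {
                rows.add(new Object[] {format, specVersion});
            }
        }
        return rows.toArray(new Object[][] {});
    }
}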
