Support writing to Iceberg table after partition field dropped
In v1 tables, removed partitioning fields are replaced with a `void`
transformation to keep field ordinal numbers unchanged, but the `void`
transformation was not supported when writing. This commit adds support
for writing with the `void` transformation and, implicitly, for creating
tables with such a transformation.
findepi committed Aug 2, 2021
1 parent 33d8410 commit 4bb71c5
Showing 5 changed files with 68 additions and 1 deletion.
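
For context, a minimal sketch (not part of this commit) of how a v1 table ends up with a `void` partition field. It uses the Iceberg Java API; the `Table` handle and the column names `a` and `b` are assumed for illustration only.

import org.apache.iceberg.Table;

// Assume a v1 Iceberg table partitioned by identity(a), identity(b),
// obtained from some catalog.
static void dropFirstPartitionField(Table table)
{
    // In a v1 table the removed field is not dropped from the spec;
    // it is replaced with a void transform so that the remaining
    // fields keep their ordinal numbers.
    table.updateSpec()
            .removeField("a")
            .commit();
    // The spec still has two fields: a void transform on "a" followed
    // by identity(b). Before this commit, Trino could not write to a
    // table whose current spec contains such a void field.
}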
@@ -40,6 +40,7 @@ public final class PartitionFields
private static final Pattern HOUR_PATTERN = Pattern.compile("hour" + FUNCTION_ARGUMENT_NAME);
private static final Pattern BUCKET_PATTERN = Pattern.compile("bucket" + FUNCTION_ARGUMENT_NAME_AND_INT);
private static final Pattern TRUNCATE_PATTERN = Pattern.compile("truncate" + FUNCTION_ARGUMENT_NAME_AND_INT);
private static final Pattern VOID_PATTERN = Pattern.compile("void" + FUNCTION_ARGUMENT_NAME);

private static final Pattern ICEBERG_BUCKET_PATTERN = Pattern.compile("bucket\\[(\\d+)]");
private static final Pattern ICEBERG_TRUNCATE_PATTERN = Pattern.compile("truncate\\[(\\d+)]");
@@ -65,7 +66,9 @@ public static void parsePartitionField(PartitionSpec.Builder builder, String field)
tryMatch(field, DAY_PATTERN, match -> builder.day(match.group(1))) ||
tryMatch(field, HOUR_PATTERN, match -> builder.hour(match.group(1))) ||
tryMatch(field, BUCKET_PATTERN, match -> builder.bucket(match.group(1), parseInt(match.group(2)))) ||
tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(2))));
tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(2)))) ||
tryMatch(field, VOID_PATTERN, match -> builder.alwaysNull(match.group(1))) ||
false;
if (!matched) {
throw new IllegalArgumentException("Invalid partition field declaration: " + field);
}
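
For illustration, a minimal sketch (not part of the diff) of the parsing path exercised by the new `VOID_PATTERN`; the schema and column names are made up.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

Schema schema = new Schema(
        Types.NestedField.optional(1, "d", Types.StringType.get()),
        Types.NestedField.optional(2, "b", Types.LongType.get()));
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
// Matches VOID_PATTERN and calls builder.alwaysNull("d")
PartitionFields.parsePartitionField(builder, "void(d)");
PartitionSpec spec = builder.build();
// spec now contains a single "d_null" field with a void transform on source column "d"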
@@ -19,6 +19,7 @@
import io.airlift.slice.Slices;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
@@ -37,6 +38,7 @@
import static io.airlift.slice.SliceUtf8.offsetOfCodePoint;
import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.Decimals.encodeScaledValue;
@@ -117,6 +119,8 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type type)
return new ColumnTransform(INTEGER, PartitionTransforms::hoursFromTimestampWithTimeZone);
}
throw new UnsupportedOperationException("Unsupported type for 'hour': " + field);
case "void":
return new ColumnTransform(type, getVoidTransform(type));
}

Matcher matcher = BUCKET_PATTERN.matcher(transform);
@@ -497,6 +501,12 @@ private static Block truncateVarbinary(Block block, int max)
return builder.build();
}

public static Function<Block, Block> getVoidTransform(Type type)
{
Block nullBlock = nativeValueToBlock(type, null);
return block -> new RunLengthEncodedBlock(nullBlock, block.getPositionCount());
}

@VisibleForTesting
static long epochYear(long epochMilli)
{
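
For illustration, a minimal sketch (not part of the diff) of what the void transform does to an input block: every position becomes NULL, represented as a single run-length-encoded null value. The sample values are made up; `VARCHAR` is the static import of `io.trino.spi.type.VarcharType.VARCHAR`, and `Slices` is already imported in this file.

BlockBuilder input = VARCHAR.createBlockBuilder(null, 3);
VARCHAR.writeSlice(input, Slices.utf8Slice("abcd"));
VARCHAR.writeSlice(input, Slices.utf8Slice("mommy"));
input.appendNull();
Block block = input.build();

Block transformed = PartitionTransforms.getVoidTransform(VARCHAR).apply(block);
// transformed.getPositionCount() == 3, and transformed.isNull(i) is true
// for every position, regardless of the input values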
@@ -1271,6 +1271,48 @@ public void testBucketTransform()
dropTable("test_bucket_transform");
}

@Test
public void testVoidTransform()
{
assertUpdate("CREATE TABLE test_void_transform (d VARCHAR, b BIGINT) WITH (partitioning = ARRAY['void(d)'])");
String values = "VALUES " +
"('abcd', 1)," +
"('abxy', 2)," +
"('ab598', 3)," +
"('mommy', 4)," +
"('Warsaw', 5)," +
"(NULL, 6)," +
"(NULL, 7)";
assertUpdate("INSERT INTO test_void_transform " + values, 7);
assertQuery("SELECT * FROM test_void_transform", values);

assertQuery("SELECT COUNT(*) FROM \"test_void_transform$partitions\"", "SELECT 1");
assertQuery(
"SELECT d_null, row_count, file_count, d.min, d.max, d.null_count, b.min, b.max, b.null_count FROM \"test_void_transform$partitions\"",
"VALUES (NULL, 7, 1, 'Warsaw', 'mommy', 2, 1, 7, 0)");

assertQuery(
"SELECT d, b FROM test_void_transform WHERE d IS NOT NULL",
"VALUES " +
"('abcd', 1)," +
"('abxy', 2)," +
"('ab598', 3)," +
"('mommy', 4)," +
"('Warsaw', 5)");

assertQuery("SELECT b FROM test_void_transform WHERE d IS NULL", "VALUES 6, 7");

assertThat(query("SHOW STATS FOR test_void_transform"))
.projected(0, 2, 3, 4, 5, 6) // ignore data size which is available for Parquet, but not for ORC
.skippingTypesCheck()
.matches("VALUES " +
" ('d', NULL, 0.2857142857142857, NULL, NULL, NULL), " +
" ('b', NULL, 0e0, NULL, '1', '7'), " +
" (NULL, NULL, NULL, 7e0, NULL, NULL)");

assertUpdate("DROP TABLE " + "test_void_transform");
}

@Test
public void testMetadataDeleteSimple()
{
@@ -46,6 +46,7 @@ public void testParse()
assertParse("bucket(order_key, 42)", partitionSpec(builder -> builder.bucket("order_key", 42)));
assertParse("truncate(comment, 13)", partitionSpec(builder -> builder.truncate("comment", 13)));
assertParse("truncate(order_key, 88)", partitionSpec(builder -> builder.truncate("order_key", 88)));
assertParse("void(order_key)", partitionSpec(builder -> builder.alwaysNull("order_key")));

assertInvalid("bucket()", "Invalid partition field declaration: bucket()");
assertInvalid("abc", "Cannot find source column: abc");
@@ -86,6 +86,17 @@ public void testDroppedPartitionField(boolean dropFirst)
assertQueryFailure(() -> onTrino().executeQuery("SELECT a, b, c.min, c.max, row_count FROM \"test_dropped_partition_field$partitions\""))
// TODO (https://github.com/trinodb/trino/issues/8729): cannot read from $partitions table because of duplicate column names
.hasMessageMatching("Query failed \\(#\\w+\\):\\Q Multiple entries with same key: " + (dropFirst ? "a=a and a=a" : "b=b and b=b"));

onTrino().executeQuery("INSERT INTO test_dropped_partition_field VALUES ('yet', 'another', 'row')");
assertThat(onTrino().executeQuery("SELECT * FROM test_dropped_partition_field"))
.containsOnly(
row("one", "small", "snake"),
row("one", "small", "rabbit"),
row("one", "big", "rabbit"),
row("another", "small", "snake"),
row("something", "completely", "else"),
row(null, null, "nothing"),
row("yet", "another", "row"));
}

@DataProvider
