Skip to content

Commit

Permalink
Keep the ordering of partition values consistent with the partition keys
Browse files Browse the repository at this point in the history
The previous logic depended on the ordering of the columns
in the schema of the table rather than on the ordering of the partition
keys, which could produce inconsistent partition values
for tables with composite partition keys.
  • Loading branch information
findinpath authored and ebyhr committed Mar 28, 2024
1 parent d11ce0f commit 4ae73be
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand Down Expand Up @@ -155,19 +157,23 @@ public ConnectorPageSource createPageSource(
ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry());
Optional<List<String>> partitionValues = Optional.empty();
if (deltaLakeColumns.stream().anyMatch(column -> column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))) {
// using ArrayList because partition values can be null
partitionValues = Optional.of(new ArrayList<>());
for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager)) {
Map<String, DeltaLakeColumnMetadata> columnsMetadataByName = extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager).stream()
.collect(toImmutableMap(DeltaLakeColumnMetadata::getName, Function.identity()));
for (String partitionColumnName : table.getMetadataEntry().getOriginalPartitionColumns()) {
DeltaLakeColumnMetadata partitionColumn = columnsMetadataByName.get(partitionColumnName);
checkState(partitionColumn != null, "Partition column %s not found", partitionColumnName);
Optional<String> value = switch (columnMappingMode) {
case NONE:
yield partitionKeys.get(column.getName());
yield partitionKeys.get(partitionColumn.getName());
case ID, NAME:
yield partitionKeys.get(column.getPhysicalName());
yield partitionKeys.get(partitionColumn.getPhysicalName());
default:
throw new IllegalStateException("Unknown column mapping mode");
};
if (value != null) {
partitionValues.get().add(value.orElse(null));
}
// Fill partition values in the same order as the partition columns are specified in the table definition
partitionValues.get().add(value.orElse(null));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,36 @@ public void testPartitionColumnOrderIsDifferentFromTableDefinition()
assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'first#1', 'second#1'), (2, 'first#2', NULL), (3, NULL, 'second#3'), (4, NULL, NULL)");
}

/**
 * Runs the partial-filter scenario once for each of the ID, NAME and NONE
 * column mapping modes, in that order.
 */
@Test
public void testPartialFilterWhenPartitionColumnOrderIsDifferentFromTableDefinition()
{
    ColumnMappingMode[] modesUnderTest = {ColumnMappingMode.ID, ColumnMappingMode.NAME, ColumnMappingMode.NONE};
    for (ColumnMappingMode mode : modesUnderTest) {
        testPartialFilterWhenPartitionColumnOrderIsDifferentFromTableDefinition(mode);
    }
}

/**
 * Creates a table whose partition keys ({@code _varchar}, {@code _date}) are listed in a
 * different order than the columns are declared ({@code _bigint}, {@code _date}, {@code _varchar}),
 * then inserts, updates and deletes rows and checks the remaining contents.
 * The DELETE filters on {@code _date} only, i.e. on just one of the two partition keys.
 *
 * @param columnMappingMode value written into the table's {@code column_mapping_mode} property
 */
private void testPartialFilterWhenPartitionColumnOrderIsDifferentFromTableDefinition(ColumnMappingMode columnMappingMode)
{
    String tableDefinition = "(_bigint BIGINT, _date DATE, _varchar VARCHAR) WITH (column_mapping_mode='" + columnMappingMode + "', partitioned_by = ARRAY['_varchar', '_date'])";

    try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_with_partial_filter_composed_partition", tableDefinition)) {
        String tableName = table.getName();

        // Two separate inserts, two rows each
        assertUpdate("INSERT INTO " + tableName + " VALUES (1, CAST('2019-09-10' AS DATE), 'a'), (2, CAST('2019-09-10' AS DATE), 'a')", 2);
        assertUpdate("INSERT INTO " + tableName + " VALUES (3, null, 'c'), (4, CAST('2019-09-08' AS DATE), 'd')", 2);

        // Row-level changes: rewrite one row, then delete by a single partition key
        assertUpdate("UPDATE " + tableName + " SET _bigint = 10 WHERE _bigint = BIGINT '1'", 1);
        assertUpdate("DELETE FROM " + tableName + " WHERE _date = DATE '2019-09-08'", 1);

        // The row with _bigint = 4 is gone and the row with _bigint = 1 now carries 10
        assertQuery(
                "SELECT * FROM " + tableName,
                """
                VALUES
                (10, DATE '2019-09-10', 'a'),
                (2, DATE '2019-09-10', 'a'),
                (3, null, 'c')
                """);
    }
}

@Test
public void testCreateTableWithAllPartitionColumns()
{
Expand Down

0 comments on commit 4ae73be

Please sign in to comment.