Tweak validation of missing data columns in add_files_from_table procedure
ebyhr committed Oct 9, 2024
1 parent 59f9f84 commit c8f1bd0
Showing 3 changed files with 96 additions and 9 deletions.
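In short: before this change, add_files_from_table required the source table to have exactly the same number of data columns as the target Iceberg table. Now the target only needs at least as many columns as the source has data columns; a source data column missing from the target is tolerated (the corresponding target column stays NULL) unless every source data column is missing. Partition columns are validated more strictly: the partition column counts must match exactly, and a missing partition column is always an immediate error. The NOT NULL enforcement in addFiles is also tightened to reject data files that carry no null-count statistics.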
@@ -1521,8 +1521,11 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForAddFilesFromTable
         Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

         checkProcedureArgument(
-                icebergTable.schema().columns().size() == sourceTable.getDataColumns().size(),
-                "Data column count mismatch: %d vs %d", icebergTable.schema().columns().size(), sourceTable.getDataColumns().size());
+                icebergTable.schema().columns().size() >= sourceTable.getDataColumns().size(),
+                "Target table should have at least %d columns but got %d", sourceTable.getDataColumns().size(), icebergTable.schema().columns().size());
+        checkProcedureArgument(
+                icebergTable.spec().fields().size() == sourceTable.getPartitionColumns().size(),
+                "Numbers of partition columns should be equivalent. target: %d, source: %d", icebergTable.spec().fields().size(), sourceTable.getPartitionColumns().size());

         // TODO Add files from all partitions when partition filter is not provided
         checkProcedureArgument(
@@ -1543,19 +1546,27 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForAddFilesFromTable
             throw new TrinoException(NOT_SUPPORTED, "Partition filter is not supported for non-partitioned tables");
         }

+        Set<String> missingDataColumns = new HashSet<>();
         Stream.of(sourceTable.getDataColumns(), sourceTable.getPartitionColumns())
                 .flatMap(List::stream)
                 .forEach(sourceColumn -> {
                     Types.NestedField targetColumn = icebergTable.schema().caseInsensitiveFindField(sourceColumn.getName());
                     if (targetColumn == null) {
-                        throw new TrinoException(COLUMN_NOT_FOUND, "Column '%s' does not exist".formatted(sourceColumn.getName()));
+                        if (sourceTable.getPartitionColumns().contains(sourceColumn)) {
+                            throw new TrinoException(COLUMN_NOT_FOUND, "Partition column '%s' does not exist".formatted(sourceColumn.getName()));
+                        }
+                        missingDataColumns.add(sourceColumn.getName());
+                        return;
                     }
                     ColumnIdentity columnIdentity = createColumnIdentity(targetColumn);
                     org.apache.iceberg.types.Type sourceColumnType = toIcebergType(typeManager.getType(getTypeSignature(sourceColumn.getType(), DEFAULT_PRECISION)), columnIdentity);
                     if (!targetColumn.type().equals(sourceColumnType)) {
                         throw new TrinoException(TYPE_MISMATCH, "Target '%s' column is '%s' type, but got source '%s' type".formatted(targetColumn.name(), targetColumn.type(), sourceColumnType));
                     }
                 });
+        if (missingDataColumns.size() == sourceTable.getDataColumns().size()) {
+            throw new TrinoException(COLUMN_NOT_FOUND, "All columns in the source table do not exist in the target table");
+        }

         return Optional.of(new IcebergTableExecuteHandle(
                 tableHandle.getSchemaTableName(),
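A minimal, self-contained sketch of the revised validation flow above, using plain strings in place of Trino's column metadata and Iceberg's schema types. The class and method names here are hypothetical, type checking is omitted, and the real code resolves names case-insensitively via caseInsensitiveFindField:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

public class AddFilesValidationSketch
{
    // Mirrors the forEach above: a missing partition column fails immediately,
    // while missing data columns are collected and only rejected when *all*
    // source data columns are missing from the target.
    static Set<String> findMissingDataColumns(List<String> dataColumns, List<String> partitionColumns, Set<String> targetColumns)
    {
        Set<String> missingDataColumns = new HashSet<>();
        Stream.of(dataColumns, partitionColumns)
                .flatMap(List::stream)
                .forEach(sourceColumn -> {
                    if (!targetColumns.contains(sourceColumn)) {
                        if (partitionColumns.contains(sourceColumn)) {
                            throw new IllegalArgumentException("Partition column '%s' does not exist".formatted(sourceColumn));
                        }
                        missingDataColumns.add(sourceColumn);
                    }
                });
        if (missingDataColumns.size() == dataColumns.size()) {
            throw new IllegalArgumentException("All columns in the source table do not exist in the target table");
        }
        return missingDataColumns;
    }

    public static void main(String[] args)
    {
        // Source column x is absent from the target, y is present: accepted, x stays NULL.
        System.out.println(findMissingDataColumns(List.of("x", "y"), List.of(), Set.of("y", "z"))); // prints [x]
    }
}
```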
@@ -274,9 +274,10 @@ public static void addFiles(ConnectorSession session, Table table, List<DataFile

         if (!requiredFields.isEmpty()) {
             for (DataFile dataFile : dataFiles) {
-                Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
+                Map<Integer, Long> nullValueCounts = firstNonNull(dataFile.nullValueCounts(), Map.of());
                 for (Integer field : requiredFields) {
-                    if (nullValueCounts.get(field) > 0) {
+                    Long nullCount = nullValueCounts.get(field);
+                    if (nullCount == null || nullCount > 0) {
                         throw new TrinoException(CONSTRAINT_VIOLATION, "NULL value not allowed for NOT NULL column: " + schema.findField(field).name());
                     }
                 }
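A standalone sketch of the tightened NOT NULL check: a data file may carry no statistics map at all, or no entry for a given field, and in either case NULLs cannot be ruled out, so the file must be rejected. Names are hypothetical, the JDK's Objects.requireNonNullElse stands in for Guava's firstNonNull used in the diff, and IllegalStateException stands in for TrinoException:

```java
import java.util.Map;
import java.util.Objects;

public class NullCountCheckSketch
{
    // A missing stats map or a missing per-field entry is treated
    // the same as a positive null count: reject the file.
    static void checkNoNulls(Map<Integer, Long> fileNullValueCounts, int requiredFieldId, String columnName)
    {
        Map<Integer, Long> nullValueCounts = Objects.requireNonNullElse(fileNullValueCounts, Map.of());
        Long nullCount = nullValueCounts.get(requiredFieldId);
        if (nullCount == null || nullCount > 0) {
            throw new IllegalStateException("NULL value not allowed for NOT NULL column: " + columnName);
        }
    }

    public static void main(String[] args)
    {
        checkNoNulls(Map.of(1, 0L), 1, "y"); // passes: the file records zero NULLs for field 1
        checkNoNulls(null, 1, "y"); // throws: no statistics, NULLs cannot be ruled out
    }
}
```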
@@ -173,7 +173,63 @@ void testAddFilesTypeMismatch()
     }

     @Test
-    void testAddFilesDifferentDataColumnDefinitions()
+    void testAddFilesFromLessColumnTable()
+    {
+        for (String format : List.of("ORC", "PARQUET", "AVRO")) {
+            String hiveTableName = "test_add_files_" + randomNameSuffix();
+            String icebergTableName = "test_add_files_" + randomNameSuffix();
+
+            assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + format + "') AS SELECT 1 x", 1);
+            assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + format + "') AS SELECT 2 x, 20 y", 1);
+
+            assertUpdate("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')");
+            assertQuery("SELECT * FROM iceberg.tpch." + icebergTableName, "VALUES (1, NULL), (2, 20)");
+
+            assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
+            assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
+        }
+    }
+
+    @Test
+    void testAddFilesFromLessColumnTableNotNull()
+    {
+        for (String format : List.of("ORC", "PARQUET", "AVRO")) {
+            String hiveTableName = "test_add_files_" + randomNameSuffix();
+            String icebergTableName = "test_add_files_" + randomNameSuffix();
+
+            assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + format + "') AS SELECT 1 x", 1);
+            assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + "(x int, y int NOT NULL) WITH (format = '" + format + "')");
+
+            assertQueryFails(
+                    "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')",
+                    ".*NULL value not allowed for NOT NULL column: y");
+
+            assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
+            assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
+        }
+    }
+
+    @Test
+    void testAddFilesFromMoreColumnTable()
+    {
+        for (String format : List.of("ORC", "PARQUET", "AVRO")) {
+            String hiveTableName = "test_add_files_" + randomNameSuffix();
+            String icebergTableName = "test_add_files_" + randomNameSuffix();
+
+            assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + format + "') AS SELECT 1 x, 'extra' y", 1);
+            assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + format + "') AS SELECT 2 x", 1);
+
+            assertQueryFails(
+                    "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')",
+                    "Target table should have at least 2 columns but got 1");
+
+            assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
+            assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
+        }
+    }
+
+    @Test
+    void testAddFilesDifferentAllDataColumnDefinitions()
     {
         for (String format : List.of("ORC", "PARQUET", "AVRO")) {
             String hiveTableName = "test_add_files_" + randomNameSuffix();
@@ -182,7 +238,9 @@
             assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + format + "') AS SELECT 1 x", 1);
             assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + format + "') AS SELECT 2 y", 1);

-            assertQueryFails("ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')", "Column 'x' does not exist");
+            assertQueryFails(
+                    "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')",
+                    "All columns in the source table do not exist in the target table");

             assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
             assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
@@ -200,7 +258,24 @@ void testAddFilesDifferentPartitionColumnDefinitions()

         assertQueryFails(
                 "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['hive_part'], ARRAY['10']))",
-                "Column 'hive_part' does not exist");
+                "Partition column 'hive_part' does not exist");

         assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
         assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
+    }
+
+    @Test
+    void testAddFilesFromNonPartitionTable()
+    {
+        String hiveTableName = "test_add_files_" + randomNameSuffix();
+        String icebergTableName = "test_add_files_" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " AS SELECT 1 x", 1);
+        assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (partitioning = ARRAY['iceberg_part']) AS SELECT 2 x, 20 iceberg_part", 1);
+
+        assertQueryFails(
+                "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "')",
+                "Numbers of partition columns should be equivalent. target: 1, source: 0");
+
+        assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
+        assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
@@ -249,7 +324,7 @@ void testAddFilesNonPartitionTableWithPartitionFilter()

         assertQueryFails(
                 "ALTER TABLE " + icebergTableName + " EXECUTE add_files_from_table('tpch', '" + hiveTableName + "', map(ARRAY['part'], ARRAY['test1']))",
-                "Partition filter is not supported for non-partitioned tables");
+                "Numbers of partition columns should be equivalent. target: 1, source: 0");

         assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
         assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
