Skip to content

Commit

Permalink
Check duplicated field names in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jun 19, 2024
1 parent db064d2 commit e9609fd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.spi.StandardErrorCode.DUPLICATE_COLUMN_NAME;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.UuidType.UUID;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

public final class TypeConverter
{
Expand Down Expand Up @@ -183,6 +187,7 @@ private static org.apache.iceberg.types.Type fromRow(RowType type, Optional<Colu
{
checkExactlyOne(columnIdentity, nextFieldId);

Set<String> fieldNames = new HashSet<>();
List<Types.NestedField> fields = new ArrayList<>();
for (int i = 0; i < type.getFields().size(); i++) {
int fieldIndex = i;
Expand All @@ -194,6 +199,9 @@ private static org.apache.iceberg.types.Type fromRow(RowType type, Optional<Colu
RowType.Field field = type.getFields().get(fieldIndex);
String name = field.getName().orElseThrow(() ->
new TrinoException(NOT_SUPPORTED, "Row type field does not have a name: " + type.getDisplayName()));
if (!fieldNames.add(name.toLowerCase(ENGLISH))) {
throw new TrinoException(DUPLICATE_COLUMN_NAME, "Field name '%s' specified more than once".formatted(name.toLowerCase(ENGLISH)));
}
fields.add(Types.NestedField.optional(id, name, toIcebergTypeInternal(field.getType(), childColumnIdentity, nextFieldId)));
}
return Types.StructType.of(fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1534,7 +1534,42 @@ public void testDropRowFieldWhenDuplicates()
{
// Override because Iceberg doesn't allow duplicated field names in a row type
assertThatThrownBy(super::testDropRowFieldWhenDuplicates)
.hasMessage("Invalid schema: multiple fields for name col.a: 2 and 3");
.hasMessage("Field name 'a' specified more than once");
}

@Test
@Override // Override because ambiguous field name is disallowed in the connector
public void testDropAmbiguousRowFieldCaseSensitivity()
{
assertThatThrownBy(super::testDropAmbiguousRowFieldCaseSensitivity)
.hasMessage("Field name 'some_field' specified more than once");
}

@Test
public void testDuplicatedFieldNames()
{
String tableName = "test_duplicated_field_names" + randomNameSuffix();

assertQueryFails("CREATE TABLE " + tableName + "(col row(x int, \"X\" int))", "Field name 'x' specified more than once");
assertQueryFails("CREATE TABLE " + tableName + " AS SELECT cast(NULL AS row(x int, \"X\" int)) col", "Field name 'x' specified more than once");

assertQueryFails("CREATE TABLE " + tableName + "(col array(row(x int, \"X\" int)))", "Field name 'x' specified more than once");
assertQueryFails("CREATE TABLE " + tableName + " AS SELECT cast(NULL AS array(row(x int, \"X\" int))) col", "Field name 'x' specified more than once");

assertQueryFails("CREATE TABLE " + tableName + "(col map(int, row(x int, \"X\" int)))", "Field name 'x' specified more than once");
assertQueryFails("CREATE TABLE " + tableName + " AS SELECT cast(NULL AS map(int, row(x int, \"X\" int))) col", "Field name 'x' specified more than once");

assertQueryFails("CREATE TABLE " + tableName + "(col row(a row(x int, \"X\" int)))", "Field name 'x' specified more than once");
assertQueryFails("CREATE TABLE " + tableName + " AS SELECT cast(NULL AS row(a row(x int, \"X\" int))) col", "Field name 'x' specified more than once");

try (TestTable table = new TestTable(getQueryRunner()::execute, "test_duplicated_field_names_", "(id int)")) {
assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN col row(x int, \"X\" int)", ".* Field name 'x' specified more than once");

assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN col row(\"X\" int)");
assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN col.x int", "line 1:1: Field 'x' already exists");

assertQueryFails("ALTER TABLE " + table.getName() + " ALTER COLUMN col SET DATA TYPE row(x int, \"X\" int)", "Field name 'x' specified more than once");
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Iceberg connector test ORC and with S3-compatible storage (but without real metastore).
Expand Down Expand Up @@ -160,16 +159,6 @@ public void testTimeType()
}
}

@Test
@Override
public void testDropAmbiguousRowFieldCaseSensitivity()
{
// TODO https://github.com/trinodb/trino/issues/16273 The connector can't read row types having ambiguous field names in ORC files. e.g. row(X int, x int)
assertThatThrownBy(super::testDropAmbiguousRowFieldCaseSensitivity)
.hasMessageContaining("Error opening Iceberg split")
.hasStackTraceContaining("Multiple entries with same key");
}

@Override
protected Optional<TypeCoercionTestSetup> filterTypeCoercionOnCreateTableAsSelectProvider(TypeCoercionTestSetup setup)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestIcebergParquetConnectorTest
extends BaseIcebergConnectorTest
Expand Down Expand Up @@ -87,15 +86,6 @@ protected Optional<SetColumnTypeSetup> filterSetColumnTypesDataProvider(SetColum
return super.filterSetColumnTypesDataProvider(setup);
}

@Test
@Override
public void testDropAmbiguousRowFieldCaseSensitivity()
{
// TODO https://github.com/trinodb/trino/issues/16273 The connector can't read row types having ambiguous field names in Parquet files. e.g. row(X int, x int)
assertThatThrownBy(super::testDropAmbiguousRowFieldCaseSensitivity)
.hasMessage("Invalid schema: multiple fields for name col.some_field: 2 and 3");
}

@Test
public void testIgnoreParquetStatistics()
{
Expand Down

0 comments on commit e9609fd

Please sign in to comment.