Skip to content

Commit

Permalink
Check primary_key exist during Ignite create table
Browse files Browse the repository at this point in the history
Adds a check for create table in IgniteClient. The check ensures
that the column names mentioned in the table property 'primary_key'
exists in the column defined in the create table statement.
  • Loading branch information
sahoss authored and ebyhr committed Mar 20, 2023
1 parent 366cd8a commit 8df3c9c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
Expand Down Expand Up @@ -382,37 +383,47 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto

int expectedSize = tableMetadata.getColumns().size();
ImmutableList.Builder<String> columns = ImmutableList.builderWithExpectedSize(expectedSize);
ImmutableList.Builder<String> columnNames = ImmutableList.builderWithExpectedSize(expectedSize);
ImmutableList.Builder<String> columnNamesBuilder = ImmutableList.builderWithExpectedSize(expectedSize);
ImmutableList.Builder<Type> columnTypes = ImmutableList.builderWithExpectedSize(expectedSize);
for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
columns.add(getColumnDefinitionSql(session, columnMetadata, columnMetadata.getName()));
columnNames.add(columnMetadata.getName());
columnNamesBuilder.add(columnMetadata.getName());
columnTypes.add(columnMetadata.getType());
}
String sql = buildCreateSql(schemaTableName, columns.build(), tableMetadata.getProperties());

List<String> columnNames = columnNamesBuilder.build();
List<String> primaryKeys = IgniteTableProperties.getPrimaryKey(tableMetadata.getProperties());

for (String primaryKey : primaryKeys) {
if (!columnNames.contains(primaryKey)) {
throw new TrinoException(INVALID_TABLE_PROPERTY,
format("Column '%s' specified in property '%s' doesn't exist in table", primaryKey, PRIMARY_KEY_PROPERTY));
}
}

String sql = buildCreateSql(schemaTableName, columns.build(), primaryKeys);

try (Connection connection = connectionFactory.openConnection(session)) {
execute(session, connection, sql);

return new IgniteOutputTableHandle(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
columnNames.build(),
columnNames,
columnTypes.build(),
Optional.empty(),
IgniteTableProperties.getPrimaryKey(tableMetadata.getProperties()).isEmpty() ? Optional.of(IGNITE_DUMMY_ID) : Optional.empty());
primaryKeys.isEmpty() ? Optional.of(IGNITE_DUMMY_ID) : Optional.empty());
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

private String buildCreateSql(SchemaTableName schemaTableName, List<String> columns, Map<String, Object> tableProperties)
private String buildCreateSql(SchemaTableName schemaTableName, List<String> columns, List<String> primaryKeys)
{
ImmutableList.Builder<String> columnDefinitions = ImmutableList.builder();
columnDefinitions.addAll(columns);

List<String> primaryKeys = IgniteTableProperties.getPrimaryKey(tableProperties);
checkArgument(primaryKeys.size() < columns.size(), "Ignite table must have at least one non PRIMARY KEY column.");
if (primaryKeys.isEmpty()) {
columnDefinitions.add(quoted(IGNITE_DUMMY_ID) + " VARCHAR NOT NULL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ public void testCreateTableWithCommaPropertyColumn()
}
}

@Test
public void testCreateTableWithNonExistingPrimaryKey()
{
String tableName = "test_invalid_primary_key" + randomNameSuffix();
assertQueryFails("CREATE TABLE " + tableName + "(a bigint) WITH (primary_key = ARRAY['not_existing_column'])",
"Column 'not_existing_column' specified in property 'primary_key' doesn't exist in table");

assertQueryFails("CREATE TABLE " + tableName + "(a bigint) WITH (primary_key = ARRAY['dummy_id'])",
"Column 'dummy_id' specified in property 'primary_key' doesn't exist in table");

assertQueryFails("CREATE TABLE " + tableName + "(a bigint) WITH (primary_key = ARRAY['A'])",
"Column 'A' specified in property 'primary_key' doesn't exist in table");
}

@Test
public void testCreateTableWithAllProperties()
{
Expand Down

0 comments on commit 8df3c9c

Please sign in to comment.