Skip to content

Commit

Permalink
[apache#1707] feat(mysql): Support mysql index.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 26, 2024
1 parent 26a4b3d commit d44ab8b
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class Indexes {

public static final Index[] EMPTY_INDEXES = new Index[0];

public static final String DEFAULT_MYSQL_PRIMARY_KEY_NAME = "PRIMARY";

/**
* Create a unique index on columns. Like unique (a) or unique (a, b), for complex like unique
*
Expand All @@ -20,6 +22,17 @@ public static Index unique(String name, String[][] fieldNames) {
return of(Index.IndexType.UNIQUE_KEY, name, fieldNames);
}

/**
* Mysql only supports single-column primary key, and the name of the primary key is fixed, which
* is PRIMARY
*
* @param fieldName The field name under the table contained in the index.
* @return The primary key index
*/
public static Index createMysqlPrimaryKey(String fieldName) {
return primary(DEFAULT_MYSQL_PRIMARY_KEY_NAME, new String[][] {{fieldName}});
}

/**
* Create a primary index on columns. Like primary (a), for complex like primary
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
Preconditions.checkArgument(indexes.length == 0, "Jdbc-catalog does not support indexes");
Preconditions.checkArgument(
null == distribution || distribution == Distributions.NONE,
"jdbc-catalog does not support distribution");
Expand Down Expand Up @@ -400,7 +399,8 @@ public Table createTable(
jdbcColumns,
StringIdentifier.addToComment(identifier, comment),
resultProperties,
partitioning);
partitioning,
indexes);

return new JdbcTable.Builder()
.withAuditInfo(
Expand All @@ -413,6 +413,7 @@ public Table createTable(
.withComment(comment)
.withProperties(jdbcTablePropertiesMetadata.convertFromJdbcProperties(resultProperties))
.withPartitioning(partitioning)
.withIndexes(indexes)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected JdbcTable internalBuild() {
jdbcTable.columns = columns;
jdbcTable.partitioning = partitioning;
jdbcTable.sortOrders = sortOrders;
jdbcTable.indexes = indexes;
return jdbcTable;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand Down Expand Up @@ -63,13 +64,14 @@ public void create(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning)
Transform[] partitioning,
Index[] indexes)
throws TableAlreadyExistsException {
LOG.info("Attempting to create table {} in database {}", tableName, databaseName);
try (Connection connection = getConnection(databaseName)) {
JdbcConnectorUtils.executeUpdate(
connection,
generateCreateTableSql(tableName, columns, comment, properties, partitioning));
generateCreateTableSql(tableName, columns, comment, properties, partitioning, indexes));
LOG.info("Created table {} in database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
Expand Down Expand Up @@ -215,7 +217,8 @@ protected abstract String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning);
Transform[] partitioning,
Index[] indexes);

protected abstract String generateRenameTableSql(String oldTableName, String newTableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
Expand Down Expand Up @@ -41,14 +42,16 @@ void initialize(
* @param comment The comment of the table.
* @param properties The properties of the table.
* @param partitioning The partitioning of the table.
* @param indexes
*/
void create(
String databaseName,
String tableName,
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning)
Transform[] partitioning,
Index[] indexes)
throws TableAlreadyExistsException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
Expand All @@ -20,7 +21,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("CREATE TABLE ").append(tableName).append(" (");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testOperationTable() {
Assertions.assertDoesNotThrow(
() ->
JDBC_TABLE_OPERATIONS.create(
DATABASE_NAME, table1, jdbcColumns, null, properties, null));
DATABASE_NAME, table1, jdbcColumns, null, properties, null, indexes));

// list table.
List<String> allTables = JDBC_TABLE_OPERATIONS.listTables(DATABASE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
import com.datastrato.gravitino.catalog.jdbc.JdbcTable;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.catalog.mysql.converter.MysqlTypeConverter;
import com.datastrato.gravitino.dto.rel.indexes.IndexDTO;
import com.datastrato.gravitino.exceptions.NoSuchColumnException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -29,8 +32,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -87,6 +93,47 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
comment = tableProperties.getOrDefault(COMMENT, comment);
}

List<IndexDTO> indexes = new ArrayList<>();

// 4.Get primary key information
ResultSet primaryKeys = metaData.getPrimaryKeys(databaseName, null, tableName);
while (primaryKeys.next()) {
indexes.add(
IndexDTO.builder()
.withIndexType(Index.IndexType.PRIMARY_KEY)
.withName(primaryKeys.getString("PK_NAME"))
.withFieldNames(new String[][] {{primaryKeys.getString("COLUMN_NAME")}})
.build());
}

// 5.Get index information
Map<String, Set<String>> indexGroupByName = new HashMap<>();
ResultSet indexInfo = metaData.getIndexInfo(databaseName, null, tableName, false, false);
while (indexInfo.next()) {
String indexName = indexInfo.getString("INDEX_NAME");
if (!indexInfo.getBoolean("NON_UNIQUE")
&& !Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME.equals(indexName)) {
String columnName = indexInfo.getString("COLUMN_NAME");
indexGroupByName.compute(
indexName,
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
}
for (Map.Entry<String, Set<String>> entry : indexGroupByName.entrySet()) {
indexes.add(
IndexDTO.builder()
.withIndexType(Index.IndexType.UNIQUE_KEY)
.withName(entry.getKey())
.withFieldNames(new String[][] {entry.getValue().toArray(new String[0])})
.build());
}

return new JdbcTable.Builder()
.withName(tableName)
.withColumns(jdbcColumns.toArray(new JdbcColumn[0]))
Expand All @@ -102,6 +149,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
}
}
}))
.withIndexes(indexes.toArray(new IndexDTO[0]))
.withAuditInfo(AuditInfo.EMPTY)
.build();
} catch (SQLException e) {
Expand Down Expand Up @@ -178,7 +226,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
if (ArrayUtils.isNotEmpty(partitioning)) {
throw new UnsupportedOperationException("Currently we do not support Partitioning in mysql");
}
Expand All @@ -201,6 +250,46 @@ protected String generateCreateTableSql(
sqlBuilder.append(",\n");
}
}

if (indexes.length > 0) {
for (Index index : indexes) {
sqlBuilder.append(",\n");
switch (index.type()) {
case PRIMARY_KEY:
if (index.fieldNames().length != 1 || index.fieldNames()[0].length != 1) {
throw new IllegalArgumentException(
"Primary key does not support complex fields in Mysql");
}
if (!"PRIMARY".equals(index.name().toUpperCase(Locale.ROOT))) {
throw new IllegalArgumentException("Primary key name must be PRIMARY in Mysql");
}
sqlBuilder
.append("CONSTRAINT ")
.append("PRIMARY KEY (")
.append(index.fieldNames()[0][0])
.append(")");
break;
case UNIQUE_KEY:
if (index.fieldNames().length != 1) {
throw new IllegalArgumentException(
"Unique key does not support complex fields in Mysql");
}

String fields = String.join(", ", index.fieldNames()[0]);
sqlBuilder
.append("CONSTRAINT ")
.append(index.name())
.append(" UNIQUE (")
.append(fields)
.append(")");
break;
default:
throw new UnsupportedOperationException(
"Currently we do not support index in mysql, index type: " + index.type());
}
}
}

sqlBuilder.append("\n)");

// Add table comment if specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import java.sql.Connection;
Expand Down Expand Up @@ -225,7 +226,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
if (ArrayUtils.isNotEmpty(partitioning)) {
throw new UnsupportedOperationException(
"Currently we do not support Partitioning in PostgreSQL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.integration.test.util.GravitinoITUtils;
import com.datastrato.gravitino.rel.indexes.Index;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.ArrayUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.JdbcDatabaseContainer;
Expand Down Expand Up @@ -75,6 +80,7 @@ protected static void assertionsTableInfo(
String tableComment,
List<JdbcColumn> columns,
Map<String, String> properties,
Index[] indexes,
JdbcTable table) {
Assertions.assertEquals(tableName, table.name());
Assertions.assertEquals(tableComment, table.comment());
Expand All @@ -96,6 +102,24 @@ protected static void assertionsTableInfo(
for (Map.Entry<String, String> entry : properties.entrySet()) {
Assertions.assertEquals(entry.getValue(), table.properties().get(entry.getKey()));
}
if (ArrayUtils.isNotEmpty(indexes)) {
Assertions.assertEquals(indexes.length, table.index().length);

Map<String, Index> indexByName =
Arrays.stream(indexes).collect(Collectors.toMap(Index::name, index -> index));

for (int i = 0; i < table.index().length; i++) {
Assertions.assertTrue(indexByName.containsKey(table.index()[i].name()));
Assertions.assertEquals(
indexByName.get(table.index()[i].name()).type(), table.index()[i].type());
for (int j = 0; j < table.index()[i].fieldNames().length; j++) {
Set<String> colNames =
Arrays.stream(indexByName.get(table.index()[i].name()).fieldNames()[j])
.collect(Collectors.toSet());
colNames.containsAll(Arrays.asList(table.index()[i].fieldNames()[j]));
}
}
}
}

@AfterAll
Expand Down
Loading

0 comments on commit d44ab8b

Please sign in to comment.