Skip to content

Commit

Permalink
[apache#1707] feat(mysql): Support mysql index.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 26, 2024
1 parent e15078b commit 6d90696
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class Indexes {

public static final Index[] EMPTY_INDEXES = new Index[0];

/** My SQL does not support setting the name of the primary key, so the default name is used. */
/** MySQL does not support setting the name of the primary key, so the default name is used. */
public static final String DEFAULT_MYSQL_PRIMARY_KEY_NAME = "PRIMARY";

/**
Expand All @@ -24,9 +24,9 @@ public static Index unique(String name, String[][] fieldNames) {
}

/**
* To create a My SQL primary key, you need to use the default primary key name.
* To create a MySQL primary key, you need to use the default primary key name.
*
* @param fieldNames T.
* @param fieldNames The field names under the table contained in the index.
* @return The primary key index
*/
public static Index createMysqlPrimaryKey(String[][] fieldNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
// Remove id from comment
.withComment(StringIdentifier.removeIdFromComment(load.comment()))
.withProperties(properties)
.withIndexes(load.index())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.datastrato.gravitino.catalog.jdbc.JdbcTable;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.catalog.mysql.converter.MysqlTypeConverter;
import com.datastrato.gravitino.dto.rel.indexes.IndexDTO;
import com.datastrato.gravitino.exceptions.NoSuchColumnException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -93,59 +92,8 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
comment = tableProperties.getOrDefault(COMMENT, comment);
}

List<IndexDTO> indexes = new ArrayList<>();

Map<String, Set<String>> primaryKeyGroupByName = new HashMap<>();
// 4.Get primary key information
ResultSet primaryKeys = metaData.getPrimaryKeys(databaseName, null, tableName);
while (primaryKeys.next()) {
String columnName = primaryKeys.getString("COLUMN_NAME");
primaryKeyGroupByName.compute(
primaryKeys.getString("PK_NAME"),
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
for (Map.Entry<String, Set<String>> entry : primaryKeyGroupByName.entrySet()) {
indexes.add(
IndexDTO.builder()
.withIndexType(Index.IndexType.PRIMARY_KEY)
.withName(entry.getKey())
.withFieldNames(convertIndexFieldNames(entry.getValue().toArray(new String[0])))
.build());
}

// 5.Get index information
Map<String, Set<String>> indexGroupByName = new HashMap<>();
ResultSet indexInfo = metaData.getIndexInfo(databaseName, null, tableName, false, false);
while (indexInfo.next()) {
String indexName = indexInfo.getString("INDEX_NAME");
if (!indexInfo.getBoolean("NON_UNIQUE")
&& !StringUtils.equalsIgnoreCase(Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME, indexName)) {
String columnName = indexInfo.getString("COLUMN_NAME");
indexGroupByName.compute(
indexName,
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
}
for (Map.Entry<String, Set<String>> entry : indexGroupByName.entrySet()) {
indexes.add(
IndexDTO.builder()
.withIndexType(Index.IndexType.UNIQUE_KEY)
.withName(entry.getKey())
.withFieldNames(convertIndexFieldNames(entry.getValue().toArray(new String[0])))
.build());
}
// 4.Get index information
List<Index> indexes = getIndexes(databaseName, tableName, metaData);

return new JdbcTable.Builder()
.withName(tableName)
Expand All @@ -162,14 +110,65 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
}
}
}))
.withIndexes(indexes.toArray(new IndexDTO[0]))
.withIndexes(indexes.toArray(new Index[0]))
.withAuditInfo(AuditInfo.EMPTY)
.build();
} catch (SQLException e) {
throw exceptionMapper.toGravitinoException(e);
}
}

private List<Index> getIndexes(String databaseName, String tableName, DatabaseMetaData metaData)
throws SQLException {
List<Index> indexes = new ArrayList<>();
// 1.Get primary key information
Map<String, Set<String>> primaryKeyGroupByName = new HashMap<>();
ResultSet primaryKeys = metaData.getPrimaryKeys(databaseName, null, tableName);
while (primaryKeys.next()) {
String columnName = primaryKeys.getString("COLUMN_NAME");
primaryKeyGroupByName.compute(
primaryKeys.getString("PK_NAME"),
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
for (Map.Entry<String, Set<String>> entry : primaryKeyGroupByName.entrySet()) {
indexes.add(
Indexes.primary(
entry.getKey(), convertIndexFieldNames(entry.getValue().toArray(new String[0]))));
}

// 2.Get unique key information
Map<String, Set<String>> indexGroupByName = new HashMap<>();
ResultSet indexInfo = metaData.getIndexInfo(databaseName, null, tableName, false, false);
while (indexInfo.next()) {
String indexName = indexInfo.getString("INDEX_NAME");
if (!indexInfo.getBoolean("NON_UNIQUE")
&& !StringUtils.equalsIgnoreCase(Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME, indexName)) {
String columnName = indexInfo.getString("COLUMN_NAME");
indexGroupByName.compute(
indexName,
(k, v) -> {
if (v == null) {
v = new HashSet<>();
}
v.add(columnName);
return v;
});
}
}
for (Map.Entry<String, Set<String>> entry : indexGroupByName.entrySet()) {
indexes.add(
Indexes.unique(
entry.getKey(), convertIndexFieldNames(entry.getValue().toArray(new String[0]))));
}
return indexes;
}

private String[][] convertIndexFieldNames(String[] fieldNames) {
return Arrays.stream(fieldNames)
.map(colName -> new String[] {colName})
Expand Down Expand Up @@ -285,18 +284,17 @@ protected String generateCreateTableSql(
sqlBuilder.append(",\n");
switch (index.type()) {
case PRIMARY_KEY:
if (!"PRIMARY".equals(index.name().toUpperCase(Locale.ROOT))) {
if (null != index.name() && !"PRIMARY".equals(index.name().toUpperCase(Locale.ROOT))) {
throw new IllegalArgumentException("Primary key name must be PRIMARY in MySQL");
}
sqlBuilder.append("CONSTRAINT ").append("PRIMARY KEY (").append(fieldStr).append(")");
break;
case UNIQUE_KEY:
sqlBuilder
.append("CONSTRAINT ")
.append(index.name())
.append(" UNIQUE (")
.append(fieldStr)
.append(")");
sqlBuilder.append("CONSTRAINT ");
if (null != index.name()) {
sqlBuilder.append(index.name());
}
sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")");
break;
default:
throw new IllegalArgumentException("MySQL doesn't support index : " + index.type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO;
import com.datastrato.gravitino.dto.rel.expressions.FunctionArg;
import com.datastrato.gravitino.dto.rel.expressions.LiteralDTO;
import com.datastrato.gravitino.dto.rel.indexes.IndexDTO;
import com.datastrato.gravitino.dto.rel.partitioning.DayPartitioningDTO;
import com.datastrato.gravitino.dto.rel.partitioning.IdentityPartitioningDTO;
import com.datastrato.gravitino.dto.rel.partitioning.Partitioning;
Expand Down Expand Up @@ -55,7 +56,6 @@
import com.datastrato.gravitino.rel.expressions.distributions.Strategy;
import com.datastrato.gravitino.rel.expressions.sorts.SortDirection;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -362,7 +362,7 @@ public void testCreateTable() throws JsonProcessingException {
sortOrderDTOs,
DistributionDTO.NONE,
EMPTY_PARTITIONING,
Indexes.EMPTY_INDEXES);
IndexDTO.EMPTY_INDEXES);
TableResponse resp = new TableResponse(expectedTable);
buildMockResource(Method.POST, tablePath, req, resp, SC_OK);

Expand Down Expand Up @@ -481,7 +481,7 @@ public void testCreatePartitionedTable() throws JsonProcessingException {
SortOrderDTO.EMPTY_SORT,
DistributionDTO.NONE,
EMPTY_PARTITIONING,
Indexes.EMPTY_INDEXES);
IndexDTO.EMPTY_INDEXES);
TableResponse resp = new TableResponse(expectedTable);
buildMockResource(Method.POST, tablePath, req, resp, SC_OK);

Expand Down Expand Up @@ -514,7 +514,7 @@ public void testCreatePartitionedTable() throws JsonProcessingException {
SortOrderDTO.EMPTY_SORT,
DistributionDTO.NONE,
partitioning,
Indexes.EMPTY_INDEXES);
IndexDTO.EMPTY_INDEX);
resp = new TableResponse(expectedTable);
buildMockResource(Method.POST, tablePath, req, resp, SC_OK);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.dto.rel;

import com.datastrato.gravitino.dto.AuditDTO;
import com.datastrato.gravitino.dto.rel.indexes.IndexDTO;
import com.datastrato.gravitino.dto.rel.partitioning.Partitioning;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
Expand Down Expand Up @@ -44,7 +45,7 @@ public class TableDTO implements Table {
private Partitioning[] partitioning;

@JsonProperty("indexes")
private Index[] indexes;
private IndexDTO[] indexes;

private TableDTO() {}

Expand All @@ -68,7 +69,7 @@ private TableDTO(
Partitioning[] partitioning,
DistributionDTO distribution,
SortOrderDTO[] sortOrderDTOs,
Index[] indexes) {
IndexDTO[] indexes) {
this.name = name;
this.comment = comment;
this.columns = columns;
Expand Down Expand Up @@ -148,7 +149,7 @@ public static class Builder<S extends Builder> {
protected SortOrderDTO[] sortOrderDTOs;
protected DistributionDTO distributionDTO;
protected Partitioning[] Partitioning;
protected Index[] indexes;
protected IndexDTO[] indexes;

public Builder() {}

Expand Down Expand Up @@ -222,7 +223,7 @@ public S withPartitioning(Partitioning[] Partitioning) {
return (S) this;
}

public S withIndex(Index[] indexes) {
public S withIndex(IndexDTO[] indexes) {
this.indexes = indexes;
return (S) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
*/
package com.datastrato.gravitino.dto.rel.indexes;

import com.datastrato.gravitino.json.JsonUtils;
import com.datastrato.gravitino.rel.indexes.Index;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;

@JsonSerialize(using = JsonUtils.IndexSerializer.class)
@JsonDeserialize(using = JsonUtils.IndexDeserializer.class)
public class IndexDTO implements Index {

public static final IndexDTO[] EMPTY_INDEXES = new IndexDTO[0];

private IndexType indexType;
private String name;
private String[][] fieldNames;
Expand Down Expand Up @@ -67,7 +73,6 @@ public S withFieldNames(String[][] fieldNames) {

public IndexDTO build() {
Preconditions.checkArgument(indexType != null, "Index type cannot be null");
Preconditions.checkArgument(StringUtils.isNotBlank(name), "Index name cannot be blank");
Preconditions.checkArgument(
fieldNames != null && fieldNames.length > 0,
"The index must be set with corresponding column names");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.datastrato.gravitino.dto.rel.DistributionDTO;
import com.datastrato.gravitino.dto.rel.SortOrderDTO;
import com.datastrato.gravitino.dto.rel.expressions.FunctionArg;
import com.datastrato.gravitino.dto.rel.indexes.IndexDTO;
import com.datastrato.gravitino.dto.rel.partitioning.Partitioning;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rest.RESTRequest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -60,7 +60,7 @@ public class TableCreateRequest implements RESTRequest {

@Nullable
@JsonProperty("indexes")
private final Index[] indexes;
private final IndexDTO[] indexes;

public TableCreateRequest() {
this(null, null, null, null, null, null, null, null);
Expand All @@ -74,7 +74,7 @@ public TableCreateRequest(
@Nullable SortOrderDTO[] sortOrders,
@Nullable DistributionDTO distribution,
@Nullable Partitioning[] partitioning,
@Nullable Index[] indexes) {
@Nullable IndexDTO[] indexes) {
this.name = name;
this.columns = columns;
this.comment = comment;
Expand Down Expand Up @@ -119,7 +119,13 @@ public void validate() throws IllegalArgumentException {
+ autoIncrementColsStr);

if (indexes != null && indexes.length > 0) {
throw new UnsupportedOperationException("Support for indexing is currently not implemented");
Arrays.stream(indexes)
.forEach(
index -> {
Preconditions.checkArgument(index.type() != null, "Index type cannot be null");
Preconditions.checkArgument(
index.fieldNames().length > 0, "Index field names cannot be null");
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import java.util.Arrays;
import org.apache.commons.lang3.ArrayUtils;

Expand Down Expand Up @@ -120,6 +121,7 @@ public static TableDTO toDTO(Table table) {
.withDistribution(toDTO(table.distribution()))
.withAudit(toDTO(table.auditInfo()))
.withPartitioning(toDTOs(table.partitioning()))
.withIndex(toDTOs(table.index()))
.build();
}

Expand Down Expand Up @@ -261,7 +263,7 @@ public static Partitioning[] toDTOs(Transform[] transforms) {

public static IndexDTO[] toDTOs(Index[] indexes) {
if (ArrayUtils.isEmpty(indexes)) {
return new IndexDTO[0];
return IndexDTO.EMPTY_INDEXES;
}
return Arrays.stream(indexes).map(DTOConverters::toDTO).toArray(IndexDTO[]::new);
}
Expand Down Expand Up @@ -302,6 +304,18 @@ public static Expression fromFunctionArg(FunctionArg arg) {
}
}

public static Index fromDTO(IndexDTO indexDTO) {
return Indexes.of(indexDTO.type(), indexDTO.name(), indexDTO.fieldNames());
}

public static Index[] fromDTOs(IndexDTO[] indexDTOS) {
if (ArrayUtils.isEmpty(indexDTOS)) {
return Indexes.EMPTY_INDEXES;
}

return Arrays.stream(indexDTOS).map(DTOConverters::fromDTO).toArray(Index[]::new);
}

public static SortOrder fromDTO(SortOrderDTO sortOrderDTO) {
return SortOrders.of(
fromFunctionArg(sortOrderDTO.sortTerm()),
Expand Down
Loading

0 comments on commit 6d90696

Please sign in to comment.