From 335eb1a53a3d9c50f07460fbb4aaff6676187ebf Mon Sep 17 00:00:00 2001 From: Zhou Kang Date: Wed, 10 Apr 2024 00:07:46 +0800 Subject: [PATCH] support doris table operation --- .../jdbc/operation/JdbcTableOperations.java | 7 +- .../DorisColumnDefaultValueConverter.java | 78 ++- .../doris/converter/DorisTypeConverter.java | 87 ++- .../doris/operation/DorisTableOperations.java | 534 +++++++++++++++++- .../test/DorisTableOperationsIT.java | 246 ++++++++ .../mysql/operation/MysqlTableOperations.java | 3 +- 6 files changed, 938 insertions(+), 17 deletions(-) create mode 100644 catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java index 71c68f947dd..5fc3cde3b54 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java @@ -175,7 +175,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE jdbcTableBuilder.withProperties(tableProperties); // 5.Leave the information to the bottom layer to append the table - correctJdbcTableFields(connection, tableName, jdbcTableBuilder); + correctJdbcTableFields(connection, databaseName, tableName, jdbcTableBuilder); return jdbcTableBuilder.build(); } catch (SQLException e) { throw exceptionMapper.toGravitinoException(e); @@ -275,7 +275,10 @@ protected ResultSet getColumns(Connection connection, String databaseName, Strin * @throws SQLException */ protected void correctJdbcTableFields( - Connection connection, String tableName, JdbcTable.Builder jdbcTableBuilder) + Connection connection, + String databaseName, + String tableName, + JdbcTable.Builder jdbcTableBuilder) throws SQLException { // nothing to do } diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java index 429270537ce..b9b1b0f70b4 100644 --- a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java @@ -4,20 +4,94 @@ */ package com.datastrato.gravitino.catalog.doris.converter; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.BIGINT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.CHAR; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.DATETIME; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.DECIMAL; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.DOUBLE; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.FLOAT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.INT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.SMALLINT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.TINYINT; import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; +import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP; import com.datastrato.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; import com.datastrato.gravitino.rel.expressions.Expression; +import com.datastrato.gravitino.rel.expressions.UnparsedExpression; +import com.datastrato.gravitino.rel.expressions.literals.Literals; +import com.datastrato.gravitino.rel.types.Decimal; +import com.datastrato.gravitino.rel.types.Types; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; public class DorisColumnDefaultValueConverter extends JdbcColumnDefaultValueConverter { + @Override public Expression toGravitino( JdbcTypeConverter.JdbcTypeBean columnType, String columnDefaultValue, boolean isExpression, boolean nullable) { - // TODO: add implementation for doris catalog - return DEFAULT_VALUE_NOT_SET; + if (columnDefaultValue == null) { + return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET; + } + + if (columnDefaultValue.equalsIgnoreCase(NULL)) { + return Literals.NULL; + } + + if (isExpression) { + if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) { + return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP; + } + // The parsing of Doris expressions is complex, so we are not currently undertaking the + // parsing. + return UnparsedExpression.of(columnDefaultValue); + } + + switch (columnType.getTypeName().toLowerCase()) { + case TINYINT: + return Literals.byteLiteral(Byte.valueOf(columnDefaultValue)); + case SMALLINT: + return Literals.shortLiteral(Short.valueOf(columnDefaultValue)); + case INT: + return Literals.integerLiteral(Integer.valueOf(columnDefaultValue)); + case BIGINT: + return Literals.longLiteral(Long.valueOf(columnDefaultValue)); + case FLOAT: + return Literals.floatLiteral(Float.valueOf(columnDefaultValue)); + case DOUBLE: + return Literals.doubleLiteral(Double.valueOf(columnDefaultValue)); + case DECIMAL: + return Literals.decimalLiteral( + Decimal.of( + columnDefaultValue, + Integer.parseInt(columnType.getColumnSize()), + Integer.parseInt(columnType.getScale()))); + case JdbcTypeConverter.DATE: + return Literals.dateLiteral(LocalDate.parse(columnDefaultValue, DATE_TIME_FORMATTER)); + case JdbcTypeConverter.TIME: + return Literals.timeLiteral(LocalTime.parse(columnDefaultValue, DATE_TIME_FORMATTER)); + case JdbcTypeConverter.TIMESTAMP: + case DATETIME: + return CURRENT_TIMESTAMP.equals(columnDefaultValue) + ? DEFAULT_VALUE_OF_CURRENT_TIMESTAMP + : Literals.timestampLiteral( + LocalDateTime.parse(columnDefaultValue, DATE_TIME_FORMATTER)); + case JdbcTypeConverter.VARCHAR: + return Literals.of( + columnDefaultValue, Types.VarCharType.of(Integer.parseInt(columnType.getColumnSize()))); + case CHAR: + return Literals.of( + columnDefaultValue, + Types.FixedCharType.of(Integer.parseInt(columnType.getColumnSize()))); + case JdbcTypeConverter.TEXT: + return Literals.stringLiteral(columnDefaultValue); + default: + throw new IllegalArgumentException("Unknown data columnType for literal: " + columnType); + } } } diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java index 08b75e20b92..68127a2b17f 100644 --- a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java @@ -10,15 +10,96 @@ /** Type converter for Doris. */ public class DorisTypeConverter extends JdbcTypeConverter { - // TODO: add implementation for doris catalog + static final String BOOLEAN = "boolean"; + static final String TINYINT = "tinyint"; + static final String SMALLINT = "smallint"; + static final String INT = "int"; + static final String BIGINT = "bigint"; + static final String FLOAT = "float"; + static final String DOUBLE = "double"; + static final String DECIMAL = "decimal"; + static final String DATETIME = "datetime"; + static final String CHAR = "char"; + static final String STRING = "string"; @Override public Type toGravitinoType(JdbcTypeBean typeBean) { - return Types.UnparsedType.of(typeBean.getTypeName()); + switch (typeBean.getTypeName().toLowerCase()) { + case BOOLEAN: + return Types.BooleanType.get(); + case TINYINT: + return Types.ByteType.get(); + case SMALLINT: + return Types.ShortType.get(); + case INT: + return Types.IntegerType.get(); + case BIGINT: + return Types.LongType.get(); + case FLOAT: + return Types.FloatType.get(); + case DOUBLE: + return Types.DoubleType.get(); + case DECIMAL: + return Types.DecimalType.of( + Integer.parseInt(typeBean.getColumnSize()), Integer.parseInt(typeBean.getScale())); + case DATE: + return Types.DateType.get(); + case DATETIME: + return Types.TimestampType.withTimeZone(); + case CHAR: + return Types.FixedCharType.of(Integer.parseInt(typeBean.getColumnSize())); + case VARCHAR: + return Types.VarCharType.of(Integer.parseInt(typeBean.getColumnSize())); + case STRING: + case TEXT: + return Types.StringType.get(); + default: + throw new IllegalArgumentException("Not a supported type: " + typeBean); + } } @Override public String fromGravitinoType(Type type) { - throw new IllegalArgumentException("unsupported type: " + type.simpleString()); + if (type instanceof Types.BooleanType) { + return BOOLEAN; + } else if (type instanceof Types.ByteType) { + return TINYINT; + } else if (type instanceof Types.ShortType) { + return SMALLINT; + } else if (type instanceof Types.IntegerType) { + return INT; + } else if (type instanceof Types.LongType) { + return BIGINT; + } else if (type instanceof Types.FloatType) { + return FLOAT; + } else if (type instanceof Types.DoubleType) { + return DOUBLE; + } else if (type instanceof Types.DecimalType) { + return DECIMAL + + "(" + + ((Types.DecimalType) type).precision() + + "," + + ((Types.DecimalType) type).scale() + + ")"; + } else if (type instanceof Types.DateType) { + return DATE; + } else if (type instanceof Types.TimestampType) { + return DATETIME; + } else if (type instanceof Types.VarCharType) { + return VARCHAR + "(" + ((Types.VarCharType) type).length() + ")"; + } else if (type instanceof Types.FixedCharType) { + int length = ((Types.FixedCharType) type).length(); + if (length < 1 || length > 255) { + throw new IllegalArgumentException( + String.format( + "Type %s is invalid, length should be between 1 and 255", type.toString())); + } + + return CHAR + "(" + ((Types.FixedCharType) type).length() + ")"; + } else if (type instanceof Types.StringType) { + return STRING; + } + throw new IllegalArgumentException( + String.format("Couldn't convert Gravitino type %s to Doris type", type.toString())); } } diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java index 396dbf7bf1b..b389602e200 100644 --- a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java @@ -4,18 +4,67 @@ */ package com.datastrato.gravitino.catalog.doris.operation; +import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; + +import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.catalog.doris.utils.DorisUtils; import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; import com.datastrato.gravitino.catalog.jdbc.JdbcTable; import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import com.datastrato.gravitino.exceptions.NoSuchColumnException; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Strategy; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; /** Table operations for Doris. */ public class DorisTableOperations extends JdbcTableOperations { - // TODO: add implementation for doris catalog + private static final String BACK_QUOTE = "`"; + private static final String DORIS_AUTO_INCREMENT = "AUTO_INCREMENT"; + + private static final String NEW_LINE = "\n"; + + @Override + public List listTables(String databaseName) throws NoSuchSchemaException { + String showTablesQuery = "SHOW TABLES"; + + try (Connection connection = getConnection(databaseName); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(showTablesQuery)) { + + List names = new ArrayList<>(); + while (resultSet.next()) { + String tableName = resultSet.getString(1); + names.add(tableName); + } + LOG.info("Finished listing tables size {} for database name {} ", names.size(), databaseName); + return names; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } @Override protected String generateCreateTableSql( @@ -25,34 +74,501 @@ protected String generateCreateTableSql( Map properties, Transform[] partitioning, Distribution distribution, - Index[] indexes) { - throw new UnsupportedOperationException(); + com.datastrato.gravitino.rel.indexes.Index[] indexes) { + + validateIncrementCol(columns); + validateDistribution(distribution, columns); + + StringBuilder sqlBuilder = new StringBuilder(); + + sqlBuilder.append(String.format("CREATE TABLE `%s` (", tableName)).append(NEW_LINE); + + // Add columns + sqlBuilder.append( + Arrays.stream(columns) + .map( + column -> { + StringBuilder columnsSql = new StringBuilder(); + columnsSql + .append(SPACE) + .append(BACK_QUOTE) + .append(column.name()) + .append(BACK_QUOTE); + appendColumnDefinition(column, columnsSql); + return columnsSql.toString(); + }) + .collect(Collectors.joining(",\n"))); + + appendIndexesSql(indexes, sqlBuilder); + + sqlBuilder.append(NEW_LINE).append(")"); + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder.append(" COMMENT \"").append(comment).append("\""); + } + + // Add distribution info + if (distribution.strategy() == Strategy.HASH) { + sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY HASH("); + sqlBuilder.append( + Arrays.stream(distribution.expressions()) + .map(column -> BACK_QUOTE + column.toString() + BACK_QUOTE) + .collect(Collectors.joining(", "))); + sqlBuilder.append(")"); + } else if (distribution.strategy() == Strategy.EVEN) { + sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY ").append("RANDOM"); + } + + if (distribution.number() != 0) { + sqlBuilder.append(" BUCKETS ").append(distribution.number()); + } + + // Add table properties + sqlBuilder.append(NEW_LINE).append(DorisUtils.generatePropertiesSql(properties)); + + // Add Partition Info + if (partitioning != null && partitioning.length > 0) { + // TODO: Add partitioning support + } + + // Return the generated SQL statement + String result = sqlBuilder.toString(); + + LOG.info("Generated create table:{} sql: {}", tableName, result); + return result; + } + + private static void validateIncrementCol(JdbcColumn[] columns) { + // Get all auto increment column + List autoIncrementCols = + Arrays.stream(columns).filter(Column::autoIncrement).collect(Collectors.toList()); + + // Doris does not support auto increment column before version 2.1.0 + Preconditions.checkArgument( + autoIncrementCols.isEmpty(), "Doris does not support auto-increment column"); + } + + private static void validateDistribution(Distribution distribution, JdbcColumn[] columns) { + Preconditions.checkArgument(null != distribution, "Doris must set distribution"); + + Preconditions.checkArgument( + Strategy.HASH == distribution.strategy() || Strategy.EVEN == distribution.strategy(), + "Doris only supports HASH or EVEN distribution strategy"); + + if (distribution.strategy() == Strategy.HASH) { + // Check if the distribution column exists + Arrays.stream(distribution.expressions()) + .forEach( + expression -> { + Preconditions.checkArgument( + Arrays.stream(columns) + .anyMatch(column -> column.name().equalsIgnoreCase(expression.toString())), + "Distribution column " + expression + " does not exist in the table columns"); + }); + } + } + + @VisibleForTesting + static void appendIndexesSql( + com.datastrato.gravitino.rel.indexes.Index[] indexes, StringBuilder sqlBuilder) { + + if (indexes.length == 0) { + return; + } + + // validate indexes + Arrays.stream(indexes) + .forEach( + index -> { + if (index.fieldNames().length > 1) { + throw new IllegalArgumentException("Index does not support multi fields in Doris"); + } + }); + + String indexSql = + Arrays.stream(indexes) + .map(index -> String.format("INDEX %s (%s)", index.name(), index.fieldNames()[0][0])) + .collect(Collectors.joining(",\n")); + + sqlBuilder.append(",").append(NEW_LINE).append(indexSql); + } + + @Override + protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException { + return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT")); + } + + @Override + protected Map getTableProperties(Connection connection, String tableName) + throws SQLException { + + String showCreateTableSQL = String.format("SHOW CREATE TABLE `%s`", tableName); + + StringBuilder createTableSqlSb = new StringBuilder(); + try (PreparedStatement statement = connection.prepareStatement(showCreateTableSQL); + ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + createTableSqlSb.append(resultSet.getString("Create Table")); + } + } + + String createTableSql = createTableSqlSb.toString(); + + if (StringUtils.isEmpty(createTableSql)) { + throw new NoSuchTableException( + "Table %s does not exist in %s.", tableName, connection.getCatalog()); + } + + return Collections.unmodifiableMap(DorisUtils.extractPropertiesFromSql(createTableSql)); + } + + @Override + protected List getIndexes(String databaseName, String tableName, DatabaseMetaData metaData) + throws SQLException { + String sql = String.format("SHOW INDEX FROM `%s` FROM `%s`", tableName, databaseName); + + // get Indexes from SQL + try (Connection connection = metaData.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + ResultSet resultSet = preparedStatement.executeQuery()) { + + List indexes = new ArrayList<>(); + while (resultSet.next()) { + String indexName = resultSet.getString("Key_name"); + String columnName = resultSet.getString("Column_name"); + indexes.add( + Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][] {{columnName}})); + } + return indexes; + } catch (SQLException e) { + throw exceptionMapper.toGravitinoException(e); + } + } + + @Override + protected void correctJdbcTableFields( + Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder) + throws SQLException { + + try (ResultSet resultSet = getTable(connection, databaseName, tableName)) { + String comment = ""; + while (resultSet.next()) { + comment += resultSet.getString("TABLE_COMMENT"); + } + tableBuilder.withComment(comment); + } } @Override protected String generateRenameTableSql(String oldTableName, String newTableName) { - throw new UnsupportedOperationException(); + return String.format("ALTER TABLE `%s` RENAME `%s`", oldTableName, newTableName); } @Override protected String generateDropTableSql(String tableName) { - throw new UnsupportedOperationException(); + return String.format("DROP TABLE `%s`", tableName); } @Override protected String generatePurgeTableSql(String tableName) { - throw new UnsupportedOperationException(); + return String.format("TRUNCATE TABLE `%s`", tableName); } @Override protected String generateAlterTableSql( String databaseName, String tableName, TableChange... changes) { - throw new UnsupportedOperationException(); + // Not all operations require the original table information, so lazy loading is used here + JdbcTable lazyLoadTable = null; + TableChange.UpdateComment updateComment = null; + List setProperties = new ArrayList<>(); + List alterSql = new ArrayList<>(); + for (int i = 0; i < changes.length; i++) { + TableChange change = changes[i]; + if (change instanceof TableChange.UpdateComment) { + updateComment = (TableChange.UpdateComment) change; + } else if (change instanceof TableChange.SetProperty) { + // The set attribute needs to be added at the end. + setProperties.add(((TableChange.SetProperty) change)); + } else if (change instanceof TableChange.RemoveProperty) { + // Doris only support set properties, remove property is not supported yet + throw new IllegalArgumentException("Remove property is not supported yet"); + } else if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + alterSql.add(addColumnFieldDefinition(addColumn)); + } else if (change instanceof TableChange.RenameColumn) { + throw new IllegalArgumentException("Rename column is not supported yet"); + } else if (change instanceof TableChange.UpdateColumnType) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change; + alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, lazyLoadTable)); + } else if (change instanceof TableChange.UpdateColumnComment) { + TableChange.UpdateColumnComment updateColumnComment = + (TableChange.UpdateColumnComment) change; + alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment)); + } else if (change instanceof TableChange.UpdateColumnPosition) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + TableChange.UpdateColumnPosition updateColumnPosition = + (TableChange.UpdateColumnPosition) change; + alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, lazyLoadTable)); + } else if (change instanceof TableChange.DeleteColumn) { + TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + String deleteColSql = deleteColumnFieldDefinition(deleteColumn, lazyLoadTable); + if (StringUtils.isNotEmpty(deleteColSql)) { + alterSql.add(deleteColSql); + } + } else if (change instanceof TableChange.UpdateColumnNullability) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + alterSql.add( + updateColumnNullabilityDefinition( + (TableChange.UpdateColumnNullability) change, lazyLoadTable)); + } else if (change instanceof TableChange.AddIndex) { + alterSql.add(addIndexDefinition((TableChange.AddIndex) change)); + } else if (change instanceof TableChange.DeleteIndex) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + alterSql.add(deleteIndexDefinition(lazyLoadTable, (TableChange.DeleteIndex) change)); + } else { + throw new IllegalArgumentException( + "Unsupported table change type: " + change.getClass().getName()); + } + } + if (!setProperties.isEmpty()) { + alterSql.add(generateTableProperties(setProperties)); + } + + // Last modified comment + if (null != updateComment) { + String newComment = updateComment.getNewComment(); + if (null == StringIdentifier.fromComment(newComment)) { + // Detect and add gravitino id. + JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + StringIdentifier identifier = StringIdentifier.fromComment(jdbcTable.comment()); + if (null != identifier) { + newComment = StringIdentifier.addToComment(identifier, newComment); + } + } + alterSql.add("MODIFY COMMENT \"" + newComment + "\""); + } + + if (!setProperties.isEmpty()) { + alterSql.add(generateTableProperties(setProperties)); + } + + if (CollectionUtils.isEmpty(alterSql)) { + return ""; + } + // Return the generated SQL statement + String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", alterSql) + ";"; + LOG.info("Generated alter table:{} sql: {}", databaseName + "." + tableName, result); + return result; + } + + private String updateColumnNullabilityDefinition( + TableChange.UpdateColumnNullability change, JdbcTable table) { + validateUpdateColumnNullable(change, table); + String col = change.fieldName()[0]; + JdbcColumn column = getJdbcColumnFromTable(table, col); + JdbcColumn updateColumn = + JdbcColumn.builder() + .withName(col) + .withDefaultValue(column.defaultValue()) + .withNullable(change.nullable()) + .withType(column.dataType()) + .withComment(column.comment()) + .withAutoIncrement(column.autoIncrement()) + .build(); + return "MODIFY COLUMN " + + BACK_QUOTE + + col + + BACK_QUOTE + + appendColumnDefinition(updateColumn, new StringBuilder()); + } + + private String generateTableProperties(List setProperties) { + return setProperties.stream() + .map( + setProperty -> + String.format("\"%s\" = \"%s\"", setProperty.getProperty(), setProperty.getValue())) + .collect(Collectors.joining(",\n")); } - @Override protected JdbcTable getOrCreateTable( String databaseName, String tableName, JdbcTable lazyLoadCreateTable) { - throw new UnsupportedOperationException(); + return null != lazyLoadCreateTable ? lazyLoadCreateTable : load(databaseName, tableName); + } + + private String updateColumnCommentFieldDefinition( + TableChange.UpdateColumnComment updateColumnComment) { + String newComment = updateColumnComment.getNewComment(); + if (updateColumnComment.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = updateColumnComment.fieldName()[0]; + + return String.format("MODIFY COLUMN `%s` COMMENT '%s'", col, newComment); + } + + private String addColumnFieldDefinition(TableChange.AddColumn addColumn) { + String dataType = (String) typeConverter.fromGravitinoType(addColumn.getDataType()); + if (addColumn.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = addColumn.fieldName()[0]; + + StringBuilder columnDefinition = new StringBuilder(); + columnDefinition + .append("ADD COLUMN ") + .append(BACK_QUOTE) + .append(col) + .append(BACK_QUOTE) + .append(SPACE) + .append(dataType) + .append(SPACE); + + if (!addColumn.isNullable()) { + columnDefinition.append("NOT NULL "); + } + // Append comment if available + if (StringUtils.isNotEmpty(addColumn.getComment())) { + columnDefinition.append("COMMENT '").append(addColumn.getComment()).append("' "); + } + + // Append position if available + if (addColumn.getPosition() instanceof TableChange.First) { + columnDefinition.append("FIRST"); + } else if (addColumn.getPosition() instanceof TableChange.After) { + TableChange.After afterPosition = (TableChange.After) addColumn.getPosition(); + columnDefinition + .append("AFTER ") + .append(BACK_QUOTE) + .append(afterPosition.getColumn()) + .append(BACK_QUOTE); + } else if (addColumn.getPosition() instanceof TableChange.Default) { + // do nothing, follow the default behavior of doris + } else { + throw new IllegalArgumentException("Invalid column position."); + } + return columnDefinition.toString(); + } + + private String updateColumnPositionFieldDefinition( + TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable jdbcTable) { + if (updateColumnPosition.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = updateColumnPosition.fieldName()[0]; + JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col); + StringBuilder columnDefinition = new StringBuilder(); + columnDefinition.append("MODIFY COLUMN ").append(BACK_QUOTE).append(col).append(BACK_QUOTE); + appendColumnDefinition(column, columnDefinition); + if (updateColumnPosition.getPosition() instanceof TableChange.First) { + columnDefinition.append("FIRST"); + } else if (updateColumnPosition.getPosition() instanceof TableChange.After) { + TableChange.After afterPosition = (TableChange.After) updateColumnPosition.getPosition(); + columnDefinition + .append("AFTER ") + .append(BACK_QUOTE) + .append(afterPosition.getColumn()) + .append(BACK_QUOTE); + } else { + Arrays.stream(jdbcTable.columns()) + .reduce((column1, column2) -> column2) + .map(Column::name) + .ifPresent(s -> columnDefinition.append("AFTER ").append(s)); + } + return columnDefinition.toString(); + } + + private String deleteColumnFieldDefinition( + TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) { + if (deleteColumn.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = deleteColumn.fieldName()[0]; + boolean colExists = true; + try { + getJdbcColumnFromTable(jdbcTable, col); + } catch (NoSuchColumnException noSuchColumnException) { + colExists = false; + } + if (!colExists) { + if (BooleanUtils.isTrue(deleteColumn.getIfExists())) { + return ""; + } else { + throw new IllegalArgumentException("Delete column does not exist: " + col); + } + } + return "DROP COLUMN " + BACK_QUOTE + col + BACK_QUOTE; + } + + private String updateColumnTypeFieldDefinition( + TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) { + if (updateColumnType.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = updateColumnType.fieldName()[0]; + JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col); + StringBuilder sqlBuilder = new StringBuilder("MODIFY COLUMN " + BACK_QUOTE + col + BACK_QUOTE); + JdbcColumn newColumn = + JdbcColumn.builder() + .withName(col) + .withType(updateColumnType.getNewDataType()) + .withComment(column.comment()) + .withDefaultValue(DEFAULT_VALUE_NOT_SET) + .withNullable(column.nullable()) + .withAutoIncrement(column.autoIncrement()) + .build(); + return appendColumnDefinition(newColumn, sqlBuilder).toString(); + } + + private StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) { + // Add data type + sqlBuilder + .append(SPACE) + .append(typeConverter.fromGravitinoType(column.dataType())) + .append(SPACE); + + // Add NOT NULL if the column is marked as such + if (column.nullable()) { + sqlBuilder.append("NULL "); + } else { + sqlBuilder.append("NOT NULL "); + } + + // Add DEFAULT value if specified + if (!DEFAULT_VALUE_NOT_SET.equals(column.defaultValue())) { + sqlBuilder + .append("DEFAULT ") + .append(columnDefaultValueConverter.fromGravitino(column.defaultValue())) + .append(SPACE); + } + + // Add column auto_increment if specified + if (column.autoIncrement()) { + sqlBuilder.append(DORIS_AUTO_INCREMENT).append(" "); + } + + // Add column comment if specified + if (StringUtils.isNotEmpty(column.comment())) { + sqlBuilder.append("COMMENT '").append(column.comment()).append("' "); + } + return sqlBuilder; + } + + static String addIndexDefinition(TableChange.AddIndex addIndex) { + return String.format("ADD INDEX %s (%s)", addIndex.getName(), addIndex.getFieldNames()[0][0]); + } + + static String deleteIndexDefinition( + JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) { + if (deleteIndex.isIfExists()) { + Preconditions.checkArgument( + Arrays.stream(lazyLoadTable.index()) + .anyMatch(index -> index.name().equals(deleteIndex.getName())), + "Index does not exist"); + } + return "DROP INDEX " + deleteIndex.getName(); } } diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java new file mode 100644 index 00000000000..72c8ad48be2 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java @@ -0,0 +1,246 @@ +/* + * Copyright 2023 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.integration.test; + +import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; +import com.datastrato.gravitino.catalog.jdbc.JdbcTable; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("gravitino-docker-it") +public class DorisTableOperationsIT extends TestDorisAbstractIT { + private static final Type VARCHAR_255 = Types.VarCharType.of(255); + private static final Type VARCHAR_1024 = Types.VarCharType.of(1024); + + private static Type INT = Types.IntegerType.get(); + + private static final String databaseName = GravitinoITUtils.genRandomName("doris_test_db"); + private static final String tableName = GravitinoITUtils.genRandomName("doris_test_table"); + + @BeforeAll + public static void startup() { + TestDorisAbstractIT.startup(); + createDatabase(); + } + + private static void createDatabase() { + DATABASE_OPERATIONS.create(databaseName, "test_comment", new HashMap<>()); + } + + private static Map createProperties() { + Map properties = Maps.newHashMap(); + properties.put("replication_allocation", "tag.location.default: 1"); + return properties; + } + + private static void waitForDorisOperation() { + // TODO: use a better way to wait for the operation to complete + // see: https://doris.apache.org/docs/1.2/advanced/alter-table/schema-change/ + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // do nothing + } + } + + @Test + public void testBasicTableOperation() { + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + JdbcColumn col_1 = + JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build(); + columns.add(col_1); + JdbcColumn col_2 = + JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build(); + columns.add(col_2); + JdbcColumn col_3 = + JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build(); + columns.add(col_3); + Map properties = new HashMap<>(); + + Distribution distribution = Distributions.hash(32, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + List listTables = TABLE_OPERATIONS.listTables(databaseName); + Assertions.assertTrue(listTables.contains(tableName)); + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + } + + @Test + public void testAlterTable() { + + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + JdbcColumn col_1 = + JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build(); + columns.add(col_1); + JdbcColumn col_2 = + JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build(); + columns.add(col_2); + JdbcColumn col_3 = + JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build(); + columns.add(col_3); + Map properties = new HashMap<>(); + + Distribution distribution = Distributions.hash(32, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.updateColumnType(new String[] {col_3.name()}, VARCHAR_1024)); + + waitForDorisOperation(); + + load = TABLE_OPERATIONS.load(databaseName, tableName); + + // After modifying the type, check it + columns.clear(); + col_3 = + JdbcColumn.builder() + .withName(col_3.name()) + .withType(VARCHAR_1024) + .withComment(col_3.comment()) + .build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + String colNewComment = "new_comment"; + // update column comment + + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.updateColumnComment(new String[] {col_2.name()}, colNewComment)); + load = TABLE_OPERATIONS.load(databaseName, tableName); + + columns.clear(); + col_2 = + JdbcColumn.builder() + .withName(col_2.name()) + .withType(col_2.dataType()) + .withComment(colNewComment) + .build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + // add new column + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.addColumn(new String[] {"col_4"}, VARCHAR_255, "txt4", true)); + + waitForDorisOperation(); + load = TABLE_OPERATIONS.load(databaseName, tableName); + + columns.clear(); + JdbcColumn col_4 = + JdbcColumn.builder().withName("col_4").withType(VARCHAR_255).withComment("txt4").build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + columns.add(col_4); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + // change column position + TABLE_OPERATIONS.alterTable( + databaseName, + tableName, + TableChange.updateColumnPosition( + new String[] {"col_3"}, TableChange.ColumnPosition.after("col_4"))); + waitForDorisOperation(); + load = TABLE_OPERATIONS.load(databaseName, tableName); + + columns.clear(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_4); + columns.add(col_3); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + } + + @Test + public void testCreateAllTypeTable() { + String tableName = GravitinoITUtils.genRandomName("type_table"); + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + columns.add(JdbcColumn.builder().withName("col_1").withType(Types.IntegerType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_3").withType(Types.ByteType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_4").withType(Types.ShortType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_5").withType(Types.IntegerType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_6").withType(Types.LongType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_7").withType(Types.FloatType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_8").withType(Types.DoubleType.get()).build()); + columns.add( + JdbcColumn.builder().withName("col_9").withType(Types.DecimalType.of(10, 2)).build()); + columns.add(JdbcColumn.builder().withName("col_10").withType(Types.DateType.get()).build()); + columns.add(JdbcColumn.builder().withName("col_11").withType(Types.TimeType.get()).build()); + columns.add( + JdbcColumn.builder().withName("col_12").withType(Types.FixedCharType.of(10)).build()); + columns.add(JdbcColumn.builder().withName("col_13").withType(Types.VarCharType.of(10)).build()); + columns.add(JdbcColumn.builder().withName("col_14").withType(Types.StringType.get()).build()); + + Distribution distribution = Distributions.hash(32, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + // create table + TABLE_OPERATIONS.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + + JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName); + assertionsTableInfo(tableName, tableComment, columns, Collections.emptyMap(), null, load); + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java index ef7486e3e3f..c961d65eb57 100644 --- a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java @@ -254,7 +254,8 @@ protected Map getTableProperties(Connection connection, String t @Override protected void correctJdbcTableFields( - Connection connection, String tableName, JdbcTable.Builder tableBuilder) throws SQLException { + Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder) + throws SQLException { if (StringUtils.isEmpty(tableBuilder.comment())) { // In Mysql version 5.7, the comment field value cannot be obtained in the driver API. LOG.warn("Not found comment in mysql driver api. Will try to get comment from sql");