From 92fed3f6dda32f003f8f064bd819cbe81e22836d Mon Sep 17 00:00:00 2001 From: Lei Zhang <1091517373@qq.com> Date: Tue, 21 Jun 2022 16:50:14 +0800 Subject: [PATCH] [bugfix](schema change) fix multi alter clauses for light schema change. (#2) --- be/src/olap/tablet_schema.cpp | 1 + .../org/apache/doris/alter/RollupJobV2.java | 2 +- .../doris/alter/SchemaChangeHandler.java | 50 +++++++++++-------- .../apache/doris/alter/SchemaChangeJobV2.java | 4 +- .../org/apache/doris/analysis/SlotRef.java | 2 +- .../apache/doris/planner/OlapTableSink.java | 6 ++- .../apache/doris/task/AlterReplicaTask.java | 15 +++++- 7 files changed, 52 insertions(+), 28 deletions(-) diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index c42f22495aa495..bc5931db85315b 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -520,6 +520,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, bool has_bf_columns = false; _cols.clear(); _field_name_to_index.clear(); + _field_id_to_index.clear(); for (const POlapTableIndexSchema& index : ptable_schema_param.indexes()) { if (index.id() == index_id) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index a05863d9eab506..ba0607f9d70a1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -381,7 +381,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId, partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, - JobType.ROLLUP, defineExprs, descTable); + JobType.ROLLUP, defineExprs, descTable, null); rollupBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 4ebb64415a1962..93b22f26c1ab54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1547,6 +1547,8 @@ public void process(List alterClauses, String clusterName, Database throws UserException { olapTable.writeLockOrDdlException(); try { + //alterClauses can or cannot light schema change + boolean ligthSchemaChange = true; // index id -> index schema Map> indexSchemaMap = new HashMap<>(); for (Map.Entry> entry : olapTable.getIndexIdToSchema(true).entrySet()) { @@ -1610,55 +1612,59 @@ public void process(List alterClauses, String clusterName, Database if (alterClause instanceof AddColumnClause) { // add column - boolean ligthSchemaChange = processAddColumn((AddColumnClause) alterClause, olapTable, indexSchemaMap); - LOG.debug("processAddColumn, table: {}({}), getMaxColUniqueId(): {}, ligthSchemaChange: {}", olapTable.getName(), olapTable.getId(), - olapTable.getMaxColUniqueId(), ligthSchemaChange); - if (ligthSchemaChange) { - //for schema change add column optimize, direct modify table meta. - Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false); - return; + boolean clauseCanLigthSchemaChange = processAddColumn((AddColumnClause) alterClause, olapTable, indexSchemaMap); + if (clauseCanLigthSchemaChange == false) { + ligthSchemaChange = false; } } else if (alterClause instanceof AddColumnsClause) { // add columns - boolean ligthSchemaChange = processAddColumns((AddColumnsClause) alterClause, olapTable, indexSchemaMap, false); - LOG.debug("processAddColumns, table: {}({}), getMaxColUniqueId(): {}, ligthSchemaChange: {}", olapTable.getName(), olapTable.getId(), - olapTable.getMaxColUniqueId(), ligthSchemaChange); - if (ligthSchemaChange) { - //for schema change add column optimize, direct modify table meta. - Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false); - return; + boolean clauseCanLigthSchemaChange = processAddColumns((AddColumnsClause) alterClause, olapTable, indexSchemaMap, false); + if (clauseCanLigthSchemaChange == false) { + ligthSchemaChange = false; } } else if (alterClause instanceof DropColumnClause) { // drop column and drop indexes on this column - boolean ligthSchemaChange = processDropColumn((DropColumnClause) alterClause, olapTable, indexSchemaMap, newIndexes); - LOG.debug("processDropColumn, table: {}({}), getMaxColUniqueId(): {}", olapTable.getName(), olapTable.getId(), olapTable.getMaxColUniqueId()); - if (ligthSchemaChange) { - //for schema change add column optimize, direct modify table meta. - Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false); - return; + boolean clauseCanLigthSchemaChange = processDropColumn((DropColumnClause) alterClause, olapTable, indexSchemaMap, newIndexes); + if (clauseCanLigthSchemaChange == false) { + ligthSchemaChange = false; } } else if (alterClause instanceof ModifyColumnClause) { // modify column processModifyColumn((ModifyColumnClause) alterClause, olapTable, indexSchemaMap); + ligthSchemaChange = false; } else if (alterClause instanceof ReorderColumnsClause) { // reorder column processReorderColumn((ReorderColumnsClause) alterClause, olapTable, indexSchemaMap); + ligthSchemaChange = false; } else if (alterClause instanceof ModifyTablePropertiesClause) { // modify table properties // do nothing, properties are already in propertyMap + ligthSchemaChange = false; } else if (alterClause instanceof CreateIndexClause) { if (processAddIndex((CreateIndexClause) alterClause, olapTable, newIndexes)) { return; } + ligthSchemaChange = false; } else if (alterClause instanceof DropIndexClause) { if (processDropIndex((DropIndexClause) alterClause, olapTable, newIndexes)) { return; } + ligthSchemaChange = false; } else { Preconditions.checkState(false); } - } // end for alter clausesnnnnnn - createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes); + } // end for alter clauses + + LOG.debug("processAddColumns, table: {}({}), getMaxColUniqueId(): {}, ligthSchemaChange: {}", olapTable.getName(), + olapTable.getId(), olapTable.getMaxColUniqueId(), ligthSchemaChange); + + if (ligthSchemaChange) { + //for schema change add/drop value column optimize, direct modify table meta. + Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false); + return; + } else { + createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes); + } } finally { olapTable.writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 2d06ef164eb983..0bb0bae43ab6d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -431,7 +431,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { long originIdxId = indexIdMap.get(shadowIdxId); int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId)); - + List originSchemaColumns = tbl.getSchemaByIndexId(originIdxId); for (Tablet shadowTablet : shadowIdx.getTablets()) { long shadowTabletId = shadowTablet.getId(); long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId); @@ -440,7 +440,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable); + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns); schemaChangeBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 6e8017e3bd5844..c193143f1810af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -71,7 +71,7 @@ public SlotRef(TableName tblName, String col) { public SlotRef(SlotDescriptor desc) { super(); this.tblName = null; - this.col = null; + this.col = desc.getColumn() != null ? desc.getColumn().getName() : null; this.desc = desc; this.type = desc.getType(); // TODO(zc): label is meaningful diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 29b2753ec06c28..10719ca2dff1d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -189,7 +189,11 @@ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) { List columns = Lists.newArrayList(); List columns_desc = Lists.newArrayList(); columns.addAll(indexMeta.getSchema().stream().map(Column::getName).collect(Collectors.toList())); - columns_desc.addAll(indexMeta.getSchema().stream().map(Column::toThrift).collect(Collectors.toList())); + for (Column column : indexMeta.getSchema()) { + TColumn tColumn = column.toThrift(); + column.setIndexFlag(tColumn, table.getIndexes()); + columns_desc.add(tColumn); + } TOlapTableIndexSchema indexSchema = new TOlapTableIndexSchema(pair.getKey(), columns, indexMeta.getSchemaHash(), columns_desc); schemaParam.addToIndexes(indexSchema); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index 4235ce0ded3ad2..d9ac6f0bb4743c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -21,12 +21,15 @@ import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; import org.apache.doris.thrift.TAlterMaterializedViewParam; import org.apache.doris.thrift.TAlterTabletReqV2; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TTaskType; import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -48,6 +51,7 @@ public class AlterReplicaTask extends AgentTask { private Map defineExprs; private DescriptorTable descTable; + private List baseSchemaColumns; /** * AlterReplicaTask constructor. @@ -56,7 +60,7 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map defineExprs, - DescriptorTable descTable) { + DescriptorTable descTable, List baseSchemaColumns) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); this.baseTabletId = baseTabletId; @@ -71,6 +75,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI this.jobType = jobType; this.defineExprs = defineExprs; this.descTable = descTable; + this.baseSchemaColumns = baseSchemaColumns; } public long getBaseTabletId() { @@ -115,6 +120,14 @@ public TAlterTabletReqV2 toThrift() { } } req.setDescTbl(descTable.toThrift()); + + if (baseSchemaColumns != null) { + List columns = new ArrayList(); + for (Column column : baseSchemaColumns) { + columns.add(column.toThrift()); + } + req.setColumns(columns); + } return req; } }