Skip to content

Commit

Permalink
[bugfix](schema change) fix multi alter clauses for light schema chan…
Browse files Browse the repository at this point in the history
…ge. (#2)
  • Loading branch information
SWJTU-ZhangLei authored and Lchangliang committed Jun 22, 2022
1 parent 5413fab commit 92fed3f
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 28 deletions.
1 change: 1 addition & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id,
bool has_bf_columns = false;
_cols.clear();
_field_name_to_index.clear();
_field_id_to_index.clear();

for (const POlapTableIndexSchema& index : ptable_schema_param.indexes()) {
if (index.id() == index_id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId,
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable);
JobType.ROLLUP, defineExprs, descTable, null);
rollupBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,8 @@ public void process(List<AlterClause> alterClauses, String clusterName, Database
throws UserException {
olapTable.writeLockOrDdlException();
try {
//alterClauses can or cannot light schema change
boolean ligthSchemaChange = true;
// index id -> index schema
Map<Long, List<Column>> indexSchemaMap = new HashMap<>();
for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema(true).entrySet()) {
Expand Down Expand Up @@ -1610,55 +1612,59 @@ public void process(List<AlterClause> alterClauses, String clusterName, Database

if (alterClause instanceof AddColumnClause) {
// add column
boolean ligthSchemaChange = processAddColumn((AddColumnClause) alterClause, olapTable, indexSchemaMap);
LOG.debug("processAddColumn, table: {}({}), getMaxColUniqueId(): {}, ligthSchemaChange: {}", olapTable.getName(), olapTable.getId(),
olapTable.getMaxColUniqueId(), ligthSchemaChange);
if (ligthSchemaChange) {
//for schema change add column optimize, direct modify table meta.
Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false);
return;
boolean clauseCanLigthSchemaChange = processAddColumn((AddColumnClause) alterClause, olapTable, indexSchemaMap);
if (clauseCanLigthSchemaChange == false) {
ligthSchemaChange = false;
}
} else if (alterClause instanceof AddColumnsClause) {
// add columns
boolean ligthSchemaChange = processAddColumns((AddColumnsClause) alterClause, olapTable, indexSchemaMap, false);
LOG.debug("processAddColumns, table: {}({}), getMaxColUniqueId(): {}, ligthSchemaChange: {}", olapTable.getName(), olapTable.getId(),
olapTable.getMaxColUniqueId(), ligthSchemaChange);
if (ligthSchemaChange) {
//for schema change add column optimize, direct modify table meta.
Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false);
return;
boolean clauseCanLigthSchemaChange = processAddColumns((AddColumnsClause) alterClause, olapTable, indexSchemaMap, false);
if (clauseCanLigthSchemaChange == false) {
ligthSchemaChange = false;
}
} else if (alterClause instanceof DropColumnClause) {
// drop column and drop indexes on this column
boolean ligthSchemaChange = processDropColumn((DropColumnClause) alterClause, olapTable, indexSchemaMap, newIndexes);
LOG.debug("processDropColumn, table: {}({}), getMaxColUniqueId(): {}", olapTable.getName(), olapTable.getId(), olapTable.getMaxColUniqueId());
if (ligthSchemaChange) {
//for schema change add column optimize, direct modify table meta.
Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false);
return;
boolean clauseCanLigthSchemaChange = processDropColumn((DropColumnClause) alterClause, olapTable, indexSchemaMap, newIndexes);
if (clauseCanLigthSchemaChange == false) {
ligthSchemaChange = false;
}
} else if (alterClause instanceof ModifyColumnClause) {
// modify column
processModifyColumn((ModifyColumnClause) alterClause, olapTable, indexSchemaMap);
ligthSchemaChange = false;
} else if (alterClause instanceof ReorderColumnsClause) {
// reorder column
processReorderColumn((ReorderColumnsClause) alterClause, olapTable, indexSchemaMap);
ligthSchemaChange = false;
} else if (alterClause instanceof ModifyTablePropertiesClause) {
// modify table properties
// do nothing, properties are already in propertyMap
ligthSchemaChange = false;
} else if (alterClause instanceof CreateIndexClause) {
if (processAddIndex((CreateIndexClause) alterClause, olapTable, newIndexes)) {
return;
}
ligthSchemaChange = false;
} else if (alterClause instanceof DropIndexClause) {
if (processDropIndex((DropIndexClause) alterClause, olapTable, newIndexes)) {
return;
}
ligthSchemaChange = false;
} else {
Preconditions.checkState(false);
}
} // end for alter clausesnnnnnn
createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
} // end for alter clauses

LOG.debug("processAddColumns, table: {}({}), getMaxColUniqueId(): {}, ligthSchemaChange: {}", olapTable.getName(),
olapTable.getId(), olapTable.getMaxColUniqueId(), ligthSchemaChange);

if (ligthSchemaChange) {
//for schema change add/drop value column optimize, direct modify table meta.
Catalog.getCurrentCatalog().modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, false);
return;
} else {
createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
}
} finally {
olapTable.writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
long originIdxId = indexIdMap.get(shadowIdxId);
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));

List<Column> originSchemaColumns = tbl.getSchemaByIndexId(originIdxId);
for (Tablet shadowTablet : shadowIdx.getTablets()) {
long shadowTabletId = shadowTablet.getId();
long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);
Expand All @@ -440,7 +440,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable);
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns);
schemaChangeBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public SlotRef(TableName tblName, String col) {
public SlotRef(SlotDescriptor desc) {
super();
this.tblName = null;
this.col = null;
this.col = desc.getColumn() != null ? desc.getColumn().getName() : null;
this.desc = desc;
this.type = desc.getType();
// TODO(zc): label is meaningful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) {
List<String> columns = Lists.newArrayList();
List<TColumn> columns_desc = Lists.newArrayList();
columns.addAll(indexMeta.getSchema().stream().map(Column::getName).collect(Collectors.toList()));
columns_desc.addAll(indexMeta.getSchema().stream().map(Column::toThrift).collect(Collectors.toList()));
for (Column column : indexMeta.getSchema()) {
TColumn tColumn = column.toThrift();
column.setIndexFlag(tColumn, table.getIndexes());
columns_desc.add(tColumn);
}
TOlapTableIndexSchema indexSchema = new TOlapTableIndexSchema(pair.getKey(), columns,
indexMeta.getSchemaHash(), columns_desc);
schemaParam.addToIndexes(indexSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.thrift.TAlterMaterializedViewParam;
import org.apache.doris.thrift.TAlterTabletReqV2;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -48,6 +51,7 @@ public class AlterReplicaTask extends AgentTask {

private Map<String, Expr> defineExprs;
private DescriptorTable descTable;
private List<Column> baseSchemaColumns;

/**
* AlterReplicaTask constructor.
Expand All @@ -56,7 +60,7 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
DescriptorTable descTable) {
DescriptorTable descTable, List<Column> baseSchemaColumns) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);

this.baseTabletId = baseTabletId;
Expand All @@ -71,6 +75,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI
this.jobType = jobType;
this.defineExprs = defineExprs;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
}

public long getBaseTabletId() {
Expand Down Expand Up @@ -115,6 +120,14 @@ public TAlterTabletReqV2 toThrift() {
}
}
req.setDescTbl(descTable.toThrift());

if (baseSchemaColumns != null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
req.setColumns(columns);
}
return req;
}
}

0 comments on commit 92fed3f

Please sign in to comment.