Skip to content

Commit

Permalink
[Enhancement] Migrate part of db lock to table lock in SchemaChange (#…
Browse files Browse the repository at this point in the history
…43907)

Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu authored Jul 12, 2024
1 parent 7900032 commit d8649e3
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 317 deletions.
123 changes: 63 additions & 60 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,39 +372,41 @@ public void replayAlterMaterializedViewBaseTableInfos(AlterMaterializedViewBaseT
long dbId = log.getDbId();
long mvId = log.getMvId();
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
MaterializedView mv = (MaterializedView) db.getTable(mvId);
if (mv == null) {
return;
}

Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
MaterializedView mv = null;
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(mv.getId()), LockType.WRITE);
try {
mv = (MaterializedView) db.getTable(mvId);
mv.replayAlterMaterializedViewBaseTableInfos(log);
} catch (Throwable e) {
if (mv != null) {
LOG.warn("replay alter materialized-view status failed: {}", mv.getName(), e);
mv.setInactiveAndReason("replay alter status failed: " + e.getMessage());
}
LOG.warn("replay alter materialized-view status failed: {}", mv.getName(), e);
mv.setInactiveAndReason("replay alter status failed: " + e.getMessage());
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(mv.getId()), LockType.WRITE);
}
}

public void replayAlterMaterializedViewStatus(AlterMaterializedViewStatusLog log) {
long dbId = log.getDbId();
long tableId = log.getTableId();
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
MaterializedView mv = (MaterializedView) db.getTable(tableId);
if (mv == null) {
return;
}

Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
MaterializedView mv = null;
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(mv.getId()), LockType.WRITE);
try {
mv = (MaterializedView) db.getTable(tableId);
alterMaterializedViewStatus(mv, log.getStatus(), true);
} catch (Throwable e) {
if (mv != null) {
LOG.warn("replay alter materialized-view status failed: {}", mv.getName(), e);
mv.setInactiveAndReason("replay alter status failed: " + e.getMessage());
}
LOG.warn("replay alter materialized-view status failed: {}", mv.getName(), e);
mv.setInactiveAndReason("replay alter status failed: " + e.getMessage());
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(mv.getId()), LockType.WRITE);
}
}

Expand Down Expand Up @@ -450,17 +452,17 @@ public void replayChangeMaterializedViewRefreshScheme(ChangeMaterializedViewRefr
if (db == null) {
return;
}

MaterializedView oldMaterializedView = (MaterializedView) db.getTable(id);
if (oldMaterializedView == null) {
LOG.warn("Ignore change materialized view refresh scheme log because table:" + id + "is null");
return;
}

Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
MaterializedView oldMaterializedView = null;
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(oldMaterializedView.getId()), LockType.WRITE);
try {
final MaterializedView.MvRefreshScheme newMvRefreshScheme = new MaterializedView.MvRefreshScheme();

oldMaterializedView = (MaterializedView) db.getTable(id);
if (oldMaterializedView == null) {
LOG.warn("Ignore change materialized view refresh scheme log because table:" + id + "is null");
return;
}
final MaterializedView.MvRefreshScheme oldRefreshScheme = oldMaterializedView.getRefreshScheme();
newMvRefreshScheme.setAsyncRefreshContext(oldRefreshScheme.getAsyncRefreshContext());
newMvRefreshScheme.setLastRefreshTime(oldRefreshScheme.getLastRefreshTime());
Expand All @@ -482,13 +484,11 @@ public void replayChangeMaterializedViewRefreshScheme(ChangeMaterializedViewRefr
asyncRefreshContext.getStep(),
asyncRefreshContext.getTimeUnit(), oldMaterializedView.getId(), maxChangedTableRefreshTime);
} catch (Throwable e) {
if (oldMaterializedView != null) {
oldMaterializedView.setInactiveAndReason("replay failed: " + e.getMessage());
LOG.warn("replay change materialized-view refresh scheme failed: {}",
oldMaterializedView.getName(), e);
}
oldMaterializedView.setInactiveAndReason("replay failed: " + e.getMessage());
LOG.warn("replay change materialized-view refresh scheme failed: {}",
oldMaterializedView.getName(), e);
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(oldMaterializedView.getId()), LockType.WRITE);
}
}

Expand All @@ -498,11 +498,15 @@ public void replayAlterMaterializedViewProperties(short opCode, ModifyTablePrope
Map<String, String> properties = log.getProperties();

Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
MaterializedView mv = (MaterializedView) db.getTable(tableId);
if (mv == null) {
LOG.warn("Ignore change materialized view properties og because table:" + tableId + "is null");
return;
}

Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
MaterializedView mv = null;
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(mv.getId()), LockType.WRITE);
try {
mv = (MaterializedView) db.getTable(tableId);
TableProperty tableProperty = mv.getTableProperty();
if (tableProperty == null) {
tableProperty = new TableProperty(properties);
Expand All @@ -512,12 +516,10 @@ public void replayAlterMaterializedViewProperties(short opCode, ModifyTablePrope
tableProperty.buildProperty(opCode);
}
} catch (Throwable e) {
if (mv != null) {
mv.setInactiveAndReason("replay failed: " + e.getMessage());
LOG.warn("replay alter materialized-view properties failed: {}", mv.getName(), e);
}
mv.setInactiveAndReason("replay failed: " + e.getMessage());
LOG.warn("replay alter materialized-view properties failed: {}", mv.getName(), e);
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(mv.getId()), LockType.WRITE);
}
}

Expand Down Expand Up @@ -546,24 +548,23 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
String tableName = dbTableName.getTbl();

boolean isSynchronous = true;
Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
OlapTable olapTable;
try {
Table table = db.getTable(tableName);
if (table == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName);
}

if (!(table.isOlapOrCloudNativeTable() || table.isMaterializedView())) {
throw new DdlException("Do not support alter non-native table/materialized-view[" + tableName + "]");
}
olapTable = (OlapTable) table;
Table table = db.getTable(tableName);
if (table == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName);
}

if (!(table.isOlapOrCloudNativeTable() || table.isMaterializedView())) {
throw new DdlException("Do not support alter non-native table/materialized-view[" + tableName + "]");
}
OlapTable olapTable = (OlapTable) table;

Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(table.getId()), LockType.WRITE);
try {
if (olapTable.getState() != OlapTableState.NORMAL) {
throw InvalidOlapTableStateException.of(olapTable.getState(), olapTable.getName());
}

if (currentAlterOps.hasSchemaChangeOp()) {
// if modify storage type to v2, do schema change to convert all related tablets to segment v2 format
schemaChangeHandler.process(alterClauses, db, olapTable);
Expand Down Expand Up @@ -645,7 +646,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
throw new DdlException("Invalid alter operations: " + currentAlterOps);
}
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(table.getId()), LockType.WRITE);
}

// the following ops should be done outside db lock. because it contains synchronized create operation
Expand Down Expand Up @@ -681,11 +682,11 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
schemaChangeHandler.updatePartitionsInMemoryMeta(
db, tableName, partitionNames, properties);

locker.lockDatabase(db, LockType.WRITE);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
Expand Down Expand Up @@ -807,10 +808,11 @@ public void alterView(AlterViewInfo alterViewInfo) {
String comment = alterViewInfo.getComment();

Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
View view = (View) db.getTable(tableId);

Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(view.getId()), LockType.WRITE);
try {
View view = (View) db.getTable(tableId);
String viewName = view.getName();
view.setInlineViewDefWithSqlMode(inlineViewDef, alterViewInfo.getSqlMode());
try {
Expand All @@ -827,7 +829,7 @@ public void alterView(AlterViewInfo alterViewInfo) {

LOG.info("replay modify view[{}] definition to {}", viewName, inlineViewDef);
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(view.getId()), LockType.WRITE);
}
}

Expand Down Expand Up @@ -992,10 +994,11 @@ public void modifyPartitionsProperty(Database db,

public void replayModifyPartition(ModifyPartitionInfo info) {
Database db = GlobalStateMgr.getCurrentState().getDb(info.getDbId());
OlapTable olapTable = (OlapTable) db.getTable(info.getTableId());

Locker locker = new Locker();
locker.lockDatabase(db, LockType.WRITE);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
try {
OlapTable olapTable = (OlapTable) db.getTable(info.getTableId());
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (info.getDataProperty() != null) {
partitionInfo.setDataProperty(info.getPartitionId(), info.getDataProperty());
Expand All @@ -1010,7 +1013,7 @@ public void replayModifyPartition(ModifyPartitionInfo info) {
}
partitionInfo.setIsInMemory(info.getPartitionId(), info.isInMemory());
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
}
}

Expand Down
23 changes: 11 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.UserIdentity;
import io.opentelemetry.api.trace.Span;
import org.apache.hadoop.util.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -260,26 +261,24 @@ public boolean cancel(String errMsg) {
* return false if table is not stable.
*/
protected boolean checkTableStable(Database db) throws AlterCancelException {
OlapTable tbl;
long unHealthyTabletId = TabletInvertedIndex.NOT_EXIST_VALUE;
OlapTable tbl = (OlapTable) db.getTable(tableId);
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not exist");
}

Locker locker = new Locker();
locker.lockDatabase(db, LockType.READ);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(tbl.getId()), LockType.READ);
try {
tbl = (OlapTable) db.getTable(tableId);
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not exist");
}

if (tbl.isOlapTable()) {
unHealthyTabletId = tbl.checkAndGetUnhealthyTablet(GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
GlobalStateMgr.getCurrentState().getTabletScheduler());
}
} finally {
locker.unLockDatabase(db, LockType.READ);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(tbl.getId()), LockType.READ);
}

locker.lockDatabase(db, LockType.WRITE);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(tbl.getId()), LockType.WRITE);
try {
if (unHealthyTabletId != TabletInvertedIndex.NOT_EXIST_VALUE) {
errMsg = "table is unstable, unhealthy (or doing balance) tablet id: " + unHealthyTabletId;
Expand All @@ -293,7 +292,7 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {
return true;
}
} finally {
locker.unLockDatabase(db, LockType.WRITE);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(tbl.getId()), LockType.WRITE);
}
}

Expand Down Expand Up @@ -323,8 +322,8 @@ public static AlterJobV2 read(DataInput in) throws IOException {
* Schema change will build a new MaterializedIndexMeta, we need rebuild it(add extra original meta)
* into it from original index meta. Otherwise, some necessary metas will be lost after fe restart.
*
* @param orgIndexMeta : index meta before schema change.
* @param indexMeta : new index meta after schema change.
* @param orgIndexMeta : index meta before schema change.
* @param indexMeta : new index meta after schema change.
*/
protected void rebuildMaterializedIndexMeta(MaterializedIndexMeta orgIndexMeta,
MaterializedIndexMeta indexMeta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.starrocks.task.AgentTaskQueue;
import com.starrocks.task.CompactionTask;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.util.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,7 +79,7 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
CompactionClause compactionClause = (CompactionClause) alterClause;
if (RunMode.isSharedDataMode()) {
Locker locker = new Locker();
locker.lockDatabase(db, LockType.READ);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.READ);
try {
List<Partition> allPartitions = findAllPartitions(olapTable, compactionClause);
for (Partition partition : allPartitions) {
Expand All @@ -90,14 +91,14 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
}
}
} finally {
locker.unLockDatabase(db, LockType.READ);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.READ);
}
} else {
ArrayListMultimap<Long, Long> backendToTablets = ArrayListMultimap.create();
AgentBatchTask batchTask = new AgentBatchTask();

Locker locker = new Locker();
locker.lockDatabase(db, LockType.READ);
locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.READ);
try {
List<Partition> allPartitions = findAllPartitions(olapTable, compactionClause);
for (Partition partition : allPartitions) {
Expand All @@ -115,7 +116,7 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
} catch (Exception e) {
throw new UserException(e.getMessage());
} finally {
locker.unLockDatabase(db, LockType.READ);
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.READ);
}

for (Long backendId : backendToTablets.keySet()) {
Expand Down
Loading

0 comments on commit d8649e3

Please sign in to comment.