Skip to content

Commit

Permalink
[Enhancement] Add cluster idle HTTP api (#53850)
Browse files Browse the repository at this point in the history
(cherry picked from commit 6cd9fbc)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java
#	fe/fe-core/src/main/java/com/starrocks/alter/OnlineOptimizeJobV2.java
#	fe/fe-core/src/main/java/com/starrocks/alter/RollupJobV2.java
#	fe/fe-core/src/main/java/com/starrocks/alter/SchemaChangeJobV2.java
#	fe/fe-core/src/main/java/com/starrocks/backup/BackupJob.java
#	fe/fe-core/src/main/java/com/starrocks/backup/RestoreJob.java
#	fe/fe-core/src/main/java/com/starrocks/common/Config.java
#	fe/fe-core/src/main/java/com/starrocks/connector/metadata/MetadataExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java
#	fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java
#	fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java
#	fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
#	fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/qe/feedback/PlanAdvisorExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
#	fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java
#	fe/fe-core/src/main/java/com/starrocks/sql/util/CustomizedQueryExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/statistic/HyperStatisticsCollectJob.java
#	fe/fe-core/src/test/java/com/starrocks/scheduler/MVRefreshTestBase.java
#	fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvRewriteTestBase.java
  • Loading branch information
gengjun-git authored and mergify[bot] committed Dec 17, 2024
1 parent 0fc0c5a commit 813acb8
Show file tree
Hide file tree
Showing 46 changed files with 2,230 additions and 16 deletions.
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -204,4 +205,14 @@ public void replayAlterJobV2(AlterJobV2 alterJob) {
existingJob.replay(alterJob);
}
}

public Map<Long, Long> getRunningAlterJobCount() {
Map<Long, Long> result = new HashMap<>();
for (AlterJobV2 alterJobV2 : alterJobsV2.values()) {
if (!alterJobV2.isDone()) {
result.compute(alterJobV2.getWarehouseId(), (key, value) -> value == null ? 1L : value + 1);
}
}
return result;
}
}
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -1094,4 +1094,11 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept
}
}
}

public Map<Long, Long> getRunningAlterJobCount() {
Map<Long, Long> mv = materializedViewHandler.getRunningAlterJobCount();
Map<Long, Long> sc = schemaChangeHandler.getRunningAlterJobCount();
sc.forEach((key, value) -> mv.merge(key, value, Long::sum));
return mv;
}
}
24 changes: 21 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.warehouse.WarehouseIdleChecker;
import io.opentelemetry.api.trace.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -190,6 +191,10 @@ public void createConnectContextIfNeeded() {
}
}

public long getWarehouseId() {
return warehouseId;
}

/**
* The keyword 'synchronized' only protects 2 methods:
* run() and cancel()
Expand All @@ -202,7 +207,7 @@ public void createConnectContextIfNeeded() {
*/
public synchronized void run() {
if (isTimeout()) {
cancelImpl("Timeout");
cancelHook(cancelImpl("Timeout"));
return;
}

Expand All @@ -224,6 +229,7 @@ public synchronized void run() {
break;
case FINISHED_REWRITING:
runFinishedRewritingJob();
finishHook();
break;
default:
break;
Expand All @@ -233,13 +239,15 @@ public synchronized void run() {
} // else: handle the new state
}
} catch (AlterCancelException e) {
cancelImpl(e.getMessage());
cancelHook(cancelImpl(e.getMessage()));
}
}

public final boolean cancel(String errMsg) {
synchronized (this) {
return cancelImpl(errMsg);
boolean cancelled = cancelImpl(errMsg);
cancelHook(cancelled);
return cancelled;
}
}

Expand Down Expand Up @@ -297,6 +305,16 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {

public abstract void replay(AlterJobV2 replayedJob);

public void finishHook() {
WarehouseIdleChecker.updateJobLastFinishTime(warehouseId);
}

public void cancelHook(boolean cancelled) {
if (cancelled) {
WarehouseIdleChecker.updateJobLastFinishTime(warehouseId);
}
}

public static AlterJobV2 read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterJobV2.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,29 @@ List<MaterializedIndex> visualiseShadowIndex(@NotNull OlapTable table) {
}

@Override
<<<<<<< HEAD
=======
public final boolean cancel(String errMsg) {
isCancelling.set(true);
try {
// If waitingCreatingReplica == false, we will assume that
// cancel thread will get the object lock very quickly.
if (waitingCreatingReplica.get()) {
Preconditions.checkState(createReplicaLatch != null);
createReplicaLatch.countDownToZero(new Status(TStatusCode.OK, ""));
}
synchronized (this) {
boolean cancelled = cancelImpl(errMsg);
cancelHook(cancelled);
return cancelled;
}
} finally {
isCancelling.set(false);
}
}

@Override
>>>>>>> 6cd9fbc95f ([Enhancement] Add cluster idle HTTP api (#53850))
protected boolean cancelImpl(String errMsg) {
if (jobState == JobState.CANCELLED || jobState == JobState.FINISHED) {
return false;
Expand Down
Loading

0 comments on commit 813acb8

Please sign in to comment.