Skip to content

Commit

Permalink
branch-3.0: [Bug](dead lock) Fix dead lock in Tablet Stat Mgr #46959 (#…
Browse files Browse the repository at this point in the history
…47418)

Cherry-picked from #46959

Co-authored-by: xy720 <[email protected]>
  • Loading branch information
github-actions[bot] and xy720 authored Jan 26, 2025
1 parent a3580a6 commit cc38fad
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ public class Config extends ConfigBase {
"Queue size to store heartbeat task in heartbeat_mgr"})
public static int heartbeat_mgr_blocking_queue_size = 1024;

@ConfField(masterOnly = true, description = {"TabletStatMgr线程数",
"Num of thread to update tablet stat"})
public static int tablet_stat_mgr_threads_num = -1;

@ConfField(masterOnly = true, description = {"Agent任务线程池的线程数",
"Num of thread to handle agent task in agent task thread-pool"})
public static int max_agent_task_threads_num = 4096;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
Expand All @@ -34,7 +38,9 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/*
* TabletStatMgr is for collecting tablet(replica) statistics from backends.
Expand All @@ -43,7 +49,13 @@
public class TabletStatMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class);

private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
private final ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.tablet_stat_mgr_threads_num > 0
? Config.tablet_stat_mgr_threads_num
: Runtime.getRuntime().availableProcessors(),
1024, "tablet-stat-mgr", true);

private MarkedCountDownLatch<Long, Backend> updateTabletStatsLatch = null;

public TabletStatMgr() {
super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000);
Expand All @@ -59,9 +71,13 @@ protected void runAfterCatalogReady() {
return;
}
long start = System.currentTimeMillis();
taskPool.submit(() -> {
// no need to get tablet stat if backend is not alive
backends.values().stream().filter(Backend::isAlive).parallel().forEach(backend -> {
// no need to get tablet stat if backend is not alive
List<Backend> aliveBackends = backends.values().stream().filter(Backend::isAlive)
.collect(Collectors.toList());
updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size());
aliveBackends.forEach(backend -> {
updateTabletStatsLatch.addMark(backend.getId(), backend);
executor.submit(() -> {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
Expand All @@ -74,8 +90,10 @@ protected void runAfterCatalogReady() {
result.getTabletsStatsSize());
}
updateTabletStat(backend.getId(), result);
updateTabletStatsLatch.markedCountDown(backend.getId(), backend);
ok = true;
} catch (Throwable e) {
updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend, Status.CANCELLED);
LOG.warn("task exec error. backend[{}]", backend.getId(), e);
}

Expand All @@ -89,7 +107,9 @@ protected void runAfterCatalogReady() {
LOG.warn("client pool recyle error. backend[{}]", backend.getId(), e);
}
});
}).join();
});
waitForTabletStatUpdate();

if (LOG.isDebugEnabled()) {
LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
(System.currentTimeMillis() - start));
Expand Down Expand Up @@ -222,6 +242,32 @@ protected void runAfterCatalogReady() {
(System.currentTimeMillis() - start));
}

public void waitForTabletStatUpdate() {
boolean ok = true;
try {
if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) {
LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.",
updateTabletStatsLatch.getCount(), 600);
ok = false;
}
} catch (InterruptedException e) {
LOG.warn("InterruptedException, {}", this, e);
}
if (!ok || !updateTabletStatsLatch.getStatus().ok()) {
List<Long> unfinishedBackendIds = updateTabletStatsLatch.getLeftMarks().stream()
.map(Map.Entry::getKey).collect(Collectors.toList());
Status status = Status.TIMEOUT;
if (!updateTabletStatsLatch.getStatus().ok()) {
status = updateTabletStatsLatch.getStatus();
}
LOG.warn("Failed to update tablet stats reason: {}, unfinished backends: {}",
status.getErrorMsg(), unfinishedBackendIds);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_UPDATE_TABLET_STAT_FAILED.increase(1L);
}
}
}

private void updateTabletStat(Long beId, TTabletStatResult result) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
if (result.isSetTabletStatList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_CACHE_HIT_SQL;
public static LongCounterMetric COUNTER_CACHE_HIT_PARTITION;

public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED;

public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT;
Expand Down Expand Up @@ -489,6 +491,10 @@ public Long getValue() {
"counter of failed transactions");
COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED);
COUNTER_UPDATE_TABLET_STAT_FAILED = new LongCounterMetric("update_tablet_stat_failed", MetricUnit.REQUESTS,
"counter of failed to update tablet stat");
COUNTER_UPDATE_TABLET_STAT_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_UPDATE_TABLET_STAT_FAILED);
HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("txn", "exec", "latency", "ms"));
HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(
Expand Down

0 comments on commit cc38fad

Please sign in to comment.