diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c4f0a7f05dcce0..d221d8a9ad6f67 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -303,6 +303,10 @@ public class Config extends ConfigBase { "Queue size to store heartbeat task in heartbeat_mgr"}) public static int heartbeat_mgr_blocking_queue_size = 1024; + @ConfField(masterOnly = true, description = {"TabletStatMgr线程数", + "Num of thread to update tablet stat"}) + public static int tablet_stat_mgr_threads_num = -1; + @ConfField(masterOnly = true, description = {"Agent任务线程池的线程数", "Num of thread to handle agent task in agent task thread-pool"}) public static int max_agent_task_threads_num = 4096; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index f79ed89215b4d3..14dc88eb50990e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -21,7 +21,11 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Status; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; @@ -34,7 +38,9 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /* * TabletStatMgr is for collecting tablet(replica) statistics from backends. @@ -43,7 +49,13 @@ public class TabletStatMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class); - private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + private final ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool( + Config.tablet_stat_mgr_threads_num > 0 + ? Config.tablet_stat_mgr_threads_num + : Runtime.getRuntime().availableProcessors(), + 1024, "tablet-stat-mgr", true); + + private MarkedCountDownLatch updateTabletStatsLatch = null; public TabletStatMgr() { super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000); @@ -59,9 +71,13 @@ protected void runAfterCatalogReady() { return; } long start = System.currentTimeMillis(); - taskPool.submit(() -> { - // no need to get tablet stat if backend is not alive - backends.values().stream().filter(Backend::isAlive).parallel().forEach(backend -> { + // no need to get tablet stat if backend is not alive + List aliveBackends = backends.values().stream().filter(Backend::isAlive) + .collect(Collectors.toList()); + updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size()); + aliveBackends.forEach(backend -> { + updateTabletStatsLatch.addMark(backend.getId(), backend); + executor.submit(() -> { BackendService.Client client = null; TNetworkAddress address = null; boolean ok = false; @@ -74,8 +90,10 @@ protected void runAfterCatalogReady() { result.getTabletsStatsSize()); } updateTabletStat(backend.getId(), result); + updateTabletStatsLatch.markedCountDown(backend.getId(), backend); ok = true; } catch (Throwable e) { + updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend, Status.CANCELLED); LOG.warn("task exec error. backend[{}]", backend.getId(), e); } @@ -89,7 +107,9 @@ protected void runAfterCatalogReady() { LOG.warn("client pool recyle error. backend[{}]", backend.getId(), e); } }); - }).join(); + }); + waitForTabletStatUpdate(); + if (LOG.isDebugEnabled()) { LOG.debug("finished to get tablet stat of all backends. cost: {} ms", (System.currentTimeMillis() - start)); @@ -222,6 +242,32 @@ protected void runAfterCatalogReady() { (System.currentTimeMillis() - start)); } + public void waitForTabletStatUpdate() { + boolean ok = true; + try { + if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) { + LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.", + updateTabletStatsLatch.getCount(), 600); + ok = false; + } + } catch (InterruptedException e) { + LOG.warn("InterruptedException, {}", this, e); + } + if (!ok || !updateTabletStatsLatch.getStatus().ok()) { + List unfinishedBackendIds = updateTabletStatsLatch.getLeftMarks().stream() + .map(Map.Entry::getKey).collect(Collectors.toList()); + Status status = Status.TIMEOUT; + if (!updateTabletStatsLatch.getStatus().ok()) { + status = updateTabletStatsLatch.getStatus(); + } + LOG.warn("Failed to update tablet stats reason: {}, unfinished backends: {}", + status.getErrorMsg(), unfinishedBackendIds); + if (MetricRepo.isInit) { + MetricRepo.COUNTER_UPDATE_TABLET_STAT_FAILED.increase(1L); + } + } + } + private void updateTabletStat(Long beId, TTabletStatResult result) { TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); if (result.isSetTabletStatList()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index 8876690711aae9..14dd8834454066 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -100,6 +100,8 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_CACHE_HIT_SQL; public static LongCounterMetric COUNTER_CACHE_HIT_PARTITION; + public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED; + public static LongCounterMetric COUNTER_EDIT_LOG_WRITE; public static LongCounterMetric COUNTER_EDIT_LOG_READ; public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT; @@ -489,6 +491,10 @@ public Long getValue() { "counter of failed transactions"); COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed")); DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED); + COUNTER_UPDATE_TABLET_STAT_FAILED = new LongCounterMetric("update_tablet_stat_failed", MetricUnit.REQUESTS, + "counter of failed to update tablet stat"); + COUNTER_UPDATE_TABLET_STAT_FAILED.addLabel(new MetricLabel("type", "failed")); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_UPDATE_TABLET_STAT_FAILED); HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram( MetricRegistry.name("txn", "exec", "latency", "ms")); HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(