From d5c43c51b5a50efd8f5ea14cff540fdd1383da72 Mon Sep 17 00:00:00 2001 From: zclllhhjj Date: Tue, 29 Oct 2024 10:10:26 +0800 Subject: [PATCH] [branch-2.1](fe) Avoid interrupt daemon thread and use proper polling interval (#42210) (#42646) pick https://github.com/apache/doris/pull/42210 --- .../java/org/apache/doris/catalog/Env.java | 3 +- .../clone/DynamicPartitionScheduler.java | 45 +++++++++++++++++++ .../org/apache/doris/common/util/Daemon.java | 11 +++-- 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f60f4634c0ba4a..0d9ad091fcb540 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5579,8 +5579,9 @@ public void setMutableConfigwithCallback(String key, String value) throws Config ConfigBase.setMutableConfig(key, value); if (configtoThreads.get(key) != null) { try { + // Not atomic: the daemon may pick up the new interval with some delay, which is acceptable. configtoThreads.get(key).get().setInterval(Config.getField(key).getLong(null) * 1000L); - configtoThreads.get(key).get().interrupt(); + // Do not interrupt the thread, so that a possible in-flight bdbje write stays safe. 
LOG.info("set config " + key + " to " + value); } catch (IllegalAccessException e) { LOG.warn("set config " + key + " failed: " + e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 1b00f041964b22..51dc7dd802fbd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -50,6 +50,7 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.meta.MetaContext; import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Strings; @@ -88,6 +89,8 @@ public class DynamicPartitionScheduler extends MasterDaemon { private static final String DEFAULT_RUNTIME_VALUE = FeConstants.null_string; + private static final long SLEEP_PIECE = 5000L; + private Map> runtimeInfos = Maps.newConcurrentMap(); private Set> dynamicPartitionTableInfo = Sets.newConcurrentHashSet(); private boolean initialize; @@ -663,6 +666,48 @@ private void initDynamicPartitionTable() { initialize = true; } + // Specialized scheduling logic: the sleep is split into many small pieces, so that a changed interval + // is noticed without much delay. + @Override + public void run() { + if (metaContext != null) { + metaContext.setThreadLocalInfo(); + } + + while (!isStop.get()) { + try { + runOneCycle(); + } catch (Throwable e) { + LOG.error("daemon thread got exception. name: {}", getName(), e); + } + + try { + long oldInterval = intervalMs; + long remainingInterval = oldInterval; + while (remainingInterval > SLEEP_PIECE) { + // If the interval changed, notice it within 10 seconds; a wakeup every 5 seconds is acceptable. 
+ if (intervalMs != oldInterval) { // changed + break; + } + + Thread.sleep(SLEEP_PIECE); + remainingInterval -= SLEEP_PIECE; + } + if (remainingInterval <= SLEEP_PIECE) { + Thread.sleep(remainingInterval); + } + } catch (InterruptedException e) { + // This thread should NEVER be interrupted: an interrupt during a bdbje write would be a disaster. + LOG.fatal("InterruptedException: ", e); + } + } + + if (metaContext != null) { + MetaContext.remove(); + } + LOG.error("daemon thread exits. name=" + this.getName()); + } + @Override protected void runAfterCatalogReady() { if (!initialize) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java index 472285b476497f..4678f78d6687d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Daemon.java @@ -28,12 +28,15 @@ public class Daemon extends Thread { private static final Logger LOG = LogManager.getLogger(Daemon.class); private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds - private long intervalMs; - private AtomicBoolean isStop; + protected long intervalMs; + + protected AtomicBoolean isStop; + + protected MetaContext metaContext = null; + private Runnable runnable; - private AtomicBoolean isStart = new AtomicBoolean(false); - private MetaContext metaContext = null; + private AtomicBoolean isStart = new AtomicBoolean(false); { setDaemon(true);