Skip to content

Commit

Permalink
[branch-2.1](fe) Avoid interrupt daemon thread and use proper polling…
Browse files Browse the repository at this point in the history
… interval (apache#42210) (apache#42646)

pick apache#42210
  • Loading branch information
zclllyybb authored Oct 29, 2024
1 parent 78a8d12 commit d5c43c5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -5579,8 +5579,9 @@ public void setMutableConfigwithCallback(String key, String value) throws Config
ConfigBase.setMutableConfig(key, value);
if (configtoThreads.get(key) != null) {
try {
// not atomic. maybe delay to aware. but acceptable.
configtoThreads.get(key).get().setInterval(Config.getField(key).getLong(null) * 1000L);
configtoThreads.get(key).get().interrupt();
// shouldn't interrupt to keep possible bdbje writing safe.
LOG.info("set config " + key + " to " + value);
} catch (IllegalAccessException e) {
LOG.warn("set config " + key + " failed: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -88,6 +89,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {

private static final String DEFAULT_RUNTIME_VALUE = FeConstants.null_string;

private static final long SLEEP_PIECE = 5000L;

private Map<Long, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
private Set<Pair<Long, Long>> dynamicPartitionTableInfo = Sets.newConcurrentHashSet();
private boolean initialize;
Expand Down Expand Up @@ -663,6 +666,48 @@ private void initDynamicPartitionTable() {
initialize = true;
}

// specialized schedule logic. split sleep to many small pieces. so if interval changed, it won't take too much
// time to aware.
@Override
public void run() {
if (metaContext != null) {
metaContext.setThreadLocalInfo();
}

while (!isStop.get()) {
try {
runOneCycle();
} catch (Throwable e) {
LOG.error("daemon thread got exception. name: {}", getName(), e);
}

try {
long oldInterval = intervalMs;
long remainingInterval = oldInterval;
while (remainingInterval > SLEEP_PIECE) {
// if it changed. let it know at most 10 seconds. and 5 second per wakeup is acceptable.
if (intervalMs != oldInterval) { // changed
break;
}

Thread.sleep(SLEEP_PIECE);
remainingInterval -= SLEEP_PIECE;
}
if (remainingInterval <= SLEEP_PIECE) {
Thread.sleep(remainingInterval);
}
} catch (InterruptedException e) {
// This thread should NEVER be interrupted. or meet bdbje writing, it will be disaster.
LOG.fatal("InterruptedException: ", e);
}
}

if (metaContext != null) {
MetaContext.remove();
}
LOG.error("daemon thread exits. name=" + this.getName());
}

@Override
protected void runAfterCatalogReady() {
if (!initialize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ public class Daemon extends Thread {
private static final Logger LOG = LogManager.getLogger(Daemon.class);
private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds

private long intervalMs;
private AtomicBoolean isStop;
protected long intervalMs;

protected AtomicBoolean isStop;

protected MetaContext metaContext = null;

private Runnable runnable;
private AtomicBoolean isStart = new AtomicBoolean(false);

private MetaContext metaContext = null;
private AtomicBoolean isStart = new AtomicBoolean(false);

{
setDaemon(true);
Expand Down

0 comments on commit d5c43c5

Please sign in to comment.