Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](lock)add deadlock detection tool and monitored lock implementations #39015

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2987,4 +2987,17 @@ public static int metaServiceRpcRetryTimes() {
//==========================================================================
// end of cloud config
//==========================================================================
//==========================================================================
// start of lock config
// Master switch for the periodic deadlock detector. Disabled by default.
@ConfField(description = {"是否开启死锁检测",
        "Whether to enable deadlock detection"})
public static boolean enable_deadlock_detection = false;

// Interval between two deadlock detection runs, expressed in minutes.
@ConfField(description = {"死锁检测间隔时间,单位分钟",
        "Deadlock detection interval time, unit minute"})
public static long deadlock_detection_interval_minute = 5;

// Longest time, in seconds, a monitored lock may be held before a warning is logged.
// The English description now states the unit, matching the Chinese text and the
// sibling interval setting above.
@ConfField(description = {"表示最大锁持有时间,超过该时间会打印告警日志,单位秒",
        "Maximum lock hold time; logs a warning if exceeded, unit second"})
public static long max_lock_hold_threshold_seconds = 30;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static long max_lock_hold_threshold_seconds = 30;
public static long max_lock_hold_threshold_seconds = 10;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And make it mutable

}
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.HttpServer;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

public class DorisFE {
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
Expand Down Expand Up @@ -95,6 +97,13 @@ public static void main(String[] args) {
start(DORIS_HOME_DIR, PID_DIR, args, options);
}

/**
 * Launches the periodic deadlock detector when it is enabled in the configuration.
 * Does nothing if {@code Config.enable_deadlock_detection} is false; otherwise the
 * detector is scheduled with a period of {@code Config.deadlock_detection_interval_minute}
 * minutes.
 */
private static void startMonitor() {
    if (!Config.enable_deadlock_detection) {
        return;
    }
    long intervalMinutes = Config.deadlock_detection_interval_minute;
    new DeadlockMonitor().startMonitoring(intervalMinutes, TimeUnit.MINUTES);
}

// entrance for doris frontend
public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) {
if (System.getenv("DORIS_LOG_TO_STDERR") != null) {
Expand Down Expand Up @@ -214,7 +223,7 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
}

ThreadPoolManager.registerAllThreadPoolMetric();

startMonitor();
while (true) {
Thread.sleep(2000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.ColocatePersistInfo;
Expand Down Expand Up @@ -57,7 +58,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -183,7 +183,7 @@ public static boolean isGlobalGroupName(String groupName) {
// save some error msg of the group for show. no need to persist
private Map<GroupId, String> group2ErrMsgs = Maps.newHashMap();

private transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock();

public ColocateTableIndex() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.proto.InternalService.PAlterVaultSyncRequest;
import org.apache.doris.rpc.BackendServiceProxy;
Expand All @@ -43,8 +44,6 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StorageVaultMgr {
private static final Logger LOG = LogManager.getLogger(StorageVaultMgr.class);
Expand All @@ -54,7 +53,7 @@ public class StorageVaultMgr {

private Map<String, String> vaultNameToVaultId = new HashMap<>();

private ReadWriteLock rwLock = new ReentrantReadWriteLock();
private MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock();

private static final ExecutorService ALTER_BE_SYNC_THREAD_POOL = Executors.newFixedThreadPool(1);

Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -59,7 +60,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -88,7 +88,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf, Gso
// Used for queuing commit transaction tasks to avoid fdb transaction conflicts,
// especially to reduce conflicts when obtaining delete bitmap update locks for
// MoW table
protected ReentrantLock commitLock;
protected MonitoredReentrantLock commitLock;

/*
* fullSchema and nameToColumn should contain all columns, both visible and shadow.
Expand Down Expand Up @@ -138,7 +138,7 @@ public Table(TableType type) {
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
this.commitLock = new ReentrantLock(true);
this.commitLock = new MonitoredReentrantLock(true);
}

public Table(long id, String tableName, TableType type, List<Column> fullSchema) {
Expand All @@ -163,7 +163,7 @@ public Table(long id, String tableName, TableType type, List<Column> fullSchema)
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
this.commitLock = new ReentrantLock(true);
this.commitLock = new MonitoredReentrantLock(true);
}

public void markDropped() {
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
Expand All @@ -51,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -98,7 +98,7 @@ public enum TabletStatus {
private long cooldownReplicaId = -1;
@SerializedName(value = "ctm", alternate = {"cooldownTerm"})
private long cooldownTerm = -1;
private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock();
private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock();

// last time that the tablet checker checks this tablet.
// no need to persist
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.lock;

import org.apache.doris.common.Config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base class for a monitored lock that tracks lock acquisition,
 * release, and attempt times.
 * <p>
 * Subclasses are expected to call {@link #afterLock()} once a lock has been
 * acquired, {@link #afterUnlock()} once it has been released, and
 * {@link #afterTryLock(boolean, long)} after a {@code tryLock} attempt. A warning
 * is logged when a lock was held longer than {@link #HOLD_TIMEOUT} milliseconds,
 * and a debug message is logged when a {@code tryLock} attempt fails.
 * </p>
 * <p>
 * The acquisition time is kept in a single per-thread slot: every call to
 * {@link #afterLock()} overwrites it and {@link #afterUnlock()} clears it, so for
 * nested acquisitions by the same thread only the innermost hold is measured.
 * </p>
 */
public abstract class AbstractMonitoredLock {
    // Lock hold timeout in milliseconds. The configuration value is expressed in seconds,
    // so it must be scaled by 1000. Evaluated once, when this class is initialized.
    protected static final long HOLD_TIMEOUT = Config.max_lock_hold_threshold_seconds * 1000L;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractMonitoredLock.class);

    // Exact conversion factor from System.nanoTime() deltas to milliseconds.
    private static final long NANOS_PER_MILLI = 1_000_000L;

    // Thread-local variable to store the lock start time (a System.nanoTime() value)
    private final ThreadLocal<Long> lockStartTime = new ThreadLocal<>();

    /**
     * Method to be called after successfully acquiring the lock.
     * Records the current {@link System#nanoTime()} as the start of the hold period
     * for the calling thread.
     */
    protected void afterLock() {
        lockStartTime.set(System.nanoTime());
    }

    /**
     * Method to be called after releasing the lock.
     * Calculates the lock hold time and logs a warning, including the current stack
     * trace, if it exceeds {@link #HOLD_TIMEOUT}. Does nothing when no start time was
     * recorded for the calling thread.
     */
    protected void afterUnlock() {
        Long startTime = lockStartTime.get();
        if (startTime == null) {
            return;
        }
        long lockHoldTimeMs = (System.nanoTime() - startTime) / NANOS_PER_MILLI;
        if (lockHoldTimeMs > HOLD_TIMEOUT) {
            Thread currentThread = Thread.currentThread();
            String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
            LOG.warn("Thread ID: {}, Thread Name: {} - Lock held for {} ms, exceeding hold timeout of {} ms "
                            + "Thread stack trace:{}",
                    currentThread.getId(), currentThread.getName(), lockHoldTimeMs, HOLD_TIMEOUT, stackTrace);
        }
        // Always clear the slot so a stale start time is never reused by a later unlock.
        lockStartTime.remove();
    }

    /**
     * Method to be called after attempting to acquire the lock using tryLock.
     * On success this behaves like {@link #afterLock()}. On failure it logs, at debug
     * level only, how long the attempt took together with the current stack trace.
     *
     * @param acquired  Whether the lock was successfully acquired
     * @param startTime The {@link System#nanoTime()} value taken when the attempt started
     */
    protected void afterTryLock(boolean acquired, long startTime) {
        if (acquired) {
            afterLock();
            return;
        }
        // Capturing and formatting a stack trace is costly; only do it when it will be logged.
        if (LOG.isDebugEnabled()) {
            long elapsedTime = (System.nanoTime() - startTime) / NANOS_PER_MILLI;
            Thread currentThread = Thread.currentThread();
            String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
            LOG.debug("Thread ID: {}, Thread Name: {} - Failed to acquire the lock within {} ms"
                            + "\nThread blocking info:\n{}",
                    currentThread.getId(), currentThread.getName(), elapsedTime, stackTrace);
        }
    }

    /**
     * Utility method to format the stack trace of a thread.
     * Real line breaks are replaced by the two-character sequence {@code \n} so the
     * whole trace stays on one log line.
     *
     * @param stackTrace The stack trace elements of the thread
     * @return A formatted, single-line string of the stack trace
     */
    private String getThreadStackTrace(StackTraceElement[] stackTrace) {
        StringBuilder sb = new StringBuilder();
        for (StackTraceElement element : stackTrace) {
            sb.append("\tat ").append(element).append("\n");
        }
        return sb.toString().replace("\n", "\\n");
    }
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * A utility class for monitoring and reporting deadlocks in a Java application.
 * <p>
 * This class uses the Java Management API ({@link ThreadMXBean}) to periodically check
 * for deadlocked threads and logs detailed information about any detected deadlocks.
 * It can be configured to run at a fixed interval on its own single-threaded scheduler.
 * </p>
 */
public class DeadlockMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(DeadlockMonitor.class);
    private final ThreadMXBean threadMXBean;
    private final ScheduledExecutorService scheduler;

    /**
     * Creates a monitor backed by the platform {@link ThreadMXBean} and a scheduler
     * with one worker thread. Nothing is scheduled until
     * {@link #startMonitoring(long, TimeUnit)} is called.
     */
    public DeadlockMonitor() {
        this.threadMXBean = ManagementFactory.getThreadMXBean();
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    /**
     * Starts monitoring for deadlocks at a fixed rate.
     * The first check runs after an initial delay of 5, measured in {@code unit}.
     *
     * @param period the period between successive executions
     * @param unit   the time unit of the period parameter (also used for the initial delay)
     */
    public void startMonitoring(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::runDetectionSafely, 5, period, unit);
    }

    /**
     * Runs one detection pass without letting a runtime exception escape.
     * scheduleAtFixedRate suppresses all subsequent executions once a run throws, so a
     * single failure would otherwise silently disable deadlock detection.
     */
    private void runDetectionSafely() {
        try {
            detectAndReportDeadlocks();
        } catch (RuntimeException e) {
            LOG.warn("Deadlock detection run failed", e);
        }
    }

    /**
     * Detects and reports deadlocks if any are found.
     * Logs a warning with the thread information of every deadlocked thread; when no
     * deadlock exists only a debug message is emitted.
     */
    public void detectAndReportDeadlocks() {
        // Get IDs of threads that are deadlocked
        long[] deadlockedThreadIds = threadMXBean.findDeadlockedThreads();

        // Check if there are no deadlocked threads
        if (deadlockedThreadIds == null || deadlockedThreadIds.length == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No deadlocks detected.");
            }
            return;
        }

        // Get information about deadlocked threads, including locked monitors and synchronizers
        ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreadIds, true, true);
        // Replace real line breaks with the literal "\n" so the report stays on one log line
        String deadlockReportString = Arrays.toString(threadInfos).replace("\n", "\\n");
        // Log the deadlock report
        LOG.warn("Deadlocks detected {}", deadlockReportString);
    }

}
Loading
Loading