From 8ab490a5864b9a9d2f6db477fcfbf319fe7920f6 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Sat, 18 Mar 2023 18:52:34 +0800 Subject: [PATCH 1/4] Fix the issue that DLedger RPC Service thread stuck --- .../storage/dledger/DLedgerEntryPusher.java | 32 ++++++++++++++++--- .../dledger/protocol/DLedgerResponseCode.java | 1 + 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index ca1f96e5..78595889 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -31,6 +31,7 @@ import io.openmessaging.storage.dledger.utils.PreConditions; import io.openmessaging.storage.dledger.utils.Quota; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -731,7 +732,7 @@ private void doCompare() throws Exception { } else { truncateIndex = compareIndex; } - } else if (response.getEndIndex() <= dLedgerStore.getLedgerBeforeBeginIndex() + } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex() || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) { /* The follower's entries does not intersect with the leader. @@ -810,7 +811,7 @@ private class EntryHandler extends ShutdownAbleThread { private long lastCheckFastForwardTimeMs = System.currentTimeMillis(); ConcurrentMap>> writeRequestMap = new ConcurrentHashMap<>(); - BlockingQueue>> compareOrTruncateRequests = new ArrayBlockingQueue<>(100); + BlockingQueue>> compareOrTruncateRequests = new ArrayBlockingQueue>>(1024); public EntryHandler(Logger logger) { super("EntryHandler-" + memberState.getSelfId(), logger); @@ -834,13 +835,23 @@ public CompletableFuture handlePush(PushEntryRequest request) } break; case COMMIT: - compareOrTruncateRequests.put(new Pair<>(request, future)); + synchronized (this) { + if (!compareOrTruncateRequests.offer(new Pair<>(request, future))) { + logger.warn("compareOrTruncateRequests blockingQueue is full when put commit request"); + future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode())); + } + } break; case COMPARE: case TRUNCATE: PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT); writeRequestMap.clear(); - compareOrTruncateRequests.put(new Pair<>(request, future)); + synchronized (this) { + if (!compareOrTruncateRequests.offer(new Pair<>(request, future))) { + logger.warn("compareOrTruncateRequests blockingQueue is full when put compare or truncate request"); + future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode())); + } + } break; default: logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo()); @@ -1007,10 +1018,23 @@ private void checkAbnormalFuture(long endIndex) { checkAppendFuture(endIndex); } + private void clearCompareOrTruncateRequestsIfNeed() { + synchronized (this) { + if (!memberState.isFollower() && !compareOrTruncateRequests.isEmpty()) { + List>> drainList = new ArrayList<>(); + compareOrTruncateRequests.drainTo(drainList); + for (Pair> pair : drainList) { + pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.NOT_FOLLOWER.getCode())); + } + } + } + } + @Override public void doWork() { try { if (!memberState.isFollower()) { + clearCompareOrTruncateRequestsIfNeed(); waitForRunning(1); return; } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java index be4c2f19..ed998454 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java @@ -48,6 +48,7 @@ public enum DLedgerResponseCode { TAKE_LEADERSHIP_FAILED(418, ""), INDEX_LESS_THAN_LOCAL_BEGIN(419, ""), REQUEST_WITH_EMPTY_BODYS(420, ""), + PUSH_REQUEST_IS_FULL(421,""), INTERNAL_ERROR(500, ""), TERM_CHANGED(501, ""), WAIT_QUORUM_ACK_TIMEOUT(502, ""), From 4bc2b838253262769ce1f155edff40d4acbe884d Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Sat, 18 Mar 2023 18:59:35 +0800 Subject: [PATCH 2/4] Fix the issue that DLedger RPC Service thread stuck --- .../io/openmessaging/storage/dledger/DLedgerEntryPusher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 78595889..0f541ef4 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -732,7 +732,7 @@ private void doCompare() throws Exception { } else { truncateIndex = compareIndex; } - } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex() + } else if (response.getEndIndex() <= dLedgerStore.getLedgerBeforeBeginIndex() || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) { /* The follower's entries does not intersect with the leader. From 5b31f5720a13eb6e9826b49664528d6e32c3dbf2 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Sat, 18 Mar 2023 19:00:36 +0800 Subject: [PATCH 3/4] Fix the issue that DLedger RPC Service thread stuck --- .../io/openmessaging/storage/dledger/DLedgerEntryPusher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 0f541ef4..86c03c05 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -811,7 +811,8 @@ private class EntryHandler extends ShutdownAbleThread { private long lastCheckFastForwardTimeMs = System.currentTimeMillis(); ConcurrentMap>> writeRequestMap = new ConcurrentHashMap<>(); - BlockingQueue>> compareOrTruncateRequests = new ArrayBlockingQueue>>(1024); + BlockingQueue>> + compareOrTruncateRequests = new ArrayBlockingQueue<>(1024); public EntryHandler(Logger logger) { super("EntryHandler-" + memberState.getSelfId(), logger); From 25bf9a834d9ffd3d403a8f3e924e02084c5ea8e0 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Sat, 18 Mar 2023 19:27:31 +0800 Subject: [PATCH 4/4] Fix the issue that DLedger RPC Service thread stuck --- .../statemachine/StateMachineCaller.java | 68 +++++++++---------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java index 78bb59ae..8fc5373a 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java @@ -18,6 +18,7 @@ import io.openmessaging.storage.dledger.DLedgerEntryPusher; import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.ShutdownAbleThread; import io.openmessaging.storage.dledger.entry.DLedgerEntry; import io.openmessaging.storage.dledger.exception.DLedgerException; import io.openmessaging.storage.dledger.snapshot.SnapshotManager; @@ -39,16 +40,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import org.apache.rocketmq.common.ServiceThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Finite state machine caller - * Through a task queue, all tasks that modify the state of the state machine - * are guaranteed to be executed sequentially. + * Finite state machine caller Through a task queue, all tasks that modify the state of the state machine are guaranteed + * to be executed sequentially. */ -public class StateMachineCaller extends ServiceThread { +public class StateMachineCaller extends ShutdownAbleThread { /** * Task type @@ -80,18 +79,19 @@ private static class ApplyTask { private final AtomicLong applyingIndex; private final BlockingQueue taskQueue; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "RetryOnCommittedScheduledThread"); - } - }); + .newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RetryOnCommittedScheduledThread"); + } + }); private final Function completeEntryCallback; private volatile DLedgerException error; private SnapshotManager snapshotManager; public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine statemachine, final DLedgerEntryPusher entryPusher) { + super(StateMachineCaller.class.getName(), logger); this.dLedgerStore = dLedgerStore; this.statemachine = statemachine; this.entryPusher = entryPusher; @@ -114,7 +114,8 @@ public StateMachine getStateMachine() { } public boolean onCommitted(final long committedIndex) { - if (committedIndex <= this.lastAppliedIndex.get()) return false; + if (committedIndex <= this.lastAppliedIndex.get()) + return false; final ApplyTask task = new ApplyTask(); task.type = TaskType.COMMITTED; task.committedIndex = committedIndex; @@ -142,28 +143,26 @@ public void shutdown() { } @Override - public void run() { - while (!this.isStopped()) { - try { - final ApplyTask task = this.taskQueue.poll(5, TimeUnit.SECONDS); - if (task != null) { - switch (task.type) { - case COMMITTED: - doCommitted(task.committedIndex); - break; - case SNAPSHOT_SAVE: - doSnapshotSave((SaveSnapshotHook) task.snapshotHook); - break; - case SNAPSHOT_LOAD: - doSnapshotLoad((LoadSnapshotHook) task.snapshotHook); - break; - } + public void doWork() { + try { + final ApplyTask task = this.taskQueue.poll(5, TimeUnit.SECONDS); + if (task != null) { + switch (task.type) { + case COMMITTED: + doCommitted(task.committedIndex); + break; + case SNAPSHOT_SAVE: + doSnapshotSave((SaveSnapshotHook) task.snapshotHook); + break; + case SNAPSHOT_LOAD: + doSnapshotLoad((LoadSnapshotHook) task.snapshotHook); + break; } - } catch (final InterruptedException e) { - logger.error("Error happen in {} when pull task from task queue", getServiceName(), e); - } catch (Throwable e) { - logger.error("Apply task exception", e); } + } catch (final InterruptedException e) { + logger.error("Error happen in stateMachineCaller when pull task from task queue", e); + } catch (Throwable e) { + logger.error("Apply task exception", e); } } @@ -292,11 +291,6 @@ public void setError(DLedgerServer server, final DLedgerException error) { } } - @Override - public String getServiceName() { - return StateMachineCaller.class.getName(); - } - public Long getLastAppliedIndex() { return this.lastAppliedIndex.get(); }