Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #282] Fix the issue that DLedger RPC Service thread stuck #284

Merged
merged 4 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.Quota;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -810,7 +811,8 @@ private class EntryHandler extends ShutdownAbleThread {
private long lastCheckFastForwardTimeMs = System.currentTimeMillis();

ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap = new ConcurrentHashMap<>();
BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests = new ArrayBlockingQueue<>(100);
BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>
compareOrTruncateRequests = new ArrayBlockingQueue<>(1024);

public EntryHandler(Logger logger) {
super("EntryHandler-" + memberState.getSelfId(), logger);
Expand All @@ -834,13 +836,23 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
}
break;
case COMMIT:
compareOrTruncateRequests.put(new Pair<>(request, future));
synchronized (this) {
if (!compareOrTruncateRequests.offer(new Pair<>(request, future))) {
logger.warn("compareOrTruncateRequests blockingQueue is full when put commit request");
future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode()));
}
}
break;
case COMPARE:
case TRUNCATE:
PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
writeRequestMap.clear();
compareOrTruncateRequests.put(new Pair<>(request, future));
synchronized (this) {
if (!compareOrTruncateRequests.offer(new Pair<>(request, future))) {
logger.warn("compareOrTruncateRequests blockingQueue is full when put compare or truncate request");
future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode()));
}
}
break;
default:
logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
Expand Down Expand Up @@ -1007,10 +1019,23 @@ private void checkAbnormalFuture(long endIndex) {
checkAppendFuture(endIndex);
}

/**
 * Fails every queued push request with {@code NOT_FOLLOWER} when this node is not a follower.
 *
 * <p>The queue is drained while holding this object's monitor, but the futures are completed only
 * after the monitor has been released. {@code CompletableFuture.complete} may run dependent
 * callbacks on the calling thread, and running them under the lock would stall any other thread
 * that synchronizes on this object in order to enqueue a request.
 */
private void clearCompareOrTruncateRequestsIfNeed() {
    final List<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> drained;
    synchronized (this) {
        if (memberState.isFollower() || compareOrTruncateRequests.isEmpty()) {
            return;
        }
        drained = new ArrayList<>(compareOrTruncateRequests.size());
        compareOrTruncateRequests.drainTo(drained);
    }
    // Completed outside the lock: callbacks attached to these futures are foreign code.
    for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pending : drained) {
        pending.getValue().complete(buildResponse(pending.getKey(), DLedgerResponseCode.NOT_FOLLOWER.getCode()));
    }
}

@Override
public void doWork() {
try {
if (!memberState.isFollower()) {
clearCompareOrTruncateRequestsIfNeed();
waitForRunning(1);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum DLedgerResponseCode {
TAKE_LEADERSHIP_FAILED(418, ""),
INDEX_LESS_THAN_LOCAL_BEGIN(419, ""),
REQUEST_WITH_EMPTY_BODYS(420, ""),
// Returned to the pusher when the follower's pending push-request queue rejects a new request.
PUSH_REQUEST_IS_FULL(421, ""),
INTERNAL_ERROR(500, ""),
TERM_CHANGED(501, ""),
WAIT_QUORUM_ACK_TIMEOUT(502, ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.openmessaging.storage.dledger.DLedgerEntryPusher;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.ShutdownAbleThread;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.snapshot.SnapshotManager;
Expand All @@ -39,16 +40,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.rocketmq.common.ServiceThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Finite state machine caller
* Through a task queue, all tasks that modify the state of the state machine
* are guaranteed to be executed sequentially.
* Finite state machine caller. Through a task queue, all tasks that modify the state of the state machine are guaranteed
* to be executed sequentially.
*/
public class StateMachineCaller extends ServiceThread {
public class StateMachineCaller extends ShutdownAbleThread {

/**
* Task type
Expand Down Expand Up @@ -80,18 +79,19 @@ private static class ApplyTask {
private final AtomicLong applyingIndex;
private final BlockingQueue<ApplyTask> taskQueue;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RetryOnCommittedScheduledThread");
}
});
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RetryOnCommittedScheduledThread");
}
});
private final Function<Long, Boolean> completeEntryCallback;
private volatile DLedgerException error;
private SnapshotManager snapshotManager;

public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine statemachine,
final DLedgerEntryPusher entryPusher) {
super(StateMachineCaller.class.getName(), logger);
this.dLedgerStore = dLedgerStore;
this.statemachine = statemachine;
this.entryPusher = entryPusher;
Expand All @@ -114,7 +114,8 @@ public StateMachine getStateMachine() {
}

public boolean onCommitted(final long committedIndex) {
if (committedIndex <= this.lastAppliedIndex.get()) return false;
if (committedIndex <= this.lastAppliedIndex.get())
return false;
final ApplyTask task = new ApplyTask();
task.type = TaskType.COMMITTED;
task.committedIndex = committedIndex;
Expand Down Expand Up @@ -142,28 +143,26 @@ public void shutdown() {
}

@Override
public void run() {
while (!this.isStopped()) {
try {
final ApplyTask task = this.taskQueue.poll(5, TimeUnit.SECONDS);
if (task != null) {
switch (task.type) {
case COMMITTED:
doCommitted(task.committedIndex);
break;
case SNAPSHOT_SAVE:
doSnapshotSave((SaveSnapshotHook) task.snapshotHook);
break;
case SNAPSHOT_LOAD:
doSnapshotLoad((LoadSnapshotHook) task.snapshotHook);
break;
}
public void doWork() {
try {
final ApplyTask task = this.taskQueue.poll(5, TimeUnit.SECONDS);
if (task != null) {
switch (task.type) {
case COMMITTED:
doCommitted(task.committedIndex);
break;
case SNAPSHOT_SAVE:
doSnapshotSave((SaveSnapshotHook) task.snapshotHook);
break;
case SNAPSHOT_LOAD:
doSnapshotLoad((LoadSnapshotHook) task.snapshotHook);
break;
}
} catch (final InterruptedException e) {
logger.error("Error happen in {} when pull task from task queue", getServiceName(), e);
} catch (Throwable e) {
logger.error("Apply task exception", e);
}
} catch (final InterruptedException e) {
logger.error("Error happen in stateMachineCaller when pull task from task queue", e);
} catch (Throwable e) {
logger.error("Apply task exception", e);
}
}

Expand Down Expand Up @@ -292,11 +291,6 @@ public void setError(DLedgerServer server, final DLedgerException error) {
}
}

/**
 * Supplies the name under which this service thread is identified.
 *
 * @return the fully qualified class name of {@code StateMachineCaller}
 */
@Override
public String getServiceName() {
    final String serviceName = StateMachineCaller.class.getName();
    return serviceName;
}

/**
 * Reads the current value of the last-applied index counter.
 *
 * @return the value held by {@code lastAppliedIndex} at the time of the call
 */
public Long getLastAppliedIndex() {
    return lastAppliedIndex.get();
}
Expand Down