Skip to content

Commit

Permalink
RATIS-2235. Allow only one thread to perform appendLog (#1206)
Browse files Browse the repository at this point in the history
  • Loading branch information
SzyWilliam committed Jan 9, 2025
1 parent 3255448 commit 7a38c99
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -249,6 +250,8 @@ public long[] getFollowerNextIndices() {
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
private final ThreadGroup threadGroup;

private final AtomicReference<CompletableFuture<Void>> appendLogFuture;

RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
throws IOException {
final RaftPeerId id = proxy.getId();
Expand Down Expand Up @@ -282,6 +285,7 @@ public long[] getFollowerNextIndices() {
this.transferLeadership = new TransferLeadership(this, properties);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));

this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
Expand Down Expand Up @@ -1585,9 +1589,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
state.updateConfiguration(entries);
}
future.join();
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
: appendLog(entries);

final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
: state.getLog().append(entries);
proto.getCommitInfosList().forEach(commitInfoCache::update);

CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
Expand All @@ -1601,7 +1605,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde

final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
return appendLog.whenCompleteAsync((r, t) -> {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
Expand All @@ -1618,6 +1622,10 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
return reply;
});
}
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
return appendLogFuture.updateAndGet(f -> f.thenCompose(
ignored -> JavaUtils.allOf(state.getLog().append(entries))));
}

private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
// Check if a snapshot installation through state machine is in progress.
Expand Down

0 comments on commit 7a38c99

Please sign in to comment.