diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3ffcee0797..14b09a0235 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -132,6 +132,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -249,6 +250,8 @@ public long[] getFollowerNextIndices() { private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); private final ThreadGroup threadGroup; + private final AtomicReference> appendLogFuture; + RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { final RaftPeerId id = proxy.getId(); @@ -282,6 +285,7 @@ public long[] getFollowerNextIndices() { this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); + this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.serverCached(properties), @@ -1585,9 +1589,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde state.updateConfiguration(entries); } future.join(); + final CompletableFuture appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null) + : appendLog(entries); - final List> futures = entries.isEmpty() ? Collections.emptyList() - : state.getLog().append(entries); proto.getCommitInfosList().forEach(commitInfoCache::update); CodeInjectionForTesting.execute(LOG_SYNC, getId(), null); @@ -1601,7 +1605,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); - return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> { + return appendLog.whenCompleteAsync((r, t) -> { followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); timer.stop(); }, getServerExecutor()).thenApply(v -> { @@ -1618,6 +1622,10 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } + private CompletableFuture appendLog(List entries) { + return appendLogFuture.updateAndGet(f -> f.thenCompose( + ignored -> JavaUtils.allOf(state.getLog().append(entries)))); + } private long checkInconsistentAppendEntries(TermIndex previous, List entries) { // Check if a snapshot installation through state machine is in progress.