diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java index a619d658133..743985020f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java @@ -332,6 +332,23 @@ public static String getFullStackTrace(Throwable throwable) { return sw.getBuffer().toString(); } + /** + * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy + * including that throwable itself. + * Note that this method follows includes {@link Throwable#getSuppressed()} + * into check. + * + * @param throwable Throwable to check (if {@code null}, {@code false} is returned). + * @param clazz Cause classes to check (if {@code null} or empty, {@code false} is returned). + * @return {@code true} if one of the causing exception is an instance of passed in classes, + * {@code false} otherwise. + */ + public static boolean hasCauseOrSuppressed( + @Nullable Throwable throwable, + Class @Nullable... clazz) { + return hasCauseOrSuppressed(throwable, null, clazz); + } + /** * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy * including that throwable itself. diff --git a/modules/failure-handler/src/main/java/org/apache/ignite/internal/failure/FailureManager.java b/modules/failure-handler/src/main/java/org/apache/ignite/internal/failure/FailureManager.java index 2efa03449b0..6c17f5fc7b5 100644 --- a/modules/failure-handler/src/main/java/org/apache/ignite/internal/failure/FailureManager.java +++ b/modules/failure-handler/src/main/java/org/apache/ignite/internal/failure/FailureManager.java @@ -164,7 +164,7 @@ private synchronized boolean process(FailureContext failureCtx, FailureHandler h LOG.error(FAILURE_LOG_MSG, failureCtx.error(), handler, failureCtx.type()); } - if (reserveBuf != null && hasCauseOrSuppressed(failureCtx.error(), null, OutOfMemoryError.class)) { + if (reserveBuf != null && hasCauseOrSuppressed(failureCtx.error(), OutOfMemoryError.class)) { reserveBuf = null; } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java index 4b9940ef43f..9fae41262eb 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java @@ -1186,7 +1186,7 @@ static void onHeartbeatReturned(final ThreadId id, final Status status, final Ap } r.setState(State.Probe); notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); - if (++r.consecutiveErrorTimes % 10 == 0) { + if (status.getRaftError() != RaftError.ESHUTDOWN && ++r.consecutiveErrorTimes % 10 == 0) { LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), r.consecutiveErrorTimes, status); } @@ -1426,7 +1426,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight LOG.debug(sb.toString()); } notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status); - if (++r.consecutiveErrorTimes % 10 == 0) { + if (status.getRaftError() != RaftError.ESHUTDOWN && ++r.consecutiveErrorTimes % 10 == 0) { LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(), r.consecutiveErrorTimes, status); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java index e2f2575ea6b..fdd78616ba7 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.PeerUnavailableException; @@ -233,13 +234,12 @@ public void complete(final Object result, final Throwable err) { } } else { - if (ExceptionUtils.hasCauseOrSuppressed(err, null, PeerUnavailableException.class, ConnectException.class)) + if (ExceptionUtils.hasCauseOrSuppressed(err, PeerUnavailableException.class, ConnectException.class)) readyConsistentIds.remove(peerId.getConsistentId()); // Force logical reconnect. if (done != null) { try { - done.run(new Status(err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT - : RaftError.EINTERNAL, "RPC exception:" + err.getMessage())); + done.run(new Status(errorCodeByException(err), "RPC exception:" + err.getMessage())); } catch (final Throwable t) { LOG.error("Fail to run RpcResponseClosure, the request is {}.", t, request); @@ -274,6 +274,14 @@ public Executor executor() { return future; } + private static RaftError errorCodeByException(Throwable err) { + if (ExceptionUtils.hasCauseOrSuppressed(err, NodeStoppingException.class)) { + return RaftError.ESHUTDOWN; + } + + return err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT : RaftError.EINTERNAL; + } + private static Status handleErrorResponse(final ErrorResponse eResp) { final Status status = new Status(); status.setCode(eResp.errorCode()); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 77f51f22e15..765a8fceb93 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -50,6 +50,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -1137,7 +1138,8 @@ private void sendSafeTimeSyncIfReplicaReady(CompletableFuture replicaFu .build(); replica.processRequest(req, localNodeId).whenComplete((res, ex) -> { - if (ex != null && !hasCauseOrSuppressed(ex, null, NodeStoppingException.class)) { + if (ex != null && !hasCauseOrSuppressed(ex, NodeStoppingException.class) + && !hasCauseOrSuppressed(ex, CancellationException.class)) { LOG.error("Could not advance safe time for {} to {}", ex, replica.groupId()); } });