diff --git a/microprofile/fault-tolerance/pom.xml b/microprofile/fault-tolerance/pom.xml index 8b0d588d070..a0268132a8f 100644 --- a/microprofile/fault-tolerance/pom.xml +++ b/microprofile/fault-tolerance/pom.xml @@ -47,8 +47,8 @@ microprofile-fault-tolerance-api - io.helidon.reactive.fault-tolerance - helidon-reactive-fault-tolerance + io.helidon.nima.fault-tolerance + helidon-nima-fault-tolerance org.eclipse.microprofile.metrics diff --git a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/CancellableFtSupplier.java b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/CancellableFtSupplier.java new file mode 100644 index 00000000000..9b72aea1b25 --- /dev/null +++ b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/CancellableFtSupplier.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.faulttolerance; + +import java.util.concurrent.CancellationException; + +/** + * A {@code FtSupplier} that can be cancelled. + */ +class CancellableFtSupplier implements FtSupplier { + + private boolean cancelled = false; + private boolean getCalled = false; + private final FtSupplier supplier; + + private CancellableFtSupplier(FtSupplier supplier) { + this.supplier = supplier; + } + + void cancel() { + this.cancelled = true; + } + + boolean isCancelled() { + return cancelled; + } + + boolean getCalled() { + return getCalled; + } + + @Override + public T get() throws Throwable { + getCalled = true; + if (cancelled) { + throw new CancellationException("Supplier has been cancelled"); + } + return supplier.get(); + } + + static CancellableFtSupplier create(FtSupplier supplier) { + return new CancellableFtSupplier<>(supplier); + } +} diff --git a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java index 3784b65cf4e..efcf0c3dc94 100644 --- a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java +++ b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java @@ -28,7 +28,7 @@ import io.helidon.common.configurable.ScheduledThreadPoolSupplier; import io.helidon.common.configurable.ThreadPoolSupplier; import io.helidon.config.mp.MpConfig; -import io.helidon.reactive.faulttolerance.FaultTolerance; +import io.helidon.nima.faulttolerance.FaultTolerance; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; diff --git a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/MethodInvoker.java b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/MethodInvoker.java index d538eadb94b..b016c24b790 100644 --- a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/MethodInvoker.java +++ b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/MethodInvoker.java @@ -25,24 +25,22 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import io.helidon.common.context.Context; import io.helidon.common.context.Contexts; -import io.helidon.common.reactive.Single; -import io.helidon.reactive.faulttolerance.Async; -import io.helidon.reactive.faulttolerance.Bulkhead; -import io.helidon.reactive.faulttolerance.CircuitBreaker; -import io.helidon.reactive.faulttolerance.CircuitBreaker.State; -import io.helidon.reactive.faulttolerance.Fallback; -import io.helidon.reactive.faulttolerance.FaultTolerance; -import io.helidon.reactive.faulttolerance.FtHandlerTyped; -import io.helidon.reactive.faulttolerance.Retry; -import io.helidon.reactive.faulttolerance.RetryTimeoutException; -import io.helidon.reactive.faulttolerance.Timeout; +import io.helidon.nima.faulttolerance.Async; +import io.helidon.nima.faulttolerance.Bulkhead; +import io.helidon.nima.faulttolerance.CircuitBreaker; +import io.helidon.nima.faulttolerance.CircuitBreaker.State; +import io.helidon.nima.faulttolerance.Fallback; +import io.helidon.nima.faulttolerance.FaultTolerance; +import io.helidon.nima.faulttolerance.FtHandlerTyped; +import io.helidon.nima.faulttolerance.Retry; +import io.helidon.nima.faulttolerance.RetryTimeoutException; +import io.helidon.nima.faulttolerance.Timeout; import jakarta.interceptor.InvocationContext; import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; @@ -73,6 +71,9 @@ import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.TimeoutTimedOut; import static io.helidon.microprofile.faulttolerance.ThrowableMapper.map; import static io.helidon.microprofile.faulttolerance.ThrowableMapper.mapTypes; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + /** * Invokes a FT method applying semantics based on method annotations. An instance * of this class is created for each method invocation. Some state is shared across @@ -117,22 +118,11 @@ class MethodInvoker implements FtSupplier { */ private final Context helidonContext; - /** - * Record thread interruption request for later use. - */ - private final AtomicBoolean mayInterruptIfRunning = new AtomicBoolean(false); - - /** - * Async thread in used by this invocation. May be {@code null}. We use this - * reference for thread interruptions. - */ - private Thread asyncInterruptThread; - /** * A boolean value indicates whether the fallback logic was called or not * on this invocation. */ - private AtomicBoolean fallbackCalled = new AtomicBoolean(false); + private final AtomicBoolean fallbackCalled = new AtomicBoolean(false); /** * Helper to properly propagate active request scope to other threads. @@ -160,6 +150,17 @@ private static class MethodState { */ private final FtHandlerTyped handler; + /** + * Wraps method invocation in a supplier that can be cancelled. This is required + * when a task is cancelled without its thread being interrupted. + */ + private CancellableFtSupplier cancellableSupplier; + + /** + * The {@code Supplier} passed to the FT handlers for execution. + */ + private Supplier> handlerSupplier; + /** * A key used to lookup {@code MethodState} instances, which include FT handlers. * A class loader is necessary to support multiple applications as seen in the TCKs. @@ -204,67 +205,6 @@ public int hashCode() { */ private final MethodState methodState; - /** - * Future returned by this method invoker. Some special logic to handle async - * cancellations and methods returning {@code Future}. - * - * @param result type of future - */ - @SuppressWarnings("unchecked") - class InvokerCompletableFuture extends CompletableFuture { - - /** - * If method returns {@code Future}, we let that value pass through - * without further processing. See Section 5.2.1 of spec. - * - * @return value from this future - * @throws ExecutionException if this future completed exceptionally - * @throws InterruptedException if the current thread was interrupted - */ - @Override - public T get() throws InterruptedException, ExecutionException { - T value = super.get(); - if (method.getReturnType() == Future.class) { - return ((Future) value).get(); - } - return value; - } - - /** - * If method returns {@code Future}, we let that value pass through - * without further processing. See Section 5.2.1 of spec. - * - * @param timeout the timeout - * @param unit the timeout unit - * @return value from this future - * @throws CancellationException if this future was cancelled - * @throws ExecutionException if this future completed exceptionally - * @throws InterruptedException if the current thread was interrupted - */ - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, - ExecutionException, java.util.concurrent.TimeoutException { - T value = super.get(timeout, unit); - if (method.getReturnType() == Future.class) { - return ((Future) value).get(timeout, unit); - } - return value; - } - - /** - * Overridden to record {@code mayInterruptIfRunning} flag. This flag - * is not currently not propagated over a chain of {@code Single>}'s. - * - * @param mayInterruptIfRunning Interrupt flag. - * @@return {@code true} if this task is now cancelled. - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - MethodInvoker.this.mayInterruptIfRunning.set(mayInterruptIfRunning); - return super.cancel(mayInterruptIfRunning); - } - } - /** * Constructor. * @@ -335,7 +275,6 @@ private void registerMetrics() { introspector.getMethodNameTag()); } } - } @Override @@ -359,94 +298,152 @@ static void clearMethodStatesMap() { } /** - * Invokes a method with one or more FT annotations. + * Invokes a method with one or more FT annotations. This method shall execute synchronously + * or asynchronously w.r.t. its caller based on the nature of the intercepted method. * * @return value returned by method. */ @Override public Object get() throws Throwable { - // Wrap method call with Helidon context - Supplier> supplier = () -> { - try { - return Contexts.runInContextWithThrow(helidonContext, - () -> handler.invoke(toCompletionStageSupplier(context::proceed))); - } catch (Exception e) { - return Single.error(e); - } - }; + // Supplier that shall be passed to FT handlers + handlerSupplier = ftSupplierToSupplier(introspector.isAsynchronous() + ? asyncToSyncFtSupplier(context::proceed) : context::proceed); + + // Wrap supplier with Helidon context info + FtSupplier contextSupplier = () -> + Contexts.runInContextWithThrow(helidonContext, + () -> ftSupplierToSupplier(() -> handler.invoke(handlerSupplier)).get()); - // Update metrics before calling method updateMetricsBefore(); if (introspector.isAsynchronous()) { - // Obtain single from supplier - Single> single = supplier.get(); + return callSupplierNewThread(contextSupplier); + } else { + Object result = null; + Throwable throwable = null; + try { + result = callSupplier(contextSupplier); + } catch (Throwable t) { + throwable = t; + } + updateMetricsAfter(throwable); + if (throwable != null) { + if (throwable instanceof RetryTimeoutException rte) { + throw rte.lastRetryException(); + } + throw throwable; + } + return result; + } + } - // Convert single to CompletableFuture - CompletableFuture> asyncFuture = single.toStage(true).toCompletableFuture(); + private Object callSupplier(FtSupplier supplier) throws Throwable { + Object result = null; + Throwable cause = null; + try { + invocationStartNanos = System.nanoTime(); + result = supplier.get(); + } catch (Throwable t) { + cause = map(unwrapThrowable(t)); + } + if (cause != null) { + throw cause; + } + return result; + } - // Create CompletableFuture that is returned to caller - CompletableFuture resultFuture = new InvokerCompletableFuture<>(); + private CompletableFuture callSupplierNewThread(FtSupplier supplier) { + FtSupplier wrappedSupplier = requestScopeHelper.wrapInScope(supplier); - // Update resultFuture based on outcome of asyncFuture - asyncFuture.whenComplete((result, throwable) -> { - // Release request context - requestScopeHelper.clearScope(); + // Call supplier in new thread + ClassLoader ccl = Thread.currentThread().getContextClassLoader(); + CompletableFuture asyncFuture = Async.create().invoke(() -> { + Thread.currentThread().setContextClassLoader(ccl); + try { + return callSupplier(wrappedSupplier); + } catch (Throwable t) { + throw toRuntimeException(t); + } + }); - if (throwable != null) { - if (throwable instanceof CancellationException) { - single.cancel(); - return; - } - Throwable cause; - if (throwable instanceof ExecutionException) { - cause = map(throwable.getCause()); - } else { - cause = map(throwable); - } - updateMetricsAfter(cause); - resultFuture.completeExceptionally(cause instanceof RetryTimeoutException - ? ((RetryTimeoutException) cause).lastRetryException() : cause); - } else { - updateMetricsAfter(null); - resultFuture.complete(result); - } - }); + // Set resultFuture based on supplier's outcome + AtomicBoolean mayInterrupt = new AtomicBoolean(false); + CompletableFuture resultFuture = new CompletableFuture<>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + mayInterrupt.set(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + }; + asyncFuture.whenComplete((result, throwable) -> { + requestScopeHelper.clearScope(); + Throwable cause = unwrapThrowable(throwable); + updateMetricsAfter(cause); + if (throwable != null) { + resultFuture.completeExceptionally(cause); + } else { + resultFuture.complete(result); + } + }); - // Propagate cancellation of resultFuture to asyncFuture - resultFuture.whenComplete((result, throwable) -> { - if (throwable instanceof CancellationException) { - asyncFuture.cancel(true); + // If resultFuture is cancelled, then cancel supplier call + resultFuture.exceptionally(t -> { + if (t instanceof CancellationException + || t instanceof org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException) { + Objects.requireNonNull(cancellableSupplier); + cancellableSupplier.cancel(); + // Cancel supplier in bulkhead in case it is queued + if (introspector.hasBulkhead()) { + methodState.bulkhead.cancelSupplier(handlerSupplier); } - }); - return resultFuture; - } else { - Object result = null; - Throwable cause = null; + asyncFuture.cancel(mayInterrupt.get()); + } + return null; + }); + + return resultFuture; + } + + /** + * Maps an {@link FtSupplier} to a {@link Supplier}. + * + * @param supplier The supplier. + * @return The new supplier. + */ + Supplier> ftSupplierToSupplier(FtSupplier supplier) { + return () -> { try { - // Obtain single from supplier and map to CompletableFuture to handle void methods - Single> single = supplier.get(); - CompletableFuture> future = single.toStage(true).toCompletableFuture(); - - // Synchronously way for result - result = future.get(); - } catch (ExecutionException e) { - cause = map(e.getCause()); + invocationStartNanos = System.nanoTime(); // record start + return supplier.get(); } catch (Throwable t) { - cause = map(t); - } finally { - // Release request context - requestScopeHelper.clearScope(); - } - updateMetricsAfter(cause); - if (cause instanceof RetryTimeoutException) { - throw ((RetryTimeoutException) cause).lastRetryException(); + throw toRuntimeException(t); } - if (cause != null) { - throw cause; + }; + } + + /** + * Converts an async supplier into a sync one by waiting on the async supplier + * to produce an actual result. Will block thread indefinitely until such value + * becomes available. Wraps supplier with cancellable supplier for async + * cancellations. + * + * @param supplier async supplier + * @return value produced by supplier + * @param type of value produced + */ + @SuppressWarnings("unchecked") + public FtSupplier asyncToSyncFtSupplier(FtSupplier supplier) { + cancellableSupplier = CancellableFtSupplier.create(supplier); + return () -> { + Object result = cancellableSupplier.get(); + if (result instanceof CompletionStage> cs) { + return (T) cs.toCompletableFuture().get(); + } else if (result instanceof Future> f) { + return (T) f.get(); + } else { + throw new InternalError("Supplier must return Future or CompletionStage"); } - return result; - } + }; } /** @@ -461,7 +458,6 @@ private void initMethodHandler(MethodState methodState) { methodState.bulkhead = Bulkhead.builder() .limit(introspector.getBulkhead().value()) .queueLength(introspector.isAsynchronous() ? introspector.getBulkhead().waitingTaskQueue() : 0) - .cancelSource(false) // for the FT TCK's .build(); } @@ -469,7 +465,6 @@ private void initMethodHandler(MethodState methodState) { methodState.timeout = Timeout.builder() .timeout(Duration.of(introspector.getTimeout().value(), introspector.getTimeout().unit())) .currentThread(!introspector.isAsynchronous()) - .cancelSource(false) // for the FT TCK's .build(); } @@ -488,9 +483,9 @@ private void initMethodHandler(MethodState methodState) { /** * Creates a FT handler for this invocation. Handlers are composed as follows: - * - * fallback(retry(circuitbreaker(timeout(bulkhead(method))))) - * + * + * fallback(retry(circuitbreaker(timeout(bulkhead(method))))) + * * Uses the cached handlers defined in the method state for this invocation's * method, except for fallback. * @@ -539,9 +534,33 @@ private FtHandlerTyped createMethodHandler(MethodState methodState) { if (introspector.hasFallback()) { Fallback fallback = Fallback.builder() .fallback(throwable -> { - fallbackCalled.set(true); FallbackHelper cfb = new FallbackHelper(context, introspector, throwable); - return toCompletionStageSupplier(cfb::execute).get(); + + // Fallback executed in another thread + if (introspector.isAsynchronous()) { + // In a reactive env, we shouldn't block on a Future, so the FT spec + // states not to fallback in this case -- even though we can with VTs + // Note if method throws exception directly, fallback is required. + if (method.getReturnType().equals(Future.class) + && throwable instanceof ExecutionException) { // exception from Future + throw toRuntimeException(throwable); + } + + CompletableFuture> f = callSupplierNewThread(asyncToSyncFtSupplier(cfb::execute)); + try { + fallbackCalled.set(true); + return f.get(); + } catch (Throwable t) { + throw toRuntimeException(t); + } + } else { + try { + fallbackCalled.set(true); + return callSupplier(cfb::execute); + } catch (Throwable t) { + throw toRuntimeException(t); + } + } }) .applyOn(mapTypes(introspector.getFallback().applyOn())) .skipOn(mapTypes(introspector.getFallback().skipOn())) @@ -552,84 +571,6 @@ private FtHandlerTyped createMethodHandler(MethodState methodState) { return builder.build(); } - /** - * Maps an {@link FtSupplier} to a supplier of {@link CompletionStage}. - * - * @param supplier The supplier. - * @return The new supplier. - */ - @SuppressWarnings("unchecked") - Supplier extends CompletionStage> toCompletionStageSupplier(FtSupplier supplier) { - return () -> { - invocationStartNanos = System.nanoTime(); - - // Wrap supplier with request context setup - FtSupplier wrappedSupplier = requestScopeHelper.wrapInScope(supplier); - - CompletableFuture resultFuture = new CompletableFuture<>(); - if (introspector.isAsynchronous()) { - // Invoke supplier in new thread and propagate ccl for config - ClassLoader ccl = Thread.currentThread().getContextClassLoader(); - Single single = Async.create().invoke(() -> { - try { - Thread.currentThread().setContextClassLoader(ccl); - asyncInterruptThread = Thread.currentThread(); - return wrappedSupplier.get(); - } catch (Throwable t) { - return new InvokerAsyncException(t); // wraps Throwable - } - }); - - // Handle async cancellations - resultFuture.whenComplete((result, throwable) -> { - if (throwable instanceof CancellationException) { - single.cancel(); // will not interrupt by default - - // If interrupt was requested, do it manually here - if (mayInterruptIfRunning.get() && asyncInterruptThread != null) { - asyncInterruptThread.interrupt(); - asyncInterruptThread = null; - } - } - }); - - // The result must be Future>, {Completable}Future> or InvokerAsyncException - single.thenAccept(result -> { - try { - // Handle exceptions thrown by an async method - if (result instanceof InvokerAsyncException) { - resultFuture.completeExceptionally(((Exception) result).getCause()); - } else if (method.getReturnType() == Future.class) { - // If method returns Future, pass it without further processing - resultFuture.complete(result); - } else if (result instanceof CompletionStage>) { // also CompletableFuture> - CompletionStage cs = (CompletionStage) result; - cs.whenComplete((o, t) -> { - if (t != null) { - resultFuture.completeExceptionally(t); - } else { - resultFuture.complete(o); - } - }); - } else { - throw new InternalError("Return type validation failed for method " + method); - } - } catch (Throwable t) { - resultFuture.completeExceptionally(t); - } - }); - } else { - try { - resultFuture.complete(wrappedSupplier.get()); - return resultFuture; - } catch (Throwable t) { - resultFuture.completeExceptionally(t); - } - } - return resultFuture; - }; - } - /** * Collects information necessary to update metrics after method is called. */ diff --git a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java index 31cc46aa54e..86e1ba5807f 100644 --- a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java +++ b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java @@ -19,7 +19,7 @@ import java.util.Arrays; import java.util.concurrent.ExecutionException; -import io.helidon.reactive.faulttolerance.RetryTimeoutException; +import io.helidon.nima.faulttolerance.RetryTimeoutException; import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; @@ -44,19 +44,17 @@ static Throwable map(Throwable t) { if (t instanceof ExecutionException) { t = t.getCause(); } - if (t instanceof io.helidon.reactive.faulttolerance.CircuitBreakerOpenException) { + if (t instanceof io.helidon.nima.faulttolerance.CircuitBreakerOpenException) { return new CircuitBreakerOpenException(t.getMessage(), t.getCause()); } - if (t instanceof io.helidon.reactive.faulttolerance.BulkheadException) { + if (t instanceof io.helidon.nima.faulttolerance.BulkheadException) { return new BulkheadException(t.getMessage(), t.getCause()); } if (t instanceof RetryTimeoutException) { return t; // the cause is handled elsewhere } - if (t instanceof java.util.concurrent.TimeoutException) { - return new TimeoutException(t.getMessage(), t.getCause()); - } - if (t instanceof java.lang.InterruptedException) { + if (t instanceof io.helidon.nima.faulttolerance.TimeoutException + || t instanceof java.lang.InterruptedException) { return new TimeoutException(t.getMessage(), t.getCause()); } return t; @@ -77,11 +75,11 @@ static Class extends Throwable>[] mapTypes(Class extends Throwable>[] types) for (int i = 0; i < types.length; i++) { Class extends Throwable> t = types[i]; if (t == BulkheadException.class) { - result[i] = io.helidon.reactive.faulttolerance.BulkheadException.class; + result[i] = io.helidon.nima.faulttolerance.BulkheadException.class; } else if (t == CircuitBreakerOpenException.class) { - result[i] = io.helidon.reactive.faulttolerance.CircuitBreakerOpenException.class; + result[i] = io.helidon.nima.faulttolerance.CircuitBreakerOpenException.class; } else if (t == TimeoutException.class) { - result[i] = java.util.concurrent.TimeoutException.class; + result[i] = io.helidon.nima.faulttolerance.TimeoutException.class; } else { result[i] = t; } diff --git a/microprofile/fault-tolerance/src/main/java/module-info.java b/microprofile/fault-tolerance/src/main/java/module-info.java index 54d7e5f8020..0579579856a 100644 --- a/microprofile/fault-tolerance/src/main/java/module-info.java +++ b/microprofile/fault-tolerance/src/main/java/module-info.java @@ -27,7 +27,7 @@ requires io.helidon.common.context; requires io.helidon.common.configurable; - requires io.helidon.reactive.faulttolerance; + requires io.helidon.nima.faulttolerance; requires io.helidon.microprofile.config; requires io.helidon.microprofile.server; requires io.helidon.microprofile.metrics; diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java index d77c8035cad..182440a679e 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java @@ -102,7 +102,7 @@ void reset() { CompletableFuture execute(long sleepMillis) { try { counter.increment(); - FaultToleranceTest.printStatus("BulkheadBean::execute", "success"); + FaultToleranceTest.printStatus("BulkheadBean::execute", "incremented"); try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { @@ -111,6 +111,7 @@ CompletableFuture execute(long sleepMillis) { return CompletableFuture.completedFuture(Thread.currentThread().getName()); } finally { counter.decrement(); + FaultToleranceTest.printStatus("BulkheadBean::execute", "decremented"); } } diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java index b160ee28926..0d4845fb2f5 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java @@ -107,7 +107,7 @@ void testSynchronous() { 10); // Check that only one thread entered the bulkhead - int sum = Arrays.asList(calls).stream().map(c -> { + int sum = Arrays.stream(calls).map(c -> { try { return c.get(); } catch (Exception e) { diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java index 8260d01112a..b32389d2d65 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Stream; import io.helidon.microprofile.tests.junit5.HelidonTest; @@ -45,6 +47,7 @@ */ @HelidonTest abstract class FaultToleranceTest { + private static final Logger LOGGER = Logger.getLogger(FaultToleranceTest.class.getName()); private static final long TIMEOUT = 5000; private static final TimeUnit TIMEOUT_UNITS = TimeUnit.MILLISECONDS; @@ -76,8 +79,10 @@ protected static T newNamedBean(Class beanClass) { } static void printStatus(String message, String status) { - System.out.println(message + " -> " + status + " [Thread: " - + Thread.currentThread().getName() + "]"); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.log(Level.FINE, message + " -> " + status + + " [Thread: " + Thread.currentThread().threadId() + "]"); + } } @SuppressWarnings("unchecked") diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java index f4454596913..dff421fb11b 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java @@ -22,8 +22,6 @@ import java.util.stream.Stream; import io.helidon.microprofile.tests.junit5.AddBean; - -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -37,7 +35,6 @@ */ @AddBean(RetryBean.class) @AddBean(SyntheticRetryBean.class) -@Disabled("3.0.0-JAKARTA") public class RetryTest extends FaultToleranceTest { static Stream createBeans() { diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java index 29f26b27e79..2d7f1109383 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java @@ -16,7 +16,6 @@ package io.helidon.microprofile.faulttolerance; -import java.time.temporal.ChronoUnit; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -37,39 +36,39 @@ void reset() { duration.set(1600); } - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) void forceTimeout() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::forceTimeout()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); } @Asynchronous - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) CompletableFuture forceTimeoutAsync() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::forceTimeoutAsync()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); return CompletableFuture.completedFuture("failure"); } - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) String noTimeout() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::noTimeout()", "success"); - Thread.sleep(500); + Thread.sleep(250); return "success"; } - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) void forceTimeoutWithCatch() { try { FaultToleranceTest.printStatus("TimeoutBean::forceTimeoutWithCatch()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); } catch (InterruptedException e) { // falls through } } // See class annotation @Retry(maxRetries = 2) - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=1000) String timeoutWithRetries() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::timeoutWithRetries()", duration.get() < 1000 ? "success" : "failure"); @@ -78,16 +77,16 @@ String timeoutWithRetries() throws InterruptedException { } @Fallback(fallbackMethod = "onFailure") - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) String timeoutWithFallback() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::forceTimeoutWithFallback()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); return "failure"; } // See class annotation @Retry(maxRetries = 2) @Fallback(fallbackMethod = "onFailure") - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) String timeoutWithRetriesAndFallback() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::timeoutWithRetriesAndFallback()", "failure"); Thread.sleep(duration.getAndAdd(-100)); // not enough, need fallback diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java index 16e282138f1..5314686e3b6 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java @@ -16,13 +16,18 @@ package io.helidon.nima.faulttolerance; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + /** * Implementation of {@code Async}. If no executor specified in builder, then it will * use {@link Executors#newVirtualThreadPerTaskExecutor}. Note that this default executor @@ -42,14 +47,28 @@ class AsyncImpl implements Async { @Override public CompletableFuture invoke(Supplier supplier) { - CompletableFuture result = new CompletableFuture<>(); - executor.get().submit(() -> { + AtomicBoolean mayInterrupt = new AtomicBoolean(false); + CompletableFuture result = new CompletableFuture<>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + mayInterrupt.set(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + }; + Future> future = executor.get().submit(() -> { try { T t = supplier.get(); result.complete(t); - } catch (Exception e) { - result.completeExceptionally(e); + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + result.completeExceptionally(throwable); + } + }); + result.exceptionally(t -> { + if (t instanceof CancellationException) { + future.cancel(mayInterrupt.get()); } + return null; }); return result; } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java index a4c033d9d83..019d910829c 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java @@ -46,6 +46,14 @@ static Builder builder() { */ Stats stats(); + /** + * Can be used to cancel a supplier while queued. + * + * @param supplier the supplier + * @return outcome of cancellation + */ + boolean cancelSupplier(Supplier> supplier); + /** * Provides statistics during the lifetime of a bulkhead, such as * concurrent executions, accepted/rejected calls and queue size. diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index 359f65d680f..3fe89636663 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -17,29 +17,42 @@ package io.helidon.nima.faulttolerance; import java.lang.System.Logger.Level; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class BulkheadImpl implements Bulkhead { private static final System.Logger LOGGER = System.getLogger(BulkheadImpl.class.getName()); private final Semaphore inProgress; private final String name; - private final int maxQueue; + private final BarrierQueue queue; private final AtomicLong concurrentExecutions = new AtomicLong(0L); private final AtomicLong callsAccepted = new AtomicLong(0L); private final AtomicLong callsRejected = new AtomicLong(0L); - private final AtomicInteger enqueued = new AtomicInteger(); private final List listeners; + private final Set> cancelledSuppliers = new CopyOnWriteArraySet<>(); BulkheadImpl(Builder builder) { this.inProgress = new Semaphore(builder.limit(), true); this.name = builder.name(); - this.maxQueue = builder.queueLength(); this.listeners = builder.queueListeners(); + this.queue = builder.queueLength() > 0 + ? new BlockingQueue(builder.queueLength()) + : new ZeroCapacityQueue(); } @Override @@ -57,20 +70,24 @@ public T invoke(Supplier extends T> supplier) { return execute(supplier); } - int queueLength = enqueued.incrementAndGet(); - if (queueLength > maxQueue) { - enqueued.decrementAndGet(); + if (queue.size() == queue.capacity()) { callsRejected.incrementAndGet(); throw new BulkheadException("Bulkhead queue \"" + name + "\" is full"); } try { - // block current thread until permit available + // block current thread until barrier is retracted listeners.forEach(l -> l.enqueueing(supplier)); - inProgress.acquire(); + queue.enqueueAndWaitOn(supplier); // unblocked so we can proceed with execution listeners.forEach(l -> l.dequeued(supplier)); - enqueued.decrementAndGet(); + + // do not run if cancelled while queued + if (cancelledSuppliers.remove(supplier)) { + return null; + } + + // invoke supplier now if (LOGGER.isLoggable(Level.DEBUG)) { LOGGER.log(Level.DEBUG, name + " invoking " + supplier); } @@ -78,6 +95,8 @@ public T invoke(Supplier extends T> supplier) { } catch (InterruptedException e) { callsRejected.incrementAndGet(); throw new BulkheadException("Bulkhead \"" + name + "\" interrupted while acquiring"); + } catch (ExecutionException e) { + throw new BulkheadException(e.getMessage()); } } @@ -101,29 +120,209 @@ public long callsRejected() { @Override public long waitingQueueSize() { - return enqueued.get(); + return queue.size(); } }; } // this method must be called while holding a permit - private T execute(Supplier extends T> task) { + private T execute(Supplier extends T> supplier) { callsAccepted.incrementAndGet(); concurrentExecutions.incrementAndGet(); try { - T result = task.get(); + T result = supplier.get(); if (LOGGER.isLoggable(Level.DEBUG)) { - LOGGER.log(Level.DEBUG, name + " finished execution: " + task + LOGGER.log(Level.DEBUG, name + " finished execution: " + supplier + " (success)"); } return result; } catch (Throwable t) { - LOGGER.log(Level.DEBUG, name + " finished execution: " + task - + " (failure)", t); - throw t; + Throwable throwable = unwrapThrowable(t); + LOGGER.log(Level.DEBUG, name + " finished execution: " + supplier + + " (failure)", throwable); + throw toRuntimeException(throwable); } finally { concurrentExecutions.decrementAndGet(); - inProgress.release(); + if (queue.size() > 0) { + queue.dequeueAndRetract(); + } else { + inProgress.release(); + } + } + } + + @Override + public boolean cancelSupplier(Supplier> supplier) { + boolean cancelled = queue.remove(supplier); + if (cancelled) { + cancelledSuppliers.add(supplier); + } + return cancelled; + } + + /** + * A queue for suppliers that block on barriers. + */ + private interface BarrierQueue { + + /** + * Number of suppliers in queue. + * + * @return current number of suppliers + */ + int size(); + + /** + * Maximum number of suppliers in queue. + * + * @return max number of suppliers + */ + int capacity(); + + /** + * Enqueue supplier and block thread on barrier. + * + * @param supplier the supplier + * @throws ExecutionException if exception encountered while blocked + * @throws InterruptedException if blocking is interrupted + */ + void enqueueAndWaitOn(Supplier> supplier) throws ExecutionException, InterruptedException; + + /** + * Dequeue supplier and retract its barrier. + */ + void dequeueAndRetract(); + + /** + * Remove supplier from queue, if present. + * + * @param supplier the supplier + * @return {@code true} if supplier was removed or {@code false} otherwise + */ + boolean remove(Supplier> supplier); + } + + /** + * A queue with capacity 0. + */ + private static class ZeroCapacityQueue implements BarrierQueue { + + @Override + public int size() { + return 0; + } + + @Override + public int capacity() { + return 0; + } + + @Override + public void enqueueAndWaitOn(Supplier> supplier) throws InterruptedException { + throw new IllegalStateException("Queue capacity is 0"); + } + + @Override + public void dequeueAndRetract() { + throw new IllegalStateException("Queue capacity is 0"); + } + + @Override + public boolean remove(Supplier> supplier) { + throw new IllegalStateException("Queue capacity is 0"); + } + } + + /** + * A queue that holds all those suppliers that don't have permits to execute at a + * certain time. The thread running the supplier will be forced to wait on a barrier + * until a new permit becomes available. + */ + private static class BlockingQueue implements BarrierQueue { + + private final int capacity; + private final ReentrantLock lock; + private final Queue> queue; + private final Map, Barrier> map; + + BlockingQueue(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException("Queue capacity must be greater than 0"); + } + this.capacity = capacity; + this.queue = new LinkedBlockingQueue<>(capacity); + this.map = new IdentityHashMap<>(); // just use references + this.lock = new ReentrantLock(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public int capacity() { + return capacity; + } + + @Override + public void enqueueAndWaitOn(Supplier> supplier) throws ExecutionException, InterruptedException { + Barrier barrier = enqueue(supplier); + if (barrier != null) { + barrier.waitOn(); + } else { + throw new IllegalStateException("Queue is full"); + } + } + + @Override + public void dequeueAndRetract() { + Barrier barrier = dequeue(); + if (barrier != null) { + barrier.retract(); + } else { + throw new IllegalStateException("Queue is empty"); + } + } + + @Override + public boolean remove(Supplier> supplier) { + return queue.remove(supplier); + } + + private Barrier dequeue() { + lock.lock(); + try { + Supplier> supplier = queue.poll(); + return supplier == null ? null : map.remove(supplier); + } finally { + lock.unlock(); + } + } + + private Barrier enqueue(Supplier> supplier) { + lock.lock(); + try { + boolean added = queue.offer(supplier); + return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null; + } finally { + lock.unlock(); + } + } + } + + /** + * A barrier is used to force a thread to wait (block) until it is retracted. + */ + private static class Barrier { + private final CompletableFuture future = new CompletableFuture<>(); + + void waitOn() throws ExecutionException, InterruptedException { + future.get(); + } + + void retract() { + future.complete(null); } } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java index 1c96ec0c939..9d1b34b909d 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java @@ -26,6 +26,9 @@ import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class CircuitBreakerImpl implements CircuitBreaker { /* Configuration options @@ -110,18 +113,20 @@ private U executeTask(Supplier extends U> supplier) { U result = supplier.get(); results.update(ResultWindow.Result.SUCCESS); return result; - } catch (Throwable e) { - if (errorChecker.shouldSkip(e)) { + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + if (errorChecker.shouldSkip(throwable)) { results.update(ResultWindow.Result.SUCCESS); } else { results.update(ResultWindow.Result.FAILURE); } + throw toRuntimeException(throwable); + } finally { if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) { results.reset(); // if we successfully switch to open, we need to schedule switch to half-open scheduleHalf(); } - throw e; } } @@ -138,23 +143,24 @@ private U halfOpenTask(Supplier extends U> supplier) { state.compareAndSet(State.HALF_OPEN, State.CLOSED); } return result; - } catch (Throwable e) { - if (errorChecker.shouldSkip(e)) { + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + if (errorChecker.shouldSkip(throwable)) { // success int successes = successCounter.incrementAndGet(); if (successes >= successThreshold) { // transition to closed successCounter.set(0); state.compareAndSet(State.HALF_OPEN, State.CLOSED); - } else { - // failure - successCounter.set(0); - state.set(State.OPEN); - // if we successfully switch to open, we need to schedule switch to half-open - scheduleHalf(); } + } else { + // failure + successCounter.set(0); + state.set(State.OPEN); + // if we successfully switch to open, we need to schedule switch to half-open + scheduleHalf(); } - throw e; + throw toRuntimeException(throwable); } finally { halfOpenInProgress.set(false); } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java index 87d8eb2dd53..ee7f7d46c97 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java @@ -19,6 +19,9 @@ import java.util.function.Function; import java.util.function.Supplier; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class FallbackImpl implements Fallback { private final Function fallback; private final ErrorChecker errorChecker; @@ -32,15 +35,19 @@ class FallbackImpl implements Fallback { public T invoke(Supplier extends T> supplier) { try { return supplier.get(); - } catch (Throwable e) { - if (errorChecker.shouldSkip(e)) { - throw e; + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + if (errorChecker.shouldSkip(throwable)) { + throw toRuntimeException(throwable); } try { - return fallback.apply(e); - } catch (RuntimeException ex) { - ex.addSuppressed(e); - throw ex; + return fallback.apply(throwable); + } catch (Throwable t2) { + Throwable throwable2 = unwrapThrowable(t2); + if (throwable2 != throwable) { // cannot self suppress + throwable2.addSuppressed(throwable); + } + throw toRuntimeException(throwable2); } } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java index 5871cc22c52..849527f1a13 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java @@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class RetryImpl implements Retry { private final ErrorChecker errorChecker; private final long maxTimeNanos; @@ -50,9 +52,11 @@ public T invoke(Supplier extends T> supplier) { while (true) { try { return supplier.get(); - } catch (Throwable e) { - context.thrown.add(e); - if (errorChecker.shouldSkip(e)) { + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + context.thrown.add(throwable); + if (errorChecker.shouldSkip(throwable) + || throwable instanceof InterruptedException) { // no retry on interrupt return context.throwIt(); } } @@ -126,8 +130,7 @@ public U throwIt() { if (t instanceof Error e) { throw e; } - // this is a case that should not happen, as the supplier cannot throw a checked exception - throw new RuntimeException("Retries completed with exception", t); + throw new SupplierException(t); } boolean hasThrowable() { diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java index 6e004cd0f63..885b695df10 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java @@ -22,6 +22,8 @@ public class RetryTimeoutException extends TimeoutException { private static final long serialVersionUID = 1900926677490550714L; + private final Throwable lastRetryException; + /** * Constructs a {@code RetryTimeoutException} with the specified detail * message. @@ -31,6 +33,16 @@ public class RetryTimeoutException extends TimeoutException { */ public RetryTimeoutException(String message, Throwable throwable) { super(message, throwable); + lastRetryException = throwable; + } + + /** + * Last exception thrown in {@code Retry} before the overall timeout reached. + * + * @return last exception thrown + */ + public Throwable lastRetryException() { + return lastRetryException; } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierException.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierException.java new file mode 100644 index 00000000000..c95a0d9ff66 --- /dev/null +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierException.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.faulttolerance; + +/** + * An exception class that is a {@code RuntimeException} and is used to wrap + * an exception that cannot be thrown in a supplier. + */ +public class SupplierException extends RuntimeException { + + /** + * Create an instance using a {@code Throwable}. + * + * @param cause the cause + */ + public SupplierException(Throwable cause) { + super(cause); + } + + /** + * Create an instance using a {@code Throwable}. + * + * @param message the message + * @param cause the cause + */ + public SupplierException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierHelper.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierHelper.java new file mode 100644 index 00000000000..d27ccc2ce0b --- /dev/null +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierHelper.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.faulttolerance; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Helper to handle async suppliers that return {@code CompletionStage}. + */ +public class SupplierHelper { + + private SupplierHelper() { + } + + /** + * Maps a supplier returning a {@code CompletionStage} to a supplier returning {@code T} + * by waiting on the stage to produce a value. + * + * @param supplier the async supplier + * @param timeout time to wait + * @param unit unit of time + * @param type produced by supplier + * @return the supplier + */ + public static Supplier toSyncSupplier(Supplier extends CompletionStage> supplier, + long timeout, TimeUnit unit) { + return () -> { + try { + CompletionStage result = supplier.get(); + return result.toCompletableFuture().get(timeout, unit); + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + throw toRuntimeException(throwable); + } + }; + } + + /** + * Wraps a {@code Throwable} in a {@code RuntimeException} of type {@code SupplierException} + * if necessary. Will never nest {@code SupplierException}s. + * + * @param t the throwable + * @return the exception + */ + public static RuntimeException toRuntimeException(Throwable t) { + return t instanceof RuntimeException rt ? rt : new SupplierException(t); + } + + /** + * Gets the underlying cause of a {@code SupplierException} or of a {@code ExecutionException}. + * + * @param t the throwable + * @return the cause or the same throwable + */ + public static Throwable unwrapThrowable(Throwable t) { + if (t instanceof SupplierException || t instanceof ExecutionException) { + return t.getCause(); + } + return t; + } +} diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java index 2b5a5516719..dfeb1093c11 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java @@ -17,7 +17,6 @@ package io.helidon.nima.faulttolerance; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -27,6 +26,9 @@ import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class TimeoutImpl implements Timeout { private static final System.Logger LOGGER = System.getLogger(TimeoutImpl.class.getName()); @@ -54,25 +56,21 @@ public T invoke(Supplier extends T> supplier) { return CompletableFuture.supplyAsync(supplier, executor.get()) .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) .get(); - } catch (InterruptedException e) { - throw new TimeoutException("Call interrupted", e); - } catch (ExecutionException e) { - // Map java.util.concurrent.TimeoutException to Nima's TimeoutException - if (e.getCause() instanceof java.util.concurrent.TimeoutException) { - throw new TimeoutException("Timeout reached", e.getCause().getCause()); - } - throw new RuntimeException("Asynchronous execution error", e.getCause()); + } catch (Throwable t) { + throw mapThrowable(t, null); } } else { Thread thisThread = Thread.currentThread(); ReentrantLock interruptLock = new ReentrantLock(); AtomicBoolean callReturned = new AtomicBoolean(false); + AtomicBoolean interrupted = new AtomicBoolean(false); ScheduledFuture> timeoutFuture = executor.get().schedule(() -> { interruptLock.lock(); try { if (callReturned.compareAndSet(false, true)) { thisThread.interrupt(); + interrupted.set(true); // needed if InterruptedException caught in supplier } } finally { interruptLock.unlock(); @@ -81,7 +79,13 @@ public T invoke(Supplier extends T> supplier) { }, timeoutMillis, TimeUnit.MILLISECONDS); try { - return supplier.get(); + T result = supplier.get(); + if (interrupted.get()) { + throw new TimeoutException("Supplier execution interrupted", null); + } + return result; + } catch (Throwable t) { + throw mapThrowable(t, interrupted); } finally { interruptLock.lock(); try { @@ -98,4 +102,17 @@ public T invoke(Supplier extends T> supplier) { } } } + + private static RuntimeException mapThrowable(Throwable t, AtomicBoolean interrupted) { + Throwable throwable = unwrapThrowable(t); + if (throwable instanceof InterruptedException) { + return new TimeoutException("Call interrupted", throwable); + + } else if (throwable instanceof java.util.concurrent.TimeoutException) { + return new TimeoutException("Timeout reached", throwable.getCause()); + } else if (interrupted != null && interrupted.get()) { + return new TimeoutException("Supplier execution interrupted", t); + } + return toRuntimeException(throwable); + } } diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java index 2e9f38aa36e..f3cb93260f0 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java @@ -40,7 +40,7 @@ class BulkheadTest { private static final System.Logger LOGGER = System.getLogger(BulkheadTest.class.getName()); - private static final long WAIT_TIMEOUT_MILLIS = 2000; + private static final long WAIT_TIMEOUT_MILLIS = 4000; private final CountDownLatch enqueuedSubmitted = new CountDownLatch(1); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java index a58a2899af4..ae160af2caf 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java @@ -83,6 +83,21 @@ void testCircuitBreaker() throws InterruptedException, ExecutionException, Timeo assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); } + @Test + void testOpenOnLastSuccess() { + CircuitBreaker breaker = CircuitBreaker.builder() + .volume(4) + .errorRatio(75) + .build(); + + bad(breaker); + bad(breaker); + bad(breaker); + good(breaker); + + assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); + } + private void breakerOpen(CircuitBreaker breaker) { Request good = new Request(); assertThrows(CircuitBreakerOpenException.class, () -> breaker.invoke(good::invoke)); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java index 62879a78adf..c9f01c9f3b1 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java @@ -71,6 +71,17 @@ void testOpenAfterCompleteWindow3() { assertThat("Should open after complete window > 20%", window.shouldOpen(), is(true)); } + @Test + void testOpenAfterCompleteWindow4() { + ResultWindow window = new ResultWindow(4, 75); + assertThat("Empty should not open", window.shouldOpen(), is(false)); + window.update(ResultWindow.Result.FAILURE); + window.update(ResultWindow.Result.FAILURE); + window.update(ResultWindow.Result.FAILURE); + window.update(ResultWindow.Result.SUCCESS); + assertThat("Should open after complete window > 75%", window.shouldOpen(), is(true)); + } + @Test void testOpenAfterCompleteWindowReset() { ResultWindow window = new ResultWindow(5, 20); diff --git a/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java b/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java index 0339c6447cf..1de55bec66a 100644 --- a/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java +++ b/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java @@ -20,7 +20,7 @@ import java.util.function.Supplier; import io.helidon.microprofile.tests.junit5.HelidonTest; -import io.helidon.reactive.faulttolerance.Async; +import io.helidon.nima.faulttolerance.Async; import io.netty.handler.codec.http.HttpResponseStatus; import jakarta.inject.Inject;
+ * fallback(retry(circuitbreaker(timeout(bulkhead(method))))) + *
* Uses the cached handlers defined in the method state for this invocation's * method, except for fallback. * @@ -539,9 +534,33 @@ private FtHandlerTyped createMethodHandler(MethodState methodState) { if (introspector.hasFallback()) { Fallback fallback = Fallback.builder() .fallback(throwable -> { - fallbackCalled.set(true); FallbackHelper cfb = new FallbackHelper(context, introspector, throwable); - return toCompletionStageSupplier(cfb::execute).get(); + + // Fallback executed in another thread + if (introspector.isAsynchronous()) { + // In a reactive env, we shouldn't block on a Future, so the FT spec + // states not to fallback in this case -- even though we can with VTs + // Note if method throws exception directly, fallback is required. + if (method.getReturnType().equals(Future.class) + && throwable instanceof ExecutionException) { // exception from Future + throw toRuntimeException(throwable); + } + + CompletableFuture> f = callSupplierNewThread(asyncToSyncFtSupplier(cfb::execute)); + try { + fallbackCalled.set(true); + return f.get(); + } catch (Throwable t) { + throw toRuntimeException(t); + } + } else { + try { + fallbackCalled.set(true); + return callSupplier(cfb::execute); + } catch (Throwable t) { + throw toRuntimeException(t); + } + } }) .applyOn(mapTypes(introspector.getFallback().applyOn())) .skipOn(mapTypes(introspector.getFallback().skipOn())) @@ -552,84 +571,6 @@ private FtHandlerTyped createMethodHandler(MethodState methodState) { return builder.build(); } - /** - * Maps an {@link FtSupplier} to a supplier of {@link CompletionStage}. - * - * @param supplier The supplier. - * @return The new supplier. - */ - @SuppressWarnings("unchecked") - Supplier extends CompletionStage> toCompletionStageSupplier(FtSupplier supplier) { - return () -> { - invocationStartNanos = System.nanoTime(); - - // Wrap supplier with request context setup - FtSupplier wrappedSupplier = requestScopeHelper.wrapInScope(supplier); - - CompletableFuture resultFuture = new CompletableFuture<>(); - if (introspector.isAsynchronous()) { - // Invoke supplier in new thread and propagate ccl for config - ClassLoader ccl = Thread.currentThread().getContextClassLoader(); - Single single = Async.create().invoke(() -> { - try { - Thread.currentThread().setContextClassLoader(ccl); - asyncInterruptThread = Thread.currentThread(); - return wrappedSupplier.get(); - } catch (Throwable t) { - return new InvokerAsyncException(t); // wraps Throwable - } - }); - - // Handle async cancellations - resultFuture.whenComplete((result, throwable) -> { - if (throwable instanceof CancellationException) { - single.cancel(); // will not interrupt by default - - // If interrupt was requested, do it manually here - if (mayInterruptIfRunning.get() && asyncInterruptThread != null) { - asyncInterruptThread.interrupt(); - asyncInterruptThread = null; - } - } - }); - - // The result must be Future>, {Completable}Future> or InvokerAsyncException - single.thenAccept(result -> { - try { - // Handle exceptions thrown by an async method - if (result instanceof InvokerAsyncException) { - resultFuture.completeExceptionally(((Exception) result).getCause()); - } else if (method.getReturnType() == Future.class) { - // If method returns Future, pass it without further processing - resultFuture.complete(result); - } else if (result instanceof CompletionStage>) { // also CompletableFuture> - CompletionStage cs = (CompletionStage) result; - cs.whenComplete((o, t) -> { - if (t != null) { - resultFuture.completeExceptionally(t); - } else { - resultFuture.complete(o); - } - }); - } else { - throw new InternalError("Return type validation failed for method " + method); - } - } catch (Throwable t) { - resultFuture.completeExceptionally(t); - } - }); - } else { - try { - resultFuture.complete(wrappedSupplier.get()); - return resultFuture; - } catch (Throwable t) { - resultFuture.completeExceptionally(t); - } - } - return resultFuture; - }; - } - /** * Collects information necessary to update metrics after method is called. */ diff --git a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java index 31cc46aa54e..86e1ba5807f 100644 --- a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java +++ b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/ThrowableMapper.java @@ -19,7 +19,7 @@ import java.util.Arrays; import java.util.concurrent.ExecutionException; -import io.helidon.reactive.faulttolerance.RetryTimeoutException; +import io.helidon.nima.faulttolerance.RetryTimeoutException; import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException; import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException; @@ -44,19 +44,17 @@ static Throwable map(Throwable t) { if (t instanceof ExecutionException) { t = t.getCause(); } - if (t instanceof io.helidon.reactive.faulttolerance.CircuitBreakerOpenException) { + if (t instanceof io.helidon.nima.faulttolerance.CircuitBreakerOpenException) { return new CircuitBreakerOpenException(t.getMessage(), t.getCause()); } - if (t instanceof io.helidon.reactive.faulttolerance.BulkheadException) { + if (t instanceof io.helidon.nima.faulttolerance.BulkheadException) { return new BulkheadException(t.getMessage(), t.getCause()); } if (t instanceof RetryTimeoutException) { return t; // the cause is handled elsewhere } - if (t instanceof java.util.concurrent.TimeoutException) { - return new TimeoutException(t.getMessage(), t.getCause()); - } - if (t instanceof java.lang.InterruptedException) { + if (t instanceof io.helidon.nima.faulttolerance.TimeoutException + || t instanceof java.lang.InterruptedException) { return new TimeoutException(t.getMessage(), t.getCause()); } return t; @@ -77,11 +75,11 @@ static Class extends Throwable>[] mapTypes(Class extends Throwable>[] types) for (int i = 0; i < types.length; i++) { Class extends Throwable> t = types[i]; if (t == BulkheadException.class) { - result[i] = io.helidon.reactive.faulttolerance.BulkheadException.class; + result[i] = io.helidon.nima.faulttolerance.BulkheadException.class; } else if (t == CircuitBreakerOpenException.class) { - result[i] = io.helidon.reactive.faulttolerance.CircuitBreakerOpenException.class; + result[i] = io.helidon.nima.faulttolerance.CircuitBreakerOpenException.class; } else if (t == TimeoutException.class) { - result[i] = java.util.concurrent.TimeoutException.class; + result[i] = io.helidon.nima.faulttolerance.TimeoutException.class; } else { result[i] = t; } diff --git a/microprofile/fault-tolerance/src/main/java/module-info.java b/microprofile/fault-tolerance/src/main/java/module-info.java index 54d7e5f8020..0579579856a 100644 --- a/microprofile/fault-tolerance/src/main/java/module-info.java +++ b/microprofile/fault-tolerance/src/main/java/module-info.java @@ -27,7 +27,7 @@ requires io.helidon.common.context; requires io.helidon.common.configurable; - requires io.helidon.reactive.faulttolerance; + requires io.helidon.nima.faulttolerance; requires io.helidon.microprofile.config; requires io.helidon.microprofile.server; requires io.helidon.microprofile.metrics; diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java index d77c8035cad..182440a679e 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadBean.java @@ -102,7 +102,7 @@ void reset() { CompletableFuture execute(long sleepMillis) { try { counter.increment(); - FaultToleranceTest.printStatus("BulkheadBean::execute", "success"); + FaultToleranceTest.printStatus("BulkheadBean::execute", "incremented"); try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { @@ -111,6 +111,7 @@ CompletableFuture execute(long sleepMillis) { return CompletableFuture.completedFuture(Thread.currentThread().getName()); } finally { counter.decrement(); + FaultToleranceTest.printStatus("BulkheadBean::execute", "decremented"); } } diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java index b160ee28926..0d4845fb2f5 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/BulkheadTest.java @@ -107,7 +107,7 @@ void testSynchronous() { 10); // Check that only one thread entered the bulkhead - int sum = Arrays.asList(calls).stream().map(c -> { + int sum = Arrays.stream(calls).map(c -> { try { return c.get(); } catch (Exception e) { diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java index 8260d01112a..b32389d2d65 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/FaultToleranceTest.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Stream; import io.helidon.microprofile.tests.junit5.HelidonTest; @@ -45,6 +47,7 @@ */ @HelidonTest abstract class FaultToleranceTest { + private static final Logger LOGGER = Logger.getLogger(FaultToleranceTest.class.getName()); private static final long TIMEOUT = 5000; private static final TimeUnit TIMEOUT_UNITS = TimeUnit.MILLISECONDS; @@ -76,8 +79,10 @@ protected static T newNamedBean(Class beanClass) { } static void printStatus(String message, String status) { - System.out.println(message + " -> " + status + " [Thread: " - + Thread.currentThread().getName() + "]"); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.log(Level.FINE, message + " -> " + status + + " [Thread: " + Thread.currentThread().threadId() + "]"); + } } @SuppressWarnings("unchecked") diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java index f4454596913..dff421fb11b 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/RetryTest.java @@ -22,8 +22,6 @@ import java.util.stream.Stream; import io.helidon.microprofile.tests.junit5.AddBean; - -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -37,7 +35,6 @@ */ @AddBean(RetryBean.class) @AddBean(SyntheticRetryBean.class) -@Disabled("3.0.0-JAKARTA") public class RetryTest extends FaultToleranceTest { static Stream createBeans() { diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java index 29f26b27e79..2d7f1109383 100644 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java +++ b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/TimeoutBean.java @@ -16,7 +16,6 @@ package io.helidon.microprofile.faulttolerance; -import java.time.temporal.ChronoUnit; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -37,39 +36,39 @@ void reset() { duration.set(1600); } - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) void forceTimeout() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::forceTimeout()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); } @Asynchronous - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) CompletableFuture forceTimeoutAsync() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::forceTimeoutAsync()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); return CompletableFuture.completedFuture("failure"); } - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) String noTimeout() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::noTimeout()", "success"); - Thread.sleep(500); + Thread.sleep(250); return "success"; } - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) void forceTimeoutWithCatch() { try { FaultToleranceTest.printStatus("TimeoutBean::forceTimeoutWithCatch()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); } catch (InterruptedException e) { // falls through } } // See class annotation @Retry(maxRetries = 2) - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=1000) String timeoutWithRetries() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::timeoutWithRetries()", duration.get() < 1000 ? "success" : "failure"); @@ -78,16 +77,16 @@ String timeoutWithRetries() throws InterruptedException { } @Fallback(fallbackMethod = "onFailure") - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) String timeoutWithFallback() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::forceTimeoutWithFallback()", "failure"); - Thread.sleep(1500); + Thread.sleep(1000); return "failure"; } // See class annotation @Retry(maxRetries = 2) @Fallback(fallbackMethod = "onFailure") - @Timeout(value=1000, unit=ChronoUnit.MILLIS) + @Timeout(value=500) String timeoutWithRetriesAndFallback() throws InterruptedException { FaultToleranceTest.printStatus("TimeoutBean::timeoutWithRetriesAndFallback()", "failure"); Thread.sleep(duration.getAndAdd(-100)); // not enough, need fallback diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java index 16e282138f1..5314686e3b6 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java @@ -16,13 +16,18 @@ package io.helidon.nima.faulttolerance; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + /** * Implementation of {@code Async}. If no executor specified in builder, then it will * use {@link Executors#newVirtualThreadPerTaskExecutor}. Note that this default executor @@ -42,14 +47,28 @@ class AsyncImpl implements Async { @Override public CompletableFuture invoke(Supplier supplier) { - CompletableFuture result = new CompletableFuture<>(); - executor.get().submit(() -> { + AtomicBoolean mayInterrupt = new AtomicBoolean(false); + CompletableFuture result = new CompletableFuture<>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + mayInterrupt.set(mayInterruptIfRunning); + return super.cancel(mayInterruptIfRunning); + } + }; + Future> future = executor.get().submit(() -> { try { T t = supplier.get(); result.complete(t); - } catch (Exception e) { - result.completeExceptionally(e); + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + result.completeExceptionally(throwable); + } + }); + result.exceptionally(t -> { + if (t instanceof CancellationException) { + future.cancel(mayInterrupt.get()); } + return null; }); return result; } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java index a4c033d9d83..019d910829c 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Bulkhead.java @@ -46,6 +46,14 @@ static Builder builder() { */ Stats stats(); + /** + * Can be used to cancel a supplier while queued. + * + * @param supplier the supplier + * @return outcome of cancellation + */ + boolean cancelSupplier(Supplier> supplier); + /** * Provides statistics during the lifetime of a bulkhead, such as * concurrent executions, accepted/rejected calls and queue size. diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index 359f65d680f..3fe89636663 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -17,29 +17,42 @@ package io.helidon.nima.faulttolerance; import java.lang.System.Logger.Level; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class BulkheadImpl implements Bulkhead { private static final System.Logger LOGGER = System.getLogger(BulkheadImpl.class.getName()); private final Semaphore inProgress; private final String name; - private final int maxQueue; + private final BarrierQueue queue; private final AtomicLong concurrentExecutions = new AtomicLong(0L); private final AtomicLong callsAccepted = new AtomicLong(0L); private final AtomicLong callsRejected = new AtomicLong(0L); - private final AtomicInteger enqueued = new AtomicInteger(); private final List listeners; + private final Set> cancelledSuppliers = new CopyOnWriteArraySet<>(); BulkheadImpl(Builder builder) { this.inProgress = new Semaphore(builder.limit(), true); this.name = builder.name(); - this.maxQueue = builder.queueLength(); this.listeners = builder.queueListeners(); + this.queue = builder.queueLength() > 0 + ? new BlockingQueue(builder.queueLength()) + : new ZeroCapacityQueue(); } @Override @@ -57,20 +70,24 @@ public T invoke(Supplier extends T> supplier) { return execute(supplier); } - int queueLength = enqueued.incrementAndGet(); - if (queueLength > maxQueue) { - enqueued.decrementAndGet(); + if (queue.size() == queue.capacity()) { callsRejected.incrementAndGet(); throw new BulkheadException("Bulkhead queue \"" + name + "\" is full"); } try { - // block current thread until permit available + // block current thread until barrier is retracted listeners.forEach(l -> l.enqueueing(supplier)); - inProgress.acquire(); + queue.enqueueAndWaitOn(supplier); // unblocked so we can proceed with execution listeners.forEach(l -> l.dequeued(supplier)); - enqueued.decrementAndGet(); + + // do not run if cancelled while queued + if (cancelledSuppliers.remove(supplier)) { + return null; + } + + // invoke supplier now if (LOGGER.isLoggable(Level.DEBUG)) { LOGGER.log(Level.DEBUG, name + " invoking " + supplier); } @@ -78,6 +95,8 @@ public T invoke(Supplier extends T> supplier) { } catch (InterruptedException e) { callsRejected.incrementAndGet(); throw new BulkheadException("Bulkhead \"" + name + "\" interrupted while acquiring"); + } catch (ExecutionException e) { + throw new BulkheadException(e.getMessage()); } } @@ -101,29 +120,209 @@ public long callsRejected() { @Override public long waitingQueueSize() { - return enqueued.get(); + return queue.size(); } }; } // this method must be called while holding a permit - private T execute(Supplier extends T> task) { + private T execute(Supplier extends T> supplier) { callsAccepted.incrementAndGet(); concurrentExecutions.incrementAndGet(); try { - T result = task.get(); + T result = supplier.get(); if (LOGGER.isLoggable(Level.DEBUG)) { - LOGGER.log(Level.DEBUG, name + " finished execution: " + task + LOGGER.log(Level.DEBUG, name + " finished execution: " + supplier + " (success)"); } return result; } catch (Throwable t) { - LOGGER.log(Level.DEBUG, name + " finished execution: " + task - + " (failure)", t); - throw t; + Throwable throwable = unwrapThrowable(t); + LOGGER.log(Level.DEBUG, name + " finished execution: " + supplier + + " (failure)", throwable); + throw toRuntimeException(throwable); } finally { concurrentExecutions.decrementAndGet(); - inProgress.release(); + if (queue.size() > 0) { + queue.dequeueAndRetract(); + } else { + inProgress.release(); + } + } + } + + @Override + public boolean cancelSupplier(Supplier> supplier) { + boolean cancelled = queue.remove(supplier); + if (cancelled) { + cancelledSuppliers.add(supplier); + } + return cancelled; + } + + /** + * A queue for suppliers that block on barriers. + */ + private interface BarrierQueue { + + /** + * Number of suppliers in queue. + * + * @return current number of suppliers + */ + int size(); + + /** + * Maximum number of suppliers in queue. + * + * @return max number of suppliers + */ + int capacity(); + + /** + * Enqueue supplier and block thread on barrier. + * + * @param supplier the supplier + * @throws ExecutionException if exception encountered while blocked + * @throws InterruptedException if blocking is interrupted + */ + void enqueueAndWaitOn(Supplier> supplier) throws ExecutionException, InterruptedException; + + /** + * Dequeue supplier and retract its barrier. + */ + void dequeueAndRetract(); + + /** + * Remove supplier from queue, if present. + * + * @param supplier the supplier + * @return {@code true} if supplier was removed or {@code false} otherwise + */ + boolean remove(Supplier> supplier); + } + + /** + * A queue with capacity 0. + */ + private static class ZeroCapacityQueue implements BarrierQueue { + + @Override + public int size() { + return 0; + } + + @Override + public int capacity() { + return 0; + } + + @Override + public void enqueueAndWaitOn(Supplier> supplier) throws InterruptedException { + throw new IllegalStateException("Queue capacity is 0"); + } + + @Override + public void dequeueAndRetract() { + throw new IllegalStateException("Queue capacity is 0"); + } + + @Override + public boolean remove(Supplier> supplier) { + throw new IllegalStateException("Queue capacity is 0"); + } + } + + /** + * A queue that holds all those suppliers that don't have permits to execute at a + * certain time. The thread running the supplier will be forced to wait on a barrier + * until a new permit becomes available. + */ + private static class BlockingQueue implements BarrierQueue { + + private final int capacity; + private final ReentrantLock lock; + private final Queue> queue; + private final Map, Barrier> map; + + BlockingQueue(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException("Queue capacity must be greater than 0"); + } + this.capacity = capacity; + this.queue = new LinkedBlockingQueue<>(capacity); + this.map = new IdentityHashMap<>(); // just use references + this.lock = new ReentrantLock(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public int capacity() { + return capacity; + } + + @Override + public void enqueueAndWaitOn(Supplier> supplier) throws ExecutionException, InterruptedException { + Barrier barrier = enqueue(supplier); + if (barrier != null) { + barrier.waitOn(); + } else { + throw new IllegalStateException("Queue is full"); + } + } + + @Override + public void dequeueAndRetract() { + Barrier barrier = dequeue(); + if (barrier != null) { + barrier.retract(); + } else { + throw new IllegalStateException("Queue is empty"); + } + } + + @Override + public boolean remove(Supplier> supplier) { + return queue.remove(supplier); + } + + private Barrier dequeue() { + lock.lock(); + try { + Supplier> supplier = queue.poll(); + return supplier == null ? null : map.remove(supplier); + } finally { + lock.unlock(); + } + } + + private Barrier enqueue(Supplier> supplier) { + lock.lock(); + try { + boolean added = queue.offer(supplier); + return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null; + } finally { + lock.unlock(); + } + } + } + + /** + * A barrier is used to force a thread to wait (block) until it is retracted. + */ + private static class Barrier { + private final CompletableFuture future = new CompletableFuture<>(); + + void waitOn() throws ExecutionException, InterruptedException { + future.get(); + } + + void retract() { + future.complete(null); } } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java index 1c96ec0c939..9d1b34b909d 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java @@ -26,6 +26,9 @@ import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class CircuitBreakerImpl implements CircuitBreaker { /* Configuration options @@ -110,18 +113,20 @@ private U executeTask(Supplier extends U> supplier) { U result = supplier.get(); results.update(ResultWindow.Result.SUCCESS); return result; - } catch (Throwable e) { - if (errorChecker.shouldSkip(e)) { + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + if (errorChecker.shouldSkip(throwable)) { results.update(ResultWindow.Result.SUCCESS); } else { results.update(ResultWindow.Result.FAILURE); } + throw toRuntimeException(throwable); + } finally { if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) { results.reset(); // if we successfully switch to open, we need to schedule switch to half-open scheduleHalf(); } - throw e; } } @@ -138,23 +143,24 @@ private U halfOpenTask(Supplier extends U> supplier) { state.compareAndSet(State.HALF_OPEN, State.CLOSED); } return result; - } catch (Throwable e) { - if (errorChecker.shouldSkip(e)) { + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + if (errorChecker.shouldSkip(throwable)) { // success int successes = successCounter.incrementAndGet(); if (successes >= successThreshold) { // transition to closed successCounter.set(0); state.compareAndSet(State.HALF_OPEN, State.CLOSED); - } else { - // failure - successCounter.set(0); - state.set(State.OPEN); - // if we successfully switch to open, we need to schedule switch to half-open - scheduleHalf(); } + } else { + // failure + successCounter.set(0); + state.set(State.OPEN); + // if we successfully switch to open, we need to schedule switch to half-open + scheduleHalf(); } - throw e; + throw toRuntimeException(throwable); } finally { halfOpenInProgress.set(false); } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java index 87d8eb2dd53..ee7f7d46c97 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FallbackImpl.java @@ -19,6 +19,9 @@ import java.util.function.Function; import java.util.function.Supplier; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class FallbackImpl implements Fallback { private final Function fallback; private final ErrorChecker errorChecker; @@ -32,15 +35,19 @@ class FallbackImpl implements Fallback { public T invoke(Supplier extends T> supplier) { try { return supplier.get(); - } catch (Throwable e) { - if (errorChecker.shouldSkip(e)) { - throw e; + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + if (errorChecker.shouldSkip(throwable)) { + throw toRuntimeException(throwable); } try { - return fallback.apply(e); - } catch (RuntimeException ex) { - ex.addSuppressed(e); - throw ex; + return fallback.apply(throwable); + } catch (Throwable t2) { + Throwable throwable2 = unwrapThrowable(t2); + if (throwable2 != throwable) { // cannot self suppress + throwable2.addSuppressed(throwable); + } + throw toRuntimeException(throwable2); } } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java index 5871cc22c52..849527f1a13 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryImpl.java @@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class RetryImpl implements Retry { private final ErrorChecker errorChecker; private final long maxTimeNanos; @@ -50,9 +52,11 @@ public T invoke(Supplier extends T> supplier) { while (true) { try { return supplier.get(); - } catch (Throwable e) { - context.thrown.add(e); - if (errorChecker.shouldSkip(e)) { + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + context.thrown.add(throwable); + if (errorChecker.shouldSkip(throwable) + || throwable instanceof InterruptedException) { // no retry on interrupt return context.throwIt(); } } @@ -126,8 +130,7 @@ public U throwIt() { if (t instanceof Error e) { throw e; } - // this is a case that should not happen, as the supplier cannot throw a checked exception - throw new RuntimeException("Retries completed with exception", t); + throw new SupplierException(t); } boolean hasThrowable() { diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java index 6e004cd0f63..885b695df10 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/RetryTimeoutException.java @@ -22,6 +22,8 @@ public class RetryTimeoutException extends TimeoutException { private static final long serialVersionUID = 1900926677490550714L; + private final Throwable lastRetryException; + /** * Constructs a {@code RetryTimeoutException} with the specified detail * message. @@ -31,6 +33,16 @@ public class RetryTimeoutException extends TimeoutException { */ public RetryTimeoutException(String message, Throwable throwable) { super(message, throwable); + lastRetryException = throwable; + } + + /** + * Last exception thrown in {@code Retry} before the overall timeout reached. + * + * @return last exception thrown + */ + public Throwable lastRetryException() { + return lastRetryException; } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierException.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierException.java new file mode 100644 index 00000000000..c95a0d9ff66 --- /dev/null +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierException.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.faulttolerance; + +/** + * An exception class that is a {@code RuntimeException} and is used to wrap + * an exception that cannot be thrown in a supplier. + */ +public class SupplierException extends RuntimeException { + + /** + * Create an instance using a {@code Throwable}. + * + * @param cause the cause + */ + public SupplierException(Throwable cause) { + super(cause); + } + + /** + * Create an instance using a {@code Throwable}. + * + * @param message the message + * @param cause the cause + */ + public SupplierException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierHelper.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierHelper.java new file mode 100644 index 00000000000..d27ccc2ce0b --- /dev/null +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/SupplierHelper.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.faulttolerance; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Helper to handle async suppliers that return {@code CompletionStage}. + */ +public class SupplierHelper { + + private SupplierHelper() { + } + + /** + * Maps a supplier returning a {@code CompletionStage} to a supplier returning {@code T} + * by waiting on the stage to produce a value. + * + * @param supplier the async supplier + * @param timeout time to wait + * @param unit unit of time + * @param type produced by supplier + * @return the supplier + */ + public static Supplier toSyncSupplier(Supplier extends CompletionStage> supplier, + long timeout, TimeUnit unit) { + return () -> { + try { + CompletionStage result = supplier.get(); + return result.toCompletableFuture().get(timeout, unit); + } catch (Throwable t) { + Throwable throwable = unwrapThrowable(t); + throw toRuntimeException(throwable); + } + }; + } + + /** + * Wraps a {@code Throwable} in a {@code RuntimeException} of type {@code SupplierException} + * if necessary. Will never nest {@code SupplierException}s. + * + * @param t the throwable + * @return the exception + */ + public static RuntimeException toRuntimeException(Throwable t) { + return t instanceof RuntimeException rt ? rt : new SupplierException(t); + } + + /** + * Gets the underlying cause of a {@code SupplierException} or of a {@code ExecutionException}. + * + * @param t the throwable + * @return the cause or the same throwable + */ + public static Throwable unwrapThrowable(Throwable t) { + if (t instanceof SupplierException || t instanceof ExecutionException) { + return t.getCause(); + } + return t; + } +} diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java index 2b5a5516719..dfeb1093c11 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java @@ -17,7 +17,6 @@ package io.helidon.nima.faulttolerance; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -27,6 +26,9 @@ import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; +import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; + class TimeoutImpl implements Timeout { private static final System.Logger LOGGER = System.getLogger(TimeoutImpl.class.getName()); @@ -54,25 +56,21 @@ public T invoke(Supplier extends T> supplier) { return CompletableFuture.supplyAsync(supplier, executor.get()) .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) .get(); - } catch (InterruptedException e) { - throw new TimeoutException("Call interrupted", e); - } catch (ExecutionException e) { - // Map java.util.concurrent.TimeoutException to Nima's TimeoutException - if (e.getCause() instanceof java.util.concurrent.TimeoutException) { - throw new TimeoutException("Timeout reached", e.getCause().getCause()); - } - throw new RuntimeException("Asynchronous execution error", e.getCause()); + } catch (Throwable t) { + throw mapThrowable(t, null); } } else { Thread thisThread = Thread.currentThread(); ReentrantLock interruptLock = new ReentrantLock(); AtomicBoolean callReturned = new AtomicBoolean(false); + AtomicBoolean interrupted = new AtomicBoolean(false); ScheduledFuture> timeoutFuture = executor.get().schedule(() -> { interruptLock.lock(); try { if (callReturned.compareAndSet(false, true)) { thisThread.interrupt(); + interrupted.set(true); // needed if InterruptedException caught in supplier } } finally { interruptLock.unlock(); @@ -81,7 +79,13 @@ public T invoke(Supplier extends T> supplier) { }, timeoutMillis, TimeUnit.MILLISECONDS); try { - return supplier.get(); + T result = supplier.get(); + if (interrupted.get()) { + throw new TimeoutException("Supplier execution interrupted", null); + } + return result; + } catch (Throwable t) { + throw mapThrowable(t, interrupted); } finally { interruptLock.lock(); try { @@ -98,4 +102,17 @@ public T invoke(Supplier extends T> supplier) { } } } + + private static RuntimeException mapThrowable(Throwable t, AtomicBoolean interrupted) { + Throwable throwable = unwrapThrowable(t); + if (throwable instanceof InterruptedException) { + return new TimeoutException("Call interrupted", throwable); + + } else if (throwable instanceof java.util.concurrent.TimeoutException) { + return new TimeoutException("Timeout reached", throwable.getCause()); + } else if (interrupted != null && interrupted.get()) { + return new TimeoutException("Supplier execution interrupted", t); + } + return toRuntimeException(throwable); + } } diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java index 2e9f38aa36e..f3cb93260f0 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java @@ -40,7 +40,7 @@ class BulkheadTest { private static final System.Logger LOGGER = System.getLogger(BulkheadTest.class.getName()); - private static final long WAIT_TIMEOUT_MILLIS = 2000; + private static final long WAIT_TIMEOUT_MILLIS = 4000; private final CountDownLatch enqueuedSubmitted = new CountDownLatch(1); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java index a58a2899af4..ae160af2caf 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java @@ -83,6 +83,21 @@ void testCircuitBreaker() throws InterruptedException, ExecutionException, Timeo assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); } + @Test + void testOpenOnLastSuccess() { + CircuitBreaker breaker = CircuitBreaker.builder() + .volume(4) + .errorRatio(75) + .build(); + + bad(breaker); + bad(breaker); + bad(breaker); + good(breaker); + + assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); + } + private void breakerOpen(CircuitBreaker breaker) { Request good = new Request(); assertThrows(CircuitBreakerOpenException.class, () -> breaker.invoke(good::invoke)); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java index 62879a78adf..c9f01c9f3b1 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/ResultWindowTest.java @@ -71,6 +71,17 @@ void testOpenAfterCompleteWindow3() { assertThat("Should open after complete window > 20%", window.shouldOpen(), is(true)); } + @Test + void testOpenAfterCompleteWindow4() { + ResultWindow window = new ResultWindow(4, 75); + assertThat("Empty should not open", window.shouldOpen(), is(false)); + window.update(ResultWindow.Result.FAILURE); + window.update(ResultWindow.Result.FAILURE); + window.update(ResultWindow.Result.FAILURE); + window.update(ResultWindow.Result.SUCCESS); + assertThat("Should open after complete window > 75%", window.shouldOpen(), is(true)); + } + @Test void testOpenAfterCompleteWindowReset() { ResultWindow window = new ResultWindow(5, 20); diff --git a/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java b/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java index 0339c6447cf..1de55bec66a 100644 --- a/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java +++ b/tests/functional/request-scope/src/test/java/io/helidon/tests/functional/requestscope/TenantTest.java @@ -20,7 +20,7 @@ import java.util.function.Supplier; import io.helidon.microprofile.tests.junit5.HelidonTest; -import io.helidon.reactive.faulttolerance.Async; +import io.helidon.nima.faulttolerance.Async; import io.netty.handler.codec.http.HttpResponseStatus; import jakarta.inject.Inject;