diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/FaultToleranceServiceConfiguration.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/FaultToleranceServiceConfiguration.java index f02af63bd21..4409782278f 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/FaultToleranceServiceConfiguration.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/FaultToleranceServiceConfiguration.java @@ -55,38 +55,44 @@ @Configured(name = "microprofile-fault-tolerance-configuration") public interface FaultToleranceServiceConfiguration extends ConfigExtension { - @Attribute(defaultValue = "concurrent/__defaultManagedExecutorService", dataType = String.class) + @Attribute(defaultValue = "java:comp/DefaultManagedExecutorService", dataType = String.class) public String getManagedExecutorService(); public void setManagedExecutorService(String managedExecutorServiceName); - @Attribute(defaultValue = "concurrent/__defaultManagedScheduledExecutorService", dataType = String.class) + @Attribute(defaultValue = "java:comp/DefaultManagedScheduledExecutorService", dataType = String.class) public String getManagedScheduledExecutorService(); public void setManagedScheduledExecutorService(String managedScheduledExecutorServiceName); /** + * @deprecated Managed thread pools are used since 5.2020.8 (again) * @return The maximum number of threads used to run asynchronous methods concurrently. This is the upper limit. The * executor will vary the actual pool size depending on demand up to this upper limit. If no demand exist * the actual pool size is zero. */ + @Deprecated @Attribute(defaultValue = "2000", dataType = Integer.class) @Min(value = 20) String getAsyncMaxPoolSize(); void setAsyncMaxPoolSize(String asyncMaxPoolSize); /** + * @deprecated Managed thread pools are used since 5.2020.8 (again) * @return The maximum number of threads used to schedule delayed execution and detect timeouts processing FT * semantics. This should be understood as upper limit. The implementation might choose to keep up to this * number of threads alive or vary the actual pool size according to demands. */ + @Deprecated @Attribute(defaultValue = "20", dataType = Integer.class) @Min(value = 1) String getDelayMaxPoolSize(); void setDelayMaxPoolSize(String delayMaxPoolSize); /** + * @deprecated Managed thread pools are used since 5.2020.8 (again) * @return The number of seconds an idle worker in the async pool has to be out of work before it is disposed and * the pool scales down in size. Changes to this setting are dynamically applied and do not need a restart. */ + @Deprecated @Attribute(defaultValue = "60", dataType = Integer.class) @Min(value = 20) @Max(value = 60 * 60) @@ -94,12 +100,14 @@ public interface FaultToleranceServiceConfiguration extends ConfigExtension { void setAsyncPoolKeepAliveInSeconds(String asyncPoolKeepAliveInSeconds); /** + * @deprecated cleaning of FT state has been removed in 5.2020.8 * @return The interval duration in minutes for the background job that cleans expired FT state. Changes do need a * restart of the server. */ @Attribute(defaultValue = "1", dataType = Integer.class) @Min(value = 1) @Max(value = 60 * 24) + @Deprecated String getCleanupIntervalInMinutes(); void setCleanupIntervalInMinutes(String cleanupIntervalInMinutes); } diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/cdi/FaultToleranceInterceptor.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/cdi/FaultToleranceInterceptor.java index 846f5ff8a75..6cf9e4dcecc 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/cdi/FaultToleranceInterceptor.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/cdi/FaultToleranceInterceptor.java @@ -56,13 +56,12 @@ import javax.interceptor.Interceptor; import javax.interceptor.InvocationContext; -import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException; -import org.glassfish.internal.api.Globals; - import fish.payara.microprofile.faulttolerance.FaultToleranceConfig; import fish.payara.microprofile.faulttolerance.FaultToleranceService; import fish.payara.microprofile.faulttolerance.policy.FaultTolerancePolicy; import fish.payara.microprofile.faulttolerance.service.Stereotypes; +import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException; +import org.glassfish.internal.api.Globals; @Interceptor @FaultTolerance @@ -77,17 +76,18 @@ public class FaultToleranceInterceptor implements Stereotypes, Serializable { @Inject private Instance requestContextControllerInstance; + private FaultToleranceService faultToleranceService; + @AroundInvoke public Object intercept(InvocationContext context) throws Exception { try { - FaultToleranceService env = - Globals.getDefaultBaseServiceLocator().getService(FaultToleranceService.class); + initialize(); AtomicReference lazyConfig = new AtomicReference<>(); Supplier configSupplier = () -> // - lazyConfig.updateAndGet(value -> value != null ? value : env.getConfig(context, this)); + lazyConfig.updateAndGet(value -> value != null ? value : faultToleranceService.getConfig(context, this)); FaultTolerancePolicy policy = FaultTolerancePolicy.get(context, configSupplier); if (policy.isPresent) { - return policy.proceed(context, () -> env.getMethodContext(context, policy, getRequestContextController())); + return policy.proceed(context, () -> faultToleranceService.getMethodContext(context, policy, getRequestContextController())); } } catch (FaultToleranceDefinitionException e) { logger.log(Level.SEVERE, "Effective FT policy contains illegal values, fault tolerance cannot be applied," @@ -97,6 +97,13 @@ public Object intercept(InvocationContext context) throws Exception { return context.proceed(); } + private void initialize() { + if (this.faultToleranceService != null) { + return; + } + this.faultToleranceService = Globals.getDefaultBaseServiceLocator().getService(FaultToleranceService.class); + } + private RequestContextController getRequestContextController() { return requestContextControllerInstance.isResolvable() ? requestContextControllerInstance.get() : null; } diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/policy/FaultTolerancePolicy.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/policy/FaultTolerancePolicy.java index 599a02dff9d..a15eccebf63 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/policy/FaultTolerancePolicy.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/policy/FaultTolerancePolicy.java @@ -107,6 +107,16 @@ public static void clean() { map -> map.entrySet().removeIf(entry -> now > entry.getValue().expiresMillis)); } + + /** + * Removes all expired policies from the cache and all policies related to this classloader + */ + public static void clean(ClassLoader appClassLoader) { + long now = System.currentTimeMillis(); + POLICY_BY_METHOD.entrySet().removeIf(entry -> entry.getKey().getClassLoader().equals(appClassLoader)); + clean(); + } + public static FaultTolerancePolicy asAnnotated(Class target, Method annotated) { return create(new StaticAnalysisContext(target, annotated), FaultToleranceConfig.asAnnotated(target, annotated)); @@ -128,6 +138,7 @@ public static FaultTolerancePolicy get(InvocationContext context, Supplier "Proceeding invocation with bulkhead semantics in "+invocation); final boolean isAsync = isAsynchronous(); final boolean exitIsOnCompletion = isAsync && bulkhead.exitIsOnCompletion; boolean directExit = false; // Whether or not we semantically leave the bulkhead when leaving this method final int runCapacity = bulkhead.value; final int queueCapacity = isAsync ? bulkhead.waitingTaskQueue : 0; AtomicInteger queuingOrRunning = invocation.context.getQueuingOrRunningPopulation(); - final int maxAttemps = 5; - for (int i = 0; i < maxAttemps; i++) { + while (true) { final int currentlyIn = queuingOrRunning.get(); if (currentlyIn >= runCapacity + queueCapacity) { invocation.metrics.incrementBulkheadCallsRejectedTotal(); + logger.log(Level.FINER, "No free work or queue space."); throw new BulkheadException("No free work or queue space."); } logger.log(Level.FINER, "Attempting to enter bulkhead."); @@ -592,11 +609,13 @@ private Object processBulkheadStage(FaultToleranceInvocation invocation) throws // ok, lets run Object res = proceed(invocation); if (!exitIsOnCompletion) { + logger.log(Level.FINER, () -> "Exiting synchronously with "+res); return res; } directExit = false; // if we make if here exit is going to happen on completion CompletionStage asyncResult = ((CompletionStage) res); asyncResult.whenComplete((value, exception) -> { + logger.log(Level.FINER, () -> "Bulkhead invocation "+invocation+ " finished " + (exception != null ? "with exception "+exception.getMessage() : "sucessfully")); invocation.metrics.addBulkheadExecutionDuration(Math.max(1, System.nanoTime() - executionSince)); // successful or not, we are out... running.remove(currentThread); @@ -619,8 +638,6 @@ private Object processBulkheadStage(FaultToleranceInvocation invocation) throws } } } - invocation.metrics.incrementBulkheadCallsRejectedTotal(); - throw new BulkheadException("No free work or queue space."); } /** diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextImpl.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextImpl.java index 241909b81b5..95b5ca9a46d 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextImpl.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextImpl.java @@ -41,7 +41,6 @@ import static java.lang.System.currentTimeMillis; -import java.lang.ref.WeakReference; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.concurrent.ArrayBlockingQueue; @@ -71,7 +70,6 @@ import fish.payara.microprofile.faulttolerance.policy.AsynchronousPolicy; import fish.payara.microprofile.faulttolerance.policy.FaultTolerancePolicy; import fish.payara.microprofile.faulttolerance.state.CircuitBreakerState; -import fish.payara.microprofile.faulttolerance.state.CircuitBreakerState.CircuitState; import fish.payara.notification.requesttracing.RequestTraceSpan; /** @@ -92,6 +90,7 @@ public final class FaultToleranceMethodContextImpl implements FaultToleranceMeth private static final Logger logger = Logger.getLogger(FaultToleranceMethodContextImpl.class.getName()); + static final class FaultToleranceMethodState { final RequestContextController requestContext; @@ -99,7 +98,6 @@ static final class FaultToleranceMethodState { final FaultToleranceMetrics metrics; final ExecutorService asyncExecution; final ScheduledExecutorService delayedExecution; - final WeakReference target; final AtomicReference circuitBreakerState = new AtomicReference<>(); final AtomicReference> concurrentExecutions = new AtomicReference<>(); final AtomicInteger queuingOrRunningPopulation = new AtomicInteger(); @@ -107,29 +105,13 @@ static final class FaultToleranceMethodState { final AtomicLong lastUsed = new AtomicLong(currentTimeMillis()); FaultToleranceMethodState(RequestContextController requestContext, FaultToleranceRequestTracing requestTracing, - FaultToleranceMetrics metrics, ExecutorService asyncExecution, - ScheduledExecutorService delayedExecution, WeakReference target) { + FaultToleranceMetrics metrics, ExecutorService asyncExecution, + ScheduledExecutorService delayedExecution) { this.requestContext = requestContext; this.requestTracing = requestTracing; this.metrics = metrics; this.asyncExecution = asyncExecution; this.delayedExecution = delayedExecution; - this.target = target; - } - - public boolean isExpired(long ttl) { - if (target.get() == null) { - return true; // target got GC'd - this is not useful any longer - } - return executingThreadCount.get() == 0 // - && queuingOrRunningPopulation.get() == 0 // - && lastUsed.get() + ttl < currentTimeMillis() // - && isStabilyClosedCuicuit(); - } - - private boolean isStabilyClosedCuicuit() { - CircuitBreakerState state = circuitBreakerState.get(); - return state == null || state.getCircuitState() == CircuitState.CLOSED && state.isClosedOutcomeSuccessOnly(); } } @@ -140,28 +122,30 @@ private boolean isStabilyClosedCuicuit() { private final FaultToleranceMethodState shared; private final InvocationContext context; private final FaultTolerancePolicy policy; + private final String appName; + + public String getAppName() { + return appName; + } public FaultToleranceMethodContextImpl(RequestContextController requestContext, FaultToleranceRequestTracing requestTracing, FaultToleranceMetrics metrics, - ExecutorService asyncExecution, ScheduledExecutorService delayedExecution, Object target) { - this(new FaultToleranceMethodState(requestContext, requestTracing, metrics, asyncExecution, delayedExecution, - new WeakReference<>(target)), null, null); + ExecutorService asyncExecution, ScheduledExecutorService delayedExecution, String appName) { + this(new FaultToleranceMethodState(requestContext, requestTracing, metrics, asyncExecution, delayedExecution + ), appName, null, null); } - private FaultToleranceMethodContextImpl(FaultToleranceMethodState shared, InvocationContext context, + private FaultToleranceMethodContextImpl(FaultToleranceMethodState shared, String appName, InvocationContext context, FaultTolerancePolicy policy) { this.shared = shared; this.context = context; this.policy = policy; + this.appName = appName; shared.lastUsed.accumulateAndGet(currentTimeMillis(), Long::max); } - public boolean isExpired(long ttl) { - return shared.isExpired(ttl); - } - @Override public FaultToleranceMethodContext boundTo(InvocationContext context, FaultTolerancePolicy policy) { - return new FaultToleranceMethodContextImpl(shared, context, policy); + return new FaultToleranceMethodContextImpl(shared, appName, context, policy); } @Override @@ -243,7 +227,7 @@ public void runAsynchronous(AsyncFuture asyncResult, Callable task) Future futureResult = AsynchronousPolicy.toFuture(res); if (!asyncResult.isCancelled()) { // could be cancelled in the meanwhile if (!asyncResult.isDone()) { - asyncResult.complete(futureResult.get()); + asyncResult.complete(futureResult.get()); } } else { futureResult.cancel(true); @@ -297,4 +281,8 @@ public void endTrace() { shared.requestTracing.endSpan(); } + @Override + public String toString() { + return super.toString()+"[method="+context.getMethod()+", target="+ context.getTarget()+", sharedState=" + shared + "]"; + } } \ No newline at end of file diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceImpl.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceImpl.java index 81d83a76973..19efdc1f890 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceImpl.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceImpl.java @@ -53,28 +53,24 @@ import fish.payara.notification.requesttracing.RequestTraceSpan; import fish.payara.nucleus.requesttracing.RequestTracingService; -import static java.lang.Integer.parseInt; - -import java.lang.reflect.Method; -import java.util.HashSet; -import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.PostConstruct; +import javax.enterprise.concurrent.ManagedExecutorService; +import javax.enterprise.concurrent.ManagedScheduledExecutorService; import javax.enterprise.context.control.RequestContextController; import javax.inject.Inject; import javax.interceptor.InvocationContext; +import javax.naming.InitialContext; +import javax.naming.NamingException; import org.eclipse.microprofile.metrics.MetricRegistry; import org.glassfish.api.StartupRunLevel; @@ -119,87 +115,28 @@ public class FaultToleranceServiceImpl @Inject private MetricsService metricsService; - private final ConcurrentMap> contextByAppNameAndMethodId = new ConcurrentHashMap<>(); + private final ConcurrentMap contextByMethod = new ConcurrentHashMap<>(); private final ConcurrentMap configByAppName = new ConcurrentHashMap<>(); - private ThreadPoolExecutor asyncExecutorService; + private ExecutorService asyncExecutorService; private ScheduledExecutorService delayExecutorService; @PostConstruct - public void postConstruct() { + public void postConstruct() throws NamingException { events.register(this); invocationManager = serviceLocator.getService(InvocationManager.class); requestTracingService = serviceLocator.getService(RequestTracingService.class); config = serviceLocator.getService(FaultToleranceServiceConfiguration.class); - delayExecutorService = Executors.newScheduledThreadPool(getMaxDelayPoolSize()); - asyncExecutorService = new ThreadPoolExecutor(0, getMaxAsyncPoolSize(), getAsyncPoolKeepAliveInSeconds(), - TimeUnit.SECONDS, new SynchronousQueue(true)); // a fair queue => FIFO - int interval = getCleanupIntervalInMinutes(); - delayExecutorService.scheduleAtFixedRate(this::cleanMethodContexts, interval, interval, TimeUnit.MINUTES); - if (config != null) { - if (!"concurrent/__defaultManagedExecutorService".equals(config.getManagedExecutorService())) { - logger.log(Level.WARNING, - "Fault tolerance executor service was configured to managed executor service {0}. This option has been replaced by 'async-max-pool-size' to set the maximum size of a fixed Fault Tolerance pool.", - config.getManagedExecutorService()); - } - if (!"concurrent/__defaultManagedScheduledExecutorService".equals(config.getManagedScheduledExecutorService())) { - logger.log(Level.WARNING, - "Fault tolerance scheduled executor service was configured to managed scheduled executor service {0}. This option has been replaced by 'delay-max-pool-size' to set the maximum size of a fixed Fault Tolerance pool.", - config.getManagedScheduledExecutorService()); - } - } - } - - /** - * Since {@link Map#compute(Object, java.util.function.BiFunction)} locks the key entry for - * {@link ConcurrentHashMap} it is safe to remove the entry in case - * {@link FaultToleranceMethodContextImpl#isExpired(long)} as concurrent call to - * {@link Map#computeIfAbsent(Object, java.util.function.Function)} are going to wait for the completion of - * {@link Map#compute(Object, java.util.function.BiFunction)}. - */ - private void cleanMethodContexts() { - final long ttl = TimeUnit.MINUTES.toMillis(1); - int cleaned = 0; - for (Map appEntry : contextByAppNameAndMethodId.values()) { - for (String key : new HashSet<>(appEntry.keySet())) { - try { - Object newValue = appEntry.compute(key, - (k, methodContext) -> methodContext.isExpired(ttl) ? null : methodContext); - if (newValue == null) { - cleaned++; - } - } catch (Exception e) { - logger.log(Level.WARNING, "Failed to clean FT method context for " + key, e); - } - } - } - if (cleaned > 0) { - String allClean = contextByAppNameAndMethodId.isEmpty() ? ".All clean." : "."; - logger.log(Level.INFO, "Cleaned {0} expired FT method contexts" + allClean, cleaned); - } - } - - private int getMaxDelayPoolSize() { - return config == null ? 20 : parseInt(config.getDelayMaxPoolSize()); - } - - private int getMaxAsyncPoolSize() { - return config == null ? 2000 : parseInt(config.getAsyncMaxPoolSize()); - } - - private int getAsyncPoolKeepAliveInSeconds() { - return config == null ? 60 : parseInt(config.getAsyncPoolKeepAliveInSeconds()); - } - - private int getCleanupIntervalInMinutes() { - return config == null ? 1 : parseInt(config.getCleanupIntervalInMinutes()); + InitialContext context = new InitialContext(); + asyncExecutorService = (ManagedExecutorService) context.lookup(config.getManagedExecutorService()); + delayExecutorService = (ManagedScheduledExecutorService) context.lookup(config.getManagedScheduledExecutorService()); } @Override public void event(Event event) { if (event.is(Deployment.APPLICATION_UNLOADED)) { ApplicationInfo info = (ApplicationInfo) event.hook(); - deregisterApplication(info.getName()); - FaultTolerancePolicy.clean(); + deregisterApplication(info); + FaultTolerancePolicy.clean(info.getAppClassLoader()); } else if (event.is(EventTypes.SERVER_SHUTDOWN)) { if (asyncExecutorService != null) { asyncExecutorService.shutdownNow(); @@ -213,10 +150,9 @@ public void event(Event event) { @Override @MonitoringData(ns = "ft") public void collect(MonitoringDataCollector collector) { - for (Entry> appEntry : contextByAppNameAndMethodId.entrySet()) { - String app = appEntry.getKey(); - for (Entry methodEntry : appEntry.getValue().entrySet()) { - MonitoringDataCollector methodCollector = collector.group(methodEntry.getKey()).tag("app", app); + for (Entry methodEntry : contextByMethod.entrySet()) { + MonitoringDataCollector methodCollector = collector.group(methodEntry.getKey().getMethodId()) + .tag("app", methodEntry.getValue().getAppName()); FaultToleranceMethodContext context = methodEntry.getValue(); BlockingQueue concurrentExecutions = context.getConcurrentExecutions(); if (concurrentExecutions != null) { @@ -224,7 +160,6 @@ public void collect(MonitoringDataCollector collector) { collectBulkheadSemaphores(methodCollector, concurrentExecutions, context.getQueuingOrRunningPopulation()); } collectCircuitBreakerState(methodCollector, context.getState()); - } } } @@ -256,9 +191,9 @@ public FaultToleranceConfig getConfig(InvocationContext context, Stereotypes ste key -> new BindableFaultToleranceConfig(stereotypes)).bindTo(context); } - private MetricRegistry getBaseMetricRegistry() { + private MetricsService.MetricsContext getMetricsContext() { try { - return metricsService.getContext(true).getBaseRegistry(); + return metricsService.getContext(true); } catch (Exception e) { return null; } @@ -266,17 +201,17 @@ private MetricRegistry getBaseMetricRegistry() { /** * Removes an application from the enabled map, CircuitBreaker map, and bulkhead maps - * @param applicationName The name of the application to remove + * @param appInfo The name of the application to remove */ - private void deregisterApplication(String applicationName) { - configByAppName.remove(applicationName); - contextByAppNameAndMethodId.remove(applicationName); + private void deregisterApplication(ApplicationInfo appInfo) { + configByAppName.remove(appInfo.getName()); + contextByMethod.keySet().removeIf(methodKey -> + methodKey.targetClass.getClassLoader().equals(appInfo.getAppClassLoader())); } /** * Gets the application name from the invocation manager. Failing that, it will use the module name, component name, * or method signature (in that order). - * @param invocationManager The invocation manager to get the application name from * @param context The context of the current invocation * @return The application name */ @@ -314,45 +249,20 @@ private void addGenericFaultToleranceRequestTracingDetails(RequestTraceSpan span @Override public FaultToleranceMethodContext getMethodContext(InvocationContext context, FaultTolerancePolicy policy, RequestContextController requestContextController) { - return contextByAppNameAndMethodId.computeIfAbsent(getAppName(context), appId -> new ConcurrentHashMap<>()) - .computeIfAbsent(getTargetMethodId(context), - methodId -> createMethodContext(methodId, context, requestContextController)).boundTo(context, policy); + return contextByMethod.computeIfAbsent(new MethodKey(context), + methodKey -> createMethodContext(methodKey, context, requestContextController)).boundTo(context, policy); } - private FaultToleranceMethodContextImpl createMethodContext(String methodId, InvocationContext context, - RequestContextController requestContextController) { - MetricRegistry metricRegistry = getBaseMetricRegistry(); + private FaultToleranceMethodContextImpl createMethodContext(MethodKey methodKey, InvocationContext context, + RequestContextController requestContextController) { + MetricsService.MetricsContext metricsContext = getMetricsContext(); + MetricRegistry metricRegistry = metricsContext != null ? metricsContext.getBaseRegistry() : null; + String appName = metricsContext != null ? metricsContext.getName() : ""; FaultToleranceMetrics metrics = metricRegistry == null ? FaultToleranceMetrics.DISABLED : new MethodFaultToleranceMetrics(metricRegistry, FaultToleranceUtils.getCanonicalMethodName(context)); - asyncExecutorService.setMaximumPoolSize(getMaxAsyncPoolSize()); // lazy update of max size - asyncExecutorService.setKeepAliveTime(getAsyncPoolKeepAliveInSeconds(), TimeUnit.SECONDS); - logger.log(Level.INFO, "Creating FT method context for {0}", methodId); + logger.log(Level.FINE, "Creating FT method context for {0}", methodKey); return new FaultToleranceMethodContextImpl(requestContextController, this, metrics, asyncExecutorService, - delayExecutorService, context.getTarget()); + delayExecutorService, appName); } - - /** - * It is essential that the computed signature is referring to the {@link Method} as defined by the target - * {@link Object} class not its declaring {@link Class} as this could be different when called via an abstract - * {@link Method} implemented or overridden by the target {@link Class}. - * - * Since MP FT 3.0 all instances of a class share same state object for the same method. Or in other words the FT - * context is not specific to an instance but to the annotated class and method. - */ - public static String getTargetMethodId(InvocationContext context) { - Object target = context.getTarget(); - Method method = context.getMethod(); - StringBuilder methodId = new StringBuilder(); - methodId.append(target.getClass().getName()).append('.').append(method.getName()); - if (method.getParameterCount() > 0) { - methodId.append('('); - for (Class param : method.getParameterTypes()) { - methodId.append(param.getName()).append(' '); - } - methodId.append(')'); - } - return methodId.toString(); - } - } diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/MethodKey.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/MethodKey.java new file mode 100644 index 00000000000..93211b27c9e --- /dev/null +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/main/java/fish/payara/microprofile/faulttolerance/service/MethodKey.java @@ -0,0 +1,64 @@ +package fish.payara.microprofile.faulttolerance.service; + +import javax.interceptor.InvocationContext; +import java.lang.reflect.Method; +import java.util.Objects; + +/** + * Identifier of method-related data in Fault Tolerance. + * It is essential that the computed signature is referring to the {@link Method} as defined by the target + * {@link Object} class not its declaring {@link Class} as this could be different when called via an abstract + * {@link Method} implemented or overridden by the target {@link Class}. + * + * Since MP FT 3.0 all instances of a class share same state object for the same method. Or in other words the FT + * context is not specific to an instance but to the annotated class and method. + */ +final class MethodKey { + final Class targetClass; + final Method method; + private String methodId; + + MethodKey(InvocationContext ctx) { + this.targetClass = ctx.getTarget().getClass(); + this.method = ctx.getMethod(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MethodKey methodKey = (MethodKey) o; + return targetClass.equals(methodKey.targetClass) && method.equals(methodKey.method); + } + + @Override + public int hashCode() { + return Objects.hash(targetClass, method); + } + + String getMethodId() { + if (methodId != null) { + return methodId; + } + StringBuilder idBuilder = new StringBuilder(); + idBuilder.append(targetClass.getName()).append('.').append(method.getName()); + if (method.getParameterCount() > 0) { + idBuilder.append('('); + for (Class param : method.getParameterTypes()) { + idBuilder.append(param.getName()).append(' '); + } + idBuilder.append(')'); + } + methodId = idBuilder.toString(); + return methodId; + } + + @Override + public String toString() { + return getMethodId(); + } +} diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractBulkheadTest.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractBulkheadTest.java index 2f2070047c7..e44bbd0ab2f 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractBulkheadTest.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractBulkheadTest.java @@ -47,6 +47,8 @@ import fish.payara.microprofile.faulttolerance.service.FaultToleranceMethodContextStub; import fish.payara.microprofile.faulttolerance.service.FaultToleranceServiceStub; +import java.util.function.BiFunction; + /** * Base class for tests focusing on the behaviour of methods annotated with the {@link Bulkhead} annotation. * @@ -59,10 +61,8 @@ protected FaultToleranceServiceStub createService() { return new FaultToleranceServiceStub() { @Override - protected FaultToleranceMethodContext createMethodContext(String methodId, InvocationContext context, - FaultTolerancePolicy policy) { - return new FaultToleranceMethodContextStub(context, policy, state, concurrentExecutions, waitingQueuePopulation, - (c, p) -> createMethodContext(methodId, c, p)) { + protected FaultToleranceMethodContext stubMethodContext(StubContext ctx) { + return new FaultToleranceMethodContextStub(ctx, state, concurrentExecutions, waitingQueuePopulation) { @Override public void delay(long delayMillis) throws InterruptedException { @@ -70,7 +70,6 @@ public void delay(long delayMillis) throws InterruptedException { } }; } - }; } diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractMetricTest.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractMetricTest.java index bcf4bc913ab..8c9c200b26b 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractMetricTest.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/AbstractMetricTest.java @@ -42,8 +42,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import javax.interceptor.InvocationContext; - import org.eclipse.microprofile.metrics.MetricRegistry; import org.eclipse.microprofile.metrics.MetricRegistry.Type; @@ -69,11 +67,9 @@ protected FaultToleranceServiceStub createService() { registry = new MetricRegistryImpl(Type.BASE); return new FaultToleranceServiceStub() { @Override - protected FaultToleranceMethodContext createMethodContext(String methodId, InvocationContext context, - FaultTolerancePolicy policy) { - FaultToleranceMetrics metrics = new MethodFaultToleranceMetrics(registry, FaultToleranceUtils.getCanonicalMethodName(context)); - return new FaultToleranceMethodContextStub(context, policy, state, concurrentExecutions, waitingQueuePopulation, - (c, p) -> createMethodContext(methodId, c, p)) { + protected FaultToleranceMethodContext stubMethodContext(StubContext ctx) { + FaultToleranceMetrics metrics = new MethodFaultToleranceMetrics(registry, FaultToleranceUtils.getCanonicalMethodName(ctx.context)); + return new FaultToleranceMethodContextStub(ctx, state, concurrentExecutions, waitingQueuePopulation) { @Override public FaultToleranceMetrics getMetrics() { diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/BulkheadLifecycleTckTest.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/BulkheadLifecycleTckTest.java index 6e804c999f9..de9b8144f0c 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/BulkheadLifecycleTckTest.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/BulkheadLifecycleTckTest.java @@ -1,8 +1,5 @@ package fish.payara.microprofile.faulttolerance.policy; -import static org.awaitility.Awaitility.await; -import static org.junit.Assert.assertEquals; - import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -14,13 +11,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import javax.interceptor.InvocationContext; - -import org.eclipse.microprofile.faulttolerance.Bulkhead; -import org.eclipse.microprofile.metrics.MetricRegistry; -import org.eclipse.microprofile.metrics.MetricRegistry.Type; -import org.junit.Test; - import fish.payara.microprofile.faulttolerance.FaultToleranceMethodContext; import fish.payara.microprofile.faulttolerance.FaultToleranceMetrics; import fish.payara.microprofile.faulttolerance.service.FaultToleranceMethodContextStub; @@ -28,6 +18,13 @@ import fish.payara.microprofile.faulttolerance.service.FaultToleranceUtils; import fish.payara.microprofile.faulttolerance.service.MethodFaultToleranceMetrics; import fish.payara.microprofile.metrics.impl.MetricRegistryImpl; +import org.eclipse.microprofile.faulttolerance.Bulkhead; +import org.eclipse.microprofile.metrics.MetricRegistry; +import org.eclipse.microprofile.metrics.MetricRegistry.Type; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; /** * Based on MP FT TCK Test {@code org.eclipse.microprofile.fault.tolerance.tck.bulkhead.lifecycle.BulkheadLifecycleTest}. @@ -36,24 +33,29 @@ */ public class BulkheadLifecycleTckTest extends AbstractRecordingTest { + static final AtomicInteger barrier = new AtomicInteger(); MetricRegistry registry; + static void inService(CompletableFuture waiter) throws InterruptedException, ExecutionException { + barrier.incrementAndGet(); + waiter.get(); + } + @Override protected FaultToleranceServiceStub createService() { // this test needs to use more advanced state per method as multiple methods are involved // therefore the below special setup where we have state per method as in the actual implementation - final Map>> concurrentExecutionByMethodId = new ConcurrentHashMap<>(); - final Map waitingQueuePopulationByMethodId = new ConcurrentHashMap<>(); + final Map>> concurrentExecutionByMethodId = new ConcurrentHashMap<>(); + final Map waitingQueuePopulationByMethodId = new ConcurrentHashMap<>(); + registry = new MetricRegistryImpl(Type.BASE); return new FaultToleranceServiceStub() { @Override - protected FaultToleranceMethodContext createMethodContext(String methodId, InvocationContext context, - FaultTolerancePolicy policy) { - FaultToleranceMetrics metrics = new MethodFaultToleranceMetrics(registry, FaultToleranceUtils.getCanonicalMethodName(context)); - return new FaultToleranceMethodContextStub(context, policy, state, - concurrentExecutionByMethodId.computeIfAbsent(methodId, key -> new AtomicReference<>()), - waitingQueuePopulationByMethodId.computeIfAbsent(methodId, key -> new AtomicInteger()), - (c, p) -> createMethodContext(methodId, c, p)) { + protected FaultToleranceMethodContext stubMethodContext(StubContext ctx) { + FaultToleranceMetrics metrics = new MethodFaultToleranceMetrics(registry, FaultToleranceUtils.getCanonicalMethodName(ctx.context)); + return new FaultToleranceMethodContextStub(ctx, state, + concurrentExecutionByMethodId.computeIfAbsent(ctx.key, key -> new AtomicReference<>()), + waitingQueuePopulationByMethodId.computeIfAbsent(ctx.key, key -> new AtomicInteger())) { @Override public FaultToleranceMetrics getMetrics() { @@ -69,8 +71,6 @@ public Future runDelayed(long delayMillis, Runnable task) throws Exception { }; } - static final AtomicInteger barrier = new AtomicInteger(); - /** * Scenario is equivalent to the TCK test of same name but not 100% identical */ @@ -117,9 +117,4 @@ public void service(CompletableFuture waiter) throws Exception { inService(waiter); } } - - static void inService(CompletableFuture waiter) throws InterruptedException, ExecutionException { - barrier.incrementAndGet(); - waiter.get(); - } } diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/CircuitBreakerBasicTest.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/CircuitBreakerBasicTest.java index 4c5195613a6..f15ca3c56f9 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/CircuitBreakerBasicTest.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/CircuitBreakerBasicTest.java @@ -75,10 +75,8 @@ public class CircuitBreakerBasicTest { private final FaultToleranceServiceStub service = new FaultToleranceServiceStub() { @Override - protected FaultToleranceMethodContext createMethodContext(String methodId, InvocationContext context, - FaultTolerancePolicy policy) { - return new FaultToleranceMethodContextStub(context, policy, state, concurrentExecutions, waitingQueuePopulation, - (c, p) -> createMethodContext(methodId, c, p)) { + protected FaultToleranceMethodContext stubMethodContext(StubContext ctx) { + return new FaultToleranceMethodContextStub(ctx, state, concurrentExecutions, waitingQueuePopulation) { @Override public Future runDelayed(long delayMillis, Runnable task) throws Exception { diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/FaultToleranceStressTest.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/FaultToleranceStressTest.java index 7e5bf03bfcc..59dca50321c 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/FaultToleranceStressTest.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/policy/FaultToleranceStressTest.java @@ -66,8 +66,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import javax.interceptor.InvocationContext; - import org.eclipse.microprofile.faulttolerance.Asynchronous; import org.eclipse.microprofile.faulttolerance.Bulkhead; import org.eclipse.microprofile.faulttolerance.CircuitBreaker; @@ -119,12 +117,9 @@ public class FaultToleranceStressTest implements FallbackHandler> final ExecutorService executorService = Executors.newWorkStealingPool(NUMBER_OF_CALLERS / 2); final FaultToleranceServiceStub service = new FaultToleranceServiceStub() { - @Override - protected FaultToleranceMethodContext createMethodContext(String methodId, InvocationContext context, - FaultTolerancePolicy policy) { - return new FaultToleranceMethodContextStub(context, policy, state, concurrentExecutions, waitingQueuePopulation, - (c, p) -> createMethodContext(methodId, c, p)) { + protected FaultToleranceMethodContext stubMethodContext(StubContext ctx) { + return new FaultToleranceMethodContextStub(ctx, state, concurrentExecutions, waitingQueuePopulation) { @Override public CircuitBreakerState getState() { circuitStateAccessCount.incrementAndGet(); diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextStub.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextStub.java index 7d6706974bb..f69ca0dcd90 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextStub.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceMethodContextStub.java @@ -71,17 +71,16 @@ public class FaultToleranceMethodContextStub implements FaultToleranceMethodCont private final AtomicInteger queuingOrRunningPopulation; private final BiFunction binder; - public FaultToleranceMethodContextStub(InvocationContext context, FaultTolerancePolicy policy, - AtomicReference state, - AtomicReference> concurrentExecutions, - AtomicInteger queuingOrRunningPopulation, - BiFunction binder) { - this.context = context; - this.policy = policy; + public FaultToleranceMethodContextStub(FaultToleranceServiceStub.StubContext ctx, + AtomicReference state, + AtomicReference> concurrentExecutions, + AtomicInteger queuingOrRunningPopulation) { + this.context = ctx.context; + this.policy = ctx.policy; this.state = state; this.concurrentExecutions = concurrentExecutions; this.queuingOrRunningPopulation = queuingOrRunningPopulation; - this.binder = binder; + this.binder = ctx.binder; } @Override diff --git a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceStub.java b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceStub.java index 89e76503948..43bfe74c7d5 100644 --- a/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceStub.java +++ b/appserver/payara-appserver-modules/microprofile/fault-tolerance/src/test/java/fish/payara/microprofile/faulttolerance/service/FaultToleranceServiceStub.java @@ -39,8 +39,6 @@ */ package fish.payara.microprofile.faulttolerance.service; -import static fish.payara.microprofile.faulttolerance.service.FaultToleranceServiceImpl.getTargetMethodId; - import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -48,6 +46,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import javax.enterprise.context.control.RequestContextController; import javax.interceptor.InvocationContext; @@ -56,7 +55,6 @@ import fish.payara.microprofile.faulttolerance.FaultToleranceMethodContext; import fish.payara.microprofile.faulttolerance.FaultToleranceService; import fish.payara.microprofile.faulttolerance.policy.FaultTolerancePolicy; -import fish.payara.microprofile.faulttolerance.service.Stereotypes; import fish.payara.microprofile.faulttolerance.state.CircuitBreakerState; /** @@ -72,12 +70,29 @@ */ public class FaultToleranceServiceStub implements FaultToleranceService { - private final ConcurrentMap contextByMethodId = new ConcurrentHashMap<>(); + private final ConcurrentMap contextByMethodId = new ConcurrentHashMap<>(); protected final AtomicReference state = new AtomicReference<>(); protected final AtomicReference> concurrentExecutions = new AtomicReference<>(); protected final AtomicInteger waitingQueuePopulation = new AtomicInteger(); + protected class StubContext { + public final InvocationContext context; + public final FaultTolerancePolicy policy; + private final MethodKey methodKey; + public final Object key; + public BiFunction binder; + + protected StubContext(MethodKey key, InvocationContext context, FaultTolerancePolicy policy) { + this.methodKey = key; + this.key = key; + this.context = context; + this.policy = policy; + this.binder = (i, p) -> createMethodContext(methodKey, i, p); + } + + } + @Override public FaultToleranceConfig getConfig(InvocationContext context, Stereotypes stereotypes) { return FaultToleranceConfig.asAnnotated(context.getTarget().getClass(), context.getMethod()); @@ -91,17 +106,18 @@ public final FaultToleranceMethodContext getMethodContext(InvocationContext cont @Override public final FaultToleranceMethodContext getMethodContext(InvocationContext context, FaultTolerancePolicy policy, RequestContextController requestContextController) { - return contextByMethodId.computeIfAbsent(getTargetMethodId(context), - methodId -> { - return createMethodContext(methodId, context, policy); - }).boundTo(context, policy); + return contextByMethodId.computeIfAbsent(new MethodKey(context), + key -> createMethodContext(key, context, policy)).boundTo(context, policy); } @SuppressWarnings("unused") - protected FaultToleranceMethodContext createMethodContext(String methodId, InvocationContext context, - FaultTolerancePolicy policy) { - return new FaultToleranceMethodContextStub(context, policy, state, concurrentExecutions, waitingQueuePopulation, - (c, p) -> createMethodContext(methodId, c, p)); + protected final FaultToleranceMethodContext createMethodContext(MethodKey key, InvocationContext context, + FaultTolerancePolicy policy) { + return stubMethodContext(new StubContext(key, context, policy)); + } + + protected FaultToleranceMethodContext stubMethodContext(StubContext ctx) { + return new FaultToleranceMethodContextStub(ctx, state, concurrentExecutions, waitingQueuePopulation); } public AtomicReference getStateReference() {