diff --git a/.github/workflows/actions_build.yml b/.github/workflows/actions_build.yml index e37c8e9a3db..5df8b5633a0 100644 --- a/.github/workflows/actions_build.yml +++ b/.github/workflows/actions_build.yml @@ -45,6 +45,8 @@ jobs: - java: 19 on: self-hosted snapshot: true + # blockhound makes the build run about 10 minutes slower + blockhound: true steps: - uses: actions/checkout@v2 @@ -86,6 +88,7 @@ jobs: ${{ (matrix.on == 'self-hosted') && '--max-workers=8' || '--max-workers=2' }} --parallel \ ${{ matrix.coverage && '-Pcoverage' || '' }} \ ${{ matrix.leak && '-Pleak' || '' }} \ + ${{ matrix.blockhound && '-Pblockhound' || '' }} \ -PnoLint \ -PflakyTests=false \ -PbuildJdkVersion=${{ env.BUILD_JDK_VERSION }} \ @@ -133,17 +136,23 @@ jobs: if: ${{ matrix.coverage }} uses: codecov/codecov-action@v1 + - name: Fail the run if any threads were blocked + if: ${{ matrix.blockhound }} + run: "if [[ -z `find . -name 'blockhound.log' -size +0` ]]; then exit 0; else exit 1; fi" + shell: bash + - name: Collect the test reports if: failure() - run: find . '(' -name 'java_pid*.hprof' -or -name 'hs_err_*.log' -or -path '*/build/reports/tests' -or -path '*/build/test-results' -or -path '*/javadoc.options' ')' -exec tar rf "reports-JVM-${{ matrix.java }}.tar" {} ';' + run: | + find . '(' -name 'java_pid*.hprof' -or -name 'hs_err_*.log' -or -path '*/build/reports/tests' -or -path '*/build/test-results' -or -path '*/javadoc.options' -or -name 'blockhound.log' ')' -exec tar rf "reports-JVM-${{ matrix.on }}-${{ matrix.java }}.tar" {} ';' shell: bash - name: Upload the artifacts if: failure() uses: actions/upload-artifact@v2 with: - name: reports-JVM-${{ matrix.java }} - path: reports-JVM-${{ matrix.java }}.tar + name: reports-JVM-${{ matrix.on }}-${{ matrix.java }} + path: reports-JVM-${{ matrix.on }}-${{ matrix.java }}.tar retention-days: 3 lint: diff --git a/brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java b/brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java new file mode 100644 index 00000000000..fbd586d73ae --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/common/brave/BraveBlockHoundIntegration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.brave; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the brave module. + */ +@UnstableApi +public final class BraveBlockHoundIntegration implements BlockHoundIntegration { + + @Override + public void applyTo(Builder builder) { + builder.allowBlockingCallsInside("zipkin2.reporter.AsyncReporter$BoundedAsyncReporter", "report"); + } +} diff --git a/brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..5cd2f11865f --- /dev/null +++ b/brave/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.common.brave.BraveBlockHoundIntegration diff --git a/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java b/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java index 4b20920f658..d9183a61867 100644 --- a/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java +++ b/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java @@ -65,8 +65,8 @@ import com.linecorp.armeria.common.brave.HelloService.AsyncIface; import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; import com.linecorp.armeria.common.thrift.ThriftFuture; -import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.common.util.ThreadFactories; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServerBuilder; @@ -569,17 +569,13 @@ private static class SpanHandlerImpl extends SpanHandler { @Override public boolean end(TraceContext context, MutableSpan span, Cause cause) { - return spans.add(span); + return BlockingUtils.blockingRun(() -> spans.add(span)); } MutableSpan[] take(int numSpans) { final List taken = new ArrayList<>(); while (taken.size() < numSpans) { - try { - taken.add(spans.poll(30, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - return Exceptions.throwUnsafely(e); - } + BlockingUtils.blockingRun(() -> taken.add(spans.poll(30, TimeUnit.SECONDS))); } // Reverse the collected spans to sort the spans by request time. diff --git a/build.gradle b/build.gradle index 3dc92ece434..d4cd5ca61ba 100644 --- a/build.gradle +++ b/build.gradle @@ -78,10 +78,17 @@ allprojects { systemProperty "java.security.manager", "allow" } + // required by blockhound for jvm 13+. See https://github.com/reactor/BlockHound/issues/33. + if (rootProject.hasProperty('blockhound') && project.ext.testJavaVersion >= 13) { + jvmArgs "-XX:+AllowRedefinitionToAddDeleteMethods" + } + // Use verbose exception/response reporting for easier debugging. systemProperty 'com.linecorp.armeria.verboseExceptions', 'true' systemProperty 'com.linecorp.armeria.verboseResponses', 'true' + systemProperty 'com.linecorp.armeria.blockhound.reportFile', "${project.buildDir}/blockhound.log" + // Pass special system property to tell our tests that we are measuring coverage. if (project.hasFlags('coverage')) { systemProperty 'com.linecorp.armeria.testing.coverage', 'true' @@ -165,6 +172,9 @@ configure(projectsWithFlags('java')) { // Reflections implementation libs.reflections + // Blockhound + optionalImplementation libs.blockhound + // Test-time dependencies testImplementation libs.guava.testlib testImplementation libs.junit4 @@ -183,6 +193,11 @@ configure(projectsWithFlags('java')) { testImplementation libs.apache.httpclient5 testImplementation libs.hamcrest testImplementation libs.hamcrest.library + testRuntimeOnly libs.kotlin.coroutines.debug + + if (rootProject.hasProperty('blockhound')) { + testRuntimeOnly libs.blockhound.junit.platform + } } // Configure the default DuplicatesStrategy for such as: diff --git a/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java b/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java index 4799a2fd96a..ac0b3c5e75a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java +++ b/core/src/main/java/com/linecorp/armeria/client/AbstractEventLoopState.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; + import io.netty.channel.EventLoop; abstract class AbstractEventLoopState { @@ -35,7 +37,7 @@ static AbstractEventLoopState of(List eventLoops, int maxNumEventLoop return new HeapBasedEventLoopState(eventLoops, maxNumEventLoops, scheduler); } - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final List eventLoops; private final DefaultEventLoopScheduler scheduler; diff --git a/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java b/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java index 913f2cedc8f..97c2340ee2d 100644 --- a/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/client/DefaultEventLoopScheduler.java @@ -41,6 +41,7 @@ import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.ReleasableHolder; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -58,7 +59,7 @@ final class DefaultEventLoopScheduler implements EventLoopScheduler { static final int DEFAULT_MAX_NUM_EVENT_LOOPS = 1; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final List eventLoops; diff --git a/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java b/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java index d907966c022..81880e0dc9a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java +++ b/core/src/main/java/com/linecorp/armeria/client/cookie/DefaultCookieJar.java @@ -34,6 +34,7 @@ import com.linecorp.armeria.common.Cookies; import com.linecorp.armeria.common.Scheme; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.netty.util.NetUtil; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; @@ -60,7 +61,7 @@ final class DefaultCookieJar implements CookieJar { this.cookiePolicy = cookiePolicy; store = new Object2LongOpenHashMap<>(); filter = new HashMap<>(); - lock = new ReentrantLock(); + lock = new ReentrantShortLock(); } @Override diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java index 07ddf6e3399..91765b810f8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; @@ -45,6 +44,7 @@ import com.linecorp.armeria.common.util.AsyncCloseableSupport; import com.linecorp.armeria.common.util.EventLoopCheckingFuture; import com.linecorp.armeria.common.util.ListenableAsyncCloseable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A dynamic {@link EndpointGroup}. The list of {@link Endpoint}s can be updated dynamically. @@ -66,7 +66,7 @@ public static DynamicEndpointGroupBuilder builder() { private final EndpointSelectionStrategy selectionStrategy; private final AtomicReference selector = new AtomicReference<>(); private volatile List endpoints = UNINITIALIZED_ENDPOINTS; - private final Lock endpointsLock = new ReentrantLock(); + private final Lock endpointsLock = new ReentrantShortLock(); private final CompletableFuture> initialEndpointsFuture = new InitialEndpointsFuture(); private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync); diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java index 541bb5c0db7..525e4c4e59b 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/FileWatcherRegistry.java @@ -38,6 +38,7 @@ import com.google.common.base.MoreObjects; import com.linecorp.armeria.client.endpoint.FileWatcherRunnable.FileWatchEvent; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A registry which wraps a {@link WatchService} and allows paths to be registered. @@ -133,7 +134,7 @@ void close() throws IOException { private final Map fileSystemWatchServiceMap = new HashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); /** * Registers a {@code filePath} and {@code callback} to the {@link WatchService}. When the diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java index 62987faecde..8c3ba52104a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/RestartableThread.java @@ -22,13 +22,14 @@ import java.util.function.Supplier; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A restartable thread utility class. */ final class RestartableThread { - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @Nullable private Thread thread; diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java index 26b304392e0..f5313d4ed95 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRandomDistributionEndpointSelector.java @@ -27,6 +27,7 @@ import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * This selector selects an {@link Endpoint} using random and the weight of the {@link Endpoint}. If there are @@ -37,7 +38,7 @@ */ final class WeightedRandomDistributionEndpointSelector { - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final List allEntries; @GuardedBy("lock") private final List currentEntries; diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java index c2351782d49..25e07ca8f2c 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.AsyncCloseable; import com.linecorp.armeria.common.util.EventLoopCheckingFuture; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; @@ -54,7 +55,7 @@ final class DefaultHealthCheckerContext private final Endpoint endpoint; private final SessionProtocol protocol; private final ClientOptions clientOptions; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); /** * Keeps the {@link Future}s which were scheduled via this {@link ScheduledExecutorService}. diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java index 2d1493423fb..7bba57f55d5 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.common.util.AsyncCloseable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.micrometer.core.instrument.binder.MeterBinder; @@ -110,7 +111,7 @@ public static HealthCheckedEndpointGroupBuilder builder(EndpointGroup delegate, @VisibleForTesting final HealthCheckStrategy healthCheckStrategy; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @GuardedBy("lock") private final Deque contextGroupChain = new ArrayDeque<>(4); diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java index b374b3734c7..f46f24608b2 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.util.AsyncCloseable; import com.linecorp.armeria.common.util.AsyncCloseableSupport; import com.linecorp.armeria.common.util.TimeoutMode; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.unsafe.PooledObjects; import io.netty.util.AsciiString; @@ -60,7 +61,7 @@ final class HttpHealthChecker implements AsyncCloseable { private static final AsciiString ARMERIA_LPHC = HttpHeaderNames.of("armeria-lphc"); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final HealthCheckerContext ctx; private final WebClient webClient; private final String authority; diff --git a/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java b/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java new file mode 100644 index 00000000000..3faf112ebac --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/CoreBlockHoundIntegration.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common; + +import java.util.ResourceBundle; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the core module. + */ +@UnstableApi +public final class CoreBlockHoundIntegration implements BlockHoundIntegration { + @Override + public void applyTo(Builder builder) { + // short locks + builder.allowBlockingCallsInside("com.linecorp.armeria.client.HttpClientFactory", + "pool"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.common.util.ReentrantShortLock", + "lock"); + + // Thread.yield can be eventually called when PooledObjects.copyAndClose is called + builder.allowBlockingCallsInside("io.netty.util.internal.ReferenceCountUpdater", "release"); + + // hdr histogram holds locks + builder.allowBlockingCallsInside("org.HdrHistogram.ConcurrentHistogram", "getCountAtIndex"); + builder.allowBlockingCallsInside("org.HdrHistogram.WriterReaderPhaser", "flipPhase"); + + // StreamMessageInputStream internally uses a blocking queue + // ThreadPoolExecutor.execute internally uses a blocking queue + builder.allowBlockingCallsInside("java.util.concurrent.LinkedBlockingQueue", "offer"); + + // a single blocking call is incurred for the first invocation, but the result is cached. + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.client.PublicSuffix", + "get"); + builder.allowBlockingCallsInside("java.util.ServiceLoader$LazyClassPathLookupIterator", + "parse"); + builder.allowBlockingCallsInside(ResourceBundle.class.getName(), "getBundle"); + builder.allowBlockingCallsInside("io.netty.handler.codec.compression.Brotli", ""); + + // a lock is held temporarily when adding workers + builder.allowBlockingCallsInside("java.util.concurrent.ThreadPoolExecutor", "addWorker"); + + // prometheus exporting holds a lock temporarily + builder.allowBlockingCallsInside( + "com.linecorp.armeria.server.metric.PrometheusExpositionService", "doGet"); + + // Thread.yield can be called + builder.allowBlockingCallsInside( + "java.util.concurrent.FutureTask", "handlePossibleCancellationInterrupt"); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java b/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java index fe52d45fdf9..636be2063ca 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/AbstractListenable.java @@ -25,6 +25,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.internal.common.util.IdentityHashStrategy; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenCustomHashSet; @@ -40,7 +41,7 @@ public abstract class AbstractListenable implements Listenable { private final Set> updateListeners = new ObjectLinkedOpenCustomHashSet<>(IdentityHashStrategy.of()); - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); /** * Notify the new value changes to the listeners added via {@link #addListener(Consumer)}. diff --git a/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java b/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java index 0f59e3df7a3..9111b600f83 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/AbstractOption.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableSet; import com.linecorp.armeria.client.ClientOption; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A configuration option. @@ -236,7 +237,7 @@ private static final class Pool { private final Class type; private final BiMap> options; - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); Pool(Class type) { this.type = type; diff --git a/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java b/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java index b72e164f6f8..f030df86d8f 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/CompositeException.java @@ -47,6 +47,7 @@ import com.linecorp.armeria.common.Flags; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException} @@ -81,7 +82,7 @@ public final class CompositeException extends RuntimeException { @Nullable private Throwable cause; - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); /** * Constructs a CompositeException with the given array of Throwables as the diff --git a/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java b/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java index a662c9b0767..b342967431d 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/NonEventLoopThreadFactory.java @@ -35,6 +35,17 @@ final class NonEventLoopThreadFactory extends AbstractThreadFactory { @Override Thread newThread(@Nullable ThreadGroup threadGroup, Runnable r, String name) { - return new FastThreadLocalThread(threadGroup, r, name); + return new BlockingFastThreadLocalThread(threadGroup, r, name); + } + + private static class BlockingFastThreadLocalThread extends FastThreadLocalThread { + BlockingFastThreadLocalThread(@Nullable ThreadGroup threadGroup, Runnable r, String name) { + super(threadGroup, r, name); + } + + @Override + public boolean permitBlockingCalls() { + return true; + } } } diff --git a/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java b/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java index 902b7bc3e53..d7c95b0c1ee 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/ShutdownHooks.java @@ -33,6 +33,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * A utility class for adding a task with an {@link AutoCloseable} on shutdown. @@ -46,7 +47,7 @@ public final class ShutdownHooks { private static final Map> autoCloseableOnShutdownTasks = new LinkedHashMap<>(); - private static final ReentrantLock reentrantLock = new ReentrantLock(); + private static final ReentrantLock reentrantLock = new ReentrantShortLock(); private static final ThreadFactory THREAD_FACTORY = ThreadFactories .builder("armeria-shutdown-hook") diff --git a/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java b/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java index 51399fabe16..065d2d21273 100644 --- a/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java +++ b/core/src/main/java/com/linecorp/armeria/common/util/StartStopSupport.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; /** * Provides asynchronous start-stop life cycle support. @@ -63,7 +64,7 @@ enum State { */ private UnmodifiableFuture future = completedFuture(null); - private final ReentrantLock reentrantLock = new ReentrantLock(); + private final ReentrantLock reentrantLock = new ReentrantShortLock(); /** * Creates a new instance. diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java b/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java index eb59f6ab85f..fda9434ad15 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/JacksonUtil.java @@ -38,7 +38,8 @@ public final class JacksonUtil { static { final List providers = - ImmutableList.copyOf(ServiceLoader.load(JacksonObjectMapperProvider.class)); + ImmutableList.copyOf(ServiceLoader.load(JacksonObjectMapperProvider.class, + JacksonObjectMapperProvider.class.getClassLoader())); if (!providers.isEmpty()) { // Use a custom ObjectMapper provided via SPI. provider = providers.get(0); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java b/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java index 539194ed92c..939405d82b5 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/ReflectiveDependencyInjector.java @@ -31,6 +31,7 @@ import com.linecorp.armeria.common.DependencyInjector; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; public final class ReflectiveDependencyInjector implements DependencyInjector { @@ -57,7 +58,7 @@ public static T create(Class type, @Nullable Map, Obje return instance; } - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); private final Map, Object> instances = new HashMap<>(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java b/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java index a16e2a0a3e1..50f01ed077f 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/metric/CaffeineMetricSupport.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.metric.MeterIdPrefix; import com.linecorp.armeria.common.util.Ticker; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import io.micrometer.core.instrument.MeterRegistry; @@ -86,7 +87,7 @@ private static final class CaffeineMetrics { private final MeterRegistry parent; private final MeterIdPrefix idPrefix; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @GuardedBy("lock") private final List cacheRefs = new ArrayList<>(2); private final AtomicBoolean hasLoadingCache = new AtomicBoolean(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java b/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java index 666cc8af085..d5df712ec92 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/util/MinifiedBouncyCastleProvider.java @@ -47,7 +47,7 @@ public final class MinifiedBouncyCastleProvider extends Provider implements Conf private static final String PROVIDER_NAME = "ArmeriaBC"; - private static final ReentrantLock lock = new ReentrantLock(); + private static final ReentrantLock lock = new ReentrantShortLock(); private static final Map keyInfoConverters = new ConcurrentHashMap<>(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java b/core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java new file mode 100644 index 00000000000..28bd8f13de1 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/common/util/ReentrantShortLock.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common.util; + +import java.util.concurrent.locks.ReentrantLock; + +import com.linecorp.armeria.common.CoreBlockHoundIntegration; + +/** + * A short lock which is whitelisted by {@link CoreBlockHoundIntegration}. + * This lock may be preferred over {@link ReentrantLock} when it is known that the + * lock won't block the event loop over long periods of time. + */ +public final class ReentrantShortLock extends ReentrantLock { + private static final long serialVersionUID = 8999619612996643502L; + + @Override + public void lock() { + super.lock(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java b/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java index f260d605132..7e9508a6b83 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java +++ b/core/src/main/java/com/linecorp/armeria/internal/server/annotation/AnnotatedBeanFactoryRegistry.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.DependencyInjector; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.internal.server.annotation.AnnotatedValueResolver.NoAnnotatedParameterException; import com.linecorp.armeria.internal.server.annotation.AnnotatedValueResolver.RequestObjectResolver; import com.linecorp.armeria.server.annotation.RequestConverter; @@ -63,7 +64,7 @@ final class AnnotatedBeanFactoryRegistry { private static final Logger logger = LoggerFactory.getLogger(AnnotatedBeanFactoryRegistry.class); - private static final ReentrantLock lock = new ReentrantLock(); + private static final ReentrantLock lock = new ReentrantShortLock(); private static final ClassValue factories = new ClassValue() { diff --git a/core/src/main/java/com/linecorp/armeria/server/Server.java b/core/src/main/java/com/linecorp/armeria/server/Server.java index 3d00a0b8f12..e943e188bdb 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Server.java +++ b/core/src/main/java/com/linecorp/armeria/server/Server.java @@ -77,6 +77,7 @@ import com.linecorp.armeria.common.util.Version; import com.linecorp.armeria.internal.common.RequestTargetCache; import com.linecorp.armeria.internal.common.util.ChannelUtil; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.server.websocket.WebSocketService; import io.micrometer.core.instrument.Gauge; @@ -115,7 +116,7 @@ public static ServerBuilder builder() { private final UpdatableServerConfig config; private final StartStopSupport startStop; private final Set serverChannels = new NonBlockingHashSet<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @GuardedBy("lock") private final Map activePorts = new LinkedHashMap<>(); private final ConnectionLimitingHandler connectionLimitingHandler; @@ -700,11 +701,14 @@ private void finishDoStop(CompletableFuture future) { builder.addAll(serviceConfig.shutdownSupports()); } - CompletableFutures.successfulAsList(builder.build() - .stream() - .map(ShutdownSupport::shutdown) - .collect(toImmutableList()), cause -> null) - .thenRunAsync(() -> future.complete(null), config.startStopExecutor()); + CompletableFuture.runAsync(() -> { + // ShutdownSupport may be blocking so run the entire block inside the startStopExecutor + CompletableFutures.successfulAsList(builder.build() + .stream() + .map(ShutdownSupport::shutdown) + .collect(toImmutableList()), cause -> null) + .thenRunAsync(() -> future.complete(null), config.startStopExecutor()); + }, config.startStopExecutor()); } @Override diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java index da99275e6f3..63f73597d42 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java @@ -47,6 +47,8 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -84,6 +86,7 @@ import com.linecorp.armeria.common.util.BlockingTaskExecutor; import com.linecorp.armeria.common.util.EventLoopGroups; import com.linecorp.armeria.common.util.SystemInfo; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.internal.common.BuiltInDependencyInjector; import com.linecorp.armeria.internal.common.ReflectiveDependencyInjector; import com.linecorp.armeria.internal.common.RequestContextUtil; @@ -102,7 +105,6 @@ import io.netty.handler.ssl.SslContextBuilder; import io.netty.util.Mapping; import io.netty.util.NetUtil; -import io.netty.util.concurrent.GlobalEventExecutor; import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap; /** @@ -166,6 +168,8 @@ public final class ServerBuilder implements TlsSetters { @VisibleForTesting static final long MIN_PING_INTERVAL_MILLIS = 1000L; private static final long MIN_MAX_CONNECTION_AGE_MILLIS = 1_000L; + private static final ExecutorService START_STOP_EXECUTOR = Executors.newSingleThreadExecutor( + ThreadFactories.newThreadFactory("startstop-support", true)); static { RequestContextUtil.init(); @@ -180,7 +184,7 @@ public final class ServerBuilder implements TlsSetters { private EventLoopGroup workerGroup = CommonPools.workerGroup(); private boolean shutdownWorkerGroupOnStop; - private Executor startStopExecutor = GlobalEventExecutor.INSTANCE; + private Executor startStopExecutor = START_STOP_EXECUTOR; private final Map, Object> channelOptions = new Object2ObjectArrayMap<>(); private final Map, Object> childChannelOptions = new Object2ObjectArrayMap<>(); private int maxNumConnections = Flags.maxNumConnections(); @@ -503,8 +507,7 @@ public ServerBuilder workerGroup(int numThreads) { /** * Sets the {@link Executor} which will invoke the callbacks of {@link Server#start()}, - * {@link Server#stop()} and {@link ServerListener}. If not set, {@link GlobalEventExecutor} will be used - * by default. + * {@link Server#stop()} and {@link ServerListener}. */ public ServerBuilder startStopExecutor(Executor startStopExecutor) { this.startStopExecutor = requireNonNull(startStopExecutor, "startStopExecutor"); diff --git a/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java b/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java index a21986587d6..90ff64773a7 100644 --- a/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java +++ b/core/src/main/java/com/linecorp/armeria/server/healthcheck/HealthCheckService.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.TimeoutMode; import com.linecorp.armeria.internal.common.ArmeriaHttpUtil; +import com.linecorp.armeria.internal.common.util.ReentrantShortLock; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.HttpStatusException; import com.linecorp.armeria.server.RequestTimeoutException; @@ -137,7 +138,7 @@ public static HealthCheckServiceBuilder builder() { private final long maxLongPollingTimeoutMillis; private final double longPollingTimeoutJitterRate; private final long pingIntervalMillis; - private final ReentrantLock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantShortLock(); @Nullable private final Consumer healthCheckerListener; @Nullable diff --git a/core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..d6e8ba5a1c9 --- /dev/null +++ b/core/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.common.CoreBlockHoundIntegration diff --git a/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java b/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java index aaee7c13497..e6607821505 100644 --- a/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/Http1HeaderNamingTest.java @@ -28,6 +28,7 @@ import java.net.Socket; import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -39,11 +40,35 @@ import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.ResponseHeaders; -import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; class Http1HeaderNamingTest { + @RegisterExtension + static ServerExtension traditionalHeaderNameServer = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/", (ctx, req) -> HttpResponse + .of(ResponseHeaders.of(HttpStatus.OK, + HttpHeaderNames.AUTHORIZATION, "Bearer foo", + HttpHeaderNames.X_FORWARDED_FOR, "bar"))) + .http1HeaderNaming(Http1HeaderNaming.traditional()); + } + }; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + sb.service("/", (ctx, req) -> HttpResponse + .of(ResponseHeaders.of(HttpStatus.OK, + HttpHeaderNames.AUTHORIZATION, "Bearer foo", + HttpHeaderNames.X_FORWARDED_FOR, "bar"))) + .http1HeaderNaming(Http1HeaderNaming.ofDefault()); + } + }; + @CsvSource({ "true", "false" }) @ParameterizedTest void clientTraditionalHeaderNaming(boolean useHeaderNaming) throws IOException { @@ -112,20 +137,12 @@ void clientTraditionalHeaderNaming(boolean useHeaderNaming) throws IOException { @CsvSource({ "true", "false" }) @ParameterizedTest void serverTraditionalHeaderNaming(boolean useHeaderNaming) throws IOException { - final ServerBuilder serverBuilder = Server - .builder() - .service("/", (ctx, req) -> HttpResponse - .of(ResponseHeaders.of(HttpStatus.OK, - HttpHeaderNames.AUTHORIZATION, "Bearer foo", - HttpHeaderNames.X_FORWARDED_FOR, "bar"))); - if (useHeaderNaming) { - serverBuilder.http1HeaderNaming(Http1HeaderNaming.traditional()); - } - final Server server = serverBuilder.build(); - server.start().join(); - try (Socket socket = new Socket()) { - socket.connect(server.activePort().localAddress()); + if (useHeaderNaming) { + socket.connect(traditionalHeaderNameServer.httpSocketAddress()); + } else { + socket.connect(server.httpSocketAddress()); + } final PrintWriter outWriter = new PrintWriter(socket.getOutputStream(), false); outWriter.print("GET / HTTP/1.1\r\n"); diff --git a/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java b/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java index f6f1bb79291..97cfd6d3344 100644 --- a/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/HttpClientMaxConcurrentStreamTest.java @@ -43,6 +43,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.logging.ClientConnectionTimings; import com.linecorp.armeria.common.logging.RequestLogProperty; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -316,13 +317,8 @@ void ensureCorrectPendingAcquisitionDurationBehavior() throws Exception { .decorator(connectionTimingsAccumulatingDecorator(connectionTimings)) .build(); final int sleepMillis = 300; - connectionPoolListener = newConnectionPoolListener(() -> { - try { - Thread.sleep(sleepMillis); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - }, () -> {}); + connectionPoolListener = newConnectionPoolListener( + () -> BlockingUtils.blockingRun(() -> Thread.sleep(sleepMillis)), () -> {}); final int numConnections = MAX_NUM_CONNECTIONS; final int numRequests = MAX_CONCURRENT_STREAMS * numConnections; diff --git a/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java b/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java index 08d5957e891..e73bfc7936d 100644 --- a/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/HttpClientPipeliningTest.java @@ -34,6 +34,7 @@ import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.util.EventLoopGroups; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; @@ -72,7 +73,7 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws lock.unlock(); } - semaphore.acquireUninterruptibly(); + BlockingUtils.blockingRun(() -> semaphore.acquireUninterruptibly()); try { return HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, String.valueOf(ctx.remoteAddress())); diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java index d6edbc06922..5a3395cb5a6 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/SelectionTimeoutTest.java @@ -45,6 +45,7 @@ import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; +import com.linecorp.armeria.internal.testing.BlockingUtils; class SelectionTimeoutTest { @@ -308,7 +309,7 @@ void select_shouldRespectResponseTimeout() { try (MockEndpointGroup endpointGroup = new MockEndpointGroup(5000)) { final CompletableFuture result = endpointGroup.select(ctx, CommonPools.blockingTaskExecutor()); - assertThat(result.join()).isNull(); + assertThat(BlockingUtils.blockingRun(result::join)).isNull(); assertThat(stopwatch.elapsed()) .isGreaterThanOrEqualTo(Duration.ofSeconds(2)); } diff --git a/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java b/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java index 27022dea844..fae5cfd77fa 100644 --- a/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/proxy/ProxyClientIntegrationTest.java @@ -40,6 +40,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -60,6 +61,8 @@ import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.internal.testing.NettyServerExtension; import com.linecorp.armeria.internal.testing.SimpleChannelHandlerFactory; import com.linecorp.armeria.server.ServerBuilder; @@ -100,6 +103,9 @@ class ProxyClientIntegrationTest { private static SimpleChannelHandlerFactory channelHandlerFactory; + @Nullable + private static SslContext sslContext; + @RegisterExtension @Order(0) static final SelfSignedCertificateExtension ssc = new SelfSignedCertificateExtension(); @@ -146,6 +152,7 @@ protected void configure(Channel ch) throws Exception { static NettyServerExtension httpsProxyServer = new NettyServerExtension() { @Override protected void configure(Channel ch) throws Exception { + assert sslContext != null; final SslContext sslContext = SslContextBuilder .forServer(ssc.privateKey(), ssc.certificate()).build(); ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); @@ -177,6 +184,12 @@ protected void configure(Channel ch) throws Exception { } }; + @BeforeAll + static void beforeAll() throws Exception { + sslContext = SslContextBuilder + .forServer(ssc.privateKey(), ssc.certificate()).build(); + } + @BeforeEach void beforeEach() { numSuccessfulProxyRequests = 0; @@ -738,7 +751,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc // first writing to the channel occurs after ProxySuccessEvent is triggered. // If the first writing happens before ProxySuccessEvent is triggered, // the client would get WriteTimeoutException that makes the test fail. - Thread.sleep(Flags.defaultWriteTimeoutMillis()); + BlockingUtils.blockingRun(() -> Thread.sleep(Flags.defaultWriteTimeoutMillis())); } super.userEventTriggered(ctx, evt); } diff --git a/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java b/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java index 5eac1b2a71c..9773b00aef0 100644 --- a/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/metric/EventLoopMetricsTest.java @@ -24,14 +24,21 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.common.util.ThreadFactories; +import com.linecorp.armeria.testing.junit5.common.EventLoopGroupExtension; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; class EventLoopMetricsTest { + @RegisterExtension + static EventLoopGroupExtension eventLoopGroup = + new EventLoopGroupExtension(2, ThreadFactories.newThreadFactory("block-me", false)); + private class BlockMe extends CountDownLatch implements Runnable { AtomicInteger run = new AtomicInteger(); @@ -61,7 +68,7 @@ void test() { final BlockMe task = new BlockMe(); - final EventLoopGroup workers = new DefaultEventLoopGroup(2); + final EventLoopGroup workers = eventLoopGroup.get(); // Block both executors workers.submit(task); workers.submit(task); @@ -89,7 +96,5 @@ void test() { MoreMeters.measureAll(registry)) .containsEntry("foo.event.loop.workers#value", 2.0) .containsEntry("foo.event.loop.pending.tasks#value", 0.0)); - - workers.shutdownGracefully(); } } diff --git a/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java b/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java index 9f67e20cb14..7e8b34969c6 100644 --- a/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/util/EventLoopCheckingFutureTest.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; import ch.qos.logback.classic.Level; @@ -117,7 +118,7 @@ void getTimeoutOffEventLoop() throws Exception { private void testBlockingOperationOnEventLoop(EventLoopCheckingFutureTask task) { final EventLoopCheckingFuture future = new EventLoopCheckingFuture<>(); - eventLoop.get().submit(() -> task.run(future)); + eventLoop.get().submit(() -> BlockingUtils.blockingRun(() -> task.run(future))); try { await().untilAsserted(() -> { verify(appender, atLeast(0)).doAppend(eventCaptor.capture()); diff --git a/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java b/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java index cb3c0f8e167..46d6ebe71f5 100644 --- a/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/util/StartStopSupportTest.java @@ -63,7 +63,8 @@ public class StartStopSupportTest { private static final String THREAD_NAME_PREFIX = StartStopSupportTest.class.getSimpleName(); @ClassRule - public static final EventLoopRule rule = new EventLoopRule(THREAD_NAME_PREFIX); + public static final EventLoopRule rule = new EventLoopRule( + ThreadFactories.newThreadFactory(THREAD_NAME_PREFIX, false)); @Rule public TestRule globalTimeout = new DisableOnDebug(new Timeout(10, TimeUnit.SECONDS)); diff --git a/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java b/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java index eae443084ed..de5263179a3 100644 --- a/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java +++ b/core/src/test/java/com/linecorp/armeria/common/util/ThreadFactoryTest.java @@ -68,7 +68,7 @@ void testNonEventLoopThreadFactory() { .build() .newThread(() -> {}); - assertThat(nonEventLoopThread.getClass()).isSameAs(FastThreadLocalThread.class); + assertThat(nonEventLoopThread).isInstanceOf(FastThreadLocalThread.class); assertThat(nonEventLoopThread.getName()).startsWith("normal-thread"); assertThat(nonEventLoopThread.getPriority()).isEqualTo(Thread.NORM_PRIORITY); assertThat(nonEventLoopThread.isDaemon()).isFalse(); @@ -82,7 +82,7 @@ void testNonEventLoopThreadFactory() { .build() .newThread(() -> {}); - assertThat(nonEventLoopCustomThread.getClass()).isSameAs(FastThreadLocalThread.class); + assertThat(nonEventLoopCustomThread).isInstanceOf(FastThreadLocalThread.class); assertThat(nonEventLoopCustomThread.getName()).startsWith("custom-thread"); assertThat(nonEventLoopCustomThread.getPriority()).isEqualTo(Thread.MAX_PRIORITY); assertThat(nonEventLoopCustomThread.isDaemon()).isTrue(); diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java index a39ad345ac6..607abfb4505 100644 --- a/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java @@ -44,6 +44,7 @@ import com.linecorp.armeria.common.stream.StreamMessage; import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; import io.netty.util.concurrent.EventExecutor; @@ -94,12 +95,7 @@ void raceBetweenSubscriptionAndAbort(StreamMessage stream) { @Override public void onSubscribe(Subscription s) { - try { - // Wait for `abort()` to be called. - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + BlockingUtils.blockingRun(() -> latch.await()); } @Override diff --git a/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java b/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java index 1479e001bb7..57b66007d2b 100644 --- a/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/GracefulShutdownSupportTest.java @@ -52,7 +52,7 @@ class GracefulShutdownSupportTest { void setUp() { executor = new ThreadPoolExecutor( 0, 1, 1, TimeUnit.SECONDS, new LinkedTransferQueue<>(), - ThreadFactories.newEventLoopThreadFactory("graceful-shutdown-test", true)); + ThreadFactories.newThreadFactory("graceful-shutdown-test", true)); support = GracefulShutdownSupport.create(Duration.ofNanos(QUIET_PERIOD_NANOS), executor, ticker); } diff --git a/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java b/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java index debaaf1b91d..dcb260efc6b 100644 --- a/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/Http1ServerEarlyDisconnectionTest.java @@ -35,6 +35,7 @@ import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.SplitHttpResponse; import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.internal.testing.FlakyTest; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -101,8 +102,10 @@ public void onNext(HttpData httpData) { received += httpData.length(); if (received >= contentLength) { // All data is received, so it should be safe to close the connection. - clientFactory.close(); - latch.countDown(); + BlockingUtils.blockingRun(() -> { + clientFactory.close(); + latch.countDown(); + }); } } diff --git a/core/src/test/java/com/linecorp/armeria/server/ServerTest.java b/core/src/test/java/com/linecorp/armeria/server/ServerTest.java index 5a49e4cdf7d..d965591ee67 100644 --- a/core/src/test/java/com/linecorp/armeria/server/ServerTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/ServerTest.java @@ -76,6 +76,7 @@ import com.linecorp.armeria.common.util.TimeoutMode; import com.linecorp.armeria.internal.common.metric.MicrometerUtil; import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.logging.AccessLogWriter; import com.linecorp.armeria.server.logging.LoggingService; import com.linecorp.armeria.testing.junit4.server.ServerRule; @@ -112,12 +113,8 @@ protected void configure(ServerBuilder sb) throws Exception { final HttpService delayedResponseOnIoThread = new EchoService() { @Override protected HttpResponse echo(AggregatedHttpRequest aReq) { - try { - Thread.sleep(processDelayMillis); - return super.echo(aReq); - } catch (InterruptedException e) { - return HttpResponse.ofFailure(e); - } + BlockingUtils.blockingRun(() -> Thread.sleep(processDelayMillis)); + return super.echo(aReq); } }.decorate(LoggingService.newDecorator()); @@ -416,7 +413,7 @@ void defaultStartStopExecutor() { threads.add(server.stop().thenApply(unused -> Thread.currentThread()).join()); threads.add(server.start().thenApply(unused -> Thread.currentThread()).join()); - threads.forEach(t -> assertThat(t.getName()).startsWith("globalEventExecutor")); + threads.forEach(t -> assertThat(t.getName()).startsWith("startstop-support")); } @Test diff --git a/dependencies.toml b/dependencies.toml index 6095c6efb69..600d6de3f4f 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -6,6 +6,7 @@ apache-httpclient5 = "5.2.1" apache-httpclient4 = "4.5.14" assertj = "3.24.2" awaitility = "4.2.0" +blockhound = "1.0.8.RELEASE" bouncycastle = "1.70" brave = "5.15.0" brotli4j = "1.11.0" @@ -180,6 +181,14 @@ version.ref = "assertj" module = "org.awaitility:awaitility" version.ref = "awaitility" +[libraries.blockhound-junit-platform] +module="io.projectreactor.tools:blockhound-junit-platform" +version.ref = "blockhound" + +[libraries.blockhound] +module="io.projectreactor.tools:blockhound" +version.ref = "blockhound" + [libraries.bouncycastle-bcpkix] module = "org.bouncycastle:bcpkix-jdk15on" version.ref = "bouncycastle" @@ -685,6 +694,9 @@ version.ref = "kotlin-coroutine" [libraries.kotlin-coroutines-test] module = "org.jetbrains.kotlinx:kotlinx-coroutines-test" version.ref = "kotlin-coroutine" +[libraries.kotlin-coroutines-debug] +module = "org.jetbrains.kotlinx:kotlinx-coroutines-debug" +version.ref = "kotlin-coroutine" # Don't upgrade Logback 1.4.0 which requires Java 11 # TODO(ikhoon): Upgrade Logback to 1.3.0 when Spring Boot 2 supports it. diff --git a/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java b/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java index 443398f17fa..c174c94ba1b 100644 --- a/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java +++ b/graphql/src/main/java/com/linecorp/armeria/server/graphql/DefaultGraphqlService.java @@ -40,6 +40,7 @@ import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.GraphQL; +import graphql.execution.ExecutionId; final class DefaultGraphqlService extends AbstractGraphqlService implements GraphqlService { @@ -88,6 +89,7 @@ protected HttpResponse executeGraphql(ServiceRequestContext ctx, GraphqlRequest final ExecutionInput executionInput = builder.context(ctx) + .executionId(ExecutionId.from(ctx.id().text())) .graphQLContext(GraphqlServiceContexts.graphqlContext(ctx)) .dataLoaderRegistry(dataLoaderRegistry) .build(); diff --git a/grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java b/grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java new file mode 100644 index 00000000000..26dafcede6f --- /dev/null +++ b/grpc/src/main/java/com/linecorp/armeria/common/grpc/GrpcBlockHoundIntegration.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.grpc; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the gRPC module. + */ +@UnstableApi +public final class GrpcBlockHoundIntegration implements BlockHoundIntegration { + + @Override + public void applyTo(Builder builder) { + // ClientCalls.QueuingListener internally uses a blocking queue + builder.allowBlockingCallsInside("java.util.concurrent.ArrayBlockingQueue", "add"); + } +} diff --git a/grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..fd12d963d6d --- /dev/null +++ b/grpc/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.common.grpc.GrpcBlockHoundIntegration diff --git a/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java b/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java index d2e344b5ce5..1bcf94c302b 100644 --- a/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java +++ b/it/context-storage/src/test/java/com/linecorp/armeria/common/RequestContextStorageCustomizingTest.java @@ -27,6 +27,7 @@ import com.linecorp.armeria.common.CustomRequestContextStorageProvider.CustomRequestContextStorage; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; @@ -35,7 +36,8 @@ class RequestContextStorageCustomizingTest { @RegisterExtension - static final EventLoopExtension eventLoopExtension = new EventLoopExtension(); + static final EventLoopExtension eventLoopExtension = new EventLoopExtension( + ThreadFactories.newThreadFactory("armeria-testing-eventloop", false)); @Test void requestContextStorageDoesNotAffectOtherThread() throws InterruptedException { diff --git a/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java b/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java index 029172b22bf..bc6696255fc 100644 --- a/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java +++ b/it/multipart/src/test/java/com/linecorp/armeria/common/multipart/MultipartCollectIntegrationTest.java @@ -77,7 +77,7 @@ protected void configure(ServerBuilder sb) throws Exception { }) .thenApply(aggregated -> aggregated.stream().collect( Collectors.toMap(Entry::getKey, Entry::getValue))) - .thenApply(aggregated -> { + .thenApplyAsync(aggregated -> { final StringBuilder responseStringBuilder = new StringBuilder(); responseStringBuilder.append("param1/") .append(aggregated.get("param1")).append('\n'); @@ -110,7 +110,7 @@ protected void configure(ServerBuilder sb) throws Exception { } catch (IOException e) { throw new UncheckedIOException(e); } - }))) + }, ctx.blockingTaskExecutor()))) .service("/multipart/large-file", (ctx, req) -> HttpResponse.from( Multipart.from(req).collect(bodyPart -> { final Path path = tempDir.resolve(bodyPart.name()); @@ -120,7 +120,7 @@ protected void configure(ServerBuilder sb) throws Exception { }) .thenApply(aggregated -> aggregated.stream().collect( Collectors.toMap(Entry::getKey, Entry::getValue))) - .thenApply(aggregated -> { + .thenApplyAsync(aggregated -> { final StringBuilder responseStringBuilder = new StringBuilder(); try { final HashCode file1Hash = @@ -140,7 +140,7 @@ protected void configure(ServerBuilder sb) throws Exception { throw new RuntimeException(e); } return HttpResponse.of(responseStringBuilder.toString()); - }))) + }, ctx.blockingTaskExecutor()))) .requestTimeout(Duration.ZERO) .maxRequestLength(0); } diff --git a/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java b/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java index a6342750f94..7ba506cbe3b 100644 --- a/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java +++ b/it/trace-context-leak/src/test/java/com/linecorp/armeria/internal/common/TraceRequestContextLeakTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -36,6 +37,7 @@ import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; import com.linecorp.armeria.testing.junit5.common.EventLoopGroupExtension; @@ -46,10 +48,12 @@ class TraceRequestContextLeakTest { @RegisterExtension - static final EventLoopExtension eventLoopExtension = new EventLoopExtension(); + static final EventLoopExtension eventLoopExtension = + new EventLoopExtension(ThreadFactories.newThreadFactory("trace-test", false)); @RegisterExtension - static final EventLoopGroupExtension eventLoopGroupExtension = new EventLoopGroupExtension(2); + static final EventLoopGroupExtension eventLoopGroupExtension = + new EventLoopGroupExtension(2, ThreadFactories.newThreadFactory("trace-test-group", false)); @Test void singleThreadContextNotLeak() throws InterruptedException { @@ -83,7 +87,7 @@ void singleThreadContextNotLeak() throws InterruptedException { } }); - latch.await(); + assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); assertThat(isThrown).isFalse(); } @@ -114,7 +118,7 @@ void singleThreadContextLeak() throws InterruptedException { await().untilTrue(isThrown); assertThat(exception.get()) - .hasMessageContaining("singleThreadContextLeak$2(TraceRequestContextLeakTest.java:101)"); + .hasMessageContaining("the callback was called from unexpected thread"); } } @@ -154,7 +158,7 @@ void multiThreadContextLeakNotInterfereOthersEventLoop() throws InterruptedExcep } }); - latch.await(); + assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue(); assertThat(isThrown).isFalse(); } } @@ -206,7 +210,7 @@ void multiThreadContextLeak() throws InterruptedException { await().untilTrue(isThrown); assertThat(exception.get()) - .hasMessageContaining("multiThreadContextLeak$7(TraceRequestContextLeakTest.java:180)"); + .hasMessageContaining("Trying to call object wrapped with context"); } } @@ -235,7 +239,7 @@ void multipleRequestContextPushBeforeLeak() { @Test @SuppressWarnings("MustBeClosedChecker") - void connerCase() { + void cornerCase() { final AtomicReference exception = new AtomicReference<>(); try (DeferredClose deferredClose = new DeferredClose()) { @@ -253,7 +257,7 @@ void connerCase() { } } assertThat(exception.get()) - .hasMessageContaining("connerCase(TraceRequestContextLeakTest.java:245)"); + .hasMessageContaining("is not the same as the context in the storage"); } private static ServiceRequestContext newCtx(String path) { diff --git a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java index 3335c69bafb..f2e1a9cbd8e 100644 --- a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java +++ b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/AbstractEventLoopGroupRule.java @@ -15,6 +15,8 @@ */ package com.linecorp.armeria.testing.junit4.common; +import java.util.concurrent.ThreadFactory; + import org.junit.rules.ExternalResource; import org.junit.rules.TestRule; @@ -25,8 +27,8 @@ abstract class AbstractEventLoopGroupRule extends ExternalResource { private final EventLoopGroupRuleDelegate delegate; - AbstractEventLoopGroupRule(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - delegate = new EventLoopGroupRuleDelegate(numThreads, threadNamePrefix, useDaemonThreads); + AbstractEventLoopGroupRule(int numThreads, ThreadFactory threadFactory) { + delegate = new EventLoopGroupRuleDelegate(numThreads, threadFactory); } EventLoopGroup group() { diff --git a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java index 32b7a4fc6e5..636d31d740b 100644 --- a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java +++ b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopGroupRule.java @@ -15,8 +15,12 @@ */ package com.linecorp.armeria.testing.junit4.common; +import java.util.concurrent.ThreadFactory; + import org.junit.rules.TestRule; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoopGroup; /** @@ -78,7 +82,17 @@ public EventLoopGroupRule(int numThreads, String threadNamePrefix) { * @param useDaemonThreads whether to create daemon threads or not */ public EventLoopGroupRule(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - super(numThreads, threadNamePrefix, useDaemonThreads); + this(numThreads, ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThreads)); + } + + /** + * Creates a new {@link TestRule} that provides an {@link EventLoopGroup}. + * + * @param numThreads the number of event loop threads + * @param threadFactory the factory used to create threads. + */ + public EventLoopGroupRule(int numThreads, ThreadFactory threadFactory) { + super(numThreads, threadFactory); } /** diff --git a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java index 701bf181f33..88f0d3bdc11 100644 --- a/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java +++ b/junit4/src/main/java/com/linecorp/armeria/testing/junit4/common/EventLoopRule.java @@ -15,8 +15,12 @@ */ package com.linecorp.armeria.testing.junit4.common; +import java.util.concurrent.ThreadFactory; + import org.junit.rules.TestRule; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -71,7 +75,16 @@ public EventLoopRule(String threadNamePrefix) { * @param useDaemonThread whether to create a daemon thread or not */ public EventLoopRule(String threadNamePrefix, boolean useDaemonThread) { - super(1, threadNamePrefix, useDaemonThread); + super(1, ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThread)); + } + + /** + * Creates a new {@link TestRule} that provides an {@link EventLoop}. + * + * @param threadFactory the factory used to create threads. + */ + public EventLoopRule(ThreadFactory threadFactory) { + super(1, threadFactory); } /** diff --git a/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java b/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java index a2ea5ba90c1..6e981465fec 100644 --- a/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java +++ b/junit5/src/main/java/com/linecorp/armeria/internal/testing/EventLoopGroupRuleDelegate.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.internal.testing; +import java.util.concurrent.ThreadFactory; + import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.EventLoopGroups; @@ -27,16 +29,14 @@ public final class EventLoopGroupRuleDelegate { private final int numThreads; - private final String threadNamePrefix; - private final boolean useDaemonThreads; + private final ThreadFactory threadFactory; @Nullable private volatile EventLoopGroup group; - public EventLoopGroupRuleDelegate(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { + public EventLoopGroupRuleDelegate(int numThreads, ThreadFactory threadFactory) { this.numThreads = numThreads; - this.threadNamePrefix = threadNamePrefix; - this.useDaemonThreads = useDaemonThreads; + this.threadFactory = threadFactory; } public EventLoopGroup group() { @@ -48,7 +48,7 @@ public EventLoopGroup group() { } public void before() throws Throwable { - group = EventLoopGroups.newEventLoopGroup(numThreads, threadNamePrefix, useDaemonThreads); + group = EventLoopGroups.newEventLoopGroup(numThreads, threadFactory); } public void after() { diff --git a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java index 27908da11b4..b2a7c7203b6 100644 --- a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java +++ b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/AbstractEventLoopGroupExtension.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.testing.junit5.common; +import java.util.concurrent.ThreadFactory; + import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.ExtensionContext; @@ -26,8 +28,8 @@ abstract class AbstractEventLoopGroupExtension extends AbstractAllOrEachExtension { private final EventLoopGroupRuleDelegate delegate; - AbstractEventLoopGroupExtension(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - delegate = new EventLoopGroupRuleDelegate(numThreads, threadNamePrefix, useDaemonThreads); + AbstractEventLoopGroupExtension(int numThreads, ThreadFactory threadFactory) { + delegate = new EventLoopGroupRuleDelegate(numThreads, threadFactory); } EventLoopGroup group() { diff --git a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java index 42d642e56ab..b17832ef880 100644 --- a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java +++ b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopExtension.java @@ -16,8 +16,12 @@ package com.linecorp.armeria.testing.junit5.common; +import java.util.concurrent.ThreadFactory; + import org.junit.jupiter.api.extension.Extension; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -72,7 +76,16 @@ public EventLoopExtension(String threadNamePrefix) { * @param useDaemonThread whether to create a daemon thread or not */ public EventLoopExtension(String threadNamePrefix, boolean useDaemonThread) { - super(1, threadNamePrefix, useDaemonThread); + this(ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThread)); + } + + /** + * Creates a new {@link Extension} that provides an {@link EventLoop}. + * + * @param threadFactory the factory used to create threads. + */ + public EventLoopExtension(ThreadFactory threadFactory) { + super(1, threadFactory); } /** diff --git a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java index 9db7a3b0d4a..b692558e052 100644 --- a/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java +++ b/junit5/src/main/java/com/linecorp/armeria/testing/junit5/common/EventLoopGroupExtension.java @@ -16,8 +16,12 @@ package com.linecorp.armeria.testing.junit5.common; +import java.util.concurrent.ThreadFactory; + import org.junit.jupiter.api.extension.Extension; +import com.linecorp.armeria.common.util.ThreadFactories; + import io.netty.channel.EventLoopGroup; /** @@ -79,7 +83,17 @@ public EventLoopGroupExtension(int numThreads, String threadNamePrefix) { * @param useDaemonThreads whether to create daemon threads or not */ public EventLoopGroupExtension(int numThreads, String threadNamePrefix, boolean useDaemonThreads) { - super(numThreads, threadNamePrefix, useDaemonThreads); + this(numThreads, ThreadFactories.newEventLoopThreadFactory(threadNamePrefix, useDaemonThreads)); + } + + /** + * Creates a new {@link Extension} that provides an {@link EventLoopGroup}. + * + * @param numThreads the number of event loop threads + * @param threadFactory the factory used to create threads. + */ + public EventLoopGroupExtension(int numThreads, ThreadFactory threadFactory) { + super(numThreads, threadFactory); } /** diff --git a/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java b/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java new file mode 100644 index 00000000000..256d92bf0b1 --- /dev/null +++ b/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/RetrofitBlockHoundIntegration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.retrofit2; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.integration.BlockHoundIntegration; + +/** + * A {@link BlockHoundIntegration} for the retrofit2 module. + */ +@UnstableApi +public final class RetrofitBlockHoundIntegration implements BlockHoundIntegration { + + @Override + public void applyTo(Builder builder) { + builder.allowBlockingCallsInside("com.linecorp.armeria.client.retrofit2.PipeBuffer$PipeSource", "read"); + } +} diff --git a/retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..b896ee6116e --- /dev/null +++ b/retrofit2/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.client.retrofit2.RetrofitBlockHoundIntegration diff --git a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java index d23ab17a35b..23806dc9984 100644 --- a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java +++ b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlDecorator.java @@ -137,7 +137,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc }).thenCompose(arg -> { return ssoHandler.beforeInitiatingSso(ctx, req, arg.messageContext, arg.idpConfig) .thenApply(unused -> arg); - }).thenApply(arg -> { + }).thenApplyAsync(arg -> { final SAMLBindingContext bindingContext = arg.messageContext.getSubcontext(SAMLBindingContext.class); final String relayState = bindingContext != null ? bindingContext.getRelayState() : null; @@ -153,6 +153,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc signingCredential, sp.signatureAlgorithm(), relayState)); } else { + // signing can incur a blocking call final String value = toSignedBase64( arg.messageContext.getMessage(), signingCredential, @@ -166,7 +167,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc } catch (SamlException e) { return fail(ctx, e); } - }).exceptionally(e -> fail(ctx, e))); + }, ctx.blockingTaskExecutor()).exceptionally(e -> fail(ctx, e))); })); } diff --git a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java index 5454b1e399e..498fc351cef 100644 --- a/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java +++ b/saml/src/main/java/com/linecorp/armeria/server/saml/SamlService.java @@ -152,7 +152,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc } else { f = portConfigHolder.future().thenCompose(unused -> req.aggregate()); } - return HttpResponse.from(f.handle((aggregatedReq, cause) -> { + return HttpResponse.from(f.handleAsync((aggregatedReq, cause) -> { if (cause != null) { logger.warn("{} Failed to aggregate a SAML request.", ctx, cause); return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8, @@ -177,8 +177,9 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc // If there's no hostname set by a user, the default virtual hostname will be used. final String defaultHostname = firstNonNull(sp.hostname(), ctx.config().virtualHost().defaultHostname()); + // assertion, logout requests incur blocking calls return func.serve(ctx, aggregatedReq, defaultHostname, portConfig); - })); + }, ctx.blockingTaskExecutor())); } /** diff --git a/sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..70c990557ca --- /dev/null +++ b/sangria/sangria_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.server.sangria.SangriaBlockHoundIntegration diff --git a/sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala b/sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala new file mode 100644 index 00000000000..ef473bf56ac --- /dev/null +++ b/sangria/sangria_2.13/src/main/scala/com/linecorp/armeria/server/sangria/SangriaBlockHoundIntegration.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.sangria + +import com.linecorp.armeria.common.annotation.UnstableApi +import reactor.blockhound.BlockHound +import reactor.blockhound.integration.BlockHoundIntegration + +/** + * A [[BlockHoundIntegration]] for the sangria module. + */ +@UnstableApi +final class SangriaBlockHoundIntegration extends BlockHoundIntegration { + + override def applyTo(builder: BlockHound.Builder): Unit = { + builder.allowBlockingCallsInside("sangria.parser.QueryParser$", "parse") + builder.allowBlockingCallsInside("com.thoughtworks.paranamer.CachingParanamer", "lookupParameterNames") + } +} diff --git a/scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..55eeeb743ba --- /dev/null +++ b/scala/scala_2.13/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.scala.ScalaBlockHoundIntegration diff --git a/scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala b/scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala new file mode 100644 index 00000000000..aa4fd5efe5f --- /dev/null +++ b/scala/scala_2.13/src/main/scala/com/linecorp/armeria/scala/ScalaBlockHoundIntegration.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.scala + +import com.linecorp.armeria.common.annotation.UnstableApi +import reactor.blockhound.BlockHound +import reactor.blockhound.integration.BlockHoundIntegration + +/** + * A [[BlockHoundIntegration]] for the scala module. + */ +@UnstableApi +final class ScalaBlockHoundIntegration extends BlockHoundIntegration { + + override def applyTo(builder: BlockHound.Builder): Unit = { + builder.allowBlockingCallsInside("com.thoughtworks.paranamer.CachingParanamer", "lookupParameterNames") + } +} diff --git a/settings/checkstyle/checkstyle-suppressions.xml b/settings/checkstyle/checkstyle-suppressions.xml index 69321c2c4d4..2b32f251f0d 100644 --- a/settings/checkstyle/checkstyle-suppressions.xml +++ b/settings/checkstyle/checkstyle-suppressions.xml @@ -16,7 +16,9 @@ - - + + + + diff --git a/settings/checkstyle/checkstyle.xml b/settings/checkstyle/checkstyle.xml index d7e7ee3b22e..5295650660c 100644 --- a/settings/checkstyle/checkstyle.xml +++ b/settings/checkstyle/checkstyle.xml @@ -125,6 +125,12 @@ + + + + + + diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java new file mode 100644 index 00000000000..5d879ab5df4 --- /dev/null +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/BlockingUtils.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.testing; + +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.api.function.ThrowingSupplier; + +public final class BlockingUtils { + + public static void blockingRun(Executable runnable) { + try { + runnable.execute(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + public static T blockingRun(ThrowingSupplier supplier) { + try { + return supplier.get(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + private BlockingUtils() {} +} diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java new file mode 100644 index 00000000000..9a4d4a16720 --- /dev/null +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/InternalTestingBlockHoundIntegration.java @@ -0,0 +1,86 @@ +/* + * Copyright 2022 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.testing; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; + +import reactor.blockhound.BlockHound.Builder; +import reactor.blockhound.BlockingMethod; +import reactor.blockhound.integration.BlockHoundIntegration; + +public final class InternalTestingBlockHoundIntegration implements BlockHoundIntegration { + + private static final OutputStream NULL = new OutputStream() { + @Override + public void write(int b) throws IOException { + } + }; + + static final PrintStream ps; + + static { + final String path = System.getProperties().getProperty("com.linecorp.armeria.blockhound.reportFile"); + if (path == null) { + ps = new PrintStream(NULL); + } else { + final File file = new File(path); + try { + ps = new PrintStream(file); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + Runtime.getRuntime().addShutdownHook(new Thread(ps::close)); + } + + @Override + public void applyTo(Builder builder) { + + // tests are allowed to block event loops + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "sleep"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "join"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "acquireUninterruptibly"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "await"); + builder.allowBlockingCallsInside("com.linecorp.armeria.internal.testing.BlockingUtils", + "blockingRun"); + builder.allowBlockingCallsInside("org.assertj.core.api.Assertions", "assertThat"); + builder.allowBlockingCallsInside("net.javacrumbs.jsonunit.fluent.JsonFluentAssert", + "assertThatJson"); + builder.allowBlockingCallsInside("com.linecorp.armeria.testing.server.ServiceRequestContextCaptor$2", + "serve"); + + builder.allowBlockingCallsInside( + "com.linecorp.armeria.internal.testing.InternalTestingBlockHoundIntegration", + "writeBlockingMethod"); + + // prints the exception which makes it easier to debug issues + builder.blockingMethodCallback(this::writeBlockingMethod); + } + + void writeBlockingMethod(BlockingMethod m) { + ps.println(Thread.currentThread()); + new Exception(m.toString()).printStackTrace(ps); + ps.flush(); + } +} diff --git a/testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration b/testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration new file mode 100644 index 00000000000..c0224b4bab3 --- /dev/null +++ b/testing-internal/src/main/resources/META-INF/services/reactor.blockhound.integration.BlockHoundIntegration @@ -0,0 +1 @@ +com.linecorp.armeria.internal.testing.InternalTestingBlockHoundIntegration diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java index 5df012c5109..375f784424a 100644 --- a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/client/thrift/ThriftOverHttpClientTest.java @@ -62,6 +62,7 @@ import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.InvalidResponseHeadersException; import com.linecorp.armeria.client.logging.LoggingRpcClient; +import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpResponse; @@ -79,6 +80,7 @@ import com.linecorp.armeria.common.thrift.ThriftReply; import com.linecorp.armeria.common.thrift.ThriftSerializationFormats; import com.linecorp.armeria.common.util.Exceptions; +import com.linecorp.armeria.internal.testing.BlockingUtils; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; @@ -257,7 +259,7 @@ static void init() throws Exception { final ClientDecorationBuilder decoBuilder = ClientDecoration.builder(); decoBuilder.addRpc((delegate, ctx, req) -> { if (recordMessageLogs) { - ctx.log().whenComplete().thenAccept(requestLogs::add); + ctx.log().whenComplete().thenAcceptAsync(requestLogs::add, CommonPools.blockingTaskExecutor()); } return delegate.execute(ctx, req); }); @@ -318,12 +320,14 @@ void testHelloServiceAsync( client.hello("kukuman" + num, new AsyncMethodCallback() { @Override public void onComplete(String response) { - assertThat(resultQueue.add(new AbstractMap.SimpleEntry<>(num, response))).isTrue(); + BlockingUtils.blockingRun(() -> assertThat(resultQueue.add( + new AbstractMap.SimpleEntry<>(num, response))).isTrue()); } @Override public void onError(Exception exception) { - assertThat(resultQueue.add(new AbstractMap.SimpleEntry<>(num, exception))).isTrue(); + BlockingUtils.blockingRun(() -> assertThat(resultQueue.add( + new AbstractMap.SimpleEntry<>(num, exception))).isTrue()); } }); } @@ -831,12 +835,13 @@ private static class RequestQueuingCallback implements AsyncMethodCallback { @Override public void onComplete(Object response) { - assertThat(resQueue.add(response == null ? "null" : response)).isTrue(); + BlockingUtils.blockingRun( + () -> assertThat(resQueue.add(response == null ? "null" : response)).isTrue()); } @Override public void onError(Exception exception) { - assertThat(resQueue.add(exception)).isTrue(); + BlockingUtils.blockingRun(() -> assertThat(resQueue.add(exception)).isTrue()); } } } diff --git a/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java b/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java index ec9acc8342e..7de3b4dad1c 100644 --- a/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java +++ b/zookeeper3/src/main/java/com/linecorp/armeria/client/zookeeper/ZooKeeperEndpointGroup.java @@ -34,10 +34,9 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.ThreadFactories; import com.linecorp.armeria.server.zookeeper.ZooKeeperUpdatingListener; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * A ZooKeeper-based {@link EndpointGroup} implementation. This {@link EndpointGroup} retrieves the list of * {@link Endpoint}s from a ZooKeeper and updates it when the children of the znode changes. @@ -49,7 +48,7 @@ public final class ZooKeeperEndpointGroup extends DynamicEndpointGroup { private static final Logger logger = LoggerFactory.getLogger(ZooKeeperEndpointGroup.class); private static final ThreadFactory closeCuratorFrameworkThreadFactory = - new DefaultThreadFactory("armeria-close-CuratorFramework"); + ThreadFactories.newThreadFactory("armeria-close-CuratorFramework", false); /** * Returns a new {@link ZooKeeperEndpointGroup} that retrieves the {@link Endpoint} list from