diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java index 50408d21bc..b8e8817fdc 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/DefaultPooledConnectionProvider.java @@ -223,7 +223,8 @@ public void run() { log.debug(format(c, "Immediately aborted pooled channel, re-acquiring new channel")); } pool.acquire(Duration.ofMillis(pendingAcquireTimeout)) - .subscribe(new DisposableAcquire(this)); + .contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, c.eventLoop())) + .subscribe(new DisposableAcquire(this)); } else { sink.error(new IOException("Error while acquiring from " + pool)); diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 13e9cae80e..3a2a97ed96 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -75,7 +75,7 @@ public abstract class PooledConnectionProvider implements /** * Context key used to propagate the caller event loop in the connection pool subscription. */ - final static String CONTEXT_CALLER_EVENTLOOP = "callereventloop"; + protected final static String CONTEXT_CALLER_EVENTLOOP = "callereventloop"; final PoolFactory defaultPoolFactory; final Map> poolFactoryPerRemoteHost = new HashMap<>(); @@ -146,7 +146,8 @@ else if (Metrics.isInstrumentationAvailable()) { return newPool; }); - EventLoop eventLoop = config.loopResources().onClient(config.isPreferNative()).next(); + EventLoop eventLoop = sink.contextView().getOrDefault(CONTEXT_CALLER_EVENTLOOP, + config.loopResources().onClient(config.isPreferNative()).next()); pool.acquire(Duration.ofMillis(poolFactory.pendingAcquireTimeout)) .contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop)) .subscribe(createDisposableAcquire(config, connectionObserver, diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 0320ab3423..cf76c73902 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -344,6 +344,7 @@ public void operationComplete(Future future) { "re-acquiring a new channel")); } pool.acquire(Duration.ofMillis(pendingAcquireTimeout)) + .contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, channel.eventLoop())) .subscribe(new DisposableAcquire(this)); } else {