Skip to content

Commit

Permalink
Avoid NoSuchElementException when an acquiring connection is closing (#…
Browse files Browse the repository at this point in the history
…2110)

Avoid NoSuchElementException when an acquiring connection is concurrently closed. Also, propagate event loop in context when re-acquiring a failed h2 connection.

Fixes #2113
  • Loading branch information
pderop authored Apr 1, 2022
1 parent 00926e9 commit 51eabd6
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public void run() {
log.debug(format(c, "Immediately aborted pooled channel, re-acquiring new channel"));
}
pool.acquire(Duration.ofMillis(pendingAcquireTimeout))
.subscribe(new DisposableAcquire(this));
.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, c.eventLoop()))
.subscribe(new DisposableAcquire(this));
}
else {
sink.error(new IOException("Error while acquiring from " + pool));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
/**
* Context key used to propagate the caller event loop in the connection pool subscription.
*/
final static String CONTEXT_CALLER_EVENTLOOP = "callereventloop";
protected final static String CONTEXT_CALLER_EVENTLOOP = "callereventloop";

final PoolFactory<T> defaultPoolFactory;
final Map<SocketAddress, PoolFactory<T>> poolFactoryPerRemoteHost = new HashMap<>();
Expand Down Expand Up @@ -146,7 +146,8 @@ else if (Metrics.isInstrumentationAvailable()) {
return newPool;
});

EventLoop eventLoop = config.loopResources().onClient(config.isPreferNative()).next();
EventLoop eventLoop = sink.contextView().getOrDefault(CONTEXT_CALLER_EVENTLOOP,
config.loopResources().onClient(config.isPreferNative()).next());
pool.acquire(Duration.ofMillis(poolFactory.pendingAcquireTimeout))
.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop))
.subscribe(createDisposableAcquire(config, connectionObserver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ public void operationComplete(Future<Http2StreamChannel> future) {
"re-acquiring a new channel"));
}
pool.acquire(Duration.ofMillis(pendingAcquireTimeout))
.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, channel.eventLoop()))
.subscribe(new DisposableAcquire(this));
}
else {
Expand Down

0 comments on commit 51eabd6

Please sign in to comment.