Open node connections asynchronously #35144
Pinging @elastic/es-distributed
This was motivated by my work on #35095. In that issue we will need to introduce retries on connections, so I went ahead and started the work on async connections as part of that.
this looks awesome, thanks for doing this. I left some comments
private final CompletableContext<Void> closeContext = new CompletableContext<>();

Netty4TcpChannel(Channel channel, String profile) {
    this(channel, profile, completedConnectContext());
Why do we have the extra ctor here? Is it for testing? I would love to remove this one; it might be worth going the extra mile and doing this explicitly in every test.
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

class TcpTransportHandshaker {
can be final and can have javadocs?!
private final ConnectionProfile connectionProfile;
private final List<TcpChannel> channels;
private final ActionListener<NodeChannels> listener;
private final AtomicInteger pendingConnections;
I'd use CountDown.java here instead for the pendingConnections, plus an extra boolean for whether we failed or not. That is more straightforward and the impls will be simpler.
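A minimal sketch of the shape this suggestion points at, using only the JDK. `SimpleCountDown` and `PendingConnectionTracker` are hypothetical names invented for illustration; `SimpleCountDown` is a stand-in and is not the Elasticsearch `CountDown` class, whose exact behavior is not shown in this thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a count-down helper (NOT the Elasticsearch class).
final class SimpleCountDown {
    private final AtomicInteger remaining;

    SimpleCountDown(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        this.remaining = new AtomicInteger(count);
    }

    // Returns true for exactly one caller: the one that moves the count to zero.
    boolean countDown() {
        while (true) {
            int current = remaining.get();
            if (current == 0) {
                return false; // already counted down
            }
            if (remaining.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }
}

// Hypothetical tracker: one count-down for pending channels, one flag for failure.
final class PendingConnectionTracker {
    private final SimpleCountDown countDown;
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final Runnable onAllConnected;
    private final Runnable onFirstFailure;

    PendingConnectionTracker(int channels, Runnable onAllConnected, Runnable onFirstFailure) {
        this.countDown = new SimpleCountDown(channels);
        this.onAllConnected = onAllConnected;
        this.onFirstFailure = onFirstFailure;
    }

    // Called once per channel that connects successfully.
    void onChannelConnected() {
        if (countDown.countDown() && failed.get() == false) {
            onAllConnected.run();
        }
    }

    // Called on any failure or timeout; only the first call triggers the callback.
    void onChannelFailed() {
        if (failed.compareAndSet(false, true)) {
            onFirstFailure.run();
        }
    }

    public static void main(String[] args) {
        PendingConnectionTracker tracker = new PendingConnectionTracker(
            2,
            () -> System.out.println("all channels connected"),
            () -> System.out.println("connection attempt failed"));
        tracker.onChannelConnected();
        tracker.onChannelConnected(); // second call reaches zero and fires the success callback
    }
}
```

In this sketch a failed channel never counts down, so the success callback cannot fire once one of the counted channels has failed. A timeout arriving at the same instant as the final successful connect would still need extra care.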

@Override
public void onFailure(Exception e) {
    CloseableChannel.closeChannels(channels, false);
CloseableChannel.closeChannel can throw, so should we call the listener first or use try/finally?
CloseableChannel.closeChannel actually should not throw. The CloseableChannel close signature does not include an exception.
static <C extends CloseableChannel> void closeChannels(List<C> channels, boolean blocking) {
    try {
        IOUtils.close(channels);
    } catch (IOException e) {
        // The CloseableChannel#close method does not throw IOException, so this should not occur.
        throw new UncheckedIOException(e);
    }
    if (blocking) {
        ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
        for (final C channel : channels) {
            PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
            channel.addCloseListener(closeFuture);
            futures.add(closeFuture);
        }
        blockOnFutures(futures);
    }
}
Should I change that UncheckedIOException to an AssertionError? I can still do your suggested try/catch if you would like, for safety.
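For reference, the try/finally option raised in this thread could look roughly like the sketch below. `FailureHandling` and `closeThenNotify` are hypothetical names, and plain `java.io.Closeable` plus a `Consumer<Exception>` stand in for the channel and listener types; this is not the code in the PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper illustrating "close, but always notify the listener".
final class FailureHandling {

    // Attempts to close each resource in order. The finally block guarantees that
    // onFailure receives the original failure even if a close() call throws.
    // Any exception from close() still propagates to the caller afterwards.
    static void closeThenNotify(List<? extends Closeable> resources,
                                Exception failure,
                                Consumer<Exception> onFailure) throws IOException {
        try {
            for (Closeable resource : resources) {
                resource.close();
            }
        } finally {
            onFailure.accept(failure);
        }
    }
}
```

Note that in this simplified form a throwing `close()` stops the loop, so later resources are left unclosed; a production version would want to attempt to close all of them.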
        }
    });
} catch (Exception ex) {
    CloseableChannel.closeChannels(channels, false);
CloseableChannel.closeChannel can throw, so should we call the listener first or use try/finally?
public void onFailure(Exception ex) {
    assert pendingConnections.get() != 0 : "Should not receive non-timeout connection exception if no connections pending.";
    if (setFailed()) {
        CloseableChannel.closeChannels(channels, false);
CloseableChannel.closeChannel can throw, so should we call the listener first or use try/finally?

public void onTimeout() {
    if (setFailed()) {
        CloseableChannel.closeChannels(channels, false);
CloseableChannel.closeChannel can throw, so should we call the listener first or use try/finally?
private final HandshakeRequestSender handshakeRequestSender;
private final HandshakeResponseSender handshakeResponseSender;

TcpTransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender,
I would see the value of a functional interface here if we had tests for it but it seems we don't. Did I miss it?
I added a test.
@s1monw - I made changes based on your review. The primary thing that needs to be resolved is what you want done in regards to |
Left one comment, LGTM otherwise.
@@ -61,6 +59,24 @@
            }
        }
    });

    if (connectFuture != null) {
can we just not make it nullable? it would be nice to have it just be there all the time?
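One way to avoid the null check, sketched with JDK types only: channels that are already connected receive a pre-completed future, so the field is always present. `SketchChannel` is a hypothetical class and `CompletableFuture` stands in for the PR's connect context type; the real Netty4TcpChannel may be structured differently.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical channel wrapper whose connect future is never null.
final class SketchChannel {
    private final CompletableFuture<Void> connectFuture;

    // For a channel that may still be connecting: the caller supplies the future.
    SketchChannel(CompletableFuture<Void> connectFuture) {
        this.connectFuture = Objects.requireNonNull(connectFuture, "connectFuture");
    }

    // For a channel that is already connected (for example, an accepted inbound one):
    // use a future that is complete from the start instead of passing null.
    static SketchChannel alreadyConnected() {
        return new SketchChannel(CompletableFuture.completedFuture(null));
    }

    // Runs the callback once the connection is established. If the future is
    // already complete, the callback runs immediately on the calling thread.
    void addConnectListener(Runnable onConnected) {
        connectFuture.thenRun(onConnected);
    }
}
```

With this shape, callers register listeners the same way for inbound and outbound channels, and no `if (connectFuture != null)` branch is needed.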
@@ -90,7 +89,7 @@
         IOUtils.close(channels);
     } catch (IOException e) {
         // The CloseableChannel#close method does not throw IOException, so this should not occur.
-        throw new UncheckedIOException(e);
+        throw new AssertionError(e);
++
This is related to elastic#29023. Additionally, at other points we have discussed a preference for removing the need to unnecessarily block threads for opening new node connections. This commit lays the groundwork for this by opening connections asynchronously at the transport level. We still block; however, this work will make it possible to eventually remove all blocking on new connections out of the TransportService and Transport.
This is a follow-up to #35144. That commit made the underlying connection opening process in TcpTransport asynchronous. However the method still blocked on the process being complete before returning. This commit moves the blocking to the ConnectionManager level. This is another step towards the top-level TransportService api being async.
This is related to #29023. Additionally at other points we have
discussed a preference for removing the need to unnecessarily block
threads for opening new node connections. This commit lays the groundwork
for this by opening connections asynchronously at the transport level.
We still block; however, this work will make it possible to eventually
remove all blocking on new connections out of the TransportService
and Transport.
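
The pattern described here (an asynchronous open underneath, with a caller that still blocks for now) can be illustrated with a small JDK-only sketch. `AsyncOpenSketch`, `openConnectionAsync`, and `openConnection` are hypothetical names, and a `String` stands in for a real connection object; none of this is the TcpTransport API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncOpenSketch {

    // Hypothetical async entry point: returns immediately and completes the
    // future on another thread once the (simulated) connection is ready.
    static CompletableFuture<String> openConnectionAsync(String node) {
        return CompletableFuture.supplyAsync(() -> "connection-to-" + node);
    }

    // Blocking wrapper kept for existing callers. The blocking now lives in one
    // place, so it can later be moved up a layer or removed entirely.
    static String openConnection(String node, long timeoutMillis) {
        try {
            return openConnectionAsync(node).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while connecting to " + node, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("failed to connect to " + node, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(openConnection("node-1", 5_000));
    }
}
```

Callers that are ready to go fully asynchronous can use `openConnectionAsync` directly and attach a callback instead of waiting.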