Skip to content

Commit

Permalink
[JENKINS-70334] Fix and simplify TcpSlaveAgentListenerRescheduler (#7547)
Browse files Browse the repository at this point in the history

Co-authored-by: Basil Crow <[email protected]>
  • Loading branch information
Dohbedoh and basil authored Jan 20, 2023
1 parent 02351da commit 5755957
Showing 1 changed file with 32 additions and 129 deletions.
161 changes: 32 additions & 129 deletions core/src/main/java/hudson/TcpSlaveAgentListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import edu.umd.cs.findbugs.annotations.Nullable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.model.AperiodicWork;
import hudson.slaves.OfflineCause;
import hudson.util.VersionNumber;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -81,7 +80,7 @@
@StaplerAccessibleType
public final class TcpSlaveAgentListener extends Thread {

private final ServerSocketChannel serverSocket;
private ServerSocketChannel serverSocket;
private volatile boolean shuttingDown;

public final int configuredPort;
Expand All @@ -92,24 +91,27 @@ public final class TcpSlaveAgentListener extends Thread {
*/
public TcpSlaveAgentListener(int port) throws IOException {
super("TCP agent listener port=" + port);
try {
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
} catch (BindException e) {
throw (BindException) new BindException("Failed to listen on port " + port + " because it's already in use.").initCause(e);
}
serverSocket = createSocket(port);
this.configuredPort = port;
setUncaughtExceptionHandler((t, e) -> {
LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener " + t + ", attempting to reschedule thread", e);
LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener " + t, e);
shutdown();
TcpSlaveAgentListenerRescheduler.schedule(t, e);
});

LOGGER.log(Level.FINE, "TCP agent listener started on port {0}", getPort());

start();
}

/**
 * Opens a server socket channel and binds it to the given TCP port.
 *
 * <p>If binding fails, the freshly opened channel is closed before the failure is reported,
 * so repeated failed attempts do not leak channels.
 *
 * @param port the TCP port to bind to
 * @return an open channel whose socket is bound to {@code port}
 * @throws BindException if binding fails with a {@link BindException}; the original exception is kept as the cause
 * @throws IOException if the channel cannot be opened or bound for any other I/O reason
 */
private static ServerSocketChannel createSocket(int port) throws IOException {
    ServerSocketChannel result = ServerSocketChannel.open();
    try {
        result.socket().bind(new InetSocketAddress(port));
        return result;
    } catch (IOException | RuntimeException e) {
        // The channel is unusable once bind has failed; release it instead of leaking it.
        try {
            result.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        if (e instanceof BindException) {
            throw (BindException) new BindException("Failed to listen on port " + port + " because it's already in use.").initCause(e);
        }
        throw e;
    }
}

/**
* Gets the TCP port number in which we are listening.
*/
Expand Down Expand Up @@ -172,9 +174,9 @@ public VersionNumber getRemotingMinimumVersion() {

@Override
public void run() {
try {
// the loop eventually terminates when the socket is closed.
while (!shuttingDown) {
// the loop eventually terminates when the thread shuts down
while (!shuttingDown) {
try {
Socket s = serverSocket.accept().socket();

// this prevents a connection from being silently terminated by the router in between or the other peer
Expand All @@ -184,18 +186,20 @@ public void run() {
// we take care of buffering on our own
s.setTcpNoDelay(true);

new ConnectionHandler(s, new ConnectionHandlerFailureCallback(this) {
@Override
public void run(Throwable cause) {
LOGGER.log(Level.WARNING, "Connection handler failed, restarting listener", cause);
shutdown();
TcpSlaveAgentListenerRescheduler.schedule(getParentThread(), cause);
new ConnectionHandler(s).start();
} catch (Throwable e) {
if (!shuttingDown) {
LOGGER.log(Level.SEVERE, "Failed to accept TCP connections", e);
if (!serverSocket.isOpen()) {
LOGGER.log(Level.INFO, "Restarting server socket");
try {
serverSocket = createSocket(configuredPort);
LOGGER.log(Level.INFO, "TCP agent listener restarted on port {0}", getPort());
} catch (IOException ioe) {
LOGGER.log(Level.WARNING, "Failed to restart server socket", ioe);
}
}
}).start();
}
} catch (IOException e) {
if (!shuttingDown) {
LOGGER.log(Level.SEVERE, "Failed to accept TCP connections", e);
}
}
}
}
Expand Down Expand Up @@ -234,21 +238,12 @@ private final class ConnectionHandler extends Thread {
*/
private final int id;

ConnectionHandler(Socket s, ConnectionHandlerFailureCallback parentTerminator) {
ConnectionHandler(Socket s) {
this.s = s;
synchronized (getClass()) {
id = iotaGen++;
}
setName("TCP agent connection handler #" + id + " with " + s.getRemoteSocketAddress());
setUncaughtExceptionHandler((t, e) -> {
LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener ConnectionHandler " + t, e);
try {
s.close();
parentTerminator.run(e);
} catch (IOException e1) {
LOGGER.log(Level.WARNING, "Could not close socket after unexpected thread death", e1);
}
});
}

@Override
Expand Down Expand Up @@ -295,7 +290,7 @@ public void run() {
} catch (IOException ex) {
// try to clean up the socket
}
} catch (IOException e) {
} catch (Throwable e) {
if (e instanceof EOFException) {
LOGGER.log(Level.INFO, () -> "Connection " + connectionInfo + " failed: " + e.getMessage());
} else {
Expand Down Expand Up @@ -351,21 +346,6 @@ private void error(String msg, Socket s) throws IOException {
}
}

/**
 * Failure callback that carries a reference to a parent thread.
 *
 * <p>This is essentially just to be able to pass the parent thread into the callback,
 * as it can't access it otherwise.
 */
private abstract static class ConnectionHandlerFailureCallback {
    /** Thread supplied at construction time; never reassigned. */
    private final Thread parentThread;

    /**
     * @param parentThread the thread to expose through {@link #getParentThread()}
     */
    ConnectionHandlerFailureCallback(Thread parentThread) {
        this.parentThread = parentThread;
    }

    /**
     * @return the thread passed to the constructor
     */
    public Thread getParentThread() {
        return parentThread;
    }

    /**
     * Invoked when a failure is reported.
     *
     * @param cause the throwable describing the failure
     */
    public abstract void run(Throwable cause);
}

/**
* This extension provides a Ping protocol that allows people to verify that the {@link TcpSlaveAgentListener} is alive.
* We also use this to wake the acceptor thread on termination.
Expand Down Expand Up @@ -436,83 +416,6 @@ public boolean connect(Socket socket) throws IOException {
}
}

/**
 * Reschedules the {@code TcpSlaveAgentListener} on demand. Disables itself after running.
 *
 * <p>The work does nothing until it is armed through {@link #schedule(Thread, Throwable)},
 * {@link #schedule(Thread, Throwable, long)} or {@link #setActive(boolean)}. Once armed, the next
 * run interrupts the origin thread if it is still alive and creates a new listener on the
 * configured agent port.
 */
@Extension
@Restricted(NoExternalUse.class)
public static class TcpSlaveAgentListenerRescheduler extends AperiodicWork {
    /** Delay, in milliseconds, used when the caller does not specify one. */
    private static final long DEFAULT_DELAY_MS = 5000;

    private Thread originThread;
    private Throwable cause;
    private long recurrencePeriod = DEFAULT_DELAY_MS;
    private boolean isActive;

    public TcpSlaveAgentListenerRescheduler() {
        isActive = false;
    }

    /**
     * @param originThread the listener thread that failed
     * @param cause the failure that triggered the reschedule
     */
    public TcpSlaveAgentListenerRescheduler(Thread originThread, Throwable cause) {
        this.originThread = originThread;
        this.cause = cause;
        this.isActive = false;
    }

    public void setOriginThread(Thread originThread) {
        this.originThread = originThread;
    }

    public void setCause(Throwable cause) {
        this.cause = cause;
    }

    public void setActive(boolean active) {
        isActive = active;
    }

    @Override
    public long getRecurrencePeriod() {
        return recurrencePeriod;
    }

    @Override
    public AperiodicWork getNewInstance() {
        // NOTE(review): the new instance starts inactive and with the default period, while
        // schedule() arms the instance looked up via AperiodicWork.all(). Confirm against
        // AperiodicWork that the armed instance is the one that actually gets run.
        return new TcpSlaveAgentListenerRescheduler(originThread, cause);
    }

    /**
     * Restarts the listener if this work has been armed; otherwise does nothing.
     * Stays armed when the restart fails with an {@link IOException}, so a later run retries.
     */
    @Override
    protected void doAperiodicRun() {
        if (!isActive) {
            return;
        }
        try {
            // originThread may be unset when the work was armed via setActive(true) alone.
            if (originThread != null && originThread.isAlive()) {
                originThread.interrupt();
            }
            int port = Jenkins.get().getSlaveAgentPort();
            if (port != -1) {
                // The TcpSlaveAgentListener constructor already starts the thread; calling
                // start() a second time would throw IllegalThreadStateException.
                new TcpSlaveAgentListener(port);
                LOGGER.log(Level.INFO, "Restarted TcpSlaveAgentListener");
            } else {
                LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener " + originThread + ". Port is disabled, not rescheduling", cause);
            }
            isActive = false;
        } catch (IOException e) {
            // Log the failure of this restart attempt, not the stale original cause.
            LOGGER.log(Level.SEVERE, "Could not reschedule TcpSlaveAgentListener - trying again.", e);
        }
    }

    /**
     * Arms the rescheduler with the default delay.
     *
     * @param originThread the listener thread that failed
     * @param cause the failure that triggered the reschedule
     */
    public static void schedule(Thread originThread, Throwable cause) {
        schedule(originThread, cause, DEFAULT_DELAY_MS);
    }

    /**
     * Arms the rescheduler.
     *
     * @param originThread the listener thread that failed
     * @param cause the failure that triggered the reschedule
     * @param approxDelay value to report from {@link #getRecurrencePeriod()}, in milliseconds
     */
    public static void schedule(Thread originThread, Throwable cause, long approxDelay) {
        TcpSlaveAgentListenerRescheduler rescheduler = AperiodicWork.all().get(TcpSlaveAgentListenerRescheduler.class);
        if (rescheduler == null) {
            LOGGER.log(Level.WARNING, "TcpSlaveAgentListenerRescheduler is not registered, cannot reschedule TcpSlaveAgentListener", cause);
            return;
        }
        rescheduler.originThread = originThread;
        rescheduler.cause = cause;
        rescheduler.recurrencePeriod = approxDelay;
        // Arm last, so a run that observes isActive sees the fields assigned above.
        rescheduler.isActive = true;
    }
}


/**
* Connection terminated because we are reconnected from the current peer.
Expand Down

0 comments on commit 5755957

Please sign in to comment.