Skip to content

Commit

Permalink
fix fabric8io#4802: using different threads for watch events
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Feb 22, 2023
1 parent f344495 commit 23ecaa3
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* Fix #4659: The SupportTestingClient interface has been deprecated. Please use one of the supports methods or getApiGroup to determine what is available on the api server.
* Fix #4825: removed or deprecated/moved methods that are unrelated to the rolling timeout from ImageEditReplacePatchable. Deprecated rollout methods for timeout and edit - future versions will not support
* Fix #4826: removed RequestConfig upload connection and rolling timeouts. Both were no longer used with no plans to re-introduce their usage.
* Fix #4802: to ease developer burden, and potentially bad behavior with the vertx client, the callbacks for Watcher and ResourceEventHandler will no longer be made by an HttpClient thread, but rather from the thread pool associated with the KuberentesClient. Please ensure that if you are customizing the Executor supplied to the client that it has sufficient threads to handle these callbacks.

### 6.4.1 (2023-01-31)

Expand Down
10 changes: 5 additions & 5 deletions doc/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ That will align all of the transitive Fabric8 Kubernetes Client dependencies to

There has been a lot of changes under the covers with thread utilization in the Fabric8 client over the 5.x and 6.x releases. So the exact details of what threads are created / used where will depend on the particular release version.

At the core the thread utilization will depend upon the http client implementation. Per client OkHttp maintains a pool of threads for task execution. It will dedicate 2 threads out of that pool per WebSocket connection. If you have a lot of WebSocket usage (Informer or Watches) with OkHttp, you can expect to see a large number of threads in use - which can potentially exhaust the OkHttp defaults.
At the core the thread utilization will depend upon the HTTP client implementation. Per client OkHttp maintains a pool of threads for task execution. It will dedicate 2 threads out of that pool per WebSocket connection. If you have a lot of WebSocket usage (Informer or Watches) with OkHttp, you can expect to see a large number of threads in use.

With the JDK http client it will only maintain a selector thread and a small worker pool which will be based upon your available processors per client. It does not matter how many Informers or Watches you run, the same worker pool is shared.
With the JDK, Jetty, or Vert.x http clients they will maintain a smaller worker pool for all tasks that is typically sized by default based upon your available processors, and typically a selector / coordinator thread(s). It does not matter how many Informers or Watches you run, the same threads are shared.

> **Note:** It is recommended with either HTTP client implementation that logic you supply via Watchers, ExecListeners, ResourceEventHandlers, Predicates, etc. does not execute long running tasks.
To ease developer burden the callbacks on Watchers and ResourceEventHandlers will not be done directly by an http client thread, but the order of calls will be guaranteed. This will make the logic you include in those callbacks tolerant to some blocking without compromising the on-going work at the HTTP client level. However you should avoid truly long running operations as this will cause further events to queue and may eventually case memory issues.

For non-ResourceEventHandlers call backs long-running operation can be a problem. When using the OkHttp client and default settings holding a IO thread inhibits websocket processing that can timeout the ping and may prevent additional requests since the okhttp client defaults to only 5 concurrent requests per host. When using the JDK http client the long running task will inhibit the use of that IO thread for ALL http processing. Note that calling other KubernetesClient operations, especially those with waits, can be long-running. We are working towards providing non-blocking mode for many of these operations, but until that is available consider using a separate task queue for such work.
> **Note:** It is recommended with any HTTP client implementation that logic you supply via Watchers, ExecListeners, ResourceEventHandlers, Predicates, Interceptors, LeaderCallbacks, etc. does not execute long running tasks.
On top of the http client threads the Fabric8 client maintains a task thread pool for scheduled tasks and for potentially long-running tasks that are called from WebSocket operations, such as handling input and output streams and ResourceEventHandler call backs. This thread pool defaults to an unlimited number of cached threads, which will be shutdown when the client is closed - that is a sensible default with either http client as the amount of concurrently running async tasks will typically be low. If you would rather take full control over the threading use KubernetesClientBuilder.withExecutor or KubernetesClientBuilder.withExecutorSupplier - however note that constraining this thread pool too much will result in a build up of event processing queues.
On top of the http client threads the Fabric8 client maintains a task thread pool for scheduled tasks and for tasks that are called from WebSocket operations, such as handling input and output streams and ResourceEventHandler call backs. This thread pool defaults to an unlimited number of cached threads, which will be shutdown when the client is closed - that is a sensible default with as the amount of concurrently running async tasks will typically be low. If you would rather take full control over the threading use KubernetesClientBuilder.withExecutor or KubernetesClientBuilder.withExecutorSupplier - however note that constraining this thread pool too much will result in a build up of event processing queues.

Finally the fabric8 client will use 1 thread per PortForward and an additional thread per socket connection - this may be improved upon in the future.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class Config {

public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc";
public static final Long DEFAULT_SCALE_TIMEOUT = 10 * 60 * 1000L;
public static final int DEFAULT_REQUEST_TIMEOUT = 10 * 1000;
public static final int DEFAULT_LOGGING_INTERVAL = 20 * 1000;
public static final Long DEFAULT_WEBSOCKET_TIMEOUT = 5 * 1000L;
public static final Long DEFAULT_WEBSOCKET_PING_INTERVAL = 30 * 1000L;
Expand Down Expand Up @@ -193,7 +194,7 @@ public class Config {
private int uploadRequestTimeout = DEFAULT_UPLOAD_REQUEST_TIMEOUT;
private int requestRetryBackoffLimit;
private int requestRetryBackoffInterval;
private int requestTimeout = 10 * 1000;
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private long scaleTimeout = DEFAULT_SCALE_TIMEOUT;
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static io.fabric8.kubernetes.client.Config.DEFAULT_LOGGING_INTERVAL;
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_TIMEOUT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_SCALE_TIMEOUT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_UPLOAD_REQUEST_TIMEOUT;
import static io.fabric8.kubernetes.client.Config.DEFAULT_WEBSOCKET_TIMEOUT;
Expand All @@ -42,7 +43,7 @@ public class RequestConfig {
private int uploadRequestTimeout = DEFAULT_UPLOAD_REQUEST_TIMEOUT;
private int requestRetryBackoffLimit = DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
private int requestRetryBackoffInterval = DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
private int requestTimeout = 10 * 1000;
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private long scaleTimeout = DEFAULT_SCALE_TIMEOUT;
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public interface Watcher<T> {

/**
* If the Watcher can reconnect itself after an error
*
* <p>
* Currently only used to indicate if the Watch should ignore the watch reconnect limit
*
* @return
*/
default boolean reconnecting() {
Expand All @@ -31,8 +33,7 @@ default boolean reconnecting() {
/**
* Handle the given event.
* <p>
* Should not be implemented with long-running logic as this method is called directly from
* an IO thread.
* Should not be implemented with long-running logic as that may lead to memory issues.
*/
void eventReceived(Action action, T resource);

Expand All @@ -46,9 +47,8 @@ default void onClose() {
/**
* Invoked when the watcher closes due to an Exception.
* <p>
* Should not be implemented with long-running logic as this method is called directly from
* an IO thread.
*
* Should not be implemented with long-running logic as that may lead to memory issues.
*
* @param cause What caused the watcher to be closed.
*/
void onClose(WatcherException cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ public interface ResourceEventHandler<T> {

/**
* Called after an empty list is retrieved on start or after an HTTP GONE when the {@link Store} is empty
* <p>
* Should not be implemented with long-running logic as that may lead to memory issues.
*/
default void onNothing() {

}

/**
* Called when an object is added.
* <p>
* Should not be implemented with long-running logic as that may lead to memory issues.
*
* @param obj object
*/
Expand All @@ -46,6 +50,8 @@ default void onNothing() {
* were combined together, so you can't use this to see every single
* change. It is also called when a sync happens - oldObj will be
* the same as newObj.
* <p>
* Should not be implemented with long-running logic as that may lead to memory issues.
*
* @param oldObj old object
* @param newObj new object
Expand All @@ -57,6 +63,8 @@ default void onNothing() {
* it would get an object of the DeletedFinalStateUnknown. This can
* happen if the watch is closed and misses the delete event and
* we don't notice the deletion until the subsequent re-list.
* <p>
* Should not be implemented with long-running logic as that may lead to memory issues.
*
* @param obj object to delete
* @param deletedFinalStateUnknown get final state of item if it is known or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ default boolean hasSynced() {
* The resource version observed when last synced with the underlying store.
* The value returned is not synchronized with access to the underlying store
* and is not thread-safe.
* <p>
* Since the store processes events asynchronously this value should not be
* used as an indication of the last resourceVersion seen. Also after an
* informer is stopped any pending event processing may not happen.
*
* @return string value or null if never synced
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,6 +51,42 @@

public abstract class AbstractWatchManager<T extends HasMetadata> implements Watch {

private static final class SerialWatcher<T> implements Watcher<T> {
private final Watcher<T> watcher;
SerialExecutor serialExecutor;

private SerialWatcher(Watcher<T> watcher, SerialExecutor serialExecutor) {
this.watcher = watcher;
this.serialExecutor = serialExecutor;
}

@Override
public void eventReceived(Action action, T resource) {
serialExecutor.execute(() -> watcher.eventReceived(action, resource));
}

@Override
public void onClose(WatcherException cause) {
serialExecutor.execute(() -> {
watcher.onClose(cause);
serialExecutor.shutdownNow();
});
}

@Override
public void onClose() {
serialExecutor.execute(() -> {
watcher.onClose();
serialExecutor.shutdownNow();
});
}

@Override
public boolean reconnecting() {
return watcher.reconnecting();
}
}

public static class WatchRequestState {

private final AtomicBoolean reconnected = new AtomicBoolean();
Expand Down Expand Up @@ -79,7 +116,8 @@ public static class WatchRequestState {
AbstractWatchManager(
Watcher<T> watcher, BaseOperation<T, ?, ?> baseOperation, ListOptions listOptions, int reconnectLimit,
int reconnectInterval, Supplier<HttpClient> clientSupplier) throws MalformedURLException {
this.watcher = watcher;
// prevent the callbacks from happening in the httpclient thread
this.watcher = new SerialWatcher<>(watcher, new SerialExecutor(baseOperation.getOperationContext().getExecutor()));
this.reconnectLimit = reconnectLimit;
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, reconnectLimit);
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,16 @@ protected synchronized void scheduleNext() {
}
}

/**
* Shutdown the executor without executing any more tasks.
* <p>
* The the current task will be interrupting if it is not the thread that initiated the shutdown.
*/
public void shutdownNow() {
this.shutdown = true;
tasks.clear();
synchronized (threadLock) {
if (thread != null) {
if (thread != null && thread != Thread.currentThread()) {
thread.interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,19 @@ public void onClose() {
}
}

static BaseOperation mockOperation() {
BaseOperation operation = mock(BaseOperation.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(operation.getOperationContext().getExecutor()).thenReturn(Runnable::run);
return operation;
}

private static class WatchManager<T extends HasMetadata> extends AbstractWatchManager<T> {

private final AtomicInteger closeCount = new AtomicInteger(0);

public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval)
throws MalformedURLException {
super(watcher, mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval,
super(watcher, mockOperation(), listOptions, reconnectLimit, reconnectInterval,
() -> null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void testReconnectOnException() throws MalformedURLException, InterruptedExcepti
DerivedClientBuilder builder = Mockito.mock(HttpClient.DerivedClientBuilder.class, Mockito.RETURNS_SELF);
Mockito.when(client.newBuilder()).thenReturn(builder);
Mockito.when(builder.build()).thenReturn(client);
BaseOperation baseOperation = Mockito.mock(BaseOperation.class);
BaseOperation baseOperation = AbstractWatchManagerTest.mockOperation();
Mockito.when(baseOperation.getNamespacedUrl()).thenReturn(new URL("http://localhost"));
CompletableFuture<HttpResponse<AsyncBody>> future = new CompletableFuture<>();
Mockito.when(client.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
Expand Down Expand Up @@ -120,9 +121,9 @@ public void eventReceived(Action action, ConfigMap pod) {
}
try {
// introduce a delay to cause the ping to terminate the connection
// if this doesn't work reliably, then an alternative would be to
// restart the apiserver
Thread.sleep(10000);
// and that holds the thread for longer than the request timeout
Thread.sleep(Config.DEFAULT_REQUEST_TIMEOUT);
// TODO: could use withRequestConfig to use a shorter timeout
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
Expand Down Expand Up @@ -159,7 +160,7 @@ public void onClose() {
.build());

assertTrue(eventLatch.await(10, TimeUnit.SECONDS));
assertTrue(modifyLatch.await(15, TimeUnit.SECONDS));
assertTrue(modifyLatch.await(30, TimeUnit.SECONDS));
assertFalse(concurrent.get());
watch.close();
assertTrue(closeLatch.await(30, TimeUnit.SECONDS));
Expand Down

0 comments on commit 23ecaa3

Please sign in to comment.