Skip to content

Commit

Permalink
fix fabric8io#3300: address the watchTest failures
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed May 25, 2022
1 parent 2d73f03 commit 895ff02
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#### Bugs
* Fix #2860: ensure that lockexceptions won't inhibit notification
* Fix #3300: addressed race connection with watch reconnects
* Fix #3832 #1883: simplifying the isHttpsAvailable check
* Fix #3745: the client will throw better exceptions when a namespace is not discernible for an operation
* Fix #3990: Throw exception when `HasMetadata` is used in `resources(...)` API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
private ListOptions listOptions;
private URL requestUrl;

private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

private final boolean receiveBookmarks;

AbstractWatchManager(
Expand All @@ -91,10 +89,10 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
this.listOptions = listOptions;
this.client = clientSupplier.get();

runWatch();
startWatch();
}

protected abstract void run(URL url, Map<String, String> headers);
protected abstract void start(URL url, Map<String, String> headers);

protected abstract void closeRequest();

Expand Down Expand Up @@ -126,14 +124,12 @@ final synchronized void cancelReconnect() {
}
}

/**
* Called to reestablish the connection. Should only be called once per request.
*/
void scheduleReconnect() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}

if (isForceClosed()) {
logger.debug("Ignoring error for already closed/closing connection");
logger.debug("Ignoring already closed/closing connection");
return;
}

Expand All @@ -147,26 +143,26 @@ void scheduleReconnect() {
long delay = nextReconnectInterval();

synchronized (this) {
reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> {
try {
runWatch();
if (isForceClosed()) {
closeRequest();
}
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close(new WatcherException("Unhandled exception in reconnect attempt", e));
} finally {
reconnectPending.set(false);
}
}, delay, TimeUnit.MILLISECONDS);
reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), this::reconnect, delay, TimeUnit.MILLISECONDS);
if (isForceClosed()) {
cancelReconnect();
}
}
}

synchronized void reconnect() {
try {
startWatch();
if (isForceClosed()) {
closeRequest();
}
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close(new WatcherException("Unhandled exception in reconnect attempt", e));
}
}

final boolean cannotReconnect() {
return !watcher.reconnecting() && currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0;
}
Expand Down Expand Up @@ -203,7 +199,10 @@ void updateResourceVersion(final String newResourceVersion) {
resourceVersion.set(newResourceVersion);
}

protected void runWatch() {
/**
* Async start of the watch
*/
protected void startWatch() {
listOptions.setResourceVersion(resourceVersion.get());
URL url = BaseOperation.appendListOptionParams(requestUrl, listOptions);

Expand All @@ -218,7 +217,7 @@ protected void runWatch() {
logger.debug("Watching {}...", url);

closeRequest(); // only one can be active at a time
run(url, headers);
start(url, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public CompletableFuture<WebSocket> getWebsocketFuture() {
}

@Override
protected void run(URL url, Map<String, String> headers) {
protected void start(URL url, Map<String, String> headers) {
this.listener = new WatcherWebSocketListener<>(this);
Builder builder = client.newWebSocketBuilder();
headers.forEach(builder::header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public WatchHTTPManager(final HttpClient client,
}

@Override
protected synchronized void run(URL url, Map<String, String> headers) {
protected synchronized void start(URL url, Map<String, String> headers) {
HttpRequest.Builder builder = client.newHttpRequestBuilder().url(url);
headers.forEach(builder::header);
call = client.consumeLines(builder.build(), (s, a) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,13 +26,10 @@
class WatcherWebSocketListener<T extends HasMetadata> implements WebSocket.Listener {
protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class);

// don't allow for concurrent failure and message processing
// if something is holding the message thread, this can lead to concurrent processing on the watcher
// or worse additional reconnection attempts while the previous threads are still held
private final Object reconnectLock = new Object();

protected final AbstractWatchManager<T> manager;

private boolean reconnected = false;

protected WatcherWebSocketListener(AbstractWatchManager<T> manager) {
this.manager = manager;
}
Expand All @@ -46,25 +42,16 @@ public void onOpen(final WebSocket webSocket) {

@Override
public void onError(WebSocket webSocket, Throwable t) {
if (manager.isForceClosed()) {
logger.debug("Ignoring onFailure for already closed/closing websocket", t);
return;
}

if (manager.cannotReconnect()) {
manager.close(new WatcherException("Connection failure", t));
return;
}

synchronized (reconnectLock) {
manager.scheduleReconnect();
}
logger.debug("WebSocket error received", t);
scheduleReconnect();
}

@Override
public void onMessage(WebSocket webSocket, String text) {
webSocket.request();
synchronized (reconnectLock) {
// onMesssage and onClose are serialized, but it's not specified if onError
// may occur simultaneous with onMessage. So we prevent concurrent processing
synchronized (this) {
manager.onMessage(text);
}
}
Expand All @@ -78,7 +65,14 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) {
public void onClose(WebSocket webSocket, int code, String reason) {
logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
webSocket.sendClose(code, reason);
manager.scheduleReconnect();
scheduleReconnect();
}

private synchronized void scheduleReconnect() {
if (!reconnected) {
manager.scheduleReconnect();
reconnected = true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -146,6 +149,41 @@ void cancelReconnectNonNullAttempt() throws MalformedURLException {
verify(sf, times(1)).cancel(true);
}

@Test
void reconnectRace() throws Exception {
// Given
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
CompletableFuture<Void> done = new CompletableFuture<Void>();
final WatchManager<HasMetadata> awm = new WatchManager<HasMetadata>(
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0) {

boolean first = true;

@Override
protected void startWatch() {
if (first) {
first = false;
// simulate failing before the call to startWatch finishes
ForkJoinPool.commonPool().execute(this::scheduleReconnect);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
} else {
done.complete(null);
}
}
};

// When
awm.cancelReconnect();
// Then

done.get(5, TimeUnit.SECONDS);
}

@Test
@DisplayName("isClosed, after close invocation, should return true")
void isForceClosedWhenClosed() throws MalformedURLException {
Expand Down Expand Up @@ -194,7 +232,7 @@ public void onClose() {
}
}

private static final class WatchManager<T extends HasMetadata> extends AbstractWatchManager<T> {
private static class WatchManager<T extends HasMetadata> extends AbstractWatchManager<T> {

private final AtomicInteger closeCount = new AtomicInteger(0);

Expand All @@ -205,7 +243,7 @@ public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLi
}

@Override
protected void run(URL url, Map<String, String> headers) {
protected void start(URL url, Map<String, String> headers) {
}

@Override
Expand All @@ -214,7 +252,7 @@ protected void closeRequest() {
}

@Override
protected void runWatch() {
protected void startWatch() {
}
}
}

0 comments on commit 895ff02

Please sign in to comment.