From 760d9802ec670a0e470c0e821c411c77af3752d9 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Fri, 20 May 2022 10:15:33 -0400 Subject: [PATCH] adding missing reconnect from a recent change to the httpwatch logic --- .../client/dsl/internal/WatchHTTPManager.java | 1 + .../dsl/internal/WatchHttpManagerTest.java | 65 +++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 4975159f080..eaf94fc2a51 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -71,6 +71,7 @@ protected synchronized void start(URL url, Map headers) { call.whenComplete((response, t) -> { if (!call.isCancelled() && t != null) { logger.info("Watch connection failed. reason: {}", t.getMessage()); + scheduleReconnect(); } if (response != null) { AsyncBody body = response.body(); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java new file mode 100644 index 00000000000..0143ccdde25 --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/WatchHttpManagerTest.java @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.dsl.internal; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.api.model.ListOptions; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.http.HttpClient.AsyncBody; +import io.fabric8.kubernetes.client.http.HttpClient.DerivedClientBuilder; +import io.fabric8.kubernetes.client.http.HttpResponse; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class WatchHttpManagerTest { + + @Test + void testReconnectOnException() throws MalformedURLException, InterruptedException { + HttpClient client = Mockito.mock(HttpClient.class, Mockito.RETURNS_DEEP_STUBS); + DerivedClientBuilder builder = Mockito.mock(HttpClient.DerivedClientBuilder.class, Mockito.RETURNS_SELF); + Mockito.when(client.newBuilder()).thenReturn(builder); + Mockito.when(builder.build()).thenReturn(client); + BaseOperation baseOperation = Mockito.mock(BaseOperation.class); + Mockito.when(baseOperation.getNamespacedUrl()).thenReturn(new URL("http://localhost")); + CompletableFuture> future = new CompletableFuture<>(); + Mockito.when(client.consumeLines(Mockito.any(), Mockito.any())).thenReturn(future); + + CountDownLatch reconnect = new CountDownLatch(1); + WatchHTTPManager> watch = new WatchHTTPManager(client, + baseOperation, Mockito.mock(ListOptions.class), Mockito.mock(Watcher.class), 1, 0) { + @Override + void scheduleReconnect() { + reconnect.countDown(); + } + }; + + future.completeExceptionally(new IOException()); + assertTrue(reconnect.await(1, TimeUnit.SECONDS)); + } + +}