diff --git a/CHANGELOG.md b/CHANGELOG.md index fa513122365..c5df143270a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * Fix #5867: (java-generator) Add JsonFormat shape to date-time * Fix #5954: (crd-generator) Sort required properties to ensure deterministic output * Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced +* Fix #5953: Made informer watch starting deterministic with respect to list processing #### Dependency Upgrade * Fix #5695: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.30.0 diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java index 17ecee054c8..647f71e409d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStore.java @@ -23,12 +23,14 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; /** * Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs */ -public class ProcessorStore implements SyncableStore { +public class ProcessorStore { private CacheImpl cache; private SharedProcessor processor; @@ -40,12 +42,10 @@ public ProcessorStore(CacheImpl cache, SharedProcessor processor) { this.processor = processor; } - @Override public void add(T obj) { update(obj); } - @Override public void update(List items) { items.stream().map(this::updateInternal).filter(Objects::nonNull).forEach(n -> this.processor.distribute(n, false)); } @@ -65,7 +65,6 @@ private Notification updateInternal(T obj) { return notification; } - @Override public void update(T obj) { Notification notification = updateInternal(obj); if (notification != null) { @@ -73,7 +72,6 @@ public void update(T obj) { } } - @Override public void delete(T obj) { Object oldObj = this.cache.remove(obj); if (oldObj != null) { @@ -81,28 +79,23 @@ public void delete(T obj) { } } - @Override public List list() { return cache.list(); } - @Override public List listKeys() { return cache.listKeys(); } - @Override public T get(T object) { return cache.get(object); } - @Override public T getByKey(String key) { return cache.getByKey(key); } - @Override - public void retainAll(Set nextKeys) { + public void retainAll(Set nextKeys, Consumer cacheStateComplete) { if (synced.compareAndSet(false, true)) { deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull) .forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false)); @@ -111,7 +104,6 @@ public void retainAll(Set nextKeys) { List current = cache.list(); if (nextKeys.isEmpty() && current.isEmpty()) { this.processor.distribute(l -> l.getHandler().onNothing(), false); - return; } current.forEach(v -> { String key = cache.getKey(v); @@ -120,14 +112,15 @@ public void retainAll(Set nextKeys) { this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false); } }); + if (cacheStateComplete != null) { + cacheStateComplete.accept(this.processor::execute); + } } - @Override public String getKey(T obj) { return cache.getKey(obj); } - @Override public void resync() { // lock to ensure the ordering wrt other events synchronized (cache.getLockObject()) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 1c3cf739d66..1dddea9344e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -47,7 +47,7 @@ public class Reflector listerWatcher; - private final SyncableStore store; + private final ProcessorStore store; private final ReflectorWatcher watcher; private volatile boolean watching; private volatile CompletableFuture> watchFuture; @@ -64,11 +64,11 @@ public class Reflector listerWatcher, SyncableStore store) { + public Reflector(ListerWatcher listerWatcher, ProcessorStore store) { this(listerWatcher, store, Runnable::run); } - public Reflector(ListerWatcher listerWatcher, SyncableStore store, Executor executor) { + public Reflector(ListerWatcher listerWatcher, ProcessorStore store, Executor executor) { this.listerWatcher = listerWatcher; this.store = store; this.watcher = new ReflectorWatcher(); @@ -124,11 +124,19 @@ public CompletableFuture listSyncAndWatch() { } Set nextKeys = new ConcurrentSkipListSet<>(); CompletableFuture theFuture = processList(nextKeys, null).thenCompose(result -> { - store.retainAll(nextKeys); final String latestResourceVersion = result.getMetadata().getResourceVersion(); - lastSyncResourceVersion = latestResourceVersion; log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion); - return startWatcher(latestResourceVersion); + CompletableFuture cf = new CompletableFuture<>(); + store.retainAll(nextKeys, executor -> { + boolean startWatchImmediately = cachedListing && lastSyncResourceVersion == null; + lastSyncResourceVersion = latestResourceVersion; + if (startWatchImmediately) { + cf.complete(null); + } else { + executor.execute(() -> cf.complete(null)); + } + }); + return cf.thenCompose(ignored -> startWatcher(latestResourceVersion)); }).thenAccept(w -> { if (w != null) { if (!isStopped()) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 166c7c74dd2..2de8832c118 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -174,4 +174,8 @@ public ProcessorListener addProcessorListener(ResourceEventHandler lock.writeLock().unlock(); } } + + public void execute(Runnable runnable) { + this.executor.execute(runnable); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SyncableStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SyncableStore.java deleted file mode 100644 index eff64ee9e6f..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SyncableStore.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.impl.cache; - -import io.fabric8.kubernetes.client.informers.cache.Store; - -import java.util.List; -import java.util.Set; - -/** - * Extends a {@link Store}, but also has the responsibility of - * notifying listeners on all operations. - */ -public interface SyncableStore extends Store { - - /** - * Inserts an item into the store - * - * @param obj object - */ - void add(T obj); - - /** - * Sets an item in the store to its updated state. - * - * @param obj object - */ - void update(T obj); - - /** - * Removes an item from the store - * - * @param obj object - */ - void delete(T obj); - - /** - * Sends a resync event for each item. - */ - void resync(); - - /** - * Retain only the values with keys in the given set - * - * @param nextKeys to retain - */ - void retainAll(Set nextKeys); - - /** - * Process a batch of updates - * - * @param items - */ - void update(List items); - -} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java index 7ac549affcb..5b67d67fef5 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ProcessorStoreTest.java @@ -94,13 +94,13 @@ void testSyncEvents() { List pods = Arrays.asList(pod, pod2); processorStore.update(pods); - processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet())); + processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()), null); // resync two values processorStore.resync(); // relist with deletes - processorStore.retainAll(Collections.emptySet()); + processorStore.retainAll(Collections.emptySet(), null); Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture()); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index d3badeac9bc..1e90d4b29c3 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.mockito.exceptions.verification.TooFewActualInvocations; @@ -33,6 +34,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -42,13 +44,24 @@ class ReflectorTest { + private ProcessorStore mockStore; + + @BeforeEach + void setup() { + mockStore = Mockito.mock(ProcessorStore.class); + Mockito.doAnswer(invocation -> { + ((Consumer) invocation.getArguments()[1]).accept(Runnable::run); + return null; + }).when(mockStore).retainAll(Mockito.anySet(), + Mockito.any()); + } + @Test void testStateFlags() { ListerWatcher mock = Mockito.mock(ListerWatcher.class); PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); - SyncableStore mockStore = Mockito.mock(SyncableStore.class); Reflector reflector = new Reflector(mock, mockStore) { @Override protected void reconnect() { @@ -95,7 +108,7 @@ void testNotRunningAfterStartError() { PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); - Reflector reflector = new Reflector(mock, Mockito.mock(SyncableStore.class)); + Reflector reflector = new Reflector(mock, mockStore); // throw an exception, then watch normally Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any())) @@ -115,7 +128,7 @@ void testNonHttpGone() { PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); - Reflector reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class)); + Reflector reflector = new Reflector<>(mock, mockStore); Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any())) .thenReturn(CompletableFuture.completedFuture(Mockito.mock(AbstractWatchManager.class))); @@ -143,7 +156,7 @@ void testTimeout() { return null; }).when(ex).execute(Mockito.any(Runnable.class)); - Reflector reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class), ex); + Reflector reflector = new Reflector<>(mock, mockStore, ex); reflector.setMinTimeout(1); AbstractWatchManager manager = Mockito.mock(AbstractWatchManager.class);