Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: defer watch until the initial processing of list is complete #6004

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Fix #5867: (java-generator) Add JsonFormat shape to date-time
* Fix #5954: (crd-generator) Sort required properties to ensure deterministic output
* Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced
* Fix #5953: Made informer watch starting deterministic with respect to list processing

#### Dependency Upgrade
* Fix #5695: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.30.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
*/
public class ProcessorStore<T extends HasMetadata> implements SyncableStore<T> {
public class ProcessorStore<T extends HasMetadata> {

private CacheImpl<T> cache;
private SharedProcessor<T> processor;
Expand All @@ -40,12 +42,10 @@ public ProcessorStore(CacheImpl<T> cache, SharedProcessor<T> processor) {
this.processor = processor;
}

@Override
public void add(T obj) {
update(obj);
}

@Override
public void update(List<T> items) {
items.stream().map(this::updateInternal).filter(Objects::nonNull).forEach(n -> this.processor.distribute(n, false));
}
Expand All @@ -65,44 +65,37 @@ private Notification<T> updateInternal(T obj) {
return notification;
}

@Override
public void update(T obj) {
Notification<T> notification = updateInternal(obj);
if (notification != null) {
this.processor.distribute(notification, false);
}
}

@Override
public void delete(T obj) {
Object oldObj = this.cache.remove(obj);
if (oldObj != null) {
this.processor.distribute(new ProcessorListener.DeleteNotification<>(obj, false), false);
}
}

@Override
public List<T> list() {
return cache.list();
}

@Override
public List<String> listKeys() {
return cache.listKeys();
}

@Override
public T get(T object) {
return cache.get(object);
}

@Override
public T getByKey(String key) {
return cache.getByKey(key);
}

@Override
public void retainAll(Set<String> nextKeys) {
public void retainAll(Set<String> nextKeys, Consumer<Executor> cacheStateComplete) {
if (synced.compareAndSet(false, true)) {
deferredAdd.stream().map(cache::getByKey).filter(Objects::nonNull)
.forEach(v -> this.processor.distribute(new ProcessorListener.AddNotification<>(v), false));
Expand All @@ -111,7 +104,6 @@ public void retainAll(Set<String> nextKeys) {
List<T> current = cache.list();
if (nextKeys.isEmpty() && current.isEmpty()) {
this.processor.distribute(l -> l.getHandler().onNothing(), false);
return;
}
current.forEach(v -> {
String key = cache.getKey(v);
Expand All @@ -120,14 +112,15 @@ public void retainAll(Set<String> nextKeys) {
this.processor.distribute(new ProcessorListener.DeleteNotification<>(v, true), false);
}
});
if (cacheStateComplete != null) {
cacheStateComplete.accept(this.processor::execute);
}
}

@Override
public String getKey(T obj) {
return cache.getKey(obj);
}

@Override
public void resync() {
// lock to ensure the ordering wrt other events
synchronized (cache.getLockObject()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T

private volatile String lastSyncResourceVersion;
private final ListerWatcher<T, L> listerWatcher;
private final SyncableStore<T> store;
private final ProcessorStore<T> store;
private final ReflectorWatcher watcher;
private volatile boolean watching;
private volatile CompletableFuture<AbstractWatchManager<T>> watchFuture;
Expand All @@ -64,11 +64,11 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T

private boolean cachedListing = true;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
public Reflector(ListerWatcher<T, L> listerWatcher, ProcessorStore<T> store) {
this(listerWatcher, store, Runnable::run);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store, Executor executor) {
public Reflector(ListerWatcher<T, L> listerWatcher, ProcessorStore<T> store, Executor executor) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher();
Expand Down Expand Up @@ -124,11 +124,19 @@ public CompletableFuture<Void> listSyncAndWatch() {
}
Set<String> nextKeys = new ConcurrentSkipListSet<>();
CompletableFuture<Void> theFuture = processList(nextKeys, null).thenCompose(result -> {
store.retainAll(nextKeys);
final String latestResourceVersion = result.getMetadata().getResourceVersion();
lastSyncResourceVersion = latestResourceVersion;
log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion);
return startWatcher(latestResourceVersion);
CompletableFuture<?> cf = new CompletableFuture<>();
store.retainAll(nextKeys, executor -> {
boolean startWatchImmediately = cachedListing && lastSyncResourceVersion == null;
lastSyncResourceVersion = latestResourceVersion;
if (startWatchImmediately) {
cf.complete(null);
} else {
executor.execute(() -> cf.complete(null));
}
});
return cf.thenCompose(ignored -> startWatcher(latestResourceVersion));
}).thenAccept(w -> {
if (w != null) {
if (!isStopped()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,8 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
lock.writeLock().unlock();
}
}

public void execute(Runnable runnable) {
this.executor.execute(runnable);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ void testSyncEvents() {
List<Pod> pods = Arrays.asList(pod, pod2);
processorStore.update(pods);

processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()));
processorStore.retainAll(pods.stream().map(Cache::metaNamespaceKeyFunc).collect(Collectors.toSet()), null);

// resync two values
processorStore.resync();

// relist with deletes
processorStore.retainAll(Collections.emptySet());
processorStore.retainAll(Collections.emptySet(), null);

Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.TooFewActualInvocations;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -42,13 +44,24 @@

class ReflectorTest {

private ProcessorStore<Pod> mockStore;

@BeforeEach
void setup() {
mockStore = Mockito.mock(ProcessorStore.class);
Mockito.doAnswer(invocation -> {
((Consumer<Executor>) invocation.getArguments()[1]).accept(Runnable::run);
return null;
}).when(mockStore).retainAll(Mockito.anySet(),
Mockito.any());
}

@Test
void testStateFlags() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

SyncableStore<Pod> mockStore = Mockito.mock(SyncableStore.class);
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, mockStore) {
@Override
protected void reconnect() {
Expand Down Expand Up @@ -95,7 +108,7 @@ void testNotRunningAfterStartError() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, Mockito.mock(SyncableStore.class));
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, mockStore);

// throw an exception, then watch normally
Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
Expand All @@ -115,7 +128,7 @@ void testNonHttpGone() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class));
Reflector<Pod, PodList> reflector = new Reflector<>(mock, mockStore);

Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(AbstractWatchManager.class)));
Expand Down Expand Up @@ -143,7 +156,7 @@ void testTimeout() {
return null;
}).when(ex).execute(Mockito.any(Runnable.class));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class), ex);
Reflector<Pod, PodList> reflector = new Reflector<>(mock, mockStore, ex);
reflector.setMinTimeout(1);

AbstractWatchManager manager = Mockito.mock(AbstractWatchManager.class);
Expand Down
Loading