Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cachelock #5974

Merged
merged 5 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Fix #5867: (crd-generator) Imply schemaFrom via JsonFormat shape (SchemaFrom takes precedence)
* Fix #5867: (java-generator) Add JsonFormat shape to date-time
* Fix #5954: (crd-generator) Sort required properties to ensure deterministic output
* Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced

#### Dependency Upgrade
* Fix #5695: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.30.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
import io.fabric8.kubernetes.api.model.HasMetadata;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;

public class BasicItemStore<V extends HasMetadata> implements ItemStore<V> {

private Function<V, String> keyFunction;
private ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
private final Function<V, String> keyFunction;
private final ConcurrentMap<String, V> store = new ConcurrentHashMap<>();

public BasicItemStore(Function<V, String> keyFunction) {
this.keyFunction = keyFunction;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

class BasicItemStoreTest {

@Test
void testEmptyStore() {
Pod pod = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion("1").endMetadata().build();

ItemStore<Pod> itemStore = new BasicItemStore<>(BasicItemStoreTest::podToKey);

assertThat(itemStore.size()).isZero();
assertThat(itemStore.keySet()).isEmpty();
assertThat(itemStore.values()).isEmpty();
assertThat(itemStore.getKey(pod)).isEqualTo("pods/test-pod/1");
assertThat(itemStore.get("pods/test-pod/1")).isNull();
assertThat(itemStore.remove("pods/test-pod/1")).isNull();
assertThat(itemStore.size()).isZero();
}

@Test
void testPopulateStore() {
Pod pod1 = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion("1").endMetadata().build();
Pod pod2 = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion("2").endMetadata().build();

ItemStore<Pod> itemStore = new BasicItemStore<>(BasicItemStoreTest::podToKey);
itemStore.put("pods/test-pod/1", pod1);

assertThat(itemStore.size()).isOne();
assertThat(itemStore.keySet()).hasSize(1).containsExactly("pods/test-pod/1");
assertThat(itemStore.values()).hasSize(1).containsExactly(pod1);
assertThat(itemStore.get("pods/test-pod/1")).isNotNull().isEqualTo(pod1);
assertThat(itemStore.put(itemStore.getKey(pod2), pod2)).isNull();
assertThat(itemStore.size()).isEqualTo(2);
assertThat(itemStore.remove("pods/test-pod/1")).isEqualTo(pod1);
assertThat(itemStore.get("pods/test-pod/1")).isNull();
assertThat(itemStore.size()).isOne();
assertThat(itemStore.remove("pods/test-pod/2")).isEqualTo(pod2);
assertThat(itemStore.get("pods/test-pod/2")).isNull();
assertThat(itemStore.size()).isZero();
}

@Test
void parallelStore() throws InterruptedException {
ItemStore<Pod> itemStore = new BasicItemStore<>(BasicItemStoreTest::podToKey);

int tasks = 1000;
CountDownLatch latch = new CountDownLatch(tasks);
IntStream.range(0, tasks).<Runnable> mapToObj(i -> () -> {
Pod pod = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion(Integer.toString(i)).endMetadata()
.build();
String key = itemStore.getKey(pod);
assertThat(itemStore.put(key, pod)).isNull();
assertThat(key).isEqualTo("pods/test-pod/" + i);
latch.countDown();
}).forEach(ForkJoinPool.commonPool()::execute);

assertThat(latch.await(15, TimeUnit.SECONDS)).isTrue();
assertThat(itemStore.size()).isEqualTo(tasks);
assertThat(itemStore.keySet()).hasSize(tasks).containsExactlyInAnyOrderElementsOf(
IntStream.range(0, tasks).mapToObj(i -> "pods/test-pod/" + i).collect(Collectors.toList()));
assertThat(itemStore.values()).hasSize(tasks);
assertThat(itemStore.get("pods/test-pod/123")).isNotNull().extracting(pod -> pod.getMetadata().getResourceVersion())
.isEqualTo("123");
}

private static String podToKey(Pod pod) {
return pod.getFullResourceName() + "/" + pod.getMetadata().getName() + "/" + pod.getMetadata().getResourceVersion();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,58 @@
import io.fabric8.kubernetes.client.utils.Utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* It basically saves and indexes all the entries.
* <br>
* Index reads {@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)}
* are not globally locked and thus may not be fully consistent with the current state
*
* @param <T> type for cache object
*/
public class CacheImpl<T extends HasMetadata> implements Cache<T> {

private static class Index {
private Map<Object, Set<String>> values = new ConcurrentHashMap<Object, Set<String>>();

public void update(String indexKey, String key, boolean remove) {
if (remove) {
values.computeIfPresent(indexKey == null ? this : indexKey, (k, v) -> {
v.remove(key);
return v.isEmpty() ? null : v;
});
} else {
values.computeIfAbsent(indexKey == null ? this : indexKey, k -> ConcurrentHashMap.newKeySet()).add(key);
}
}

public Set<String> get(String indexKey) {
return values.getOrDefault(indexKey == null ? this : indexKey, Collections.emptySet());
}
}

// NAMESPACE_INDEX is the default index function for caching objects
public static final String NAMESPACE_INDEX = "namespace";

// indexers stores index functions by their names
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());

// items stores object instances
private ItemStore<T> items;

// indices stores objects' key by their indices
private final Map<String, Map<String, Set<String>>> indices = new HashMap<>();
private final ConcurrentMap<String, Index> indices = new ConcurrentHashMap<>();

public CacheImpl() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
Expand All @@ -71,8 +96,10 @@ public void setItemStore(ItemStore<T> items) {
* @return registered indexers
*/
@Override
public synchronized Map<String, Function<T, List<String>>> getIndexers() {
return Collections.unmodifiableMap(indexers);
public Map<String, Function<T, List<String>>> getIndexers() {
synchronized (indexers) {
return Collections.unmodifiableMap(indexers);
}
}

@Override
Expand Down Expand Up @@ -114,7 +141,7 @@ public synchronized T remove(T obj) {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.deleteFromIndices(old, key);
this.updateIndices(old, null, key);
}
return old;
}
Expand Down Expand Up @@ -179,33 +206,34 @@ public T getByKey(String key) {
* @return the list
*/
@Override
public synchronized List<T> index(String indexName, T obj) {
if (!this.indexers.containsKey(indexName)) {
public List<T> index(String indexName, T obj) {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index.isEmpty()) {
return new ArrayList<>();
}

Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
Set<String> set = index.get(indexKey);
if (set.isEmpty()) {
continue;
}
returnKeySet.addAll(set);
returnKeySet.addAll(index.get(indexKey));
}

return getItems(returnKeySet);
}

private List<T> getItems(Set<String> returnKeySet) {
List<T> items = new ArrayList<>(returnKeySet.size());
for (String absoluteKey : returnKeySet) {
items.add(this.items.get(absoluteKey));
Optional.ofNullable(this.items.get(absoluteKey)).ifPresent(items::add);
}
return items;
}

private Index getIndex(String indexName) {
return Optional.ofNullable(this.indices.get(indexName))
.orElseThrow(() -> new IllegalArgumentException(String.format("index %s doesn't exist!", indexName)));
}

/**
* Index keys list
*
Expand All @@ -214,17 +242,9 @@ public synchronized List<T> index(String indexName, T obj) {
* @return the list
*/
@Override
public synchronized List<String> indexKeys(String indexName, String indexKey) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Map<String, Set<String>> index = this.indices.get(indexName);
Set<String> set = index.get(indexKey);
List<String> keys = new ArrayList<>(set.size());
for (String key : set) {
keys.add(key);
}
return keys;
public List<String> indexKeys(String indexName, String indexKey) {
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
}

/**
Expand All @@ -235,20 +255,9 @@ public synchronized List<String> indexKeys(String indexName, String indexKey) {
* @return the list
*/
@Override
public synchronized List<T> byIndex(String indexName, String indexKey) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Map<String, Set<String>> index = this.indices.get(indexName);
Set<String> set = index.get(indexKey);
if (set == null) {
return Arrays.asList();
}
List<T> items = new ArrayList<>(set.size());
for (String key : set) {
items.add(this.items.get(key));
}
return items;
public List<T> byIndex(String indexName, String indexKey) {
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
}

/**
Expand All @@ -260,55 +269,28 @@ public synchronized List<T> byIndex(String indexName, String indexKey) {
* @param newObj new object
* @param key the key
*/
void updateIndices(T oldObj, T newObj, String key) {
if (oldObj != null) {
deleteFromIndices(oldObj, key);
}

private void updateIndices(T oldObj, T newObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);

updateIndex(key, newObj, indexFunc, index);
}
}

private void updateIndex(String key, T newObj, Function<T, List<String>> indexFunc, Map<String, Set<String>> index) {
List<String> indexValues = indexFunc.apply(newObj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
Set<String> indexSet = index.computeIfAbsent(indexValue, k -> new HashSet<>());
indexSet.add(key);
Index index = this.indices.get(indexName);
if (index != null) {
if (oldObj != null) {
updateIndex(key, oldObj, indexFunc, index, true);
}
if (newObj != null) {
updateIndex(key, newObj, indexFunc, index, false);
}
}
}
}

/**
* Removes the object from each of the managed indexes.
*
* It is intended to be called from a function that already has a lock on the cache.
*
* @param oldObj the old object
* @param key the key
*/
private void deleteFromIndices(T oldObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : this.indexers.entrySet()) {
Function<T, List<String>> indexFunc = indexEntry.getValue();
List<String> indexValues = indexFunc.apply(oldObj);
if (indexValues == null || indexValues.isEmpty()) {
continue;
}

Map<String, Set<String>> index = this.indices.get(indexEntry.getKey());
if (index == null) {
continue;
}
private void updateIndex(String key, T obj, Function<T, List<String>> indexFunc, Index index,
boolean remove) {
List<String> indexValues = indexFunc.apply(obj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
Set<String> indexSet = index.get(indexValue);
if (indexSet != null) {
indexSet.remove(key);
}
index.update(indexValue, key, remove);
}
}
}
Expand All @@ -320,11 +302,14 @@ private void deleteFromIndices(T oldObj, String key) {
* @param indexFunc the index func
*/
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
HashMap<String, Set<String>> index = new HashMap<>();
if (this.indices.containsKey(indexName)) {
throw new IllegalArgumentException("Indexer conflict: " + indexName);
}
Index index = new Index();
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index));
items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index, false));
return this;
}

Expand Down
Loading
Loading