Skip to content

Commit

Permalink
fix: reduce the locking of methods reading indexes (5974)
Browse files Browse the repository at this point in the history
Reduce CacheImpl lock contention
---
Add BasicItemStoreTest
---
Expand CacheTest
---
Back to the future - fix copyright year
---
fix: reduce the locking of methods reading indexes

closes: #5973

Signed-off-by: Steve Hawkins <[email protected]>
  • Loading branch information
shawkins authored May 9, 2024
1 parent e4d60ba commit 27e0842
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Fix #5867: (crd-generator) Imply schemaFrom via JsonFormat shape (SchemaFrom takes precedence)
* Fix #5867: (java-generator) Add JsonFormat shape to date-time
* Fix #5954: (crd-generator) Sort required properties to ensure deterministic output
* Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced

#### Dependency Upgrade
* Fix #5695: Upgrade Fabric8 Kubernetes Model to Kubernetes v1.30.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
import io.fabric8.kubernetes.api.model.HasMetadata;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;

public class BasicItemStore<V extends HasMetadata> implements ItemStore<V> {

private Function<V, String> keyFunction;
private ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
private final Function<V, String> keyFunction;
private final ConcurrentMap<String, V> store = new ConcurrentHashMap<>();

public BasicItemStore(Function<V, String> keyFunction) {
this.keyFunction = keyFunction;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

class BasicItemStoreTest {

@Test
void testEmptyStore() {
Pod pod = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion("1").endMetadata().build();

ItemStore<Pod> itemStore = new BasicItemStore<>(BasicItemStoreTest::podToKey);

assertThat(itemStore.size()).isZero();
assertThat(itemStore.keySet()).isEmpty();
assertThat(itemStore.values()).isEmpty();
assertThat(itemStore.getKey(pod)).isEqualTo("pods/test-pod/1");
assertThat(itemStore.get("pods/test-pod/1")).isNull();
assertThat(itemStore.remove("pods/test-pod/1")).isNull();
assertThat(itemStore.size()).isZero();
}

@Test
void testPopulateStore() {
Pod pod1 = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion("1").endMetadata().build();
Pod pod2 = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion("2").endMetadata().build();

ItemStore<Pod> itemStore = new BasicItemStore<>(BasicItemStoreTest::podToKey);
itemStore.put("pods/test-pod/1", pod1);

assertThat(itemStore.size()).isOne();
assertThat(itemStore.keySet()).hasSize(1).containsExactly("pods/test-pod/1");
assertThat(itemStore.values()).hasSize(1).containsExactly(pod1);
assertThat(itemStore.get("pods/test-pod/1")).isNotNull().isEqualTo(pod1);
assertThat(itemStore.put(itemStore.getKey(pod2), pod2)).isNull();
assertThat(itemStore.size()).isEqualTo(2);
assertThat(itemStore.remove("pods/test-pod/1")).isEqualTo(pod1);
assertThat(itemStore.get("pods/test-pod/1")).isNull();
assertThat(itemStore.size()).isOne();
assertThat(itemStore.remove("pods/test-pod/2")).isEqualTo(pod2);
assertThat(itemStore.get("pods/test-pod/2")).isNull();
assertThat(itemStore.size()).isZero();
}

@Test
void parallelStore() throws InterruptedException {
ItemStore<Pod> itemStore = new BasicItemStore<>(BasicItemStoreTest::podToKey);

int tasks = 1000;
CountDownLatch latch = new CountDownLatch(tasks);
IntStream.range(0, tasks).<Runnable> mapToObj(i -> () -> {
Pod pod = new PodBuilder().withNewMetadata().withName("test-pod").withResourceVersion(Integer.toString(i)).endMetadata()
.build();
String key = itemStore.getKey(pod);
assertThat(itemStore.put(key, pod)).isNull();
assertThat(key).isEqualTo("pods/test-pod/" + i);
latch.countDown();
}).forEach(ForkJoinPool.commonPool()::execute);

assertThat(latch.await(15, TimeUnit.SECONDS)).isTrue();
assertThat(itemStore.size()).isEqualTo(tasks);
assertThat(itemStore.keySet()).hasSize(tasks).containsExactlyInAnyOrderElementsOf(
IntStream.range(0, tasks).mapToObj(i -> "pods/test-pod/" + i).collect(Collectors.toList()));
assertThat(itemStore.values()).hasSize(tasks);
assertThat(itemStore.get("pods/test-pod/123")).isNotNull().extracting(pod -> pod.getMetadata().getResourceVersion())
.isEqualTo("123");
}

private static String podToKey(Pod pod) {
return pod.getFullResourceName() + "/" + pod.getMetadata().getName() + "/" + pod.getMetadata().getResourceVersion();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,58 @@
import io.fabric8.kubernetes.client.utils.Utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* It basically saves and indexes all the entries.
* <br>
* Index reads {@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)}
* are not globally locked and thus may not be fully consistent with the current state
*
* @param <T> type for cache object
*/
public class CacheImpl<T extends HasMetadata> implements Cache<T> {

private static class Index {
private Map<Object, Set<String>> values = new ConcurrentHashMap<Object, Set<String>>();

public void update(String indexKey, String key, boolean remove) {
if (remove) {
values.computeIfPresent(indexKey == null ? this : indexKey, (k, v) -> {
v.remove(key);
return v.isEmpty() ? null : v;
});
} else {
values.computeIfAbsent(indexKey == null ? this : indexKey, k -> ConcurrentHashMap.newKeySet()).add(key);
}
}

public Set<String> get(String indexKey) {
return values.getOrDefault(indexKey == null ? this : indexKey, Collections.emptySet());
}
}

// NAMESPACE_INDEX is the default index function for caching objects
public static final String NAMESPACE_INDEX = "namespace";

// indexers stores index functions by their names
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());

// items stores object instances
private ItemStore<T> items;

// indices stores objects' key by their indices
private final Map<String, Map<String, Set<String>>> indices = new HashMap<>();
private final ConcurrentMap<String, Index> indices = new ConcurrentHashMap<>();

public CacheImpl() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
Expand All @@ -71,8 +96,10 @@ public void setItemStore(ItemStore<T> items) {
* @return registered indexers
*/
@Override
public synchronized Map<String, Function<T, List<String>>> getIndexers() {
return Collections.unmodifiableMap(indexers);
public Map<String, Function<T, List<String>>> getIndexers() {
synchronized (indexers) {
return Collections.unmodifiableMap(indexers);
}
}

@Override
Expand Down Expand Up @@ -114,7 +141,7 @@ public synchronized T remove(T obj) {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.deleteFromIndices(old, key);
this.updateIndices(old, null, key);
}
return old;
}
Expand Down Expand Up @@ -179,33 +206,34 @@ public T getByKey(String key) {
* @return the list
*/
@Override
public synchronized List<T> index(String indexName, T obj) {
if (!this.indexers.containsKey(indexName)) {
public List<T> index(String indexName, T obj) {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index.isEmpty()) {
return new ArrayList<>();
}

Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
Set<String> set = index.get(indexKey);
if (set.isEmpty()) {
continue;
}
returnKeySet.addAll(set);
returnKeySet.addAll(index.get(indexKey));
}

return getItems(returnKeySet);
}

private List<T> getItems(Set<String> returnKeySet) {
List<T> items = new ArrayList<>(returnKeySet.size());
for (String absoluteKey : returnKeySet) {
items.add(this.items.get(absoluteKey));
Optional.ofNullable(this.items.get(absoluteKey)).ifPresent(items::add);
}
return items;
}

private Index getIndex(String indexName) {
return Optional.ofNullable(this.indices.get(indexName))
.orElseThrow(() -> new IllegalArgumentException(String.format("index %s doesn't exist!", indexName)));
}

/**
* Index keys list
*
Expand All @@ -214,17 +242,9 @@ public synchronized List<T> index(String indexName, T obj) {
* @return the list
*/
@Override
public synchronized List<String> indexKeys(String indexName, String indexKey) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Map<String, Set<String>> index = this.indices.get(indexName);
Set<String> set = index.get(indexKey);
List<String> keys = new ArrayList<>(set.size());
for (String key : set) {
keys.add(key);
}
return keys;
public List<String> indexKeys(String indexName, String indexKey) {
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
}

/**
Expand All @@ -235,20 +255,9 @@ public synchronized List<String> indexKeys(String indexName, String indexKey) {
* @return the list
*/
@Override
public synchronized List<T> byIndex(String indexName, String indexKey) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Map<String, Set<String>> index = this.indices.get(indexName);
Set<String> set = index.get(indexKey);
if (set == null) {
return Arrays.asList();
}
List<T> items = new ArrayList<>(set.size());
for (String key : set) {
items.add(this.items.get(key));
}
return items;
public List<T> byIndex(String indexName, String indexKey) {
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
}

/**
Expand All @@ -260,55 +269,28 @@ public synchronized List<T> byIndex(String indexName, String indexKey) {
* @param newObj new object
* @param key the key
*/
void updateIndices(T oldObj, T newObj, String key) {
if (oldObj != null) {
deleteFromIndices(oldObj, key);
}

private void updateIndices(T oldObj, T newObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);

updateIndex(key, newObj, indexFunc, index);
}
}

private void updateIndex(String key, T newObj, Function<T, List<String>> indexFunc, Map<String, Set<String>> index) {
List<String> indexValues = indexFunc.apply(newObj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
Set<String> indexSet = index.computeIfAbsent(indexValue, k -> new HashSet<>());
indexSet.add(key);
Index index = this.indices.get(indexName);
if (index != null) {
if (oldObj != null) {
updateIndex(key, oldObj, indexFunc, index, true);
}
if (newObj != null) {
updateIndex(key, newObj, indexFunc, index, false);
}
}
}
}

/**
* Removes the object from each of the managed indexes.
*
* It is intended to be called from a function that already has a lock on the cache.
*
* @param oldObj the old object
* @param key the key
*/
private void deleteFromIndices(T oldObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : this.indexers.entrySet()) {
Function<T, List<String>> indexFunc = indexEntry.getValue();
List<String> indexValues = indexFunc.apply(oldObj);
if (indexValues == null || indexValues.isEmpty()) {
continue;
}

Map<String, Set<String>> index = this.indices.get(indexEntry.getKey());
if (index == null) {
continue;
}
private void updateIndex(String key, T obj, Function<T, List<String>> indexFunc, Index index,
boolean remove) {
List<String> indexValues = indexFunc.apply(obj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
Set<String> indexSet = index.get(indexValue);
if (indexSet != null) {
indexSet.remove(key);
}
index.update(indexValue, key, remove);
}
}
}
Expand All @@ -320,11 +302,14 @@ private void deleteFromIndices(T oldObj, String key) {
* @param indexFunc the index func
*/
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
HashMap<String, Set<String>> index = new HashMap<>();
if (this.indices.containsKey(indexName)) {
throw new IllegalArgumentException("Indexer conflict: " + indexName);
}
Index index = new Index();
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index));
items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index, false));
return this;
}

Expand Down
Loading

0 comments on commit 27e0842

Please sign in to comment.