Skip to content

Commit

Permalink
Fix the usage of CacheIteratorHelper for service account (elastic#75510)
Browse files Browse the repository at this point in the history
CacheIteratorHelper requires lock acquisition for any mutation to the
underlying cache. This means it is incorrect to manipulate the cache
without invocation of CacheIteratorHelper#acquireUpdateLock. This is OK
for caches of simple values, but feels excessive for caches of
ListenableFuture.

This PR update the cache invalidation code to use cache.forEach instead
of CacheInvalidator. It simplifies the code by removing any explicit
lockings. The tradeoff is that it needs to build a list of keys to
delete in memory. Overall it is a better tradeoff since no explicit
locking is required and better leverage of Cache's own methods.
  • Loading branch information
ywangd committed Jul 30, 2021
1 parent 8d9d552 commit f95e171
Showing 1 changed file with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.action.service.TokenInfo.TokenSource;
import org.elasticsearch.xpack.core.security.authc.support.Hasher;
import org.elasticsearch.xpack.core.security.support.CacheIteratorHelper;
import org.elasticsearch.xpack.security.support.CacheInvalidatorRegistry;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

public abstract class CachingServiceAccountTokenStore implements ServiceAccountTokenStore, CacheInvalidatorRegistry.CacheInvalidator {

Expand All @@ -42,7 +47,6 @@ public abstract class CachingServiceAccountTokenStore implements ServiceAccountT
private final Settings settings;
private final ThreadPool threadPool;
private final Cache<String, ListenableFuture<CachedResult>> cache;
private CacheIteratorHelper<String, ListenableFuture<CachedResult>> cacheIteratorHelper;
private final Hasher hasher;

CachingServiceAccountTokenStore(Settings settings, ThreadPool threadPool) {
Expand All @@ -54,10 +58,8 @@ public abstract class CachingServiceAccountTokenStore implements ServiceAccountT
.setExpireAfterWrite(ttl)
.setMaximumWeight(CACHE_MAX_TOKENS_SETTING.get(settings))
.build();
cacheIteratorHelper = new CacheIteratorHelper<>(cache);
} else {
cache = null;
cacheIteratorHelper = null;
}
hasher = Hasher.resolve(CACHE_HASH_ALGO_SETTING.get(settings));
}
Expand Down Expand Up @@ -126,16 +128,29 @@ private void authenticateWithCache(ServiceAccountToken token, ActionListener<Sto
@Override
public final void invalidate(Collection<String> qualifiedTokenNames) {
if (cache != null) {
logger.trace("invalidating cache for service token [{}]",
Strings.collectionToCommaDelimitedString(qualifiedTokenNames));
for (String qualifiedTokenName : qualifiedTokenNames) {
if (qualifiedTokenName.endsWith("/")) {
// Wildcard case of invalidating all tokens for a service account, e.g. "elastic/fleet-server/"
cacheIteratorHelper.removeKeysIf(key -> key.startsWith(qualifiedTokenName));
} else {
cache.invalidate(qualifiedTokenName);
logger.trace("invalidating cache for service token [{}]", Strings.collectionToCommaDelimitedString(qualifiedTokenNames));
final Set<String> exacts = new HashSet<>(qualifiedTokenNames);
final Set<String> prefixes = new HashSet<>();
final Iterator<String> it = exacts.iterator();
while (it.hasNext()) {
final String name = it.next();
if (name.endsWith("/")) {
prefixes.add(name);
it.remove();
}
}

exacts.forEach(cache::invalidate);
if (false == prefixes.isEmpty()) {
final Predicate<String> predicate = k -> prefixes.stream().anyMatch(k::startsWith);
final List<String> keys = new ArrayList<>();
cache.forEach((k, v) -> {
if (predicate.test(k)) {
keys.add(k);
}
});
keys.forEach(cache::invalidate);
}
}
}

Expand Down

0 comments on commit f95e171

Please sign in to comment.