Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cache metrics perf issue #1515

Merged
merged 5 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

import org.slf4j.Logger;
Expand All @@ -36,15 +37,16 @@ public abstract class AbstractCache<K, V> implements Cache<K, V> {

protected static final Logger LOG = Log.logger(Cache.class);

private volatile long hits = 0L;
private volatile long miss = 0L;
// The unit of expired time is ms
private volatile long expire;

// Default expire time(ms)
private volatile long expire = 0L;
// Enabled cache metrics may cause performance penalty
private volatile boolean enabledMetrics;
private final LongAdder hits;
private final LongAdder miss;

// NOTE: the count in number of items, not in bytes
private final long capacity;
private final long halfCapacity;

// For user attachment
private final AtomicReference<Object> attachment;
Expand All @@ -58,8 +60,13 @@ public AbstractCache(long capacity) {
capacity = 0L;
}
this.capacity = capacity;
this.halfCapacity = this.capacity >> 1;
this.attachment = new AtomicReference<>();

this.expire = 0L;

this.enabledMetrics = false;
this.hits = new LongAdder();
this.miss = new LongAdder();
}

@Watched(prefix = "cache")
Expand All @@ -68,25 +75,13 @@ public V get(K id) {
if (id == null || this.capacity <= 0L) {
return null;
}
V value = null;
if (this.size() <= this.halfCapacity || this.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);
}

if (value == null) {
++this.miss;
if (LOG.isDebugEnabled()) {
LOG.debug("Cache missed '{}' (miss={}, hits={})",
id, this.miss, this.hits);
}
} else {
++this.hits;
if (LOG.isDebugEnabled()) {
LOG.debug("Cache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
}
V value = this.access(id);

if (this.enabledMetrics) {
this.collectMetrics(id, value);
}

return value;
}

Expand All @@ -96,29 +91,36 @@ public V getOrFetch(K id, Function<K, V> fetcher) {
if (id == null || this.capacity <= 0L) {
return null;
}
V value = null;
if (this.size() <= this.halfCapacity || this.containsKey(id)) {
// Maybe the id removed by other threads and returned null value
value = this.access(id);

V value = this.access(id);

if (this.enabledMetrics) {
this.collectMetrics(id, value);
}

// Do fetch and update the cache if cache missed
if (value == null) {
value = fetcher.apply(id);
this.update(id, value);
}

return value;
}

private void collectMetrics(K key, V value) {
if (value == null) {
++this.miss;
this.miss.add(1L);
if (LOG.isDebugEnabled()) {
LOG.debug("Cache missed '{}' (miss={}, hits={})",
id, this.miss, this.hits);
key, this.miss, this.hits);
}
// Do fetch and update the cache
value = fetcher.apply(id);
this.update(id, value);
} else {
++this.hits;
this.hits.add(1L);
if (LOG.isDebugEnabled()) {
LOG.debug("Cache cached '{}' (hits={}, miss={})",
id, this.hits, this.miss);
key, this.hits, this.miss);
}
}
return value;
}

@Override
Expand Down Expand Up @@ -199,14 +201,25 @@ public long tick() {
return expireItems;
}

@Override
public boolean enableMetrics(boolean enabled) {
boolean old = this.enabledMetrics;
if (!enabled) {
this.hits.reset();
this.miss.reset();
}
this.enabledMetrics = enabled;
return old;
}

@Override
public final long hits() {
return this.hits;
return this.hits.sum();
}

@Override
public final long miss() {
return this.miss;
return this.miss.sum();
}

@Override
Expand All @@ -227,10 +240,6 @@ public <T> T attachment() {
return attachment;
}

protected final long halfCapacity() {
return this.halfCapacity;
}

protected abstract V access(K id);

protected abstract boolean write(K id, V value, long timeOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public interface Cache<K, V> {

public long size();

public boolean enableMetrics(boolean enabled);

public long hits();

public long miss();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public static CacheManager instance() {
return INSTANCE;
}

public static boolean cacheEnableMetrics(String name, boolean enabled) {
Cache<Id, ?> cache = INSTANCE.caches.get(name);
E.checkArgument(cache != null,
"Not found cache named '%s'", name);
return cache.enableMetrics(enabled);
}

public CacheManager() {
this.caches = new ConcurrentHashMap<>();
this.timer = new Timer("cache-expirer", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ private Cache<Id, Object> cache(String prefix, String type, long capacity,
}
// Convert the unit from seconds to milliseconds
cache.expire(expire * 1000L);
// Enable metrics for graph cache by default
cache.enableMetrics(true);
return cache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected boolean write(Id id, Object value, long timeOffset) {
}
long expireTime = this.expire();
boolean success;
if (expireTime <= 0) {
if (expireTime <= 0L) {
success = this.cache.put(id, serializedValue);
} else {
expireTime += now() + timeOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class RamCache extends AbstractCache<Id, Object> {
private final LinkedQueueNonBigLock<Id, Object> queue;

private final KeyLock keyLock;
private final long halfCapacity;

public RamCache() {
this(DEFAULT_SIZE);
Expand All @@ -47,11 +48,12 @@ public RamCache() {
public RamCache(long capacity) {
super(capacity);

this.keyLock = new KeyLock();

if (capacity < 0L) {
capacity = 0L;
}
this.keyLock = new KeyLock();
this.halfCapacity = capacity >> 1;

long initialCapacity = capacity >= MB ? capacity >> 10 : 256;
if (initialCapacity > MAX_INIT_CAP) {
initialCapacity = MAX_INIT_CAP;
Expand All @@ -66,8 +68,7 @@ public RamCache(long capacity) {
protected final Object access(Id id) {
assert id != null;

long halfCapacity = this.halfCapacity();
if (this.map.size() <= halfCapacity) {
if (this.map.size() <= this.halfCapacity) {
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
Expand All @@ -76,15 +77,21 @@ protected final Object access(Id id) {
return node.value();
}

// Avoid to catch lock if cache missed
if (!this.containsKey(id)) {
return null;
}

final Lock lock = this.keyLock.lock(id);
try {
// Maybe the id removed by other threads and returned null value
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}

// NOTE: update the queue only if the size > capacity/2
if (this.map.size() > halfCapacity) {
if (this.map.size() > this.halfCapacity) {
// Move the node from mid to tail
if (this.queue.remove(node) == null) {
// The node may be removed by others through dequeue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,48 @@ public void testCacheInstance() {
this.originCaches.remove("c3");
}

@Test
public void testCacheEnableMetrics() {
// Don't mock
teardown();

CacheManager manager = CacheManager.instance();

Cache<Id, Object> c1 = manager.cache("m1");
Cache<Id, Object> c2 = manager.cache("m2");
Cache<Id, Object> c3 = manager.offheapCache(null, "m3", 1, 11);
Cache<Id, Object> c4 = manager.levelCache(null, "m4", 1, 1, 11);

Assert.assertEquals(false, c1.enableMetrics(false));
Assert.assertEquals(false, c2.enableMetrics(false));
Assert.assertEquals(false, c3.enableMetrics(false));
Assert.assertEquals(false, c4.enableMetrics(false));

Assert.assertEquals(false, CacheManager.cacheEnableMetrics("m1", true));
Assert.assertEquals(true, c1.enableMetrics(true));

Assert.assertEquals(false, CacheManager.cacheEnableMetrics("m2", true));
Assert.assertEquals(true, c2.enableMetrics(true));

Assert.assertEquals(false, CacheManager.cacheEnableMetrics("m3", true));
Assert.assertEquals(true, c3.enableMetrics(true));

Assert.assertEquals(false, CacheManager.cacheEnableMetrics("m4", true));
Assert.assertEquals(true, c4.enableMetrics(true));

Assert.assertEquals(true, CacheManager.cacheEnableMetrics("m1", false));
Assert.assertEquals(false, c1.enableMetrics(true));

Assert.assertEquals(true, CacheManager.cacheEnableMetrics("m2", false));
Assert.assertEquals(false, c2.enableMetrics(true));

Assert.assertEquals(true, CacheManager.cacheEnableMetrics("m3", false));
Assert.assertEquals(false, c3.enableMetrics(true));

Assert.assertEquals(true, CacheManager.cacheEnableMetrics("m4", false));
Assert.assertEquals(false, c4.enableMetrics(true));
}

@Test
public void testCacheGetPut() {
final String name = "test-cache";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ public void testSizeWithReachCapacity() {
@Test
public void testHitsAndMiss() {
Cache<Id, Object> cache = newCache();
Assert.assertEquals(false, cache.enableMetrics(true));

Assert.assertEquals(0L, cache.hits());
Assert.assertEquals(0L, cache.miss());

Expand All @@ -457,6 +459,47 @@ public void testHitsAndMiss() {
Assert.assertEquals(2L, cache.miss());
}

@Test
public void testEnableMetrics() {
Cache<Id, Object> cache = newCache();
Assert.assertEquals(false, cache.enableMetrics(false));
Assert.assertEquals(false, cache.enableMetrics(true));

Assert.assertEquals(0L, cache.hits());
Assert.assertEquals(0L, cache.miss());

Id id = IdGenerator.of("1");
cache.update(id, "value-1");
Assert.assertEquals(0L, cache.hits());
Assert.assertEquals(0L, cache.miss());

cache.get(IdGenerator.of("not-exist"));
Assert.assertEquals(0L, cache.hits());
Assert.assertEquals(1L, cache.miss());

cache.get(IdGenerator.of("1"));
Assert.assertEquals(1L, cache.hits());
Assert.assertEquals(1L, cache.miss());

cache.get(IdGenerator.of("not-exist"));
Assert.assertEquals(1L, cache.hits());
Assert.assertEquals(2L, cache.miss());

cache.get(IdGenerator.of("1"));
Assert.assertEquals(2L, cache.hits());
Assert.assertEquals(2L, cache.miss());

Assert.assertEquals(true, cache.enableMetrics(false));

Assert.assertEquals(0L, cache.hits());
Assert.assertEquals(0L, cache.miss());

cache.get(IdGenerator.of("not-exist"));
cache.get(IdGenerator.of("1"));
Assert.assertEquals(0L, cache.hits());
Assert.assertEquals(0L, cache.miss());
}

@Test
public void testExpire() {
Cache<Id, Object> cache = newCache();
Expand Down Expand Up @@ -693,4 +736,3 @@ public void testKeyExpired() {
Assert.assertFalse(cache.containsKey(key));
}
}