Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RamCache support custom expire time #1494

Merged
merged 2 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,18 @@ public V getOrFetch(K id, Function<K, V> fetcher) {
return value;
}

@Watched(prefix = "cache")
@Override
public boolean update(K id, V value) {
return this.update(id, value, 0L);
}

@Watched(prefix = "cache")
@Override
public boolean update(K id, V value, long timeOffset) {
if (id == null || value == null || this.capacity <= 0L) {
return false;
}
return this.write(id, value);
return this.write(id, value, timeOffset);
}

@Watched(prefix = "cache")
Expand All @@ -137,7 +142,7 @@ public boolean updateIfAbsent(K id, V value) {
this.capacity <= 0L || this.containsKey(id)) {
return false;
}
return this.write(id, value);
return this.write(id, value, 0L);
}

@Watched(prefix = "cache")
Expand All @@ -147,7 +152,7 @@ public boolean updateIfPresent(K id, V value) {
this.capacity <= 0L || !this.containsKey(id)) {
return false;
}
return this.write(id, value);
return this.write(id, value, 0L);
}

@Watched(prefix = "cache")
Expand Down Expand Up @@ -180,7 +185,7 @@ public long tick() {
long current = now();
for (Iterator<CacheNode<K, V>> it = this.nodes(); it.hasNext();) {
CacheNode<K, V> node = it.next();
if (current - node.time() > expireTime) {
if (current - node.time() >= expireTime) {
// Remove item while iterating map (it must be ConcurrentMap)
this.remove(node.key());
expireItems++;
Expand Down Expand Up @@ -228,7 +233,7 @@ protected final long halfCapacity() {

protected abstract V access(K id);

protected abstract boolean write(K id, V value);
protected abstract boolean write(K id, V value, long timeOffset);

protected abstract void remove(K id);

Expand All @@ -242,11 +247,11 @@ protected static class CacheNode<K, V> {

private final K key;
private final V value;
private long time;
private final long time;

public CacheNode(K key, V value) {
public CacheNode(K key, V value, long timeOffset) {
assert key != null;
this.time = now();
this.time = now() + timeOffset;
corgiboygsj marked this conversation as resolved.
Show resolved Hide resolved
this.key = key;
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface Cache<K, V> {

public boolean update(K id, V value);

public boolean update(K id, V value, long timeOffset);

public boolean updateIfAbsent(K id, V value);

public boolean updateIfPresent(K id, V value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ protected Object access(Id id) {
}

@Override
protected boolean write(Id id, Object value) {
protected boolean write(Id id, Object value, long timeOffset) {
boolean success = false;
for (AbstractCache<Id, Object> cache : this.caches) {
success |= cache.write(id, value);
success |= cache.write(id, value, timeOffset);
}
return success;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected Object access(Id id) {
}

@Override
protected boolean write(Id id, Object value) {
protected boolean write(Id id, Object value, long timeOffset) {
Value serializedValue = new Value(value);
int serializedSize = serializedValue.serializedSize();
if (serializedSize > VALUE_SIZE_TO_SKIP) {
Expand All @@ -122,7 +122,7 @@ protected boolean write(Id id, Object value) {
if (expireTime <= 0) {
success = this.cache.put(id, serializedValue);
} else {
expireTime += now();
expireTime += now() + timeOffset;
/*
* Seems only the linked implementation support expiring entries,
* the chunked implementation does not support it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected final Object access(Id id) {

@Override
@Watched(prefix = "ramcache")
protected final boolean write(Id id, Object value) {
protected final boolean write(Id id, Object value, long timeOffset) {
assert id != null;
long capacity = this.capacity();
assert capacity > 0;
Expand Down Expand Up @@ -150,7 +150,7 @@ protected final boolean write(Id id, Object value) {
}

// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value));
this.map.put(id, this.queue.enqueue(id, value, timeOffset));
return true;
} finally {
lock.unlock();
Expand Down Expand Up @@ -228,7 +228,11 @@ private static final class LinkNode<K, V> extends CacheNode<K, V> {
private LinkNode<K, V> next;

public LinkNode(K key, V value) {
super(key, value);
this(key, value, 0L);
}

public LinkNode(K key, V value, long timeOffset) {
super(key, value, timeOffset);
this.prev = this.next = null;
}

Expand Down Expand Up @@ -370,8 +374,8 @@ public void clear() {
/**
* Add an item with key-value to the queue
*/
public LinkNode<K, V> enqueue(K key, V value) {
return this.enqueue(new LinkNode<>(key, value));
public LinkNode<K, V> enqueue(K key, V value, long timeOffset) {
return this.enqueue(new LinkNode<>(key, value, timeOffset));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void teardown() throws Exception {

protected abstract Cache<Id, Object> newCache();

protected abstract Cache<Id, Object> newCache(long capacity) ;
protected abstract Cache<Id, Object> newCache(long capacity);

protected abstract void checkSize(Cache<Id, Object> cache, long size,
Map<Id, Object> kvs);
Expand Down Expand Up @@ -673,5 +673,24 @@ public void testMutiThreadsGetAndUpdateWithGtCapacity() {
// In fact, the size may be any value(such as 43)
Assert.assertTrue(cache.size() < 10 + THREADS_NUM);
}

@Test
public void testKeyExpired() {
Cache<Id, Object> cache = newCache();
cache.expire(2000L);

Id key = IdGenerator.of("key");
cache.update(key, "value", -1000L);

waitTillNext(1);
cache.tick();

Assert.assertFalse(cache.containsKey(key));

cache.update(key, "value", -2000L);
cache.tick();

Assert.assertFalse(cache.containsKey(key));
}
}