Skip to content

Commit

Permalink
RamCache support custom expire time (apache#1494)
Browse files Browse the repository at this point in the history
  • Loading branch information
corgiboygsj authored and jadepeng committed Jun 25, 2021
1 parent 2ac1534 commit af562d8
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,18 @@ public V getOrFetch(K id, Function<K, V> fetcher) {
return value;
}

@Watched(prefix = "cache")
@Override
public boolean update(K id, V value) {
return this.update(id, value, 0L);
}

@Watched(prefix = "cache")
@Override
public boolean update(K id, V value, long timeOffset) {
if (id == null || value == null || this.capacity <= 0L) {
return false;
}
return this.write(id, value);
return this.write(id, value, timeOffset);
}

@Watched(prefix = "cache")
Expand All @@ -137,7 +142,7 @@ public boolean updateIfAbsent(K id, V value) {
this.capacity <= 0L || this.containsKey(id)) {
return false;
}
return this.write(id, value);
return this.write(id, value, 0L);
}

@Watched(prefix = "cache")
Expand All @@ -147,7 +152,7 @@ public boolean updateIfPresent(K id, V value) {
this.capacity <= 0L || !this.containsKey(id)) {
return false;
}
return this.write(id, value);
return this.write(id, value, 0L);
}

@Watched(prefix = "cache")
Expand Down Expand Up @@ -180,7 +185,7 @@ public long tick() {
long current = now();
for (Iterator<CacheNode<K, V>> it = this.nodes(); it.hasNext();) {
CacheNode<K, V> node = it.next();
if (current - node.time() > expireTime) {
if (current - node.time() >= expireTime) {
// Remove item while iterating map (it must be ConcurrentMap)
this.remove(node.key());
expireItems++;
Expand Down Expand Up @@ -228,7 +233,7 @@ protected final long halfCapacity() {

protected abstract V access(K id);

protected abstract boolean write(K id, V value);
protected abstract boolean write(K id, V value, long timeOffset);

protected abstract void remove(K id);

Expand All @@ -242,11 +247,11 @@ protected static class CacheNode<K, V> {

private final K key;
private final V value;
private long time;
private final long time;

public CacheNode(K key, V value) {
public CacheNode(K key, V value, long timeOffset) {
assert key != null;
this.time = now();
this.time = now() + timeOffset;
this.key = key;
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface Cache<K, V> {

public boolean update(K id, V value);

public boolean update(K id, V value, long timeOffset);

public boolean updateIfAbsent(K id, V value);

public boolean updateIfPresent(K id, V value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ protected Object access(Id id) {
}

@Override
protected boolean write(Id id, Object value) {
protected boolean write(Id id, Object value, long timeOffset) {
boolean success = false;
for (AbstractCache<Id, Object> cache : this.caches) {
success |= cache.write(id, value);
success |= cache.write(id, value, timeOffset);
}
return success;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected Object access(Id id) {
}

@Override
protected boolean write(Id id, Object value) {
protected boolean write(Id id, Object value, long timeOffset) {
Value serializedValue = new Value(value);
int serializedSize = serializedValue.serializedSize();
if (serializedSize > VALUE_SIZE_TO_SKIP) {
Expand All @@ -122,7 +122,7 @@ protected boolean write(Id id, Object value) {
if (expireTime <= 0) {
success = this.cache.put(id, serializedValue);
} else {
expireTime += now();
expireTime += now() + timeOffset;
/*
* Seems only the linked implementation support expiring entries,
* the chunked implementation does not support it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected final Object access(Id id) {

@Override
@Watched(prefix = "ramcache")
protected final boolean write(Id id, Object value) {
protected final boolean write(Id id, Object value, long timeOffset) {
assert id != null;
long capacity = this.capacity();
assert capacity > 0;
Expand Down Expand Up @@ -150,7 +150,7 @@ protected final boolean write(Id id, Object value) {
}

// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value));
this.map.put(id, this.queue.enqueue(id, value, timeOffset));
return true;
} finally {
lock.unlock();
Expand Down Expand Up @@ -228,7 +228,11 @@ private static final class LinkNode<K, V> extends CacheNode<K, V> {
private LinkNode<K, V> next;

public LinkNode(K key, V value) {
super(key, value);
this(key, value, 0L);
}

public LinkNode(K key, V value, long timeOffset) {
super(key, value, timeOffset);
this.prev = this.next = null;
}

Expand Down Expand Up @@ -370,8 +374,8 @@ public void clear() {
/**
* Add an item with key-value to the queue
*/
public LinkNode<K, V> enqueue(K key, V value) {
return this.enqueue(new LinkNode<>(key, value));
public LinkNode<K, V> enqueue(K key, V value, long timeOffset) {
return this.enqueue(new LinkNode<>(key, value, timeOffset));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void teardown() throws Exception {

protected abstract Cache<Id, Object> newCache();

protected abstract Cache<Id, Object> newCache(long capacity) ;
protected abstract Cache<Id, Object> newCache(long capacity);

protected abstract void checkSize(Cache<Id, Object> cache, long size,
Map<Id, Object> kvs);
Expand Down Expand Up @@ -673,5 +673,24 @@ public void testMutiThreadsGetAndUpdateWithGtCapacity() {
// In fact, the size may be any value(such as 43)
Assert.assertTrue(cache.size() < 10 + THREADS_NUM);
}

@Test
public void testKeyExpired() {
Cache<Id, Object> cache = newCache();
cache.expire(2000L);

Id key = IdGenerator.of("key");
cache.update(key, "value", -1000L);

waitTillNext(1);
cache.tick();

Assert.assertFalse(cache.containsKey(key));

cache.update(key, "value", -2000L);
cache.tick();

Assert.assertFalse(cache.containsKey(key));
}
}

0 comments on commit af562d8

Please sign in to comment.