From 5cc625d4b77d05143f33611f5f9b3f548147deac Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Wed, 22 Nov 2017 15:45:46 +0800 Subject: [PATCH 01/15] Add TimeoutListener for MemcachedClient. --- .../net/spy/memcached/MemcachedClient.java | 20 ++++++ .../net/spy/memcached/TimeoutListener.java | 9 +++ .../internal/AbstractListenableFuture.java | 3 + .../spy/memcached/internal/BulkGetFuture.java | 20 ++++++ .../net/spy/memcached/internal/GetFuture.java | 5 ++ .../memcached/internal/OperationFuture.java | 19 +++++- .../MemcachedTimeoutListenerTest.java | 68 +++++++++++++++++++ .../internal/DummyListenableFuture.java | 6 ++ 8 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 src/main/java/net/spy/memcached/TimeoutListener.java create mode 100644 src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 77d0f048e..2f805a2f5 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -159,6 +159,8 @@ public class MemcachedClient extends SpyObject implements MemcachedClientIF, protected final ExecutorService executorService; + private final List timeoutListeners = new ArrayList(); + /** * Get a memcache client operating on the specified memcached locations. * @@ -323,6 +325,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -352,6 +355,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -404,6 +408,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -649,6 +654,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1040,6 +1046,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1099,6 +1106,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1358,6 +1366,7 @@ public void complete() { ops.add(op); } assert mops.size() == chunks.size(); + rv.setTimeoutListeners(timeoutListeners); mconn.checkState(); mconn.addOperations(mops); return rv; @@ -1530,6 +1539,7 @@ public void gotData(String k, int flags, long cas, byte[] data) { tc.getMaxSize()))); } }); + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -2001,6 +2011,7 @@ public void complete() { rv.signalComplete(); } }); + rv.setTimeoutListeners(timeoutListeners); mconn.enqueueOperation(key, op); rv.setOperation(op); return rv; @@ -2335,6 +2346,7 @@ public void complete() { op = opFact.delete(key, cas, callback); } + rv.setTimeoutListeners(timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -2581,6 +2593,14 @@ public boolean addObserver(ConnectionObserver obs) { return rv; } + public MemcachedClient addTimeoutListener(TimeoutListener listener) { + if (listener == null) { + throw new NullPointerException(); + } + timeoutListeners.add(listener); + return this; + } + /** * Remove a connection observer. * diff --git a/src/main/java/net/spy/memcached/TimeoutListener.java b/src/main/java/net/spy/memcached/TimeoutListener.java new file mode 100644 index 000000000..5436aab4d --- /dev/null +++ b/src/main/java/net/spy/memcached/TimeoutListener.java @@ -0,0 +1,9 @@ +package net.spy.memcached; + +import java.util.EventListener; +import java.util.concurrent.Future; + +public interface TimeoutListener extends EventListener { + + void onTimeout(Future future) throws Exception; +} diff --git a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java index fd0b4e970..54a1084c6 100644 --- a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java +++ b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java @@ -23,6 +23,7 @@ package net.spy.memcached.internal; +import net.spy.memcached.TimeoutListener; import net.spy.memcached.compat.SpyObject; import java.util.ArrayList; @@ -66,6 +67,8 @@ protected AbstractListenableFuture(ExecutorService executor) { listeners = new ArrayList>>(); } + public abstract void setTimeoutListeners(List timeoutListeners); + /** * Returns the current executor. * diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 0f956c3a1..bcdf141a0 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -36,6 +37,7 @@ import java.util.concurrent.TimeoutException; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.TimeoutListener; import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; @@ -59,6 +61,7 @@ public class BulkGetFuture private OperationStatus status; private boolean cancelled = false; private boolean timeout = false; + private List timeoutListeners; public BulkGetFuture(Map> m, Collection getOps, CountDownLatch l, ExecutorService service) { @@ -104,6 +107,13 @@ public Map getSome(long to, TimeUnit unit) Map ret = internalGet(to, unit, timedoutOps); if (timedoutOps.size() > 0) { timeout = true; + for (TimeoutListener timeoutListener : timeoutListeners) { + try { + timeoutListener.onTimeout(this); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); + } + } LoggerFactory.getLogger(getClass()).warn( new CheckedOperationTimeoutException("Operation timed out: ", timedoutOps).getMessage()); @@ -124,6 +134,13 @@ public Map get(long to, TimeUnit unit) Map ret = internalGet(to, unit, timedoutOps); if (timedoutOps.size() > 0) { this.timeout = true; + for (TimeoutListener timeoutListener : timeoutListeners) { + try { + timeoutListener.onTimeout(this); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); + } + } throw new CheckedOperationTimeoutException("Operation timed out.", timedoutOps); } @@ -225,4 +242,7 @@ public void signalComplete() { notifyListeners(); } + public void setTimeoutListeners(List timeoutListeners) { + this.timeoutListeners = timeoutListeners; + } } diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index a18ab0b56..dddc66116 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -24,6 +24,7 @@ package net.spy.memcached.internal; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.TimeoutListener; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationStatus; @@ -109,4 +111,7 @@ public void signalComplete() { notifyListeners(); } + public void setTimeoutListeners(List timeoutListeners) { + rv.setTimeoutListeners(timeoutListeners); + } } diff --git a/src/main/java/net/spy/memcached/internal/OperationFuture.java b/src/main/java/net/spy/memcached/internal/OperationFuture.java index 3a7f8cca8..4e1cb1bf6 100644 --- a/src/main/java/net/spy/memcached/internal/OperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/OperationFuture.java @@ -23,6 +23,8 @@ package net.spy.memcached.internal; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.TimeoutListener; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; @@ -61,6 +64,7 @@ public class OperationFuture private final long timeout; private Operation op; private final String key; + private List timeoutListeners; private Long cas; /** @@ -99,6 +103,11 @@ public OperationFuture(String k, CountDownLatch l, AtomicReference oref, cas = null; } + @Override + public void setTimeoutListeners(List timeoutListeners) { + this.timeoutListeners = timeoutListeners; + } + /** * Cancel this operation, if possible. * @@ -164,8 +173,14 @@ public T get(long duration, TimeUnit units) throws InterruptedException, if (op != null) { // op can be null on a flush op.timeOut(); } - throw new CheckedOperationTimeoutException( - "Timed out waiting for operation", op); + for (TimeoutListener listener : timeoutListeners) { + try { + listener.onTimeout(this); + } catch (Exception e) { + getLogger().error("Error execute timeout listeners", e); + } + } + throw new CheckedOperationTimeoutException("Timed out waiting for operation", op); } else { // continuous timeout counter will be reset MemcachedConnection.opSucceeded(op); diff --git a/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java b/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java new file mode 100644 index 000000000..b7d7a5b97 --- /dev/null +++ b/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java @@ -0,0 +1,68 @@ +package net.spy.memcached; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static net.spy.memcached.TestConfig.PORT_NUMBER; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import junit.framework.TestCase; + +public class MemcachedTimeoutListenerTest extends TestCase { + + private MemcachedClient client = null; + private int timeoutCount; + + @Override + protected void setUp() throws IOException { + client = new MemcachedClient(AddrUtil.getAddresses("1.1.1.1:" + PORT_NUMBER)); + client.addTimeoutListener(new TimeoutListener() { + + public void onTimeout(Future future) { + timeoutCount++; + } + }); + } + + @Override + protected void tearDown() { + if (client != null) { + try { + client.shutdown(); + } catch (NullPointerException e) { + // This is a workaround for a disagreement betweewn how things + // should work in eclipse and buildr. My plan is to upgrade to + // junit4 all around and write some tests that are a bit easier + // to follow. + + // The actual problem here is a client that isn't properly + // initialized is attempting to be shut down. + } + } + } + + public void testTimeoutListener() throws ExecutionException, InterruptedException { + try { + client.get("test"); + fail(); + } catch (OperationTimeoutException e) { + } + assertEquals(1, timeoutCount); + try { + client.set("test", 1, 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertEquals(2, timeoutCount); + try { + client.getBulk("test1", "test2"); + fail(); + } catch (OperationTimeoutException e) { + } + assertEquals(3, timeoutCount); + client.asyncGetBulk("test3").getSome(1, MILLISECONDS); + assertEquals(4, timeoutCount); + } +} diff --git a/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java b/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java index 86d89ff21..c6b5fd81a 100644 --- a/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java +++ b/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java @@ -23,11 +23,14 @@ package net.spy.memcached.internal; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.TimeoutListener; + /** * A very basic {@link ListenableFuture} to verify and test basic * add, remove and notification behavior. @@ -96,4 +99,7 @@ public DummyListenableFuture removeListener( return this; } + public void setTimeoutListeners(List timeoutListeners) { + throw new UnsupportedOperationException(); + } } From 893050fa82d71299c32e1ff3f2b61f79bb2bab54 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Thu, 23 Nov 2017 08:11:23 +0800 Subject: [PATCH 02/15] Add Method parameter for TimeoutListener. --- .../net/spy/memcached/MemcachedClient.java | 88 ++++++++------ .../net/spy/memcached/TimeoutListener.java | 48 +++++++- .../internal/AbstractListenableFuture.java | 3 +- .../spy/memcached/internal/BulkGetFuture.java | 11 +- .../net/spy/memcached/internal/GetFuture.java | 8 +- .../memcached/internal/OperationFuture.java | 8 +- .../MemcachedTimeoutListenerTest.java | 107 ++++++++++++++++-- .../internal/DummyListenableFuture.java | 3 +- 8 files changed, 221 insertions(+), 55 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 2f805a2f5..40737770a 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -23,9 +23,44 @@ package net.spy.memcached; +import static net.spy.memcached.TimeoutListener.Method.cas; +import static net.spy.memcached.TimeoutListener.Method.delete; +import static net.spy.memcached.TimeoutListener.Method.from; +import static net.spy.memcached.TimeoutListener.Method.getAndTouch; +import static net.spy.memcached.TimeoutListener.Method.getBulkSome; +import static net.spy.memcached.TimeoutListener.Method.gets; +import static net.spy.memcached.TimeoutListener.Method.touch; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.internal.BulkGetFuture; import net.spy.memcached.internal.GetFuture; @@ -53,32 +88,6 @@ import net.spy.memcached.transcoders.Transcoder; import net.spy.memcached.util.StringUtils; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - /** * Client to a memcached server. * @@ -325,7 +334,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(from(storeType), timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -355,7 +364,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(from(catType), timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -408,7 +417,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(touch, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -654,7 +663,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(cas, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1046,7 +1055,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(null, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1106,7 +1115,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(gets, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1366,7 +1375,7 @@ public void complete() { ops.add(op); } assert mops.size() == chunks.size(); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(null, timeoutListeners); mconn.checkState(); mconn.addOperations(mops); return rv; @@ -1539,7 +1548,7 @@ public void gotData(String k, int flags, long cas, byte[] data) { tc.getMaxSize()))); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(getAndTouch, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1777,6 +1786,13 @@ public void complete() { })); try { if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) { + for (TimeoutListener timeoutListener : timeoutListeners) { + try { + timeoutListener.onTimeout(from(m), null); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); + } + } throw new OperationTimeoutException("Mutate operation timed out," + "unable to modify counter [" + key + ']'); } @@ -2011,7 +2027,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(from(m), timeoutListeners); mconn.enqueueOperation(key, op); rv.setOperation(op); return rv; @@ -2346,7 +2362,7 @@ public void complete() { op = opFact.delete(key, cas, callback); } - rv.setTimeoutListeners(timeoutListeners); + rv.setTimeoutListeners(delete, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; diff --git a/src/main/java/net/spy/memcached/TimeoutListener.java b/src/main/java/net/spy/memcached/TimeoutListener.java index 5436aab4d..f7616a307 100644 --- a/src/main/java/net/spy/memcached/TimeoutListener.java +++ b/src/main/java/net/spy/memcached/TimeoutListener.java @@ -3,7 +3,53 @@ import java.util.EventListener; import java.util.concurrent.Future; +import net.spy.memcached.ops.ConcatenationType; +import net.spy.memcached.ops.Mutator; +import net.spy.memcached.ops.StoreType; + public interface TimeoutListener extends EventListener { - void onTimeout(Future future) throws Exception; + /** + * @param future may be {@code null} if from some non async operations. + */ + void onTimeout(Method method, Future future) throws Exception; + + enum Method { + get, getAndTouch, getBulk, getBulkSome, gets, set, replace, add, append, prepend, touch, cas, incr, decr, delete; + + static Method from(Mutator mutator) { + switch (mutator) { + case incr: + return incr; + case decr: + return decr; + default: + return null; + } + } + + static Method from(ConcatenationType concatenationType) { + switch (concatenationType) { + case append: + return append; + case prepend: + return prepend; + default: + return null; + } + } + + static Method from(StoreType storeType) { + switch (storeType) { + case set: + return set; + case add: + return add; + case replace: + return replace; + default: + return null; + } + } + } } diff --git a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java index 54a1084c6..ceac729a2 100644 --- a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java +++ b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java @@ -24,6 +24,7 @@ package net.spy.memcached.internal; import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.compat.SpyObject; import java.util.ArrayList; @@ -67,7 +68,7 @@ protected AbstractListenableFuture(ExecutorService executor) { listeners = new ArrayList>>(); } - public abstract void setTimeoutListeners(List timeoutListeners); + public abstract void setTimeoutListeners(Method method, List timeoutListeners); /** * Returns the current executor. diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index bcdf141a0..3ea478b77 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -23,6 +23,9 @@ package net.spy.memcached.internal; +import static net.spy.memcached.TimeoutListener.Method.getBulk; +import static net.spy.memcached.TimeoutListener.Method.getBulkSome; + import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -38,6 +41,7 @@ import net.spy.memcached.MemcachedConnection; import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; @@ -109,7 +113,7 @@ public Map getSome(long to, TimeUnit unit) timeout = true; for (TimeoutListener timeoutListener : timeoutListeners) { try { - timeoutListener.onTimeout(this); + timeoutListener.onTimeout(getBulkSome, this); } catch (Exception e) { LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); } @@ -136,7 +140,7 @@ public Map get(long to, TimeUnit unit) this.timeout = true; for (TimeoutListener timeoutListener : timeoutListeners) { try { - timeoutListener.onTimeout(this); + timeoutListener.onTimeout(getBulk, this); } catch (Exception e) { LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); } @@ -242,7 +246,8 @@ public void signalComplete() { notifyListeners(); } - public void setTimeoutListeners(List timeoutListeners) { + @Override + public void setTimeoutListeners(Method method, List timeoutListeners) { this.timeoutListeners = timeoutListeners; } } diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index dddc66116..bf75378ae 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -24,6 +24,8 @@ package net.spy.memcached.internal; +import static net.spy.memcached.TimeoutListener.Method.get; + import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -33,6 +35,7 @@ import java.util.concurrent.TimeoutException; import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationStatus; @@ -111,7 +114,8 @@ public void signalComplete() { notifyListeners(); } - public void setTimeoutListeners(List timeoutListeners) { - rv.setTimeoutListeners(timeoutListeners); + @Override + public void setTimeoutListeners(Method method, List timeoutListeners) { + rv.setTimeoutListeners(get, timeoutListeners); } } diff --git a/src/main/java/net/spy/memcached/internal/OperationFuture.java b/src/main/java/net/spy/memcached/internal/OperationFuture.java index 4e1cb1bf6..c65bde3d4 100644 --- a/src/main/java/net/spy/memcached/internal/OperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/OperationFuture.java @@ -23,7 +23,6 @@ package net.spy.memcached.internal; -import java.util.Collection; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -36,6 +35,7 @@ import net.spy.memcached.MemcachedConnection; import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; @@ -64,6 +64,7 @@ public class OperationFuture private final long timeout; private Operation op; private final String key; + private Method method; private List timeoutListeners; private Long cas; @@ -104,7 +105,8 @@ public OperationFuture(String k, CountDownLatch l, AtomicReference oref, } @Override - public void setTimeoutListeners(List timeoutListeners) { + public void setTimeoutListeners(Method method, List timeoutListeners) { + this.method = method; this.timeoutListeners = timeoutListeners; } @@ -175,7 +177,7 @@ public T get(long duration, TimeUnit units) throws InterruptedException, } for (TimeoutListener listener : timeoutListeners) { try { - listener.onTimeout(this); + listener.onTimeout(method, this); } catch (Exception e) { getLogger().error("Error execute timeout listeners", e); } diff --git a/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java b/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java index b7d7a5b97..363057979 100644 --- a/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java +++ b/src/test/java/net/spy/memcached/MemcachedTimeoutListenerTest.java @@ -2,26 +2,41 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static net.spy.memcached.TestConfig.PORT_NUMBER; +import static net.spy.memcached.TimeoutListener.Method.add; +import static net.spy.memcached.TimeoutListener.Method.append; +import static net.spy.memcached.TimeoutListener.Method.decr; +import static net.spy.memcached.TimeoutListener.Method.delete; +import static net.spy.memcached.TimeoutListener.Method.get; +import static net.spy.memcached.TimeoutListener.Method.getAndTouch; +import static net.spy.memcached.TimeoutListener.Method.getBulk; +import static net.spy.memcached.TimeoutListener.Method.getBulkSome; +import static net.spy.memcached.TimeoutListener.Method.incr; +import static net.spy.memcached.TimeoutListener.Method.prepend; +import static net.spy.memcached.TimeoutListener.Method.replace; +import static net.spy.memcached.TimeoutListener.Method.set; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import junit.framework.TestCase; +import net.spy.memcached.TimeoutListener.Method; public class MemcachedTimeoutListenerTest extends TestCase { private MemcachedClient client = null; - private int timeoutCount; + private Set methodSet = new HashSet(); @Override protected void setUp() throws IOException { - client = new MemcachedClient(AddrUtil.getAddresses("1.1.1.1:" + PORT_NUMBER)); + client = new MemcachedClient(new BinaryConnectionFactory(), AddrUtil.getAddresses("1.1.1.1:" + PORT_NUMBER)); client.addTimeoutListener(new TimeoutListener() { - public void onTimeout(Future future) { - timeoutCount++; + public void onTimeout(Method method, Future future) { + methodSet.add(method); } }); } @@ -49,20 +64,96 @@ public void testTimeoutListener() throws ExecutionException, InterruptedExceptio fail(); } catch (OperationTimeoutException e) { } - assertEquals(1, timeoutCount); + assertTrue(methodSet.contains(get)); + try { client.set("test", 1, 1).get(1, MILLISECONDS); fail(); } catch (TimeoutException e) { } - assertEquals(2, timeoutCount); + assertTrue(methodSet.contains(set)); + try { client.getBulk("test1", "test2"); fail(); } catch (OperationTimeoutException e) { } - assertEquals(3, timeoutCount); + assertTrue(methodSet.contains(getBulk)); + client.asyncGetBulk("test3").getSome(1, MILLISECONDS); - assertEquals(4, timeoutCount); + assertTrue(methodSet.contains(getBulkSome)); + + try { + client.getAndTouch("test1", 1); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(getAndTouch)); + + try { + client.incr("test1", 1); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(incr)); + + try { + client.decr("test1", 1); + fail(); + } catch (OperationTimeoutException e) { + } + assertTrue(methodSet.contains(decr)); + + try { + client.append("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(append)); + + try { + client.prepend("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(prepend)); + + try { + client.add("test1", 1, "value").get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(add)); + + try { + client.replace("test1", 1, "value").get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(replace)); + + try { + client.delete("test1").get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(delete)); + + assertTrue(methodSet.remove(incr)); + assertTrue(methodSet.remove(decr)); + + try { + client.asyncIncr("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(incr)); + + try { + client.asyncDecr("test1", 1).get(1, MILLISECONDS); + fail(); + } catch (TimeoutException e) { + } + assertTrue(methodSet.contains(decr)); } } diff --git a/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java b/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java index c6b5fd81a..2c803253f 100644 --- a/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java +++ b/src/test/java/net/spy/memcached/internal/DummyListenableFuture.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException; import net.spy.memcached.TimeoutListener; +import net.spy.memcached.TimeoutListener.Method; /** * A very basic {@link ListenableFuture} to verify and test basic @@ -99,7 +100,7 @@ public DummyListenableFuture removeListener( return this; } - public void setTimeoutListeners(List timeoutListeners) { + public void setTimeoutListeners(Method method, List timeoutListeners) { throw new UnsupportedOperationException(); } } From f38dd189786d3b9a9256ed7c1bfff83975403848 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Tue, 28 Nov 2017 16:46:29 +0800 Subject: [PATCH 03/15] Add getOperation for Future. --- src/main/java/net/spy/memcached/internal/BulkGetFuture.java | 5 +++++ src/main/java/net/spy/memcached/internal/GetFuture.java | 4 ++++ .../java/net/spy/memcached/internal/OperationFuture.java | 5 +++++ 3 files changed, 14 insertions(+) diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 3ea478b77..f9d1dd41a 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -23,6 +23,7 @@ package net.spy.memcached.internal; +import static java.util.Collections.unmodifiableCollection; import static net.spy.memcached.TimeoutListener.Method.getBulk; import static net.spy.memcached.TimeoutListener.Method.getBulkSome; @@ -204,6 +205,10 @@ public OperationStatus getStatus() { return status; } + public Collection getOperations() { + return unmodifiableCollection(ops); + } + public void setStatus(OperationStatus s) { status = s; } diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index bf75378ae..4711c4526 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -75,6 +75,10 @@ public T get(long duration, TimeUnit units) throws InterruptedException, return v == null ? null : v.get(); } + public Operation getOperation() { + return rv.getOperation(); + } + public OperationStatus getStatus() { return rv.getStatus(); } diff --git a/src/main/java/net/spy/memcached/internal/OperationFuture.java b/src/main/java/net/spy/memcached/internal/OperationFuture.java index c65bde3d4..52e2dabd7 100644 --- a/src/main/java/net/spy/memcached/internal/OperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/OperationFuture.java @@ -248,6 +248,11 @@ public Long getCas() { } return cas; } + + public Operation getOperation() { + return op; + } + /** * Get the current status of this operation. * From 14a9140ef7b9e91d89d74f2239a96f00a555984a Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Sat, 2 Dec 2017 15:11:10 +0800 Subject: [PATCH 04/15] Add some timestamp getter for Operation. --- .../java/net/spy/memcached/ops/Operation.java | 6 +++++ .../memcached/protocol/BaseOperationImpl.java | 25 ++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/main/java/net/spy/memcached/ops/Operation.java b/src/main/java/net/spy/memcached/ops/Operation.java index b2b1f6a6b..36fa47f67 100644 --- a/src/main/java/net/spy/memcached/ops/Operation.java +++ b/src/main/java/net/spy/memcached/ops/Operation.java @@ -184,4 +184,10 @@ public interface Operation { * Sets the clone count for this operation. */ void setCloneCount(int count); + + long getCreateTimestamp(); + + long getStartWritingTimestamp(); + + long getFinishedReadTimestamp(); } diff --git a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java index 24d690899..8d8e6defa 100644 --- a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java @@ -23,6 +23,8 @@ package net.spy.memcached.protocol; +import static java.lang.System.currentTimeMillis; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -31,7 +33,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; import net.spy.memcached.MemcachedNode; import net.spy.memcached.compat.SpyObject; @@ -70,6 +71,10 @@ public abstract class BaseOperationImpl extends SpyObject implements Operation { new HashSet(); private long writeCompleteTimestamp; + private final long createTime = currentTimeMillis(); + private long startWriteTimestamp; + private long finishReadTimestamp; + /** * If the operation gets cloned, the reference is used to cascade cancellations * and timeouts. @@ -164,12 +169,18 @@ protected final synchronized void transitionState(OperationState newState) { cmd = null; } if (state == OperationState.COMPLETE) { + if (finishReadTimestamp == 0) { + finishReadTimestamp = currentTimeMillis(); + } callback.complete(); } } public final void writing() { transitionState(OperationState.WRITING); + if(startWriteTimestamp == 0){ + startWriteTimestamp = currentTimeMillis(); + } } public final void writeComplete() { @@ -275,4 +286,16 @@ public int getCloneCount() { public void setCloneCount(int count) { cloneCount = count; } + + public long getCreateTimestamp() { + return createTime; + } + + public long getStartWritingTimestamp() { + return startWriteTimestamp; + } + + public long getFinishedReadTimestamp() { + return finishReadTimestamp; + } } From e883e6f799a841b86478cc67a9d47f691274e729 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Sat, 2 Dec 2017 16:38:48 +0800 Subject: [PATCH 05/15] Add timeout ops for BulkGetFuture. --- .../java/net/spy/memcached/internal/BulkGetFuture.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index f9d1dd41a..9e78e33d8 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -66,6 +66,7 @@ public class BulkGetFuture private OperationStatus status; private boolean cancelled = false; private boolean timeout = false; + private Collection timeoutOps; private List timeoutListeners; public BulkGetFuture(Map> m, Collection getOps, @@ -112,6 +113,7 @@ public Map getSome(long to, TimeUnit unit) Map ret = internalGet(to, unit, timedoutOps); if (timedoutOps.size() > 0) { timeout = true; + this.timeoutOps = timedoutOps; for (TimeoutListener timeoutListener : timeoutListeners) { try { timeoutListener.onTimeout(getBulkSome, this); @@ -139,6 +141,7 @@ public Map get(long to, TimeUnit unit) Map ret = internalGet(to, unit, timedoutOps); if (timedoutOps.size() > 0) { this.timeout = true; + this.timeoutOps = timedoutOps; for (TimeoutListener timeoutListener : timeoutListeners) { try { timeoutListener.onTimeout(getBulk, this); @@ -255,4 +258,8 @@ public void signalComplete() { public void setTimeoutListeners(Method method, List timeoutListeners) { this.timeoutListeners = timeoutListeners; } + + public Collection getTimeoutOps() { + return timeoutOps; + } } From 3ccb34f3997ab2dad4c07690d6c44dabc683904c Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Mon, 11 Dec 2017 23:00:53 +0800 Subject: [PATCH 06/15] Add name for thread name. --- src/main/java/net/spy/memcached/DefaultConnectionFactory.java | 2 +- src/main/java/net/spy/memcached/MemcachedConnection.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java index 16623cd3b..b7ed2f164 100644 --- a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java +++ b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java @@ -294,7 +294,7 @@ public ExecutorService getListenerExecutorService() { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "FutureNotifyListener"); + return new Thread(r, "FutureNotifyListener[" + getName() + "]"); } }; diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 47f432fb1..2a3b951db 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -302,7 +302,7 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, registerMetrics(); - setName("Memcached IO over " + this); + setName("Memcached IO over [" + getName() +"] " + this); setDaemon(f.isDaemon()); start(); } From 646c4a17136de3de533f2fa737352472e38fd58d Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Wed, 13 Dec 2017 11:51:44 +0800 Subject: [PATCH 07/15] fix thread name for memcached. --- src/main/java/net/spy/memcached/DefaultConnectionFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java index b7ed2f164..fc696d158 100644 --- a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java +++ b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java @@ -294,7 +294,7 @@ public ExecutorService getListenerExecutorService() { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "FutureNotifyListener[" + getName() + "]"); + return new Thread(r, "FutureNotifyListener[" + DefaultConnectionFactory.this.getName() + "]"); } }; From a6161813c83dae8ef14bd986396c2eabd1ff72da Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Thu, 21 Dec 2017 19:25:53 +0800 Subject: [PATCH 08/15] fix thread name for Memcached IO threads. --- src/main/java/net/spy/memcached/MemcachedConnection.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 2a3b951db..c0443ba28 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -302,7 +302,9 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, registerMetrics(); - setName("Memcached IO over [" + getName() +"] " + this); + String name = f instanceof DefaultConnectionFactory ? ((DefaultConnectionFactory) f) + .getName() : getName(); + setName("Memcached IO over [" + name + "] " + this); setDaemon(f.isDaemon()); start(); } From 194a7a6a026a7a8baaff4797b33582006c9c6cd4 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Wed, 27 Dec 2017 18:29:02 +0800 Subject: [PATCH 09/15] [code healthy] Add FQCN support for slf4j based log system. --- .../spy/memcached/compat/log/SLF4JLogger.java | 48 ++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java b/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java index 65427a306..9f57eec32 100644 --- a/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java +++ b/src/main/java/net/spy/memcached/compat/log/SLF4JLogger.java @@ -22,6 +22,14 @@ package net.spy.memcached.compat.log; +import static org.slf4j.spi.LocationAwareLogger.DEBUG_INT; +import static org.slf4j.spi.LocationAwareLogger.ERROR_INT; +import static org.slf4j.spi.LocationAwareLogger.INFO_INT; +import static org.slf4j.spi.LocationAwareLogger.TRACE_INT; +import static org.slf4j.spi.LocationAwareLogger.WARN_INT; + +import org.slf4j.spi.LocationAwareLogger; + /** * Logging Implementation using the SLF4J * logging facade. @@ -38,7 +46,10 @@ */ public class SLF4JLogger extends AbstractLogger { + private static final String FQCN = AbstractLogger.class.getName(); + private final org.slf4j.Logger logger; + private final LocationAwareLogger locationAwareLogger; /** * Get an instance of the SLF4JLogger. @@ -46,6 +57,11 @@ public class SLF4JLogger extends AbstractLogger { public SLF4JLogger(String name) { super(name); logger = org.slf4j.LoggerFactory.getLogger(name); + if(logger instanceof LocationAwareLogger) { + locationAwareLogger = (LocationAwareLogger) logger; + } else { + locationAwareLogger = null; + } } @Override @@ -78,22 +94,40 @@ public void log(Level level, Object message, Throwable e) { switch(level) { case TRACE: - logger.trace(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, TRACE_INT, message.toString(), null, e); + } else { + logger.trace(message.toString(), e); + } break; case DEBUG: - logger.debug(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, DEBUG_INT, message.toString(), null, e); + } else { + logger.debug(message.toString(), e); + } break; case INFO: - logger.info(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, INFO_INT, message.toString(), null, e); + } else { + logger.info(message.toString(), e); + } break; case WARN: - logger.warn(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, WARN_INT, message.toString(), null, e); + } else { + logger.warn(message.toString(), e); + } break; case ERROR: - logger.error(message.toString(), e); - break; case FATAL: - logger.error(message.toString(), e); + if (locationAwareLogger != null) { + locationAwareLogger.log(null, FQCN, ERROR_INT, message.toString(), null, e); + } else { + logger.error(message.toString(), e); + } break; default: logger.error("Unhandled Logging Level: " + level From 3e34da6346ad87645fa2d08a67f0e64c79d33aec Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Sat, 30 Dec 2017 22:59:12 +0800 Subject: [PATCH 10/15] [refactor] change callback ExecutorService to Executor. --- pom.xml | 32 ++++++++++++- .../net/spy/memcached/ConnectionFactory.java | 3 +- .../memcached/ConnectionFactoryBuilder.java | 13 ++--- .../memcached/DefaultConnectionFactory.java | 15 +++--- .../net/spy/memcached/MemcachedClient.java | 47 +++++++++---------- .../spy/memcached/MemcachedConnection.java | 10 ++-- .../internal/AbstractListenableFuture.java | 15 +++--- .../spy/memcached/internal/BulkGetFuture.java | 4 +- .../net/spy/memcached/internal/GetFuture.java | 4 +- .../memcached/internal/OperationFuture.java | 6 +-- 10 files changed, 89 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index 4a07d5fbc..ab7849f0d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 4.0.0 net.spy spymemcached - 2.999.999-SNAPSHOT + 2.12.3-kwai-0.0.10 Spymemcached A client library for memcached. @@ -42,6 +42,19 @@ http://couchbase.com/ + + + + kuaishou.releases + Internal Release Repository + http://nexus.corp.kuaishou.com:88/nexus/content/repositories/releases/ + + + kuaishou.snapshots + Internal Snapshot Repository + http://nexus.corp.kuaishou.com:88/nexus/content/repositories/snapshots/ + + @@ -85,5 +98,22 @@ + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.0 + + + attach-sources + + jar + + + + + + diff --git a/src/main/java/net/spy/memcached/ConnectionFactory.java b/src/main/java/net/spy/memcached/ConnectionFactory.java index a85a6dda3..2ed6af30d 100644 --- a/src/main/java/net/spy/memcached/ConnectionFactory.java +++ b/src/main/java/net/spy/memcached/ConnectionFactory.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import net.spy.memcached.auth.AuthDescriptor; @@ -87,7 +88,7 @@ MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, * Get the ExecutorService which is used to asynchronously execute listeners * on futures. */ - ExecutorService getListenerExecutorService(); + Executor getListenerExecutorService(); /** * Returns true if the default provided {@link ExecutorService} has not been diff --git a/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java b/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java index b3ece2eec..cfc040149 100644 --- a/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java +++ b/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import net.spy.memcached.auth.AuthDescriptor; @@ -75,7 +76,7 @@ public class ConnectionFactoryBuilder { protected MetricType metricType = null; protected MetricCollector collector = null; - protected ExecutorService executorService = null; + protected Executor executor = null; protected long authWaitTime = DefaultConnectionFactory.DEFAULT_AUTH_WAIT_TIME; /** @@ -311,8 +312,8 @@ public ConnectionFactoryBuilder setMetricCollector(MetricCollector collector) { * * @param executorService the ExecutorService to use. */ - public ConnectionFactoryBuilder setListenerExecutorService(ExecutorService executorService) { - this.executorService = executorService; + public ConnectionFactoryBuilder setListenerExecutorService(Executor executorService) { + this.executor = executorService; return this; } @@ -447,13 +448,13 @@ public MetricCollector getMetricCollector() { } @Override - public ExecutorService getListenerExecutorService() { - return executorService == null ? super.getListenerExecutorService() : executorService; + public Executor getListenerExecutorService() { + return executor == null ? super.getListenerExecutorService() : executor; } @Override public boolean isDefaultExecutorService() { - return executorService == null; + return executor == null; } @Override diff --git a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java index fc696d158..5c5f5f828 100644 --- a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java +++ b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java @@ -32,10 +32,9 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -132,9 +131,9 @@ public class DefaultConnectionFactory extends SpyObject implements private MetricCollector metrics; /** - * The ExecutorService in which the listener callbacks will be executed. + * The Executor in which the listener callbacks will be executed. */ - private ExecutorService executorService; + private Executor executor; /** * Construct a DefaultConnectionFactory with the given parameters. @@ -289,8 +288,8 @@ public long getAuthWaitTime() { * @return the stored {@link ExecutorService}. */ @Override - public ExecutorService getListenerExecutorService() { - if (executorService == null) { + public Executor getListenerExecutorService() { + if (executor == null) { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -298,7 +297,7 @@ public Thread newThread(Runnable r) { } }; - executorService = new ThreadPoolExecutor( + executor = new ThreadPoolExecutor( 0, Runtime.getRuntime().availableProcessors(), 60L, @@ -308,7 +307,7 @@ public Thread newThread(Runnable r) { ); } - return executorService; + return executor; } @Override diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 40737770a..e5bbabf1e 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -27,7 +27,6 @@ import static net.spy.memcached.TimeoutListener.Method.delete; import static net.spy.memcached.TimeoutListener.Method.from; import static net.spy.memcached.TimeoutListener.Method.getAndTouch; -import static net.spy.memcached.TimeoutListener.Method.getBulkSome; import static net.spy.memcached.TimeoutListener.Method.gets; import static net.spy.memcached.TimeoutListener.Method.touch; @@ -49,6 +48,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -166,7 +166,7 @@ public class MemcachedClient extends SpyObject implements MemcachedClientIF, protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor(); - protected final ExecutorService executorService; + protected final Executor executor; private final List timeoutListeners = new ArrayList(); @@ -221,7 +221,7 @@ public MemcachedClient(ConnectionFactory cf, List addrs) assert mconn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); - executorService = cf.getListenerExecutorService(); + executor = cf.getListenerExecutorService(); if (authDescriptor != null) { addObserver(this); } @@ -315,8 +315,7 @@ private OperationFuture asyncStore(StoreType storeType, CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, - executorService); + new OperationFuture(key, latch, operationTimeout, executor); Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @Override @@ -350,7 +349,7 @@ private OperationFuture asyncCat(ConcatenationType catType, CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, - latch, operationTimeout, executorService); + latch, operationTimeout, executor); Operation op = opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { @Override @@ -402,8 +401,7 @@ public OperationFuture touch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, - executorService); + new OperationFuture(key, latch, operationTimeout, executor); Operation op = opFact.touch(key, exp, new OperationCallback() { @Override @@ -637,8 +635,7 @@ public OperationFuture prepend(String key, T val, CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, - executorService); + new OperationFuture(key, latch, operationTimeout, executor); Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @Override @@ -1032,8 +1029,7 @@ public OperationFuture replace(String key, int exp, Object o) { public GetFuture asyncGet(final String key, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); - final GetFuture rv = new GetFuture(latch, operationTimeout, key, - executorService); + final GetFuture rv = new GetFuture(latch, operationTimeout, key, executor); Operation op = opFact.get(key, new GetOperation.Callback() { private Future val; @@ -1090,8 +1086,7 @@ public OperationFuture> asyncGets(final String key, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = - new OperationFuture>(key, latch, operationTimeout, - executorService); + new OperationFuture>(key, latch, operationTimeout, executor); Operation op = opFact.gets(key, new GetsOperation.Callback() { private CASValue val; @@ -1336,7 +1331,7 @@ public BulkFuture> asyncGetBulk(Iterator keyIter, int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); - final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executorService); + final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executor); GetOperation.Callback cb = new GetOperation.Callback() { @Override @@ -1523,7 +1518,7 @@ public OperationFuture> asyncGetAndTouch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>( - key, latch, operationTimeout, executorService); + key, latch, operationTimeout, executor); Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { @@ -2013,7 +2008,7 @@ private OperationFuture asyncMutate(Mutator m, String key, long by, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = - new OperationFuture(key, latch, operationTimeout, executorService); + new OperationFuture(key, latch, operationTimeout, executor); Operation op = opFact.mutate(m, key, by, def, exp, new OperationCallback() { @Override @@ -2335,7 +2330,7 @@ public OperationFuture delete(String key) { public OperationFuture delete(String key, long cas) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, - latch, operationTimeout, executorService); + latch, operationTimeout, executor); DeleteOperation.Callback callback = new DeleteOperation.Callback() { @Override @@ -2403,7 +2398,7 @@ public void complete() { }); return new OperationFuture(null, blatch, flushResult, - operationTimeout, executorService) { + operationTimeout, executor) { @Override public void set(Boolean o, OperationStatus s) { @@ -2521,10 +2516,12 @@ public boolean shutdown(long timeout, TimeUnit unit) { mconn.setName(baseName + " - SHUTTING DOWN"); boolean rv = true; if (connFactory.isDefaultExecutorService()) { - try { - executorService.shutdown(); - } catch (Exception ex) { - getLogger().warn("Failed shutting down the ExecutorService: ", ex); + if(executor instanceof ExecutorService) { + try { + ((ExecutorService) executor).shutdown(); + } catch (Exception ex) { + getLogger().warn("Failed shutting down the ExecutorService: ", ex); + } } } try { @@ -2675,8 +2672,8 @@ public TranscodeService getTranscoderService() { return tcService; } - public ExecutorService getExecutorService() { - return executorService; + public Executor getExecutorService() { + return executor; } @Override diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index c0443ba28..221c2249a 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -71,7 +71,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -227,9 +227,9 @@ public class MemcachedConnection extends SpyThread { private final boolean verifyAliveOnConnect; /** - * The {@link ExecutorService} to use for callbacks. + * The {@link Executor} to use for callbacks. */ - private final ExecutorService listenerExecutorService; + private final Executor listenerExecutor; /** * The {@link MetricCollector} to accumulate metrics (or dummy). @@ -276,7 +276,7 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f, selector = Selector.open(); retryOps = Collections.synchronizedList(new ArrayList()); nodesToShutdown = new ConcurrentLinkedQueue(); - listenerExecutorService = f.getListenerExecutorService(); + listenerExecutor = f.getListenerExecutorService(); this.bufSize = bufSize; this.connectionFactory = f; @@ -753,7 +753,7 @@ private void finishConnect(final SelectionKey sk, final MemcachedNode node) if (verifyAliveOnConnect) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture("noop", - latch, 2500, listenerExecutorService); + latch, 2500, listenerExecutor); NoopOperation testOp = opFact.noop(new OperationCallback() { public void receivedStatus(OperationStatus status) { rv.set(status.isSuccess(), status); diff --git a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java index ceac729a2..b7e9ec4c3 100644 --- a/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java +++ b/src/main/java/net/spy/memcached/internal/AbstractListenableFuture.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -49,9 +50,9 @@ public abstract class AbstractListenableFuture implements ListenableFuture { /** - * The {@link ExecutorService} in which the notifications will be handled. + * The {@link Executor} in which the notifications will be handled. */ - private final ExecutorService service; + private final Executor service; /** * Holds the list of listeners which will be notified upon completion. @@ -63,7 +64,7 @@ public abstract class AbstractListenableFuture * * @param executor the executor in which the callbacks will be executed in. */ - protected AbstractListenableFuture(ExecutorService executor) { + protected AbstractListenableFuture(Executor executor) { service = executor; listeners = new ArrayList>>(); } @@ -75,7 +76,7 @@ protected AbstractListenableFuture(ExecutorService executor) { * * @return the current executor service. */ - protected ExecutorService executor() { + protected Executor executor() { return service; } @@ -112,9 +113,9 @@ protected Future addToListeners( * @param future the future to hand over. * @param listener the listener to notify. */ - protected void notifyListener(final ExecutorService executor, + protected void notifyListener(final Executor executor, final Future future, final GenericCompletionListener listener) { - executor.submit(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try { @@ -176,4 +177,4 @@ protected Future removeFromListeners( } return this; } -} \ No newline at end of file +} diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 9e78e33d8..236317942 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -35,7 +35,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -70,7 +70,7 @@ public class BulkGetFuture private List timeoutListeners; public BulkGetFuture(Map> m, Collection getOps, - CountDownLatch l, ExecutorService service) { + CountDownLatch l, Executor service) { super(service); rvMap = m; ops = getOps; diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index 4711c4526..9ec8cbcdb 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -53,7 +53,7 @@ public class GetFuture private final OperationFuture> rv; public GetFuture(CountDownLatch l, long opTimeout, String key, - ExecutorService service) { + Executor service) { super(service); this.rv = new OperationFuture>(key, l, opTimeout, service); } diff --git a/src/main/java/net/spy/memcached/internal/OperationFuture.java b/src/main/java/net/spy/memcached/internal/OperationFuture.java index 52e2dabd7..0cb2dff5c 100644 --- a/src/main/java/net/spy/memcached/internal/OperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/OperationFuture.java @@ -27,6 +27,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -77,8 +78,7 @@ public class OperationFuture * @param l the latch to be used counting down the OperationFuture * @param opTimeout the timeout within which the operation needs to be done */ - public OperationFuture(String k, CountDownLatch l, long opTimeout, - ExecutorService service) { + public OperationFuture(String k, CountDownLatch l, long opTimeout, Executor service) { this(k, l, new AtomicReference(null), opTimeout, service); } @@ -93,7 +93,7 @@ public OperationFuture(String k, CountDownLatch l, long opTimeout, * @param opTimeout the timeout within which the operation needs to be done */ public OperationFuture(String k, CountDownLatch l, AtomicReference oref, - long opTimeout, ExecutorService service) { + long opTimeout, Executor service) { super(service); latch = l; From 55df0ba77b6e6c1559fdf9b1143ca6f9772b980a Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Tue, 17 Apr 2018 11:44:27 +0800 Subject: [PATCH 11/15] [feature] add async op listener. --- pom.xml | 2 +- .../net/spy/memcached/AsyncOpListener.java | 17 +++ .../net/spy/memcached/MemcachedClient.java | 107 +++++++++++++++--- .../net/spy/memcached/internal/GetFuture.java | 2 +- 4 files changed, 113 insertions(+), 15 deletions(-) create mode 100644 src/main/java/net/spy/memcached/AsyncOpListener.java diff --git a/pom.xml b/pom.xml index ab7849f0d..ae0d49c61 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 4.0.0 net.spy spymemcached - 2.12.3-kwai-0.0.10 + 2.12.3-kwai-0.0.11 Spymemcached A client library for memcached. diff --git a/src/main/java/net/spy/memcached/AsyncOpListener.java b/src/main/java/net/spy/memcached/AsyncOpListener.java new file mode 100644 index 000000000..17d66f420 --- /dev/null +++ b/src/main/java/net/spy/memcached/AsyncOpListener.java @@ -0,0 +1,17 @@ +package net.spy.memcached; + +import net.spy.memcached.TimeoutListener.Method; +import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.GetFuture; +import net.spy.memcached.internal.OperationFuture; + +public interface AsyncOpListener { + + T beforeInvoke(Method method); + + void onBulkGetCompletion(T before, BulkGetFuture future); + + void onGetCompletion(T before, GetFuture future); + + void onOperationCompletion(T before, Method method, OperationFuture future); +} diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index e5bbabf1e..29f4a13f9 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -23,12 +23,11 @@ package net.spy.memcached; -import static net.spy.memcached.TimeoutListener.Method.cas; +import static net.spy.memcached.TimeoutListener.Method.*; import static net.spy.memcached.TimeoutListener.Method.delete; import static net.spy.memcached.TimeoutListener.Method.from; +import static net.spy.memcached.TimeoutListener.Method.get; import static net.spy.memcached.TimeoutListener.Method.getAndTouch; -import static net.spy.memcached.TimeoutListener.Method.gets; -import static net.spy.memcached.TimeoutListener.Method.touch; import java.io.IOException; import java.net.InetSocketAddress; @@ -38,9 +37,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; @@ -57,13 +58,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import net.spy.memcached.TimeoutListener.Method; import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; import net.spy.memcached.compat.SpyObject; import net.spy.memcached.compat.log.LoggerFactory; import net.spy.memcached.internal.BulkFuture; +import net.spy.memcached.internal.BulkGetCompletionListener; import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.GetCompletionListener; import net.spy.memcached.internal.GetFuture; +import net.spy.memcached.internal.OperationCompletionListener; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.SingleElementInfiniteIterator; import net.spy.memcached.ops.CASOperationStatus; @@ -170,6 +175,8 @@ public class MemcachedClient extends SpyObject implements MemcachedClientIF, private final List timeoutListeners = new ArrayList(); + private final List> asyncOpListeners = new ArrayList>(); + /** * Get a memcache client operating on the specified memcached locations. * @@ -312,10 +319,14 @@ private CountDownLatch broadcastOp(BroadcastOpFactory of, private OperationFuture asyncStore(StoreType storeType, String key, int exp, T value, Transcoder tc) { + Method method = from(storeType); + final Map, Object> before = before(method); CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); + Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @Override @@ -333,12 +344,32 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(from(storeType), timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; } + private void addOpAsyncListener(final Method method, + final Map, Object> before, OperationFuture rv) { + rv.addListener(new OperationCompletionListener() { + + public void onComplete(OperationFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onOperationCompletion(entry.getValue(), method, future); + } + } + }); + } + + private Map, Object> before(Method method) { + Map, Object> result = new IdentityHashMap, Object>(); + for (AsyncOpListener asyncOpListener : asyncOpListeners) { + result.put(asyncOpListener, asyncOpListener.beforeInvoke(method)); + } + return result; + } + private OperationFuture asyncStore(StoreType storeType, String key, int exp, Object value) { return asyncStore(storeType, key, exp, value, transcoder); @@ -346,10 +377,14 @@ private OperationFuture asyncStore(StoreType storeType, String key, private OperationFuture asyncCat(ConcatenationType catType, long cas, String key, T value, Transcoder tc) { + Method method = from(catType); + final Map, Object> before = before(method); CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); + Operation op = opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { @Override @@ -363,7 +398,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(from(catType), timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -399,9 +434,12 @@ public OperationFuture touch(final String key, final int exp) { @Override public OperationFuture touch(final String key, final int exp, final Transcoder tc) { + Method method = touch; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); Operation op = opFact.touch(key, exp, new OperationCallback() { @Override @@ -415,7 +453,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(touch, timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -632,10 +670,14 @@ public OperationFuture prepend(String key, T val, @Override public OperationFuture asyncCAS(String key, long casId, int exp, T value, Transcoder tc) { + Method method = cas; + Map, Object> before = before(method); CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); + Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @Override @@ -660,7 +702,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(cas, timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1028,8 +1070,17 @@ public OperationFuture replace(String key, int exp, Object o) { @Override public GetFuture asyncGet(final String key, final Transcoder tc) { + final Map, Object> before = before(get); final CountDownLatch latch = new CountDownLatch(1); final GetFuture rv = new GetFuture(latch, operationTimeout, key, executor); + rv.addListener(new GetCompletionListener() { + + public void onComplete(GetFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onGetCompletion(entry.getValue(), future); + } + } + }); Operation op = opFact.get(key, new GetOperation.Callback() { private Future val; @@ -1051,7 +1102,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(null, timeoutListeners); + rv.setTimeoutListeners(get, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1084,9 +1135,12 @@ public GetFuture asyncGet(final String key) { public OperationFuture> asyncGets(final String key, final Transcoder tc) { + Method method = gets; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); Operation op = opFact.gets(key, new GetsOperation.Callback() { private CASValue val; @@ -1110,7 +1164,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(gets, timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -1285,6 +1339,7 @@ public Object get(String key) { @Override public BulkFuture> asyncGetBulk(Iterator keyIter, Iterator> tcIter) { + final Map, Object> before = before(getBulk); final Map> m = new ConcurrentHashMap>(); // This map does not need to be a ConcurrentHashMap @@ -1332,6 +1387,14 @@ public BulkFuture> asyncGetBulk(Iterator keyIter, final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executor); + rv.addListener(new BulkGetCompletionListener() { + + public void onComplete(BulkGetFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onBulkGetCompletion(entry.getValue(), future); + } + } + }); GetOperation.Callback cb = new GetOperation.Callback() { @Override @@ -1364,7 +1427,7 @@ public void complete() { final Map mops = new HashMap(); - for (Map.Entry> me : chunks.entrySet()) { + for (Entry> me : chunks.entrySet()) { Operation op = opFact.get(me.getValue(), cb); mops.put(me.getKey(), op); ops.add(op); @@ -1516,9 +1579,12 @@ public OperationFuture> asyncGetAndTouch(final String key, @Override public OperationFuture> asyncGetAndTouch(final String key, final int exp, final Transcoder tc) { + Method method = Method.getAndTouch; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>( key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { @@ -1543,7 +1609,7 @@ public void gotData(String k, int flags, long cas, byte[] data) { tc.getMaxSize()))); } }); - rv.setTimeoutListeners(getAndTouch, timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -2006,9 +2072,13 @@ private OperationFuture asyncMutate(Mutator m, String key, long by, + "binary protocol or the sync variant."); } + Method method = from(m); + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); + Operation op = opFact.mutate(m, key, by, def, exp, new OperationCallback() { @Override @@ -2022,7 +2092,7 @@ public void complete() { rv.signalComplete(); } }); - rv.setTimeoutListeners(from(m), timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); mconn.enqueueOperation(key, op); rv.setOperation(op); return rv; @@ -2328,9 +2398,12 @@ public OperationFuture delete(String key) { */ @Override public OperationFuture delete(String key, long cas) { + Method method = Method.delete; + Map, Object> before = before(method); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); + addOpAsyncListener(method, before, rv); DeleteOperation.Callback callback = new DeleteOperation.Callback() { @Override @@ -2357,7 +2430,7 @@ public void complete() { op = opFact.delete(key, cas, callback); } - rv.setTimeoutListeners(delete, timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); return rv; @@ -2614,6 +2687,14 @@ public MemcachedClient addTimeoutListener(TimeoutListener listener) { return this; } + public MemcachedClient addAsyncOpListener(AsyncOpListener listener) { + if (listener == null) { + throw new NullPointerException(); + } + asyncOpListeners.add((AsyncOpListener) listener); + return this; + } + /** * Remove a connection observer. * diff --git a/src/main/java/net/spy/memcached/internal/GetFuture.java b/src/main/java/net/spy/memcached/internal/GetFuture.java index 9ec8cbcdb..b4b554f39 100644 --- a/src/main/java/net/spy/memcached/internal/GetFuture.java +++ b/src/main/java/net/spy/memcached/internal/GetFuture.java @@ -120,6 +120,6 @@ public void signalComplete() { @Override public void setTimeoutListeners(Method method, List timeoutListeners) { - rv.setTimeoutListeners(get, timeoutListeners); + rv.setTimeoutListeners(method, timeoutListeners); } } From 1a0766e571f587753fa871f6e060be0a89afd5c4 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Tue, 17 Apr 2018 14:10:12 +0800 Subject: [PATCH 12/15] [bugfix] fix npe on async op listener was added. --- .../net/spy/memcached/MemcachedClient.java | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 29f4a13f9..6da9eb20c 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -325,7 +325,6 @@ private OperationFuture asyncStore(StoreType storeType, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @@ -347,6 +346,7 @@ public void complete() { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -383,7 +383,6 @@ private OperationFuture asyncCat(ConcatenationType catType, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { @@ -401,6 +400,7 @@ public void complete() { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -439,7 +439,6 @@ public OperationFuture touch(final String key, final int exp, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.touch(key, exp, new OperationCallback() { @Override @@ -456,6 +455,7 @@ public void complete() { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -676,7 +676,6 @@ public OperationFuture prepend(String key, T val, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() { @@ -705,6 +704,7 @@ public void complete() { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -1073,14 +1073,6 @@ public GetFuture asyncGet(final String key, final Transcoder tc) { final Map, Object> before = before(get); final CountDownLatch latch = new CountDownLatch(1); final GetFuture rv = new GetFuture(latch, operationTimeout, key, executor); - rv.addListener(new GetCompletionListener() { - - public void onComplete(GetFuture future) { - for (Entry, Object> entry : before.entrySet()) { - entry.getKey().onGetCompletion(entry.getValue(), future); - } - } - }); Operation op = opFact.get(key, new GetOperation.Callback() { private Future val; @@ -1105,6 +1097,14 @@ public void complete() { rv.setTimeoutListeners(get, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + rv.addListener(new GetCompletionListener() { + + public void onComplete(GetFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onGetCompletion(entry.getValue(), future); + } + } + }); return rv; } @@ -1140,7 +1140,6 @@ public OperationFuture> asyncGets(final String key, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.gets(key, new GetsOperation.Callback() { private CASValue val; @@ -1167,6 +1166,7 @@ public void complete() { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -1387,14 +1387,6 @@ public BulkFuture> asyncGetBulk(Iterator keyIter, final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executor); - rv.addListener(new BulkGetCompletionListener() { - - public void onComplete(BulkGetFuture future) { - for (Entry, Object> entry : before.entrySet()) { - entry.getKey().onBulkGetCompletion(entry.getValue(), future); - } - } - }); GetOperation.Callback cb = new GetOperation.Callback() { @Override @@ -1436,6 +1428,14 @@ public void complete() { rv.setTimeoutListeners(null, timeoutListeners); mconn.checkState(); mconn.addOperations(mops); + rv.addListener(new BulkGetCompletionListener() { + + public void onComplete(BulkGetFuture future) { + for (Entry, Object> entry : before.entrySet()) { + entry.getKey().onBulkGetCompletion(entry.getValue(), future); + } + } + }); return rv; } @@ -1584,7 +1584,6 @@ public OperationFuture> asyncGetAndTouch(final String key, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>( key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { @@ -1612,6 +1611,7 @@ public void gotData(String k, int flags, long cas, byte[] data) { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -2077,7 +2077,6 @@ private OperationFuture asyncMutate(Mutator m, String key, long by, final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); Operation op = opFact.mutate(m, key, by, def, exp, new OperationCallback() { @@ -2093,8 +2092,9 @@ public void complete() { } }); rv.setTimeoutListeners(method, timeoutListeners); - mconn.enqueueOperation(key, op); rv.setOperation(op); + mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } @@ -2403,7 +2403,6 @@ public OperationFuture delete(String key, long cas) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout, executor); - addOpAsyncListener(method, before, rv); DeleteOperation.Callback callback = new DeleteOperation.Callback() { @Override @@ -2433,6 +2432,7 @@ public void complete() { rv.setTimeoutListeners(method, timeoutListeners); rv.setOperation(op); mconn.enqueueOperation(key, op); + addOpAsyncListener(method, before, rv); return rv; } From e595fee941c6ca5dc8f5714343467f75db46e758 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Thu, 7 Jun 2018 14:03:02 +0800 Subject: [PATCH 13/15] [log] add name for timeout log. --- src/main/java/net/spy/memcached/MemcachedClient.java | 11 +++++++---- .../net/spy/memcached/internal/BulkGetFuture.java | 8 +++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 6da9eb20c..eb1c2f18d 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -23,11 +23,12 @@ package net.spy.memcached; -import static net.spy.memcached.TimeoutListener.Method.*; -import static net.spy.memcached.TimeoutListener.Method.delete; +import static net.spy.memcached.TimeoutListener.Method.cas; import static net.spy.memcached.TimeoutListener.Method.from; import static net.spy.memcached.TimeoutListener.Method.get; -import static net.spy.memcached.TimeoutListener.Method.getAndTouch; +import static net.spy.memcached.TimeoutListener.Method.getBulk; +import static net.spy.memcached.TimeoutListener.Method.gets; +import static net.spy.memcached.TimeoutListener.Method.touch; import java.io.IOException; import java.net.InetSocketAddress; @@ -1386,7 +1387,9 @@ public BulkFuture> asyncGetBulk(Iterator keyIter, int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); - final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executor); + final String name = connFactory instanceof DefaultConnectionFactory ? ((DefaultConnectionFactory) connFactory) + .getName() : null; + final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, executor, name); GetOperation.Callback cb = new GetOperation.Callback() { @Override diff --git a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java index 236317942..f35b4b1c6 100644 --- a/src/main/java/net/spy/memcached/internal/BulkGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkGetFuture.java @@ -63,6 +63,7 @@ public class BulkGetFuture private final Map> rvMap; private final Collection ops; private final CountDownLatch latch; + private final String name; private OperationStatus status; private boolean cancelled = false; private boolean timeout = false; @@ -70,12 +71,13 @@ public class BulkGetFuture private List timeoutListeners; public BulkGetFuture(Map> m, Collection getOps, - CountDownLatch l, Executor service) { + CountDownLatch l, Executor service, String name) { super(service); rvMap = m; ops = getOps; latch = l; status = null; + this.name = name; } public boolean cancel(boolean ign) { @@ -122,7 +124,7 @@ public Map getSome(long to, TimeUnit unit) } } LoggerFactory.getLogger(getClass()).warn( - new CheckedOperationTimeoutException("Operation timed out: ", + new CheckedOperationTimeoutException("Operation timed out[" + name + "]: ", timedoutOps).getMessage()); } return ret; @@ -149,7 +151,7 @@ public Map get(long to, TimeUnit unit) LoggerFactory.getLogger(getClass()).error("fail to execute timeout listener:", e); } } - throw new CheckedOperationTimeoutException("Operation timed out.", + throw new CheckedOperationTimeoutException("Operation timed out[" + name + "].", timedoutOps); } return ret; From 9ea618e0db3fa02a13304d6f6106c400f581308d Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Sun, 2 Sep 2018 12:32:53 +0800 Subject: [PATCH 14/15] [enhancement] add EnqueueTimeoutException. --- .../internal/EnqueueTimeoutException.java | 17 +++++++++++++++++ .../protocol/TCPMemcachedNodeImpl.java | 5 +++-- 2 files changed, 20 insertions(+), 2 deletions(-) create mode 100644 src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java diff --git a/src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java b/src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java new file mode 100644 index 000000000..820dbc172 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/EnqueueTimeoutException.java @@ -0,0 +1,17 @@ +package net.spy.memcached.internal; + +import net.spy.memcached.ops.Operation; + +public class EnqueueTimeoutException extends IllegalStateException { + + private final Operation op; + + public EnqueueTimeoutException(String s, Operation op) { + super(s); + this.op = op; + } + + public Operation getOp() { + return op; + } +} diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 52deeb04b..eb4e1f365 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -40,6 +40,7 @@ import net.spy.memcached.MemcachedConnection; import net.spy.memcached.MemcachedNode; import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.internal.EnqueueTimeoutException; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; import net.spy.memcached.protocol.binary.TapAckOperationImpl; @@ -359,8 +360,8 @@ public final void addOp(Operation op) { return; } if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Timed out waiting to add " + op - + "(max wait=" + opQueueMaxBlockTime + "ms)"); + throw new EnqueueTimeoutException("Timed out waiting to add " + op + + "(max wait=" + opQueueMaxBlockTime + "ms)", op); } } catch (InterruptedException e) { // Restore the interrupted status From f0d46451dc1ec5fe8ae72e3d5a99c4807c8c3ef9 Mon Sep 17 00:00:00 2001 From: "w.vela" Date: Sun, 2 Sep 2018 20:36:27 +0800 Subject: [PATCH 15/15] [reliability] add OOM catching for connection handler. --- .../spy/memcached/MemcachedConnection.java | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 221c2249a..4435ebd7c 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -23,27 +23,6 @@ package net.spy.memcached; -import net.spy.memcached.compat.SpyThread; -import net.spy.memcached.compat.log.Logger; -import net.spy.memcached.compat.log.LoggerFactory; -import net.spy.memcached.internal.OperationFuture; -import net.spy.memcached.metrics.MetricCollector; -import net.spy.memcached.metrics.MetricType; -import net.spy.memcached.ops.GetOperation; -import net.spy.memcached.ops.KeyedOperation; -import net.spy.memcached.ops.NoopOperation; -import net.spy.memcached.ops.Operation; -import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationException; -import net.spy.memcached.ops.OperationState; -import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.TapOperation; -import net.spy.memcached.ops.VBucketAware; -import net.spy.memcached.protocol.binary.BinaryOperationFactory; -import net.spy.memcached.protocol.binary.MultiGetOperationImpl; -import net.spy.memcached.protocol.binary.TapAckOperationImpl; -import net.spy.memcached.util.StringUtils; - import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -74,6 +53,27 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import net.spy.memcached.compat.SpyThread; +import net.spy.memcached.compat.log.Logger; +import net.spy.memcached.compat.log.LoggerFactory; +import net.spy.memcached.internal.OperationFuture; +import net.spy.memcached.metrics.MetricCollector; +import net.spy.memcached.metrics.MetricType; +import net.spy.memcached.ops.GetOperation; +import net.spy.memcached.ops.KeyedOperation; +import net.spy.memcached.ops.NoopOperation; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationException; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.TapOperation; +import net.spy.memcached.ops.VBucketAware; +import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.protocol.binary.MultiGetOperationImpl; +import net.spy.memcached.protocol.binary.TapAckOperationImpl; +import net.spy.memcached.util.StringUtils; + /** * Main class for handling connections to a memcached cluster. */ @@ -1470,6 +1470,8 @@ public void run() { logRunException(e); } catch (ConcurrentModificationException e) { logRunException(e); + } catch (OutOfMemoryError e) { + logRunException(e); } } getLogger().info("Shut down memcached client"); @@ -1483,7 +1485,7 @@ public void run() { * * @param e the exception to log. */ - private void logRunException(final Exception e) { + private void logRunException(final Throwable e) { if (shutDown) { getLogger().debug("Exception occurred during shutdown", e); } else {