Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TimeoutListener for MemcachedClient. #32

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
32 changes: 31 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>2.999.999-SNAPSHOT</version> <!-- not used -->
<version>2.12.3-kwai-0.0.11</version> <!-- not used -->

<name>Spymemcached</name>
<description>A client library for memcached.</description>
Expand Down Expand Up @@ -42,6 +42,19 @@
<organizationUrl>http://couchbase.com/</organizationUrl>
</developer>
</developers>
<!-- 发布位置配置 -->
<distributionManagement>
<repository>
<id>kuaishou.releases</id>
<name>Internal Release Repository</name>
<url>http://nexus.corp.kuaishou.com:88/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>kuaishou.snapshots</id>
<name>Internal Snapshot Repository</name>
<url>http://nexus.corp.kuaishou.com:88/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>

<dependencies>
<!-- Optional Dependencies at Runtime -->
Expand Down Expand Up @@ -85,5 +98,22 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
17 changes: 17 additions & 0 deletions src/main/java/net/spy/memcached/AsyncOpListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package net.spy.memcached;

import net.spy.memcached.TimeoutListener.Method;
import net.spy.memcached.internal.BulkGetFuture;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;

public interface AsyncOpListener<T> {

T beforeInvoke(Method method);

void onBulkGetCompletion(T before, BulkGetFuture<?> future);

void onGetCompletion(T before, GetFuture<?> future);

void onOperationCompletion(T before, Method method, OperationFuture<?> future);
}
3 changes: 2 additions & 1 deletion src/main/java/net/spy/memcached/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import net.spy.memcached.auth.AuthDescriptor;
Expand Down Expand Up @@ -87,7 +88,7 @@ MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
* Get the ExecutorService which is used to asynchronously execute listeners
* on futures.
*/
ExecutorService getListenerExecutorService();
Executor getListenerExecutorService();

/**
* Returns true if the default provided {@link ExecutorService} has not been
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import net.spy.memcached.auth.AuthDescriptor;
Expand Down Expand Up @@ -75,7 +76,7 @@ public class ConnectionFactoryBuilder {

protected MetricType metricType = null;
protected MetricCollector collector = null;
protected ExecutorService executorService = null;
protected Executor executor = null;
protected long authWaitTime = DefaultConnectionFactory.DEFAULT_AUTH_WAIT_TIME;

/**
Expand Down Expand Up @@ -311,8 +312,8 @@ public ConnectionFactoryBuilder setMetricCollector(MetricCollector collector) {
*
* @param executorService the ExecutorService to use.
*/
public ConnectionFactoryBuilder setListenerExecutorService(ExecutorService executorService) {
this.executorService = executorService;
public ConnectionFactoryBuilder setListenerExecutorService(Executor executorService) {
this.executor = executorService;
return this;
}

Expand Down Expand Up @@ -447,13 +448,13 @@ public MetricCollector getMetricCollector() {
}

@Override
public ExecutorService getListenerExecutorService() {
return executorService == null ? super.getListenerExecutorService() : executorService;
public Executor getListenerExecutorService() {
return executor == null ? super.getListenerExecutorService() : executor;
}

@Override
public boolean isDefaultExecutorService() {
return executorService == null;
return executor == null;
}

@Override
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/net/spy/memcached/DefaultConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -132,9 +131,9 @@ public class DefaultConnectionFactory extends SpyObject implements
private MetricCollector metrics;

/**
* The ExecutorService in which the listener callbacks will be executed.
* The Executor in which the listener callbacks will be executed.
*/
private ExecutorService executorService;
private Executor executor;

/**
* Construct a DefaultConnectionFactory with the given parameters.
Expand Down Expand Up @@ -289,16 +288,16 @@ public long getAuthWaitTime() {
* @return the stored {@link ExecutorService}.
*/
@Override
public ExecutorService getListenerExecutorService() {
if (executorService == null) {
public Executor getListenerExecutorService() {
if (executor == null) {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "FutureNotifyListener");
return new Thread(r, "FutureNotifyListener[" + DefaultConnectionFactory.this.getName() + "]");
}
};

executorService = new ThreadPoolExecutor(
executor = new ThreadPoolExecutor(
0,
Runtime.getRuntime().availableProcessors(),
60L,
Expand All @@ -308,7 +307,7 @@ public Thread newThread(Runnable r) {
);
}

return executorService;
return executor;
}

@Override
Expand Down
Loading