Skip to content

Commit

Permalink
fix throw RejectedExecutionException (#12674)
Browse files Browse the repository at this point in the history
* fix throw RejectedExecutionException

* fix throw RejectedExecutionException when ThreadlessExecutor has been shutdown
  • Loading branch information
icodening authored Jul 6, 2023
1 parent 96fb4e8 commit 1355b28
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class DeadlineFuture extends CompletableFuture<AppResponse> {
Expand All @@ -46,7 +46,7 @@ public class DeadlineFuture extends CompletableFuture<AppResponse> {
private final long start = System.currentTimeMillis();
private final List<Runnable> timeoutListeners = new ArrayList<>();
private final Timeout timeoutTask;
private Executor executor;
private ExecutorService executor;

private DeadlineFuture(String serviceName, String methodName, String address, int timeout) {
this.serviceName = serviceName;
Expand All @@ -69,7 +69,7 @@ public static void destroy() {
* @return a new DeadlineFuture
*/
public static DeadlineFuture newFuture(String serviceName, String methodName, String address,
int timeout, Executor executor) {
int timeout, ExecutorService executor) {
final DeadlineFuture future = new DeadlineFuture(serviceName, methodName, address, timeout);
future.setExecutor(executor);
return future;
Expand Down Expand Up @@ -99,11 +99,11 @@ public List<Runnable> getTimeoutListeners() {
return timeoutListeners;
}

public Executor getExecutor() {
public ExecutorService getExecutor() {
return executor;
}

public void setExecutor(Executor executor) {
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

Expand Down Expand Up @@ -149,8 +149,9 @@ public void run(Timeout timeout) {
return;
}

if (getExecutor() != null) {
getExecutor().execute(() -> {
ExecutorService executor = getExecutor();
if (executor != null && !executor.isShutdown()) {
executor.execute(() -> {
notifyTimeout();
for (Runnable timeoutListener : getTimeoutListeners()) {
timeoutListener.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -135,7 +134,7 @@ protected Result doInvoke(final Invocation invocation) {
final MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(
invocation.getMethodName(),
invocation.getParameterTypes());
Executor callbackExecutor = isSync(methodDescriptor, invocation) ? new ThreadlessExecutor() : streamExecutor;
ExecutorService callbackExecutor = isSync(methodDescriptor, invocation) ? new ThreadlessExecutor() : streamExecutor;
ClientCall call = new TripleClientCall(connectionClient, callbackExecutor,
getUrl().getOrDefaultFrameworkModel(), writeQueue);
AsyncRpcResult result;
Expand Down Expand Up @@ -219,7 +218,7 @@ StreamObserver<Object> streamCall(ClientCall call,
}

AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation,
ClientCall call, Executor callbackExecutor) {
ClientCall call, ExecutorService callbackExecutor) {

int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, RpcUtils.getMethodName(invocation), 3000);
if (timeout <= 0) {
Expand Down

0 comments on commit 1355b28

Please sign in to comment.