Skip to content

Commit

Permalink
Rename TIMER to CONSUMER_EXECUTOR and make it an ExecutorService
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 22, 2016
1 parent 61bdd81 commit 66f273c
Showing 1 changed file with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -46,17 +44,15 @@ final class MessageConsumerImpl implements MessageConsumer {

private static final int MAX_QUEUED_CALLBACKS = 100;
// shared scheduled executor, used to schedule pulls
private static final SharedResourceHolder.Resource<ScheduledExecutorService> TIMER =
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
private static final SharedResourceHolder.Resource<ExecutorService> CONSUMER_EXECUTOR =
new SharedResourceHolder.Resource<ExecutorService>() {
@Override
public ScheduledExecutorService create() {
ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
timer.setRemoveOnCancelPolicy(true);
return timer;
public ExecutorService create() {
return Executors.newSingleThreadExecutor();
}

@Override
public void close(ScheduledExecutorService instance) {
public void close(ExecutorService instance) {
instance.shutdown();
}
};
Expand All @@ -67,7 +63,7 @@ public void close(ScheduledExecutorService instance) {
private final AckDeadlineRenewer deadlineRenewer;
private final String subscription;
private final MessageProcessor messageProcessor;
private final ScheduledExecutorService timer;
private final ExecutorService consumerExecutor;
private final ExecutorFactory<ExecutorService> executorFactory;
private final ExecutorService executor;
private final AtomicInteger queuedCallbacks;
Expand Down Expand Up @@ -192,7 +188,7 @@ private MessageConsumerImpl(Builder builder) {
this.pubsub = pubsubOptions.service();
this.deadlineRenewer = builder.deadlineRenewer;
this.queuedCallbacks = new AtomicInteger();
this.timer = SharedResourceHolder.get(TIMER);
this.consumerExecutor = SharedResourceHolder.get(CONSUMER_EXECUTOR);
this.executorFactory =
builder.executorFactory != null ? builder.executorFactory : new DefaultExecutorFactory();
this.executor = executorFactory.get();
Expand All @@ -209,7 +205,7 @@ private void pullIfNeeded() {
if (closed || scheduledFuture != null || !pullPolicy.shouldPull(queuedCallbacks.get())) {
return;
}
scheduledFuture = timer.submit(consumerRunnable);
scheduledFuture = consumerExecutor.submit(consumerRunnable);
}
}

Expand All @@ -219,7 +215,7 @@ private void nextPull() {
scheduledFuture = null;
return;
}
scheduledFuture = timer.submit(consumerRunnable);
scheduledFuture = consumerExecutor.submit(consumerRunnable);
}
}

Expand All @@ -237,7 +233,7 @@ public void close() {
pullerFuture.cancel(true);
}
}
SharedResourceHolder.release(TIMER, timer);
SharedResourceHolder.release(CONSUMER_EXECUTOR, consumerExecutor);
executorFactory.release(executor);
}

Expand Down

0 comments on commit 66f273c

Please sign in to comment.