diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java index 672448e87470..cb08dde00327 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -35,8 +35,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; /** @@ -46,17 +44,15 @@ final class MessageConsumerImpl implements MessageConsumer { private static final int MAX_QUEUED_CALLBACKS = 100; // shared scheduled executor, used to schedule pulls - private static final SharedResourceHolder.Resource TIMER = - new SharedResourceHolder.Resource() { + private static final SharedResourceHolder.Resource CONSUMER_EXECUTOR = + new SharedResourceHolder.Resource() { @Override - public ScheduledExecutorService create() { - ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1); - timer.setRemoveOnCancelPolicy(true); - return timer; + public ExecutorService create() { + return Executors.newSingleThreadExecutor(); } @Override - public void close(ScheduledExecutorService instance) { + public void close(ExecutorService instance) { instance.shutdown(); } }; @@ -67,7 +63,7 @@ public void close(ScheduledExecutorService instance) { private final AckDeadlineRenewer deadlineRenewer; private final String subscription; private final MessageProcessor messageProcessor; - private final ScheduledExecutorService timer; + private final ExecutorService consumerExecutor; private final ExecutorFactory executorFactory; private final ExecutorService executor; private final AtomicInteger queuedCallbacks; @@ -192,7 +188,7 @@ private MessageConsumerImpl(Builder builder) { this.pubsub = pubsubOptions.service(); this.deadlineRenewer = builder.deadlineRenewer; this.queuedCallbacks = new AtomicInteger(); - this.timer = SharedResourceHolder.get(TIMER); + this.consumerExecutor = SharedResourceHolder.get(CONSUMER_EXECUTOR); this.executorFactory = builder.executorFactory != null ? builder.executorFactory : new DefaultExecutorFactory(); this.executor = executorFactory.get(); @@ -209,7 +205,7 @@ private void pullIfNeeded() { if (closed || scheduledFuture != null || !pullPolicy.shouldPull(queuedCallbacks.get())) { return; } - scheduledFuture = timer.submit(consumerRunnable); + scheduledFuture = consumerExecutor.submit(consumerRunnable); } } @@ -219,7 +215,7 @@ private void nextPull() { scheduledFuture = null; return; } - scheduledFuture = timer.submit(consumerRunnable); + scheduledFuture = consumerExecutor.submit(consumerRunnable); } } @@ -237,7 +233,7 @@ public void close() { pullerFuture.cancel(true); } } - SharedResourceHolder.release(TIMER, timer); + SharedResourceHolder.release(CONSUMER_EXECUTOR, consumerExecutor); executorFactory.release(executor); }