From 7693456c16400ab7a5142dad107a95213fa1858a Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Tue, 7 Jun 2016 17:42:02 +0200 Subject: [PATCH 1/7] Add MessageConsumerImpl class, implement pullAsync, add tests --- .../com/google/cloud/GrpcServiceOptions.java | 30 +- .../google/cloud/GrpcServiceOptionsTest.java | 2 +- .../cloud/pubsub/AckDeadlineRenewer.java | 2 +- .../cloud/pubsub/MessageConsumerImpl.java | 281 ++++++++++++ .../java/com/google/cloud/pubsub/PubSub.java | 70 ++- .../com/google/cloud/pubsub/PubSubImpl.java | 9 +- .../google/cloud/pubsub/PubSubOptions.java | 3 +- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 4 +- .../google/cloud/pubsub/BaseSystemTest.java | 149 +++++++ .../cloud/pubsub/MessageConsumerImplTest.java | 400 ++++++++++++++++++ .../google/cloud/pubsub/PubSubImplTest.java | 74 ++++ .../com/google/cloud/pubsub/PubSubTest.java | 34 +- 12 files changed, 1029 insertions(+), 29 deletions(-) create mode 100644 gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java diff --git a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java index 2a1110c9362e..67361dba6e64 100644 --- a/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java +++ b/gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -50,7 +51,7 @@ public abstract class GrpcServiceOptions, Ser private final double timeoutMultiplier; private final int maxTimeout; - private transient ExecutorFactory executorFactory; + private transient ExecutorFactory executorFactory; /** * Shared thread pool executor. @@ -73,30 +74,32 @@ public void close(ScheduledExecutorService instance) { }; /** - * An interface for {@link ScheduledExecutorService} factories. Implementations of this interface - * can be used to provide an user-defined scheduled executor to execute requests. Any - * implementation of this interface must override the {@code get()} method to return the desired - * executor. The {@code release(executor)} method should be overriden to free resources used by - * the executor (if needed) according to application's logic. + * An interface for {@link ExecutorService} factories. Implementations of this interface can be + * used to provide an user-defined executor to execute requests. Any implementation of this + * interface must override the {@code get()} method to return the desired executor. The + * {@code release(executor)} method should be overriden to free resources used by the executor (if + * needed) according to application's logic. * *

Implementation must provide a public no-arg constructor. Loading of a factory implementation * is done via {@link java.util.ServiceLoader}. + * + * @param the {@link ExecutorService} subclass created by this factory */ - public interface ExecutorFactory { + public interface ExecutorFactory { /** - * Gets a scheduled executor service instance. + * Gets an executor service instance. */ - ScheduledExecutorService get(); + T get(); /** * Releases resources used by the executor and possibly shuts it down. */ - void release(ScheduledExecutorService executor); + void release(T executor); } @VisibleForTesting - static class DefaultExecutorFactory implements ExecutorFactory { + static class DefaultExecutorFactory implements ExecutorFactory { private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory(); @@ -148,7 +151,7 @@ protected Builder(GrpcServiceOptions options) { * * @return the builder */ - public B executorFactory(ExecutorFactory executorFactory) { + public B executorFactory(ExecutorFactory executorFactory) { this.executorFactory = executorFactory; return self(); } @@ -192,6 +195,7 @@ public B maxTimeout(int maxTimeout) { } } + @SuppressWarnings("unchecked") protected GrpcServiceOptions( Class> serviceFactoryClass, Class> rpcFactoryClass, Builder executorFactory() { return executorFactory; } diff --git a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java index 0a3c34f87916..457f05cd0ba9 100644 --- a/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java +++ b/gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java @@ -211,7 +211,7 @@ public void testBaseHashCode() { @Test public void testDefaultExecutorFactory() { - ExecutorFactory executorFactory = new DefaultExecutorFactory(); + ExecutorFactory executorFactory = new DefaultExecutorFactory(); ScheduledExecutorService executorService = executorFactory.get(); assertSame(executorService, executorFactory.get()); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java index b51fef7890e2..d72f788cff65 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java @@ -48,7 +48,7 @@ class AckDeadlineRenewer implements AutoCloseable { private final PubSub pubsub; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; + private final ExecutorFactory executorFactory; private final Clock clock; private final Queue messageQueue; private final Map messageDeadlines; diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java new file mode 100644 index 000000000000..ac120147d98e --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -0,0 +1,281 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.cloud.pubsub.spi.v1.SubscriberApi.formatSubscriptionName; +import static com.google.common.base.MoreObjects.firstNonNull; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import io.grpc.internal.SharedResourceHolder; + +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Default implementation for a message consumer. + */ +final class MessageConsumerImpl implements MessageConsumer { + + private static final int MAX_QUEUED_CALLBACKS = 100; + // shared scheduled executor, used to schedule pulls + private static final SharedResourceHolder.Resource TIMER = + new SharedResourceHolder.Resource() { + @Override + public ScheduledExecutorService create() { + ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1); + timer.setRemoveOnCancelPolicy(true); + return timer; + } + + @Override + public void close(ScheduledExecutorService instance) { + instance.shutdown(); + } + }; + + private final PubSubOptions pubsubOptions; + private final PubSubRpc pubsubRpc; + private final PubSub pubsub; + private final AckDeadlineRenewer deadlineRenewer; + private final String subscription; + private final MessageProcessor messageProcessor; + private final ScheduledExecutorService timer; + private final ExecutorFactory executorFactory; + private final ExecutorService executor; + private final AtomicInteger queuedCallbacks; + private final int maxQueuedCallbacks; + private final Object futureLock = new Object(); + private final Runnable scheduleRunnable; + private boolean closed; + private Future scheduledFuture; + private PullFuture pullerFuture; + private boolean stopped = true; + + /** + * Default executor factory for the message processor executor. By default a single-threaded + * executor is used. + */ + static class DefaultExecutorFactory implements ExecutorFactory { + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService executor) { + executor.shutdownNow(); + } + } + + private MessageConsumerImpl(Builder builder) { + this.pubsubOptions = builder.pubsubOptions; + this.subscription = builder.subscription; + this.messageProcessor = builder.messageProcessor; + this.pubsubRpc = pubsubOptions.rpc(); + this.pubsub = pubsubOptions.service(); + this.deadlineRenewer = builder.deadlineRenewer; + this.queuedCallbacks = new AtomicInteger(); + this.timer = SharedResourceHolder.get(TIMER); + this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory()); + this.executor = executorFactory.get(); + this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); + this.scheduleRunnable = new Runnable() { + @Override + public void run() { + synchronized (futureLock) { + if (closed) { + return; + } + pull(); + } + } + }; + nextPull(); + } + + private Runnable ackingRunnable(final ReceivedMessage receivedMessage) { + return new Runnable() { + @Override + public void run() { + try { + messageProcessor.process(receivedMessage); + pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } catch (Exception ex) { + pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } finally { + deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId()); + queuedCallbacks.decrementAndGet(); + // We can now pull more messages. We do not pull immediately to possibly wait for other + // callbacks to end + scheduleNextPull(500, TimeUnit.MILLISECONDS); + } + } + }; + } + + private PullRequest createPullRequest() { + return PullRequest.newBuilder() + .setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription)) + .setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get()) + .setReturnImmediately(false) + .build(); + } + + private void scheduleNextPull(long delay, TimeUnit timeUnit) { + synchronized (futureLock) { + if (!closed && stopped) { + scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit); + } + } + } + + private void nextPull() { + synchronized (futureLock) { + if (closed) { + return; + } + if (queuedCallbacks.get() == maxQueuedCallbacks) { + stopped = true; + } else { + stopped = false; + scheduledFuture = timer.submit(scheduleRunnable); + } + } + } + + private void pull() { + pullerFuture = pubsubRpc.pull(createPullRequest()); + pullerFuture.addCallback(new PullCallback() { + @Override + public void success(PullResponse response) { + List messages = response.getReceivedMessagesList(); + queuedCallbacks.addAndGet(messages.size()); + for (com.google.pubsub.v1.ReceivedMessage message : messages) { + deadlineRenewer.add(subscription, message.getAckId()); + final ReceivedMessage receivedMessage = + ReceivedMessage.fromPb(pubsub, subscription, message); + executor.execute(ackingRunnable(receivedMessage)); + } + nextPull(); + } + + @Override + public void failure(Throwable error) { + if (!(error instanceof CancellationException)) { + nextPull(); + } + } + }); + } + + @Override + public void close() { + synchronized (futureLock) { + if (closed) { + return; + } + closed = true; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + if (pullerFuture != null) { + pullerFuture.cancel(true); + } + } + SharedResourceHolder.release(TIMER, timer); + executorFactory.release(executor); + } + + static final class Builder { + private final PubSubOptions pubsubOptions; + private final String subscription; + private final AckDeadlineRenewer deadlineRenewer; + private final MessageProcessor messageProcessor; + private Integer maxQueuedCallbacks; + private ExecutorFactory executorFactory; + + Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, + MessageProcessor messageProcessor) { + this.pubsubOptions = pubsubOptions; + this.subscription = subscription; + this.deadlineRenewer = deadlineRenewer; + this.messageProcessor = messageProcessor; + } + + /** + * Sets the maximum number of callbacks either being executed or waiting for execution. + */ + Builder maxQueuedCallbacks(Integer maxQueuedCallbacks) { + this.maxQueuedCallbacks = maxQueuedCallbacks; + return this; + } + + /** + * Sets the executor factory, used to manage the executor that will run message processor + * callbacks message consumer. + */ + Builder executorFactory(ExecutorFactory executorFactory) { + this.executorFactory = executorFactory; + return this; + } + + /** + * Creates a {@code MessageConsumerImpl} object. + */ + MessageConsumerImpl build() { + return new MessageConsumerImpl(this); + } + } + + /** + * Returns a builder for {@code MessageConsumerImpl} objects given the service options, the + * subscription from which messages must be pulled, the acknowledge deadline renewer and a message + * processor used to process messages. + */ + static Builder builder(PubSubOptions pubsubOptions, String subscription, + AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { + return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); + } + + /** + * Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from + * which messages must be pulled, the acknowledge deadline renewer and a message processor used to + * process messages. + */ + static Builder of(PubSubOptions pubsubOptions, String subscription, + AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { + return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index f86817c26345..6a6df7ac6231 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -16,13 +16,17 @@ package com.google.cloud.pubsub; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.cloud.AsyncPage; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.Service; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -84,7 +88,8 @@ final class PullOption extends Option { private static final long serialVersionUID = 4792164134340316582L; enum OptionType implements Option.OptionType { - MAX_CONCURRENT_CALLBACKS; + EXECUTOR, + MAX_QUEUED_CALLBACKS; @SuppressWarnings("unchecked") T get(Map options) { @@ -94,6 +99,10 @@ T get(Map options) { Integer getInteger(Map options) { return get(options); } + + ExecutorFactory getExecutorFactory(Map options) { + return get(options); + } } private PullOption(Option.OptionType option, Object value) { @@ -101,11 +110,43 @@ private PullOption(Option.OptionType option, Object value) { } /** - * Returns an option to specify the maximum number of messages that can be processed - * concurrently at any time. + * Returns an option to specify the maximum number of messages that can be queued in the message + * consumer at any time. Queued messages are already pulled messages that are either waiting to + * be processed or being processed. Queued messages will have their acknowledge deadline renewed + * until they are acknowledged or "nacked". If not provided, at most 100 messages can be in the + * queue. */ - public static PullOption maxConcurrentCallbacks(int maxConcurrency) { - return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency); + public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { + return new PullOption(OptionType.MAX_QUEUED_CALLBACKS, maxQueuedCallbacks); + } + + /** + * Returns an option to specify the executor used to execute message processor callbacks. The + * executor determines the number of messages that can be processed at the same time. The + * {@code shouldAutoClose} parameter sets whether the executor should be shutdown when the + * message consumer is closed. If not provided, a single-threaded executor is used. + * + * @param executor the executor used to run message processor callbacks + * @param shouldAutoClose if {@code true}, the executor is shutdown when the message consumer is + * closed. If {@code false}, the user must take care of shutting the executor down. + */ + public static PullOption executor(final ExecutorService executor, + final boolean shouldAutoClose) { + return new PullOption(OptionType.EXECUTOR, new ExecutorFactory() { + + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService toRelease) { + checkArgument(executor == toRelease, "Releasing the wrong executor"); + if (shouldAutoClose) { + executor.shutdown(); + } + } + }); } } @@ -433,6 +474,25 @@ interface MessageConsumer extends AutoCloseable { */ Future> pullAsync(String subscription, int maxMessages); + /** + * Creates a message consumer that pulls messages for the provided subscription. You can stop + * pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer + * executes {@link MessageProcessor#process(Message)} on each pulled message. If + * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If + * {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For + * all pulled messages, the ack deadline is automatically renewed until the message is either + * acknowledged or "nacked". + * + *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum + * number of queued messages (messages either being processed or waiting to be processed). The + * {@link PullOption#executor(ExecutorService, boolean)} can be used to provide an executor to run + * message processor callbacks. + * + * @param subscription the subscription from which to pull messages + * @param callback the callback to be executed on each message + * @param options pulling options + * @return a message consumer for the provided subscription and options + */ MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options); /** diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 166aad008c99..f16e46f729e1 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -18,6 +18,8 @@ import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; @@ -510,8 +512,11 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag @Override public MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options) { - // this method should use the VTKIT thread-pool (maybe getting it should be part of the spi) - return null; + Map optionMap = optionMap(options); + return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap)) + .executorFactory(EXECUTOR.getExecutorFactory(optionMap)) + .build(); } @Override diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index 87a25bf0dc36..07691e09e1a4 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; public class PubSubOptions extends GrpcServiceOptions { @@ -86,7 +87,7 @@ protected PubSubOptions(Builder builder) { } @Override - protected ExecutorFactory executorFactory() { + protected ExecutorFactory executorFactory() { return super.executorFactory(); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index b0f9b76e3920..d174e5172a23 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -73,7 +73,7 @@ public class DefaultPubSubRpc implements PubSubRpc { private final PublisherApi publisherApi; private final SubscriberApi subscriberApi; private final ScheduledExecutorService executor; - private final ExecutorFactory executorFactory; + private final ExecutorFactory executorFactory; private boolean closed; @@ -86,7 +86,7 @@ private InternalPubSubOptions(PubSubOptions options) { } @Override - protected ExecutorFactory executorFactory() { + protected ExecutorFactory executorFactory() { return super.executorFactory(); } } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 2f5a71195df0..8b277b5aa900 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -24,6 +24,8 @@ import com.google.cloud.AsyncPage; import com.google.cloud.Page; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -33,9 +35,12 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -531,6 +536,150 @@ public void testPullAsyncNonExistingSubscription() pubsub().pullAsync(formatForTest("non-existing-subscription"), 2).get(); } + @Test + public void testMessageConsumer() throws Exception { + String topic = formatForTest("test-message-consumer-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerNack() throws Exception { + String topic = formatForTest("test-message-consumer-nack-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-nack-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + throw new RuntimeException("Force nack"); + } + }; + try (MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been nacked, we should be able to pull them again + Thread.sleep(5000); + Iterator messages = pubsub().pull(subscription, 2); + while (messages.hasNext()) { + payloads.contains(messages.next().payloadAsString()); + } + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerWithMoreMessages() throws Exception { + String topic = formatForTest("test-message-consumer-more-messages-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-message-consumer-more-messages-subscriptions"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + int totalMessages = 200; + Set payloads = Sets.newHashSetWithExpectedSize(totalMessages); + List messagesToSend = Lists.newArrayListWithCapacity(totalMessages); + for (int i = 0; i < totalMessages; i++) { + String payload = "payload" + i; + messagesToSend.add(Message.of(payload)); + payloads.add(payload); + + } + List messageIds = pubsub().publish(topic, messagesToSend); + assertEquals(totalMessages, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(totalMessages); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, totalMessages); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testMessageConsumerAndAutoRenewDeadline() throws Exception { + String topic = formatForTest("test-message-consumer-and-renew-deadline-topic"); + pubsub().create(TopicInfo.of(topic)); + final String subscription = + formatForTest("test-message-consumer-and-renew-deadline-subscription"); + pubsub().create(SubscriptionInfo.builder(topic, subscription).ackDeadLineSeconds(10).build()); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + Set payloads = Sets.newHashSet("payload1", "payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + final List receivedMessages = Collections.synchronizedList(new ArrayList()); + final CountDownLatch countDownLatch = new CountDownLatch(2); + MessageProcessor processor = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + receivedMessages.add(message); + Thread.sleep(15000); + // message deadline is being renewed, it should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + countDownLatch.countDown(); + } + }; + try(MessageConsumer consumer = pubsub().pullAsync(subscription, processor)) { + countDownLatch.await(); + } + for (Message message : receivedMessages) { + payloads.contains(message.payloadAsString()); + } + // Messages have all been acked, they should not be pulled again + Iterator messages = pubsub().pull(subscription, 2); + assertFalse(messages.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + @Test public void testAckAndNackOneMessage() { String topic = formatForTest("test-ack-one-message-topic"); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java new file mode 100644 index 000000000000..91d9ca8e78b6 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java @@ -0,0 +1,400 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; +import com.google.common.util.concurrent.ForwardingListenableFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +public class MessageConsumerImplTest { + + private static final String PROJECT = "project"; + private static final String SUBSCRIPTION = "subscription"; + private static final String SUBSCRIPTION_PB = "projects/project/subscriptions/subscription"; + private static final int MAX_QUEUED_CALLBACKS = 42; + private static final Message MESSAGE1 = Message.of("payload1"); + private static final Message MESSAGE2 = Message.of("payload2"); + private static final String ACK_ID1 = "ack-id1"; + private static final String ACK_ID2 = "ack-id2"; + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE1_PB = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setAckId(ACK_ID1) + .setMessage(MESSAGE1.toPb()) + .build(); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE2_PB = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setAckId(ACK_ID2) + .setMessage(MESSAGE2.toPb()) + .build(); + private static final PullResponse PULL_RESPONSE = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .addReceivedMessages(MESSAGE2_PB) + .build(); + private static final MessageProcessor DO_NOTHING_PROCESSOR = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + // do nothing + } + }; + private static final MessageProcessor THROW_PROCESSOR = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + throw new RuntimeException(); + } + }; + private static final PullResponse EMPTY_RESPONSE = PullResponse.getDefaultInstance(); + + private PubSubRpc pubsubRpc; + private PubSub pubsub; + private PubSubOptions options; + private AckDeadlineRenewer renewer; + + @Rule + public Timeout globalTimeout = Timeout.seconds(60); + + static final class TestPullFuture + extends ForwardingListenableFuture.SimpleForwardingListenableFuture + implements PullFuture { + + TestPullFuture(PullResponse response) { + super(Futures.immediateFuture(response)); + } + + @Override + public void addCallback(final PullCallback callback) { + Futures.addCallback(delegate(), new FutureCallback() { + @Override + public void onSuccess(PullResponse result) { + callback.success(result); + } + + @Override + public void onFailure(Throwable error) { + callback.failure(error); + } + }); + } + } + + @Before + public void setUp() { + pubsubRpc = EasyMock.createStrictMock(PubSubRpc.class); + pubsub = EasyMock.createMock(PubSub.class); + options = EasyMock.createStrictMock(PubSubOptions.class); + renewer = EasyMock.createMock(AckDeadlineRenewer.class); + } + + @After + public void tearDown() { + EasyMock.verify(pubsubRpc); + EasyMock.verify(pubsub); + EasyMock.verify(options); + EasyMock.verify(renewer); + + } + + private static PullRequest pullRequest(int maxQueuedCallbacks) { + return PullRequest.newBuilder() + .setMaxMessages(maxQueuedCallbacks) + .setSubscription(SUBSCRIPTION_PB) + .setReturnImmediately(false) + .build(); + } + + private static IAnswer createAnswer(final CountDownLatch latch) { + return new IAnswer() { + @Override + public Void answer() throws Throwable { + latch.countDown(); + return null; + } + }; + } + + @Test + public void testMessageConsumerAck() throws Exception { + PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerNack() throws Exception { + PullRequest request = pullRequest(MAX_QUEUED_CALLBACKS); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMultipleCallsAck() throws Exception { + PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS); + PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1); + PullResponse response1 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + PullResponse response2 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE2_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMultipleCallsNack() throws Exception { + PullRequest request1 = pullRequest(MAX_QUEUED_CALLBACKS); + PullRequest request2 = pullRequest(MAX_QUEUED_CALLBACKS - 1); + PullResponse response1 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + PullResponse response2 = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE2_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch latch = new CountDownLatch(2); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMaxCallbacksAck() throws Exception { + PullRequest request1 = pullRequest(2); + PullRequest request2 = pullRequest(2); + PullResponse otherPullResponse = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(3); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(2) + .build()) { + latch.await(); + } + } + + @Test + public void testMessageConsumerMaxCallbacksNack() throws Exception { + PullRequest request1 = pullRequest(2); + PullRequest request2 = pullRequest(2); + PullResponse otherPullResponse = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE1_PB) + .build(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch latch = new CountDownLatch(3); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.expect(pubsub.options()).andReturn(options); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.replay(pubsub); + EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); + EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse)); + EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) + .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.add(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.remove(SUBSCRIPTION, ACK_ID2); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + renewer.add(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall(); + renewer.remove(SUBSCRIPTION, ACK_ID1); + EasyMock.expectLastCall().andAnswer(createAnswer(latch)); + EasyMock.replay(pubsubRpc, options, renewer); + try (MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR) + .maxQueuedCallbacks(2) + .build()) { + latch.await(); + } + } + + @Test + public void testClose() throws Exception { + EasyMock.expect(options.rpc()).andReturn(pubsubRpc); + EasyMock.expect(options.service()).andReturn(pubsub); + final ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class); + executor.shutdown(); + EasyMock.expectLastCall(); + EasyMock.replay(pubsubRpc, pubsub, options, executor, renewer); + MessageConsumer consumer = + MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR) + .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS) + .executorFactory(new ExecutorFactory() { + @Override + public ExecutorService get() { + return executor; + } + + @Override + public void release(ExecutorService executor) { + executor.shutdown(); + } + }).build(); + consumer.close(); + // closing again should do nothing + consumer.close(); + EasyMock.verify(executor); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index b310c72fe850..9c846ab7c05a 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -28,7 +28,11 @@ import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.cloud.RetryParams; +import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture; import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; @@ -60,6 +64,7 @@ import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -69,7 +74,9 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -128,6 +135,12 @@ public String apply(SubscriptionId subscriptionId) { return formatSubscriptionName(subscriptionId.project(), subscriptionId.subscription()); } }; + private static final MessageProcessor DO_NOTHING = new MessageProcessor() { + @Override + public void process(Message message) throws Exception { + // do nothing + } + }; private PubSubOptions options; private PubSubRpcFactory rpcFactoryMock; @@ -139,6 +152,7 @@ public String apply(SubscriptionId subscriptionId) { public ExpectedException thrown = ExpectedException.none(); @Before + @SuppressWarnings("unchecked") public void setUp() { rpcFactoryMock = EasyMock.createStrictMock(PubSubRpcFactory.class); pubsubRpcMock = EasyMock.createStrictMock(PubSubRpc.class); @@ -1355,6 +1369,66 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE EasyMock.verify(futureMock); } + @Test + public void testMessageConsumer() throws Exception { + pubsub = new PubSubImpl(options, renewerMock); + EasyMock.reset(options); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); + EasyMock.expect(options.projectId()).andReturn(PROJECT); + EasyMock.replay(options); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(100) + .setReturnImmediately(false) + .build(); + final PullResponse response = PullResponse.getDefaultInstance(); + final CountDownLatch latch = new CountDownLatch(1); + EasyMock.expect(pubsubRpcMock.pull(request)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + latch.countDown(); + return new TestPullFuture(response); + } + }); + EasyMock.replay(pubsubRpcMock, renewerMock); + try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING)) { + latch.await(); + } + } + + @Test + public void testMessageConsumerWithOptions() throws Exception { + pubsub = new PubSubImpl(options, renewerMock); + EasyMock.reset(options); + EasyMock.expect(options.service()).andReturn(pubsub); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); + EasyMock.expect(options.projectId()).andReturn(PROJECT); + EasyMock.replay(options); + ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class); + executorServiceMock.shutdown(); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(false) + .build(); + final PullResponse response = PullResponse.getDefaultInstance(); + final CountDownLatch latch = new CountDownLatch(1); + EasyMock.expect(pubsubRpcMock.pull(request)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + latch.countDown(); + return new TestPullFuture(response); + } + }); + EasyMock.replay(pubsubRpcMock, renewerMock, executorServiceMock); + PullOption[] options = + {PullOption.maxQueuedCallbacks(42), PullOption.executor(executorServiceMock, true)}; + try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) { + latch.await(); + } + } + @Test public void testAckOneMessage() { pubsub = new PubSubImpl(options, renewerMock); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index 620e737111f7..49fda68ec6fb 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -17,17 +17,22 @@ package com.google.cloud.pubsub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.cloud.pubsub.PubSub.PullOption; +import org.easymock.EasyMock; import org.junit.Test; +import java.util.concurrent.ExecutorService; + public class PubSubTest { private static final int PAGE_SIZE = 42; private static final String PAGE_TOKEN = "page token"; - private static final int MAX_CONCURRENT_CALLBACKS = 42; + private static final int MAX_QUEUED_CALLBACKS = 42; @Test public void testListOption() { @@ -42,9 +47,30 @@ public void testListOption() { } @Test + @SuppressWarnings("unchecked") public void testPullOptions() { - PullOption pullOption = PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS); - assertEquals(MAX_CONCURRENT_CALLBACKS, pullOption.value()); - assertEquals(PullOption.OptionType.MAX_CONCURRENT_CALLBACKS, pullOption.optionType()); + // max queued callbacks + PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); + assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value()); + assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); + // auto-closing executor + ExecutorService executor = EasyMock.createNiceMock(ExecutorService.class); + pullOption = PullOption.executor(executor, true); + ExecutorFactory factory = (ExecutorFactory) pullOption.value(); + assertSame(executor, factory.get()); + executor.shutdown(); + EasyMock.expectLastCall(); + EasyMock.replay(executor); + factory.release(executor); + EasyMock.verify(executor); + assertEquals(PullOption.OptionType.EXECUTOR, pullOption.optionType()); + // auto-closing executor + EasyMock.reset(executor); + pullOption = PullOption.executor(executor, false); + factory = (ExecutorFactory) pullOption.value(); + assertSame(executor, factory.get()); + EasyMock.replay(executor); + factory.release(executor); + EasyMock.verify(executor); } } From 5260293c2b6b7dbf772c7204821b87bee9f37413 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 20 Jun 2016 12:33:20 +0200 Subject: [PATCH 2/7] PullOption.executor option changed to PullOption.executorFactory --- .../java/com/google/cloud/pubsub/PubSub.java | 40 +++++++------------ .../com/google/cloud/pubsub/PubSubImpl.java | 4 +- .../google/cloud/pubsub/PubSubImplTest.java | 9 +++-- .../com/google/cloud/pubsub/PubSubTest.java | 23 ++--------- 4 files changed, 27 insertions(+), 49 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 6a6df7ac6231..62b5fa6ae08d 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -88,7 +88,7 @@ final class PullOption extends Option { private static final long serialVersionUID = 4792164134340316582L; enum OptionType implements Option.OptionType { - EXECUTOR, + EXECUTOR_FACTORY, MAX_QUEUED_CALLBACKS; @SuppressWarnings("unchecked") @@ -122,31 +122,21 @@ public static PullOption maxQueuedCallbacks(int maxQueuedCallbacks) { /** * Returns an option to specify the executor used to execute message processor callbacks. The - * executor determines the number of messages that can be processed at the same time. The - * {@code shouldAutoClose} parameter sets whether the executor should be shutdown when the - * message consumer is closed. If not provided, a single-threaded executor is used. + * executor determines the number of messages that can be processed at the same time. If not + * provided, a single-threaded executor is used to execute message processor callbacks. * - * @param executor the executor used to run message processor callbacks - * @param shouldAutoClose if {@code true}, the executor is shutdown when the message consumer is - * closed. If {@code false}, the user must take care of shutting the executor down. + *

The {@link ExecutorFactory} object can be used to handle creation and release of the + * executor, possibly reusing existing executors. {@link ExecutorFactory#get()} is called when + * the message consumer is created. {@link ExecutorFactory#release(ExecutorService)} is called + * when the message consumer is closed. + * + *

For the created option to be serializable, the provided executor factory should implement + * {@link java.io.Serializable}. + * + * @param executorFactory the executor factory. */ - public static PullOption executor(final ExecutorService executor, - final boolean shouldAutoClose) { - return new PullOption(OptionType.EXECUTOR, new ExecutorFactory() { - - @Override - public ExecutorService get() { - return executor; - } - - @Override - public void release(ExecutorService toRelease) { - checkArgument(executor == toRelease, "Releasing the wrong executor"); - if (shouldAutoClose) { - executor.shutdown(); - } - } - }); + public static PullOption executorFactory(ExecutorFactory executorFactory) { + return new PullOption(OptionType.EXECUTOR_FACTORY, executorFactory); } } @@ -485,7 +475,7 @@ interface MessageConsumer extends AutoCloseable { * *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum * number of queued messages (messages either being processed or waiting to be processed). The - * {@link PullOption#executor(ExecutorService, boolean)} can be used to provide an executor to run + * {@link PullOption#executorFactory(ExecutorFactory)} can be used to provide an executor to run * message processor callbacks. * * @param subscription the subscription from which to pull messages diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index f16e46f729e1..355d7c36aed1 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -18,7 +18,7 @@ import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; -import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR; +import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY; import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; @@ -515,7 +515,7 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback, Map optionMap = optionMap(options); return MessageConsumerImpl.builder(options(), subscription, ackDeadlineRenewer, callback) .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS.getInteger(optionMap)) - .executorFactory(EXECUTOR.getExecutorFactory(optionMap)) + .executorFactory(EXECUTOR_FACTORY.getExecutorFactory(optionMap)) .build(); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 9c846ab7c05a..7d6dc928d883 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import com.google.cloud.AsyncPage; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.RetryParams; import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture; @@ -1405,8 +1406,10 @@ public void testMessageConsumerWithOptions() throws Exception { EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock); EasyMock.expect(options.projectId()).andReturn(PROJECT); EasyMock.replay(options); + ExecutorFactory executorFactoryMock = EasyMock.createStrictMock(ExecutorFactory.class); ExecutorService executorServiceMock = EasyMock.createStrictMock(ExecutorService.class); - executorServiceMock.shutdown(); + EasyMock.expect(executorFactoryMock.get()).andReturn(executorServiceMock); + executorFactoryMock.release(executorServiceMock); PullRequest request = PullRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .setMaxMessages(42) @@ -1421,9 +1424,9 @@ public PullFuture answer() throws Throwable { return new TestPullFuture(response); } }); - EasyMock.replay(pubsubRpcMock, renewerMock, executorServiceMock); + EasyMock.replay(pubsubRpcMock, renewerMock, executorFactoryMock, executorServiceMock); PullOption[] options = - {PullOption.maxQueuedCallbacks(42), PullOption.executor(executorServiceMock, true)}; + {PullOption.maxQueuedCallbacks(42), PullOption.executorFactory(executorFactoryMock)}; try (MessageConsumer consumer = pubsub.pullAsync(SUBSCRIPTION, DO_NOTHING, options)) { latch.await(); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index 49fda68ec6fb..30db61717c9e 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -53,24 +53,9 @@ public void testPullOptions() { PullOption pullOption = PullOption.maxQueuedCallbacks(MAX_QUEUED_CALLBACKS); assertEquals(MAX_QUEUED_CALLBACKS, pullOption.value()); assertEquals(PullOption.OptionType.MAX_QUEUED_CALLBACKS, pullOption.optionType()); - // auto-closing executor - ExecutorService executor = EasyMock.createNiceMock(ExecutorService.class); - pullOption = PullOption.executor(executor, true); - ExecutorFactory factory = (ExecutorFactory) pullOption.value(); - assertSame(executor, factory.get()); - executor.shutdown(); - EasyMock.expectLastCall(); - EasyMock.replay(executor); - factory.release(executor); - EasyMock.verify(executor); - assertEquals(PullOption.OptionType.EXECUTOR, pullOption.optionType()); - // auto-closing executor - EasyMock.reset(executor); - pullOption = PullOption.executor(executor, false); - factory = (ExecutorFactory) pullOption.value(); - assertSame(executor, factory.get()); - EasyMock.replay(executor); - factory.release(executor); - EasyMock.verify(executor); + ExecutorFactory executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); + pullOption = PullOption.executorFactory(executorFactory); + assertSame(executorFactory, pullOption.value()); + assertEquals(PullOption.OptionType.EXECUTOR_FACTORY, pullOption.optionType()); } } From ff4c0a2a00d3e30a2e821fc8b48be33c9cdf2564 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 20 Jun 2016 12:44:31 +0200 Subject: [PATCH 3/7] Refactor MessageConsumerImpl --- .../cloud/pubsub/MessageConsumerImpl.java | 96 ++++++++----------- .../java/com/google/cloud/pubsub/PubSub.java | 2 - .../com/google/cloud/pubsub/PubSubTest.java | 2 - 3 files changed, 39 insertions(+), 61 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java index ac120147d98e..249eab662bae 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -74,11 +74,10 @@ public void close(ScheduledExecutorService instance) { private final AtomicInteger queuedCallbacks; private final int maxQueuedCallbacks; private final Object futureLock = new Object(); - private final Runnable scheduleRunnable; + private final Runnable consumerRunnable; private boolean closed; private Future scheduledFuture; private PullFuture pullerFuture; - private boolean stopped = true; /** * Default executor factory for the message processor executor. By default a single-threaded @@ -99,6 +98,37 @@ public void release(ExecutorService executor) { } } + class ConsumerRunnable implements Runnable { + + @Override + public void run() { + if (closed) { + return; + } + pullerFuture = pubsubRpc.pull(createPullRequest()); + pullerFuture.addCallback(new PullCallback() { + @Override + public void success(PullResponse response) { + List messages = response.getReceivedMessagesList(); + queuedCallbacks.addAndGet(messages.size()); + for (com.google.pubsub.v1.ReceivedMessage message : messages) { + deadlineRenewer.add(subscription, message.getAckId()); + ReceivedMessage receivedMessage = ReceivedMessage.fromPb(pubsub, subscription, message); + executor.execute(ackingRunnable(receivedMessage)); + } + nextPull(); + } + + @Override + public void failure(Throwable error) { + if (!(error instanceof CancellationException)) { + nextPull(); + } + } + }); + } + } + private MessageConsumerImpl(Builder builder) { this.pubsubOptions = builder.pubsubOptions; this.subscription = builder.subscription; @@ -111,17 +141,7 @@ private MessageConsumerImpl(Builder builder) { this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory()); this.executor = executorFactory.get(); this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); - this.scheduleRunnable = new Runnable() { - @Override - public void run() { - synchronized (futureLock) { - if (closed) { - return; - } - pull(); - } - } - }; + this.consumerRunnable = new ConsumerRunnable(); nextPull(); } @@ -155,51 +175,23 @@ private PullRequest createPullRequest() { private void scheduleNextPull(long delay, TimeUnit timeUnit) { synchronized (futureLock) { - if (!closed && stopped) { - scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit); + if (closed || scheduledFuture != null) { + return; } + scheduledFuture = timer.schedule(consumerRunnable, delay, timeUnit); } } private void nextPull() { synchronized (futureLock) { - if (closed) { + if (closed || queuedCallbacks.get() == maxQueuedCallbacks) { + scheduledFuture = null; return; } - if (queuedCallbacks.get() == maxQueuedCallbacks) { - stopped = true; - } else { - stopped = false; - scheduledFuture = timer.submit(scheduleRunnable); - } + scheduledFuture = timer.submit(consumerRunnable); } } - private void pull() { - pullerFuture = pubsubRpc.pull(createPullRequest()); - pullerFuture.addCallback(new PullCallback() { - @Override - public void success(PullResponse response) { - List messages = response.getReceivedMessagesList(); - queuedCallbacks.addAndGet(messages.size()); - for (com.google.pubsub.v1.ReceivedMessage message : messages) { - deadlineRenewer.add(subscription, message.getAckId()); - final ReceivedMessage receivedMessage = - ReceivedMessage.fromPb(pubsub, subscription, message); - executor.execute(ackingRunnable(receivedMessage)); - } - nextPull(); - } - - @Override - public void failure(Throwable error) { - if (!(error instanceof CancellationException)) { - nextPull(); - } - } - }); - } - @Override public void close() { synchronized (futureLock) { @@ -268,14 +260,4 @@ static Builder builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); } - - /** - * Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from - * which messages must be pulled, the acknowledge deadline renewer and a message processor used to - * process messages. - */ - static Builder of(PubSubOptions pubsubOptions, String subscription, - AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { - return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); - } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 62b5fa6ae08d..042f64c317bf 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -16,8 +16,6 @@ package com.google.cloud.pubsub; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.cloud.AsyncPage; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java index 30db61717c9e..f99cfa2f728e 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -26,8 +26,6 @@ import org.easymock.EasyMock; import org.junit.Test; -import java.util.concurrent.ExecutorService; - public class PubSubTest { private static final int PAGE_SIZE = 42; From d7ba0aad12b3dac08e1d97fcd10bdbd9dcb00d8c Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 20 Jun 2016 23:14:18 +0200 Subject: [PATCH 4/7] Implement consumer restart based on processed callbacks --- .../cloud/pubsub/MessageConsumerImpl.java | 109 ++++++++++++------ 1 file changed, 76 insertions(+), 33 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java index 249eab662bae..b9eb8aa3b2b6 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -37,7 +37,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -75,10 +74,39 @@ public void close(ScheduledExecutorService instance) { private final int maxQueuedCallbacks; private final Object futureLock = new Object(); private final Runnable consumerRunnable; + private final RestartPolicy restartPolicy; private boolean closed; private Future scheduledFuture; private PullFuture pullerFuture; + /** + * Interface for policies according to which the consumer should be restarted. + */ + interface RestartPolicy { + + boolean shouldRestart(int queuedCallbacks); + } + + /** + * Default restart policy. Restarts the consumer once {@code restartThreshold} messages out of + * {@code maxQueuedCallbacks} have already been processed. + */ + static class DefaultRestartPolicy implements RestartPolicy { + + final int maxQueuedCallbacks; + final int restartThreshold; + + DefaultRestartPolicy(int maxQueuedCallbacks, int restartThreshold) { + this.maxQueuedCallbacks = maxQueuedCallbacks; + this.restartThreshold = restartThreshold; + } + + @Override + public boolean shouldRestart(int queuedCallbacks) { + return (maxQueuedCallbacks - queuedCallbacks) >= restartThreshold; + } + } + /** * Default executor factory for the message processor executor. By default a single-threaded * executor is used. @@ -127,6 +155,33 @@ public void failure(Throwable error) { } }); } + + private PullRequest createPullRequest() { + return PullRequest.newBuilder() + .setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription)) + .setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get()) + .setReturnImmediately(false) + .build(); + } + + private Runnable ackingRunnable(final ReceivedMessage receivedMessage) { + return new Runnable() { + @Override + public void run() { + try { + messageProcessor.process(receivedMessage); + pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } catch (Exception ex) { + pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId()); + } finally { + deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId()); + queuedCallbacks.decrementAndGet(); + // We can now pull more messages, according to the restart policy. + restartIfNeeded(); + } + } + }; + } } private MessageConsumerImpl(Builder builder) { @@ -138,47 +193,24 @@ private MessageConsumerImpl(Builder builder) { this.deadlineRenewer = builder.deadlineRenewer; this.queuedCallbacks = new AtomicInteger(); this.timer = SharedResourceHolder.get(TIMER); - this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory()); + this.executorFactory = + builder.executorFactory != null ? builder.executorFactory : new DefaultExecutorFactory(); this.executor = executorFactory.get(); this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); this.consumerRunnable = new ConsumerRunnable(); + int restartThreshold = builder.restartThreshold != null ? builder.restartThreshold + : this.maxQueuedCallbacks / 2; + this.restartPolicy = new DefaultRestartPolicy(maxQueuedCallbacks, restartThreshold); nextPull(); } - private Runnable ackingRunnable(final ReceivedMessage receivedMessage) { - return new Runnable() { - @Override - public void run() { - try { - messageProcessor.process(receivedMessage); - pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId()); - } catch (Exception ex) { - pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId()); - } finally { - deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId()); - queuedCallbacks.decrementAndGet(); - // We can now pull more messages. We do not pull immediately to possibly wait for other - // callbacks to end - scheduleNextPull(500, TimeUnit.MILLISECONDS); - } - } - }; - } - - private PullRequest createPullRequest() { - return PullRequest.newBuilder() - .setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription)) - .setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get()) - .setReturnImmediately(false) - .build(); - } - - private void scheduleNextPull(long delay, TimeUnit timeUnit) { + private void restartIfNeeded() { synchronized (futureLock) { - if (closed || scheduledFuture != null) { + if (closed || scheduledFuture != null + || !restartPolicy.shouldRestart(queuedCallbacks.get())) { return; } - scheduledFuture = timer.schedule(consumerRunnable, delay, timeUnit); + scheduledFuture = timer.submit(consumerRunnable); } } @@ -217,6 +249,7 @@ static final class Builder { private final MessageProcessor messageProcessor; private Integer maxQueuedCallbacks; private ExecutorFactory executorFactory; + private Integer restartThreshold; Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { @@ -243,6 +276,16 @@ Builder executorFactory(ExecutorFactory executorFactory) { return this; } + /** + * Sets the restart threshold. If the consumer was interrupted for reaching the maximum number + * of queued callbacks, it will be restarted only once at least {@code restartThreshold} + * callbacks have completed their execution. + */ + Builder restartThreshold(Integer restartThreshold) { + this.restartThreshold = restartThreshold; + return this; + } + /** * Creates a {@code MessageConsumerImpl} object. */ From 092acfcd5f685fbc18a6c9449ab7b418d281eb7f Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Tue, 21 Jun 2016 18:52:12 +0200 Subject: [PATCH 5/7] Rename RestartPolicy to NextPullPolicy --- .../cloud/pubsub/MessageConsumerImpl.java | 49 +++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java index b9eb8aa3b2b6..672448e87470 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -74,36 +74,36 @@ public void close(ScheduledExecutorService instance) { private final int maxQueuedCallbacks; private final Object futureLock = new Object(); private final Runnable consumerRunnable; - private final RestartPolicy restartPolicy; + private final NextPullPolicy pullPolicy; private boolean closed; private Future scheduledFuture; private PullFuture pullerFuture; /** - * Interface for policies according to which the consumer should be restarted. + * Interface for policies according to which the consumer should pull messages. */ - interface RestartPolicy { + interface NextPullPolicy { - boolean shouldRestart(int queuedCallbacks); + boolean shouldPull(int queuedCallbacks); } /** - * Default restart policy. Restarts the consumer once {@code restartThreshold} messages out of - * {@code maxQueuedCallbacks} have already been processed. + * Default pull policy. The consumer will pull again once {@code nextPullThreshold} messages out + * of {@code maxQueuedCallbacks} have been processed. */ - static class DefaultRestartPolicy implements RestartPolicy { + static class DefaultNextPullPolicy implements NextPullPolicy { final int maxQueuedCallbacks; - final int restartThreshold; + final int nextPullThreshold; - DefaultRestartPolicy(int maxQueuedCallbacks, int restartThreshold) { + DefaultNextPullPolicy(int maxQueuedCallbacks, int nextPullThreshold) { this.maxQueuedCallbacks = maxQueuedCallbacks; - this.restartThreshold = restartThreshold; + this.nextPullThreshold = nextPullThreshold; } @Override - public boolean shouldRestart(int queuedCallbacks) { - return (maxQueuedCallbacks - queuedCallbacks) >= restartThreshold; + public boolean shouldPull(int queuedCallbacks) { + return (maxQueuedCallbacks - queuedCallbacks) >= nextPullThreshold; } } @@ -176,8 +176,8 @@ public void run() { } finally { deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId()); queuedCallbacks.decrementAndGet(); - // We can now pull more messages, according to the restart policy. - restartIfNeeded(); + // We can now pull more messages, according to the next pull policy. + pullIfNeeded(); } } }; @@ -198,16 +198,15 @@ private MessageConsumerImpl(Builder builder) { this.executor = executorFactory.get(); this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); this.consumerRunnable = new ConsumerRunnable(); - int restartThreshold = builder.restartThreshold != null ? builder.restartThreshold + int nextPullThreshold = builder.nextPullThreshold != null ? builder.nextPullThreshold : this.maxQueuedCallbacks / 2; - this.restartPolicy = new DefaultRestartPolicy(maxQueuedCallbacks, restartThreshold); + this.pullPolicy = new DefaultNextPullPolicy(maxQueuedCallbacks, nextPullThreshold); nextPull(); } - private void restartIfNeeded() { + private void pullIfNeeded() { synchronized (futureLock) { - if (closed || scheduledFuture != null - || !restartPolicy.shouldRestart(queuedCallbacks.get())) { + if (closed || scheduledFuture != null || !pullPolicy.shouldPull(queuedCallbacks.get())) { return; } scheduledFuture = timer.submit(consumerRunnable); @@ -249,7 +248,7 @@ static final class Builder { private final MessageProcessor messageProcessor; private Integer maxQueuedCallbacks; private ExecutorFactory executorFactory; - private Integer restartThreshold; + private Integer nextPullThreshold; Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { @@ -277,12 +276,12 @@ Builder executorFactory(ExecutorFactory executorFactory) { } /** - * Sets the restart threshold. If the consumer was interrupted for reaching the maximum number - * of queued callbacks, it will be restarted only once at least {@code restartThreshold} - * callbacks have completed their execution. + * Sets a threshold for the next pull. If the consumer stopped pulling due to reaching the + * maximum number of queued callbacks, it will be pull again only once at least + * {@code nextPullThreshold} callbacks have completed their execution. */ - Builder restartThreshold(Integer restartThreshold) { - this.restartThreshold = restartThreshold; + Builder nextPullThreshold(Integer nextPullThreshold) { + this.nextPullThreshold = nextPullThreshold; return this; } From 61bdd8109e649cb6459203dee9ed2b7192d394e4 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Tue, 21 Jun 2016 18:52:29 +0200 Subject: [PATCH 6/7] Fix MessageConsumerImpl tests --- .../cloud/pubsub/MessageConsumerImplTest.java | 81 +++++++++++++++---- 1 file changed, 67 insertions(+), 14 deletions(-) diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java index 91d9ca8e78b6..1a0d70b8150d 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/MessageConsumerImplTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; public class MessageConsumerImplTest { @@ -213,20 +214,33 @@ public void testMessageConsumerMultipleCallsAck() throws Exception { PullResponse response1 = PullResponse.newBuilder() .addReceivedMessages(MESSAGE1_PB) .build(); - PullResponse response2 = PullResponse.newBuilder() + final PullResponse response2 = PullResponse.newBuilder() .addReceivedMessages(MESSAGE2_PB) .build(); EasyMock.expect(options.rpc()).andReturn(pubsubRpc); EasyMock.expect(options.service()).andReturn(pubsub); EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch nextPullLatch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(2); EasyMock.expect(pubsub.options()).andReturn(options); - EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); EasyMock.expect(pubsub.options()).andReturn(options); EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); EasyMock.replay(pubsub); EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); - EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(response2); + } + }); EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); renewer.add(SUBSCRIPTION, ACK_ID1); @@ -253,20 +267,33 @@ public void testMessageConsumerMultipleCallsNack() throws Exception { PullResponse response1 = PullResponse.newBuilder() .addReceivedMessages(MESSAGE1_PB) .build(); - PullResponse response2 = PullResponse.newBuilder() + final PullResponse response2 = PullResponse.newBuilder() .addReceivedMessages(MESSAGE2_PB) .build(); EasyMock.expect(options.rpc()).andReturn(pubsubRpc); EasyMock.expect(options.service()).andReturn(pubsub); EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + final CountDownLatch nextPullLatch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(2); EasyMock.expect(pubsub.options()).andReturn(options); - EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); EasyMock.expect(pubsub.options()).andReturn(options); EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); EasyMock.replay(pubsub); EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1)); - EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(response2); + } + }); EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); renewer.add(SUBSCRIPTION, ACK_ID1); @@ -289,22 +316,35 @@ public void testMessageConsumerMultipleCallsNack() throws Exception { @Test public void testMessageConsumerMaxCallbacksAck() throws Exception { PullRequest request1 = pullRequest(2); - PullRequest request2 = pullRequest(2); - PullResponse otherPullResponse = PullResponse.newBuilder() + PullRequest request2 = pullRequest(1); + final PullResponse otherPullResponse = PullResponse.newBuilder() .addReceivedMessages(MESSAGE1_PB) .build(); EasyMock.expect(options.rpc()).andReturn(pubsubRpc); EasyMock.expect(options.service()).andReturn(pubsub); EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch nextPullLatch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(3); EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); - EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); EasyMock.expect(pubsub.options()).andReturn(options); EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); EasyMock.replay(pubsub); EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); - EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(otherPullResponse); + } + }); EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); renewer.add(SUBSCRIPTION, ACK_ID1); @@ -331,22 +371,35 @@ public void testMessageConsumerMaxCallbacksAck() throws Exception { @Test public void testMessageConsumerMaxCallbacksNack() throws Exception { PullRequest request1 = pullRequest(2); - PullRequest request2 = pullRequest(2); - PullResponse otherPullResponse = PullResponse.newBuilder() + PullRequest request2 = pullRequest(1); + final PullResponse otherPullResponse = PullResponse.newBuilder() .addReceivedMessages(MESSAGE1_PB) .build(); EasyMock.expect(options.rpc()).andReturn(pubsubRpc); EasyMock.expect(options.service()).andReturn(pubsub); EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); EasyMock.expect(pubsub.options()).andReturn(options).times(2); + final CountDownLatch nextPullLatch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(3); EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); - EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null); + EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer>() { + @Override + public Future answer() throws Throwable { + nextPullLatch.await(); + return null; + } + }); EasyMock.expect(pubsub.options()).andReturn(options); EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null); EasyMock.replay(pubsub); EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE)); - EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse)); + EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer() { + @Override + public PullFuture answer() throws Throwable { + nextPullLatch.countDown(); + return new TestPullFuture(otherPullResponse); + } + }); EasyMock.expect(pubsubRpc.pull(EasyMock.anyObject())) .andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes(); renewer.add(SUBSCRIPTION, ACK_ID1); From 66f273c7ec982b5b98b5e7fdf318213de56e60f5 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Wed, 22 Jun 2016 08:32:56 +0200 Subject: [PATCH 7/7] Rename TIMER to CONSUMER_EXECUTOR and make it an ExecutorService --- .../cloud/pubsub/MessageConsumerImpl.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java index 672448e87470..cb08dde00327 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -35,8 +35,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; /** @@ -46,17 +44,15 @@ final class MessageConsumerImpl implements MessageConsumer { private static final int MAX_QUEUED_CALLBACKS = 100; // shared scheduled executor, used to schedule pulls - private static final SharedResourceHolder.Resource TIMER = - new SharedResourceHolder.Resource() { + private static final SharedResourceHolder.Resource CONSUMER_EXECUTOR = + new SharedResourceHolder.Resource() { @Override - public ScheduledExecutorService create() { - ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1); - timer.setRemoveOnCancelPolicy(true); - return timer; + public ExecutorService create() { + return Executors.newSingleThreadExecutor(); } @Override - public void close(ScheduledExecutorService instance) { + public void close(ExecutorService instance) { instance.shutdown(); } }; @@ -67,7 +63,7 @@ public void close(ScheduledExecutorService instance) { private final AckDeadlineRenewer deadlineRenewer; private final String subscription; private final MessageProcessor messageProcessor; - private final ScheduledExecutorService timer; + private final ExecutorService consumerExecutor; private final ExecutorFactory executorFactory; private final ExecutorService executor; private final AtomicInteger queuedCallbacks; @@ -192,7 +188,7 @@ private MessageConsumerImpl(Builder builder) { this.pubsub = pubsubOptions.service(); this.deadlineRenewer = builder.deadlineRenewer; this.queuedCallbacks = new AtomicInteger(); - this.timer = SharedResourceHolder.get(TIMER); + this.consumerExecutor = SharedResourceHolder.get(CONSUMER_EXECUTOR); this.executorFactory = builder.executorFactory != null ? builder.executorFactory : new DefaultExecutorFactory(); this.executor = executorFactory.get(); @@ -209,7 +205,7 @@ private void pullIfNeeded() { if (closed || scheduledFuture != null || !pullPolicy.shouldPull(queuedCallbacks.get())) { return; } - scheduledFuture = timer.submit(consumerRunnable); + scheduledFuture = consumerExecutor.submit(consumerRunnable); } } @@ -219,7 +215,7 @@ private void nextPull() { scheduledFuture = null; return; } - scheduledFuture = timer.submit(consumerRunnable); + scheduledFuture = consumerExecutor.submit(consumerRunnable); } } @@ -237,7 +233,7 @@ public void close() { pullerFuture.cancel(true); } } - SharedResourceHolder.release(TIMER, timer); + SharedResourceHolder.release(CONSUMER_EXECUTOR, consumerExecutor); executorFactory.release(executor); }