From a69a7c9e67b2745cd7a4867bdc52320baab2f8a6 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Wed, 25 May 2016 20:56:14 +0200 Subject: [PATCH] Add AckDeadlineRenewer and AckDeadlineRenewerImpl classes --- .../cloud/pubsub/AckDeadlineRenewParams.java | 213 +++++++++ .../cloud/pubsub/AckDeadlineRenewer.java | 69 +++ .../cloud/pubsub/AckDeadlineRenewerImpl.java | 390 +++++++++++++++++ .../google/cloud/pubsub/PubSubOptions.java | 30 ++ .../pubsub/AckDeadlineRenewParamsTest.java | 104 +++++ .../pubsub/AckDeadlineRenewerImplTest.java | 409 ++++++++++++++++++ 6 files changed, 1215 insertions(+) create mode 100644 gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewParams.java create mode 100644 gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java create mode 100644 gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewerImpl.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewParamsTest.java create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerImplTest.java diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewParams.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewParams.java new file mode 100644 index 000000000000..728dca70ea91 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewParams.java @@ -0,0 +1,213 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.MoreObjects; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Parameters for configuring automatic ack deadline renewals with an exponential backoff. The first + * time the ack deadline is renewed for a pulled message, its value is set to + * {@link #initialDeadlineSeconds()}. For each subsequent ack deadline renewal, the ack deadline is + * calculated as: + * + *

{@code deadlineBackoffFactor ^ renewals * initialDeadlineSeconds} but would be upper-bounded + * to {@code maxDeadlineSeconds} + */ +public final class AckDeadlineRenewParams implements Serializable { + + private static final long serialVersionUID = -8142363212304296426L; + + public static final int DEFAULT_INITIAL_DEADLINE_SECONDS = 10; + public static final int DEFAULT_MAX_DEADLINED_SECONDS = 80; + public static final double DEFAULT_DEADLINE_BACKOFF_FACTOR = 2.0; + + private final int initialDeadlineSeconds; + private final int maxDeadlineSeconds; + private final double deadlineBackoffFactor; + + private static final AckDeadlineRenewParams DEFAULT_INSTANCE = + new AckDeadlineRenewParams(new Builder()); + private static final AckDeadlineRenewParams NO_BACKOFF = builder() + .maxDeadlineSeconds(DEFAULT_INITIAL_DEADLINE_SECONDS) + .deadlineBackoffFactor(1) + .initialDeadlineSeconds(DEFAULT_INITIAL_DEADLINE_SECONDS) + .build(); + + /** + * {@code AckDeadlineRenewParams} builder. + */ + public static final class Builder { + + private int initialDeadlineSeconds; + private int maxDeadlineSeconds; + private double deadlineBackoffFactor; + + private Builder() { + this.initialDeadlineSeconds = DEFAULT_INITIAL_DEADLINE_SECONDS; + this.maxDeadlineSeconds = DEFAULT_MAX_DEADLINED_SECONDS; + this.deadlineBackoffFactor = DEFAULT_DEADLINE_BACKOFF_FACTOR; + } + + Builder(AckDeadlineRenewParams renewParams) { + this.initialDeadlineSeconds = renewParams.initialDeadlineSeconds; + this.maxDeadlineSeconds = renewParams.maxDeadlineSeconds; + this.deadlineBackoffFactor = renewParams.deadlineBackoffFactor; + } + + /** + * Sets the initial deadline value, used the first time the ack deadline is renewed for a pulled + * message. This value must be >= 10 seconds. + * + * @param initialDeadlineSeconds the initial deadline value, in seconds + * @return the Builder for chaining + */ + public Builder initialDeadlineSeconds(int initialDeadlineSeconds) { + this.initialDeadlineSeconds = initialDeadlineSeconds; + return this; + } + + /** + * Sets the maximum deadline value. This value must be greater or equal to the value set for + * {@link #initialDeadlineSeconds(int)}. + * + * @param maxDeadlineSeconds the maximum deadline value, in seconds + * @return the Builder for chaining + */ + public Builder maxDeadlineSeconds(int maxDeadlineSeconds) { + this.maxDeadlineSeconds = maxDeadlineSeconds; + return this; + } + + /** + * Sets the deadline backoff factor, used to compute deadline renewal values after the initial + * one. This value must be >= 1. + * + * @param deadlineBackoffFactor the backoff factor + * @return the Builder for chaining + */ + public Builder deadlineBackoffFactor(double deadlineBackoffFactor) { + this.deadlineBackoffFactor = deadlineBackoffFactor; + return this; + } + + /** + * Create an instance of {@code AckDeadlineRenewParams} with the parameters set in this builder. + * + * @return a new instance of {@code AckDeadlineRenewParams} + */ + public AckDeadlineRenewParams build() { + return new AckDeadlineRenewParams(this); + } + } + + private AckDeadlineRenewParams(Builder builder) { + initialDeadlineSeconds = builder.initialDeadlineSeconds; + maxDeadlineSeconds = builder.maxDeadlineSeconds; + deadlineBackoffFactor = builder.deadlineBackoffFactor; + checkArgument(initialDeadlineSeconds >= 10, "Initial deadline must be >= 10 seconds"); + checkArgument(maxDeadlineSeconds >= initialDeadlineSeconds, + "Max deadline must be greater or equal to the initial deadline"); + checkArgument(deadlineBackoffFactor >= 1.0, "Deadline backoff factor must be >= 1"); + } + + /** + * Returns an {@code AckDeadlineRenewParams} object with default values: initial deadline is + * {@value DEFAULT_INITIAL_DEADLINE_SECONDS} seconds, max deadline is + * {@value DEFAULT_MAX_DEADLINED_SECONDS} seconds and the backoff factor is + * {@value DEFAULT_DEADLINE_BACKOFF_FACTOR}. + */ + public static AckDeadlineRenewParams defaultInstance() { + return DEFAULT_INSTANCE; + } + + /** + * Returns an {@code AckDeadlineRenewParams} object that does no backoff, deadline is always set + * to 10 seconds. + */ + public static AckDeadlineRenewParams noBackoff() { + return NO_BACKOFF; + } + + /** + * Returns the initial deadline value, used the first time the ack deadline is renewed for a + * pulled message. + */ + public int initialDeadlineSeconds() { + return initialDeadlineSeconds; + } + + /** + * Returns the maximum deadline value. + */ + public int maxDeadlineSeconds() { + return maxDeadlineSeconds; + } + + /** + * Sets the deadline backoff factor, used to compute deadline renewal values after the initial + * one. + */ + public double deadlineBackoffFactor() { + return deadlineBackoffFactor; + } + + @Override + public int hashCode() { + return Objects.hash(initialDeadlineSeconds, maxDeadlineSeconds, deadlineBackoffFactor); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof AckDeadlineRenewParams)) { + return false; + } + AckDeadlineRenewParams other = (AckDeadlineRenewParams) obj; + return initialDeadlineSeconds == other.initialDeadlineSeconds + && maxDeadlineSeconds == other.maxDeadlineSeconds + && deadlineBackoffFactor == other.deadlineBackoffFactor; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("initialDeadlineSeconds", initialDeadlineSeconds) + .add("maxDeadlineSeconds", maxDeadlineSeconds) + .add("deadlineBackoffFactor", deadlineBackoffFactor).toString(); + } + + /** + * Returns a builder for {@code AckDeadlineRenewParams} objects. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Returns a builder for the current {@code AckDeadlineRenewParams} object. + */ + public Builder toBuilder() { + return new Builder(this); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java new file mode 100644 index 000000000000..5fb4c5025a33 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +/** + * Interface for an automatic ack deadline renewer. An ack deadline renewer automatically renews + * the acknowledge deadline of messages added to it (via {@link #add(String, String)} or + * {@link #add(String, Iterable)}. The acknowledge deadlines of added messages are renewed until + * the messages are explicitly removed using either {@link #remove(String, String)} or + * {@link #remove(String, Iterable)}. + */ +interface AckDeadlineRenewer extends AutoCloseable { + + /** + * Adds a new message for which the acknowledge deadline should be automatically renewed. The + * message is identified by the subscription from which it was pulled and its acknowledge id. + * Auto-renewal will take place until the message is removed (see {@link #remove(String, String)} + * or {@link #remove(String, Iterable)}). + * + * @param subscription the subscription from which the message has been pulled + * @param ackId the message's acknowledge id + */ + void add(String subscription, String ackId); + + /** + * Adds new messages for which the acknowledge deadlined should be automatically renewed. The + * messages are identified by the subscription from which they were pulled and their + * acknowledge id. Auto-renewal will take place until the messages are removed (see + * {@link #remove(String, String)} or {@link #remove(String, Iterable)}). + * + * @param subscription the subscription from which the messages have been pulled + * @param ackIds the acknowledge ids of the messages + */ + void add(String subscription, Iterable ackIds); + + /** + * Removes a message from this {@code AckDeadlineRenewer}. The message is identified by the + * subscription from which it was pulled and its acknowledge id. Once the message is removed from + * this {@code AckDeadlineRenewer}, automated ack deadline renewals will stop. + * + * @param subscription the subscription from which the message has been pulled + * @param ackId the message's acknowledge id + */ + void remove(String subscription, String ackId); + + /** + * Removes messages from this {@code AckDeadlineRenewer}. The messages are identified by the + * subscription from which they were pulled and their acknowledge id. Once the messages are + * removed from this {@code AckDeadlineRenewer}, automated ack deadline renewals will stop. + * + * @param subscription the subscription from which the message has been pulled + * @param ackIds the acknowledge ids of the messages + */ + void remove(String subscription, Iterable ackIds); +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewerImpl.java new file mode 100644 index 000000000000..a5b3a3a74a8f --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewerImpl.java @@ -0,0 +1,390 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static java.lang.StrictMath.min; +import static java.lang.StrictMath.pow; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.ServiceOptions.Clock; +import com.google.common.base.MoreObjects; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimaps; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class AckDeadlineRenewerImpl implements AckDeadlineRenewer { + + private static final int MIN_DEADLINE_MILLISECONDS = 10_000; + private static final int RENEW_THRESHOLD_MILLISECONDS = 2_000; + + private final Object lock = new Object(); + private final ScheduledExecutorService executor; + private final ExecutorFactory executorFactory; + private final PubSub pubsub; + private final Clock clock; + private final AckDeadlineRenewParams renewParams; + private final Queue messageQueue; + private final Map messageDeadlines; + private final ScheduledFuture renewerFuture; + + /** + * This class holds the identity of a message to renew: subscription and acknowledge id. + */ + private static class MessageId { + + private final String subscription; + private final String ackId; + + MessageId(String subscription, String ackId) { + this.subscription = subscription; + this.ackId = ackId; + } + + /** + * Returns the subscription name. + */ + String subscription() { + return subscription; + } + + /** + * Returns the message acknowledge id. + */ + String ackId() { + return ackId; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof MessageId)) { + return false; + } + MessageId other = (MessageId) obj; + return Objects.equals(other.subscription, this.subscription) + && Objects.equals(other.ackId, this.ackId); + } + + @Override + public int hashCode() { + return Objects.hash(subscription, ackId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("subscription", subscription) + .add("ackId", ackId) + .toString(); + } + } + + /** + * This class holds the identity of a message to renew and its expected ack deadline. + */ + private static final class Message implements Comparable { + + private final MessageId messageId; + private final Long expectedDeadline; + + Message(MessageId messageId, Long expectedDeadline) { + this.messageId = messageId; + this.expectedDeadline = expectedDeadline; + } + + MessageId messageId() { + return messageId; + } + + Long expectedDeadline() { + return expectedDeadline; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof Message)) { + return false; + } + Message other = (Message) obj; + return Objects.equals(other.messageId, this.messageId) + && Objects.equals(other.expectedDeadline, this.expectedDeadline); + } + + @Override + public int hashCode() { + return Objects.hash(messageId, expectedDeadline); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("messageId", messageId) + .add("expectedDeadline", expectedDeadline) + .toString(); + } + + @Override + public int compareTo(Message other) { + return expectedDeadline.compareTo(other.expectedDeadline); + } + } + + /** + * This class holds the most updated expected deadline for a message and its renewal count. + */ + private static final class ExpectedDeadlineAndCount { + + private final long expectedDeadline; + private final long renewalCount; + + ExpectedDeadlineAndCount(long expectedDeadline, long renewalCount) { + this.expectedDeadline = expectedDeadline; + this.renewalCount = renewalCount; + } + + /** + * Returns the expected deadline. + */ + long expectedDeadline() { + return expectedDeadline; + } + + /** + * Returns how many times a message has been renewed so far. + */ + long renewalCount() { + return renewalCount; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof ExpectedDeadlineAndCount)) { + return false; + } + ExpectedDeadlineAndCount other = (ExpectedDeadlineAndCount) obj; + return Objects.equals(other.expectedDeadline, this.expectedDeadline) + && Objects.equals(other.renewalCount, this.renewalCount); + } + + @Override + public int hashCode() { + return Objects.hash(expectedDeadline, renewalCount); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("expectedDeadline", expectedDeadline) + .add("renewalCount", renewalCount) + .toString(); + } + } + + /** + * This class holds a subscription name and a relative acknowledge deadline. Objects of this class + * are used to group messages before renewing their deadline, to limit the number of requests + * needed. + */ + private static final class SubscriptionAndDeadline { + + private final String subscription; + private final int relativeDeadline; + + SubscriptionAndDeadline(String subscription, int relativeDeadline) { + this.subscription = subscription; + this.relativeDeadline = relativeDeadline; + } + + /** + * Returns the subscription name. + */ + String subscription() { + return subscription; + } + + /** + * Return the relative acknowledge deadline, used to send modify ack deadline requests. + */ + int relativeDeadline() { + return relativeDeadline; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof SubscriptionAndDeadline)) { + return false; + } + SubscriptionAndDeadline other = (SubscriptionAndDeadline) obj; + return Objects.equals(other.subscription, this.subscription) + && Objects.equals(other.relativeDeadline, this.relativeDeadline); + } + + @Override + public int hashCode() { + return Objects.hash(subscription, relativeDeadline); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("subscription", subscription) + .add("relativeDeadline", relativeDeadline) + .toString(); + } + } + + AckDeadlineRenewerImpl(PubSubOptions options) { + this.executorFactory = options.executorFactory(); + this.executor = executorFactory.get(); + this.pubsub = options.service(); + this.clock = options.clock(); + this.renewParams = options.ackDeadlineRenewParams(); + this.messageQueue = new LinkedList<>(); + this.messageDeadlines = new HashMap<>(); + this.renewerFuture = this.executor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + renewAckDeadlines(); + } + }, 0, 1, TimeUnit.SECONDS); + } + + private void renewAckDeadlines() { + ListMultimap messagesToRenewNext = LinkedListMultimap.create(); + long threshold = clock.millis() + RENEW_THRESHOLD_MILLISECONDS; + while (nextMessageToRenew(messagesToRenewNext, threshold)) { + // keep adding messages for which the ack deadline must be renewed + } + for (Map.Entry> entry : + Multimaps.asMap(messagesToRenewNext).entrySet()) { + // We send all ack deadline renewals for a pair (subscription, relativeDeadline) + pubsub.modifyAckDeadlineAsync(entry.getKey().subscription(), + entry.getKey().relativeDeadline(), TimeUnit.MILLISECONDS, entry.getValue()); + } + } + + /** + * This method adds to {@code messagesToRenewNext} a message whose acknowledge deadline must be + * renewed (if any exists). Returns {@code true} if other messages to renew may exist, + * {@code false} otherwise. + */ + private boolean nextMessageToRenew( + ListMultimap messagesToRenewNext, long threshold) { + synchronized (lock) { + Message message = messageQueue.peek(); + // if the message does not exist or the next expected deadline is after threshold we stop + if (message == null || message.expectedDeadline() > threshold) { + return false; + } + MessageId messageId = messageQueue.poll().messageId(); + // Check if the next expected deadline changed. This can happen if the message was removed + // from the ack deadline renewer or if it was nacked and then pulled again + ExpectedDeadlineAndCount deadlineAndCount = messageDeadlines.get(messageId); + Long deadline = deadlineAndCount != null ? deadlineAndCount.expectedDeadline() : null; + if (deadline != null && deadline < threshold) { + // Message deadline must be renewed, we must submit it again to the renewer and increment + // renewal count + int nextRelativeDeadline = + nextRelativeDeadline(renewParams, deadlineAndCount.renewalCount()); + update(messageId, clock.millis() + nextRelativeDeadline); + messagesToRenewNext.put( + new SubscriptionAndDeadline(messageId.subscription(), nextRelativeDeadline), + messageId.ackId()); + } + return true; + } + } + + private static int nextRelativeDeadline(AckDeadlineRenewParams renewParams, long renewals) { + return (int) TimeUnit.MILLISECONDS.convert((int) min(renewParams.maxDeadlineSeconds(), + pow(renewParams.deadlineBackoffFactor(), renewals) + * renewParams.initialDeadlineSeconds()), TimeUnit.SECONDS); + } + + private void update(MessageId messageId, long expectedDeadline) { + synchronized (lock) { + Message message = new Message(messageId, expectedDeadline); + messageQueue.add(message); + ExpectedDeadlineAndCount count = messageDeadlines.get(messageId); + messageDeadlines.put(message.messageId(), + new ExpectedDeadlineAndCount(expectedDeadline, count.renewalCount + 1)); + } + } + + private void add(MessageId messageId, long expectedDeadline) { + synchronized (lock) { + Message message = new Message(messageId, expectedDeadline); + messageQueue.add(message); + messageDeadlines.put(message.messageId(), new ExpectedDeadlineAndCount(expectedDeadline, 0L)); + } + } + + @Override + public void add(String subscription, String ackId) { + long expectedDeadline = clock.millis() + MIN_DEADLINE_MILLISECONDS; + add(new MessageId(subscription, ackId), expectedDeadline); + } + + @Override + public void add(String subscription, Iterable ackIds) { + long expectedDeadline = clock.millis() + MIN_DEADLINE_MILLISECONDS; + for (String ackId : ackIds) { + add(new MessageId(subscription, ackId), expectedDeadline); + } + } + + @Override + public void remove(String subscription, String ackId) { + synchronized (lock) { + messageDeadlines.remove(new MessageId(subscription, ackId)); + } + } + + @Override + public void remove(String subscription, Iterable ackIds) { + for (String ackId : ackIds) { + messageDeadlines.remove(new MessageId(subscription, ackId)); + } + } + + @Override + public void close() throws Exception { + renewerFuture.cancel(false); + executorFactory.release(executor); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index 420b0d50148b..ba1ba36dc21a 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub; +import static com.google.common.base.MoreObjects.firstNonNull; + import com.google.cloud.GrpcServiceOptions; import com.google.cloud.pubsub.spi.DefaultPubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc; @@ -32,6 +34,8 @@ public class PubSubOptions extends GrpcServiceOptions SCOPES = ImmutableSet.of(PUBSUB_SCOPE); private static final String DEFAULT_HOST = "https://pubsub.googleapis.com"; + private final AckDeadlineRenewParams ackDeadlineRenewParams; + public static class DefaultPubSubFactory implements PubSubFactory { private static final PubSubFactory INSTANCE = new DefaultPubSubFactory(); @@ -69,12 +73,24 @@ protected String defaultHost() { public static class Builder extends GrpcServiceOptions.Builder { + private AckDeadlineRenewParams ackDeadlineRenewParams; + private Builder() {} private Builder(PubSubOptions options) { super(options); } + /** + * Sets the configuration parameters for automatic deadline renewal. If not set, + * {@link AckDeadlineRenewParams#noBackoff()} is used: automatic deadline renewal always sets + * ack deadline to 10 seconds. + */ + public Builder ackDeadlineRenewParams(AckDeadlineRenewParams ackDeadlineRenewParams) { + this.ackDeadlineRenewParams = ackDeadlineRenewParams; + return self(); + } + @Override public PubSubOptions build() { return new PubSubOptions(this); @@ -83,6 +99,20 @@ public PubSubOptions build() { protected PubSubOptions(Builder builder) { super(PubSubFactory.class, PubSubRpcFactory.class, builder); + ackDeadlineRenewParams = + firstNonNull(builder.ackDeadlineRenewParams, AckDeadlineRenewParams.noBackoff()); + } + + @Override + protected ExecutorFactory executorFactory() { + return super.executorFactory(); + } + + /** + * Returns the configuration parameters for automatic deadline renewal. + */ + public AckDeadlineRenewParams ackDeadlineRenewParams() { + return ackDeadlineRenewParams; } @Override diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewParamsTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewParamsTest.java new file mode 100644 index 000000000000..1ae02d5fd4a3 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewParamsTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.cloud.pubsub.AckDeadlineRenewParams.DEFAULT_DEADLINE_BACKOFF_FACTOR; +import static com.google.cloud.pubsub.AckDeadlineRenewParams.DEFAULT_INITIAL_DEADLINE_SECONDS; +import static com.google.cloud.pubsub.AckDeadlineRenewParams.DEFAULT_MAX_DEADLINED_SECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.cloud.pubsub.AckDeadlineRenewParams.Builder; + +import org.junit.Test; + +import java.util.Arrays; + +public class AckDeadlineRenewParamsTest { + + private static final AckDeadlineRenewParams RENEW_PARAMS = AckDeadlineRenewParams.builder() + .initialDeadlineSeconds(101) + .maxDeadlineSeconds(102) + .deadlineBackoffFactor(103) + .build(); + + @Test + public void testDefaults() { + AckDeadlineRenewParams params1 = AckDeadlineRenewParams.defaultInstance(); + AckDeadlineRenewParams params2 = AckDeadlineRenewParams.builder().build(); + compareAckDeadlineRenewParams(params1, params2); + for (AckDeadlineRenewParams params : Arrays.asList(params1, params2)) { + assertEquals(DEFAULT_INITIAL_DEADLINE_SECONDS, params.initialDeadlineSeconds()); + assertEquals(DEFAULT_MAX_DEADLINED_SECONDS, params.maxDeadlineSeconds()); + assertEquals(DEFAULT_DEADLINE_BACKOFF_FACTOR, params.deadlineBackoffFactor(), 0); + } + } + + @Test + public void testNoBackoff() { + AckDeadlineRenewParams params = AckDeadlineRenewParams.noBackoff(); + assertEquals(DEFAULT_INITIAL_DEADLINE_SECONDS, params.initialDeadlineSeconds()); + assertEquals(DEFAULT_INITIAL_DEADLINE_SECONDS, params.maxDeadlineSeconds()); + assertEquals(1, params.deadlineBackoffFactor(), 0); + } + + @Test + public void testBuilder() { + assertEquals(101, RENEW_PARAMS.initialDeadlineSeconds()); + assertEquals(102, RENEW_PARAMS.maxDeadlineSeconds()); + assertEquals(103, RENEW_PARAMS.deadlineBackoffFactor(), 0); + } + + @Test + public void testToBuilder() { + compareAckDeadlineRenewParams(RENEW_PARAMS, RENEW_PARAMS.toBuilder().build()); + compareAckDeadlineRenewParams(AckDeadlineRenewParams.defaultInstance(), + AckDeadlineRenewParams.defaultInstance().toBuilder().build()); + compareAckDeadlineRenewParams(AckDeadlineRenewParams.noBackoff(), + AckDeadlineRenewParams.noBackoff().toBuilder().build()); + } + + @Test + public void testBadSettings() { + Builder builder = AckDeadlineRenewParams.builder(); + builder.initialDeadlineSeconds(9); + builder = assertFailure(builder); + builder.maxDeadlineSeconds(DEFAULT_INITIAL_DEADLINE_SECONDS - 1); + builder = assertFailure(builder); + builder.deadlineBackoffFactor(0.9); + assertFailure(builder); + } + + private static Builder assertFailure(Builder builder) { + try { + builder.build(); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException ex) { + // expected + } + return AckDeadlineRenewParams.builder(); + } + + private static void compareAckDeadlineRenewParams(AckDeadlineRenewParams expected, + AckDeadlineRenewParams value) { + assertEquals(expected, value); + assertEquals(expected.initialDeadlineSeconds(), value.initialDeadlineSeconds()); + assertEquals(expected.maxDeadlineSeconds(), value.maxDeadlineSeconds()); + assertEquals(expected.deadlineBackoffFactor(), value.deadlineBackoffFactor(), 0); + assertEquals(expected.hashCode(), value.hashCode()); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerImplTest.java new file mode 100644 index 000000000000..024b59ec1ddb --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerImplTest.java @@ -0,0 +1,409 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.ServiceOptions.Clock; +import com.google.common.collect.ImmutableList; + +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class AckDeadlineRenewerImplTest { + + private static final int MIN_DEADLINE_MILLISECONDS = 10_000; + + private static final String SUBSCRIPTION1 = "subscription1"; + private static final String SUBSCRIPTION2 = "subscription2"; + private static final String ACK_ID1 = "ack-id1"; + private static final String ACK_ID2 = "ack-id2"; + private static final String ACK_ID3 = "ack-id3"; + private static final AckDeadlineRenewParams ACK_DEADLINE_RENEW_PARAMS = + AckDeadlineRenewParams.defaultInstance(); + + private final Capture renewRunnable = Capture.newInstance(); + private final Capture backoffRenewRunnable = Capture.newInstance(); + private Clock clock; + private PubSub pubsub; + private ScheduledExecutorService executor; + private PubSubFactory serviceFactoryMock; + private ExecutorFactory executorFactory; + private AckDeadlineRenewer ackDeadlineRenewer; + private AckDeadlineRenewer backoffAckDeadlineRenewer; + private ScheduledFuture scheduledFuture; + + @Before + public void setUp() { + clock = EasyMock.createStrictMock(Clock.class); + pubsub = EasyMock.createStrictMock(PubSub.class); + executor = EasyMock.createStrictMock(ScheduledExecutorService.class); + serviceFactoryMock = EasyMock.createStrictMock(PubSubFactory.class); + executorFactory = EasyMock.createStrictMock(ExecutorFactory.class); + EasyMock.expect(executorFactory.get()).andReturn(executor).times(2); + scheduledFuture = EasyMock.createStrictMock(ScheduledFuture.class); + EasyMock.expect(executor.scheduleWithFixedDelay(EasyMock.capture(renewRunnable), + EasyMock.eq(0L), EasyMock.eq(1L), EasyMock.same(TimeUnit.SECONDS))) + .andReturn(scheduledFuture); + EasyMock.expect(executor.scheduleWithFixedDelay(EasyMock.capture(backoffRenewRunnable), + EasyMock.eq(0L), EasyMock.eq(1L), EasyMock.same(TimeUnit.SECONDS))) + .andReturn(scheduledFuture); + EasyMock.expect(serviceFactoryMock.create(EasyMock.anyObject())) + .andReturn(pubsub).times(2); + PubSubOptions options = PubSubOptions.builder() + .projectId("projectId") + .clock(clock) + .executorFactory(executorFactory) + .serviceFactory(serviceFactoryMock) + .build(); + PubSubOptions backoffOptions = PubSubOptions.builder() + .projectId("projectId") + .clock(clock) + .executorFactory(executorFactory) + .serviceFactory(serviceFactoryMock) + .ackDeadlineRenewParams(ACK_DEADLINE_RENEW_PARAMS) + .build(); + EasyMock.replay(executor, serviceFactoryMock, executorFactory, scheduledFuture); + ackDeadlineRenewer = new AckDeadlineRenewerImpl(options); + backoffAckDeadlineRenewer = new AckDeadlineRenewerImpl(backoffOptions); + } + + @After + public void tearDown() { + EasyMock.verify(clock, pubsub, executor, serviceFactoryMock, executorFactory, scheduledFuture); + } + +// @Test +// public void testAddOneMessage() { +// EasyMock.expect(clock.millis()).andReturn(0L); +// EasyMock.expect(clock.millis()).andReturn(9_000L); +// EasyMock.expect(clock.millis()).andReturn(10_000L); +// EasyMock.expect(clock.millis()).andReturn(15_000L); +// EasyMock.expect(clock.millis()).andReturn(19_000L); +// EasyMock.expect(clock.millis()).andReturn(20_000L); +// EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, +// TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); +// EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, +// TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); +// EasyMock.replay(clock, pubsub); +// // Added for clock.millis() == 0 +// ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); +// // The following call is for clock.millis() == 9000, we renew the message +// renewRunnable.getValue().run(); +// // The following call is for clock.millis() == 15_000, we don't renew the message +// renewRunnable.getValue().run(); +// // The following call is for clock.millis() == 19_000, we renew the message +// renewRunnable.getValue().run(); +// } + + @Test + public void testAddOneMessageBackoff() { + EasyMock.expect(clock.millis()).andReturn(0L); + EasyMock.expect(clock.millis()).andReturn(9_000L); + EasyMock.expect(clock.millis()).andReturn(10_000L); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, 20_000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + // The following call is for clock.millis() == 9000, we renew the message + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 15_000, we don't renew the message + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew the message + backoffRenewRunnable.getValue().run(); + } + + @Test + public void testAddMessages() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(10_500L); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(4); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + renewRunnable.getValue().run(); + // Added for clock.millis() == 10_500 + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID3); + // The following call is for clock.millis() == 15_000, no messages are renewed + renewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages and the new one + renewRunnable.getValue().run(); + } + + @Test + public void testAddMessagesBackoff() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(10_500L); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(4); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID3))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + backoffAckDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + backoffRenewRunnable.getValue().run(); + // Added for clock.millis() == 10_500 + backoffAckDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID3); + // The following call is for clock.millis() == 15_000, no messages are renewed + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages and the new one + backoffRenewRunnable.getValue().run(); + } + + @Test + public void testAddExistingMessage() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(14_000L); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(2); + EasyMock.expect(clock.millis()).andReturn(24_000L); + EasyMock.expect(clock.millis()).andReturn(34_000L); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + renewRunnable.getValue().run(); + // Added again for clock.millis() == 14_000 + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + // The following call is for clock.millis() == 15_000, no messages are renewed + renewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages but the updated one + renewRunnable.getValue().run(); + // The following call is for clock.millis() == 24_000, wre renew the updated message + renewRunnable.getValue().run(); + } + + @Test + public void testAddExistingMessageBackoff() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(14_000L); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(2); + EasyMock.expect(clock.millis()).andReturn(24_000L); + EasyMock.expect(clock.millis()).andReturn(34_000L); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + backoffAckDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + backoffRenewRunnable.getValue().run(); + // Added again for clock.millis() == 14_000 + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + // The following call is for clock.millis() == 15_000, no messages are renewed + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages but the updated one + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 24_000, wre renew the updated message + backoffRenewRunnable.getValue().run(); + } + + @Test + public void testRemoveNonExistingMessage() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L).times(1); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(3); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + renewRunnable.getValue().run(); + // Remove a message + ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID3); + // The following call is for clock.millis() == 15_000, no messages are renewed + renewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages + renewRunnable.getValue().run(); + } + + @Test + public void testRemoveNonExistingMessageBackoff() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L).times(1); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(3); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + backoffAckDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + backoffRenewRunnable.getValue().run(); + // Remove a message + backoffAckDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID3); + // The following call is for clock.millis() == 15_000, no messages are renewed + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages + backoffRenewRunnable.getValue().run(); + } + + @Test + public void testRemoveMessage() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L).times(1); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(2); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + renewRunnable.getValue().run(); + // Remove a message + ackDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID2); + // The following call is for clock.millis() == 15_000, no messages are renewed + renewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages + renewRunnable.getValue().run(); + } + + @Test + public void testRemoveMessageBackoff() { + EasyMock.expect(clock.millis()).andReturn(0L).times(3); + EasyMock.expect(clock.millis()).andReturn(9_000L).times(1); + EasyMock.expect(clock.millis()).andReturn(10_000L).times(3); + EasyMock.expect(clock.millis()).andReturn(15_000L); + EasyMock.expect(clock.millis()).andReturn(19_000L); + EasyMock.expect(clock.millis()).andReturn(20_000L).times(2); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID2))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, MIN_DEADLINE_MILLISECONDS, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION1, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.expect(pubsub.modifyAckDeadlineAsync(SUBSCRIPTION2, 20000, + TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1))).andReturn(null); + EasyMock.replay(clock, pubsub); + // Added for clock.millis() == 0 + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1); + backoffAckDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID2); + backoffAckDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1); + backoffRenewRunnable.getValue().run(); + // Remove a message + backoffAckDeadlineRenewer.remove(SUBSCRIPTION1, ACK_ID2); + // The following call is for clock.millis() == 15_000, no messages are renewed + backoffRenewRunnable.getValue().run(); + // The following call is for clock.millis() == 19_000, we renew old messages + backoffRenewRunnable.getValue().run(); + } + + @Test + public void testClose() throws Exception { + EasyMock.reset(scheduledFuture, executorFactory); + EasyMock.expect(scheduledFuture.cancel(false)).andReturn(true); + executorFactory.release(executor); + EasyMock.expectLastCall(); + EasyMock.replay(clock, pubsub, scheduledFuture, executorFactory); + ackDeadlineRenewer.close(); + } +}