Skip to content

Commit

Permalink
Fix: Removed EnableExactlyOnceDelivery from Builders (#1052)
Browse files Browse the repository at this point in the history
Removed EnableExactlyOnceDelivery from `Subscriber.Builder`, `StreamingSubscriberConnection.Builder`, and `MessageDispatcher.Builder`.

Added comments to `AckReplyConsumerWithResponse` to be in line with `AckReplyConsumer`
  • Loading branch information
mmicatka authored Mar 9, 2022
1 parent c7551d6 commit 9add538
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 84 deletions.
20 changes: 20 additions & 0 deletions google-cloud-pubsub/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- These should be removed after the next major release 1.117.x -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsub/v1/Subscriber$Builder</className>
<method>com.google.cloud.pubsub.v1.Subscriber$Builder setExactlyOnceDeliveryEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder</className>
<method>com.google.cloud.pubsub.v1.StreamingSubscriberConnection$Builder setExactlyOnceDeliveryEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsub/v1/MessageDispatcher$Builder</className>
<method>com.google.cloud.pubsub.v1.MessageDispatcher$Builder setEnableExactlyOnceDelivery(boolean)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,34 @@

import java.util.concurrent.Future;

/**
* Acknowledging a message in Pub/Sub means that you are done with it, and it will not be delivered
* to this subscription again. You should avoid acknowledging messages until you have *finished*
* processing them, so that in the event of a failure, you receive the message again.
*
* <p>If exactly-once delivery is enabled on the subscription, the future returned by the ack/nack
* methods track the state of acknowledgement operation by the server. If the future completes
* successfully, the message is guaranteed NOT to be re-delivered. Otherwise, the future will
* contain an exception with more details about the failure and the message may be re-delivered.
*
* <p>If exactly-once delivery is NOT enabled on the subscription, the future returns immediately
* with an AckResponse.SUCCESS. Because re-deliveries are possible, you should ensure that your
* processing code is idempotent, as you may receive any given message more than once.
*/
public interface AckReplyConsumerWithResponse {
/**
* Acknowledges that the message has been successfully processed. The service will not send the
* message again.
*
* <p>A future representing the server response is returned
*/
Future<AckResponse> ack();

/**
* Signals that the message has not been successfully processed. The service should resend the
* message.
*
* <p>A future representing the server response is returned
*/
Future<AckResponse> nack();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class MessageDispatcher {

private final FlowController flowController;

private AtomicBoolean enableExactlyOnceDelivery;
private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);

private final Waiter messagesWaiter;

Expand Down Expand Up @@ -198,7 +198,6 @@ private MessageDispatcher(Builder builder) {

ackProcessor = builder.ackProcessor;
flowController = builder.flowController;
enableExactlyOnceDelivery = new AtomicBoolean(builder.enableExactlyOnceDelivery);
ackLatencyDistribution = builder.ackLatencyDistribution;
clock = builder.clock;
jobLock = new ReentrantLock();
Expand Down Expand Up @@ -296,13 +295,13 @@ int getMessageDeadlineSeconds() {
}

@InternalApi
void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
// Sanity check that we are changing the enableExactlyOnceDelivery state
if (enableExactlyOnceDelivery == this.enableExactlyOnceDelivery.get()) {
void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
// Sanity check that we are changing the exactlyOnceDeliveryEnabled state
if (exactlyOnceDeliveryEnabled == this.exactlyOnceDeliveryEnabled.get()) {
return;
}

this.enableExactlyOnceDelivery.set(enableExactlyOnceDelivery);
this.exactlyOnceDeliveryEnabled.set(exactlyOnceDeliveryEnabled);

// If a custom value for minDurationPerAckExtension, we should respect that
if (!minDurationPerAckExtensionDefaultUsed) {
Expand All @@ -313,7 +312,7 @@ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
// maxDurationPerAckExtensionSeconds does not change
int possibleNewMinAckDeadlineExtensionSeconds;

if (enableExactlyOnceDelivery) {
if (exactlyOnceDeliveryEnabled) {
possibleNewMinAckDeadlineExtensionSeconds =
Math.toIntExact(
Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds());
Expand All @@ -323,7 +322,7 @@ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
}

// If we are not using the default maxDurationAckExtension, check if the
// minAckDeadlineExtensionExactlyOnce needs to be bounded by the set max
// minAckDeadlineExtensionExactlyOnceDelivery needs to be bounded by the set max
if (!maxDurationPerAckExtensionDefaultUsed
&& (possibleNewMinAckDeadlineExtensionSeconds > maxDurationPerAckExtensionSeconds)) {
minDurationPerAckExtensionSeconds = maxDurationPerAckExtensionSeconds;
Expand Down Expand Up @@ -580,7 +579,6 @@ public static final class Builder {

private Distribution ackLatencyDistribution;
private FlowController flowController;
private boolean enableExactlyOnceDelivery;

private Executor executor;
private ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -641,11 +639,6 @@ public Builder setFlowController(FlowController flowController) {
return this;
}

public Builder setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
this.enableExactlyOnceDelivery = enableExactlyOnceDelivery;
return this;
}

public Builder setExecutor(Executor executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,7 @@ private StreamingSubscriberConnection(Builder builder) {
// We need to set the default stream ack deadline on the initial request, this will be
// updated by modack requests in the message dispatcher
if (builder.maxDurationPerAckExtensionDefaultUsed) {
// If the default is used, check if exactly once is enabled and set appropriately
if (builder.exactlyOnceDeliveryEnabled) {
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT;
} else {
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
}
inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
} else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE)
< 0) {
// We will not be able to extend more than the default minimum
Expand All @@ -123,7 +118,6 @@ private StreamingSubscriberConnection(Builder builder) {

subscriberStub = builder.subscriberStub;
channelAffinity = builder.channelAffinity;
exactlyOnceDeliveryEnabled.set(builder.exactlyOnceDeliveryEnabled);

MessageDispatcher.Builder messageDispatcherBuilder;
if (builder.receiver != null) {
Expand All @@ -143,7 +137,6 @@ private StreamingSubscriberConnection(Builder builder) {
.setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed)
.setAckLatencyDistribution(builder.ackLatencyDistribution)
.setFlowController(builder.flowController)
.setEnableExactlyOnceDelivery(builder.exactlyOnceDeliveryEnabled)
.setExecutor(builder.executor)
.setSystemExecutor(builder.systemExecutor)
.setApiClock(builder.clock)
Expand All @@ -159,7 +152,7 @@ public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(
return this;
}

public boolean isExactlyOnceDeliveryEnabled() {
public boolean getExactlyOnceDeliveryEnabled() {
return exactlyOnceDeliveryEnabled.get();
}

Expand Down Expand Up @@ -221,7 +214,7 @@ public void onResponse(StreamingPullResponse response) {
response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();

setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.setEnableExactlyOnceDelivery(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());

// Only request more if we're not shutdown.
Expand Down Expand Up @@ -370,7 +363,7 @@ public void setResponseOutstandingMessages(AckResponse ackResponse) {
private void setFailureFutureOutstandingMessages(Throwable t) {
AckResponse ackResponse;

if (isExactlyOnceDeliveryEnabled()) {
if (getExactlyOnceDeliveryEnabled()) {
if (!(t instanceof ApiException)) {
ackResponse = AckResponse.OTHER;
}
Expand Down Expand Up @@ -518,7 +511,7 @@ public void onFailure(Throwable t) {
// Remove from our pending operations
ackOperationsWaiter.incrementPendingCount(-1);

if (!isExactlyOnceDeliveryEnabled()) {
if (!getExactlyOnceDeliveryEnabled()) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
return;
Expand Down Expand Up @@ -609,7 +602,6 @@ public static final class Builder {
private int channelAffinity;
private FlowController flowController;
private FlowControlSettings flowControlSettings;
private boolean exactlyOnceDeliveryEnabled;
private boolean useLegacyFlowControl;
private ScheduledExecutorService executor;
private ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -690,11 +682,6 @@ public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) {
return this;
}

public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
return this;
}

public Builder setExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private SubscriberStub subscriberStub;
private final SubscriberStubSettings subStubSettings;
private final FlowController flowController;
private boolean exactlyOnceDeliveryEnabled = false;
private final int numPullers;

private final MessageReceiver receiver;
Expand Down Expand Up @@ -166,8 +165,6 @@ private Subscriber(Builder builder) {
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.build());

exactlyOnceDeliveryEnabled = builder.exactlyOnceDeliveryEnabled;

this.numPullers = builder.parallelPullCount;

executorProvider = builder.executorProvider;
Expand Down Expand Up @@ -385,7 +382,6 @@ private void startStreamingConnections() {
.setExecutor(executor)
.setSystemExecutor(alarmsExecutor)
.setClock(clock)
.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled)
.build();

streamingSubscriberConnections.add(streamingSubscriberConnection);
Expand Down Expand Up @@ -479,8 +475,6 @@ public static final class Builder {
private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS;

private boolean exactlyOnceDeliveryEnabled = false;

private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
private ExecutorProvider systemExecutorProvider = null;
private TransportChannelProvider channelProvider =
Expand Down Expand Up @@ -573,22 +567,6 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}

/**
* Enables/Disabled ExactlyOnceDelivery
*
* <p>Will update the minDurationPerAckExtension if a user-provided value is not set
*/
public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
// If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if
// applicable
if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) {
this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY;
}

this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public void receiveMessage(
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.setExactlyOnceDeliveryEnabled(true)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down Expand Up @@ -282,7 +281,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
.setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down Expand Up @@ -360,7 +358,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
.setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
Expand Down
Loading

0 comments on commit 9add538

Please sign in to comment.