From 6e2c2dd890fe445cc896f0ddcb30309213759ee2 Mon Sep 17 00:00:00 2001 From: charlesliqlogic <45208557+charlesliqlogic@users.noreply.github.com> Date: Wed, 2 Jan 2019 12:51:26 -0500 Subject: [PATCH] Pub/Sub: Add message abandonment (#4250) * add abandonment * add unit test for abandon * update forget() to stop extending deadline --- .../com/google/cloud/pubsub/v1/AckReplyConsumer.java | 2 ++ .../com/google/cloud/pubsub/v1/MessageDispatcher.java | 10 ++++++++++ .../google/cloud/pubsub/v1/MessageDispatcherTest.java | 8 ++++++++ 3 files changed, 20 insertions(+) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java index 9fd9bc837c5c..b3989fc974cb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java @@ -29,4 +29,6 @@ public interface AckReplyConsumer { * message. */ void nack(); + + void abandon(); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5f5ebbaee204..6586dc68e535 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -133,6 +133,7 @@ private class AckHandler implements ApiFutureCallback { private final int outstandingBytes; private final long receivedTimeMillis; private final Instant totalExpiration; + private boolean extending = true; AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { this.ackId = ackId; @@ -151,6 +152,7 @@ private void forget() { */ return; } + extending = false; flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); processOutstandingBatches(); @@ -417,6 +419,11 @@ public void ack() { public void nack() { response.set(AckReply.NACK); } + + @Override + public void abandon() { + ackHandler.forget(); + } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); executor.execute( @@ -471,6 +478,9 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); for (Map.Entry entry : pendingMessages.entrySet()) { + if (!entry.getValue().extending) { + continue; + } String ackId = entry.getKey(); Instant totalExpiration = entry.getValue().totalExpiration; if (totalExpiration.isAfter(extendTo)) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 785368bb13cb..2bd2a518bfef 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -148,6 +148,14 @@ public void testNack() throws Exception { assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); } + @Test + public void testAbandon() throws Exception { + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + consumers.take().abandon(); + dispatcher.extendDeadlines(); + assertThat(sentModAcks).doesNotContain(TEST_MESSAGE.getAckId()); + } + @Test public void testExtension() throws Exception { dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);