Skip to content

Commit

Permalink
Pub/Sub: Add message abandonment (#4250)
Browse files Browse the repository at this point in the history
* add abandonment

* add unit test for abandon

* update forget() to stop extending deadline
  • Loading branch information
charlesliqlogic authored and chingor13 committed Jan 2, 2019
1 parent fb2b690 commit 6e2c2dd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface AckReplyConsumer {
* message.
*/
void nack();

void abandon();
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ private class AckHandler implements ApiFutureCallback<AckReply> {
private final int outstandingBytes;
private final long receivedTimeMillis;
private final Instant totalExpiration;
private boolean extending = true;

AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
this.ackId = ackId;
Expand All @@ -151,6 +152,7 @@ private void forget() {
*/
return;
}
extending = false;
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
Expand Down Expand Up @@ -417,6 +419,11 @@ public void ack() {
public void nack() {
response.set(AckReply.NACK);
}

@Override
public void abandon() {
ackHandler.forget();
}
};
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
executor.execute(
Expand Down Expand Up @@ -471,6 +478,9 @@ void extendDeadlines() {
Instant extendTo = now.plusSeconds(extendSeconds);

for (Map.Entry<String, AckHandler> entry : pendingMessages.entrySet()) {
if (!entry.getValue().extending) {
continue;
}
String ackId = entry.getKey();
Instant totalExpiration = entry.getValue().totalExpiration;
if (totalExpiration.isAfter(extendTo)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ public void testNack() throws Exception {
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
}

@Test
public void testAbandon() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
consumers.take().abandon();
dispatcher.extendDeadlines();
assertThat(sentModAcks).doesNotContain(TEST_MESSAGE.getAckId());
}

@Test
public void testExtension() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
Expand Down

0 comments on commit 6e2c2dd

Please sign in to comment.