Skip to content

Commit

Permalink
[Java Client] Use failPendingMessages to ensure proper cleanup (apach…
Browse files Browse the repository at this point in the history
…e#12259)

* [Java Client] Use failPendingMessages to ensure proper cleanup

* Update method name from code review comments

* Update pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java

Co-authored-by: Matteo Merli <[email protected]>

* Move setState into sync block; consolidate client.cleanupProducer call

* Move cleanupProducer into sync block

* Make method closeAndClearPendingMessages synchronized

Co-authored-by: Matteo Merli <[email protected]>
(cherry picked from commit 2ad0e5a)
(cherry picked from commit 2e50783)
  • Loading branch information
michaeljmarshall authored and nicoloboschi committed Oct 27, 2021
1 parent 18a4ae3 commit 8c47458
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Test(groups = "broker-impl")
Expand Down Expand Up @@ -73,6 +74,31 @@ public void testProducerCloseCallback() throws Exception {
Assert.assertEquals(completableFuture.isDone(), true);
}

@Test(timeOut = 10_000)
public void testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws Exception {
initClient();
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerClose")
.maxPendingMessages(10)
.batchingMaxPublishDelay(10, TimeUnit.SECONDS)
.batchingMaxBytes(Integer.MAX_VALUE)
.enableBatching(true)
.create();
CompletableFuture<MessageId> completableFuture = producer.newMessage()
.value("test-msg".getBytes(StandardCharsets.UTF_8))
.sendAsync();
// By setting the state to Failed, the close method will exit early because the previous state was not Ready.
producer.setState(HandlerState.State.Failed);
producer.closeAsync();
Assert.assertTrue(completableFuture.isCompletedExceptionally());
try {
completableFuture.get();
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException);
}
}

private void initClient() throws PulsarClientException {
pulsarClient = PulsarClient.builder().
serviceUrl(lookupUrl.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,12 +866,7 @@ public CompletableFuture<Void> closeAsync() {
ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
synchronized (this) {
setState(State.Closed);
client.cleanupProducer(this);
clearPendingMessagesWhenClose();
}

closeAndClearPendingMessages();
return CompletableFuture.completedFuture(null);
}

Expand All @@ -884,14 +879,9 @@ public CompletableFuture<Void> closeAsync() {
if (exception == null || !cnx.ctx().channel().isActive()) {
// Either we've received the success response for the close producer command from the broker, or the
// connection did break in the meantime. In any case, the producer is gone.
synchronized (ProducerImpl.this) {
log.info("[{}] [{}] Closed Producer", topic, producerName);
setState(State.Closed);
clearPendingMessagesWhenClose();
}

log.info("[{}] [{}] Closed Producer", topic, producerName);
closeAndClearPendingMessages();
closeFuture.complete(null);
client.cleanupProducer(this);
} else {
closeFuture.completeExceptionally(exception);
}
Expand All @@ -902,17 +892,14 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}

private void clearPendingMessagesWhenClose() {
private synchronized void closeAndClearPendingMessages() {
setState(State.Closed);
client.cleanupProducer(this);
PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
format("The producer %s of the topic %s was already closed when closing the producers",
producerName, topic));
pendingMessages.forEach(msg -> {
client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
msg.sendComplete(ex);
msg.cmd.release();
msg.recycle();
});
pendingMessages.clear();
// Use null for cnx to ensure that the pending messages are failed immediately
failPendingMessages(null, ex);
}

@Override
Expand Down

0 comments on commit 8c47458

Please sign in to comment.