-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Java Client] Use failPendingMessages to ensure proper cleanup #12259
[Java Client] Use failPendingMessages to ensure proper cleanup #12259
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me,
I left some minor comments
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
}); | ||
pendingMessages.clear(); | ||
setState(State.Closed); | ||
synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the previous code "setState" was inside the synchronized
block
so probably is is better to keep it inside that block.
At that point we can simply move the synchronized
keyword in the signature of the method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's okay to move setState
outside the synchronized
block. I think it's a problem of previous code. See connectionOpened
and connectionFailed
, setState
is all called outside the synchronized
block since it's an atomic operation. It's only called inside the synchronized
block in closeAsync
, which is unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear to me why the setState
method was ever in a synchronized
block. In looking at the git history, it has been that way since the initial import. Since setSet
has its own concurrency control and since we call setState
outside of synchronized
blocks in many other parts of the ProducerImpl
, I don't see why this update should be in the synchronized
block. Let me know if you think I am missing something, @eolivelli.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember the original reasoning, though if the variable state is changed within the sync block, it would appear atomic also with all the other changes in the sync block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would appear atomic also with all the other changes in the sync block.
Yes, that is true. I'll move it back into the sync block. I wonder if these should be updated too?
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Lines 1504 to 1517 in da5bac9
if (cause instanceof PulsarClientException.TopicTerminatedException) { | |
setState(State.Terminated); | |
synchronized (this) { | |
failPendingMessages(cnx(), (PulsarClientException) cause); | |
} | |
producerCreatedFuture.completeExceptionally(cause); | |
client.cleanupProducer(this); | |
} else if (cause instanceof PulsarClientException.ProducerFencedException) { | |
setState(State.ProducerFenced); | |
synchronized (this) { | |
failPendingMessages(cnx(), (PulsarClientException) cause); | |
} | |
producerCreatedFuture.completeExceptionally(cause); | |
client.cleanupProducer(this); |
I don't think it will matter too much because once the state is set to a failure/closed state, the producer won't accept new messages anyway.
@merlimat @Shoothzj - PTAL |
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
Outdated
Show resolved
Hide resolved
client.cleanupProducer(this); | ||
clearPendingMessagesWhenClose(); | ||
} | ||
client.cleanupProducer(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to also move client.cleanupProducer(this);
to closeAndClearPendingMessages()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can add that to closeAndClearPendingMessages
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put it in the sync block based on the same reasoning from the discussion on setState
.
}); | ||
pendingMessages.clear(); | ||
setState(State.Closed); | ||
synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember the original reasoning, though if the variable state is changed within the sync block, it would appear atomic also with all the other changes in the sync block.
…ucerCloseTest.java Co-authored-by: Matteo Merli <[email protected]>
Good Catch. LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
* [Java Client] Use failPendingMessages to ensure proper cleanup * Update method name from code review comments * Update pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java Co-authored-by: Matteo Merli <[email protected]> * Move setState into sync block; consolidate client.cleanupProducer call * Move cleanupProducer into sync block * Make method closeAndClearPendingMessages synchronized Co-authored-by: Matteo Merli <[email protected]> (cherry picked from commit 2ad0e5a)
…e#12259) * [Java Client] Use failPendingMessages to ensure proper cleanup * Update method name from code review comments * Update pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java Co-authored-by: Matteo Merli <[email protected]> * Move setState into sync block; consolidate client.cleanupProducer call * Move cleanupProducer into sync block * Make method closeAndClearPendingMessages synchronized Co-authored-by: Matteo Merli <[email protected]> (cherry picked from commit 2ad0e5a) (cherry picked from commit 2e50783)
…e#12259) * [Java Client] Use failPendingMessages to ensure proper cleanup * Update method name from code review comments * Update pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java Co-authored-by: Matteo Merli <[email protected]> * Move setState into sync block; consolidate client.cleanupProducer call * Move cleanupProducer into sync block * Make method closeAndClearPendingMessages synchronized Co-authored-by: Matteo Merli <[email protected]>
Motivation
When calling
ProducerImpl#closeAsync
, the current cleanup relies onclearPendingMessagesWhenClose
. This method does not fail any remaining messages in thebatchMessageContainer
. After inspecting thefailPendingMessages
method, it looks like it would be better to call this within theclearPendingMessagesWhenClose
method. It also has more robust handling of failing the messages contained in thependingMessages
queue.Modifications
clearPendingMessagesWhenClose
to rely onfailPendingMessages
.Move already thread safe method calls, likesetState
outside of thesynchronized
blockVerifying this change
I added a test that will pass regardless of the outcome of the current mailing list thread discussing whether "close implies flush". There is also already a test that ensures that any outstanding pending messages are failed. Together, these tests ensure correct behavior.
Does this pull request potentially affect one of the following parts:
This is not a breaking change.
Documentation
There is no need to update the docs for this change.