-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed bug leading to semaphore leak when filtering messages #1891
Conversation
@@ -72,6 +76,8 @@ public Map<SubscriptionPartitionOffset, MessageState> getOffsetsSnapshotAndRelea | |||
if (entry.getValue() == MessageState.PROCESSED) { | |||
slots.remove(entry.getKey()); | |||
permitsReleased++; | |||
} else if (entry.getValue() == MessageState.FILTERED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be convenient to have a comment that we only increase semaphore before a sending message, and omitting increasing for filtered, but in any case we put a slot for both of the messages. I just had some problems with understanding how there can be slots in the queue but we didn't increased semaphore for them, thus the comment would help.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have changed the approach and now filtered messages acquires semaphore, please take a look now 😄
…to new_offset_committing_bugfix
hermes.api().publishUntilSuccess(topic.getQualifiedName(), BOB.asJson()); | ||
|
||
// then | ||
expectSingleBatch(subscriber, SINGLE_MESSAGE_FILTERED); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestions:
- check the value of metric
subscription.filtered-out
- publish bob first - he is getting filtered
- maybe set batch size to 2, and batch time to some small value?
"topic", topic.getName().getName() | ||
) | ||
.withValue(1.0) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 1.0 instead of 2.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because previously we set subscriptionPolicy().withInflightSize(1)
and we want to be sure, that this value wouldn't be exceeded when there are more messages and there were messages filtered previously.
@@ -53,6 +53,8 @@ public class Message implements FilterableMessage { | |||
|
|||
private long currentMessageBackoff = -1; | |||
|
|||
private boolean isFiltered = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we must ensure that the this value is visible to all threads (see the comment on top of this class)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have checked it and this is always used within a single thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously the semantics of Optional.empty
from MessageReceiver#next
were that the either the message was not present in kafka or it was filtered. Please add a java doc in MessageReceiver
explaining what is the meaning of Optional.empty
and Messages#isFiltered
now
@@ -10,6 +10,21 @@ | |||
|
|||
public interface MessageReceiver { | |||
|
|||
/** | |||
* Retrieves the next available message from the queue. | |||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
No description provided.