[fix][Java Client] Fix large message sometimes cannot be split into chunks after PIP-132 #16196
Merged
BewareMyPower
merged 3 commits into
apache:master
from
BewareMyPower:bewaremypower/pip-132-bug-fix
Jun 28, 2022
…hunks after PIP-132

Fixes apache#16195

### Motivation

[PIP-132](apache#14007) considers the message metadata size when computing the payload chunk size and the number of chunks. However, it can prevent some messages whose size is less than `maxMessageSize` from being sent. There are two reasons:

1. The `MessageMetadata` is updated after the payload chunk size is computed, i.e. the actual metadata size would be greater.
2. `OpSendMsg#getMessageHeaderAndPayloadSize` doesn't exclude all bytes other than the metadata and payload, e.g. the 4 bytes checksum field.

For example, if the max message size is 100, send a string whose size is 60 with chunking enabled.

1. The initial metadata size is 25, so the chunk size is 75 and the message won't be split into chunks.
2. After `serializeAndSendMessage`, the metadata size becomes 32, so the serialized header's total size is 4 + 8 + 6 + 4 + 32 = 54, and the total size is 54 + 60 = 114, see `headerContentSize` in `serializeCommandSendWithSize`.
3. In `getMessageHeaderAndPayloadSize`, the returned value is computed by 114 - 8 - 4 = 102 > 100. The 6 bytes magic and checksum and 4 bytes metadata length field are not included.

### Modifications

- Update the message metadata before computing the chunk size.
- Compute the correct size in `getMessageHeaderAndPayloadSize`.

### Verifying this change

Add `testChunkSize` to verify all sizes in range [1, maxMessageSize] can be sent successfully when chunking is enabled.
RobertIndie
approved these changes
Jun 24, 2022
Good catch! LGTM.
Jason918
reviewed
Jun 24, 2022
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Jason918
reviewed
Jun 25, 2022
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Jason918
approved these changes
Jun 27, 2022
liangyepianzhou
added a commit
that referenced
this pull request
Dec 13, 2022
### Motivation

When the producer sends messages from multiple threads, a message with a smaller sequence ID can be pushed later than a message with a bigger sequence ID.

`internalSendWithTxnAsync` calls `internalSendAsync` asynchronously when `txn != null`:
https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409

In addition, the sequence ID acquisition in `sendAsync` is not inside the same synchronized block as `serializeAndSendMessage`:
https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490
https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560

For example: we send 4 messages (msg1, msg2, msg3, msg4) to the broker, and the 4 messages may get the sequence IDs (1, 3, 2, 4), which are out of order because the logic that gets the sequence ID and the logic that sends the message are not in the same synchronized block. msg3, with sequence ID 2, will then never be persisted successfully.

### Modification

Add a method to update `sequenceId` and call it inside the synchronized code. Per #16196, the message metadata should be updated before the message size is computed.
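
The ordering problem described in this commit message can be illustrated with a minimal sketch. It is not the `ProducerImpl` code: the class `SequenceIdOrdering`, its fields, and the `send`, `run`, and `isOrdered` methods are hypothetical names chosen for illustration. The sketch assumes that assigning the sequence ID and enqueueing the message happen in one synchronized block, so no other thread can interleave between the two steps.

```java
import java.util.ArrayList;
import java.util.List;

public class SequenceIdOrdering {
    // Hypothetical stand-ins for the producer's sequence counter and send queue.
    private long nextSequenceId = 0;
    private final List<Long> sendOrder = new ArrayList<>();

    // Assign the sequence ID and enqueue the message under the same lock.
    void send() {
        synchronized (this) {
            long sequenceId = nextSequenceId++;
            sendOrder.add(sequenceId);
        }
    }

    // Returns true if the IDs are strictly increasing.
    static boolean isOrdered(List<Long> ids) {
        for (int i = 1; i < ids.size(); i++) {
            if (ids.get(i) <= ids.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    // Sends perThread messages from each of the given number of threads
    // and returns the order in which sequence IDs were enqueued.
    static List<Long> run(int threads, int perThread) {
        SequenceIdOrdering producer = new SequenceIdOrdering();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    producer.send();
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (producer) {
            return new ArrayList<>(producer.sendOrder);
        }
    }

    public static void main(String[] args) {
        List<Long> order = run(4, 1000);
        System.out.println("ordered: " + isOrdered(order));
    }
}
```

If the ID were taken outside the lock and only the enqueue step were synchronized, two threads could obtain IDs 2 and 3 and then enqueue them as 3, 2, which is the (1, 3, 2, 4) situation from the example.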
Demogorgon314
pushed a commit
to Demogorgon314/pulsar
that referenced
this pull request
Dec 26, 2022
…e#17836)
Demogorgon314
pushed a commit
to Demogorgon314/pulsar
that referenced
this pull request
Dec 29, 2022
…e#17836)
lifepuzzlefun
pushed a commit
to lifepuzzlefun/pulsar
that referenced
this pull request
Jan 10, 2023
…e#17836)
liangyepianzhou
added a commit
that referenced
this pull request
Mar 16, 2023
(cherry picked from commit 7e258af)
Labels
doc-not-needed
Your PR changes do not impact docs
type/bug
The PR fixed a bug or issue reported a bug
Fixes #16195

### Motivation

PIP-132 considers the message metadata size when computing the payload chunk size and the number of chunks. However, it can prevent some messages whose size is less than `maxMessageSize` from being sent. There are two reasons:

1. The `MessageMetadata` is updated after the payload chunk size is computed, i.e. the actual metadata size would be greater.
2. `OpSendMsg#getMessageHeaderAndPayloadSize` doesn't exclude all bytes other than the metadata and payload, e.g. the 4 bytes checksum field.

For example, if the max message size is 100, send a string whose size is 60 with chunking enabled.

1. The initial metadata size is 25, so the chunk size is 75 and the message won't be split into chunks.
2. After `serializeAndSendMessage`, the metadata size becomes 32, so the serialized header's total size is 4 + 8 + 6 + 4 + 32 = 54, and the total size is 54 + 60 = 114, see `headerContentSize` in `serializeCommandSendWithSize`.
3. In `getMessageHeaderAndPayloadSize`, the returned value is computed by 114 - 8 - 4 = 102 > 100. The 6 bytes magic and checksum and 4 bytes metadata length field are not included.
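
The arithmetic in this example can be replayed with a small sketch. It is not the Pulsar implementation: the class `HeaderSizeExample` and its method names are hypothetical, the byte counts are copied from the example, and the "after fix" computation assumes that only the metadata and payload bytes should remain once every other header field is excluded.

```java
public class HeaderSizeExample {
    // Values taken from the example; the constant names are hypothetical.
    static final int MAX_MESSAGE_SIZE = 100;
    static final int METADATA_SIZE = 32; // metadata size after serializeAndSendMessage
    static final int PAYLOAD_SIZE = 60;

    // Total serialized size: header (4 + 8 + 6 + 4 + metadata) plus payload.
    static int totalSize() {
        int headerSize = 4 + 8 + 6 + 4 + METADATA_SIZE; // 54
        return headerSize + PAYLOAD_SIZE;               // 114
    }

    // Size as computed before the fix: only 8 and 4 bytes are subtracted.
    static int sizeBeforeFix() {
        return totalSize() - 8 - 4;
    }

    // Assumed corrected size: the 6 bytes of magic and checksum and the
    // 4 bytes of metadata length are excluded as well.
    static int sizeAfterFix() {
        return sizeBeforeFix() - 6 - 4;
    }

    public static void main(String[] args) {
        System.out.println("before fix: " + sizeBeforeFix()
                + ", exceeds max: " + (sizeBeforeFix() > MAX_MESSAGE_SIZE));
        System.out.println("after fix: " + sizeAfterFix()
                + ", exceeds max: " + (sizeAfterFix() > MAX_MESSAGE_SIZE));
    }
}
```

With these numbers the old computation yields 102, which is rejected, while the metadata and payload alone add up to 32 + 60 = 92, which fits within the limit of 100.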
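### Modifications

- Update the message metadata before computing the chunk size.
- Compute the correct size in `getMessageHeaderAndPayloadSize`.

### Verifying this change

Add `testChunkSize` to verify all sizes in range [1, maxMessageSize] can be sent successfully when chunking is enabled.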
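
The idea behind checking every size in [1, maxMessageSize] can be sketched with a simplified model. This is not the `testChunkSize` test itself, which is not shown here. The class `ChunkRangeCheck` and its methods are hypothetical, and the model assumes that each chunk carries metadata of the same size and that a chunk can be sent when metadata plus chunk payload does not exceed the max message size.

```java
public class ChunkRangeCheck {
    // Returns true if every chunk of the payload fits within maxMessageSize.
    // chunkSizeMetadata: metadata size used when computing the chunk size.
    // actualMetadata: metadata size actually serialized with each chunk.
    static boolean canSend(int payloadSize, int maxMessageSize,
                           int chunkSizeMetadata, int actualMetadata) {
        int chunkSize = maxMessageSize - chunkSizeMetadata;
        if (chunkSize <= 0) {
            return false;
        }
        int remaining = payloadSize;
        while (remaining > 0) {
            int chunk = Math.min(chunkSize, remaining);
            if (actualMetadata + chunk > maxMessageSize) {
                return false;
            }
            remaining -= chunk;
        }
        return true;
    }

    // Counts the payload sizes in [1, maxMessageSize] that cannot be sent.
    static int countFailures(int maxMessageSize, int chunkSizeMetadata, int actualMetadata) {
        int failures = 0;
        for (int size = 1; size <= maxMessageSize; size++) {
            if (!canSend(size, maxMessageSize, chunkSizeMetadata, actualMetadata)) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Chunk size computed from the stale metadata size (25) while 32 bytes are sent.
        System.out.println("stale metadata failures: " + countFailures(100, 25, 32));
        // Chunk size computed from the updated metadata size (32).
        System.out.println("updated metadata failures: " + countFailures(100, 32, 32));
    }
}
```

In this model, computing the chunk size from the stale metadata size leaves payload sizes 69 through 100 unsendable, while computing it from the updated metadata size lets every size in the range through.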
### Documentation

Check the box below or label this PR directly.

Need to update docs?

- `doc-required` (Your PR needs to update docs and you will update later)
- `doc-not-needed` (Please explain why)
- `doc` (Your PR contains doc changes)
- `doc-complete` (Docs have been already added)