-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat][client]PIP-180 ShadowTopic - Part II - Add support for messgae_id in CommandSend #17195
Conversation
/pulsarbot run-failure-checks |
protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, | ||
MessageId messageId, MessageMetadata msgMetadata, | ||
ByteBuf compressedPayload) { | ||
if (messageId instanceof MessageIdImpl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be a code path taken by user Pulsar clients but only by the Replicator.
Maybe we should add an internal flag to the PulsarClient to mark it as a internal client and allow it to be able to do this thing.
Pushing the messageId from the producer also may lead to some security issues.
We must prevent on the broker side that a client writes to the wrong ledger (for instance to a ledger of another topic, that can be not accessible from the client according the authz)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we have two problems:
- regular users should not be able to set the MessageId (I don't know if there is a way)
- the broker must not allow a client to write to a ledger that is not part of the topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- regular users should not be able to set the MessageId (I don't know if there is a way)
For normal users, the public method TypedMessageBuilder<T> Producer#newMessage()
should be used to construct and send messages, but they can't set MessageId in TypedMessageBuilder
. So messageId won't be set unless users know what they are doing.
And currently, messageId only can be send to broker with public methods in ProducerImpl
. It's already kind of an internal usage.
- the broker must not allow a client to write to a ledger that is not part of the topic
Sure, I understand your concern, but it won't happen. The messageId will only be used by ShadowTopic and they won't have any write access to these ledgers or any ledgers. For normal topic, in case they received this messageId by some accident, they will just ignore it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eolivelli PTAL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's very confusing to see the logic here. What's the behavior if the ledger id and the entry id are both -1 in a CommandSend
? Is this else
branch protected by unit tests? @Jason918 @eolivelli
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I approve the patch.
But when we go down the implementation I would like to see some check about the "ledgerId".
We already had in the past problems with clients that are able to set a ledgerId and that caused lot of troubles (CVEs)
Sure |
…and apache#16605 - apache#17195 changed the method signature that apache#16605 depended upon
…sors (#17273) * [fix][broker] Fix broken build caused by conflict between #17195 and #16605 - #17195 changed the method signature that #16605 depended upon * [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false Fixes #16054 - calculate the sorted list of when a read position gets updated - this resolves #9958 in a proper way - #12045 broke the caching solution as explained in #16054 - remove invalid tests - fix tests - add more tests to handle corner cases * Address review comment * Handle durable & non-durable in the correct way * Fix cache tests since now entries get evicted reactively * Address review comment about method names * Change signature for add method so that position must be passed - this is more consistent with cursorUpdated method where the position is passed * Update javadoc for ManagedCursorContainer * Address review comment * Simplify ManagedCursorContainer * Clarify javadoc * Ensure that cursors are tracked by making sure that initial position isn't null unintentionally * Prevent race in updating activeCursors
…sors (#17273) * [fix][broker] Fix broken build caused by conflict between #17195 and #16605 - #17195 changed the method signature that #16605 depended upon * [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false Fixes #16054 - calculate the sorted list of when a read position gets updated - this resolves #9958 in a proper way - #12045 broke the caching solution as explained in #16054 - remove invalid tests - fix tests - add more tests to handle corner cases * Address review comment * Handle durable & non-durable in the correct way * Fix cache tests since now entries get evicted reactively * Address review comment about method names * Change signature for add method so that position must be passed - this is more consistent with cursorUpdated method where the position is passed * Update javadoc for ManagedCursorContainer * Address review comment * Simplify ManagedCursorContainer * Clarify javadoc * Ensure that cursors are tracked by making sure that initial position isn't null unintentionally * Prevent race in updating activeCursors
…sors (#17273) * [fix][broker] Fix broken build caused by conflict between #17195 and #16605 - #17195 changed the method signature that #16605 depended upon * [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false Fixes #16054 - calculate the sorted list of when a read position gets updated - this resolves #9958 in a proper way - #12045 broke the caching solution as explained in #16054 - remove invalid tests - fix tests - add more tests to handle corner cases * Address review comment * Handle durable & non-durable in the correct way * Fix cache tests since now entries get evicted reactively * Address review comment about method names * Change signature for add method so that position must be passed - this is more consistent with cursorUpdated method where the position is passed * Update javadoc for ManagedCursorContainer * Address review comment * Simplify ManagedCursorContainer * Clarify javadoc * Ensure that cursors are tracked by making sure that initial position isn't null unintentionally * Prevent race in updating activeCursors (cherry picked from commit 856ef15)
…sors (#17273) * [fix][broker] Fix broken build caused by conflict between #17195 and #16605 - #17195 changed the method signature that #16605 depended upon * [fix][broker] Keep sorted list of cursors ordered by read position of active cursors when cacheEvictionByMarkDeletedPosition=false Fixes #16054 - calculate the sorted list of when a read position gets updated - this resolves #9958 in a proper way - #12045 broke the caching solution as explained in #16054 - remove invalid tests - fix tests - add more tests to handle corner cases * Address review comment * Handle durable & non-durable in the correct way * Fix cache tests since now entries get evicted reactively * Address review comment about method names * Change signature for add method so that position must be passed - this is more consistent with cursorUpdated method where the position is passed * Update javadoc for ManagedCursorContainer * Address review comment * Simplify ManagedCursorContainer * Clarify javadoc * Ensure that cursors are tracked by making sure that initial position isn't null unintentionally * Prevent race in updating activeCursors (cherry picked from commit 856ef15)
Master Issue: #16153
Motivation
See PIP-180 #16153. Shadow topic requires message id from shadow replicator to update LAC and populate entry cache.
Modifications
Add
message_id
in CommandSend.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
This new field is for internal use only right now.
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)