-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exposing the broker entry metadata to client #11553
Exposing the broker entry metadata to client #11553
Conversation
Thanks for your contribution. For this PR, do we need to update docs? (The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks) |
Thanks. "Broker Entry Metadata" is a new feature since 2.8.0, but I haven't found any doc about it for now. Maybe it's better to add some docs for it? (Not just for my PR) |
@LeBW thanks for your suggestion. Would you like to add some docs from your perspective? |
Yeah, I can do it later. |
@LeBW you can ping me to review the doc and look forward to your contribution, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. I left a few comments
This is an API change and wire protocol change, can you please advertise it on the dev@ mailing list?
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
Outdated
Show resolved
Hide resolved
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
Show resolved
Hide resolved
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
Outdated
Show resolved
Hide resolved
Thanks for your suggestion. I'll write an email to advertise it. |
…rom long to Optional<Long>. 2. Improve the unit test.
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
Outdated
Show resolved
Hide resolved
…ryMetadataToClientEnabled
0666188
to
93f066c
Compare
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
Show resolved
Hide resolved
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
Show resolved
Hide resolved
@@ -186,6 +186,7 @@ | |||
private boolean preciseTopicPublishRateLimitingEnable; | |||
private boolean encryptionRequireOnProducer; | |||
|
|||
private boolean exposingBrokerEntryMetadataToClientEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to add ServerCnx? Is it able to get from the ServiceConfiguration directly?
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
|
||
@Override | ||
public Optional<Long> getIndex() { | ||
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a batch message, we only have one index in the broker entry metadata, we will get the same index for the individual messages of a batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @codelipenghui , for a batch message, we need to caculate the individual index for each message from the Entry index and batch size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you all. Let me fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
2. fix getIndex() for batch messages. 3. add E2E tests.
e5f6a0b
to
1025378
Compare
@Cleanup | ||
Producer<byte[]> producer = pulsarClient.newProducer() | ||
.topic(topic) | ||
.enableBatching(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default batching delay is 1ms, it's better to increase the batching delay to such as 1min and force flush the producer, because if we are running the test on a machine which with leak resource, we might get only 1 message in a batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can send 5 messages and flush them and send 5 messages again, to flush again, make sure we got 2 batch message and 10 individual messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Fixed.
if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { | ||
if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) { | ||
int batchIndex = ((BatchMessageIdImpl) messageId).getBatchIndex(); | ||
return Optional.of(brokerEntryMetadata.getIndex() - batchIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the index is 10 and the batch size is 5, we will get 10, 9, 8, 7, 6 for these 5 messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great contribution!
@eolivelli Please help review the PR again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work !
LGTM
// skip broker entry metadata if consumer-client doesn't support broker entry metadata or the | ||
// features is not enabled | ||
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue() | ||
|| !cnx.isExposingBrokerEntryMetadataToClientEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can get the configuration through the following dependency paths
PulsarCommandSenderImpl -> ServerCnx -> BrokerService -> PulsarService -> configuration
isExposingBrokerEntryMetadataToClientEnabled
in ServerCnx can be removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion. I've removed it in ServerCnx
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); | ||
// skip broker entry metadata if consumer-client doesn't support broker entry metadata or the | ||
// features is not enabled | ||
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use the FeatureFlag
mechanism instead of checking the ProtocolVersion
. This will ensure clients can phase in with the support of this feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's interesting. I wasn't aware of that mechanism
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, even if the client with ProtocolVersion.v18
but does not support get broker entry metadata, the broker can skip the broker entry metadata to reduce the network workload.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use the
FeatureFlag
mechanism instead of checking theProtocolVersion
. This will ensure clients can phase in with the support of this feature.
Thanks for your suggestion. Maybe it's better to check both the FeatureFlag
and the ProtocolVersion
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use the
FeatureFlag
mechanism instead of checking theProtocolVersion
. This will ensure clients can phase in with the support of this feature.
PIP-70 adds a feature flag supports_broker_entry_metadata
, so I use it to check here. Can you review it again?
@LeBW thanks for your contribution! Please do not forget to add docs accordingly to allow users to know your great code changes. |
### Motivation This is an improvement to [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). Now the client is able to get the broker entry metadata. ### Modifications 1. increment the client protocol version from 17 to 18. 2. add a configuration `enableExposingBrokerEntryMetadataToClient` in the broker configuration. 3. Let the broker send the broker entry metadata to the client when `client protocol version >= 18` and `enableExposingBrokerEntryMetadataToClient` is true. 4. Add client support for broker entry metadata. Add two methods `getBrokerPublishTime()` and `getIndex()` to utilize the broker entry metadata in `Message`.
This is an improvement to [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). Now the client is able to get the broker entry metadata. 1. increment the client protocol version from 17 to 18. 2. add a configuration `enableExposingBrokerEntryMetadataToClient` in the broker configuration. 3. Let the broker send the broker entry metadata to the client when `client protocol version >= 18` and `enableExposingBrokerEntryMetadataToClient` is true. 4. Add client support for broker entry metadata. Add two methods `getBrokerPublishTime()` and `getIndex()` to utilize the broker entry metadata in `Message`. (cherry picked from commit a7a3721)
This is an improvement to [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). Now the client is able to get the broker entry metadata. 1. increment the client protocol version from 17 to 18. 2. add a configuration `enableExposingBrokerEntryMetadataToClient` in the broker configuration. 3. Let the broker send the broker entry metadata to the client when `client protocol version >= 18` and `enableExposingBrokerEntryMetadataToClient` is true. 4. Add client support for broker entry metadata. Add two methods `getBrokerPublishTime()` and `getIndex()` to utilize the broker entry metadata in `Message`. (cherry picked from commit a7a3721)
Motivation
This is an improvement to PIP-70. Now the client is able to get the broker entry metadata.
Modifications
enableExposingBrokerEntryMetadataToClient
in the broker configuration.client protocol version >= 18
andenableExposingBrokerEntryMetadataToClient
is true.getBrokerPublishTime()
andgetIndex()
to utilize the broker entry metadata inMessage
.Verifying this change
Added test for
getBrokerPublishTime()
andgetIndex()
.Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
For contributor
For this PR, do we need to update docs?