
[improve][ml] Do not switch thread to execute asyncAddEntry's core logic #23940

Conversation

BewareMyPower (Contributor)

Motivation

In a protocol handler's implementation, it sometimes needs to record the current base offset before the actual asyncAddEntry call (or the wrapped PersistentTopic#publishMessages call), for example:

    private final PersistentTopic persistentTopic;
    private final Set<Long> pendingBaseOffsets = ConcurrentHashMap.newKeySet();

    private synchronized void add(ByteBuf buffer, int batchSize) {
        final var ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
        final var interceptor = (ManagedLedgerInterceptorImpl) ml.getManagedLedgerInterceptor();
        final var baseOffset = interceptor.getIndex(); // the base offset of the next batch to write
        pendingBaseOffsets.add(baseOffset);
        ml.asyncAddEntry(buffer, batchSize, new AsyncCallbacks.AddEntryCallback() {

            @Override
            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                if (!pendingBaseOffsets.remove(baseOffset)) {
                    log.error("Failed to remove {}", baseOffset);
                }
            }
            // addFailed omitted for brevity
        }, null);
    }

However, the existing implementation does not work as expected, even if the downstream protocol handler guarantees that asyncAddEntry is called from a single thread. See:

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
    OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
            currentLedgerTimeoutTriggered);
    internalAsyncAddEntry(addOperation);
});

First, the code above does not make sense because internalAsyncAddEntry itself is already synchronized.

Second, to guarantee thread safety for concurrent add operations, switching to another thread is no more efficient than synchronizing the method call. In most cases it is less efficient, because execute involves at least two method calls (offer and poll) on a BlockingQueue, as well as some other operations (like CAS). Taking ArrayBlockingQueue as an example, both its offer and poll implementations need to acquire the internal lock.
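As a rough illustration of this cost difference (a hypothetical micro-sketch, not a rigorous benchmark and not Pulsar code — absolute numbers vary by machine), compare a direct synchronized call against a per-call handoff to a single-threaded executor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Compares the two thread-safety strategies: a direct synchronized call on the
// caller thread vs. handing each operation to a single-threaded executor, which
// pays for a queue offer/poll (plus wake-ups) on every call.
public class HandoffVsSynchronized {
    private static long counter;

    private static synchronized void syncAdd() {
        counter++;
    }

    public static void main(String[] args) throws Exception {
        final int n = 100_000;

        long t0 = System.nanoTime();
        for (int i = 0; i < n; i++) {
            syncAdd(); // direct synchronized call, no thread switch
        }
        long syncNanos = System.nanoTime() - t0;

        ExecutorService executor = Executors.newSingleThreadExecutor();
        long t1 = System.nanoTime();
        for (int i = 0; i < n; i++) {
            executor.execute(() -> counter++); // per-call queue handoff
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        long handoffNanos = System.nanoTime() - t1;

        System.out.println("counter=" + counter);
        System.out.printf("synchronized: %d ms, executor handoff: %d ms%n",
                syncNanos / 1_000_000, handoffNanos / 1_000_000);
    }
}
```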

Third, switching to another thread to execute the core logic is counter-intuitive, especially when modifying fields that represent the current state. For example, to achieve the goal described at the beginning, I would have to write code like:

        ml.getExecutor().execute(() -> {
            pendingBaseOffsets.add(baseOffset);
            ml.asyncAddEntry(buffer, batchSize, callback, ctx);
        });

Modifications

Split internalAsyncAddEntry into two methods, beforeAddEntryToQueue and afterAddEntryToQueue, that are called before and after adding the operation to pendingAddEntries. Then simplify the code by throwing an exception and passing it to the callback. This is also more efficient because OpAddEntry#failed won't be called inside the synchronized block.
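The shape of this change can be sketched roughly as follows (a simplified model with hypothetical types `Op` and a plain deque standing in for the real ManagedLedgerImpl internals — not the actual patch): only the check-and-enqueue step stays under the lock, while both the failure callback and the actual write initiation run outside it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Minimal sketch of the before/after split: beforeAddEntryToQueue validates state
// under the lock and may throw; the failure callback and the post-enqueue work
// both run outside the synchronized block.
public class AddEntrySketch {
    record Op(byte[] data, Consumer<Throwable> callback) {}

    private final Deque<Op> pendingAddEntries = new ArrayDeque<>();
    private boolean fenced; // example state that must be checked under the lock

    public void asyncAddEntry(Op op) {
        try {
            synchronized (this) {
                beforeAddEntryToQueue(op); // may throw; nothing is enqueued in that case
                pendingAddEntries.add(op);
            }
        } catch (IllegalStateException e) {
            op.callback().accept(e); // failure callback runs outside the synchronized block
            return;
        }
        afterAddEntryToQueue(op); // the write is initiated outside the lock
    }

    private void beforeAddEntryToQueue(Op op) {
        if (fenced) {
            throw new IllegalStateException("managed ledger is fenced");
        }
    }

    private void afterAddEntryToQueue(Op op) {
        op.callback().accept(null); // pretend the write succeeded
    }

    public static void main(String[] args) {
        var ml = new AddEntrySketch();
        ml.asyncAddEntry(new Op(new byte[]{1}, t -> System.out.println("result=" + t)));
        ml.fenced = true;
        ml.asyncAddEntry(new Op(new byte[]{2}, t -> System.out.println("result=" + t)));
        System.out.println("pending=" + ml.pendingAddEntries.size());
    }
}
```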

Add testBeforeAddEntry to protect the change.

Documentation

  - [ ] doc
  - [ ] doc-required
  - [x] doc-not-needed
  - [ ] doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 6, 2025
@BewareMyPower BewareMyPower self-assigned this Feb 6, 2025
@BewareMyPower BewareMyPower added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker release/4.0.3 labels Feb 6, 2025
@dao-jun (Member) commented Feb 7, 2025

> In protocol handler's implementation, sometimes it needs to record the current base offset before the actual asyncAddEntry call (or the wrapped PersistentTopic#publishMessages call)

I don't understand this part, could you please explain it more clearly?

@BewareMyPower (Contributor, Author) commented Feb 7, 2025

@dao-jun I found this issue when I implemented the Kafka transaction. In Kafka, analyzeAndValidateProducerState is called before writing messages; this method updates a map that maps the next message's offset (key) to an object, and it removes the key after the record batch is written.

For example, assuming there are 3 ongoing writes, the flow looks like the following pseudo code:

for (int i = 0; i < 3; i++) {
    long nextOffset = interceptor.getIndex() + 1; // LEO
    ongoingTxns.put(nextOffset, txn); // record the ongoing txn of this record batch
    asyncAddEntry().thenAccept(__ -> ongoingTxns.remove(nextOffset));
}

Assuming each record batch has only 1 message, ideally, after these 3 writes, ongoingTxns will have 3 keys (0, 1, 2). However, since asyncAddEntry switches to the ML's executor thread to call the interceptor's beforeAddEntry method, there is a chance that nextOffset is always 0 in all 3 loops, so ongoingTxns.remove(nextOffset) will take effect only once. Then it could hit here in the actual code, which is much different from the OSS KoP.

The details above are beyond the scope of this PR but could help you understand the motivation.
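The stale read described above can be reproduced with a small self-contained demo (simplified stand-ins for interceptor.getIndex() and the ongoingTxns map, not KoP code; a latch blocks the executor so the interleaving is deterministic):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Deterministic reproduction of the stale-offset read: the "index" is advanced
// only on the ML executor thread, so all three caller-side reads see the same value.
public class StaleIndexDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService mlExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        long[] index = {-1}; // advanced only on the ML executor thread
        Set<Long> ongoing = ConcurrentHashMap.newKeySet();

        // Block the executor so every caller-side read happens before any write runs,
        // mimicking three asyncAddEntry calls racing ahead of the thread switch.
        mlExecutor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException ignored) {
            }
        });

        for (int i = 0; i < 3; i++) {
            long nextOffset = index[0] + 1; // stale: reads 0 in every iteration
            ongoing.add(nextOffset);
            mlExecutor.execute(() -> index[0]++);
        }

        // Ideally there would be 3 pending offsets (0, 1, 2); the stale reads collapse them.
        System.out.println("pendingBeforeWrite=" + ongoing.size());

        gate.countDown();
        mlExecutor.shutdown();
        mlExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```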

@dao-jun (Member) commented Feb 7, 2025

> @dao-jun I found this issue when I implemented the Kafka transaction. In Kafka, analyzeAndValidateProducerState is called before writing messages, this method will update a map that maps the next message's offset (key) to an object and it will remove the key after the record batch is written.
>
> For example, assuming there are 3 ongoing writes, the flow looks like the following pseudo code:
>
> for (int i = 0; i < 3; i++) {
>     long nextOffset = interceptor.getIndex() + 1; // LEO
>     ongoingTxns.put(nextOffset, txn); // record the ongoing txn of this record batch
>     asyncAddEntry().thenAccept(__ -> ongoingTxns.remove(nextOffset));
> }
>
> Assuming each record batch has only 1 message, ideally, after these 3 writes, ongoingTxns will have 3 keys (0, 1, 2). However, since asyncAddEntry switches thread to the ML's executor to call interceptor's beforeAddEntry method, there is a chance that nextOffset is always 0 in these 3 loops and ongoingTxns.remove(nextOffset) will take effect only once. Then it could hit here in the actual code, which is much different from the OSS KoP.
>
> The details above are beyond the scope of this PR but could help you understand the motivation.

Thanks for your explanation!

@BewareMyPower BewareMyPower marked this pull request as draft February 8, 2025 06:52
@BewareMyPower (Contributor, Author)

Some shadow topic related tests failed; I will fix them.

@BewareMyPower BewareMyPower marked this pull request as ready for review February 8, 2025 07:47
@codecov-commenter commented Feb 8, 2025

Codecov Report

Attention: Patch coverage is 84.00000% with 4 lines in your changes missing coverage. Please review.

Project coverage is 74.19%. Comparing base (bbc6224) to head (4f28483).
Report is 890 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...okkeeper/mledger/impl/ShadowManagedLedgerImpl.java | 25.00% | 3 Missing ⚠️ |
| ...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 95.23% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #23940      +/-   ##
============================================
+ Coverage     73.57%   74.19%   +0.61%     
+ Complexity    32624    32236     -388     
============================================
  Files          1877     1853      -24     
  Lines        139502   143739    +4237     
  Branches      15299    16332    +1033     
============================================
+ Hits         102638   106643    +4005     
+ Misses        28908    28692     -216     
- Partials       7956     8404     +448     
| Flag | Coverage Δ |
| --- | --- |
| inttests | 26.71% <52.00%> (+2.13%) ⬆️ |
| systests | 23.20% <44.00%> (-1.12%) ⬇️ |
| unittests | 73.71% <84.00%> (+0.86%) ⬆️ |

Flags with carried forward coverage won't be shown.

| Files with missing lines | Coverage Δ | |
| --- | --- | --- |
| ...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 81.87% <95.23%> | (+1.21%) ⬆️ |
| ...okkeeper/mledger/impl/ShadowManagedLedgerImpl.java | 59.70% <25.00%> | (+4.04%) ⬆️ |

... and 1032 files with indirect coverage changes

@BewareMyPower BewareMyPower merged commit 215b36d into apache:master Feb 10, 2025
52 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-async-add-entry-safety branch February 10, 2025 03:19
BewareMyPower added a commit that referenced this pull request Feb 10, 2025
hanmz pushed a commit to hanmz/pulsar that referenced this pull request Feb 12, 2025
@lhotari (Member) commented Feb 13, 2025

@BewareMyPower Please also remove the comment "Jump to specific thread to avoid contention from writers writing from different threads" that is now obsolete:

// Jump to specific thread to avoid contention from writers writing from different threads

@lhotari (Member) commented Feb 13, 2025

@BewareMyPower This logic doesn't make sense any more either after the changes:

// retain buffer in this thread
buffer.retain();
// Jump to specific thread to avoid contention from writers writing from different threads
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
        currentLedgerTimeoutTriggered);

@lhotari (Member) commented Feb 13, 2025

@BewareMyPower There's a high chance that this change causes performance regressions:

// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {

In Pulsar use cases, this could happen when there is a large number of producers producing to a topic.
Blocking the IO threads with synchronization will have a larger impact, since it affects the Netty IO of all connections sharing the same IO thread.
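This concern can be illustrated with a small sketch (not Pulsar code; a single-threaded executor stands in for a shared Netty IO thread, and a plain lock for the managed ledger's monitor): once one connection's task blocks on a contended lock, every other connection pinned to that thread stalls behind it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// One task on a shared single-threaded "IO thread" blocks on a contended lock,
// delaying an unrelated task from another connection that needs no lock at all.
public class SharedIoThreadStall {
    public static void main(String[] args) throws Exception {
        Object mlLock = new Object();
        ExecutorService ioThread = Executors.newSingleThreadExecutor(); // shared by many connections
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        // A writer thread holds the managed-ledger lock for a while (e.g. a slow add path).
        Thread writer = new Thread(() -> {
            synchronized (mlLock) {
                lockHeld.countDown();
                try {
                    Thread.sleep(200);
                } catch (InterruptedException ignored) {
                }
            }
        });
        writer.start();
        lockHeld.await();

        long start = System.nanoTime();
        // Connection A's task contends on the lock, occupying the IO thread...
        ioThread.execute(() -> {
            synchronized (mlLock) { /* asyncAddEntry critical section */ }
        });
        // ...so connection B's unrelated task is delayed behind it.
        ioThread.execute(() -> {
            System.out.printf("unrelated task delayed ~%d ms%n",
                    (System.nanoTime() - start) / 1_000_000);
            done.countDown();
        });
        done.await();
        boolean delayed = (System.nanoTime() - start) >= TimeUnit.MILLISECONDS.toNanos(150);
        System.out.println("delayed=" + delayed);
        ioThread.shutdown();
        writer.join();
    }
}
```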

Labels
area/broker cherry-picked/branch-4.0 doc-not-needed Your PR changes do not impact docs release/4.0.3 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages