Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix race conditions of LedgerHandle in client #4171

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

graysonzeng
Copy link
Contributor

@graysonzeng graysonzeng commented Jan 5, 2024

Master Issue:
apache/pulsar#21860

Motivation

When bookies do a rolling restart, pulsar topic fence state may be triggered due to race conditions. And it can not recover.

After check the heap dump of the broker, we can see the pendingWriteOps is 161, this is the reason why the topic can not recover from the fenced state.

image

image

The topic will only change to unfenced when pendingWriteOps is reduced to 0. See unfenced condition

    private void decrementPendingWriteOpsAndCheck() {
        long pending = pendingWriteOps.decrementAndGet();
       // unfenced  condition
        if (pending == 0 && isFenced && !isClosingOrDeleting) {
            synchronized (this) {
                if (isFenced && !isClosingOrDeleting) {
                    messageDeduplication.resetHighestSequenceIdPushed();
                    log.info("[{}] Un-fencing topic...", topic);
                    // signal to managed ledger that we are ready to resume by creating a new ledger
                    ledger.readyToCreateNewLedger();
                    unfence();
                }

            }
        }
    }

After a deep investigation, we found the cause of the error。The root cause due to sendAddSuccessCallbacks may be multiple called at the same time. One is that unsetSuccessAndSendWriteRequest is called by the BookKeeperClientWorker-OrderedExecutor thread, and the other is writeCompletein pulsar-io thread. We should prevent sendAddSuccessCallbacks from being called again before it completes.

Changes

Add a boolean value sendingCallbacks to indicate whether sendAddSuccessCallbacks is being called
.If we find that method sendAddSuccessCallbacks is being called when we try to call it, return directly.

Not recommended changes

If we add synchronized to the sendAddSuccessCallbacks, it will impact the performance and may lead to deadlock.

@lhotari
Copy link
Member

lhotari commented Jan 10, 2024

After a deep investigation, we found the cause of the error。The root cause due to sendAddSuccessCallbacks may be multiple called at the same time. One is that unsetSuccessAndSendWriteRequest is called by the BookKeeperClientWorker-OrderedExecutor thread, and the other is writeCompletein pulsar-io thread. We should prevent sendAddSuccessCallbacks from being called again before it completes.

It looks like all calls happen under the same object monitor lock (synchronized for PendingAddOp), therefore it seems that concurrent calls are already prevented. @graysonzeng Could you please check if this is the case?

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 10, 2024

After a deep investigation, we found the cause of the error。The root cause due to sendAddSuccessCallbacks may be multiple called at the same time. One is that unsetSuccessAndSendWriteRequest is called by the BookKeeperClientWorker-OrderedExecutor thread, and the other is writeCompletein pulsar-io thread. We should prevent sendAddSuccessCallbacks from being called again before it completes.

It looks like all calls happen under the same object monitor lock (synchronized for PendingAddOp), therefore it seems that concurrent calls are already prevented. @graysonzeng Could you please check if this is the case?

At first time I was thinking the same doubt as you, it didn't seem like it should be happening. But in fact, these two threads can call two different PendingAddOp instances, and the two instances have the same LedgerHandle instance attribute. Synchronized for PendingAddOp does not block the same LedgerHandle sendAddSuccessCallbacks method of the instance is called, concurrent calls are not prevented. @lhotari

@lhotari
Copy link
Member

lhotari commented Jan 10, 2024

At first I was thinking the same thing as you, it didn't seem like it should be happening. But in fact, these two threads can call two different PendingAddOp instances, and these two instances have the same LedgerHandle instance attribute. Synchronized for PendingAddOp does not block the same LedgerHandle sendAddSuccessCallbacks method of the instance is called, the sendAddSuccessCallbacks concurrent calls are not prevented.

Thanks for explaining that @graysonzeng. Makes sense. Just wondering if the logic really works correctly without making the sendAddSuccessCallbacks method synchronized. For example, on line 1831, the call to pendingAddOps.remove() seems to assume that it is the same instance that pendingAddOps.peek() returned on line 1814. The extensive usage of synchronized isn't nice, but there doesn't seem to be away around it?

@lhotari
Copy link
Member

lhotari commented Jan 10, 2024

@graysonzeng Another thread safety problem is the changingEnsemble field. It's modified under the metadataLock object monitor.

synchronized (metadataLock) {
if (changingEnsemble) {
delayedWriteFailedBookies.putAll(failedBookies);
} else {
changingEnsemble = true;
triggerLoop = true;
toReplace = new HashMap<>(delayedWriteFailedBookies);
delayedWriteFailedBookies.clear();
toReplace.putAll(failedBookies);
origEnsemble = getCurrentEnsemble();
}
}

should we make changingEmsemble field volatile to fix this part of the problem?

@merlimat merlimat added this to the 4.17.0 milestone Jan 10, 2024
@graysonzeng
Copy link
Contributor Author

Thanks for explaining that @graysonzeng. Makes sense. Just wondering if the logic really works correctly without making the sendAddSuccessCallbacks method synchronized. For example, on line 1831, the call to pendingAddOps.remove() seems to assume that it is the same instance that pendingAddOps.peek() returned on line 1814. The extensive usage of synchronized isn't nice, but there doesn't seem to be away around it?

When I first repaired it, I tried to use synchronized on sendAddSuccessCallbacks, and a deadlock occurred.

"BookKeeperClientWorker-OrderedExecutor-8-0":
	at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1811)
	- waiting to lock <0x00000000bfae1120> (a org.apache.bookkeeper.client.LedgerHandle)
	at org.apache.bookkeeper.client.PendingAddOp.unsetSuccessAndSendWriteRequest(PendingAddOp.java:204)
	- locked <0x00000000c3946f68> (a org.apache.bookkeeper.client.PendingAddOp)
	at org.apache.bookkeeper.client.LedgerHandle.unsetSuccessAndSendWriteRequest(LedgerHandle.java:2006)
	at org.apache.bookkeeper.client.LedgerHandle.lambda$ensembleChangeLoop$10(LedgerHandle.java:1997)
	at org.apache.bookkeeper.client.LedgerHandle$$Lambda$1586/0x00007f8e4c95ac58.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base@17.0.8/CompletableFuture.java:863)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base@17.0.8/CompletableFuture.java:841)
	at java.util.concurrent.CompletableFuture$Completion.run(java.base@17.0.8/CompletableFuture.java:482)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)


"pulsar-io-11-12":
	at org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:394)
	- waiting to lock <0x00000000c3946f68> (a org.apache.bookkeeper.client.PendingAddOp)
	at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1835)
	- locked <0x00000000bfae1120> (a org.apache.bookkeeper.client.LedgerHandle)
	at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:390)
	at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:307)
	- locked <0x00000000c3945640> (a org.apache.bookkeeper.client.PendingAddOp)
	at org.apache.bookkeeper.proto.BookieClientImpl.completeAdd(BookieClientImpl.java:284)
	at org.apache.bookkeeper.proto.BookieClientImpl.access$000(BookieClientImpl.java:78)
	at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:396)
	at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:356)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2548)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2453)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
	at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:110)
	at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
	at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:228)
	at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
	at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:189)
	at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:175)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

Therefore I want to use sendingCallbacks instead of synchronized

For example, on line 1831, the call to pendingAddOps.remove() seems to assume that it is the same instance that pendingAddOps.peek() returned on line 1814.

Thanks for reminding. I replaced pendingAddOps.remove() with pendingAddOps.remove(pendingAddOp) to ensure that the same element is processed.

should we make changingEmsemble field volatile to fix this part of the problem?

Great suggestion, this will ensure the visibility of changingEmsemble. I've added it

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

Therefore I want to use sendingCallbacks instead of synchronized

Makes sense. The only concern is about correctness. If the method is already processing while it gets called another time, is it necessary to execute the loop one extra time to ensure that something doesn't get left unprocessed?

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

Btw. The thread execution model for LedgerHandle isn't consistent. In some cases, the thread is switched to use the ordered executor and sometimes it's not. For example, here the thread is switched:

executeOrdered(() ->
unsetSuccessAndSendWriteRequest(getCurrentEnsemble(), failedBookies.keySet()));

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

This observation isn't about this PR directly. I have so many doubts of the correctness of sendAddSuccessCallbacks when it's not executed by the ordered executor.

Talking about the current code for LedgerHandle.sendAddSuccessCallbacks in master branch:

void sendAddSuccessCallbacks() {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back
PendingAddOp pendingAddOp;
while ((pendingAddOp = pendingAddOps.peek()) != null
&& !changingEnsemble) {
if (!pendingAddOp.completed) {
if (LOG.isDebugEnabled()) {
LOG.debug("pending add not completed: {}", pendingAddOp);
}
return;
}
// Check if it is the next entry in the sequence.
if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
return;
}
pendingAddOps.remove();
explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
pendingAddsSequenceHead = pendingAddOp.entryId;
if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
this.lastAddConfirmed = pendingAddsSequenceHead;
}
pendingAddOp.submitCallback(BKException.Code.OK);
}
}

If the order gets mixed, it seems that no progress could be made?

// Check if it is the next entry in the sequence.
if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
return;
}

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

The logic in sendAddSuccessCallbacks in master branch feels wrong. The logic gets stuck if the call to sendAddSuccessCallbacks is missed or out of order. Calling the method multiple times won't help once the head of the queue is out of sync since removing entries requires that the head of the queue has pendingAddOp.entryId == pendingAddsSequenceHead + 1.
@eolivelli Do you have a chance to help with this challenge?

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 11, 2024

Makes sense. The only concern is about correctness. If the method is already processing while it gets called another time, is it necessary to execute the loop one extra time to ensure that something doesn't get left unprocessed?

The sendAddSuccessCallbacks method has no input parameters, which means that assuming both threads want to call it, the execution result will be the same no matter which thread triggers it. Therefore there is no need to execute an additional loop

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

The sendAddSuccessCallbacks method has no input parameters, which means that assuming both threads want to call it, the execution result will be the same no matter which thread triggers it. Therefore there is no need to execute an additional loop

@graysonzeng I added another comment #4171 (comment) where I explain the problem.

I think that this line should be changed to use the ordered executor:

Changing Line 204 to

            lh.executeOrdered(lh::sendAddSuccessCallbacks);

And then sendAddSuccessCallbacks could be made a synchronized method. That would prevent the dead lock seen in #4171 (comment) . Makes sense?

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

And then sendAddSuccessCallbacks could be made a synchronized method. That would prevent the dead lock seen in #4171 (comment) . Makes sense?

It's possible that it doesn't solve the problem. Most likely it solves the dead lock when the synchronized method gets called without holding any other locks.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

@graysonzeng I have a question about the problem description:

After a deep investigation, we found the cause of the error。The root cause due to sendAddSuccessCallbacks may be multiple called at the same time. One is that unsetSuccessAndSendWriteRequest is called by the BookKeeperClientWorker-OrderedExecutor thread, and the other is writeCompletein pulsar-io thread. We should prevent sendAddSuccessCallbacks from being called again before it completes.

Why does it exactly cause a problem when the method is called multiple times at the same time?
I can see that the visibility of changingEnsemble field is a bug and could cause issues, but what other actual problems are there in concurrent access.

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 11, 2024

The logic in sendAddSuccessCallbacks in master branch feels wrong. The logic gets stuck if the call to sendAddSuccessCallbacks is missed or out of order. Calling the method multiple times won't help once the head of the queue is out of sync since removing entries requires that the head of the queue has pendingAddOp.entryId == pendingAddsSequenceHead + 1. @eolivelli Do you have a chance to help with this challenge?

I agree with your comment. In fact, this comment exactly answers the reason for the problem. when the method is called multiple times at the same time,missed or out of order will happen in pendingAddOp. pendingAddOp.entryId == pendingAddsSequenceHead + 1 condition prevents it from continuing to execute. The heap dump of the broker confirms this

image

In a ledgerHanle instance, we can see the peek pendingAddOp pendingAddOp.entryId is 224113,it is not it is not equal to pendingAddsSequenceHead + 1 (pendingAddsSequenceHead 224111)
@lhotari

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

@graysonzeng After spending more time with this issue, here are my suggestions to fix the issue:

  • make sendAddSuccessCallbacks method synchronized.
  • make changingEnsemble field volatile
  • change line 204 in PendingAddOp to lh.executeOrdered(lh::sendAddSuccessCallbacks); to prevent the deadlock
  • add logic to drainPendingAddsAndAdjustLength method to update pendingAddsSequenceHead accordingly. The drainPendingAddsAndAdjustLength method is where pendingAddsSequenceHead could currently get out of sync.

@hangc0276
Copy link
Contributor

@lhotari

make changingEnsemble field volatile

The changingEnsemble is already protected by synchronized, why do we need volatile?

change line 204 in PendingAddOp to lh.executeOrdered(lh::sendAddSuccessCallbacks); to prevent the deadlock

+1 for @eolivelli 's suggestion. You can add a “safeSendAddSuccessCallbacks” that calls sendAddSuccessCallbacks in the OrderedExecutor

make sendAddSuccessCallbacks method synchronized.

After we make sendAddSuccessCallbacks is called by the same thread, we don't need synchronize.

add logic to drainPendingAddsAndAdjustLength method to update pendingAddsSequenceHead accordingly. The drainPendingAddsAndAdjustLength method is where pendingAddsSequenceHead could currently get out of sync.

IMO, the drainPendingAddsAndAdjustLength method also need to be executed by OrderedExecutor.

@graysonzeng We'd better add a unit test to verify and protect the logic.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

The changingEnsemble is already protected by synchronized, why do we need volatile?

@hangc0276 changingEnsemble is mutated under synchronized (metadataLock) {. A mutation in a synchronized block isn't sufficient alone. it would have to be read under synchronized (metadataLock) { for it to be thread safe without considering any other means for achieving "happens-before". Making it volatile is one way to fix the current issue.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

After we make sendAddSuccessCallbacks is called by the same thread, we don't need synchronize.

@hangc0276 True, but that would be a significant change that could at least cause performance regressions. The Bookkeeper client code doesn't seem to use the OrderedExecutor in most cases. The current code base is using synchronized in this area.

After thinking about it, it might not be necessary to make the method itself synchronized. It might not even be necessary to run it with an OrderedExecutor. I think that it's sufficient to ensure that sendAddSuccessCallbacks and drainPendingAddsAndAdjustLength methods don't race and are serially executed. A simple synchronized(pendingAddOps) { could be the simplest possible solution. It's also possible that a race between sendAddSuccessCallbacks and drainPendingAddsAndAdjustLength methods isn't even a problem.

IMO, the drainPendingAddsAndAdjustLength method also need to be executed by OrderedExecutor.

The issue in drainPendingAddsAndAdjustLength isn't an ordering problem. The method simply doesn't update pendingAddsSequenceHead so that sendAddSuccessCallbacks could work after drainPendingAddsAndAdjustLength has been called. The instance will get stuck when pendingAddsSequenceHead gets out of sync in drainPendingAddsAndAdjustLength. My current assumption is that it's the root cause of this issue together with the changingEnsemble thread visibility issue.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2024

I created an alternative fix #4175 to bring clarity of what I'd be proposing. Once we have a reproducer for the issue in BK as a unit test, it will be easier to validate.

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 12, 2024

@lhotari @hangc0276 @eolivelli Thank you very much for your suggestions, I'll be happy to continue improving it.

True, but that would be a significant change that could at least cause performance regressions

  1. So we not use the OrderedExecutor, and should synchronized (pendingAddOps) for serializing drainPendingAddsAndAdjustLength & sendAddSuccessCallbacks. Is it right? @lhotari

  2. make changingEnsemble field volatile.

add logic to drainPendingAddsAndAdjustLength method to update pendingAddsSequenceHead accordingly

  1. So make lastEntry = lastAddPushed = pendingAddsSequenceHead = LedgerHandle.this.lastAddConfirmed in doAsyncCloseInternal

IMO, the drainPendingAddsAndAdjustLength method also need to be executed by OrderedExecutor.

After that, we don't need to use OrderedExecutor. Is it right? @hangc0276

remove the too strict rule for pendingAddsSequenceHead which breaks things after failures

I'm a little confused about this . Can you tell it more about it? @lhotari

@lhotari
Copy link
Member

lhotari commented Jan 12, 2024

  1. So we not use the OrderedExecutor, and should synchronized (pendingAddOps) for serializing drainPendingAddsAndAdjustLength & sendAddSuccessCallbacks. Is it right? @lhotari

@graysonzeng I don't think that using OrderedExecutor is justified since callback responses have never been executed by the OrderedExecutor. All sendAddSuccessCallbacks & drainPendingAddsAndAdjustLength calls should be serialized for correctness and that's what synchronized (pendingAddOps) { solves without causing a risk for a dead lock.

remove the too strict rule for pendingAddsSequenceHead which breaks things after failures

I'm a little confused about this . Can you tell it more about it? @lhotari

@graysonzeng
This rule doesn't make sense to me:

// Check if it is the next entry in the sequence.
if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
return;
}

My assumption is that this doesn't hold under failure conditions. Why would there even need to be such a rule?
When this rule kicks in, the LedgerHandle will stop calling callbacks. That doesn't make any sense to me.

Here's a sign that the entry id isn't always a continuous sequence:
In LedgerHandle.updateLastConfirmed:

When the LAC jumps forward and lastAddPushed = Math.max(lastAddPushed, lac) kicks in, the pendingAddsSequenceHead rule would again prevent LedgerHandle from calling callbacks in sendAddSuccessCallbacks.

I wonder if I'm making correct observations about the code. @eolivelli could you please explain how the above condition is handled?

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 15, 2024

Thanks for your reply @lhotari, I have some test for #4175 , Unfortunately it will also cause deadlock.

"pulsar-io-3-4":
	at org.apache.bookkeeper.client.PendingAddOp.submitCallback(PendingAddOp.java:394)
	- waiting to lock <0x000000077f2becf0> (a org.apache.bookkeeper.client.PendingAddOp)
	at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1838)
	- locked <0x000000077e6c0d58> (a java.util.concurrent.ConcurrentLinkedQueue)
	at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:390)
	at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:307)
	- locked <0x000000077f2bae98> (a org.apache.bookkeeper.client.PendingAddOp)
	at org.apache.bookkeeper.proto.BookieClientImpl.completeAdd(BookieClientImpl.java:284)
	at org.apache.bookkeeper.proto.BookieClientImpl.access$000(BookieClientImpl.java:78)
	at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:396)
	at org.apache.bookkeeper.proto.BookieClientImpl$ChannelReadyForAddEntryCallback.operationComplete(BookieClientImpl.java:356)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2548)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$ConnectionFutureListener.operationComplete(PerChannelBookieClient.java:2453)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
	at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:110)
	at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
	at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:228)
	at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:47)
	at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:189)
	at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:175)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

"BookKeeperClientWorker-OrderedExecutor-7-0":
	at org.apache.bookkeeper.client.LedgerHandle.sendAddSuccessCallbacks(LedgerHandle.java:1819)
	- waiting to lock <0x000000077e6c0d58> (a java.util.concurrent.ConcurrentLinkedQueue)
	at org.apache.bookkeeper.client.PendingAddOp.sendAddSuccessCallbacks(PendingAddOp.java:390)
	at org.apache.bookkeeper.client.PendingAddOp.writeComplete(PendingAddOp.java:307)
	- locked <0x000000077f2becf0> (a org.apache.bookkeeper.client.PendingAddOp)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.writeComplete(PerChannelBookieClient.java:2183)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleResponse(PerChannelBookieClient.java:2240)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$AddCompletion.handleV2Response(PerChannelBookieClient.java:2219)
	at org.apache.bookkeeper.proto.PerChannelBookieClient$ReadV2ResponseCallback.run(PerChannelBookieClient.java:1397)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:113)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.8/Thread.java:833)

Found 1 deadlock.

From the stack we can see that this happens within sendAddSuccessCallbacks

    void sendAddSuccessCallbacks() {

        // thread A has acquired the lock of pendingAddOp instance A , and is trying to acquire it <----
        synchronized (pendingAddOps) {
            PendingAddOp pendingAddOp;
            while ((pendingAddOp = pendingAddOps.peek()) != null
                    && !changingEnsemble) {
                if (!pendingAddOp.completed) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("pending add not completed: {}", pendingAddOp);
                    }
                    return;
                }

                pendingAddOps.remove();
                explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
                pendingAddsSequenceHead = pendingAddOp.entryId;
                if (!writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
                    this.lastAddConfirmed = pendingAddsSequenceHead;
                }
                
                // thread B has acquired the lock of pendingAddOp instance B and   
                // the `pendingAddOps` lock of this LedgerHandle instance,  and is trying to acquire it <----
                pendingAddOp.submitCallback(BKException.Code.OK);
            }
        }

    }

thread A -> pendingAddOp lock A (locked) -> LedgerHandle lock L (locked) -> pendingAddOp lock B (waiting)
thread B -> pendingAddOp lock B (locked) -> LedgerHandle lock L (waiting)

Here is the entire stack file
deadLockThreadDump.txt

@hangc0276 True, but that would be a significant change that could at least cause performance regressions.

Suppose we are not willing to use OrderedExecutor because of performance regressions, are there any other better suggestions? i'm looking forward to it @lhotari

@lhotari
Copy link
Member

lhotari commented Jan 15, 2024

Thanks for your reply @lhotari, I have some test for #4175 , Unfortunately it will also cause deadlock.

Suppose we are not willing to use OrderedExecutor because of performance regressions, are there any other better suggestions? i'm looking forward to it @lhotari

Thanks for testing this @graysonzeng .

It seems that one possibility to prevent the deadlock would be to use the solution that you had proposed initially, the AtomicBoolean solution. The synchronized (pendingAddOps) { block could continued to be used to prevent a race caused by drainPendingAddsAndAdjustLength.
I pushed a commit to #4175 .

@graysonzeng graysonzeng force-pushed the fix_topic_unavailable_error branch from 7cdab79 to da3802a Compare January 17, 2024 09:36
@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 17, 2024

Thanks for your help @lhotari ,I have updated the PR and test it again.
Can you help me review it again? @hangc0276 @eolivelli I also think OrderedExecutor will cause performance regressions. How about we fix it by going through the synchronized first? If agree to use it first, I'd like to try adding a unit test to verify it.

@lhotari
Copy link
Member

lhotari commented Jan 25, 2024

Thanks for your help @lhotari ,I have updated the PR and test it again. Can you help me review it again? @hangc0276 @eolivelli I also think OrderedExecutor will cause performance regressions. How about we fix it by going through the synchronized first? If agree to use it first, I'd like to try adding a unit test to verify it.

@graysonzeng Does the current solution in this PR pass your tests?

@lhotari
Copy link
Member

lhotari commented Jan 25, 2024

Client's BookieWriteLedgerTest and DeferredSyncTest are failing. It is always possible that those tests are invalid. The logic adding for deferred sync doesn't make sense to me.

// Check if it is the next entry in the sequence.
if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != pendingAddsSequenceHead + 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Head of the queue entryId: {} is not the expected value: {}", pendingAddOp.entryId,
pendingAddsSequenceHead + 1);
}
return;
}

More comments about this in #4171 (comment) . I hope @eolivelli would have the chance to chime in again.

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jan 30, 2024

@graysonzeng Does the current solution in this PR pass your tests?

it is pass. @lhotari

Client's BookieWriteLedgerTest and DeferredSyncTest are failing.

I'll fix it if needed

@lhotari
Copy link
Member

lhotari commented Feb 8, 2024

@graysonzeng is this related to #4194 / #4097 ?

@hezhangjian
Copy link
Member

@graysonzeng Thanks for your contribution, CI is not passed yet, any updates?

@graysonzeng
Copy link
Contributor Author

graysonzeng commented Jun 27, 2024

@graysonzeng Thanks for your contribution, CI is not passed yet, any updates?

@shoothzj Sorry for late reply, I think the current fix may still be controversial. Maybe it would be better to improve the PR after #4194 is approved? What do you think?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants