Deadlock on RSocketRequester lock when merging output of rsocket channels into another channel #1059

Closed
lkolisko opened this issue Aug 10, 2022 · 1 comment · Fixed by #1060
Labels: bug, superseded (issue is superseded by another)

Comments

@lkolisko

I am running into a deadlock between close and cancel when the output of multiple RSocket channels goes through Reactor's FluxMerge operator and is written to a single RSocket channel. Is there any guidance on how to avoid the issue or how to investigate it further?
Thanks in advance.

Expected Behavior

  • No deadlock

Actual Behavior

  • Thread epoll-10 is deadlocked with thread epoll-12: each holds one RSocketRequester monitor and is waiting for the monitor held by the other.
epoll-10 

stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at io.rsocket.core.RequesterResponderSupport.remove(RequesterResponderSupport.java:152)
- waiting to lock <0x00000005d37ad000> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RequestChannelRequesterFlux.tryCancel(RequestChannelRequesterFlux.java:513)
at io.rsocket.core.RequestChannelRequesterFlux.cancel(RequestChannelRequesterFlux.java:480)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
....
at io.rsocket.core.RequestChannelRequesterFlux.handleError(RequestChannelRequesterFlux.java:698)
at io.rsocket.core.RSocketRequester.terminate(RSocketRequester.java:354)
- locked <0x00000005d37ad810> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RSocketRequester.tryShutdown(RSocketRequester.java:328)
at io.rsocket.core.RSocketRequester$$Lambda$1654/0x00000008015c9948.run(Unknown Source)
at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238)
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70)
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49)
at io.rsocket.transport.netty.TcpDuplexConnection$$Lambda$1641/0x00000008015c0df8.operationComplete(Unknown Source)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:733)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:826)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
Locked ownable synchronizers:
- None
epoll-12

stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at io.rsocket.core.RequesterResponderSupport.remove(RequesterResponderSupport.java:152)
- waiting to lock <0x00000005d37ad810> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RequestChannelRequesterFlux.tryCancel(RequestChannelRequesterFlux.java:513)
at io.rsocket.core.RequestChannelRequesterFlux.cancel(RequestChannelRequesterFlux.java:480)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
....
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onError(FluxPublishOn.java:248)
at io.rsocket.core.RequestChannelRequesterFlux.handleError(RequestChannelRequesterFlux.java:698)
at io.rsocket.core.RSocketRequester.terminate(RSocketRequester.java:354)
- locked <0x00000005d37ad000> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RSocketRequester.tryShutdown(RSocketRequester.java:328)
at io.rsocket.core.RSocketRequester$$Lambda$1654/0x00000008015c9948.run(Unknown Source)
at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238)
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70)
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49)
at io.rsocket.transport.netty.TcpDuplexConnection$$Lambda$1641/0x00000008015c0df8.operationComplete(Unknown Source)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:733)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:826)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
Locked ownable synchronizers:
- None
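
Reading the two dumps together, this looks like a lock-order inversion: each thread takes the monitor of its own RSocketRequester in terminate(), the error then travels through the merge and cancels a channel that belongs to the other requester, and remove() needs that other requester's monitor. Below is a minimal sketch of that pattern in plain Java, not the actual rsocket code. The names `LockInversionSketch`, `terminateThenCancel`, `requesterA` and `requesterB` are hypothetical, and `ReentrantLock.tryLock` with a timeout stands in for `synchronized` only so that the demo terminates instead of hanging.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class LockInversionSketch {

    // Mimics one event-loop thread: hold `own` (as terminate() appears to do), then try
    // to take `other` (as remove() appears to do when the error cancels the other stream).
    static void terminateThenCancel(ReentrantLock own, ReentrantLock other,
                                    CyclicBarrier barrier, AtomicInteger blocked) {
        own.lock();
        try {
            barrier.await(); // both threads now hold their own lock
            if (other.tryLock(200, TimeUnit.MILLISECONDS)) {
                other.unlock(); // not reached while the peer still holds its lock
            } else {
                blocked.incrementAndGet(); // with synchronized this wait would never end
            }
            barrier.await(); // keep `own` held until both attempts have finished
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        } finally {
            own.unlock();
        }
    }

    // Runs both threads and returns how many of them failed to take the second lock.
    static int runDemo() {
        ReentrantLock requesterA = new ReentrantLock(); // hypothetical stand-in for one requester monitor
        ReentrantLock requesterB = new ReentrantLock(); // hypothetical stand-in for the other
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicInteger blocked = new AtomicInteger();
        Thread t1 = new Thread(
                () -> terminateThenCancel(requesterA, requesterB, barrier, blocked), "epoll-10");
        Thread t2 = new Thread(
                () -> terminateThenCancel(requesterB, requesterA, barrier, blocked), "epoll-12");
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return blocked.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that could not take the second lock: " + runDemo());
    }
}
```

In the sketch neither thread can take the second lock while the other still holds it, which matches the two "waiting to lock" / "locked" pairs in the dumps above.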

Steps to Reproduce

  • The output of multiple RSocket channels is merged and written to another RSocket channel.
  • Unfortunately, I do not have a simplified test that reproduces the issue at the moment.

Environment

  • OpenJDK 64-Bit Server VM Temurin-17.0.3+7 (build 17.0.3+7, mixed mode, sharing)
  • rsocket 1.1.2
  • netty 4.1.79
  • reactor-core 3.4.21
@OlegDokuka OlegDokuka added the bug label Aug 11, 2022
@OlegDokuka OlegDokuka added this to the 1.1.3 milestone Aug 11, 2022
@OlegDokuka OlegDokuka added the superseded Issue is superseded by another label Aug 16, 2022
@OlegDokuka OlegDokuka removed this from the 1.1.3 milestone Aug 16, 2022
@OlegDokuka (Member) commented Aug 16, 2022

@lkolisko thanks for finding this! Really helpful. This is fixed, and the fix should be released in 1.1.3 next month.
