Fixed requestResponseSubscriber race condition #137

Open: echistyakov wants to merge 1 commit into master

Fix a data race in requestResponseSubscriber.

Motivation:

I discovered this data race while running a stress test with the -race flag against the Facebook RSocket server/client:

==================
WARNING: DATA RACE
Write at 0x00c00461a590 by goroutine 152328:
  github.com/rsocket/rsocket-go/internal/socket.returnRequestResponseSubscriber()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/subscriber_request_response.go:58 +0xd1
  github.com/rsocket/rsocket-go/internal/socket.(*requestResponseSubscriber).finish()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/subscriber_request_response.go:95 +0x95
  github.com/rsocket/rsocket-go/internal/socket.(*requestResponseSubscriber).OnError.func1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/subscriber_request_response.go:70 +0x79
  runtime.deferreturn()
      third-party/go/1.23.4/linux_amd64/src/runtime/panic.go:605 +0x5d
  github.com/rsocket/rsocket-go/rx.(*subscriberFacade).OnError()
      <autogenerated>:1 +0x5a
  github.com/jjeffcaii/reactor-go/mono.(*sink).Cancel()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/create.go:71 +0xb1
  github.com/rsocket/rsocket-go/internal/socket.requestResponseCallbackReverse.stopWithError()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/callback.go:56 +0x4c
  github.com/rsocket/rsocket-go/internal/socket.(*requestResponseCallbackReverse).stopWithError()
      <autogenerated>:1 +0x59
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).destroyHandler()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:171 +0x121
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).Close()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:135 +0x2a8
  github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).close.func1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:86 +0x64
  sync.(*Once).doSlow()
      third-party/go/1.23.4/linux_amd64/src/sync/once.go:76 +0xe1
  sync.(*Once).Do()
      third-party/go/1.23.4/linux_amd64/src/sync/once.go:67 +0x44
  github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).close()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:85 +0xb6
  github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).Close()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:81 +0x3c
  github.com/rsocket/rsocket-go/internal/socket.(*simpleServerSocket).Close()
      <autogenerated>:1 +0x3e
  github.com/rsocket/rsocket-go.(*server).Serve.func3.1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/server.go:168 +0x347
  runtime.deferreturn()
      third-party/go/1.23.4/linux_amd64/src/runtime/panic.go:605 +0x5d
  thrift/lib/go/thrift.(*rocketServerTransport).processRocketRequests()
      fbcode/thrift/lib/go/thrift/rocket_server_transport.go:178 +0x161
  thrift/lib/go/thrift.(*rocketServerTransport).processRequests()
      fbcode/thrift/lib/go/thrift/rocket_server_transport.go:146 +0xaf1
  thrift/lib/go/thrift.(*rocketServerTransport).acceptLoop.func1()
      fbcode/thrift/lib/go/thrift/rocket_server_transport.go:112 +0x18a
  thrift/lib/go/thrift.(*rocketServerTransport).acceptLoop.gowrap1()
      fbcode/thrift/lib/go/thrift/rocket_server_transport.go:113 +0x6e

Previous write at 0x00c00461a590 by goroutine 1704:
  sync/atomic.AddInt32()
      third-party/go/1.23.4/linux_amd64/src/runtime/race_amd64.s:281 +0xb
  sync/atomic.AddInt32()
      <autogenerated>:1 +0x14
  github.com/rsocket/rsocket-go/rx.subscriberFacade.OnNext()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/subscriber.go:53 +0xa9
  github.com/rsocket/rsocket-go/rx.(*subscriberFacade).OnNext()
      <autogenerated>:1 +0x59
  github.com/jjeffcaii/reactor-go/mono.(*sink).Next()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/create.go:101 +0xc4
  github.com/jjeffcaii/reactor-go/mono.(*sink).Success()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/create.go:58 +0x98
  github.com/rsocket/rsocket-go/rx/mono.sinkProxy.Success()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/utils.go:95 +0x88
  github.com/rsocket/rsocket-go/rx/mono.(*sinkProxy).Success()
      <autogenerated>:1 +0x59
  github.com/rsocket/rsocket-go/rx/mono.FromFunc.func1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/utils.go:21 +0xdb
  github.com/rsocket/rsocket-go/rx/mono.Create.func1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/utils.go:74 +0x8f
  github.com/jjeffcaii/reactor-go/mono.newMonoCreate.func1()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/create.go:46 +0x1e1
  github.com/jjeffcaii/reactor-go/mono.monoCreate.SubscribeWith()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/mono/create.go:108 +0x108
  github.com/jjeffcaii/reactor-go/mono.(*wrapper).SubscribeWith()
      <autogenerated>:1 +0x72
  github.com/rsocket/rsocket-go/rx/mono.proxy.SubscribeWith()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/rx/mono/proxy_default.go:157 +0x171
  github.com/rsocket/rsocket-go/rx/mono.(*proxy).SubscribeWith()
      <autogenerated>:1 +0x78
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).respondRequestResponse.func2()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:545 +0x95
  github.com/panjf2000/ants/v2.(*goWorker).run.func1()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:67 +0x130

Goroutine 152328 (running) created at:
  thrift/lib/go/thrift.(*rocketServerTransport).acceptLoop()
      fbcode/thrift/lib/go/thrift/rocket_server_transport.go:93 +0x2e4
  thrift/lib/go/thrift.(*rocketServerTransport).Listen()
      fbcode/thrift/lib/go/thrift/rocket_server_transport.go:67 +0x12a
  github.com/rsocket/rsocket-go.(*server).Serve()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/server.go:225 +0x543
  thrift/lib/go/thrift.(*rocketServer).ServeContext()
      fbcode/thrift/lib/go/thrift/rocket_server.go:92 +0x1ab
  thrift/lib/go/thrift/stress.runStressTest.func2()
      fbcode/thrift/lib/go/thrift/stress/server_test.go:86 +0x58
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      fbcode/third-party-go/vendor/golang.org/x/sync/errgroup/errgroup.go:78 +0xa1

Goroutine 1704 (running) created at:
  github.com/panjf2000/ants/v2.(*goWorker).run()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/worker.go:48 +0xc4
  github.com/panjf2000/ants/v2.(*Pool).retrieveWorker()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/pool.go:348 +0x384
  github.com/panjf2000/ants/v2.(*Pool).Submit()
      fbcode/third-party-go/vendor/github.com/panjf2000/ants/v2/pool.go:222 +0x67
  github.com/jjeffcaii/reactor-go/scheduler.(*elasticScheduler).Do()
      fbcode/third-party-go/vendor/github.com/jjeffcaii/reactor-go/scheduler/elastic.go:34 +0x56
  github.com/rsocket/rsocket-go/internal/socket.mustExecute()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:44 +0x59
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).respondRequestResponse()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:544 +0x35e
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).onFrameRequestResponse()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/internal/socket/duplex.go:505 +0x99
  github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).onFrameRequestResponse-fm()
      <autogenerated>:1 +0x47
  github.com/rsocket/rsocket-go/core/transport.(*Transport).DispatchFrame()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/core/transport/transport.go:322 +0x734
  github.com/rsocket/rsocket-go/core/transport.(*Transport).loopReadBuffer()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/core/transport/transport.go:180 +0x1d4
  github.com/rsocket/rsocket-go/core/transport.(*Transport).Start.gowrap1()
      fbcode/third-party-go/vendor/github.com/rsocket/rsocket-go/core/transport/transport.go:215 +0x79
==================

Modifications:

  1. I changed the sndCnt field to atomic.Int32, which has a simpler and safer API than calling the sync/atomic functions on a plain int32.
  2. I made the previously non-atomic write in the returnRequestResponseSubscriber function atomic.
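
The pattern behind these two changes can be sketched as follows. This is a minimal illustration, not the actual diff: the subscriber type and the onNext and reset names are hypothetical stand-ins for requestResponseSubscriber, the increment in OnNext, and the write in returnRequestResponseSubscriber. Only sndCnt is taken from this PR. It assumes the field was previously a plain int32 that one goroutine incremented with atomic.AddInt32 while another reset it with an ordinary assignment. atomic.Int32 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// subscriber is a hypothetical stand-in for requestResponseSubscriber;
// only the counter involved in the race is modeled here.
type subscriber struct {
	// Assumed to have been a plain int32 before the fix.
	sndCnt atomic.Int32
}

// onNext mirrors the atomic increment seen in the OnNext stack frame.
func (s *subscriber) onNext() int32 {
	return s.sndCnt.Add(1)
}

// reset mirrors the write made when the subscriber is handed back for reuse.
// With a plain int32 field, `s.sndCnt = 0` would race with onNext;
// Store makes the reset atomic as well.
func (s *subscriber) reset() {
	s.sndCnt.Store(0)
}

func main() {
	s := &subscriber{}
	var wg sync.WaitGroup
	// Increment and reset concurrently, as the two goroutines in the report do.
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); s.onNext() }()
		go func() { defer wg.Done(); s.reset() }()
	}
	wg.Wait()
	s.reset()
	fmt.Println(s.sndCnt.Load()) // prints 0
}
```

Running this sketch with go run -race should report no data race, because every access to the counter goes through the atomic.Int32 methods. Atomic access only removes the race on the counter itself; it does not by itself guarantee that a subscriber is no longer in use when it is handed back for reuse.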

Result:

With this fix, the data race goes away.
