Skip to content

Commit

Permalink
Fix race condition in serialize
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Nov 16, 2020
1 parent c4f9e15 commit d4ef042
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 38 deletions.
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,7 @@ issues:
linters:
- gocyclo
text: "processEvent"
- path: pkg/networkservice/common/serialize/common.go
linters:
- scopelint
text: "Using the variable on range scope `shouldRetry` in function literal"
62 changes: 32 additions & 30 deletions pkg/networkservice/common/serialize/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (

"github.com/edwarnicke/serialize"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/tools/log"
Expand All @@ -42,34 +40,38 @@ func requestConnection(
// a retry Close case (but it isn't), double closing the Connection
newExecutor := new(serialize.Executor)
<-newExecutor.AsyncExec(func() {
executor, loaded := executors.LoadOrStore(connID, newExecutor)
// We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements
// can generate new Request, Close events and insert them into the chain in a serial way.
requestExecutor := newRequestExecutor(executor, connID, executors)
closeExecutor := newCloseExecutor(executor, connID, executors)
if loaded {
<-executor.AsyncExec(func() {
// Executor has been possibly removed at this moment, we need to store it back.
exec, _ := executors.LoadOrStore(connID, executor)
if exec != executor {
// It can happen in such situation:
// 1. -> close : locking `executor`
// 2. -> request-1 : waiting on `executor`
// 3. close -> : unlocking `executor`, removing it from `executors`
// 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec`
// 5. -request-1-> : locking `executor`, trying to store it into `executors`
// at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored
// in `executors`. It means that `request-2` and all subsequent events will be executed in parallel
// with `request-1`.
err = errors.Errorf("race condition, parallel request execution: %v", connID)
return
}
if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil {
executors.Delete(connID)
}
})
} else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil {
executors.Delete(connID)
for shouldRetry := true; shouldRetry; {
shouldRetry = false

executor, loaded := executors.LoadOrStore(connID, newExecutor)
// We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements
// can generate new Request, Close events and insert them into the chain in a serial way.
requestExecutor := newRequestExecutor(executor, connID, executors)
closeExecutor := newCloseExecutor(executor, connID, executors)
if loaded {
<-executor.AsyncExec(func() {
// Executor has been possibly removed at this moment, we need to store it back.
exec, _ := executors.LoadOrStore(connID, executor)
if exec != executor {
// It can happen in such situation:
// 1. -> close : locking `executor`
// 2. -> request-1 : waiting on `executor`
// 3. close -> : unlocking `executor`, removing it from `executors`
// 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec`
// 5. -request-1-> : locking `executor`, trying to store it into `executors`
// at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored
// in `executors`. It means that `request-2` and all subsequent events will be executed in parallel
// with `request-1`.
shouldRetry = true
return
}
if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil {
executors.Delete(connID)
}
})
} else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil {
executors.Delete(connID)
}
}
})

Expand Down
5 changes: 1 addition & 4 deletions pkg/networkservice/common/serialize/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ func TestSerializeServer_StressTest(t *testing.T) {
go func() {
defer wg.Done()
conn, err := server.Request(ctx, request)
if err != nil {
assert.EqualError(t, err, "race condition, parallel request execution: id")
return
}
assert.NoError(t, err)
_, err = server.Close(ctx, conn)
assert.NoError(t, err)
}()
Expand Down
5 changes: 1 addition & 4 deletions pkg/networkservice/common/timeout/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ func TestTimeoutServer_StressTest_DoubleClose(t *testing.T) {
go func() {
defer wg.Done()
conn, err := client.Request(ctx, stressTestRequest())
if err != nil {
assert.EqualError(t, err, "race condition, parallel request execution: server-id")
return
}
assert.NoError(t, err)
_, err = client.Close(ctx, conn)
assert.NoError(t, err)
}()
Expand Down

0 comments on commit d4ef042

Please sign in to comment.