From d4ef042d7e362c1ae2494f152cb5f9a9042fddc3 Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Mon, 16 Nov 2020 13:51:59 +0700 Subject: [PATCH] Fix race condition issue in serialize Signed-off-by: Vladimir Popov --- .golangci.yml | 4 ++ pkg/networkservice/common/serialize/common.go | 62 ++++++++++--------- .../common/serialize/server_test.go | 5 +- .../common/timeout/server_test.go | 5 +- 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 362a1d63fb..9a70a9f0a0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -253,3 +253,7 @@ issues: linters: - gocyclo text: "processEvent" + - path: pkg/networkservice/common/serialize/common.go + linters: + - scopelint + text: "Using the variable on range scope `shouldRetry` in function literal" diff --git a/pkg/networkservice/common/serialize/common.go b/pkg/networkservice/common/serialize/common.go index 1785b8d4cd..49a493e071 100644 --- a/pkg/networkservice/common/serialize/common.go +++ b/pkg/networkservice/common/serialize/common.go @@ -22,8 +22,6 @@ import ( "github.com/edwarnicke/serialize" "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" - "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/tools/log" @@ -42,34 +40,38 @@ func requestConnection( // a retry Close case (but it isn't), double closing the Connection newExecutor := new(serialize.Executor) <-newExecutor.AsyncExec(func() { - executor, loaded := executors.LoadOrStore(connID, newExecutor) - // We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements - // can generate new Request, Close events and insert them into the chain in a serial way. - requestExecutor := newRequestExecutor(executor, connID, executors) - closeExecutor := newCloseExecutor(executor, connID, executors) - if loaded { - <-executor.AsyncExec(func() { - // Executor has been possibly removed at this moment, we need to store it back. - exec, _ := executors.LoadOrStore(connID, executor) - if exec != executor { - // It can happen in such situation: - // 1. -> close : locking `executor` - // 2. -> request-1 : waiting on `executor` - // 3. close -> : unlocking `executor`, removing it from `executors` - // 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec` - // 5. -request-1-> : locking `executor`, trying to store it into `executors` - // at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored - // in `executors`. It means that `request-2` and all subsequent events will be executed in parallel - // with `request-1`. - err = errors.Errorf("race condition, parallel request execution: %v", connID) - return - } - if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { - executors.Delete(connID) - } - }) - } else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { - executors.Delete(connID) + for shouldRetry := true; shouldRetry; { + shouldRetry = false + + executor, loaded := executors.LoadOrStore(connID, newExecutor) + // We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements + // can generate new Request, Close events and insert them into the chain in a serial way. + requestExecutor := newRequestExecutor(executor, connID, executors) + closeExecutor := newCloseExecutor(executor, connID, executors) + if loaded { + <-executor.AsyncExec(func() { + // Executor has been possibly removed at this moment, we need to store it back. + exec, _ := executors.LoadOrStore(connID, executor) + if exec != executor { + // It can happen in such situation: + // 1. -> close : locking `executor` + // 2. -> request-1 : waiting on `executor` + // 3. close -> : unlocking `executor`, removing it from `executors` + // 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec` + // 5. -request-1-> : locking `executor`, trying to store it into `executors` + // at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored + // in `executors`. It means that `request-2` and all subsequent events will be executed in parallel + // with `request-1`. + shouldRetry = true + return + } + if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { + executors.Delete(connID) + } + }) + } else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { + executors.Delete(connID) + } } }) diff --git a/pkg/networkservice/common/serialize/server_test.go b/pkg/networkservice/common/serialize/server_test.go index 32dac6d3c0..4152fa4e54 100644 --- a/pkg/networkservice/common/serialize/server_test.go +++ b/pkg/networkservice/common/serialize/server_test.go @@ -64,10 +64,7 @@ func TestSerializeServer_StressTest(t *testing.T) { go func() { defer wg.Done() conn, err := server.Request(ctx, request) - if err != nil { - assert.EqualError(t, err, "race condition, parallel request execution: id") - return - } + assert.NoError(t, err) _, err = server.Close(ctx, conn) assert.NoError(t, err) }() diff --git a/pkg/networkservice/common/timeout/server_test.go b/pkg/networkservice/common/timeout/server_test.go index 560d8dc5dd..a88870334b 100644 --- a/pkg/networkservice/common/timeout/server_test.go +++ b/pkg/networkservice/common/timeout/server_test.go @@ -161,10 +161,7 @@ func TestTimeoutServer_StressTest_DoubleClose(t *testing.T) { go func() { defer wg.Done() conn, err := client.Request(ctx, stressTestRequest()) - if err != nil { - assert.EqualError(t, err, "race condition, parallel request execution: server-id") - return - } + assert.NoError(t, err) _, err = client.Close(ctx, conn) assert.NoError(t, err) }()