diff --git a/pkg/networkservice/common/serialize/client.go b/pkg/networkservice/common/serialize/client.go index 23b80ae3ba..f9202342aa 100644 --- a/pkg/networkservice/common/serialize/client.go +++ b/pkg/networkservice/common/serialize/client.go @@ -25,11 +25,10 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" ) type serializeClient struct { - executor multiexecutor.Executor + executor multiExecutor } // NewClient returns a new serialize client chain element @@ -40,14 +39,14 @@ func NewClient() networkservice.NetworkServiceClient { func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { connID := request.GetConnection().GetId() <-c.executor.AsyncExec(connID, func() { - conn, err = next.Client(ctx).Request(WithExecutor(ctx, newExecutorFunc(connID, &c.executor)), request, opts...) + conn, err = next.Client(ctx).Request(WithExecutor(ctx, c.executor.Executor(connID)), request, opts...) }) return conn, err } func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) { <-c.executor.AsyncExec(conn.GetId(), func() { - _, err = next.Client(ctx).Close(WithExecutor(ctx, newExecutorFunc(conn.GetId(), &c.executor)), conn, opts...) + _, err = next.Client(ctx).Close(WithExecutor(ctx, c.executor.Executor(conn.GetId())), conn, opts...) }) return new(empty.Empty), err } diff --git a/pkg/networkservice/common/serialize/context.go b/pkg/networkservice/common/serialize/context.go index f9174d4cd1..0e6a0c79d7 100644 --- a/pkg/networkservice/common/serialize/context.go +++ b/pkg/networkservice/common/serialize/context.go @@ -26,17 +26,22 @@ const ( type contextKeyType string -// WithExecutor wraps `parent` in a new context with ExecutorFunc -func WithExecutor(parent context.Context, executor ExecutorFunc) context.Context { +// IExecutor is a serialize.Executor interface type +type IExecutor interface { + AsyncExec(f func()) <-chan struct{} +} + +// WithExecutor wraps `parent` in a new context with IExecutor +func WithExecutor(parent context.Context, executor IExecutor) context.Context { if parent == nil { - parent = context.TODO() + panic("cannot create context from nil parent") } return context.WithValue(parent, executorKey, executor) } -// Executor returns ExecutorFunc -func Executor(ctx context.Context) ExecutorFunc { - if executor, ok := ctx.Value(executorKey).(ExecutorFunc); ok { +// Executor returns IExecutor +func Executor(ctx context.Context) IExecutor { + if executor, ok := ctx.Value(executorKey).(IExecutor); ok { return executor } return nil diff --git a/pkg/networkservice/common/serialize/executor_func.go b/pkg/networkservice/common/serialize/executor_func.go index 05d0a9b711..9025ea75f2 100644 --- a/pkg/networkservice/common/serialize/executor_func.go +++ b/pkg/networkservice/common/serialize/executor_func.go @@ -16,18 +16,8 @@ package serialize -import "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" +type executorFunc func(f func()) <-chan struct{} -// ExecutorFunc is a serialize.Executor.AsyncExec func type -type ExecutorFunc func(f func()) <-chan struct{} - -func newExecutorFunc(id string, executor *multiexecutor.Executor) ExecutorFunc { - return func(f func()) <-chan struct{} { - return executor.AsyncExec(id, f) - } -} - -// AsyncExec calls ExecutorFunc -func (e ExecutorFunc) AsyncExec(f func()) <-chan struct{} { +func (e executorFunc) AsyncExec(f func()) <-chan struct{} { return e(f) } diff --git a/pkg/tools/multiexecutor/multi_executor.go b/pkg/networkservice/common/serialize/multi_executor.go similarity index 67% rename from pkg/tools/multiexecutor/multi_executor.go rename to pkg/networkservice/common/serialize/multi_executor.go index ffdc789939..676caa7139 100644 --- a/pkg/tools/multiexecutor/multi_executor.go +++ b/pkg/networkservice/common/serialize/multi_executor.go @@ -14,8 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package multiexecutor provides serial executor with multiple execution queues -package multiexecutor +package serialize import ( "sync" @@ -23,37 +22,35 @@ import ( "github.com/edwarnicke/serialize" ) -// Executor is a serial executor with multiple execution queues -type Executor struct { - executors map[string]*executor +type multiExecutor struct { + executors map[string]*refCountExecutor executor serialize.Executor once sync.Once } -type executor struct { +type refCountExecutor struct { + count int executor serialize.Executor - refCount int } -// AsyncExec executes `f` serially in the `id` queue -func (e *Executor) AsyncExec(id string, f func()) (ch <-chan struct{}) { +func (e *multiExecutor) AsyncExec(id string, f func()) (ch <-chan struct{}) { e.once.Do(func() { - e.executors = make(map[string]*executor) + e.executors = make(map[string]*refCountExecutor) }) <-e.executor.AsyncExec(func() { exec, ok := e.executors[id] if !ok { - exec = new(executor) + exec = new(refCountExecutor) e.executors[id] = exec } - exec.refCount++ + exec.count++ ch = exec.executor.AsyncExec(func() { f() e.executor.AsyncExec(func() { - exec.refCount-- - if exec.refCount == 0 { + exec.count-- + if exec.count == 0 { delete(e.executors, id) } }) @@ -61,3 +58,9 @@ func (e *Executor) AsyncExec(id string, f func()) (ch <-chan struct{}) { }) return ch } + +func (e *multiExecutor) Executor(id string) IExecutor { + return executorFunc(func(f func()) <-chan struct{} { + return e.AsyncExec(id, f) + }) +} diff --git a/pkg/networkservice/common/serialize/server.go b/pkg/networkservice/common/serialize/server.go index 3897d42f9e..f763d80bfd 100644 --- a/pkg/networkservice/common/serialize/server.go +++ b/pkg/networkservice/common/serialize/server.go @@ -24,11 +24,10 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" ) type serializeServer struct { - executor multiexecutor.Executor + executor multiExecutor } // NewServer returns a new serialize server chain element @@ -39,14 +38,14 @@ func NewServer() networkservice.NetworkServiceServer { func (s *serializeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) { connID := request.GetConnection().GetId() <-s.executor.AsyncExec(connID, func() { - conn, err = next.Server(ctx).Request(WithExecutor(ctx, newExecutorFunc(connID, &s.executor)), request) + conn, err = next.Server(ctx).Request(WithExecutor(ctx, s.executor.Executor(connID)), request) }) return conn, err } func (s *serializeServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) { <-s.executor.AsyncExec(conn.GetId(), func() { - _, err = next.Server(ctx).Close(WithExecutor(ctx, newExecutorFunc(conn.GetId(), &s.executor)), conn) + _, err = next.Server(ctx).Close(WithExecutor(ctx, s.executor.Executor(conn.GetId())), conn) }) return new(empty.Empty), err } diff --git a/pkg/networkservice/common/serialize/server_test.go b/pkg/networkservice/common/serialize/server_test.go index 403e01a7b0..b21b8189bd 100644 --- a/pkg/networkservice/common/serialize/server_test.go +++ b/pkg/networkservice/common/serialize/server_test.go @@ -18,14 +18,17 @@ package serialize_test import ( "context" + "fmt" "sync" "sync/atomic" "testing" "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + + "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" @@ -36,7 +39,17 @@ const ( parallelCount = 1000 ) +func testRequest(id string) *networkservice.NetworkServiceRequest { + return &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: id, + }, + } +} + func TestSerializeServer_StressTest(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -49,22 +62,17 @@ func TestSerializeServer_StressTest(t *testing.T) { new(eventServer), newParallelServer(t), ) - request := &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "id", - }, - } wg := new(sync.WaitGroup) wg.Add(parallelCount) for i := 0; i < parallelCount; i++ { - go func() { + go func(id string) { defer wg.Done() - conn, err := server.Request(ctx, request) + conn, err := server.Request(ctx, testRequest(id)) assert.NoError(t, err) _, err = server.Close(ctx, conn) assert.NoError(t, err) - }() + }(fmt.Sprint(i % 20)) } wg.Wait() } @@ -98,8 +106,8 @@ func (s *eventServer) Close(ctx context.Context, conn *networkservice.Connection } type parallelServer struct { - t *testing.T - state int32 + t *testing.T + states sync.Map } func newParallelServer(t *testing.T) *parallelServer { @@ -109,13 +117,19 @@ func newParallelServer(t *testing.T) *parallelServer { } func (s *parallelServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - state := atomic.LoadInt32(&s.state) - assert.True(s.t, atomic.CompareAndSwapInt32(&s.state, state, state+1), "state has been changed") + raw, _ := s.states.LoadOrStore(request.Connection.Id, new(int32)) + statePtr := raw.(*int32) + + state := atomic.LoadInt32(statePtr) + assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed") return next.Server(ctx).Request(ctx, request) } func (s *parallelServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - state := atomic.LoadInt32(&s.state) - assert.True(s.t, atomic.CompareAndSwapInt32(&s.state, state, state+1), "state has been changed") + raw, _ := s.states.LoadOrStore(conn.Id, new(int32)) + statePtr := raw.(*int32) + + state := atomic.LoadInt32(statePtr) + assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed") return next.Server(ctx).Close(ctx, conn) } diff --git a/pkg/tools/multiexecutor/multi_executor_test.go b/pkg/tools/multiexecutor/multi_executor_test.go deleted file mode 100644 index d37933caa8..0000000000 --- a/pkg/tools/multiexecutor/multi_executor_test.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package multiexecutor_test - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "go.uber.org/goleak" - - "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" -) - -const ( - parallelCount = 1000 -) - -var ids = []string{ - "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", -} - -func TestExecutor_AsyncExec(t *testing.T) { - defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - - var wg sync.WaitGroup - wg.Add(parallelCount) - - var e multiexecutor.Executor - var vals intMap - for i := 0; i < parallelCount; i++ { - k := i - id := ids[i%len(ids)] - e.AsyncExec(id, func() { - defer wg.Done() - val := vals.load(id) - assert.Equal(t, k/len(ids), val) - vals.store(id, val+1) - }) - } - - wg.Wait() -} - -type intMap sync.Map - -func (m *intMap) load(key string) int { - val, ok := (*sync.Map)(m).Load(key) - if !ok { - return 0 - } - return val.(int) -} - -func (m *intMap) store(key string, val int) { - (*sync.Map)(m).Store(key, val) -}