Skip to content

Commit

Permalink
Move multiExecutor to serialize
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Dec 1, 2020
1 parent 227c24b commit 08cebc5
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 126 deletions.
7 changes: 3 additions & 4 deletions pkg/networkservice/common/serialize/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
)

type serializeClient struct {
executor multiexecutor.Executor
executor multiExecutor
}

// NewClient returns a new serialize client chain element
Expand All @@ -40,14 +39,14 @@ func NewClient() networkservice.NetworkServiceClient {
func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()
<-c.executor.AsyncExec(connID, func() {
conn, err = next.Client(ctx).Request(WithExecutor(ctx, newExecutorFunc(connID, &c.executor)), request, opts...)
conn, err = next.Client(ctx).Request(WithExecutor(ctx, c.executor.Executor(connID)), request, opts...)
})
return conn, err
}

func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) {
<-c.executor.AsyncExec(conn.GetId(), func() {
_, err = next.Client(ctx).Close(WithExecutor(ctx, newExecutorFunc(conn.GetId(), &c.executor)), conn, opts...)
_, err = next.Client(ctx).Close(WithExecutor(ctx, c.executor.Executor(conn.GetId())), conn, opts...)
})
return new(empty.Empty), err
}
17 changes: 11 additions & 6 deletions pkg/networkservice/common/serialize/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ const (

type contextKeyType string

// WithExecutor wraps `parent` in a new context with ExecutorFunc
func WithExecutor(parent context.Context, executor ExecutorFunc) context.Context {
// IExecutor is a serialize.Executor interface type
type IExecutor interface {
AsyncExec(f func()) <-chan struct{}
}

// WithExecutor wraps `parent` in a new context with IExecutor
func WithExecutor(parent context.Context, executor IExecutor) context.Context {
if parent == nil {
parent = context.TODO()
panic("cannot create context from nil parent")
}
return context.WithValue(parent, executorKey, executor)
}

// Executor returns ExecutorFunc
func Executor(ctx context.Context) ExecutorFunc {
if executor, ok := ctx.Value(executorKey).(ExecutorFunc); ok {
// Executor returns IExecutor
func Executor(ctx context.Context) IExecutor {
if executor, ok := ctx.Value(executorKey).(IExecutor); ok {
return executor
}
return nil
Expand Down
14 changes: 2 additions & 12 deletions pkg/networkservice/common/serialize/executor_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,8 @@

package serialize

import "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
type executorFunc func(f func()) <-chan struct{}

// ExecutorFunc is a serialize.Executor.AsyncExec func type
type ExecutorFunc func(f func()) <-chan struct{}

func newExecutorFunc(id string, executor *multiexecutor.Executor) ExecutorFunc {
return func(f func()) <-chan struct{} {
return executor.AsyncExec(id, f)
}
}

// AsyncExec calls ExecutorFunc
func (e ExecutorFunc) AsyncExec(f func()) <-chan struct{} {
func (e executorFunc) AsyncExec(f func()) <-chan struct{} {
return e(f)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,50 +14,53 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package multiexecutor provides serial executor with multiple execution queues
package multiexecutor
package serialize

import (
"sync"

"github.com/edwarnicke/serialize"
)

// Executor is a serial executor with multiple execution queues
type Executor struct {
executors map[string]*executor
type multiExecutor struct {
executors map[string]*refCountExecutor
executor serialize.Executor
once sync.Once
}

type executor struct {
type refCountExecutor struct {
count int
executor serialize.Executor
refCount int
}

// AsyncExec executes `f` serially in the `id` queue
func (e *Executor) AsyncExec(id string, f func()) (ch <-chan struct{}) {
func (e *multiExecutor) AsyncExec(id string, f func()) (ch <-chan struct{}) {
e.once.Do(func() {
e.executors = make(map[string]*executor)
e.executors = make(map[string]*refCountExecutor)
})

<-e.executor.AsyncExec(func() {
exec, ok := e.executors[id]
if !ok {
exec = new(executor)
exec = new(refCountExecutor)
e.executors[id] = exec
}
exec.refCount++
exec.count++

ch = exec.executor.AsyncExec(func() {
f()
e.executor.AsyncExec(func() {
exec.refCount--
if exec.refCount == 0 {
exec.count--
if exec.count == 0 {
delete(e.executors, id)
}
})
})
})
return ch
}

func (e *multiExecutor) Executor(id string) IExecutor {
return executorFunc(func(f func()) <-chan struct{} {
return e.AsyncExec(id, f)
})
}
7 changes: 3 additions & 4 deletions pkg/networkservice/common/serialize/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
)

type serializeServer struct {
executor multiexecutor.Executor
executor multiExecutor
}

// NewServer returns a new serialize server chain element
Expand All @@ -39,14 +38,14 @@ func NewServer() networkservice.NetworkServiceServer {
func (s *serializeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()
<-s.executor.AsyncExec(connID, func() {
conn, err = next.Server(ctx).Request(WithExecutor(ctx, newExecutorFunc(connID, &s.executor)), request)
conn, err = next.Server(ctx).Request(WithExecutor(ctx, s.executor.Executor(connID)), request)
})
return conn, err
}

func (s *serializeServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) {
<-s.executor.AsyncExec(conn.GetId(), func() {
_, err = next.Server(ctx).Close(WithExecutor(ctx, newExecutorFunc(conn.GetId(), &s.executor)), conn)
_, err = next.Server(ctx).Close(WithExecutor(ctx, s.executor.Executor(conn.GetId())), conn)
})
return new(empty.Empty), err
}
44 changes: 29 additions & 15 deletions pkg/networkservice/common/serialize/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package serialize_test

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
Expand All @@ -36,7 +39,17 @@ const (
parallelCount = 1000
)

func testRequest(id string) *networkservice.NetworkServiceRequest {
return &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: id,
},
}
}

func TestSerializeServer_StressTest(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -49,22 +62,17 @@ func TestSerializeServer_StressTest(t *testing.T) {
new(eventServer),
newParallelServer(t),
)
request := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: "id",
},
}

wg := new(sync.WaitGroup)
wg.Add(parallelCount)
for i := 0; i < parallelCount; i++ {
go func() {
go func(id string) {
defer wg.Done()
conn, err := server.Request(ctx, request)
conn, err := server.Request(ctx, testRequest(id))
assert.NoError(t, err)
_, err = server.Close(ctx, conn)
assert.NoError(t, err)
}()
}(fmt.Sprint(i % 20))
}
wg.Wait()
}
Expand Down Expand Up @@ -98,8 +106,8 @@ func (s *eventServer) Close(ctx context.Context, conn *networkservice.Connection
}

type parallelServer struct {
t *testing.T
state int32
t *testing.T
states sync.Map
}

func newParallelServer(t *testing.T) *parallelServer {
Expand All @@ -109,13 +117,19 @@ func newParallelServer(t *testing.T) *parallelServer {
}

func (s *parallelServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
state := atomic.LoadInt32(&s.state)
assert.True(s.t, atomic.CompareAndSwapInt32(&s.state, state, state+1), "state has been changed")
raw, _ := s.states.LoadOrStore(request.Connection.Id, new(int32))
statePtr := raw.(*int32)

state := atomic.LoadInt32(statePtr)
assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed")
return next.Server(ctx).Request(ctx, request)
}

func (s *parallelServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
state := atomic.LoadInt32(&s.state)
assert.True(s.t, atomic.CompareAndSwapInt32(&s.state, state, state+1), "state has been changed")
raw, _ := s.states.LoadOrStore(conn.Id, new(int32))
statePtr := raw.(*int32)

state := atomic.LoadInt32(statePtr)
assert.True(s.t, atomic.CompareAndSwapInt32(statePtr, state, state+1), "state has been changed")
return next.Server(ctx).Close(ctx, conn)
}
71 changes: 0 additions & 71 deletions pkg/tools/multiexecutor/multi_executor_test.go

This file was deleted.

0 comments on commit 08cebc5

Please sign in to comment.