Skip to content

Commit

Permalink
Rework serialize with multi executor
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Nov 26, 2020
1 parent 547a00b commit df6242b
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 493 deletions.
77 changes: 25 additions & 52 deletions pkg/networkservice/common/serialize/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,32 @@

# Implementation

## serializer

`serializer` maps incoming `Connection.ID` to a [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go),
request count and state. New mappings are created on Request. If request count and state are equal 0, mapping becomes
free and can be cleaned up. Request count is incremented with each Request before requesting the next chain element and
decrements after. All request count modifications are performed under the `serializer.executor`. State is being changed
in Request from 0 to random, or in Close from not 0 to 0. If state is equal 0, connection is closed so all incoming
Close events should not be processed.

To make possible a new chain element firing asynchronously with `Request`, `Close` events, serialize chain elements wraps
per-connection executor with [request executor](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/executor.go#L36),
[close executor](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/executor.go#L40)
and inserts them into the `Request` context. Such a thing is not being performed for the `Close` context because the
executor will already be cancelled by the time it becomes free. For the generated events state should be equal the
original state for the Request/CloseExecutor.

Correct `Request` firing chain element example:
```go
func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
executor := serialize.RequestExecutor(ctx)
go func() {
executor.AsyncExec(func() error {
_, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request)
return nil
})
}()

return next.Server(ctx).Request(ctx, request)
}
```
## serializeServer, serializeClient

Serialize chain elements use [multi executor](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/multiexecutor/multi_executor.go)
and stores per-id executor in the context.

Correct `Close` firing chain element example:
Correct event firing chain element example:
```go
func (s *closeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

executor := serialize.CloseExecutor(ctx)
go func() {
executor.AsyncExec(func() error {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
return errors.New() // I don't want to close the chain.
})
}()
go func() {
executor.AsyncExec(func() error {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
return nil // I want to close the chain.
})
}()

return conn, err
func (s *eventServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
executor := serialize.Executor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Request(serialize.WithExecutor(context.TODO(), executor), request)
})
}()

conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Close(serialize.WithExecutor(context.TODO(), executor), conn)
})
}()

return conn, nil
}
```
19 changes: 9 additions & 10 deletions pkg/networkservice/common/serialize/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
)

type serializeClient struct {
serializer
executor multiexecutor.Executor
}

// NewClient returns a new serialize client chain element
func NewClient() networkservice.NetworkServiceClient {
return &serializeClient{
serializer{
executors: map[string]*executor{},
},
}
return new(serializeClient)
}

func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()
return c.requestConnection(ctx, connID, func(requestCtx context.Context) (*networkservice.Connection, error) {
return next.Client(ctx).Request(requestCtx, request, opts...)
<-c.executor.AsyncExec(connID, func() {
conn, err = next.Client(ctx).Request(WithExecutor(ctx, newExecutorFunc(connID, &c.executor)), request, opts...)
})
return conn, err
}

func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) {
return c.closeConnection(ctx, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) {
return next.Client(ctx).Close(closeCtx, conn, opts...)
<-c.executor.AsyncExec(conn.GetId(), func() {
_, err = next.Client(ctx).Close(WithExecutor(ctx, newExecutorFunc(conn.GetId(), &c.executor)), conn, opts...)
})
return new(empty.Empty), err
}
38 changes: 7 additions & 31 deletions pkg/networkservice/common/serialize/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,22 @@ import (
)

const (
requestExecutorKey contextKeyType = "requestExecutor"
closeExecutorKey contextKeyType = "closeExecutor"
executorKey contextKeyType = "executor"
)

type contextKeyType string

func withExecutors(parent context.Context, requestExecutor, closeExecutor Executor) context.Context {
// WithExecutor wraps `parent` in a new context with ExecutorFunc
func WithExecutor(parent context.Context, executor ExecutorFunc) context.Context {
if parent == nil {
parent = context.TODO()
}
return context.WithValue(context.WithValue(parent,
requestExecutorKey, requestExecutor),
closeExecutorKey, closeExecutor)
return context.WithValue(parent, executorKey, executor)
}

// WithExecutorsFromContext wraps `parent` in a new context with the executors from `executorsContext`
func WithExecutorsFromContext(parent, executorsContext context.Context) context.Context {
if parent == nil {
parent = context.TODO()
}
if requestExecutor := RequestExecutor(executorsContext); requestExecutor != nil {
parent = context.WithValue(parent, requestExecutorKey, requestExecutor)
}
if closeExecutor := CloseExecutor(executorsContext); closeExecutor != nil {
parent = context.WithValue(parent, closeExecutorKey, closeExecutor)
}
return parent
}

// RequestExecutor returns Request `Executor`
func RequestExecutor(ctx context.Context) Executor {
if executor, ok := ctx.Value(requestExecutorKey).(Executor); ok {
return executor
}
return nil
}

// CloseExecutor returns Close `Executor`
func CloseExecutor(ctx context.Context) Executor {
if executor, ok := ctx.Value(closeExecutorKey).(Executor); ok {
// Executor returns ExecutorFunc
func Executor(ctx context.Context) ExecutorFunc {
if executor, ok := ctx.Value(executorKey).(ExecutorFunc); ok {
return executor
}
return nil
Expand Down
68 changes: 0 additions & 68 deletions pkg/networkservice/common/serialize/executor.go

This file was deleted.

33 changes: 33 additions & 0 deletions pkg/networkservice/common/serialize/executor_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serialize

import "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"

// ExecutorFunc is a serialize.Executor.AsyncExec func type
type ExecutorFunc func(f func()) <-chan struct{}

func newExecutorFunc(id string, executor *multiexecutor.Executor) ExecutorFunc {
return func(f func()) <-chan struct{} {
return executor.AsyncExec(id, f)
}
}

// AsyncExec calls ExecutorFunc
func (e ExecutorFunc) AsyncExec(f func()) <-chan struct{} {
return e(f)
}
Loading

0 comments on commit df6242b

Please sign in to comment.