Skip to content

Commit

Permalink
Rework serialize with RequestExecutor, CloseExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Nov 16, 2020
1 parent db9781b commit c4f9e15
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 97 deletions.
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client
import (
"context"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer"

Expand Down Expand Up @@ -59,6 +60,7 @@ func NewClient(ctx context.Context, name string, onHeal *networkservice.NetworkS
rv = chain.NewNetworkServiceClient(
append(
append([]networkservice.NetworkServiceClient{
serialize.NewClient(),
authorize.NewClient(),
updatepath.NewClient(name),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), onHeal),
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
rv.NetworkServiceServer = setlogoption.NewServer(map[string]string{"chain": name},
chain.NewNetworkServiceServer(
append([]networkservice.NetworkServiceServer{
serialize.NewServer(),
authzServer,
updatepath.NewServer(name),
serialize.NewServer(),
// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
// chain elements before the `timeout` in chain shouldn't make any updates to the Close context and
// shouldn't be closed on Connection Close.
Expand Down
44 changes: 22 additions & 22 deletions pkg/networkservice/common/serialize/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,41 @@ mapping incoming `Connection.ID` to a [serialize.Executor](https://github.com/ed
New executors are created on Request. Close deletes existing executor for the request.

To make possible a new chain element firing asynchronously with `Request`, `Close` events, serialize chain elements wraps
per-connection executor with [serialize.CancellableExecutor](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/cancellable_executor.go)
and inserts it into the `Request` context. Such thing is not performed for the `Close` context because the executor will
already be cancelled by the time it becomes free.
per-connection executor with [request executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L35),
[close executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L50) and inserts them into the `Request`
context. Such thing is not performed for the `Close` context because the executor will already be cancelled by the time
it becomes free.

Correct `Request` firing chain element example:
```go
func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
executor := serialize.Executor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Request(serialize.WithExecutor(context.TODO(), executor), request)
})
}()
executor := serialize.RequestExecutor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request)
})
}()

return next.Server(ctx).Request(ctx, request)
return next.Server(ctx).Request(ctx, request)
}
```

Correct `Close` firing chain element example:
```go
func (s *closeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

executor := serialize.Executor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
executor.Cancel()
})
}()
executor := serialize.CloseExecutor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
})
}()

return conn, err
return conn, err
}
```

Expand Down
49 changes: 0 additions & 49 deletions pkg/networkservice/common/serialize/cancellable_executor.go

This file was deleted.

19 changes: 8 additions & 11 deletions pkg/networkservice/common/serialize/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ func requestConnection(
newExecutor := new(serialize.Executor)
<-newExecutor.AsyncExec(func() {
executor, loaded := executors.LoadOrStore(connID, newExecutor)
// We should set `cancellableExecutor` into the request context so the following chain elements can generate
// new Request, Close events and insert them into the chain in a serial way.
cancellableExecutor := &CancellableExecutor{
executors: executors,
id: connID,
executor: executor,
}
// We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements
// can generate new Request, Close events and insert them into the chain in a serial way.
requestExecutor := newRequestExecutor(executor, connID, executors)
closeExecutor := newCloseExecutor(executor, connID, executors)
if loaded {
<-executor.AsyncExec(func() {
// Executor has been possibly removed at this moment, we need to store it back.
Expand All @@ -67,11 +64,11 @@ func requestConnection(
err = errors.Errorf("race condition, parallel request execution: %v", connID)
return
}
if conn, err = requestConn(WithExecutor(ctx, cancellableExecutor)); err != nil {
if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil {
executors.Delete(connID)
}
})
} else if conn, err = requestConn(WithExecutor(ctx, cancellableExecutor)); err != nil {
} else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil {
executors.Delete(connID)
}
})
Expand All @@ -92,8 +89,8 @@ func closeConnection(
<-executor.AsyncExec(func() {
var exec *serialize.Executor
if exec, ok = executors.Load(connID); ok && exec == executor {
// We don't set `cancellableExecutor` into the close context because it should be canceled by the time
// anything can be executed with it.
// We don't set `requestExecutor`, `closeExecutor` into the close context because they should be
// canceled by the time anything can be executed with them.
_, err = closeConn(ctx)
executors.Delete(connID)
} else {
Expand Down
38 changes: 31 additions & 7 deletions pkg/networkservice/common/serialize/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,46 @@ import (
)

const (
executorKey contextKeyType = "serialize.Executor"
requestExecutorKey contextKeyType = "requestExecutor"
closeExecutorKey contextKeyType = "closeExecutor"
)

type contextKeyType string

// WithExecutor wraps `parent` in a new context with the CancellableExecutor
func WithExecutor(parent context.Context, executor *CancellableExecutor) context.Context {
func withExecutors(parent context.Context, requestExecutor, closeExecutor Executor) context.Context {
if parent == nil {
parent = context.TODO()
}
return context.WithValue(parent, executorKey, executor)
return context.WithValue(context.WithValue(parent,
requestExecutorKey, requestExecutor),
closeExecutorKey, closeExecutor)
}

// Executor returns CancellableExecutor
func Executor(ctx context.Context) *CancellableExecutor {
if executor, ok := ctx.Value(executorKey).(*CancellableExecutor); ok {
// WithExecutorsFromContext wraps `parent` in a new context with the executors from `executorsContext`
func WithExecutorsFromContext(parent, executorsContext context.Context) context.Context {
if parent == nil {
parent = context.TODO()
}
if requestExecutor := RequestExecutor(executorsContext); requestExecutor != nil {
parent = context.WithValue(parent, requestExecutorKey, requestExecutor)
}
if closeExecutor := CloseExecutor(executorsContext); closeExecutor != nil {
parent = context.WithValue(parent, closeExecutorKey, closeExecutor)
}
return parent
}

// RequestExecutor returns Request `Executor`
func RequestExecutor(ctx context.Context) Executor {
if executor, ok := ctx.Value(requestExecutorKey).(Executor); ok {
return executor
}
return nil
}

// CloseExecutor returns Close `Executor`
func CloseExecutor(ctx context.Context) Executor {
if executor, ok := ctx.Value(closeExecutorKey).(Executor); ok {
return executor
}
return nil
Expand Down
58 changes: 58 additions & 0 deletions pkg/networkservice/common/serialize/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serialize

import (
"github.com/edwarnicke/serialize"
"github.com/pkg/errors"
)

// Executor is same as serialize.Executor except that it returns error channel
type Executor interface {
AsyncExec(f func()) <-chan error
}

type executorFunc func(f func()) <-chan error

func (ef executorFunc) AsyncExec(f func()) <-chan error {
return ef(f)
}

func newRequestExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor {
return executorFunc(func(f func()) <-chan error {
errCh := make(chan error, 1)
executor.AsyncExec(func() {
if exec, ok := executors.Load(id); !ok || exec != executor {
errCh <- errors.Errorf("connection is already closed: %v", id)
return
}
f()
close(errCh)
})
return errCh
})
}

func newCloseExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor {
return executorFunc(func(f func()) <-chan error {
exec := newRequestExecutor(executor, id, executors)
return exec.AsyncExec(func() {
f()
executors.Delete(id)
})
})
}
7 changes: 3 additions & 4 deletions pkg/networkservice/common/serialize/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func TestSerializeServer_StressTest(t *testing.T) {
type requestServer struct{}

func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
executor := serialize.Executor(ctx)
executor := serialize.RequestExecutor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Request(serialize.WithExecutor(context.TODO(), executor), request)
_, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request)
})
}()

Expand All @@ -100,11 +100,10 @@ func (s *closeServer) Request(ctx context.Context, request *networkservice.Netwo
return nil, err
}

executor := serialize.Executor(ctx)
executor := serialize.CloseExecutor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
executor.Cancel()
})
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/timeout/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ Example of correct timeoutServer usage in [endpoint.NewServer](https://github.co
```go
rv.NetworkServiceServer = chain.NewNetworkServiceServer(
append([]networkservice.NetworkServiceServer{
serialize.NewServer(ctx) // <-- should be before the timeoutServer
authzServer, // <-- shouldn't be closed, don't set anything to the context
updatepath.NewServer(name), // <-- same
serialize.NewServer(ctx) // <-- should be before the timeoutServer
timeout.NewServer(ctx), // <-- timeoutServer
monitor.NewServer(ctx, &rv.MonitorConnectionServer), // <-- should be closed
updatetoken.NewServer(tokenGenerator), // <-- should be closed
Expand Down
3 changes: 1 addition & 2 deletions pkg/networkservice/common/timeout/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (t *timeoutServer) Request(ctx context.Context, request *networkservice.Net
func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection) (*time.Timer, error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "createTimer")

executor := serialize.Executor(ctx)
executor := serialize.CloseExecutor(ctx)
if executor == nil {
return nil, errors.New("no executor provided")
}
Expand All @@ -108,7 +108,6 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co
if err := t.close(t.ctx, conn, next.Server(ctx)); err != nil {
logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
}
executor.Cancel()
}); err != nil {
logEntry.Warnf("connection has been already closed: %v", conn.GetId())
}
Expand Down

0 comments on commit c4f9e15

Please sign in to comment.