From c4f9e1529e12ed03a1a4acd06848642f52b7c96a Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Mon, 16 Nov 2020 10:54:01 +0700 Subject: [PATCH] Rework serialize with RequestExecutor, CloseExecutor Signed-off-by: Vladimir Popov --- pkg/networkservice/chains/client/client.go | 2 + pkg/networkservice/chains/endpoint/server.go | 2 +- pkg/networkservice/common/serialize/README.md | 44 +++++++------- .../common/serialize/cancellable_executor.go | 49 ---------------- pkg/networkservice/common/serialize/common.go | 19 +++--- .../common/serialize/context.go | 38 +++++++++--- .../common/serialize/executor.go | 58 +++++++++++++++++++ .../common/serialize/server_test.go | 7 +-- pkg/networkservice/common/timeout/README.md | 2 +- pkg/networkservice/common/timeout/server.go | 3 +- 10 files changed, 127 insertions(+), 97 deletions(-) delete mode 100644 pkg/networkservice/common/serialize/cancellable_executor.go create mode 100644 pkg/networkservice/common/serialize/executor.go diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index d25a3b9290..e8f782fc2a 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -20,6 +20,7 @@ package client import ( "context" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer" @@ -59,6 +60,7 @@ func NewClient(ctx context.Context, name string, onHeal *networkservice.NetworkS rv = chain.NewNetworkServiceClient( append( append([]networkservice.NetworkServiceClient{ + serialize.NewClient(), authorize.NewClient(), updatepath.NewClient(name), heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), onHeal), diff --git a/pkg/networkservice/chains/endpoint/server.go b/pkg/networkservice/chains/endpoint/server.go index 5a96d1060e..412beb0a4f 100644 --- a/pkg/networkservice/chains/endpoint/server.go +++ b/pkg/networkservice/chains/endpoint/server.go @@ -66,9 +66,9 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw rv.NetworkServiceServer = setlogoption.NewServer(map[string]string{"chain": name}, chain.NewNetworkServiceServer( append([]networkservice.NetworkServiceServer{ + serialize.NewServer(), authzServer, updatepath.NewServer(name), - serialize.NewServer(), // `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so // chain elements before the `timeout` in chain shouldn't make any updates to the Close context and // shouldn't be closed on Connection Close. diff --git a/pkg/networkservice/common/serialize/README.md b/pkg/networkservice/common/serialize/README.md index 06799b869f..a2ae97e7b3 100644 --- a/pkg/networkservice/common/serialize/README.md +++ b/pkg/networkservice/common/serialize/README.md @@ -11,41 +11,41 @@ mapping incoming `Connection.ID` to a [serialize.Executor](https://github.com/ed New executors are created on Request. Close deletes existing executor for the request. To make possible a new chain element firing asynchronously with `Request`, `Close` events, serialize chain elements wraps -per-connection executor with [serialize.CancellableExecutor](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/cancellable_executor.go) -and inserts it into the `Request` context. Such thing is not performed for the `Close` context because the executor will -already be cancelled by the time it becomes free. +per-connection executor with [request executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L35), +[close executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L50) and inserts them into the `Request` +context. Such thing is not performed for the `Close` context because the executor will already be cancelled by the time +it becomes free. Correct `Request` firing chain element example: ```go func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - executor := serialize.Executor(ctx) - go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Request(serialize.WithExecutor(context.TODO(), executor), request) - }) - }() + executor := serialize.RequestExecutor(ctx) + go func() { + executor.AsyncExec(func() { + _, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request) + }) + }() - return next.Server(ctx).Request(ctx, request) + return next.Server(ctx).Request(ctx, request) } ``` Correct `Close` firing chain element example: ```go func (s *closeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - conn, err := next.Server(ctx).Request(ctx, request) - if err != nil { - return nil, err - } + conn, err := next.Server(ctx).Request(ctx, request) + if err != nil { + return nil, err + } - executor := serialize.Executor(ctx) - go func() { - executor.AsyncExec(func() { - _, _ = next.Server(ctx).Close(context.TODO(), conn) - executor.Cancel() - }) - }() + executor := serialize.CloseExecutor(ctx) + go func() { + executor.AsyncExec(func() { + _, _ = next.Server(ctx).Close(context.TODO(), conn) + }) + }() - return conn, err + return conn, err } ``` diff --git a/pkg/networkservice/common/serialize/cancellable_executor.go b/pkg/networkservice/common/serialize/cancellable_executor.go deleted file mode 100644 index caf375845f..0000000000 --- a/pkg/networkservice/common/serialize/cancellable_executor.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package serialize - -import ( - "github.com/edwarnicke/serialize" - "github.com/pkg/errors" -) - -// CancellableExecutor is a cancellable serialize.Executor -type CancellableExecutor struct { - executors *executorMap - id string - executor *serialize.Executor -} - -// AsyncExec is same as serialize.Executor.AsyncExec() except that it returns error channel which sends error if -// the executor is already canceled at the moment of execution -func (e *CancellableExecutor) AsyncExec(f func()) <-chan error { - errCh := make(chan error, 1) - e.executor.AsyncExec(func() { - if executor, ok := e.executors.Load(e.id); !ok || executor != e.executor { - errCh <- errors.Errorf("executor is already canceled: %v", e.id) - return - } - f() - close(errCh) - }) - return errCh -} - -// Cancel cancels the executor, should be called under the AsyncExec to prevent concurrent errors -func (e *CancellableExecutor) Cancel() { - e.executors.Delete(e.id) -} diff --git a/pkg/networkservice/common/serialize/common.go b/pkg/networkservice/common/serialize/common.go index df56191452..1785b8d4cd 100644 --- a/pkg/networkservice/common/serialize/common.go +++ b/pkg/networkservice/common/serialize/common.go @@ -43,13 +43,10 @@ func requestConnection( newExecutor := new(serialize.Executor) <-newExecutor.AsyncExec(func() { executor, loaded := executors.LoadOrStore(connID, newExecutor) - // We should set `cancellableExecutor` into the request context so the following chain elements can generate - // new Request, Close events and insert them into the chain in a serial way. - cancellableExecutor := &CancellableExecutor{ - executors: executors, - id: connID, - executor: executor, - } + // We should set `requestExecutor`, `closeExecutor` into the request context so the following chain elements + // can generate new Request, Close events and insert them into the chain in a serial way. + requestExecutor := newRequestExecutor(executor, connID, executors) + closeExecutor := newCloseExecutor(executor, connID, executors) if loaded { <-executor.AsyncExec(func() { // Executor has been possibly removed at this moment, we need to store it back. @@ -67,11 +64,11 @@ func requestConnection( err = errors.Errorf("race condition, parallel request execution: %v", connID) return } - if conn, err = requestConn(WithExecutor(ctx, cancellableExecutor)); err != nil { + if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { executors.Delete(connID) } }) - } else if conn, err = requestConn(WithExecutor(ctx, cancellableExecutor)); err != nil { + } else if conn, err = requestConn(withExecutors(ctx, requestExecutor, closeExecutor)); err != nil { executors.Delete(connID) } }) @@ -92,8 +89,8 @@ func closeConnection( <-executor.AsyncExec(func() { var exec *serialize.Executor if exec, ok = executors.Load(connID); ok && exec == executor { - // We don't set `cancellableExecutor` into the close context because it should be canceled by the time - // anything can be executed with it. + // We don't set `requestExecutor`, `closeExecutor` into the close context because they should be + // canceled by the time anything can be executed with them. _, err = closeConn(ctx) executors.Delete(connID) } else { diff --git a/pkg/networkservice/common/serialize/context.go b/pkg/networkservice/common/serialize/context.go index a8c394d2f2..43a0d42f31 100644 --- a/pkg/networkservice/common/serialize/context.go +++ b/pkg/networkservice/common/serialize/context.go @@ -21,22 +21,46 @@ import ( ) const ( - executorKey contextKeyType = "serialize.Executor" + requestExecutorKey contextKeyType = "requestExecutor" + closeExecutorKey contextKeyType = "closeExecutor" ) type contextKeyType string -// WithExecutor wraps `parent` in a new context with the CancellableExecutor -func WithExecutor(parent context.Context, executor *CancellableExecutor) context.Context { +func withExecutors(parent context.Context, requestExecutor, closeExecutor Executor) context.Context { if parent == nil { parent = context.TODO() } - return context.WithValue(parent, executorKey, executor) + return context.WithValue(context.WithValue(parent, + requestExecutorKey, requestExecutor), + closeExecutorKey, closeExecutor) } -// Executor returns CancellableExecutor -func Executor(ctx context.Context) *CancellableExecutor { - if executor, ok := ctx.Value(executorKey).(*CancellableExecutor); ok { +// WithExecutorsFromContext wraps `parent` in a new context with the executors from `executorsContext` +func WithExecutorsFromContext(parent, executorsContext context.Context) context.Context { + if parent == nil { + parent = context.TODO() + } + if requestExecutor := RequestExecutor(executorsContext); requestExecutor != nil { + parent = context.WithValue(parent, requestExecutorKey, requestExecutor) + } + if closeExecutor := CloseExecutor(executorsContext); closeExecutor != nil { + parent = context.WithValue(parent, closeExecutorKey, closeExecutor) + } + return parent +} + +// RequestExecutor returns Request `Executor` +func RequestExecutor(ctx context.Context) Executor { + if executor, ok := ctx.Value(requestExecutorKey).(Executor); ok { + return executor + } + return nil +} + +// CloseExecutor returns Close `Executor` +func CloseExecutor(ctx context.Context) Executor { + if executor, ok := ctx.Value(closeExecutorKey).(Executor); ok { return executor } return nil diff --git a/pkg/networkservice/common/serialize/executor.go b/pkg/networkservice/common/serialize/executor.go new file mode 100644 index 0000000000..f736bb4b2c --- /dev/null +++ b/pkg/networkservice/common/serialize/executor.go @@ -0,0 +1,58 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package serialize + +import ( + "github.com/edwarnicke/serialize" + "github.com/pkg/errors" +) + +// Executor is same as serialize.Executor except that it returns error channel +type Executor interface { + AsyncExec(f func()) <-chan error +} + +type executorFunc func(f func()) <-chan error + +func (ef executorFunc) AsyncExec(f func()) <-chan error { + return ef(f) +} + +func newRequestExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor { + return executorFunc(func(f func()) <-chan error { + errCh := make(chan error, 1) + executor.AsyncExec(func() { + if exec, ok := executors.Load(id); !ok || exec != executor { + errCh <- errors.Errorf("connection is already closed: %v", id) + return + } + f() + close(errCh) + }) + return errCh + }) +} + +func newCloseExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor { + return executorFunc(func(f func()) <-chan error { + exec := newRequestExecutor(executor, id, executors) + return exec.AsyncExec(func() { + f() + executors.Delete(id) + }) + }) +} diff --git a/pkg/networkservice/common/serialize/server_test.go b/pkg/networkservice/common/serialize/server_test.go index 252d10fd01..32dac6d3c0 100644 --- a/pkg/networkservice/common/serialize/server_test.go +++ b/pkg/networkservice/common/serialize/server_test.go @@ -78,10 +78,10 @@ func TestSerializeServer_StressTest(t *testing.T) { type requestServer struct{} func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - executor := serialize.Executor(ctx) + executor := serialize.RequestExecutor(ctx) go func() { executor.AsyncExec(func() { - _, _ = next.Server(ctx).Request(serialize.WithExecutor(context.TODO(), executor), request) + _, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request) }) }() @@ -100,11 +100,10 @@ func (s *closeServer) Request(ctx context.Context, request *networkservice.Netwo return nil, err } - executor := serialize.Executor(ctx) + executor := serialize.CloseExecutor(ctx) go func() { executor.AsyncExec(func() { _, _ = next.Server(ctx).Close(context.TODO(), conn) - executor.Cancel() }) }() diff --git a/pkg/networkservice/common/timeout/README.md b/pkg/networkservice/common/timeout/README.md index 43beac7540..12ea486a51 100644 --- a/pkg/networkservice/common/timeout/README.md +++ b/pkg/networkservice/common/timeout/README.md @@ -18,9 +18,9 @@ Example of correct timeoutServer usage in [endpoint.NewServer](https://github.co ```go rv.NetworkServiceServer = chain.NewNetworkServiceServer( append([]networkservice.NetworkServiceServer{ + serialize.NewServer(ctx) // <-- should be before the timeoutServer authzServer, // <-- shouldn't be closed, don't set anything to the context updatepath.NewServer(name), // <-- same - serialize.NewServer(ctx) // <-- should be before the timeoutServer timeout.NewServer(ctx), // <-- timeoutServer monitor.NewServer(ctx, &rv.MonitorConnectionServer), // <-- should be closed updatetoken.NewServer(tokenGenerator), // <-- should be closed diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go index ab74268aa0..03acbfec19 100644 --- a/pkg/networkservice/common/timeout/server.go +++ b/pkg/networkservice/common/timeout/server.go @@ -86,7 +86,7 @@ func (t *timeoutServer) Request(ctx context.Context, request *networkservice.Net func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection) (*time.Timer, error) { logEntry := log.Entry(ctx).WithField("timeoutServer", "createTimer") - executor := serialize.Executor(ctx) + executor := serialize.CloseExecutor(ctx) if executor == nil { return nil, errors.New("no executor provided") } @@ -108,7 +108,6 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co if err := t.close(t.ctx, conn, next.Server(ctx)); err != nil { logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err) } - executor.Cancel() }); err != nil { logEntry.Warnf("connection has been already closed: %v", conn.GetId()) }