From b8d66861cada8a80e7db10a5ff335e6b2a33b695 Mon Sep 17 00:00:00 2001
From: Vladimir Popov
Date: Wed, 11 Nov 2020 13:22:48 +0700
Subject: [PATCH] Rework timeout with serialize

Signed-off-by: Vladimir Popov
---
 pkg/networkservice/chains/endpoint/server.go |  2 +
 pkg/networkservice/common/timeout/README.md  | 22 +----
 pkg/networkservice/common/timeout/server.go  | 97 ++++---------
 .../common/timeout/server_test.go            |  2 +
 4 files changed, 29 insertions(+), 94 deletions(-)

diff --git a/pkg/networkservice/chains/endpoint/server.go b/pkg/networkservice/chains/endpoint/server.go
index 570811dbd4..5a96d1060e 100644
--- a/pkg/networkservice/chains/endpoint/server.go
+++ b/pkg/networkservice/chains/endpoint/server.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/networkservicemesh/api/pkg/api/networkservice"
 
+	"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/core/setlogoption"
 
@@ -67,6 +68,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
 		append([]networkservice.NetworkServiceServer{
 			authzServer,
 			updatepath.NewServer(name),
+			serialize.NewServer(),
 			// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
 			// chain elements before the `timeout` in chain shouldn't make any updates to the Close context and
 			// shouldn't be closed on Connection Close.
diff --git a/pkg/networkservice/common/timeout/README.md b/pkg/networkservice/common/timeout/README.md
index 3cfd6b4e15..43beac7540 100644
--- a/pkg/networkservice/common/timeout/README.md
+++ b/pkg/networkservice/common/timeout/README.md
@@ -9,9 +9,7 @@
 previous NSM-based application. For such case connection should be closed.
 
 timeoutServer keeps timers [timeout.timerMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/gen.go#L26)
 mapping incoming request Connection.ID to a timeout timer firing Close on the subsequent chain after the connection previous
-path element expires. To prevent simultaneous execution of multiple Request, Close event for the same Connection.ID in parallel
-it also keeps executors [timeout.executorMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/gen.go#L27)
-mapping request Connection.ID to an executor for serializing all Request, Close event for the mapped Connection.ID.
+path element expires. timeoutServer closes only the subsequent chain elements and uses the base context for the Close, so all chain elements before the timeoutServer shouldn't be closed with Close and shouldn't set any data required by the Close context.
@@ -22,6 +20,7 @@ rv.NetworkServiceServer = chain.NewNetworkServiceServer(
 	append([]networkservice.NetworkServiceServer{
 		authzServer,                // <-- shouldn't be closed, don't set anything to the context
 		updatepath.NewServer(name), // <-- same
+		serialize.NewServer(),      // <-- should be before the timeoutServer
 		timeout.NewServer(ctx),     // <-- timeoutServer
 		monitor.NewServer(ctx, &rv.MonitorConnectionServer), // <-- should be closed
 		updatetoken.NewServer(tokenGenerator),               // <-- should be closed
@@ -30,18 +29,5 @@ rv.NetworkServiceServer = chain.NewNetworkServiceServer(
 
 ## Comments on concurrency characteristics
 
-Concurrency is managed through type specific wrappers of [sync.Map](https://golang.org/pkg/sync/#Map) and with
-per-connection [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go) which are created on
-Request and deleted on Close.
-
-Since we are deleting the per-connection executor on connection Close, there possibly can be a race condition:
-```
-1. -> timeout close   : locking executor
-2. -> request-1       : waiting on executor
-3. timeout close ->   : unlocking executor, removing it from executors
-4. -> request-2       : creating exec, storing into executors, locking exec
-5. -request-1->       : locking executor, trying to store it into executors
-```
-at 5. we get request-1 locking executor, request-2 locking exec and only exec stored in executors. It means that
-request-2 and all subsequent events will be executed in parallel with request-1.
-So we check for this [here](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/server.go#L68).
\ No newline at end of file
+Concurrency is managed by [serialize.NewServer](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/server.go)
+placed earlier in the chain.
\ No newline at end of file
diff --git a/pkg/networkservice/common/timeout/server.go b/pkg/networkservice/common/timeout/server.go
index d430a0a9b4..ab74268aa0 100644
--- a/pkg/networkservice/common/timeout/server.go
+++ b/pkg/networkservice/common/timeout/server.go
@@ -23,21 +23,20 @@ import (
 	"context"
 	"time"
 
-	"github.com/edwarnicke/serialize"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/pkg/errors"
 
 	"github.com/networkservicemesh/api/pkg/api/networkservice"
 
+	"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
 	"github.com/networkservicemesh/sdk/pkg/tools/log"
 )
 
 type timeoutServer struct {
-	ctx       context.Context
-	timers    timerMap
-	executors executorMap
+	ctx    context.Context
+	timers timerMap
 }
 
 // NewServer - creates a new NetworkServiceServer chain element that implements timeout of expired connections
@@ -50,45 +49,7 @@ func NewServer(ctx context.Context) networkservice.NetworkServiceServer {
 	}
 }
 
-func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
-	connID := request.GetConnection().GetId()
-
-	newExecutor := new(serialize.Executor)
-	// If we create an executor, we should be the first one who uses it, or else we can get a double Close issue:
-	// 1. timeout close -> : closing the Connection
-	// 2. request ->       : creating `newExecutor`, storing into `executors`
-	// 3. close ->         : locking `newExecutor`, checking `newExecutor == executors[connID]` - looks like
-	//    a retry Close case (but it isn't), double closing the Connection
-	<-newExecutor.AsyncExec(func() {
-		executor, loaded := t.executors.LoadOrStore(connID, newExecutor)
-		if loaded {
-			<-executor.AsyncExec(func() {
-				// Executor has been possibly removed by `t.close()` at this moment, we need to store it back.
-				exec, _ := t.executors.LoadOrStore(connID, executor)
-				if exec != executor {
-					// It can happen in such situation:
-					// 1. -> timeout close : locking `executor`
-					// 2. -> request-1     : waiting on `executor`
-					// 3. timeout close -> : unlocking `executor`, removing it from `executors`
-					// 4. -> request-2     : creating `exec`, storing into `executors`, locking `exec`
-					// 5. -request-1->     : locking `executor`, trying to store it into `executors`
-					// at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored
-					// in `executors`. It means that `request-2` and all subsequent events will be executed in parallel
-					// with `request-1`.
-					err = errors.Errorf("race condition, parallel request execution: %v", connID)
-					return
-				}
-				conn, err = t.request(ctx, request, executor)
-			})
-		} else {
-			conn, err = t.request(ctx, request, executor)
-		}
-	})
-
-	return conn, err
-}
-
-func (t *timeoutServer) request(ctx context.Context, request *networkservice.NetworkServiceRequest, executor *serialize.Executor) (*networkservice.Connection, error) {
+func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
 	logEntry := log.Entry(ctx).WithField("timeoutServer", "request")
 
 	connID := request.GetConnection().GetId()
@@ -109,7 +70,7 @@ func (t *timeoutServer) request(ctx context.Context, request *networkservice.Net
 		return nil, err
 	}
 
-	timer, err := t.createTimer(ctx, conn, executor)
+	timer, err := t.createTimer(ctx, conn)
 	if err != nil {
 		if _, closeErr := next.Server(ctx).Close(ctx, conn); closeErr != nil {
 			err = errors.Wrapf(err, "error attempting to close failed connection %v: %+v", connID, closeErr)
@@ -122,9 +83,14 @@ func (t *timeoutServer) request(ctx context.Context, request *networkservice.Net
 	return conn, nil
 }
 
-func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection, executor *serialize.Executor) (*time.Timer, error) {
+func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection) (*time.Timer, error) {
 	logEntry := log.Entry(ctx).WithField("timeoutServer", "createTimer")
 
+	executor := serialize.Executor(ctx)
+	if executor == nil {
+		return nil, errors.New("no executor provided")
+	}
+
 	expireTime, err := ptypes.Timestamp(conn.GetPath().GetPathSegments()[conn.GetPath().GetIndex()-1].GetExpires())
 	if err != nil {
 		return nil, err
 	}
@@ -134,7 +100,7 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co
 	timerPtr := new(*time.Timer)
 	*timerPtr = time.AfterFunc(time.Until(expireTime), func() {
-		executor.AsyncExec(func() {
+		if err := <-executor.AsyncExec(func() {
 			if timer, ok := t.timers.Load(conn.GetId()); !ok || timer != *timerPtr {
 				logEntry.Warnf("timer has been already stopped: %v", conn.GetId())
 				return
 			}
@@ -142,51 +108,30 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co
 			if err := t.close(t.ctx, conn, next.Server(ctx)); err != nil {
 				logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
 			}
-		})
+			executor.Cancel()
+		}); err != nil {
+			logEntry.Warnf("connection has already been closed: %v", conn.GetId())
+		}
 	})
 
 	return *timerPtr, nil
 }
 
 func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) {
-	logEntry := log.Entry(ctx).WithField("timeoutServer", "Close")
-
-	executor, ok := t.executors.Load(conn.GetId())
-	if ok {
-		<-executor.AsyncExec(func() {
-			var exec *serialize.Executor
-			if exec, ok = t.executors.Load(conn.GetId()); ok && exec == executor {
-				err = t.close(ctx, conn, next.Server(ctx))
-			} else {
-				ok = false
-			}
-		})
-	}
-	if !ok {
-		logEntry.Warnf("connection has been already closed: %v", conn.GetId())
-		return &empty.Empty{}, nil
-	}
-
-	return &empty.Empty{}, err
+	return &empty.Empty{}, t.close(ctx, conn, next.Server(ctx))
 }
 
 func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connection, nextServer networkservice.NetworkServiceServer) error {
 	logEntry := log.Entry(ctx).WithField("timeoutServer", "close")
 
 	timer, ok := t.timers.LoadAndDelete(conn.GetId())
-	if ok {
-		timer.Stop()
-	} else {
-		// Last time we failed to close the Connection, let's do it again.
-		logEntry.Warnf("retrying to close the connection: %v", conn.GetId())
+	if !ok {
+		logEntry.Warnf("connection has already been closed: %v", conn.GetId())
+		return nil
 	}
+	timer.Stop()
 
 	_, err := nextServer.Close(ctx, conn)
-	if err == nil {
-		// If `nextServer.Close()` returns an error, the Connection is not truly closed, so we don't want to delete
-		// the related executor.
-		t.executors.Delete(conn.GetId())
-	}
 	return err
 }
diff --git a/pkg/networkservice/common/timeout/server_test.go b/pkg/networkservice/common/timeout/server_test.go
index 4ff7d042c1..560d8dc5dd 100644
--- a/pkg/networkservice/common/timeout/server_test.go
+++ b/pkg/networkservice/common/timeout/server_test.go
@@ -33,6 +33,7 @@ import (
 
 	"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel"
+	"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/common/timeout"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
 	"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
@@ -61,6 +62,7 @@ func testClient(ctx context.Context, server networkservice.NetworkServiceServer,
 	adapters.NewServerToClient(
 		chain.NewNetworkServiceServer(
 			updatepath.NewServer(serverName),
+			serialize.NewServer(),
 			timeout.NewServer(ctx),
 			mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{
 				kernelmech.MECHANISM: server,
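
To make the expiration mechanics from the README hunk concrete: the timeout timer is armed from the `Expires` timestamp of the path segment *before* the current one, which is exactly what `createTimer` reads above. Below is a minimal, self-contained sketch; `armTimeout`, `onExpire`, and the sample connection are illustrative names, only the expiration lookup mirrors the patch.

```go
package main

import (
	"fmt"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/networkservicemesh/api/pkg/api/networkservice"
)

// armTimeout mirrors createTimer's expiration lookup: the timer fires when
// the token of the path segment before this endpoint expires.
func armTimeout(conn *networkservice.Connection, onExpire func()) (*time.Timer, error) {
	prev := conn.GetPath().GetPathSegments()[conn.GetPath().GetIndex()-1]
	expireTime, err := ptypes.Timestamp(prev.GetExpires())
	if err != nil {
		return nil, err
	}
	// time.AfterFunc invokes onExpire on its own goroutine once the previous
	// path element's token has expired without being refreshed.
	return time.AfterFunc(time.Until(expireTime), onExpire), nil
}

func main() {
	// Illustrative connection whose previous segment expires in one second.
	expires, _ := ptypes.TimestampProto(time.Now().Add(time.Second))
	conn := &networkservice.Connection{
		Id: "conn-1",
		Path: &networkservice.Path{
			Index: 1,
			PathSegments: []*networkservice.PathSegment{
				{Name: "client", Expires: expires},
				{Name: "endpoint"},
			},
		},
	}
	timer, err := armTimeout(conn, func() { fmt.Println("closing expired connection:", conn.GetId()) })
	if err != nil {
		panic(err)
	}
	defer timer.Stop()
	time.Sleep(2 * time.Second) // let the timer fire
}
```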
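Both the endpoint and the test hunks insert `serialize.NewServer()` immediately before `timeout.NewServer(ctx)`. Here is a condensed sketch of that ordering contract, assuming the SDK at this patch's revision; `newChain` and `rest` are illustrative names, the element constructors are the ones used in the patch.

```go
package chainexample

import (
	"context"

	"github.com/networkservicemesh/api/pkg/api/networkservice"

	"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
	"github.com/networkservicemesh/sdk/pkg/networkservice/common/timeout"
	"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
	"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
)

// newChain wires the ordering contract from the README: serialize must come
// before timeout so that timeout can take the per-connection executor from
// the request context; only the elements after timeout are closed on expiration.
func newChain(ctx context.Context, name string, rest ...networkservice.NetworkServiceServer) networkservice.NetworkServiceServer {
	return chain.NewNetworkServiceServer(
		append([]networkservice.NetworkServiceServer{
			updatepath.NewServer(name), // before timeout: never closed by the timer
			serialize.NewServer(),      // provides the executor that timeout relies on
			timeout.NewServer(ctx),     // closes the subsequent elements on expiration
		}, rest...)...,
	)
}
```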
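`createTimer` obtains the per-connection executor with `serialize.Executor(ctx)` and schedules the expiration close on it. The serialize package's API is not itself shown in this patch, so the sketch below assumes only the calls the patch makes: `Executor(ctx)` may return nil when no serialize element precedes the caller, receiving from `AsyncExec`'s channel waits for the work and yields an error, and `Cancel()` stops the executor after the final close. `scheduleOnExecutor` is a hypothetical helper, not an SDK function.

```go
package chainexample

import (
	"context"

	"github.com/pkg/errors"

	"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
)

// scheduleOnExecutor runs work serialized with all other Request/Close events
// for the connection that owns ctx.
func scheduleOnExecutor(ctx context.Context, work func()) error {
	executor := serialize.Executor(ctx)
	if executor == nil {
		// serialize.NewServer() is missing from the chain before this element.
		return errors.New("no executor provided")
	}
	// Receiving from the channel waits for work to run; per the patch's usage,
	// a non-nil error means the executor has already been canceled.
	return <-executor.AsyncExec(work)
}
```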
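The reworked `close()` is idempotent because a user Close and the expiration timer race on `timers.LoadAndDelete`: whichever caller removes the timer first performs the real close, the loser just logs and returns. A standalone illustration of the pattern with a plain `sync.Map` follows (the SDK's `timerMap` is a generated type-safe wrapper around it; `closeOnce` and `doClose` are illustrative names).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// closeOnce demonstrates the idempotency pattern from timeoutServer.close:
// only the caller that wins LoadAndDelete stops the timer and runs doClose.
func closeOnce(timers *sync.Map, connID string, doClose func() error) error {
	val, ok := timers.LoadAndDelete(connID)
	if !ok {
		fmt.Println("connection has already been closed:", connID)
		return nil
	}
	val.(*time.Timer).Stop() // stop the pending expiration timer
	return doClose()
}

func main() {
	var timers sync.Map
	timers.Store("conn-1", time.AfterFunc(time.Hour, func() {}))

	_ = closeOnce(&timers, "conn-1", func() error { fmt.Println("closed conn-1"); return nil })
	_ = closeOnce(&timers, "conn-1", func() error { fmt.Println("closed twice!"); return nil }) // no-op
}
```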