Skip to content

Commit

Permalink
Serialize chain elements (networkservicemesh#583)
Browse files Browse the repository at this point in the history
* Add serialize

Signed-off-by: Vladimir Popov <[email protected]>

* Rework timeout with serialize

Signed-off-by: Vladimir Popov <[email protected]>

* Rework serialize with RequestExecutor, CloseExecutor

Signed-off-by: Vladimir Popov <[email protected]>

* Fix race condition issue in serialize

Signed-off-by: Vladimir Popov <[email protected]>

* Update github.com/edwarnicke/serialize version

Signed-off-by: Vladimir Popov <[email protected]>

* Rework serialize

Signed-off-by: Vladimir Popov <[email protected]>

* Fix URL in README

Signed-off-by: Vladimir Popov <[email protected]>

* Use atomic.AddUint32() instead of uuid.New().Id()

Signed-off-by: Vladimir Popov <[email protected]>

* Add multiexecutor

Signed-off-by: Vladimir Popov <[email protected]>

* Rework serialize with multi executor

Signed-off-by: Vladimir Popov <[email protected]>

* Move multi executor to serialize

Signed-off-by: Vladimir Popov <[email protected]>

* Rename `IExecutor` to `Executor`

Signed-off-by: Vladimir Popov <[email protected]>
Signed-off-by: Sergey Ershov <[email protected]>
  • Loading branch information
Vladimir Popov authored and Sergey Ershov committed Dec 23, 2020
1 parent 4532192 commit 333514b
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 224 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/edwarnicke/exechelper v1.0.2
github.com/edwarnicke/grpcfd v0.0.0-20200920223154-d5b6e1f19bd0
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf
github.com/edwarnicke/serialize v1.0.7
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.4.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/edwarnicke/grpcfd v0.0.0-20200920223154-d5b6e1f19bd0 h1:FHjcIM6YU8DnC
github.com/edwarnicke/grpcfd v0.0.0-20200920223154-d5b6e1f19bd0/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA=
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf h1:/lViRfaDxKINb2X6kOR3EJKJGR+MxUvqfgtYt5nh+qc=
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf/go.mod h1:XvbCO/QGsl3X8RzjBMoRpkm54FIAZH5ChK2j+aox7pw=
github.com/edwarnicke/serialize v1.0.7 h1:geX8vmyu8Ij2S5fFIXjy9gBDkKxXnrMIzMoDvV0Ddac=
github.com/edwarnicke/serialize v1.0.7/go.mod h1:y79KgU2P7ALH/4j37uTSIdNavHFNttqN7pzO6Y8B2aw=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package client
import (
"context"

"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"

Expand Down Expand Up @@ -65,6 +66,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
append([]networkservice.NetworkServiceServer{
authzServer,
updatepath.NewServer(name),
serialize.NewServer(),
// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
// chain elements before the `timeout` in chain shouldn't make any updates to the Close context and
// shouldn't be closed on Connection Close.
Expand Down
22 changes: 4 additions & 18 deletions pkg/networkservice/common/timeout/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ previous NSM-based application. For such case connection should be closed.

timeoutServer keeps timers [timeout.timerMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/gen.go#L26)
mapping incoming request Connection.ID to a timeout timer firing Close on the subsequent chain after the connection previous
path element expires. To prevent simultaneous execution of multiple Request, Close event for the same Connection.ID in parallel
it also keeps executors [timeout.executorMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/gen.go#L27)
mapping request Connection.ID to an executor for serializing all Request, Close event for the mapped Connection.ID.
path element expires.

timeoutServer closes only subsequent chain elements and uses base context for the Close. So all the chain elements in
the chain before the timeoutServer shouldn't be closed with Close and shouldn't set any required data to the Close context.
Expand All @@ -22,6 +20,7 @@ rv.NetworkServiceServer = chain.NewNetworkServiceServer(
append([]networkservice.NetworkServiceServer{
authzServer, // <-- shouldn't be closed, don't set anything to the context
updatepath.NewServer(name), // <-- same
serialize.NewServer(ctx) // <-- should be before the timeoutServer
timeout.NewServer(ctx), // <-- timeoutServer
monitor.NewServer(ctx, &rv.MonitorConnectionServer), // <-- should be closed
updatetoken.NewServer(tokenGenerator), // <-- should be closed
Expand All @@ -30,18 +29,5 @@ rv.NetworkServiceServer = chain.NewNetworkServiceServer(

## Comments on concurrency characteristics

Concurrency is managed through type specific wrappers of [sync.Map](https://golang.org/pkg/sync/#Map) and with
per-connection [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go) which are created on
Request and deleted on Close.

Since we are deleting the per-connection executor on connection Close, there possibly can be a race condition:
```
1. -> timeout close : locking executor
2. -> request-1 : waiting on executor
3. timeout close -> : unlocking executor, removing it from executors
4. -> request-2 : creating exec, storing into executors, locking exec
5. -request-1-> : locking executor, trying to store it into executors
```
at 5. we get request-1 locking executor, request-2 locking exec and only exec stored in executors. It means that
request-2 and all subsequent events will be executed in parallel with request-1.
So we check for this [here](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/server.go#L68).
Concurrency is managed with [serialize.NewServer](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/server.go)
in the chain before.
75 changes: 0 additions & 75 deletions pkg/networkservice/common/timeout/executor_map.gen.go

This file was deleted.

2 changes: 0 additions & 2 deletions pkg/networkservice/common/timeout/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,5 @@ import (
)

//go:generate go-syncmap -output timer_map.gen.go -type timerMap<string,*time.Timer>
//go:generate go-syncmap -output executor_map.gen.go -type executorMap<string,*github.com/edwarnicke/serialize.Executor>

type timerMap sync.Map
type executorMap sync.Map
105 changes: 21 additions & 84 deletions pkg/networkservice/common/timeout/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ import (
"context"
"time"

"github.com/edwarnicke/serialize"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type timeoutServer struct {
ctx context.Context
timers timerMap
executors executorMap
ctx context.Context
timers timerMap
}

// NewServer - creates a new NetworkServiceServer chain element that implements timeout of expired connections
Expand All @@ -50,45 +49,7 @@ func NewServer(ctx context.Context) networkservice.NetworkServiceServer {
}
}

func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()

newExecutor := new(serialize.Executor)
// If we create an executor, we should be the first one who uses it, or else we can get a double Close issue:
// 1. timeout close -> : closing the Connection
// 2. request -> : creating `newExecutor`, storing into `executors`
// 3. close -> : locking `newExecutor`, checking `newExecutor == executors[connID]` - looks like
// a retry Close case (but it isn't), double closing the Connection
<-newExecutor.AsyncExec(func() {
executor, loaded := t.executors.LoadOrStore(connID, newExecutor)
if loaded {
<-executor.AsyncExec(func() {
// Executor has been possibly removed by `t.close()` at this moment, we need to store it back.
exec, _ := t.executors.LoadOrStore(connID, executor)
if exec != executor {
// It can happen in such situation:
// 1. -> timeout close : locking `executor`
// 2. -> request-1 : waiting on `executor`
// 3. timeout close -> : unlocking `executor`, removing it from `executors`
// 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec`
// 5. -request-1-> : locking `executor`, trying to store it into `executors`
// at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored
// in `executors`. It means that `request-2` and all subsequent events will be executed in parallel
// with `request-1`.
err = errors.Errorf("race condition, parallel request execution: %v", connID)
return
}
conn, err = t.request(ctx, request, executor)
})
} else {
conn, err = t.request(ctx, request, executor)
}
})

return conn, err
}

func (t *timeoutServer) request(ctx context.Context, request *networkservice.NetworkServiceRequest, executor *serialize.Executor) (*networkservice.Connection, error) {
func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "request")

connID := request.GetConnection().GetId()
Expand All @@ -109,7 +70,7 @@ func (t *timeoutServer) request(ctx context.Context, request *networkservice.Net
return nil, err
}

timer, err := t.createTimer(ctx, conn, executor)
timer, err := t.createTimer(ctx, conn)
if err != nil {
if _, closeErr := next.Server(ctx).Close(ctx, conn); closeErr != nil {
err = errors.Wrapf(err, "error attempting to close failed connection %v: %+v", connID, closeErr)
Expand All @@ -122,9 +83,14 @@ func (t *timeoutServer) request(ctx context.Context, request *networkservice.Net
return conn, nil
}

func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection, executor *serialize.Executor) (*time.Timer, error) {
func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection) (*time.Timer, error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "createTimer")

executor := serialize.GetExecutor(ctx)
if executor == nil {
return nil, errors.New("no executor provided")
}

if conn.GetPrevPathSegment().GetExpires() == nil {
return nil, errors.Errorf("expiration for prev path segment cannot be nil. conn: %+v", conn)
}
Expand All @@ -137,12 +103,13 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co

timerPtr := new(*time.Timer)
*timerPtr = time.AfterFunc(time.Until(expireTime), func() {
executor.AsyncExec(func() {
if timer, ok := t.timers.Load(conn.GetId()); !ok || timer != *timerPtr {
<-executor.AsyncExec(func() {
if timer, _ := t.timers.Load(conn.GetId()); timer != *timerPtr {
logEntry.Warnf("timer has been already stopped: %v", conn.GetId())
return
}
if err := t.close(t.ctx, conn, next.Server(ctx)); err != nil {
t.timers.Delete(conn.GetId())
if _, err := next.Server(ctx).Close(t.ctx, conn); err != nil {
logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
}
})
Expand All @@ -151,45 +118,15 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co
return *timerPtr, nil
}

func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "Close")
func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "createTimer")

executor, ok := t.executors.Load(conn.GetId())
if ok {
<-executor.AsyncExec(func() {
var exec *serialize.Executor
if exec, ok = t.executors.Load(conn.GetId()); ok && exec == executor {
err = t.close(ctx, conn, next.Server(ctx))
} else {
ok = false
}
})
}
timer, ok := t.timers.LoadAndDelete(conn.GetId())
if !ok {
logEntry.Warnf("connection has been already closed: %v", conn.GetId())
return &empty.Empty{}, nil
}

return &empty.Empty{}, err
}

func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connection, nextServer networkservice.NetworkServiceServer) error {
logEntry := log.Entry(ctx).WithField("timeoutServer", "close")

timer, ok := t.timers.LoadAndDelete(conn.GetId())
if ok {
timer.Stop()
} else {
// Last time we failed to close the Connection, let's do it again.
logEntry.Warnf("retrying to close the connection: %v", conn.GetId())
}

_, err := nextServer.Close(ctx, conn)
if err == nil {
// If `nextServer.Close()` returns an error, the Connection is not truly closed, so we don't want to delete
// the related executor.
t.executors.Delete(conn.GetId())
return new(empty.Empty), nil
}
timer.Stop()

return err
return next.Server(ctx).Close(ctx, conn)
}
Loading

0 comments on commit 333514b

Please sign in to comment.