Rework timeout with serialize
Signed-off-by: Vladimir Popov <[email protected]>
Vladimir Popov committed Nov 11, 2020
1 parent 51eb262 commit b8d6686
Showing 4 changed files with 29 additions and 94 deletions.
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/endpoint/server.go
@@ -27,6 +27,7 @@ import (

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/setlogoption"

@@ -67,6 +68,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
append([]networkservice.NetworkServiceServer{
authzServer,
updatepath.NewServer(name),
+ serialize.NewServer(),
// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
// chain elements before `timeout` in the chain shouldn't make any updates to the Close context and
// shouldn't be closed on Connection Close.
22 changes: 4 additions & 18 deletions pkg/networkservice/common/timeout/README.md
@@ -9,9 +9,7 @@ previous NSM-based application. For such case connection should be closed.

timeoutServer keeps timers [timeout.timerMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/gen.go#L26)
mapping incoming request Connection.ID to a timeout timer firing Close on the subsequent chain after the connection's previous
- path element expires. To prevent simultaneous execution of multiple Request, Close event for the same Connection.ID in parallel
- it also keeps executors [timeout.executorMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/gen.go#L27)
- mapping request Connection.ID to an executor for serializing all Request, Close event for the mapped Connection.ID.
+ path element expires.
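
As context for the diff, here is a minimal sketch of the kind of type-specific sync.Map wrapper that timeout.timerMap refers to. The real map is code-generated (see gen.go), so the method set below is inferred from its use in server.go and is only an illustration, not the generated code:

```go
package timeout

import (
	"sync"
	"time"
)

// timerMap maps Connection.ID -> *time.Timer (sketch of the generated wrapper).
type timerMap struct{ m sync.Map }

func (tm *timerMap) Store(id string, timer *time.Timer) {
	tm.m.Store(id, timer)
}

func (tm *timerMap) Load(id string) (*time.Timer, bool) {
	v, ok := tm.m.Load(id)
	if !ok {
		return nil, false
	}
	return v.(*time.Timer), true
}

// LoadAndDelete atomically removes and returns the timer for id, if any.
func (tm *timerMap) LoadAndDelete(id string) (*time.Timer, bool) {
	v, ok := tm.m.LoadAndDelete(id)
	if !ok {
		return nil, false
	}
	return v.(*time.Timer), true
}
```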

timeoutServer closes only the subsequent chain elements and uses its base context for the Close. So the chain elements
before the timeoutServer shouldn't be closed with Close and shouldn't set any required data to the Close context.
@@ -22,6 +20,7 @@ rv.NetworkServiceServer = chain.NewNetworkServiceServer(
append([]networkservice.NetworkServiceServer{
authzServer, // <-- shouldn't be closed, don't set anything to the context
updatepath.NewServer(name), // <-- same
+ serialize.NewServer(ctx), // <-- should be before the timeoutServer
timeout.NewServer(ctx), // <-- timeoutServer
monitor.NewServer(ctx, &rv.MonitorConnectionServer), // <-- should be closed
updatetoken.NewServer(tokenGenerator), // <-- should be closed
@@ -30,18 +29,5 @@ rv.NetworkServiceServer = chain.NewNetworkServiceServer(

## Comments on concurrency characteristics

- Concurrency is managed through type specific wrappers of [sync.Map](https://golang.org/pkg/sync/#Map) and with
- per-connection [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go) which are created on
- Request and deleted on Close.
-
- Since we are deleting the per-connection executor on connection Close, there possibly can be a race condition:
- ```
- 1. -> timeout close : locking executor
- 2. -> request-1 : waiting on executor
- 3. timeout close -> : unlocking executor, removing it from executors
- 4. -> request-2 : creating exec, storing into executors, locking exec
- 5. -request-1-> : locking executor, trying to store it into executors
- ```
- at 5. we get request-1 locking executor, request-2 locking exec and only exec stored in executors. It means that
- request-2 and all subsequent events will be executed in parallel with request-1.
- So we check for this [here](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/timeout/server.go#L68).
+ Concurrency is managed with [serialize.NewServer](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/server.go)
+ placed earlier in the chain.
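
Before the server.go diff, it may help to pin down the executor contract this rework leans on. Below is a deliberately simplified Go sketch of that contract: the package and helper names (serializesketch, WithExecutor, FromContext) are invented for illustration, while the AsyncExec and Cancel shapes are inferred from how timeout/server.go uses serialize.Executor(ctx) in the diff below. It is not the actual serialize package.

```go
// Package serializesketch models, in a much simplified form, the contract the
// reworked timeout server relies on; it is NOT the real
// pkg/networkservice/common/serialize implementation.
package serializesketch

import (
	"context"
	"errors"
)

// Executor runs callbacks for a single Connection.ID one at a time.
type Executor struct {
	jobs   chan func()
	closed chan struct{}
}

// NewExecutor starts the single worker goroutine that provides the
// serialization guarantee.
func NewExecutor() *Executor {
	e := &Executor{jobs: make(chan func(), 16), closed: make(chan struct{})}
	go func() {
		for f := range e.jobs {
			f()
		}
	}()
	return e
}

// AsyncExec schedules f serialized with all other callbacks for the same
// connection. The returned channel yields nil once f has run, or an error if
// the executor was canceled before f could be scheduled.
func (e *Executor) AsyncExec(f func()) <-chan error {
	errCh := make(chan error, 1)
	select {
	case <-e.closed:
		errCh <- errors.New("executor canceled")
	case e.jobs <- func() { f(); errCh <- nil }:
	}
	return errCh
}

// Cancel marks the executor as finished so that later AsyncExec calls fail
// fast; the timeout server calls this right after closing a timed-out
// connection. A real implementation must also handle repeated Cancel calls and
// the store/delete races described in the removed README text above; this
// sketch deliberately ignores them.
func (e *Executor) Cancel() {
	close(e.closed)
}

type executorKey struct{}

// WithExecutor stores the per-connection executor into the context, which is
// presumably what the serialize chain element does before calling the next
// element in the chain.
func WithExecutor(ctx context.Context, e *Executor) context.Context {
	return context.WithValue(ctx, executorKey{}, e)
}

// FromContext returns the stored executor, or nil if there is none; the real
// package exposes this lookup as serialize.Executor(ctx) in the diff below.
func FromContext(ctx context.Context) *Executor {
	e, _ := ctx.Value(executorKey{}).(*Executor)
	return e
}
```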
97 changes: 21 additions & 76 deletions pkg/networkservice/common/timeout/server.go
@@ -23,21 +23,20 @@ import (
"context"
"time"

"github.com/edwarnicke/serialize"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type timeoutServer struct {
- ctx       context.Context
- timers    timerMap
- executors executorMap
+ ctx    context.Context
+ timers timerMap
}

// NewServer - creates a new NetworkServiceServer chain element that implements timeout of expired connections
@@ -50,45 +49,7 @@ func NewServer(ctx context.Context) networkservice.NetworkServiceServer {
}
}

- func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
- connID := request.GetConnection().GetId()
-
- newExecutor := new(serialize.Executor)
- // If we create an executor, we should be the first one who uses it, or else we can get a double Close issue:
- // 1. timeout close -> : closing the Connection
- // 2. request -> : creating `newExecutor`, storing into `executors`
- // 3. close -> : locking `newExecutor`, checking `newExecutor == executors[connID]` - looks like
- // a retry Close case (but it isn't), double closing the Connection
- <-newExecutor.AsyncExec(func() {
- executor, loaded := t.executors.LoadOrStore(connID, newExecutor)
- if loaded {
- <-executor.AsyncExec(func() {
- // Executor has been possibly removed by `t.close()` at this moment, we need to store it back.
- exec, _ := t.executors.LoadOrStore(connID, executor)
- if exec != executor {
- // It can happen in such situation:
- // 1. -> timeout close : locking `executor`
- // 2. -> request-1 : waiting on `executor`
- // 3. timeout close -> : unlocking `executor`, removing it from `executors`
- // 4. -> request-2 : creating `exec`, storing into `executors`, locking `exec`
- // 5. -request-1-> : locking `executor`, trying to store it into `executors`
- // at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored
- // in `executors`. It means that `request-2` and all subsequent events will be executed in parallel
- // with `request-1`.
- err = errors.Errorf("race condition, parallel request execution: %v", connID)
- return
- }
- conn, err = t.request(ctx, request, executor)
- })
- } else {
- conn, err = t.request(ctx, request, executor)
- }
- })
-
- return conn, err
- }

- func (t *timeoutServer) request(ctx context.Context, request *networkservice.NetworkServiceRequest, executor *serialize.Executor) (*networkservice.Connection, error) {
+ func (t *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "request")

connID := request.GetConnection().GetId()
@@ -109,7 +70,7 @@ func (t *timeoutServer) request(ctx context.Context, request *networkservice.Net
return nil, err
}

- timer, err := t.createTimer(ctx, conn, executor)
+ timer, err := t.createTimer(ctx, conn)
if err != nil {
if _, closeErr := next.Server(ctx).Close(ctx, conn); closeErr != nil {
err = errors.Wrapf(err, "error attempting to close failed connection %v: %+v", connID, closeErr)
@@ -122,9 +83,14 @@ func (t *timeoutServer) request(ctx context.Context, request *networkservice.Net
return conn, nil
}

- func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection, executor *serialize.Executor) (*time.Timer, error) {
+ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Connection) (*time.Timer, error) {
logEntry := log.Entry(ctx).WithField("timeoutServer", "createTimer")

+ executor := serialize.Executor(ctx)
+ if executor == nil {
+ return nil, errors.New("no executor provided")
+ }

expireTime, err := ptypes.Timestamp(conn.GetPath().GetPathSegments()[conn.GetPath().GetIndex()-1].GetExpires())
if err != nil {
return nil, err
@@ -134,59 +100,38 @@ func (t *timeoutServer) createTimer(ctx context.Context, conn *networkservice.Co

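// Descriptive note added in this review, not a comment from the original
// source: the extra pointer lets the AfterFunc callback check whether it is
// still the timer stored in t.timers for this Connection.ID, since *timerPtr
// is only assigned after time.AfterFunc returns the new timer.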
timerPtr := new(*time.Timer)
*timerPtr = time.AfterFunc(time.Until(expireTime), func() {
- executor.AsyncExec(func() {
+ if err := <-executor.AsyncExec(func() {
if timer, ok := t.timers.Load(conn.GetId()); !ok || timer != *timerPtr {
logEntry.Warnf("timer has been already stopped: %v", conn.GetId())
return
}
if err := t.close(t.ctx, conn, next.Server(ctx)); err != nil {
logEntry.Errorf("failed to close timed out connection: %v %+v", conn.GetId(), err)
}
- })
+ executor.Cancel()
+ }); err != nil {
+ logEntry.Warnf("connection has been already closed: %v", conn.GetId())
+ }
})

return *timerPtr, nil
}

func (t *timeoutServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) {
- logEntry := log.Entry(ctx).WithField("timeoutServer", "Close")
-
- executor, ok := t.executors.Load(conn.GetId())
- if ok {
- <-executor.AsyncExec(func() {
- var exec *serialize.Executor
- if exec, ok = t.executors.Load(conn.GetId()); ok && exec == executor {
- err = t.close(ctx, conn, next.Server(ctx))
- } else {
- ok = false
- }
- })
- }
- if !ok {
- logEntry.Warnf("connection has been already closed: %v", conn.GetId())
- return &empty.Empty{}, nil
- }
-
- return &empty.Empty{}, err
+ return &empty.Empty{}, t.close(ctx, conn, next.Server(ctx))
}

func (t *timeoutServer) close(ctx context.Context, conn *networkservice.Connection, nextServer networkservice.NetworkServiceServer) error {
logEntry := log.Entry(ctx).WithField("timeoutServer", "close")

timer, ok := t.timers.LoadAndDelete(conn.GetId())
- if ok {
- timer.Stop()
- } else {
- // Last time we failed to close the Connection, let's do it again.
- logEntry.Warnf("retrying to close the connection: %v", conn.GetId())
+ if !ok {
+ logEntry.Warnf("connection has been already closed: %v", conn.GetId())
+ return nil
}
+ timer.Stop()

_, err := nextServer.Close(ctx, conn)
- if err == nil {
- // If `nextServer.Close()` returns an error, the Connection is not truly closed, so we don't want to delete
- // the related executor.
- t.executors.Delete(conn.GetId())
- }

return err
}
2 changes: 2 additions & 0 deletions pkg/networkservice/common/timeout/server_test.go
@@ -33,6 +33,7 @@ import (

"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/timeout"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
@@ -61,6 +62,7 @@ func testClient(ctx context.Context, server networkservice.NetworkServiceServer,
adapters.NewServerToClient(
chain.NewNetworkServiceServer(
updatepath.NewServer(serverName),
+ serialize.NewServer(),
timeout.NewServer(ctx),
mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{
kernelmech.MECHANISM: server,
