Skip to content

Commit

Permalink
Rework serialize
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <[email protected]>
  • Loading branch information
Vladimir Popov committed Nov 16, 2020
1 parent 43c4916 commit b19c7ef
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 292 deletions.
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,3 @@ issues:
linters:
- gocyclo
text: "processEvent"
- path: pkg/networkservice/common/serialize/common.go
linters:
- scopelint
text: "Using the variable on range scope `shouldRetry` in function literal"
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func NewClient(ctx context.Context, name string, onHeal *networkservice.NetworkS
rv = chain.NewNetworkServiceClient(
append(
append([]networkservice.NetworkServiceClient{
serialize.NewClient(),
authorize.NewClient(),
updatepath.NewClient(name),
serialize.NewClient(),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), onHeal),
refresh.NewClient(ctx),
}, additionalFunctionality...),
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
rv.NetworkServiceServer = chain.NewNamedNetworkServiceServer(
name,
append([]networkservice.NetworkServiceServer{
serialize.NewServer(),
authzServer,
updatepath.NewServer(name),
serialize.NewServer(),
// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
// chain elements before the `timeout` in chain shouldn't make any updates to the Close context and
// shouldn't be closed on Connection Close.
Expand Down
45 changes: 21 additions & 24 deletions pkg/networkservice/common/serialize/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,29 @@

# Implementation

## serializeServer, serializeClient
## serializer

serialize chain elements keep [serialize.executorMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/serialize/gen.go#L25)
mapping incoming `Connection.ID` to a [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go).
New executors are created on Request. Close deletes existing executor for the request.
`serializer` maps incoming `Connection.ID` to a [serialize.Executor](https://github.com/edwarnicke/serialize/blob/master/serialize.go),
request count and state. New mappings are created on Request. If request count and state are equal 0, mapping becomes
free and can be cleaned up. Request count is incremented with each Request before requesting the next chain element and
decrements after. All request count modifications are performed under the `serializer.executor`. State is being changed
in Request from 0 to random, or in Close from not 0 to 0. If state is equal 0, connection is closed so all incoming
Close events should not be processed.

To make possible a new chain element firing asynchronously with `Request`, `Close` events, serialize chain elements wraps
per-connection executor with [request executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L35),
[close executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L50) and inserts them into the `Request`
context. Such thing is not performed for the `Close` context because the executor will already be cancelled by the time
it becomes free.
per-connection executor with [request executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L36),
[close executor](https://github.com/edwarnicke/serialize/blob/master/executor.go#L40) and inserts them into the `Request`
context. Such a thing is not being performed for the `Close` context because the executor will already be cancelled by
the time it becomes free. For the generated events state should be equal the original state for the Request/CloseExecutor.

Correct `Request` firing chain element example:
```go
func (s *requestServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
executor := serialize.RequestExecutor(ctx)
go func() {
executor.AsyncExec(func() {
executor.AsyncExec(func() error {
_, _ = next.Server(ctx).Request(serialize.WithExecutorsFromContext(context.TODO(), ctx), request)
return nil
})
}()

Expand All @@ -40,25 +44,18 @@ func (s *closeServer) Request(ctx context.Context, request *networkservice.Netwo

executor := serialize.CloseExecutor(ctx)
go func() {
executor.AsyncExec(func() {
executor.AsyncExec(func() error {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
return errors.New() // I don't want to close the chain.
})
}()
go func() {
executor.AsyncExec(func() error {
_, _ = next.Server(ctx).Close(context.TODO(), conn)
return nil // I want to close the chain.
})
}()

return conn, err
}
```

# race condition case

There is a possible case for the race condition:
```
1. -> close : locking `executor`
2. -> request-1 : waiting on `executor`
3. close -> : unlocking `executor`, removing it from `executors`
4. -> request-2 : creating `exec`, storing into `executors`, locking `exec`
5. -request-1-> : locking `executor`, trying to store it into `executors`
```
at 5. we get `request-1` locking `executor`, `request-2` locking `exec` and only `exec` stored in `executors`. It means
that `request-2` and all subsequent events will be executed in parallel with `request-1`. If such case happens, `request-1`
fails with _race condition_ error.
12 changes: 8 additions & 4 deletions pkg/networkservice/common/serialize/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,27 @@ import (
)

type serializeClient struct {
executors executorMap
serializer
}

// NewClient returns a new serialize client chain element
func NewClient() networkservice.NetworkServiceClient {
return new(serializeClient)
return &serializeClient{
serializer{
executors: map[string]*executor{},
},
}
}

func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()
return requestConnection(ctx, &c.executors, connID, func(requestCtx context.Context) (*networkservice.Connection, error) {
return c.requestConnection(ctx, connID, func(requestCtx context.Context) (*networkservice.Connection, error) {
return next.Client(ctx).Request(requestCtx, request, opts...)
})
}

func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) {
return closeConnection(ctx, &c.executors, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) {
return c.closeConnection(ctx, conn.GetId(), func(closeCtx context.Context) (*empty.Empty, error) {
return next.Client(ctx).Close(closeCtx, conn, opts...)
})
}
109 changes: 0 additions & 109 deletions pkg/networkservice/common/serialize/common.go

This file was deleted.

48 changes: 29 additions & 19 deletions pkg/networkservice/common/serialize/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,52 @@
package serialize

import (
"github.com/edwarnicke/serialize"
"sync/atomic"

"github.com/pkg/errors"
)

// Executor is same as serialize.Executor except that it returns error channel
type Executor interface {
AsyncExec(f func()) <-chan error
AsyncExec(f func() error) <-chan error
}

type executorFunc func(f func()) <-chan error
type executorFunc func(f func() error) <-chan error

func (ef executorFunc) AsyncExec(f func()) <-chan error {
func (ef executorFunc) AsyncExec(f func() error) <-chan error {
return ef(f)
}

func newRequestExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor {
return executorFunc(func(f func()) <-chan error {
func newRequestExecutor(exec *executor, id string) Executor {
return newExecutor(exec, exec.state, id)
}

func newCloseExecutor(exec *executor, id string, clean func()) Executor {
state := exec.state
return executorFunc(func(f func() error) <-chan error {
executor := newExecutor(exec, state, id)
return executor.AsyncExec(func() error {
if err := f(); err != nil {
return err
}
atomic.StoreUint32(&exec.state, 0)
clean()
return nil
})
})
}

func newExecutor(exec *executor, state uint32, id string) Executor {
return executorFunc(func(f func() error) <-chan error {
errCh := make(chan error, 1)
executor.AsyncExec(func() {
if exec, ok := executors.Load(id); !ok || exec != executor {
exec.executor.AsyncExec(func() {
if exec.state != state {
errCh <- errors.Errorf("connection is already closed: %v", id)
return
}
f()
errCh <- f()
close(errCh)
})
return errCh
})
}

func newCloseExecutor(executor *serialize.Executor, id string, executors *executorMap) Executor {
return executorFunc(func(f func()) <-chan error {
exec := newRequestExecutor(executor, id, executors)
return exec.AsyncExec(func() {
f()
executors.Delete(id)
})
})
}
75 changes: 0 additions & 75 deletions pkg/networkservice/common/serialize/executor_map.gen.go

This file was deleted.

Loading

0 comments on commit b19c7ef

Please sign in to comment.