Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize chain elements #583

Merged
merged 12 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/edwarnicke/exechelper v1.0.2
github.com/edwarnicke/grpcfd v0.0.0-20200920223154-d5b6e1f19bd0
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf
github.com/edwarnicke/serialize v1.0.7
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.4.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/edwarnicke/grpcfd v0.0.0-20200920223154-d5b6e1f19bd0 h1:FHjcIM6YU8DnC
github.com/edwarnicke/grpcfd v0.0.0-20200920223154-d5b6e1f19bd0/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA=
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf h1:/lViRfaDxKINb2X6kOR3EJKJGR+MxUvqfgtYt5nh+qc=
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf/go.mod h1:XvbCO/QGsl3X8RzjBMoRpkm54FIAZH5ChK2j+aox7pw=
github.com/edwarnicke/serialize v1.0.7 h1:geX8vmyu8Ij2S5fFIXjy9gBDkKxXnrMIzMoDvV0Ddac=
github.com/edwarnicke/serialize v1.0.7/go.mod h1:y79KgU2P7ALH/4j37uTSIdNavHFNttqN7pzO6Y8B2aw=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
6 changes: 4 additions & 2 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ package client
import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer"

"github.com/networkservicemesh/sdk/pkg/tools/token"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ func NewClient(ctx context.Context, name string, onHeal *networkservice.NetworkS
append([]networkservice.NetworkServiceClient{
authorize.NewClient(),
updatepath.NewClient(name),
serialize.NewClient(),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc), onHeal),
refresh.NewClient(ctx),
}, additionalFunctionality...),
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"

Expand Down Expand Up @@ -65,6 +66,7 @@ func NewServer(ctx context.Context, name string, authzServer networkservice.Netw
append([]networkservice.NetworkServiceServer{
authzServer,
updatepath.NewServer(name),
serialize.NewServer(),
// `timeout` uses ctx as a context for the timeout Close and it closes only the subsequent chain, so
// chain elements before the `timeout` in chain shouldn't make any updates to the Close context and
// shouldn't be closed on Connection Close.
Expand Down
35 changes: 35 additions & 0 deletions pkg/networkservice/common/serialize/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Functional requirements

`Request`, `Close` events for the same `Connection.ID` should be executed in network service chain serially.

# Implementation

## serializeServer, serializeClient

Serialize chain elements use [multi executor](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/multiexecutor/multi_executor.go)
and stores per-id executor in the context.

Correct event firing chain element example:
```go
func (s *eventServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
executor := serialize.Executor(ctx)
go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Request(serialize.WithExecutor(context.TODO(), executor), request)
})
}()

conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

go func() {
executor.AsyncExec(func() {
_, _ = next.Server(ctx).Close(serialize.WithExecutor(context.TODO(), executor), conn)
})
}()

return conn, nil
}
```
52 changes: 52 additions & 0 deletions pkg/networkservice/common/serialize/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serialize

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type serializeClient struct {
executor multiExecutor
}

// NewClient returns a new serialize client chain element
func NewClient() networkservice.NetworkServiceClient {
return new(serializeClient)
}

func (c *serializeClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()
<-c.executor.AsyncExec(connID, func() {
conn, err = next.Client(ctx).Request(WithExecutor(ctx, c.executor.Executor(connID)), request, opts...)
})
return conn, err
}

func (c *serializeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (_ *empty.Empty, err error) {
<-c.executor.AsyncExec(conn.GetId(), func() {
_, err = next.Client(ctx).Close(WithExecutor(ctx, c.executor.Executor(conn.GetId())), conn, opts...)
})
return new(empty.Empty), err
}
48 changes: 48 additions & 0 deletions pkg/networkservice/common/serialize/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serialize

import (
"context"
)

const (
executorKey contextKeyType = "executor"
)

type contextKeyType string

// IExecutor is a serialize.Executor interface type
type IExecutor interface {
Copy link
Member

@denis-tingaikin denis-tingaikin Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please lets follow go interface naming convention https://golang.org/doc/effective_go.html#interface-names

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed:

IExecutor    ->  Executor
.Executor()  ->  .GetExecutor()

AsyncExec(f func()) <-chan struct{}
}

// WithExecutor wraps `parent` in a new context with IExecutor
func WithExecutor(parent context.Context, executor IExecutor) context.Context {
if parent == nil {
panic("cannot create context from nil parent")
}
return context.WithValue(parent, executorKey, executor)
}

// Executor returns IExecutor
func Executor(ctx context.Context) IExecutor {
if executor, ok := ctx.Value(executorKey).(IExecutor); ok {
return executor
}
return nil
}
23 changes: 23 additions & 0 deletions pkg/networkservice/common/serialize/executor_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serialize

type executorFunc func(f func()) <-chan struct{}

func (e executorFunc) AsyncExec(f func()) <-chan struct{} {
return e(f)
}
66 changes: 66 additions & 0 deletions pkg/networkservice/common/serialize/multi_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package serialize

import (
"sync"

"github.com/edwarnicke/serialize"
)

type multiExecutor struct {
executors map[string]*refCountExecutor
executor serialize.Executor
once sync.Once
}

type refCountExecutor struct {
count int
executor serialize.Executor
}

func (e *multiExecutor) AsyncExec(id string, f func()) (ch <-chan struct{}) {
e.once.Do(func() {
e.executors = make(map[string]*refCountExecutor)
})

<-e.executor.AsyncExec(func() {
exec, ok := e.executors[id]
if !ok {
exec = new(refCountExecutor)
e.executors[id] = exec
}
exec.count++

ch = exec.executor.AsyncExec(func() {
f()
e.executor.AsyncExec(func() {
exec.count--
if exec.count == 0 {
delete(e.executors, id)
}
})
})
})
return ch
}

func (e *multiExecutor) Executor(id string) IExecutor {
return executorFunc(func(f func()) <-chan struct{} {
return e.AsyncExec(id, f)
})
}
51 changes: 51 additions & 0 deletions pkg/networkservice/common/serialize/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package serialize provides chain elements for serial Request, Close event processing
package serialize

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type serializeServer struct {
executor multiExecutor
}

// NewServer returns a new serialize server chain element
func NewServer() networkservice.NetworkServiceServer {
return new(serializeServer)
}

func (s *serializeServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
connID := request.GetConnection().GetId()
<-s.executor.AsyncExec(connID, func() {
conn, err = next.Server(ctx).Request(WithExecutor(ctx, s.executor.Executor(connID)), request)
})
return conn, err
}

func (s *serializeServer) Close(ctx context.Context, conn *networkservice.Connection) (_ *empty.Empty, err error) {
<-s.executor.AsyncExec(conn.GetId(), func() {
_, err = next.Server(ctx).Close(WithExecutor(ctx, s.executor.Executor(conn.GetId())), conn)
})
return new(empty.Empty), err
}
Loading