Skip to content

Commit

Permalink
Fix connect (networkservicemesh#585)
Browse files Browse the repository at this point in the history
* Added translateMechanism chain element

Signed-off-by: Tigran Manasyan <[email protected]>

* Rework connect.NewServer

Signed-off-by: Vladimir Popov <[email protected]>

* Add RefcountMap.LoadUnsafe()

Signed-off-by: Vladimir Popov <[email protected]>

* Fix connect

Signed-off-by: Vladimir Popov <[email protected]>

* Move translation client implementations to the related places

Signed-off-by: Vladimir Popov <[email protected]>

* Add new clients, rework connect

Signed-off-by: Vladimir Popov <[email protected]>

* Rename translation to mechanismtranslation

Signed-off-by: Vladimir Popov <[email protected]>

* Make RefcountMap.Delete() return bool

Signed-off-by: Vladimir Popov <[email protected]>

Co-authored-by: Tigran Manasyan <[email protected]>
Signed-off-by: Sergey Ershov <[email protected]>
  • Loading branch information
2 people authored and Sergey Ershov committed Dec 23, 2020
1 parent 9660a0a commit 4532192
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 228 deletions.
4 changes: 0 additions & 4 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ package client
import (
"context"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
Expand All @@ -34,7 +31,6 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/token"
)

Expand Down
9 changes: 4 additions & 5 deletions pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,10 @@ func NewServer(ctx context.Context, nsmRegistration *registryapi.NetworkServiceE
newRecvFD(), // Receive any files passed
interpose.NewServer(&interposeRegistry),
filtermechanisms.NewServer(&urlsRegistryServer),
connect.NewServer(
ctx,
client.NewClientFactory(nsmRegistration.Name,
addressof.NetworkServiceClient(
adapters.NewServerToClient(rv)),
connect.NewServer(ctx,
client.NewClientFactory(
nsmRegistration.Name,
addressof.NetworkServiceClient(adapters.NewServerToClient(rv)),
tokenGenerator,
newSendFDClient(), // Send passed files.
),
Expand Down
125 changes: 49 additions & 76 deletions pkg/networkservice/chains/nsmgr/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,14 @@ import (
"context"
"fmt"
"io/ioutil"
"net/url"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand Down Expand Up @@ -71,7 +64,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) {

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
Expand All @@ -88,8 +81,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) {
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter)
require.NoError(t, err)
}()
nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr.URL)
require.NoError(t, err)
nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr.URL)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
Expand All @@ -107,7 +99,9 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) {
require.NotNil(t, conn)
require.Equal(t, 8, len(conn.Path.PathSegments))
require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))

// Close.

e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
require.NotNil(t, e)
Expand All @@ -130,7 +124,7 @@ func TestNSMGR_RemoteUsecase_BusyEndpoints(t *testing.T) {

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
Expand Down Expand Up @@ -161,8 +155,7 @@ func TestNSMGR_RemoteUsecase_BusyEndpoints(t *testing.T) {
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr, counter)
require.NoError(t, err)
}()
nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)
require.NoError(t, err)
nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
Expand Down Expand Up @@ -234,6 +227,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) {
require.Equal(t, 8, len(conn.Path.PathSegments))

// Close.

e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
require.NotNil(t, e)
Expand All @@ -256,16 +250,15 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
Name: "final-endpoint",
NetworkServiceNames: []string{"my-service-remote"},
}
counter := &counterServer{}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter)
require.NoError(t, err)

counter := &counterServer{}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter)
require.NoError(t, err)

nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)

nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
Expand All @@ -276,6 +269,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
Context: &networkservice.ConnectionContext{},
},
}

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)
Expand All @@ -293,6 +287,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
require.Equal(t, 5, len(conn2.Path.PathSegments))
require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))
// Close.

e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
require.NotNil(t, e)
Expand All @@ -314,18 +309,22 @@ func TestNSMGR_PassThroughRemote(t *testing.T) {
defer domain.Cleanup()

for i := 0; i < nodesCount; i++ {
additionalFunctionality := []networkservice.NetworkServiceServer{}
var additionalFunctionality []networkservice.NetworkServiceServer
if i != 0 {
k := i
// Passtrough to the node i-1
additionalFunctionality = []networkservice.NetworkServiceServer{
adapters.NewClientToServer(
newPassTroughClient(
[]*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
},
fmt.Sprintf("my-service-remote-%v", i-1),
fmt.Sprintf("endpoint-%v", i-1),
domain.Nodes[i].NSMgr.URL)),
chain.NewNetworkServiceServer(
clienturl.NewServer(domain.Nodes[i].NSMgr.URL),
connect.NewServer(ctx,
sandbox.NewCrossConnectClientFactory(sandbox.GenerateTestToken,
newPassTroughClient(
fmt.Sprintf("my-service-remote-%v", k-1),
fmt.Sprintf("endpoint-%v", k-1)),
kernel.NewClient()),
append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())...,
),
),
}
}
nseReg := &registry.NetworkServiceEndpoint{
Expand All @@ -336,12 +335,11 @@ func TestNSMGR_PassThroughRemote(t *testing.T) {
require.NoError(t, err)
}

nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[nodesCount-1].NSMgr.URL)
require.NoError(t, err)
nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[nodesCount-1].NSMgr.URL)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
Expand Down Expand Up @@ -374,17 +372,21 @@ func TestNSMGR_PassThroughLocal(t *testing.T) {
defer domain.Cleanup()

for i := 0; i < nsesCount; i++ {
additionalFunctionality := []networkservice.NetworkServiceServer{}
var additionalFunctionality []networkservice.NetworkServiceServer
if i != 0 {
k := i
additionalFunctionality = []networkservice.NetworkServiceServer{
adapters.NewClientToServer(
newPassTroughClient(
[]*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
},
fmt.Sprintf("my-service-remote-%v", i-1),
fmt.Sprintf("endpoint-%v", i-1),
domain.Nodes[0].NSMgr.URL)),
chain.NewNetworkServiceServer(
clienturl.NewServer(domain.Nodes[0].NSMgr.URL),
connect.NewServer(ctx,
sandbox.NewCrossConnectClientFactory(sandbox.GenerateTestToken,
newPassTroughClient(
fmt.Sprintf("my-service-remote-%v", k-1),
fmt.Sprintf("endpoint-%v", k-1)),
kernel.NewClient()),
append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())...,
),
),
}
}
nseReg := &registry.NetworkServiceEndpoint{
Expand All @@ -395,12 +397,11 @@ func TestNSMGR_PassThroughLocal(t *testing.T) {
require.NoError(t, err)
}

nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)
require.NoError(t, err)
nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
Expand All @@ -419,54 +420,26 @@ func TestNSMGR_PassThroughLocal(t *testing.T) {
}

type passThroughClient struct {
mechanismPreferences []*networkservice.Mechanism
networkService string
networkServiceEndpointName string
connectTo *url.URL
}

func newPassTroughClient(mechanismPreferences []*networkservice.Mechanism, networkService, networkServiceEndpointName string, connectTo *url.URL) *passThroughClient {
func newPassTroughClient(networkService, networkServiceEndpointName string) *passThroughClient {
return &passThroughClient{
mechanismPreferences: mechanismPreferences,
networkService: networkService,
networkServiceEndpointName: networkServiceEndpointName,
connectTo: connectTo,
}
}

func (p *passThroughClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

nsc, err := sandbox.NewClient(
newCtx, sandbox.GenerateTestToken, p.connectTo,
)
if err != nil {
return nil, err
}

newRequest := &networkservice.NetworkServiceRequest{
MechanismPreferences: p.mechanismPreferences,
Connection: &networkservice.Connection{
Id: request.Connection.Id,
NetworkService: p.networkService,
NetworkServiceEndpointName: p.networkServiceEndpointName,
Path: request.Connection.Path.Clone(),
Context: &networkservice.ConnectionContext{},
},
}
conn, err := nsc.Request(newCtx, newRequest)
if err != nil {
return nil, err
}

request.Connection.Path.PathSegments = conn.Path.PathSegments

func (c *passThroughClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
request.Connection.NetworkService = c.networkService
request.Connection.NetworkServiceEndpointName = c.networkServiceEndpointName
return next.Client(ctx).Request(ctx, request, opts...)
}

func (p *passThroughClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
conn = conn.Clone()
func (c *passThroughClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
conn.NetworkService = c.networkService
conn.NetworkServiceEndpointName = c.networkServiceEndpointName
return next.Client(ctx).Close(ctx, conn, opts...)
}

Expand Down
69 changes: 0 additions & 69 deletions pkg/networkservice/common/connect/DESIGN.md

This file was deleted.

Loading

0 comments on commit 4532192

Please sign in to comment.