From 07ced33cad484b6713a8d0b9497ed8a443f9cd9f Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Tue, 14 Sep 2021 21:03:53 +0700 Subject: [PATCH] Add endpoint.Combine (#1077) Signed-off-by: Vladimir Popov --- pkg/networkservice/chains/endpoint/combine.go | 42 +++ .../chains/endpoint/combine_monitor_server.go | 142 ++++++++ .../chains/endpoint/combine_test.go | 324 ++++++++++++++++++ 3 files changed, 508 insertions(+) create mode 100644 pkg/networkservice/chains/endpoint/combine.go create mode 100644 pkg/networkservice/chains/endpoint/combine_monitor_server.go create mode 100644 pkg/networkservice/chains/endpoint/combine_test.go diff --git a/pkg/networkservice/chains/endpoint/combine.go b/pkg/networkservice/chains/endpoint/combine.go new file mode 100644 index 000000000..56d98f644 --- /dev/null +++ b/pkg/networkservice/chains/endpoint/combine.go @@ -0,0 +1,42 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import "github.com/networkservicemesh/api/pkg/api/networkservice" + +// Combine returns a new combined endpoint: +// * networkservice.NetworkServiceServer created by combineFun(eps) +// * networkservice.MonitorConnectionServer part is managed in the following way: +// * networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER is merged to single event from all endpoints +// * rest events just go with no changes from all endpoints +func Combine(combineFun func(servers []networkservice.NetworkServiceServer) networkservice.NetworkServiceServer, eps ...Endpoint) Endpoint { + var servers []networkservice.NetworkServiceServer + monitorServers := make(map[networkservice.MonitorConnectionServer]int) + for _, ep := range eps { + servers = append(servers, ep) + if _, ok := monitorServers[ep]; !ok { + monitorServers[ep] = len(monitorServers) + } + } + + return &endpoint{ + NetworkServiceServer: combineFun(servers), + MonitorConnectionServer: &combineMonitorServer{ + monitorServers: monitorServers, + }, + } +} diff --git a/pkg/networkservice/chains/endpoint/combine_monitor_server.go b/pkg/networkservice/chains/endpoint/combine_monitor_server.go new file mode 100644 index 000000000..60098f4c4 --- /dev/null +++ b/pkg/networkservice/chains/endpoint/combine_monitor_server.go @@ -0,0 +1,142 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/pkg/errors" + + "github.com/networkservicemesh/api/pkg/api/networkservice" +) + +type combineMonitorServer struct { + monitorServers map[networkservice.MonitorConnectionServer]int +} + +func (m *combineMonitorServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, rawSrv networkservice.MonitorConnection_MonitorConnectionsServer) error { + ctx, cancel := context.WithCancel(rawSrv.Context()) + defer cancel() + + var initChs []chan *networkservice.ConnectionEvent + for range m.monitorServers { + initChs = append(initChs, make(chan *networkservice.ConnectionEvent, 1)) + } + + errCh := make(chan error, len(initChs)) + + var monitorErr atomic.Value + for monitorServer, i := range m.monitorServers { + go startMonitorConnectionsServer(ctx, cancel, initChs[i], errCh, selector, rawSrv, monitorServer, &monitorErr) + } + processInitEvent(ctx, initChs, errCh, rawSrv) + + <-ctx.Done() + + var err error + if rv := monitorErr.Load(); rv != nil { + err = rv.(error) + } + return err +} + +func startMonitorConnectionsServer( + ctx context.Context, cancel context.CancelFunc, + initCh chan<- *networkservice.ConnectionEvent, errCh <-chan error, + selector *networkservice.MonitorScopeSelector, rawSrv networkservice.MonitorConnection_MonitorConnectionsServer, + monitorServer networkservice.MonitorConnectionServer, + monitorErr *atomic.Value, +) { + srv := &combineMonitorConnectionsServer{ + ctx: ctx, + initCh: initCh, + errCh: errCh, + MonitorConnection_MonitorConnectionsServer: rawSrv, + } + srv.initWg.Add(1) + + defer func() { + cancel() + srv.initOnce.Do(srv.initWg.Done) + }() + + if err := monitorServer.MonitorConnections(selector, srv); err != nil { + monitorErr.Store(err) + } +} + +func processInitEvent( + ctx context.Context, + initChs []chan *networkservice.ConnectionEvent, errCh chan error, + rawSrv networkservice.MonitorConnection_MonitorConnectionsServer, +) { + defer close(errCh) + + initEvent := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, + Connections: make(map[string]*networkservice.Connection), + } + for _, initCh := range initChs { + select { + case <-ctx.Done(): + return + case event := <-initCh: + for id, conn := range event.Connections { + initEvent.Connections[id] = conn + } + } + } + + if initErr := rawSrv.Send(initEvent); initErr != nil { + for i := 0; i < len(initChs); i++ { + errCh <- initErr + } + } +} + +type combineMonitorConnectionsServer struct { + ctx context.Context + initCh chan<- *networkservice.ConnectionEvent + initOnce sync.Once + initWg sync.WaitGroup + errCh <-chan error + + networkservice.MonitorConnection_MonitorConnectionsServer +} + +func (m *combineMonitorConnectionsServer) Send(event *networkservice.ConnectionEvent) error { + switch event.Type { + case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER: + err := errors.New("double sending initial state transfer") + m.initOnce.Do(func() { + defer m.initWg.Done() + + m.initCh <- event + err = <-m.errCh + }) + return err + default: + m.initWg.Wait() + return m.MonitorConnection_MonitorConnectionsServer.Send(event) + } +} + +func (m *combineMonitorConnectionsServer) Context() context.Context { + return m.ctx +} diff --git a/pkg/networkservice/chains/endpoint/combine_test.go b/pkg/networkservice/chains/endpoint/combine_test.go new file mode 100644 index 000000000..969d5c556 --- /dev/null +++ b/pkg/networkservice/chains/endpoint/combine_test.go @@ -0,0 +1,324 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint_test + +import ( + "context" + "io" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" + + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" +) + +func startEndpoint(ctx context.Context, t *testing.T, e endpoint.Endpoint) *grpc.ClientConn { + listenOn := &url.URL{Scheme: "tcp", Path: "127.0.0.1:0"} + + require.Empty(t, endpoint.Serve(ctx, listenOn, e)) + + cc, err := grpc.Dial(grpcutils.URLToTarget(listenOn), grpc.WithInsecure()) + require.NoError(t, err) + + return cc +} + +func TestCombine(t *testing.T) { + var samples = []struct { + name string + mechanism *networkservice.Mechanism + }{ + { + name: "Kernel", + mechanism: kernel.New(""), + }, + { + name: "Memif", + mechanism: memif.New(""), + }, + } + + for _, s := range samples { + t.Run(s.name, func(t *testing.T) { + // nolint:scopelint + testCombine(t, s.mechanism) + }) + } +} + +func testCombine(t *testing.T, mechanism *networkservice.Mechanism) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + e := endpoint.Combine(func(servers []networkservice.NetworkServiceServer) networkservice.NetworkServiceServer { + return mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ + kernel.MECHANISM: servers[0], + memif.MECHANISM: servers[1], + }) + }, newTestEndpoint(ctx, kernel.MECHANISM), newTestEndpoint(ctx, memif.MECHANISM)) + + cc := startEndpoint(ctx, t, e) + defer func() { _ = cc.Close() }() + + c := next.NewNetworkServiceClient( + updatepath.NewClient("client"), + networkservice.NewNetworkServiceClient(cc), + ) + + stream, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(ctx, &networkservice.MonitorScopeSelector{}) + require.NoError(t, err) + + // 1. Receive initial event + event, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, (&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, + }).String(), event.String()) + + // 2. Request and receive UPDATE event + conn, err := c.Request(ctx, &networkservice.NetworkServiceRequest{ + Connection: new(networkservice.Connection), + MechanismPreferences: []*networkservice.Mechanism{mechanism.Clone()}, + }) + require.NoError(t, err) + require.Equal(t, mechanism.String(), conn.GetMechanism().String()) + require.Len(t, conn.GetPath().GetPathSegments(), 2) + require.Equal(t, mechanism.Type, conn.GetNextPathSegment().GetName()) + + event, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, (&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + conn.GetNextPathSegment().GetId(): { + Id: conn.GetNextPathSegment().GetId(), + Mechanism: conn.GetMechanism(), + Path: &networkservice.Path{ + Index: 1, + PathSegments: conn.GetPath().GetPathSegments(), + }, + }, + }, + }).String(), event.String()) + + // 3. Close and receive DELETE event + _, err = c.Close(ctx, conn.Clone()) + require.NoError(t, err) + + event, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, (&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_DELETE, + Connections: map[string]*networkservice.Connection{ + conn.GetNextPathSegment().GetId(): { + Id: conn.GetNextPathSegment().GetId(), + Mechanism: conn.GetMechanism(), + Path: &networkservice.Path{ + Index: 1, + PathSegments: conn.GetPath().GetPathSegments(), + }, + }, + }, + }).String(), event.String()) +} + +func TestSwitchEndpoint_InitialStateTransfer(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + e := endpoint.Combine(func(servers []networkservice.NetworkServiceServer) networkservice.NetworkServiceServer { + return mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ + kernel.MECHANISM: servers[0], + memif.MECHANISM: servers[1], + }) + }, newTestEndpoint(ctx, kernel.MECHANISM), newTestEndpoint(ctx, memif.MECHANISM)) + + cc := startEndpoint(ctx, t, e) + defer func() { _ = cc.Close() }() + + c := next.NewNetworkServiceClient( + updatepath.NewClient("client"), + networkservice.NewNetworkServiceClient(cc), + ) + + var conns []*networkservice.Connection + for _, mechanism := range []*networkservice.Mechanism{kernel.New(""), memif.New("")} { + conn, err := c.Request(ctx, &networkservice.NetworkServiceRequest{ + Connection: new(networkservice.Connection), + MechanismPreferences: []*networkservice.Mechanism{mechanism.Clone()}, + }) + require.NoError(t, err) + require.Equal(t, mechanism.String(), conn.GetMechanism().String()) + require.Len(t, conn.GetPath().GetPathSegments(), 2) + require.Equal(t, mechanism.Type, conn.GetNextPathSegment().GetName()) + + conns = append(conns, conn) + } + + expectedEvent := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, + Connections: make(map[string]*networkservice.Connection), + } + for _, conn := range conns { + expectedEvent.Connections[conn.GetNextPathSegment().GetId()] = &networkservice.Connection{ + Id: conn.GetNextPathSegment().GetId(), + Mechanism: conn.GetMechanism(), + Path: &networkservice.Path{ + Index: 1, + PathSegments: conn.GetPath().GetPathSegments(), + }, + } + } + + ignoreCurrent := goleak.IgnoreCurrent() + streamCtx, cancelStream := context.WithCancel(ctx) + + stream, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(streamCtx, &networkservice.MonitorScopeSelector{}) + require.NoError(t, err) + + event, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, expectedEvent.String(), event.String()) + + cancelStream() + goleak.VerifyNone(t, ignoreCurrent) +} + +func TestSwitchEndpoint_DuplicateEndpoints(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + monitorCtx, cancelMonitor := context.WithCancel(ctx) + + duplicate := newTestEndpoint(monitorCtx, "duplicate") + e := endpoint.Combine(func(servers []networkservice.NetworkServiceServer) networkservice.NetworkServiceServer { + return mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ + kernel.MECHANISM: servers[0], + memif.MECHANISM: servers[1], + }) + }, duplicate, duplicate) + + cc := startEndpoint(ctx, t, e) + defer func() { _ = cc.Close() }() + + c := next.NewNetworkServiceClient( + updatepath.NewClient("client"), + networkservice.NewNetworkServiceClient(cc), + ) + + stream, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(ctx, &networkservice.MonitorScopeSelector{}) + require.NoError(t, err) + + event, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType()) + + var conns []*networkservice.Connection + for _, mechanism := range []*networkservice.Mechanism{kernel.New(""), memif.New("")} { + var conn *networkservice.Connection + conn, err = c.Request(ctx, &networkservice.NetworkServiceRequest{ + Connection: new(networkservice.Connection), + MechanismPreferences: []*networkservice.Mechanism{mechanism.Clone()}, + }) + require.NoError(t, err) + + conns = append(conns, conn) + + event, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, (&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + conn.GetNextPathSegment().GetId(): { + Id: conn.GetNextPathSegment().GetId(), + Mechanism: conn.GetMechanism(), + Path: &networkservice.Path{ + Index: 1, + PathSegments: conn.GetPath().GetPathSegments(), + }, + }, + }, + }).String(), event.String()) + } + + for _, conn := range conns { + _, err = c.Close(ctx, conn.Clone()) + require.NoError(t, err) + + event, err = stream.Recv() + require.NoError(t, err) + require.Equal(t, (&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_DELETE, + Connections: map[string]*networkservice.Connection{ + conn.GetNextPathSegment().GetId(): { + Id: conn.GetNextPathSegment().GetId(), + Mechanism: conn.GetMechanism(), + Path: &networkservice.Path{ + Index: 1, + PathSegments: conn.GetPath().GetPathSegments(), + }, + }, + }, + }).String(), event.String()) + } + + cancelMonitor() + + _, err = stream.Recv() + require.ErrorIs(t, err, io.EOF) +} + +type testEndpoint struct { + networkservice.NetworkServiceServer + networkservice.MonitorConnectionServer +} + +func newTestEndpoint(ctx context.Context, name string) *testEndpoint { + e := new(testEndpoint) + e.NetworkServiceServer = next.NewNetworkServiceServer( + updatepath.NewServer(name), + begin.NewServer(), + monitor.NewServer(ctx, &e.MonitorConnectionServer), + ) + return e +} + +func (e *testEndpoint) Register(s *grpc.Server) { + grpcutils.RegisterHealthServices(s, e) + networkservice.RegisterNetworkServiceServer(s, e) + networkservice.RegisterMonitorConnectionServer(s, e) +}