diff --git a/.golangci.yml b/.golangci.yml index 7f824b1..ee17b15 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,14 +29,14 @@ linters-settings: goimports: local-prefixes: github.com/networkservicemesh gocyclo: - min-complexity: 15 + min-complexity: 20 maligned: suggest-new: true dupl: threshold: 150 funlen: - Lines: 180 - Statements: 80 + Lines: 225 + Statements: 100 goconst: min-len: 2 min-occurrences: 2 diff --git a/internal/imports/imports_linux.go b/internal/imports/imports_linux.go index dd79b6e..d88ea7b 100644 --- a/internal/imports/imports_linux.go +++ b/internal/imports/imports_linux.go @@ -3,6 +3,7 @@ package imports import ( _ "context" + _ "fmt" _ "github.com/antonfisher/nested-logrus-formatter" _ "github.com/edwarnicke/debug" _ "github.com/edwarnicke/grpcfd" @@ -20,6 +21,7 @@ import ( _ "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry" _ "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" _ "github.com/networkservicemesh/sdk/pkg/tools/awarenessgroups" + _ "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" _ "github.com/networkservicemesh/sdk/pkg/tools/log" _ "github.com/networkservicemesh/sdk/pkg/tools/log/logruslogger" _ "github.com/networkservicemesh/sdk/pkg/tools/nsurl" diff --git a/main.go b/main.go index e77c189..c6d0614 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ package main import ( "context" + "fmt" "net/url" "os" "os/signal" @@ -50,6 +51,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/awarenessgroups" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/log/logruslogger" "github.com/networkservicemesh/sdk/pkg/tools/nsurl" @@ -190,7 +192,7 @@ func main() { grpcfd.WithChainUnaryInterceptor(), ) - c := client.NewClient( + nsmClient := client.NewClient( ctx, client.WithClientURL(&config.ConnectTo), client.WithName(config.Name), @@ -208,28 +210,83 @@ func main() { client.WithDialOptions(dialOptions...), ) - c = retry.NewClient(c, retry.WithTryTimeout(config.RequestTimeout)) + nsmClient = retry.NewClient(nsmClient, retry.WithTryTimeout(config.RequestTimeout)) // ******************************************************************************** - log.FromContext(ctx).Infof("executing phase 5: connect to all passed services (time since start: %s)", time.Since(starttime)) + // Configure signal handling context // ******************************************************************************** signalCtx, cancelSignalCtx := notifyContext(ctx) defer cancelSignalCtx() + // ******************************************************************************** + // Create Network Service Manager monitorClient + // ******************************************************************************** + dialCtx, cancelDial := context.WithTimeout(signalCtx, config.DialTimeout) + defer cancelDial() + + log.FromContext(ctx).Infof("NSC: Connecting to Network Service Manager %v", config.ConnectTo.String()) + cc, err := grpc.DialContext(dialCtx, grpcutils.URLToTarget(&config.ConnectTo), dialOptions...) + if err != nil { + log.FromContext(ctx).Fatalf("failed dial to NSMgr: %v", err.Error()) + } + + monitorClient := networkservice.NewMonitorConnectionClient(cc) + + // ******************************************************************************** + log.FromContext(ctx).Infof("executing phase 5: connect to all passed services (time since start: %s)", time.Since(starttime)) + // ******************************************************************************** + for i := 0; i < len(config.NetworkServices); i++ { u := nsurl.NSURL(config.NetworkServices[i]) + + id := fmt.Sprintf("%s-%d", config.Name, i) + + monitorCtx, cancelMonitor := context.WithTimeout(signalCtx, config.RequestTimeout) + defer cancelMonitor() + + stream, err := monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{ + PathSegments: []*networkservice.PathSegment{ + { + Id: id, + }, + }, + }) + if err != nil { + log.FromContext(ctx).Fatal(err.Error()) + } + + event, err := stream.Recv() + if err != nil { + log.FromContext(ctx).Fatal(err.Error()) + } + cancelMonitor() + mech := u.Mechanism() if mech.Type != memif.MECHANISM { - log.FromContext(ctx).Fatalf("mechanism type: %v is not supproted", mech.Type) + log.FromContext(ctx).Fatalf("mechanism type: %v is not supported", mech.Type) } request := &networkservice.NetworkServiceRequest{ Connection: &networkservice.Connection{ + Id: id, NetworkService: u.NetworkService(), Labels: u.Labels(), }, + MechanismPreferences: []*networkservice.Mechanism{ + mech, + }, + } + + for _, conn := range event.Connections { + path := conn.GetPath() + if path.Index == 1 && path.PathSegments[0].Id == id && conn.Mechanism.Type == u.Mechanism().Type { + request.Connection = conn + request.Connection.Path.Index = 0 + request.Connection.Id = id + break + } } - resp, err := c.Request(ctx, request) + resp, err := nsmClient.Request(ctx, request) if err != nil { log.FromContext(ctx).Fatalf("request has failed: %v", err.Error()) } @@ -237,7 +294,7 @@ func main() { defer func() { closeCtx, cancelClose := context.WithTimeout(ctx, config.RequestTimeout) defer cancelClose() - _, _ = c.Close(closeCtx, resp) + _, _ = nsmClient.Close(closeCtx, resp) }() }