Skip to content

Commit

Permalink
fix monitor
Browse files Browse the repository at this point in the history
Signed-off-by: NikitaSkrynnik <[email protected]>
  • Loading branch information
NikitaSkrynnik committed Jul 1, 2024
1 parent ce7b9a8 commit a3d6c41
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 33 deletions.
3 changes: 3 additions & 0 deletions pkg/networkservice/common/monitor/client_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsC

func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
for {
if c == nil || c.MonitorConnection_MonitorConnectionsClient == nil {
return nil, nil
}
eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv()
if err != nil {
return nil, errors.Wrap(err, "MonitorConnections client failed to receive an event")
Expand Down
47 changes: 29 additions & 18 deletions pkg/networkservice/common/monitor/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"google.golang.org/grpc"
)

type eventLoop struct {
eventLoopCtx context.Context
conn *networkservice.Connection
eventConsumer EventConsumer
client networkservice.MonitorConnection_MonitorConnectionsClient
cancel func()
cc grpc.ClientConnInterface
}

func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) (context.CancelFunc, error) {
Expand All @@ -42,37 +43,47 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter
eventLoopCtx, eventLoopCancel := context.WithCancel(ctx)

// Create selector to only ask for events related to our Connection
cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
eventConsumer: ec,
cc: cc,
cancel: eventLoopCancel,
}

// Start the eventLoop
go cev.eventLoop()
return eventLoopCancel, nil
}

func (cev *eventLoop) eventLoop() {
selector := &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Id: conn.GetCurrentPathSegment().GetId(),
Name: conn.GetCurrentPathSegment().GetName(),
Id: cev.conn.GetCurrentPathSegment().GetId(),
Name: cev.conn.GetCurrentPathSegment().GetName(),
},
},
}

client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector)
client, err := networkservice.NewMonitorConnectionClient(cev.cc).MonitorConnections(cev.eventLoopCtx, selector)
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get a MonitorConnections client")
log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: %s", err.Error())
cev.cancel()
return
}

cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
eventConsumer: ec,
client: newClientFilter(client, conn),
if client == nil {
log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: client is nil")
cev.cancel()
return
}

// Start the eventLoop
go cev.eventLoop()
return eventLoopCancel, nil
}
filter := newClientFilter(client, cev.conn)

func (cev *eventLoop) eventLoop() {
// So we have a client, and can receive events
for {
eventIn, err := cev.client.Recv()
eventIn, err := filter.Recv()
if cev.eventLoopCtx.Err() != nil {
return
}
Expand Down
34 changes: 19 additions & 15 deletions pkg/networkservice/common/monitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"

"github.com/networkservicemesh/api/pkg/api/networkservice"

Expand Down Expand Up @@ -55,7 +59,7 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito
}

func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
// closeCtxFunc := postpone.ContextWithValues(ctx)
closeCtxFunc := postpone.ContextWithValues(ctx)
// Cancel any existing eventLoop
cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m))
if loaded {
Expand Down Expand Up @@ -83,20 +87,20 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net

// If we have a clientconn ... we must be part of a passthrough server, and have a client to pass
// events through from, so start an eventLoop
// cc, ccLoaded := clientconn.Load(ctx)
// log.FromContext(ctx).Infof("ccLoaded")
// if ccLoaded {
// log.FromContext(ctx).Infof("newEventLoop")
// cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn)
// if eventLoopErr != nil {
// closeCtx, closeCancel := closeCtxFunc()
// defer closeCancel()
// _, _ = next.Client(closeCtx).Close(closeCtx, conn)
// return nil, errors.Wrap(eventLoopErr, "unable to monitor")
// }
// log.FromContext(ctx).Infof("STORE")
// store(ctx, metadata.IsClient(m), cancelEventLoop)
// }
cc, ccLoaded := clientconn.Load(ctx)
log.FromContext(ctx).Infof("ccLoaded")
if ccLoaded {
log.FromContext(ctx).Infof("newEventLoop")
cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn)
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, conn)
return nil, errors.Wrap(eventLoopErr, "unable to monitor")
}
log.FromContext(ctx).Infof("STORE")
store(ctx, metadata.IsClient(m), cancelEventLoop)
}

return conn, nil
}
Expand Down

0 comments on commit a3d6c41

Please sign in to comment.