Skip to content

Commit

Permalink
Fix begin on refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <[email protected]>
  • Loading branch information
glazychev-art committed Oct 11, 2022
1 parent 6d73fda commit 8f6f8ba
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/networkservice/common/begin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (b *beginClient) Request(ctx context.Context, request *networkservice.Netwo
conn, err = b.Request(ctx, request, opts...)
return
}
eventFactoryClient.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryClient)
request.Connection = mergeConnection(eventFactoryClient.returnedConnection, request.GetConnection(), eventFactoryClient.request.GetConnection())
Expand Down
28 changes: 18 additions & 10 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.
client: next.Client(ctx),
opts: opts,
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -75,6 +71,14 @@ func newEventFactoryClient(ctx context.Context, afterClose func(), opts ...grpc.
return f
}

func (f *eventFactoryClient) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventFactoryClient) Request(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down Expand Up @@ -155,11 +159,7 @@ func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactory
f := &eventFactoryServer{
server: next.Server(ctx),
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -168,6 +168,14 @@ func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactory
return f
}

func (f *eventFactoryServer) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down
42 changes: 42 additions & 0 deletions pkg/networkservice/common/begin/event_factory_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,48 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

// This test reproduces the situation when refresh changes the eventFactory context
// nolint:dupl
func TestRefresh_Client(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

syncChan := make(chan struct{})
checkCtxCl := &checkContextClient{t: t}
eventFactoryCl := &eventFactoryClient{ch: syncChan}
client := chain.NewNetworkServiceClient(
begin.NewClient(),
checkCtxCl,
eventFactoryCl,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set any value to context
ctx = context.WithValue(ctx, contextKey{}, "value_1")
checkCtxCl.setExpectedValue("value_1")

// Do Request with this context
request := testRequest("1")
conn, err := client.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Change context value before refresh Request
ctx = context.WithValue(ctx, contextKey{}, "value_2")
checkCtxCl.setExpectedValue("value_2")
request.Connection = conn.Clone()

// Call refresh
conn, err = client.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Call refresh from eventFactory. We are expecting updated value in the context
eventFactoryCl.callRefresh()
<-syncChan
}

// This test reproduces the situation when Close and Request were called at the same time
// nolint:dupl
func TestRefreshDuringClose_Client(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions pkg/networkservice/common/begin/event_factory_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

// This test reproduces the situation when refresh changes the eventFactory context
// nolint:dupl
func TestRefresh_Server(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

syncChan := make(chan struct{})
checkCtxServ := &checkContextServer{t: t}
eventFactoryServ := &eventFactoryServer{ch: syncChan}
server := chain.NewNetworkServiceServer(
begin.NewServer(),
checkCtxServ,
eventFactoryServ,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set any value to context
ctx = context.WithValue(ctx, contextKey{}, "value_1")
checkCtxServ.setExpectedValue("value_1")

// Do Request with this context
request := testRequest("1")
conn, err := server.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Change context value before refresh Request
ctx = context.WithValue(ctx, contextKey{}, "value_2")
checkCtxServ.setExpectedValue("value_2")
request.Connection = conn.Clone()

// Call refresh
conn, err = server.Request(ctx, request.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Call refresh from eventFactory. We are expecting updated value in the context
eventFactoryServ.callRefresh()
<-syncChan
}

// This test reproduces the situation when Close and Request were called at the same time
// nolint:dupl
func TestRefreshDuringClose_Server(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
conn, err = b.Request(ctx, request)
return
}
eventFactoryServer.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryServer)
conn, err = next.Server(ctx).Request(ctx, request)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/common/begin/ns_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (b *beginNSClient) Register(ctx context.Context, in *registry.NetworkServic
resp, err = b.Register(ctx, in, opts...)
return
}
eventFactoryClient.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryClient)
resp, err = next.NetworkServiceRegistryClient(ctx).Register(ctx, in, opts...)
Expand Down
28 changes: 18 additions & 10 deletions pkg/registry/common/begin/ns_event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func newEventNSFactoryClient(ctx context.Context, afterClose func(), opts ...grp
client: next.NetworkServiceRegistryClient(ctx),
opts: opts,
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -58,6 +54,14 @@ func newEventNSFactoryClient(ctx context.Context, afterClose func(), opts ...grp
return f
}

func (f *eventNSFactoryClient) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSFactoryClient) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down Expand Up @@ -129,11 +133,7 @@ func newNSEventFactoryServer(ctx context.Context, afterClose func()) *eventNSFac
f := &eventNSFactoryServer{
server: next.NetworkServiceRegistryServer(ctx),
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -142,6 +142,14 @@ func newNSEventFactoryServer(ctx context.Context, afterClose func()) *eventNSFac
return f
}

func (f *eventNSFactoryServer) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSFactoryServer) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/registry/common/begin/ns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (b *beginNSServer) Register(ctx context.Context, in *registry.NetworkServic
resp, err = b.Register(ctx, in)
return
}
eventFactoryServer.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryServer)
resp, err = next.NetworkServiceRegistryServer(ctx).Register(ctx, in)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/common/begin/nse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (b *beginNSEClient) Register(ctx context.Context, in *registry.NetworkServi
resp, err = b.Register(ctx, in, opts...)
return
}
eventFactoryClient.updateContext(ctx)

ctx = withEventFactory(ctx, eventFactoryClient)
resp, err = next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, in, opts...)
Expand Down
28 changes: 18 additions & 10 deletions pkg/registry/common/begin/nse_event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func newEventNSEFactoryClient(ctx context.Context, afterClose func(), opts ...gr
client: next.NetworkServiceEndpointRegistryClient(ctx),
opts: opts,
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -58,6 +54,14 @@ func newEventNSEFactoryClient(ctx context.Context, afterClose func(), opts ...gr
return f
}

func (f *eventNSEFactoryClient) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSEFactoryClient) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down Expand Up @@ -129,11 +133,7 @@ func newNSEEventFactoryServer(ctx context.Context, afterClose func()) *eventNSEF
f := &eventNSEFactoryServer{
server: next.NetworkServiceEndpointRegistryServer(ctx),
}
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
f.updateContext(ctx)

f.afterCloseFunc = func() {
f.state = closed
Expand All @@ -142,6 +142,14 @@ func newNSEEventFactoryServer(ctx context.Context, afterClose func()) *eventNSEF
return f
}

func (f *eventNSEFactoryServer) updateContext(ctx context.Context) {
ctxFunc := postpone.ContextWithValues(ctx)
f.ctxFunc = func() (context.Context, context.CancelFunc) {
eventCtx, cancel := ctxFunc()
return withEventFactory(eventCtx, f), cancel
}
}

func (f *eventNSEFactoryServer) Register(opts ...Option) <-chan error {
o := &option{
cancelCtx: context.Background(),
Expand Down
42 changes: 42 additions & 0 deletions pkg/registry/common/begin/nse_event_factory_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,48 @@ import (
"google.golang.org/grpc"
)

// This test reproduces the situation when refresh changes the eventFactory context
func TestRefresh_Client(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

syncChan := make(chan struct{})
checkCtxCl := &checkContextClient{t: t}
eventFactoryCl := &eventFactoryClient{ch: syncChan}
client := chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
checkCtxCl,
eventFactoryCl,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set any value to context
ctx = context.WithValue(ctx, contextKey{}, "value_1")
checkCtxCl.setExpectedValue("value_1")

// Do Register with this context
nse := &registry.NetworkServiceEndpoint{
Name: "1",
}
conn, err := client.Register(ctx, nse.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Change context value before refresh
ctx = context.WithValue(ctx, contextKey{}, "value_2")
checkCtxCl.setExpectedValue("value_2")

// Call refresh
conn, err = client.Register(ctx, nse.Clone())
assert.NotNil(t, t, conn)
assert.NoError(t, err)

// Call refresh from eventFactory. We are expecting updated value in the context
eventFactoryCl.callRefresh()
<-syncChan
}

// This test reproduces the situation when Unregister and Register were called at the same time
func TestRefreshDuringUnregister_Client(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
Expand Down
Loading

0 comments on commit 8f6f8ba

Please sign in to comment.