Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix nse/ns Find(watch=true) corner cases on CI #533

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions pkg/registry/etcd/ns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"container/list"
"context"
"io"
"sync"
"time"

"github.com/edwarnicke/serialize"
"github.com/golang/protobuf/ptypes/empty"

"github.com/pkg/errors"
Expand All @@ -46,8 +46,8 @@ type etcdNSRegistryServer struct {
client versioned.Interface
ns string

subscribers *list.List
subscribersMutex sync.Mutex
subscribers *list.List
subscribersExecutor serialize.Executor

updateChannelSize int
}
Expand Down Expand Up @@ -111,22 +111,19 @@ func (n *etcdNSRegistryServer) watchRemoteStorage() {
NetworkService: item,
Deleted: deleted,
}
n.sendEvent(resp)
n.subscribersExecutor.AsyncExec(func() {
n.sendEvent(resp)
})
}
}
watcher.Stop()
}
}

func (n *etcdNSRegistryServer) sendEvent(resp *registry.NetworkServiceResponse) {
n.subscribersMutex.Lock()
for curr := n.subscribers.Front(); curr != nil; curr = curr.Next() {
select {
case curr.Value.(chan *registry.NetworkServiceResponse) <- resp:
default:
}
curr.Value.(chan *registry.NetworkServiceResponse) <- resp
}
n.subscribersMutex.Unlock()
}

func (n *etcdNSRegistryServer) Register(ctx context.Context, request *registry.NetworkService) (*registry.NetworkService, error) {
Expand Down Expand Up @@ -186,7 +183,9 @@ func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s regis
}
}
if query.Watch {
if err := n.watch(s.Context(), query, s); err != nil && !errors.Is(err, io.EOF) {
var watchCtx, cancel = context.WithCancel(s.Context())
defer cancel()
if err := n.watch(watchCtx, query, s); err != nil && !errors.Is(err, io.EOF) {
return err
}
}
Expand All @@ -211,19 +210,18 @@ func (n *etcdNSRegistryServer) Unregister(ctx context.Context, request *registry

func (n *etcdNSRegistryServer) subscribeOnEvents(ctx context.Context) <-chan *registry.NetworkServiceResponse {
var ret = make(chan *registry.NetworkServiceResponse, n.updateChannelSize)
var node *list.Element

n.subscribersMutex.Lock()
var node = n.subscribers.PushBack(ret)
n.subscribersMutex.Unlock()
n.subscribersExecutor.AsyncExec(func() {
node = n.subscribers.PushBack(ret)
})

go func() {
<-ctx.Done()

n.subscribersMutex.Lock()
n.subscribers.Remove(node)
n.subscribersMutex.Unlock()

close(ret)
n.subscribersExecutor.AsyncExec(func() {
n.subscribers.Remove(node)
close(ret)
})
}()

return ret
Expand Down
6 changes: 4 additions & 2 deletions pkg/registry/etcd/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"

Expand Down Expand Up @@ -199,8 +200,8 @@ func Test_NSHighloadWatch_ShouldNotFail(t *testing.T) {
defer cancel()

const clinetCount = 20
const updateCount int32 = 200

var updateCount = watch.DefaultChanSize
var actual atomic.Int32
var myClientset = fake.NewSimpleClientset()

Expand All @@ -219,8 +220,9 @@ func Test_NSHighloadWatch_ShouldNotFail(t *testing.T) {
NetworkService: &registry.NetworkService{},
Watch: true,
})
ch := registry.ReadNetworkServiceChannel(stream)
startWg.Done()
for range registry.ReadNetworkServiceChannel(stream) {
for range ch {
actual.Add(1)
}
}()
Expand Down
83 changes: 29 additions & 54 deletions pkg/registry/etcd/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"container/list"
"context"
"io"
"sync"
"time"

"github.com/edwarnicke/serialize"
Expand All @@ -46,11 +45,10 @@ type etcdNSERegistryServer struct {
chainContext context.Context
deleteExecutor serialize.Executor
client versioned.Interface
versions sync.Map
ns string

subscribers *list.List
subscribersMutex sync.Mutex
subscribers *list.List
subscribersExecutor serialize.Executor

updateChannelSize int
}
Expand Down Expand Up @@ -114,9 +112,8 @@ func (n *etcdNSERegistryServer) watchRemoteStorage() {
NetworkServiceEndpoint: item,
Deleted: deleted,
}
n.sendEvent(resp)
n.subscribersExecutor.AsyncExec(func() { n.sendEvent(resp) })
if !deleted && item.ExpirationTime != nil && item.ExpirationTime.AsTime().Local().Before(time.Now()) {
n.versions.Delete(item.GetName())
n.deleteExecutor.AsyncExec(func() {
_ = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(n.chainContext, item.GetName(), metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
Expand All @@ -132,14 +129,9 @@ func (n *etcdNSERegistryServer) watchRemoteStorage() {
}

func (n *etcdNSERegistryServer) sendEvent(resp *registry.NetworkServiceEndpointResponse) {
n.subscribersMutex.Lock()
for curr := n.subscribers.Front(); curr != nil; curr = curr.Next() {
select {
case curr.Value.(chan *registry.NetworkServiceEndpointResponse) <- resp:
default:
}
curr.Value.(chan *registry.NetworkServiceEndpointResponse) <- resp
}
n.subscribersMutex.Unlock()
}

func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
Expand Down Expand Up @@ -167,20 +159,11 @@ func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.
if nse != nil {
nse.Spec = *(*v1.NetworkServiceEndpointSpec)(request)
apiResp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(ctx, nse, metav1.UpdateOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns)
}

n.versions.Store(apiResp.Spec.Name, apiResp.ResourceVersion)
ctx = withNSEVersion(ctx, apiResp.ResourceVersion)
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
}
}
if err != nil {
return nil, err
}

n.versions.Store(apiResp.Spec.Name, apiResp.ResourceVersion)
ctx = withNSEVersion(ctx, apiResp.ResourceVersion)
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
}
Expand Down Expand Up @@ -214,7 +197,9 @@ func (n *etcdNSERegistryServer) Find(query *registry.NetworkServiceEndpointQuery
}
}
if query.Watch {
if err := n.watch(s.Context(), query, s); err != nil && !errors.Is(err, io.EOF) {
var watchCtx, cancel = context.WithCancel(s.Context())
defer cancel()
if err := n.watch(watchCtx, query, s); err != nil && !errors.Is(err, io.EOF) {
return err
}
}
Expand All @@ -226,51 +211,41 @@ func (n *etcdNSERegistryServer) Unregister(ctx context.Context, request *registr
if err != nil {
return nil, errors.WithStack(err)
}

if _, ok := nseVersionFromContext(ctx); !ok {
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
ctx,
request.Name,
metav1.DeleteOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to delete a NetworkServiceEndpoints %s in a namespace %s", request.Name, n.ns)
}
return resp, nil
var version *string
if v, ok := nseVersionFromContext(ctx); ok {
version = &v
}

if v, ok := n.versions.Load(request.Name); ok {
version := v.(string)
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
ctx,
request.Name,
metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
ResourceVersion: &version,
},
})
if err != nil {
log.FromContext(ctx).Warnf("failed to delete a NetworkServiceEndpoints %s in a namespace %s, cause: %v", request.Name, n.ns, err.Error())
}
n.versions.Delete(request.GetName())
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
ctx,
request.Name,
metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
ResourceVersion: version,
},
})
if err != nil {
log.FromContext(ctx).Warnf("failed to delete a NetworkServiceEndpoints %s in a namespace %s, cause: %v", request.Name, n.ns, err.Error())
}

return resp, nil
}

func (n *etcdNSERegistryServer) subscribeOnEvents(ctx context.Context) <-chan *registry.NetworkServiceEndpointResponse {
var ret = make(chan *registry.NetworkServiceEndpointResponse, n.updateChannelSize)
var node *list.Element

n.subscribersMutex.Lock()
var node = n.subscribers.PushBack(ret)
n.subscribersMutex.Unlock()
n.subscribersExecutor.AsyncExec(func() {
node = n.subscribers.PushBack(ret)
})

go func() {
<-ctx.Done()

n.subscribersMutex.Lock()
n.subscribers.Remove(node)
n.subscribersMutex.Unlock()

close(ret)
n.subscribersExecutor.AsyncExec(func() {
n.subscribers.Remove(node)
close(ret)
})
}()

return ret
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/etcd/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"

Expand Down Expand Up @@ -197,8 +198,8 @@ func Test_NSEHighloadWatch_ShouldNotFail(t *testing.T) {
defer cancel()

const clinetCount = 20
const updateCount int32 = 200

var updateCount = watch.DefaultChanSize
var actual atomic.Int32
var myClientset = fake.NewSimpleClientset()

Expand Down
Loading