Skip to content

Commit

Permalink
Registry refresh client should be able to set a default expiration ti…
Browse files Browse the repository at this point in the history
…me for NSE (networkservicemesh#350)

* refresh client should be able to set default expiration time for NSE

Signed-off-by: denis-tingajkin <[email protected]>

* rename nses map

Signed-off-by: denis-tingajkin <[email protected]>

* replace time.After to require.Eventually to proper handle async cases in tests

Signed-off-by: denis-tingajkin <[email protected]>
  • Loading branch information
denis-tingaikin authored Jun 29, 2020
1 parent 0d403ad commit 565dea8
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 42 deletions.
31 changes: 17 additions & 14 deletions pkg/registry/common/expire/ns_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ func TestNewNetworkServiceRegistryServer(t *testing.T) {
require.Nil(t, err)
list := registry.ReadNetworkServiceList(stream)
require.NotEmpty(t, list)
<-time.After(testPeriod * 3)
stream, err = nsClient.Find(context.Background(), &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{},
})
require.Nil(t, err)
list = registry.ReadNetworkServiceList(stream)
require.Empty(t, list)

require.Eventually(t, func() bool {
stream, err = nsClient.Find(context.Background(), &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{},
})
require.Nil(t, err)
list = registry.ReadNetworkServiceList(stream)
return len(list) == 0
}, time.Second, time.Millisecond*100)
}

func TestNewNetworkServiceRegistryServer_NSEUnregister(t *testing.T) {
Expand Down Expand Up @@ -103,11 +105,12 @@ func TestNewNetworkServiceRegistryServer_NSEUnregister(t *testing.T) {
NetworkServiceNames: []string{"IP terminator"},
})
require.Nil(t, err)
<-time.After(testPeriod * 3)
stream, err = nsClient.Find(context.Background(), &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{},
})
require.Nil(t, err)
list = registry.ReadNetworkServiceList(stream)
require.Empty(t, list)
require.Eventually(t, func() bool {
stream, err = nsClient.Find(context.Background(), &registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{},
})
require.Nil(t, err)
list = registry.ReadNetworkServiceList(stream)
return len(list) == 0
}, time.Second, time.Millisecond*100)
}
15 changes: 8 additions & 7 deletions pkg/registry/common/expire/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ func TestNewNetworkServiceEndpointRegistryServer(t *testing.T) {
require.Nil(t, err)
list := registry.ReadNetworkServiceEndpointList(stream)
require.NotEmpty(t, list)
<-time.After(testPeriod * 3)
stream, err = c.Find(context.Background(), &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{},
})
require.Nil(t, err)
list = registry.ReadNetworkServiceEndpointList(stream)
require.Empty(t, list)
require.Eventually(t, func() bool {
stream, err = c.Find(context.Background(), &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{},
})
require.Nil(t, err)
list = registry.ReadNetworkServiceEndpointList(stream)
return len(list) == 0
}, time.Second, time.Millisecond*100)
}
38 changes: 28 additions & 10 deletions pkg/registry/common/refresh/nse_registry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/golang/protobuf/ptypes/timestamp"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/registry"
"google.golang.org/grpc"
Expand All @@ -29,10 +31,15 @@ import (
)

type refreshNSEClient struct {
client registry.NetworkServiceEndpointRegistryClient
nsesMutex sync.Mutex
nses map[string]func()
retryDelay time.Duration
client registry.NetworkServiceEndpointRegistryClient
nsesMutex sync.Mutex
nseCancels map[string]context.CancelFunc
retryDelay time.Duration
defaultExpirationDuration time.Duration
}

func (c *refreshNSEClient) setDefaultExpiration(duration time.Duration) {
c.defaultExpirationDuration = duration
}

func (c *refreshNSEClient) setRetryPeriod(p time.Duration) {
Expand Down Expand Up @@ -70,8 +77,18 @@ func (c *refreshNSEClient) Register(ctx context.Context, in *registry.NetworkSer
}
c.nsesMutex.Lock()
defer c.nsesMutex.Unlock()
if resp.ExpirationTime == nil {
expirationTime := time.Now().Add(c.defaultExpirationDuration)
resp.ExpirationTime = &timestamp.Timestamp{
Seconds: expirationTime.Unix(),
Nanos: int32(expirationTime.Nanosecond()),
}
}
if v, ok := c.nseCancels[resp.Name]; ok {
v()
}
ctx, cancel := context.WithCancel(context.Background())
c.nses[resp.Name] = cancel
c.nseCancels[resp.Name] = cancel
c.startRefresh(ctx, resp)
return resp, err
}
Expand All @@ -87,20 +104,21 @@ func (c *refreshNSEClient) Unregister(ctx context.Context, in *registry.NetworkS
}
c.nsesMutex.Lock()
defer c.nsesMutex.Unlock()
cancel, ok := c.nses[in.Name]
cancel, ok := c.nseCancels[in.Name]
if ok {
cancel()
delete(c.nses, in.Name)
delete(c.nseCancels, in.Name)
}
return resp, nil
}

// NewNetworkServiceEndpointRegistryClient creates new NetworkServiceEndpointRegistryClient that will refresh expiration time for registered NSEs
func NewNetworkServiceEndpointRegistryClient(client registry.NetworkServiceEndpointRegistryClient, options ...Option) registry.NetworkServiceEndpointRegistryClient {
c := &refreshNSEClient{
client: client,
nses: map[string]func(){},
retryDelay: time.Second * 5,
client: client,
nseCancels: map[string]context.CancelFunc{},
retryDelay: time.Second * 5,
defaultExpirationDuration: time.Minute * 30,
}

for _, o := range options {
Expand Down
46 changes: 35 additions & 11 deletions pkg/registry/common/refresh/nse_registry_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand All @@ -32,6 +31,8 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/common/refresh"
)

const testExpirationDuration = time.Millisecond * 100

type testNSEClient struct {
sync.Mutex
requestCount int
Expand All @@ -55,20 +56,43 @@ func (t *testNSEClient) Unregister(ctx context.Context, in *registry.NetworkServ
func TestNewNetworkServiceEndpointRegistryClient(t *testing.T) {
defer goleak.VerifyNone(t)
testClient := testNSEClient{}
refreshClient := refresh.NewNetworkServiceEndpointRegistryClient(&testClient, refresh.WithRetryPeriod(time.Millisecond*100))
expirationTime := time.Now().Add(time.Millisecond * 100)
refreshClient := refresh.NewNetworkServiceEndpointRegistryClient(&testClient,
refresh.WithRetryPeriod(time.Millisecond*100),
refresh.WithDefaultExpiration(testExpirationDuration),
)
_, err := refreshClient.Register(context.Background(), &registry.NetworkServiceEndpoint{
Name: "nse-1",
})
require.Nil(t, err)
require.Eventually(t, func() bool {
testClient.Lock()
defer testClient.Unlock()
return testClient.requestCount == 1
}, testExpirationDuration*2, testExpirationDuration/4)
_, err = refreshClient.Unregister(context.Background(), &registry.NetworkServiceEndpoint{Name: "nse-1"})
require.Nil(t, err)
}

func TestNewNetworkServiceEndpointRegistryClient_CalledRegisterTwice(t *testing.T) {
defer goleak.VerifyNone(t)
testClient := testNSEClient{}
refreshClient := refresh.NewNetworkServiceEndpointRegistryClient(&testClient,
refresh.WithRetryPeriod(time.Millisecond*100),
refresh.WithDefaultExpiration(time.Millisecond*100),
)
_, err := refreshClient.Register(context.Background(), &registry.NetworkServiceEndpoint{
Name: "nse-1",
ExpirationTime: &timestamp.Timestamp{
Seconds: expirationTime.Unix(),
Nanos: int32(expirationTime.Nanosecond()),
},
})
require.Nil(t, err)
<-time.After(time.Millisecond * 100)
_, err = refreshClient.Register(context.Background(), &registry.NetworkServiceEndpoint{
Name: "nse-1",
})
require.Nil(t, err)
require.Eventually(t, func() bool {
testClient.Lock()
defer testClient.Unlock()
return testClient.requestCount == 1
}, testExpirationDuration*2, testExpirationDuration/4)
_, err = refreshClient.Unregister(context.Background(), &registry.NetworkServiceEndpoint{Name: "nse-1"})
require.Nil(t, err)
testClient.Lock()
defer testClient.Unlock()
require.Equal(t, 1, testClient.requestCount)
}
8 changes: 8 additions & 0 deletions pkg/registry/common/refresh/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import "time"

type configurable interface {
setRetryPeriod(time.Duration)
setDefaultExpiration(duration time.Duration)
}

// Option is expire registry configuration option
Expand All @@ -39,3 +40,10 @@ func WithRetryPeriod(duration time.Duration) Option {
c.setRetryPeriod(duration)
})
}

// WithDefaultExpiration sets a default expiration to NSE if expiration was not set
func WithDefaultExpiration(duration time.Duration) Option {
return applierFunc(func(c configurable) {
c.setDefaultExpiration(duration)
})
}

0 comments on commit 565dea8

Please sign in to comment.