Skip to content

Commit

Permalink
Add dual stack support. (#1584)
Browse files Browse the repository at this point in the history
* Add dual stack support.

Signed-off-by: Vitaliy Guschin <[email protected]>

* Support client multi channel subscription.

Signed-off-by: Vitaliy Guschin <[email protected]>

* Fix unit tests.

Signed-off-by: Vitaliy Guschin <[email protected]>

* Fix vL3 test.

Signed-off-by: Vitaliy Guschin <[email protected]>

---------

Signed-off-by: Vitaliy Guschin <[email protected]>
Co-authored-by: Vitaliy Guschin <[email protected]>
  • Loading branch information
VitalyGushin and Vitaliy Guschin authored Apr 18, 2024
1 parent 3eafbf6 commit 608b1b5
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ linters-settings:
threshold: 150
funlen:
Lines: 120
Statements: 50
Statements: 60
goconst:
min-len: 2
min-occurrences: 2
Expand Down
8 changes: 4 additions & 4 deletions pkg/networkservice/chains/nsmgr/vl3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,14 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) {
}

clientIpam := vl3.NewIPAM("127.0.0.1/32")
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(vl3dns.NewClient(net.ParseIP("127.0.0.1"), dnsConfigs), vl3.NewClient(ctx, clientIpam)))
vl3ServerClient := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(vl3dns.NewClient(net.ParseIP("127.0.0.1"), dnsConfigs), vl3.NewClient(ctx, clientIpam)))

req := defaultRequest(nsReg.Name)
req.Connection.Id = uuid.New().String()

req.Connection.Labels["podName"] = nscName

resp, err := nsc.Request(ctx, req)
resp, err := vl3ServerClient.Request(ctx, req)
require.NoError(t, err)
require.Len(t, resp.GetContext().GetDnsContext().GetConfigs()[0].DnsServerIps, 1)
require.Equal(t, "127.0.0.1", resp.GetContext().GetDnsContext().GetConfigs()[0].DnsServerIps[0])
Expand All @@ -205,14 +205,14 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) {

requireIPv4Lookup(ctx, t, &resolver, "nsc1.vl3", "1.1.1.1") // we can lookup this ip address only and only if fanout is working

resp, err = nsc.Request(ctx, req)
resp, err = vl3ServerClient.Request(ctx, req)
require.NoError(t, err)

requireIPv4Lookup(ctx, t, &resolver, "nsc.vl3", "127.0.0.1")

requireIPv4Lookup(ctx, t, &resolver, "nsc1.vl3", "1.1.1.1") // we can lookup this ip address only and only if fanout is working

_, err = nsc.Close(ctx, resp)
_, err = vl3ServerClient.Close(ctx, resp)
require.NoError(t, err)

_, err = resolver.LookupIP(ctx, "ip4", "nsc.vl3")
Expand Down
16 changes: 4 additions & 12 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type vl3Client struct {
// NewClient - returns a new vL3 client instance that manages connection.context.ipcontext for vL3 scenario.
//
// Produces refresh on prefix update.
// Requires begin and metdata chain elements.
// Requires begin and metadata chain elements.
func NewClient(chainContext context.Context, pool *IPAM) networkservice.NetworkServiceClient {
if chainContext == nil {
panic("chainContext can not be nil")
Expand Down Expand Up @@ -97,17 +97,9 @@ func (n *vl3Client) Request(ctx context.Context, request *networkservice.Network

var address, prefix = n.pool.selfAddress().String(), n.pool.selfPrefix().String()

conn.GetContext().GetIpContext().SrcIpAddrs = []string{address}
conn.GetContext().GetIpContext().DstRoutes = []*networkservice.Route{
{
Prefix: address,
NextHop: n.pool.selfAddress().IP.String(),
},
{
Prefix: prefix,
NextHop: n.pool.selfAddress().IP.String(),
},
}
addAddr(&conn.GetContext().GetIpContext().SrcIpAddrs, address)
addRoute(&conn.GetContext().GetIpContext().DstRoutes, address, n.pool.selfAddress().IP.String())
addRoute(&conn.GetContext().GetIpContext().DstRoutes, prefix, n.pool.selfAddress().IP.String())

return next.Client(ctx).Request(ctx, request, opts...)
}
Expand Down
93 changes: 90 additions & 3 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/sdk/pkg/ipam/strictvl3ipam"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/excludedprefixes"
"github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/ipcontext/vl3"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)
Expand Down Expand Up @@ -133,6 +135,91 @@ func Test_VL3NSE_ConnectsToVl3NSE(t *testing.T) {
require.Equal(t, "10.0.1.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[1].GetPrefix())
}

func Test_VL3NSE_ConnectsToVl3NSE_DualStack(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var ipams []*vl3.IPAM
ipam1 := vl3.NewIPAM("10.0.0.1/24")
ipams = append(ipams, ipam1)
ipam2 := vl3.NewIPAM("2001:db8::/112")
ipams = append(ipams, ipam2)

var clients []networkservice.NetworkServiceClient
for _, ipam := range ipams {
clients = append(clients, vl3.NewClient(ctx, ipam))
}

var server = next.NewNetworkServiceServer(
adapters.NewClientToServer(
next.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
chain.NewNetworkServiceClient(clients...),
),
),
metadata.NewServer(),
strictvl3ipam.NewServer(ctx, vl3.NewServer, ipams...),
)

resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}})

require.NoError(t, err)

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetSrcIpAddrs()[0])
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetSrcIpAddrs()[1])
require.Equal(t, "10.0.0.1/32", resp.GetContext().GetIpContext().GetSrcIpAddrs()[2])
require.Equal(t, "2001:db8::1/128", resp.GetContext().GetIpContext().GetSrcIpAddrs()[3])

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetDstIpAddrs()[0])
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetDstIpAddrs()[1])

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetSrcRoutes()[0].GetPrefix())
require.Equal(t, "10.0.0.0/24", resp.GetContext().GetIpContext().GetSrcRoutes()[1].GetPrefix())
require.Equal(t, "10.0.0.0/16", resp.GetContext().GetIpContext().GetSrcRoutes()[5].GetPrefix())
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetSrcRoutes()[2].GetPrefix())
require.Equal(t, "2001:db8::/112", resp.GetContext().GetIpContext().GetSrcRoutes()[3].GetPrefix())
require.Equal(t, "2001:db8::/64", resp.GetContext().GetIpContext().GetSrcRoutes()[4].GetPrefix())

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetDstRoutes()[0].GetPrefix())
require.Equal(t, "10.0.0.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[1].GetPrefix())
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetDstRoutes()[2].GetPrefix())
require.Equal(t, "2001:db8::/112", resp.GetContext().GetIpContext().GetDstRoutes()[3].GetPrefix())
require.Equal(t, "10.0.0.1/32", resp.GetContext().GetIpContext().GetDstRoutes()[4].GetPrefix())
require.Equal(t, "2001:db8::1/128", resp.GetContext().GetIpContext().GetDstRoutes()[5].GetPrefix())

// refresh
resp, err = server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: resp})

require.NoError(t, err)

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetSrcIpAddrs()[0])
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetSrcIpAddrs()[1])
require.Equal(t, "10.0.0.1/32", resp.GetContext().GetIpContext().GetSrcIpAddrs()[2])
require.Equal(t, "2001:db8::1/128", resp.GetContext().GetIpContext().GetSrcIpAddrs()[3])

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetDstIpAddrs()[0])
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetDstIpAddrs()[1])

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetSrcRoutes()[0].GetPrefix())
require.Equal(t, "10.0.0.0/24", resp.GetContext().GetIpContext().GetSrcRoutes()[1].GetPrefix())
require.Equal(t, "10.0.0.0/16", resp.GetContext().GetIpContext().GetSrcRoutes()[5].GetPrefix())
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetSrcRoutes()[2].GetPrefix())
require.Equal(t, "2001:db8::/112", resp.GetContext().GetIpContext().GetSrcRoutes()[3].GetPrefix())
require.Equal(t, "2001:db8::/64", resp.GetContext().GetIpContext().GetSrcRoutes()[4].GetPrefix())

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetDstRoutes()[0].GetPrefix())
require.Equal(t, "10.0.0.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[1].GetPrefix())
require.Equal(t, "2001:db8::/128", resp.GetContext().GetIpContext().GetDstRoutes()[2].GetPrefix())
require.Equal(t, "2001:db8::/112", resp.GetContext().GetIpContext().GetDstRoutes()[3].GetPrefix())
require.Equal(t, "10.0.0.1/32", resp.GetContext().GetIpContext().GetDstRoutes()[4].GetPrefix())
require.Equal(t, "2001:db8::1/128", resp.GetContext().GetIpContext().GetDstRoutes()[5].GetPrefix())
}

func Test_VL3NSE_ConnectsToVl3NSE_ChangePrefix(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
Expand Down Expand Up @@ -178,14 +265,14 @@ func Test_VL3NSE_ConnectsToVl3NSE_ChangePrefix(t *testing.T) {

require.NoError(t, err)

require.Equal(t, "10.0.5.0/32", resp.GetContext().GetIpContext().GetSrcIpAddrs()[0])
require.Equal(t, "10.0.5.0/32", resp.GetContext().GetIpContext().GetSrcIpAddrs()[2])
require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetDstIpAddrs()[0])

require.Equal(t, "10.0.0.0/32", resp.GetContext().GetIpContext().GetSrcRoutes()[0].GetPrefix())
require.Equal(t, "10.0.0.0/24", resp.GetContext().GetIpContext().GetSrcRoutes()[1].GetPrefix())
require.Equal(t, "10.0.0.0/16", resp.GetContext().GetIpContext().GetSrcRoutes()[2].GetPrefix())
require.Equal(t, "10.0.5.0/32", resp.GetContext().GetIpContext().GetDstRoutes()[0].GetPrefix())
require.Equal(t, "10.0.5.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[1].GetPrefix())
require.Equal(t, "10.0.5.0/32", resp.GetContext().GetIpContext().GetDstRoutes()[3].GetPrefix())
require.Equal(t, "10.0.5.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[4].GetPrefix())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (p *IPAM) isExcluded(ipNet string) bool {
}

// Reset resets IPAM's ippol by setting new prefix
func (p *IPAM) Reset(prefix string, excludePrefies ...string) error {
func (p *IPAM) Reset(prefix string, excludePrefixes ...string) error {
p.Lock()
defer p.Unlock()

Expand All @@ -172,7 +172,7 @@ func (p *IPAM) Reset(prefix string, excludePrefies ...string) error {
p.excludedPrefixes[selfAddress.String()] = struct{}{}
p.ipPool.Exclude(selfAddress)

for _, excludePrefix := range excludePrefies {
for _, excludePrefix := range excludePrefixes {
p.ipPool.ExcludeString(excludePrefix)
p.excludedPrefixes[excludePrefix] = struct{}{}
}
Expand Down
15 changes: 0 additions & 15 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,6 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type addressKey struct{}

func loadAddress(ctx context.Context) (string, bool) {
v, ok := metadata.Map(ctx, false).Load(addressKey{})
if ok {
return v.(string), true
}

return "", false
}

func storeAddress(ctx context.Context, address string) {
metadata.Map(ctx, false).Store(addressKey{}, address)
}

type cancelKey struct{}

func storeCancel(ctx context.Context, cancel context.CancelFunc) {
Expand Down
93 changes: 74 additions & 19 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ package vl3

import (
"context"
"net"

"github.com/pkg/errors"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/edwarnicke/genericsync"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/ippool"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type vl3Server struct {
pool *IPAM
pool *IPAM
subnetMap genericsync.Map[string, string] // Map connectionId:subnet
}

// NewServer - returns a new vL3 server instance that manages connection.context.ipcontext for vL3 scenario.
Expand All @@ -54,26 +60,28 @@ func (v *vl3Server) Request(ctx context.Context, request *networkservice.Network
conn.GetContext().IpContext = new(networkservice.IPContext)
}

var ipContext = &networkservice.IPContext{
SrcIpAddrs: request.GetConnection().Context.GetIpContext().GetSrcIpAddrs(),
DstRoutes: request.GetConnection().Context.GetIpContext().GetDstRoutes(),
ExcludedPrefixes: request.GetConnection().Context.GetIpContext().GetExcludedPrefixes(),
}

shouldAllocate := len(ipContext.SrcIpAddrs) == 0

if prevAddress, ok := loadAddress(ctx); ok && !shouldAllocate {
shouldAllocate = !v.pool.isExcluded(prevAddress)
}

if shouldAllocate {
var ipContext = conn.GetContext().GetIpContext()

if prevAddress, ok := v.subnetMap.Load(conn.GetId()); ok {
// Remove previous prefix from IP Context if a current server prefix has changed
if v.pool.globalIPNet().String() != prevAddress {
srcNet, err := v.pool.allocate()
log.FromContext(ctx).Infof("Server Request. Allocated net: %+v for connection: %+v", srcNet.String(), conn.GetId())
if err != nil {
return nil, err
}
removePreviousPrefixFromIPContext(ipContext, prevAddress)
ipContext.SrcIpAddrs = append(ipContext.SrcIpAddrs, srcNet.String())
v.subnetMap.Store(conn.GetId(), v.pool.globalIPNet().String())
}
} else if len(request.GetConnection().GetContext().GetDnsContext().GetConfigs()) == 0 || countTheSameVersionSrcIps(request, len(v.pool.self.IP)) == 0 { // TODO Consider a better option to determine vL3NSE-to-vL3NSE server case
srcNet, err := v.pool.allocate()
log.FromContext(ctx).Infof("Server Request. Allocated initial net: %+v for connection: %+v", srcNet.String(), conn.GetId())
if err != nil {
return nil, err
}
ipContext.DstRoutes = nil
ipContext.SrcIpAddrs = append([]string(nil), srcNet.String())
storeAddress(ctx, srcNet.String())
ipContext.SrcIpAddrs = append(ipContext.SrcIpAddrs, srcNet.String())
v.subnetMap.Store(conn.GetId(), v.pool.globalIPNet().String())
}

addRoute(&ipContext.SrcRoutes, v.pool.selfAddress().String(), v.pool.selfAddress().IP.String())
Expand All @@ -83,8 +91,6 @@ func (v *vl3Server) Request(ctx context.Context, request *networkservice.Network
}
addAddr(&ipContext.DstIpAddrs, v.pool.selfAddress().String())

conn.GetContext().IpContext = ipContext

resp, err := next.Server(ctx).Request(ctx, request)
if err == nil {
addRoute(&resp.GetContext().GetIpContext().SrcRoutes, v.pool.globalIPNet().String(), v.pool.selfAddress().IP.String())
Expand All @@ -96,6 +102,7 @@ func (v *vl3Server) Close(ctx context.Context, conn *networkservice.Connection)
for _, srcAddr := range conn.GetContext().GetIpContext().GetSrcIpAddrs() {
v.pool.freeIfAllocated(srcAddr)
}
v.subnetMap.Delete(conn.GetId())
return next.Server(ctx).Close(ctx, conn)
}

Expand All @@ -119,3 +126,51 @@ func addAddr(addrs *[]string, addr string) {
}
*addrs = append(*addrs, addr)
}

func removePreviousPrefixFromIPContext(ipContext *networkservice.IPContext, prevAddress string) {
prevIPPool := ippool.NewWithNetString(prevAddress)

var srcIPAddrs []string
for _, ip := range ipContext.SrcIpAddrs {
if !prevIPPool.ContainsNetString(ip) {
srcIPAddrs = append(srcIPAddrs, ip)
}
}
ipContext.SrcIpAddrs = srcIPAddrs

var dstIPAddrs []string
for _, ip := range ipContext.DstIpAddrs {
if !prevIPPool.ContainsNetString(ip) {
dstIPAddrs = append(dstIPAddrs, ip)
}
}
ipContext.DstIpAddrs = dstIPAddrs

var srcRoutes []*networkservice.Route
for _, r := range ipContext.SrcRoutes {
if !prevIPPool.ContainsNetString(r.Prefix) {
srcRoutes = append(srcRoutes, r)
}
}
ipContext.SrcRoutes = srcRoutes

var dstRoutes []*networkservice.Route
for _, r := range ipContext.DstRoutes {
if !prevIPPool.ContainsNetString(r.Prefix) {
dstRoutes = append(dstRoutes, r)
}
}
ipContext.DstRoutes = dstRoutes
}

func countTheSameVersionSrcIps(request *networkservice.NetworkServiceRequest, selfIPLen int) int {
srcAddrs := request.GetConnection().GetContext().GetIpContext().GetSrcIpAddrs()
count := 0
for _, ip := range srcAddrs {
_, ipNet, err := net.ParseCIDR(ip)
if err == nil && len(ipNet.IP) == selfIPLen {
count++
}
}
return count
}
Loading

0 comments on commit 608b1b5

Please sign in to comment.