diff --git a/ofnet.go b/ofnet.go index 8a2a7270..aa468a36 100755 --- a/ofnet.go +++ b/ofnet.go @@ -189,9 +189,11 @@ type OfnetEndpoint struct { type OfnetPolicyRule struct { RuleId string // Unique identifier for the rule Priority int // Priority for the rule (1..100. 100 is highest) + SrcVrf string // For policy rules, reqiured to uniquely identify the SrcEndpointGroup SrcEndpointGroup int // Source endpoint group + DstVrf string // For policy rules, required to uniquely identify the DstEndpointGroup DstEndpointGroup int // Destination endpoint group - SrcIpAddr string // source IP addrss and mask + SrcIpAddr string // source IP address and mask DstIpAddr string // Destination IP address and mask IpProtocol uint8 // IP protocol number SrcPort uint16 // Source port diff --git a/ofnetAgent.go b/ofnetAgent.go index d9bcef64..6c3b8f78 100755 --- a/ofnetAgent.go +++ b/ofnetAgent.go @@ -40,6 +40,10 @@ import ( cmap "github.com/streamrail/concurrent-map" ) +// these can be passed to NewOfnetAgent for endpointIPsAreUnique parameter +const OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM = false +const OFNET_AGENT_ENDPOINT_IPS_ARE_UNIQUE_PARAM = true + // OfnetAgent state type OfnetAgent struct { ctrler *ofctrl.Controller // Controller instance @@ -55,6 +59,11 @@ type OfnetAgent struct { datapath OfnetDatapath // Configured datapath protopath OfnetProto // Configured protopath + // True if all requests to create endpoints no matter the VRF will have + // unique IPs, which would allow for inferring the VRF based on IP address + // True also allows endpoints in different VRFs to communicate directly + endpointIpsAreUnique bool + masterDb map[string]*OfnetNode // list of Masters masterDbMutex sync.Mutex // Sync mutex for masterDb @@ -147,8 +156,8 @@ const ( // Create a new Ofnet agent and initialize it func NewOfnetAgent(bridgeName string, dpName string, localIp net.IP, rpcPort uint16, - ovsPort uint16, uplinkInfo []string) (*OfnetAgent, error) { - log.Infof("Creating new ofnet agent for %s,%s,%d,%d,%d\n", bridgeName, dpName, localIp, rpcPort, ovsPort) + ovsPort uint16, uplinkInfo []string, endpointIpsAreUnique bool) (*OfnetAgent, error) { + log.Infof("Creating new ofnet agent for %s,%s,%d,%d,%d,%v\n", bridgeName, dpName, localIp, rpcPort, ovsPort, endpointIpsAreUnique) agent := new(OfnetAgent) // Init params @@ -168,6 +177,8 @@ func NewOfnetAgent(bridgeName string, dpName string, localIp net.IP, rpcPort uin agent.vniVlanMap = make(map[uint32]*uint16) agent.vlanVniMap = make(map[uint16]*uint32) + agent.endpointIpsAreUnique = endpointIpsAreUnique + // Initialize vtep database agent.vtepTable = make(map[string]*uint32) @@ -253,6 +264,10 @@ func (self *OfnetAgent) incrErrStats(errName string) { self.stats[errName+"-ERROR"] = currStats } +func (a *OfnetAgent) IsEndpointIpsAreUnique() bool { + return a.endpointIpsAreUnique +} + // getEndpointId Get a unique identifier for the endpoint. func (self *OfnetAgent) getEndpointId(endpoint EndpointInfo) string { self.vlanVrfMutex.RLock() diff --git a/ofnetMaster.go b/ofnetMaster.go index dee44148..050391cc 100755 --- a/ofnetMaster.go +++ b/ofnetMaster.go @@ -229,7 +229,7 @@ func (self *OfnetMaster) UnRegisterNode(hostInfo *OfnetNode, ret *bool) error { // Add an Endpoint func (self *OfnetMaster) EndpointAdd(ep *OfnetEndpoint, ret *bool) error { - log.Infof("Received Endpoint CReate from Remote netplugin") + log.Infof("Received Endpoint Create from Remote netplugin") // Check if we have the endpoint already and which is more recent self.masterMutex.RLock() oldEp := self.endpointDb[ep.EndpointID] diff --git a/ofnetPolicy.go b/ofnetPolicy.go index dc5a83c0..b38b678a 100755 --- a/ofnetPolicy.go +++ b/ofnetPolicy.go @@ -17,6 +17,7 @@ package ofnet import ( "errors" + "fmt" "net" "net/rpc" "reflect" @@ -79,31 +80,56 @@ func (self *PolicyAgent) SwitchDisconnected(sw *ofctrl.OFSwitch) { } // Metadata Format -// 6 3 3 1 1 0 0 -// 3 1 0 6 5 1 0 -// +-------------+-+---------------+---------------+-+ -// | ....U |U| SrcGrp | DstGrp |V| -// +-------------+-+---------------+---------------+-+ +// Source Tenant + Group +// 0x1fff ffff 8000 0000 Destination Tenant + Group +// | 0x7FFF FFFE +// +--------+----------+ | +// | v +--------+---------+ +// v Source Group v v +// Source Tenant 0x7FFF 8000 0000 Destination Tenant Destination Group +// 0x1FFF 8000 0000 0000 | 0x7FFE 0000 0x0001 FFFE +// | | | | +// +-------+--------++---------+---------++--------+-----++-----------+------+ +// | || || || | +// v vv vv vv v +// 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 000V // -// U: Unused -// SrcGrp: Source endpoint group -// DstGrp: Destination endpoint group // V: Received on VTEP Port. Dont flood back to VTEP ports. -// -// DstGroupMetadata returns metadata for dst group -func DstGroupMetadata(groupId int) (uint64, uint64) { - metadata := uint64(groupId) << 1 - metadataMask := uint64(0xfffe) +// returns openflow metadata and mask values for dst group +func DstGroupMetadata(vrfid uint16, groupId int) (uint64, uint64) { + // vrf: shift 16 for src group, 1 for VTEP flag + // group: shift 1 for the VTEP flag + metadata := (uint64(vrfid) << 17) + (uint64(groupId) << 1) + // vrf: + // 14 bits shifted 1 for vtep flag and 16 for group + // format((((1<<14))-1)<<(1+16), 'x') + // 0x7ffe0000 + // group: + // format((((1<<16)-1)<<1), 'x') + // 0x1fffe + metadataMask := uint64(0x7ffffffe) metadata = metadata & metadataMask return metadata, metadataMask } -// SrcGroupMetadata returns metadata for src group -func SrcGroupMetadata(groupId int) (uint64, uint64) { - metadata := uint64(groupId) << 16 - metadataMask := uint64(0x7fff0000) +// returns openflow metadata and mask for src group +func SrcGroupMetadata(vrfid uint16, groupId int) (uint64, uint64) { + // vrf: + // shift 30 for dest vrf+group, 16 for src group, 1 for VTEP flag = 47 + // group: + // shift 30 for the dest vrf+group, 1 for the VTEP flag + metadata := (uint64(vrfid) << 47) + (uint64(groupId) << (30 + 1)) + // vrf: + // 14 bits shifted by 1: vtep flag + 30: dest vrf+group + 16: src group + // format((((1<<14))-1)<<(1+30+16), 'x') + // 0x1FFF800000000000 + // group: + // 16 bits shifted 30 for dest vrf+group plus 1 for vtep flag + // format((((1<<16))-1)<<(30+1), 'x') + // 0x7fff80000000 + metadataMask := uint64(0x1FFFFFFF80000000) metadata = metadata & metadataMask return metadata, metadataMask @@ -139,8 +165,9 @@ func (self *PolicyAgent) AddEndpoint(endpoint *OfnetEndpoint) error { self.agent.vrfMutex.RLock() vrfid := self.agent.vrfNameIdMap[*vrf] self.agent.vrfMutex.RUnlock() - vrfMetadata, vrfMetadataMask := Vrfmetadata(*vrfid) - // Install the Dst group lookup flow + + vrfMetadata, vrfMetadataMask := VrfDestMetadata(*vrfid) + // match destination tenant and IP dstGrpFlow, err := self.dstGrpTable.NewFlow(ofctrl.FlowMatch{ Priority: FLOW_MATCH_PRIORITY, Ethertype: 0x0800, @@ -153,8 +180,8 @@ func (self *PolicyAgent) AddEndpoint(endpoint *OfnetEndpoint) error { return err } - // Format the metadata - metadata, metadataMask := DstGroupMetadata(endpoint.EndpointGroup) + // Format the metadata for the destination group + metadata, metadataMask := DstGroupMetadata(*vrfid, endpoint.EndpointGroup) // Set dst GroupId err = dstGrpFlow.SetMetadata(metadata, metadataMask) @@ -230,7 +257,7 @@ func (self *PolicyAgent) AddIpv6Endpoint(endpoint *OfnetEndpoint) error { vrfid := self.agent.vrfNameIdMap[*vrf] self.agent.vrfMutex.RUnlock() - vrfMetadata, vrfMetadataMask := Vrfmetadata(*vrfid) + vrfMetadata, vrfMetadataMask := VrfDestMetadata(*vrfid) // Install the Dst group lookup flow dstGrpFlow, err := self.dstGrpTable.NewFlow(ofctrl.FlowMatch{ Priority: FLOW_MATCH_PRIORITY, @@ -245,7 +272,7 @@ func (self *PolicyAgent) AddIpv6Endpoint(endpoint *OfnetEndpoint) error { } // Format the metadata - metadata, metadataMask := DstGroupMetadata(endpoint.EndpointGroup) + metadata, metadataMask := DstGroupMetadata(*vrfid, endpoint.EndpointGroup) // Set dst GroupId err = dstGrpFlow.SetMetadata(metadata, metadataMask) @@ -299,8 +326,10 @@ func (self *PolicyAgent) AddRule(rule *OfnetPolicyRule, ret *bool) error { var ipDaMask *net.IP = nil var ipSa *net.IP = nil var ipSaMask *net.IP = nil - var md *uint64 = nil - var mdm *uint64 = nil + var metadata uint64 = 0 // for calculations of md + var metadataMask uint64 = 0 // for calculations of mdm + var md *uint64 = nil // flow metadata + var mdm *uint64 = nil // flow metadata mask var flag, flagMask uint16 var flagPtr, flagMaskPtr *uint16 var err error @@ -346,24 +375,52 @@ func (self *PolicyAgent) AddRule(rule *OfnetPolicyRule, ret *bool) error { } } - // parse source/dst endpoint groups - if rule.SrcEndpointGroup != 0 && rule.DstEndpointGroup != 0 { - srcMetadata, srcMetadataMask := SrcGroupMetadata(rule.SrcEndpointGroup) - dstMetadata, dstMetadataMask := DstGroupMetadata(rule.DstEndpointGroup) - metadata := srcMetadata | dstMetadata - metadataMask := srcMetadataMask | dstMetadataMask - md = &metadata - mdm = &metadataMask - } else if rule.SrcEndpointGroup != 0 { - srcMetadata, srcMetadataMask := SrcGroupMetadata(rule.SrcEndpointGroup) - md = &srcMetadata - mdm = &srcMetadataMask - } else if rule.DstEndpointGroup != 0 { - dstMetadata, dstMetadataMask := DstGroupMetadata(rule.DstEndpointGroup) - md = &dstMetadata - mdm = &dstMetadataMask + updateMetadata := func(meta uint64, mask uint64) (*uint64, *uint64) { + metadata |= meta + metadataMask |= mask + return &metadata, &metadataMask } + // parse source/dst endpoint tenants and groups + var srcVrfId *uint16 + var dstVrfId *uint16 + if rule.SrcVrf != "" { + srcVrfId = self.agent.getvrfId(rule.SrcVrf) + if srcVrfId == nil { + errMsg := fmt.Sprintf("VRF %s was not found", rule.SrcVrf) + log.Errorf(errMsg) + return errors.New(errMsg) + } + md, mdm = updateMetadata(VrfSrcMetadata(*srcVrfId)) + } + if rule.SrcEndpointGroup != 0 { + if rule.SrcVrf == "" { + errMsg := fmt.Sprintf("Source group %v was provided without VRF", + rule.SrcEndpointGroup) + log.Errorf(errMsg) + return errors.New(errMsg) + } + md, mdm = updateMetadata(SrcGroupMetadata(*srcVrfId, rule.SrcEndpointGroup)) + } + if rule.DstVrf != "" { + dstVrfId = self.agent.getvrfId(rule.DstVrf) + if dstVrfId == nil { + errMsg := fmt.Sprintf("VRF %s was not found", rule.DstVrf) + log.Errorf(errMsg) + return errors.New(errMsg) + } + md, mdm = updateMetadata(VrfDestMetadata(*dstVrfId)) + } + if rule.DstEndpointGroup != 0 { + if rule.DstVrf == "" { + errMsg := fmt.Sprintf("Destination group %v was provided without VRF", + rule.DstEndpointGroup) + log.Errorf(errMsg) + return errors.New(errMsg) + } + + md, mdm = updateMetadata(DstGroupMetadata(*dstVrfId, rule.DstEndpointGroup)) + } // Setup TCP flags if rule.IpProtocol == 6 && rule.TcpFlags != "" { switch rule.TcpFlags { diff --git a/ofnetPolicy_test.go b/ofnetPolicy_test.go index 3a2e5561..5dc832d4 100755 --- a/ofnetPolicy_test.go +++ b/ofnetPolicy_test.go @@ -29,7 +29,8 @@ func TestPolicyAddDelete(t *testing.T) { rpcPort := uint16(9600) ovsPort := uint16(9601) lclIP := net.ParseIP("10.10.10.10") - ofnetAgent, err := NewOfnetAgent("", "vrouter", lclIP, rpcPort, ovsPort, nil) + ofnetAgent, err := NewOfnetAgent("", "vrouter", lclIP, rpcPort, ovsPort, nil, + OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) if err != nil { t.Fatalf("Error creating ofnet agent. Err: %v", err) } @@ -69,6 +70,9 @@ func TestPolicyAddDelete(t *testing.T) { // Create a vlan for the endpoint ofnetAgent.AddNetwork(1, 1, "", "default") + ofnetAgent.AddNetwork(2, 2, "", "second") + ofnetAgent.AddNetwork(4, 4, "", "third") + ofnetAgent.AddNetwork(8, 8, "", "fourth") macAddr, _ := net.ParseMAC("00:01:02:03:04:05") endpoint := EndpointInfo{ @@ -95,6 +99,8 @@ func TestPolicyAddDelete(t *testing.T) { DstEndpointGroup: 200, SrcIpAddr: "10.10.10.10/24", DstIpAddr: "10.1.1.1/24", + SrcVrf: "default", + DstVrf: "second", IpProtocol: 6, DstPort: 100, SrcPort: 200, @@ -118,6 +124,8 @@ func TestPolicyAddDelete(t *testing.T) { SrcIpAddr: "20.20.20.20/24", DstIpAddr: "20.2.2.2/24", IpProtocol: 17, + SrcVrf: "third", + DstVrf: "fourth", DstPort: 300, SrcPort: 400, Action: "deny", @@ -132,45 +140,122 @@ func TestPolicyAddDelete(t *testing.T) { return } + // vrf second is allowed to talk to group in vrf third + vrfIngressRule := &OfnetPolicyRule{ + RuleId: "vrfIngressRule", + Priority: 50, + DstEndpointGroup: 400, + IpProtocol: 6, + SrcVrf: "second", + DstVrf: "third", + Action: "allow", + } + log.Infof("Adding vrf ingress rule: %+v", udpRule) + err = ofnetMaster.AddRule(vrfIngressRule) + if err != nil { + t.Errorf("Error installing vrf ingress rule {%+v}. Err: %v", vrfIngressRule, err) + return + } + // Get all the flows flowList, err := ofctlFlowDump(brName) if err != nil { t.Errorf("Error getting flow entries. Err: %v", err) return } + log.Infof("Flow dump:") + log.Infof("==========") + for _, f := range flowList { + log.Infof("%+v", f) + } + // verify src group flow - srcGrpFlowMatch := fmt.Sprintf("priority=10,in_port=12 actions=write_metadata:0x100640000/0xff7fff0000") + // vrf+group for src and dest: + // format((1<<(1+30+16)) + (100<<(1+30)) + (1<<(1+16)) + (100<<1), 'x') + // source: + // vrf mask: (((1<<14))-1)<<(1+30+16) = 2305702271725338624 + // group mask: (((1<<16))-1)<<(30+1) = 140735340871680 + // destination: + // vrf mask: (((1<<14))-1)<<(1+16) = 2147352576 + // group mask: (((1<<16))-1)<<1 = 131070 + // mask: format(2305702271725338624 + 140735340871680 + 2147352576 + 131070, 'x') + srcGrpFlowMatch := fmt.Sprintf("priority=10,in_port=12 actions=write_metadata:0x8032000200c8/0x1ffffffffffffffe") if !ofctlFlowMatch(flowList, VLAN_TBL_ID, srcGrpFlowMatch) { - fmt.Printf("Flows:\n%+v", flowList) t.Fatalf("Could not find the flow %s on ovs %s", srcGrpFlowMatch, brName) } - log.Infof("Found src group %s on ovs %s", srcGrpFlowMatch, brName) - // verify dst group flow - dstGrpFlowMatch := fmt.Sprintf("priority=100,ip,metadata=0x100000000/0xff00000000,nw_dst=10.2.2.2 actions=write_metadata:0xc8/0xfffe") + // verify metadata assignment for destination group flow + // source + // destination: + // vrf+group: format((1<<(1+16))+(100<<1), 'x') + // vrf mask: (((1<<14))-1)<<(1+16) = 2147352576 + // group mask: (((1<<16))-1)<<1 = 131070 + // mask: format(2147352576 + 131070, 'x') + dstGroupMetadatAndMask := "0x200c8/0x7ffffffe" + matchVrf := "" + if !ofnetAgent.IsEndpointIpsAreUnique() { + // dest vrf: format(1<<(1+16), 'x') + // dest vrf mask: format((((1<<14))-1)<<(1+16), 'x') + matchVrf = ",metadata=0x20000/0x7ffe0000" + } + dstGrpFlowMatch := fmt.Sprintf("priority=100,ip%s,nw_dst=10.2.2.2 actions=write_metadata:%s", + matchVrf, dstGroupMetadatAndMask) if !ofctlFlowMatch(flowList, DST_GRP_TBL_ID, dstGrpFlowMatch) { - t.Fatalf("Could not find the flow %s on ovs %s", dstGrpFlowMatch, brName) + t.Fatalf("Could not find the dest group assignment flow %s on ovs %s", + dstGrpFlowMatch, brName) } - log.Infof("Found dst group %s on ovs %s", dstGrpFlowMatch, brName) - // verify tcp rule flow entry exists - tcpFlowMatch := fmt.Sprintf("priority=110,tcp,metadata=0x640190/0x7ffffffe,nw_src=10.10.10.0/24,nw_dst=10.1.1.0/24,tp_src=200,tp_dst=100") + // source vrf mask: (((1<<14))-1)<<(1+30+16) = 2305702271725338624 + // source group mask: ( (1<<16) -1 )<<(30+1) = 140735340871680 + // dest vrf mask: (((1<<14))-1)<<(1+16) = 2147352576 + // dest group mask: ( (1<<16) -1 )<<1 = 131070 + // mask: format(2305702271725338624 + 140735340871680 + 2147352576 + 131070, 'x') + metadataMask := "0x1ffffffffffffffe" + + // verify tcp policy rule flow entry exists + // vrf 1 group 100 source + vrf 2 group 200 dest: + // format( (1<<(1+30+16)) + (100<<(30+1)) + (2<<(1+16)) + (200<<1) , 'x') + tcpFlowMatch := fmt.Sprintf("priority=110,tcp,metadata=0x803200040190/%s,nw_src=10.10.10.0/24,nw_dst=10.1.1.0/24,tp_src=200,tp_dst=100", metadataMask) if !ofctlFlowMatch(flowList, POLICY_TBL_ID, tcpFlowMatch) { t.Fatalf("Could not find the flow %s on ovs %s", tcpFlowMatch, brName) } - log.Infof("Found tcp rule %s on ovs %s", tcpFlowMatch, brName) - // verify udp rule flow - udpFlowMatch := fmt.Sprintf("priority=110,udp,metadata=0x12c0320/0x7ffffffe,nw_src=20.20.20.0/24,nw_dst=20.2.2.0/24,tp_src=400,tp_dst=300") + // verify udp policy rule flow + // vrf 3 group 300 source + vrf 4 group 400 dest: + // format( (3<<(1+30+16)) + (300<<(30+1)) + (4<<(1+16)) + (400<<1) , 'x') + udpFlowMatch := fmt.Sprintf("priority=110,udp,metadata=0x1809600080320/%s,nw_src=20.20.20.0/24,nw_dst=20.2.2.0/24,tp_src=400,tp_dst=300", metadataMask) if !ofctlFlowMatch(flowList, POLICY_TBL_ID, udpFlowMatch) { t.Fatalf("Could not find the flow %s on ovs %s", udpFlowMatch, brName) } - log.Infof("Found udp rule %s on ovs %s", udpFlowMatch, brName) + // source vrf mask: (((1<<14))-1)<<(1+30+16) = 2305702271725338624 + // dest vrf mask: (((1<<14))-1)<<(1+16) = 2147352576 + // dest group mask: ( (1<<16) -1 )<<1 = 131070 + // mask: format(2305702271725338624 + 2147352576 + 131070, 'x') + fromVrfMetadataMask := "0x1fff80007ffffffe" + + // verify vrf ingress policy rule flow + // vrf 2 source + vrf 3 group 400 dest: + // format( (2<<(1+30+16)) + (3<<(1+16)) + (400<<1) , 'x') + vrfIngressFlowMatch := fmt.Sprintf("priority=60,tcp,metadata=0x1000000060320/%s", fromVrfMetadataMask) + if !ofctlFlowMatch(flowList, POLICY_TBL_ID, vrfIngressFlowMatch) { + t.Fatalf("Could not find the flow %s on ovs %s", vrfIngressFlowMatch, brName) + } + log.Infof("Found udp rule %s on ovs %s", vrfIngressFlowMatch, brName) + + // verify output flow + // vrf+group: format((1<<(1+16)), 'x') + // vrf mask: format((((1<<14))-1)<<(1+16), 'x') + outputFlowMatch := fmt.Sprintf("priority=100,ip,metadata=0x20000/0x7ffe0000,nw_dst=10.2.2.2") + if !ofctlFlowMatch(flowList, IP_TBL_ID, outputFlowMatch) { + t.Fatalf("Could not find the flow %s on ovs %s", outputFlowMatch, brName) + } + log.Infof("Found src group %s on ovs %s", outputFlowMatch, brName) + // Delete policies err = ofnetMaster.DelRule(tcpRule) if err != nil { @@ -180,6 +265,10 @@ func TestPolicyAddDelete(t *testing.T) { if err != nil { t.Fatalf("Error deleting udpRule {%+v}. Err: %v", udpRule, err) } + err = ofnetMaster.DelRule(vrfIngressRule) + if err != nil { + t.Fatalf("Error deleting VRF ingress rule {%+v}. Err: %v", udpRule, err) + } err = ofnetAgent.RemoveLocalEndpoint(endpoint.PortNo) if err != nil { t.Fatalf("Error deleting endpoint: %+v. Err: %v", endpoint, err) @@ -206,6 +295,9 @@ func TestPolicyAddDelete(t *testing.T) { if ofctlFlowMatch(flowList, POLICY_TBL_ID, udpFlowMatch) { t.Fatalf("Still found the flow %s on ovs %s", udpFlowMatch, brName) } + if ofctlFlowMatch(flowList, POLICY_TBL_ID, vrfIngressFlowMatch) { + t.Fatalf("Still found the flow %s on ovs %s", vrfIngressFlowMatch, brName) + } - log.Infof("Verified all flows are deleted") + log.Infof("Verified all flows are deleted for TestPolicyAddDelete") } diff --git a/ofnetSvcProxy_test.go b/ofnetSvcProxy_test.go index ed87ed8b..882ed5a8 100755 --- a/ofnetSvcProxy_test.go +++ b/ofnetSvcProxy_test.go @@ -277,7 +277,8 @@ func TestSvcProxyInterface(t *testing.T) { rpcPort := uint16(9600) ovsPort := uint16(9601) lclIP := net.ParseIP("10.10.10.10") - ofnetAgent, err := NewOfnetAgent("", "vrouter", lclIP, rpcPort, ovsPort, nil) + ofnetAgent, err := NewOfnetAgent("", "vrouter", lclIP, rpcPort, ovsPort, + nil, OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) if err != nil { t.Fatalf("Error creating ofnet agent. Err: %v", err) } diff --git a/ofnet_flow_test.go b/ofnet_flow_test.go index 44572311..0e054664 100644 --- a/ofnet_flow_test.go +++ b/ofnet_flow_test.go @@ -5,6 +5,8 @@ import ( "net" "strings" "testing" + + log "github.com/Sirupsen/logrus" ) // test portVlan and DSCP flows on all four forwarding modes @@ -22,12 +24,13 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { ipAddr := net.ParseIP(fmt.Sprintf("11.11.2.2")) ipv6Addr := net.ParseIP(fmt.Sprintf("2017::2:2")) endpoint := EndpointInfo{ - PortNo: 14, - MacAddr: macAddr, - Vlan: 1, - IpAddr: ipAddr, - Ipv6Addr: ipv6Addr, - Dscp: 10, + EndpointGroup: 1, + PortNo: 14, + MacAddr: macAddr, + Vlan: 1, + IpAddr: ipAddr, + Ipv6Addr: ipv6Addr, + Dscp: 10, } // Install the local endpoint @@ -42,34 +45,44 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { if err != nil { t.Errorf("Error getting flow entries. Err: %v", err) } + log.Infof("Flow dump:") + log.Infof("==========") + for _, f := range flowList { + log.Infof("%+v", f) + } // verify port flow - portVlanFlowMatch := fmt.Sprintf("priority=10,in_port=14 actions=write_metadata:0x100000000/0xff00000000") + meta, mask := SrcGroupMetadata(1, 1) + dstMeta, dstMask := DstGroupMetadata(1, 1) + portVlanFlowMatch := fmt.Sprintf("priority=10,in_port=14 actions=write_metadata:0x%x/0x%x", + meta|dstMeta, mask|dstMask) if agent.dpName == "vxlan" { - portVlanFlowMatch = fmt.Sprintf("priority=10,in_port=14 actions=push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") + portVlanFlowMatch = fmt.Sprintf("priority=10,in_port=14 actions=push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x%x/0x%x", + meta|dstMeta, mask|dstMask) } if !ofctlFlowMatch(flowList, VLAN_TBL_ID, portVlanFlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", portVlanFlowMatch, brName) } // verify dscp v4 flow - dscpv4FlowMatch := fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:10->ip_dscp,write_metadata:0x100000000/0xff00000000") + dscpv4FlowMatch := fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:10->ip_dscp,write_metadata:0x%x/0x%x", + meta|dstMeta, mask|dstMask) if agent.dpName == "vxlan" { - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:10->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") + dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:10->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x%x/0x%x", + meta|dstMeta, mask|dstMask) } if !ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv4FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", dscpv4FlowMatch, brName) } // verify dscp v6 flow - dscpv6FlowMatch := fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:10->ip_dscp,write_metadata:0x100000000/0xff00000000") + dscpv6FlowMatch := fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:10->ip_dscp,write_metadata:0x%x/0x%x", + meta|dstMeta, mask|dstMask) if agent.dpName == "vxlan" { - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:10->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") + dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:10->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x%x/0x%x", + meta|dstMeta, mask|dstMask) } if !ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv6FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", dscpv6FlowMatch, brName) } @@ -93,22 +106,16 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { } // verify dscp v4 flow - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:20->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:20->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } + dscpv4FlowMatch = strings.Replace(dscpv4FlowMatch, "10->ip_dscp", "20->ip_dscp", 1) if !ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv4FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", dscpv4FlowMatch, brName) } // verify dscp v6 flow - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:20->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:20->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } + dscpv6FlowMatch = strings.Replace(dscpv6FlowMatch, "10->ip_dscp", "20->ip_dscp", 1) if !ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv6FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", dscpv6FlowMatch, brName) } @@ -118,7 +125,7 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { Dscp: 0, } - // Install the local endpoint + // Update the local endpoint err = agent.UpdateLocalEndpoint(endpointInfo) if err != nil { t.Fatalf("Error updating endpoint: %+v. Err: %v", endpointInfo, err) @@ -128,36 +135,24 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { // get the flow entries flowList, err = ofctlFlowDump(brName) if err != nil { - t.Errorf("Error getting flow entries. Err: %v", err) + t.Fatalf("Error getting flow entries. Err: %v", err) } // verify port flow still exists - portVlanFlowMatch = fmt.Sprintf("priority=10,in_port=14 actions=write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - portVlanFlowMatch = fmt.Sprintf("priority=10,in_port=14 actions=push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } if !ofctlFlowMatch(flowList, VLAN_TBL_ID, portVlanFlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", portVlanFlowMatch, brName) } // verify dscp v4 flow is removed - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:20->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:20->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } if ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv4FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Flow %s is still present on ovs %s", dscpv4FlowMatch, brName) } // verify dscp v6 flow is removed - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:20->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:20->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } if ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv6FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Flow %s is still present on ovs %s", dscpv6FlowMatch, brName) } @@ -167,7 +162,7 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { Dscp: 30, } - // Install the local endpoint + // Update the local endpoint err = agent.UpdateLocalEndpoint(endpointInfo) if err != nil { t.Fatalf("Error updating endpoint: %+v. Err: %v", endpointInfo, err) @@ -177,26 +172,26 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { // get the flow entries flowList, err = ofctlFlowDump(brName) if err != nil { - t.Errorf("Error getting flow entries. Err: %v", err) + t.Fatalf("Error getting flow entries. Err: %v", err) } - // verify dscp v4 flow - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:30->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:30->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") + // verify port flow still exists + if !ofctlFlowMatch(flowList, VLAN_TBL_ID, portVlanFlowMatch) { + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) + t.Fatalf("Could not find the flow %s on ovs %s", portVlanFlowMatch, brName) } + + // verify dscp v4 flow + dscpv4FlowMatch = strings.Replace(dscpv4FlowMatch, "20->ip_dscp", "30->ip_dscp", 1) if !ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv4FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", dscpv4FlowMatch, brName) } // verify dscp v6 flow - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:30->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:30->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } + dscpv6FlowMatch = strings.Replace(dscpv6FlowMatch, "20->ip_dscp", "30->ip_dscp", 1) if !ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv6FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Could not find the flow %s on ovs %s", dscpv6FlowMatch, brName) } @@ -210,36 +205,24 @@ func testOfnetPortDscpFlow(t *testing.T, agent *OfnetAgent, brName string) { // get the flow entries flowList, err = ofctlFlowDump(brName) if err != nil { - t.Errorf("Error getting flow entries. Err: %v", err) + t.Fatalf("Error getting flow entries. Err: %v", err) } // verify port flow is removed - portVlanFlowMatch = fmt.Sprintf("priority=10,in_port=14 actions=write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - portVlanFlowMatch = fmt.Sprintf("priority=10,in_port=14 actions=push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } if ofctlFlowMatch(flowList, VLAN_TBL_ID, portVlanFlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Flow %s is still present on ovs %s", portVlanFlowMatch, brName) } // verify dscp v4 flow is removed - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:30->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv4FlowMatch = fmt.Sprintf("priority=100,ip,in_port=14 actions=set_field:30->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } if ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv4FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Flow %s is still present on ovs %s", dscpv4FlowMatch, brName) } // verify dscp v6 flow is removed - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:30->ip_dscp,write_metadata:0x100000000/0xff00000000") - if agent.dpName == "vxlan" { - dscpv6FlowMatch = fmt.Sprintf("priority=100,ipv6,in_port=14 actions=set_field:30->ip_dscp,push_vlan:0x8100,set_field:4097->vlan_vid,write_metadata:0x100000000/0xff00000000") - } if ofctlFlowMatch(flowList, VLAN_TBL_ID, dscpv6FlowMatch) { - fmt.Printf("Flows:\n%v", strings.Join(flowList, "\n")) + fmt.Printf("Flows:\n%v\n", strings.Join(flowList, "\n")) t.Fatalf("Flow %s is still present on ovs %s", dscpv6FlowMatch, brName) } } diff --git a/ofnet_route_test.go b/ofnet_route_test.go index cd418aa8..1b5b4467 100755 --- a/ofnet_route_test.go +++ b/ofnet_route_test.go @@ -42,12 +42,13 @@ func TestOfnetVrouteAddDelete(t *testing.T) { } hostPvtIP := net.ParseIP(fmt.Sprintf("172.20.20.%d", uint32(NUM_AGENT+2))) endpoint := EndpointInfo{ - PortNo: uint32(NUM_AGENT + 2), - MacAddr: macAddr, - Vlan: 1, - IpAddr: ipAddr, - Ipv6Addr: ipv6Addr, - HostPvtIP: hostPvtIP, + EndpointGroup: 1, + PortNo: uint32(NUM_AGENT + 2), + MacAddr: macAddr, + Vlan: 1, + IpAddr: ipAddr, + Ipv6Addr: ipv6Addr, + HostPvtIP: hostPvtIP, } log.Infof("Installing local vrouter endpoint: %+v", endpoint) @@ -70,12 +71,16 @@ func TestOfnetVrouteAddDelete(t *testing.T) { if err != nil { t.Errorf("Error getting flow entries. Err: %v", err) } + log.Infof("Flow dump:") + log.Infof("==========") + for _, f := range flowList { + log.Infof("%+v", f) + } - log.Infof("Flowlist: %v", flowList) // verify ingress host NAT flows hpInMatch := fmt.Sprintf("priority=99,in_port=%d actions=goto_table:%d", testHostPort+i, HOST_DNAT_TBL_ID) verifyHostNAT(t, flowList, 0, hpInMatch, true) - hpDnatMatch := fmt.Sprintf("priority=100,ip,in_port=%d,nw_dst=172.20.20.%d actions=set_field:02:02:02:%02x:%02x:%02x->eth_dst,set_field:10.10.%d.%d->ip_dst,write_metadata:0x100000000/0xff00000000,goto_table:%d", testHostPort+i, NUM_AGENT+2, i+1, i+1, i+1, i+1, i+1, SRV_PROXY_SNAT_TBL_ID) + hpDnatMatch := fmt.Sprintf("priority=100,ip,in_port=%d,nw_dst=172.20.20.%d actions=set_field:02:02:02:%02x:%02x:%02x->eth_dst,set_field:10.10.%d.%d->ip_dst,write_metadata:0x20000/0x7ffe0000,goto_table:%d", testHostPort+i, NUM_AGENT+2, i+1, i+1, i+1, i+1, i+1, SRV_PROXY_SNAT_TBL_ID) verifyHostNAT(t, flowList, HOST_DNAT_TBL_ID, hpDnatMatch, true) // verify egress host NAT flows ipMiss := fmt.Sprintf("priority=1 actions=goto_table:%d", HOST_SNAT_TBL_ID) @@ -87,24 +92,29 @@ func TestOfnetVrouteAddDelete(t *testing.T) { verifyHostNAT(t, flowList, HOST_SNAT_TBL_ID, denyFlow, true) // verify flow entry exists + missingFlow := false for j := 0; j < NUM_AGENT; j++ { k := j + 1 - ipFlowMatch := fmt.Sprintf("priority=100,ip,metadata=0x100000000/0xff00000000,nw_dst=10.10.%d.%d", k, k) + ipFlowMatch := fmt.Sprintf("priority=100,ip,metadata=0x20000/0x7ffe0000,nw_dst=10.10.%d.%d", k, k) ipTableId := IP_TBL_ID - if !ofctlFlowMatch(flowList, ipTableId, ipFlowMatch) { - t.Errorf("Could not find the route %s on ovs %s", ipFlowMatch, brName) + if ofctlFlowMatch(flowList, ipTableId, ipFlowMatch) { + log.Infof("Found ip table flow %s on ovs %s", ipFlowMatch, brName) + } else { + t.Errorf("Could not find the ip table route %s on ovs %s", ipFlowMatch, brName) + missingFlow = true } - log.Infof("Found ipflow %s on ovs %s", ipFlowMatch, brName) if k%2 == 0 { - ipv6FlowMatch := fmt.Sprintf("priority=100,ipv6,metadata=0x100000000/0xff00000000,ipv6_dst=2016::%d:%d", k, k) + ipv6FlowMatch := fmt.Sprintf("priority=100,ipv6,metadata=0x20000/0x7ffe0000,ipv6_dst=2016::%d:%d", k, k) if !ofctlFlowMatch(flowList, ipTableId, ipv6FlowMatch) { - t.Errorf("Could not find IPv6 route %s on ovs %s", ipv6FlowMatch, brName) - return + t.Errorf("Could not find iptable IPv6 route %s on ovs %s", ipv6FlowMatch, brName) + missingFlow = true } - log.Infof("Found IPv6 ipflow %s on ovs %s", ipv6FlowMatch, brName) + log.Infof("Found iptable IPv6 ipflow %s on ovs %s", ipv6FlowMatch, brName) } - + } + if missingFlow { + return } } @@ -150,7 +160,7 @@ func TestOfnetVrouteAddDelete(t *testing.T) { // verify ingress host NAT flows hpInMatch := fmt.Sprintf("priority=99,in_port=%d actions=goto_table:%d", testHostPort+i, HOST_DNAT_TBL_ID) verifyHostNAT(t, flowList, 0, hpInMatch, false) - hpDnatMatch := fmt.Sprintf("priority=100,ip,in_port=%d,nw_dst=172.20.20.%d actions=set_field:02:02:02:%02x:%02x:%02x->eth_dst,set_field:10.10.%d.%d->ip_dst,write_metadata:0x100000000/0xff00000000,goto_table:%d", testHostPort+i, NUM_AGENT+2, i+1, i+1, i+1, i+1, i+1, SRV_PROXY_SNAT_TBL_ID) + hpDnatMatch := fmt.Sprintf("priority=100,ip,in_port=%d,nw_dst=172.20.20.%d actions=set_field:02:02:02:%02x:%02x:%02x->eth_dst,set_field:10.10.%d.%d->ip_dst,write_metadata:0x800000000000/0x1fff800000000000,goto_table:%d", testHostPort+i, NUM_AGENT+2, i+1, i+1, i+1, i+1, i+1, SRV_PROXY_SNAT_TBL_ID) verifyHostNAT(t, flowList, HOST_DNAT_TBL_ID, hpDnatMatch, false) hostSnat := fmt.Sprintf("priority=100,ip,in_port=%d actions=set_field:00:11:22:33:44:%02x->eth_dst,set_field:172.20.20.%d->ip_src,output:%d", NUM_AGENT+2, i, NUM_AGENT+2, testHostPort+i) verifyHostNAT(t, flowList, HOST_SNAT_TBL_ID, hostSnat, false) @@ -160,13 +170,13 @@ func TestOfnetVrouteAddDelete(t *testing.T) { // verify flow entry exists for j := 0; j < NUM_AGENT; j++ { k := j + 1 - ipFlowMatch := fmt.Sprintf("priority=100,ip,metadata=0x100000000/0xff00000000,nw_dst=10.10.%d.%d", k, k) + ipFlowMatch := fmt.Sprintf("priority=100,ip,metadata=0x800000000000/0x1fff800000000000,nw_dst=10.10.%d.%d", k, k) ipTableId := IP_TBL_ID if ofctlFlowMatch(flowList, ipTableId, ipFlowMatch) { t.Errorf("Still found the flow %s on ovs %s", ipFlowMatch, brName) } if k%2 == 0 { - ipv6FlowMatch := fmt.Sprintf("priority=100,ipv6,metadata=0x100000000/0xff00000000,ipv6_dst=2016::%d:%d", k, k) + ipv6FlowMatch := fmt.Sprintf("priority=100,ipv6,metadata=0x800000000000/0x1fff800000000000,ipv6_dst=2016::%d:%d", k, k) if ofctlFlowMatch(flowList, ipTableId, ipv6FlowMatch) { t.Errorf("Still found the flow %s on ovs %s", ipv6FlowMatch, brName) } @@ -174,7 +184,7 @@ func TestOfnetVrouteAddDelete(t *testing.T) { } } - log.Infof("Verified all flows are deleted") + log.Infof("Verified all flows are deleted for TestOfnetVrouteAddDelete") } } diff --git a/ofnet_test.go b/ofnet_test.go index 4d0c9682..93963a9f 100755 --- a/ofnet_test.go +++ b/ofnet_test.go @@ -108,7 +108,8 @@ func TestMain(m *testing.M) { rpcPort := uint16(VRTR_RPC_PORT + i) ovsPort := uint16(VRTR_OVS_PORT + i) lclIp := net.ParseIP(localIpList[i]) - vrtrAgents[i], err = NewOfnetAgent(brName, "vrouter", lclIp, rpcPort, ovsPort, nil) + vrtrAgents[i], err = NewOfnetAgent(brName, "vrouter", lclIp, rpcPort, + ovsPort, nil, OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) if err != nil { log.Fatalf("Error creating ofnet agent. Err: %v", err) } @@ -125,7 +126,8 @@ func TestMain(m *testing.M) { ovsPort := uint16(VXLAN_OVS_PORT + i) lclIp := net.ParseIP(localIpList[i]) - vxlanAgents[i], err = NewOfnetAgent(brName, "vxlan", lclIp, rpcPort, ovsPort, nil) + vxlanAgents[i], err = NewOfnetAgent(brName, "vxlan", lclIp, rpcPort, + ovsPort, nil, OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) if err != nil { log.Fatalf("Error creating ofnet agent. Err: %v", err) } @@ -142,7 +144,8 @@ func TestMain(m *testing.M) { ovsPort := uint16(VLAN_OVS_PORT + i) lclIp := net.ParseIP(localIpList[i]) - vlanAgents[i], err = NewOfnetAgent(brName, "vlan", lclIp, rpcPort, ovsPort, nil) + vlanAgents[i], err = NewOfnetAgent(brName, "vlan", lclIp, rpcPort, + ovsPort, nil, OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) if err != nil { log.Fatalf("Error creating ofnet agent. Err: %v", err) } @@ -161,7 +164,7 @@ func TestMain(m *testing.M) { portName := "inb0" + fmt.Sprintf("%d", i) driver := ovsdbDriver.NewOvsDriver(brName) driver.CreatePort(portName, "internal", uint(1+i)) - vlrtrAgents[i], err = NewOfnetAgent(brName, "vlrouter", lclIp, rpcPort, ovsPort, []string{portName}) + vlrtrAgents[i], err = NewOfnetAgent(brName, "vlrouter", lclIp, rpcPort, ovsPort, []string{portName}, OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) if err != nil { log.Fatalf("Error creating ofnet agent. Err: %v", err) } diff --git a/ovsSwitch/ovsSwitch.go b/ovsSwitch/ovsSwitch.go index 56798ead..11902e68 100644 --- a/ovsSwitch/ovsSwitch.go +++ b/ovsSwitch/ovsSwitch.go @@ -51,7 +51,10 @@ func NewOvsSwitch(bridgeName, netType, localIP string) (*OvsSwitch, error) { sw.ovsdbDriver = ovsdbDriver.NewOvsDriver(bridgeName) // Create an ofnet agent - sw.ofnetAgent, err = ofnet.NewOfnetAgent(netType, net.ParseIP(localIP), ofnet.OFNET_AGENT_PORT, OVS_CTRLER_PORT) + sw.ofnetAgent, err = ofnet.NewOfnetAgent(netType, net.ParseIP(localIP), + ofnet.OFNET_AGENT_PORT, OVS_CTRLER_PORT, + OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM) + if err != nil { log.Fatalf("Error initializing ofnet") return nil, err diff --git a/util.go b/util.go index 76f57efd..49b53972 100755 --- a/util.go +++ b/util.go @@ -167,7 +167,8 @@ func buildUDPRespPkt(inEth *protocol.Ethernet, uData []byte) (*protocol.Ethernet return outEth, nil } -// createPortVlanFlow creates port vlan flow based on endpoint metadata +// createPortVlanFlow creates port vlan flow (traffic coming out of a pod) +// based on endpoint metadata func createPortVlanFlow(agent *OfnetAgent, vlanTable, nextTable *ofctrl.Table, endpoint *OfnetEndpoint) (*ofctrl.Flow, error) { // Install a flow entry for vlan mapping portVlanFlow, err := vlanTable.NewFlow(ofctrl.FlowMatch{ @@ -179,16 +180,24 @@ func createPortVlanFlow(agent *OfnetAgent, vlanTable, nextTable *ofctrl.Table, e return nil, err } - //set vrf id as METADATA + // set vrf id as METADATA for both source and destination + // this enables traffic to reach same VRF when there are overlapping + // IPs across VRFs and apply policy against the source VRF + // If IPs are unique and traffic is not isolated to single VRF (kubernetes) + // thn the table to set destination group will not match source VRF, + // just IP and rewrite the destination VRF vrfid := agent.getvrfId(endpoint.Vrf) - metadata, metadataMask := Vrfmetadata(*vrfid) + metadata, metadataMask := VrfSrcMetadata(*vrfid) + destMetadata, destMetadataMask := VrfDestMetadata(*vrfid) + metadata = metadata | destMetadata + metadataMask = metadataMask | destMetadataMask // set source EPG id if required if endpoint.EndpointGroup != 0 { - srcMetadata, srcMetadataMask := SrcGroupMetadata(endpoint.EndpointGroup) - metadata = metadata | srcMetadata - metadataMask = metadataMask | srcMetadataMask - + srcMetadata, srcMetadataMask := SrcGroupMetadata(*vrfid, endpoint.EndpointGroup) + dstMetadata, dstMetadataMask := DstGroupMetadata(*vrfid, endpoint.EndpointGroup) + metadata = metadata | srcMetadata | dstMetadata + metadataMask = metadataMask | srcMetadataMask | dstMetadataMask } // set vlan if required @@ -238,16 +247,24 @@ func createDscpFlow(agent *OfnetAgent, vlanTable, nextTable *ofctrl.Table, endpo return nil, nil, err } - //set vrf id as METADATA + // set vrf id as METADATA for both source and destination + // this enables traffic to reach same VRF when there are overlapping + // IPs across VRFs and apply policy against the source VRF + // If IPs are unique and traffic is not isolated to single VRF (kubernetes) + // thn the table to set destination group will not match source VRF, + // just IP and rewrite the destination VRF vrfid := agent.getvrfId(endpoint.Vrf) - metadata, metadataMask := Vrfmetadata(*vrfid) + metadata, metadataMask := VrfSrcMetadata(*vrfid) + destMetadata, destMetadataMask := VrfDestMetadata(*vrfid) + metadata = metadata | destMetadata + metadataMask = metadataMask | destMetadataMask // set source EPG id if required if endpoint.EndpointGroup != 0 { - srcMetadata, srcMetadataMask := SrcGroupMetadata(endpoint.EndpointGroup) - metadata = metadata | srcMetadata - metadataMask = metadataMask | srcMetadataMask - + srcMetadata, srcMetadataMask := SrcGroupMetadata(*vrfid, endpoint.EndpointGroup) + dstMetadata, dstMetadataMask := DstGroupMetadata(*vrfid, endpoint.EndpointGroup) + metadata = metadata | srcMetadata | dstMetadata + metadataMask = metadataMask | srcMetadataMask | dstMetadataMask } // set vlan if required diff --git a/vlrouter.go b/vlrouter.go index c8b8f40d..e95665b9 100755 --- a/vlrouter.go +++ b/vlrouter.go @@ -633,7 +633,7 @@ func (vl *Vlrouter) AddEndpoint(endpoint *OfnetEndpoint) error { } //set vrf id as METADATA - //metadata, metadataMask := Vrfmetadata(*vrfid) + //metadata, metadataMask := VrfDestMetadata(*vrfid) outPort, err := vl.ofSwitch.OutputPort(endpoint.PortNo) if err != nil { @@ -787,7 +787,7 @@ func (vl *Vlrouter) AddRemoteIpv6Flow(endpoint *OfnetEndpoint) error { } //set vrf id as METADATA - //metadata, metadataMask := Vrfmetadata(*vrfid) + //metadata, metadataMask := VrfDestMetadata(*vrfid) outPort, err := vl.ofSwitch.OutputPort(endpoint.PortNo) if err != nil { diff --git a/vrouter.go b/vrouter.go index 37274f8e..6a53204a 100755 --- a/vrouter.go +++ b/vrouter.go @@ -263,7 +263,7 @@ func (self *Vrouter) AddLocalEndpoint(endpoint OfnetEndpoint) error { return errors.New("Invalid vrf name") } - vrfmetadata, vrfmetadataMask := Vrfmetadata(*vrfid) + vrfmetadata, vrfmetadataMask := VrfDestMetadata(*vrfid) // Install the IP address ipFlow, err := self.ipTable.NewFlow(ofctrl.FlowMatch{ @@ -417,7 +417,7 @@ func (self *Vrouter) RemoveLocalEndpoint(endpoint OfnetEndpoint) error { flowId := self.agent.getEndpointIdByIpVlan(endpoint.IpAddr, endpoint.Vlan) ipFlow := self.flowDb[flowId] if ipFlow == nil { - log.Errorf("Error finding the flow for endpoint: %+v", endpoint) + log.Errorf("Error finding the flow to remove for local endpoint by IP and VLAN: %+v", endpoint) return errors.New("Flow not found") } @@ -575,7 +575,7 @@ func (self *Vrouter) AddLocalIpv6Flow(endpoint OfnetEndpoint) error { } //Ip table look up will be vrf,ip - vrfmetadata, vrfmetadataMask := Vrfmetadata(*vrfid) + vrfmetadata, vrfmetadataMask := VrfDestMetadata(*vrfid) // Install the IPv6 address ipv6Flow, err := self.ipTable.NewFlow(ofctrl.FlowMatch{ Priority: FLOW_MATCH_PRIORITY, @@ -625,7 +625,7 @@ func (self *Vrouter) RemoveLocalIpv6Flow(endpoint OfnetEndpoint) error { flowId := self.agent.getEndpointIdByIpVlan(endpoint.Ipv6Addr, endpoint.Vlan) ipv6Flow := self.flowDb[flowId] if ipv6Flow == nil { - log.Errorf("Error finding the flow for endpoint: %+v", endpoint) + log.Errorf("Error finding the ipv6 flow by IP and VLAN for local endpoint: %+v", endpoint) return errors.New("Flow not found") } @@ -704,10 +704,11 @@ func (self *Vrouter) AddVtepPort(portNo uint32, remoteIp net.IP) error { } //set vrf id as METADATA - vrfmetadata, vrfmetadataMask := Vrfmetadata(*vrfid) + vrfmetadata, vrfmetadataMask := VrfSrcMetadata(*vrfid) + dstVrfMetadata, dstVrfMetadataMask := VrfDestMetadata(*vrfid) - metadata := METADATA_RX_VTEP | vrfmetadata - metadataMask := METADATA_RX_VTEP | vrfmetadataMask + metadata := METADATA_RX_VTEP | vrfmetadata | dstVrfMetadata + metadataMask := METADATA_RX_VTEP | vrfmetadataMask | dstVrfMetadataMask portVlanFlow.SetMetadata(metadata, metadataMask) @@ -800,7 +801,7 @@ func (self *Vrouter) AddVlan(vlanId uint16, vni uint32, vrf string) error { } //set vrf id as METADATA - vrfmetadata, vrfmetadataMask := Vrfmetadata(*vrfid) + vrfmetadata, vrfmetadataMask := VrfSrcMetadata(*vrfid) // Set the metadata to indicate packet came in from VTEP port metadata := METADATA_RX_VTEP | vrfmetadata @@ -873,7 +874,7 @@ func (self *Vrouter) AddEndpoint(endpoint *OfnetEndpoint) error { } //set vrf id as METADATA - metadata, metadataMask := Vrfmetadata(*vrfid) + metadata, metadataMask := VrfDestMetadata(*vrfid) // Install the IP address ipFlow, err := self.ipTable.NewFlow(ofctrl.FlowMatch{ @@ -934,7 +935,7 @@ func (self *Vrouter) RemoveEndpoint(endpoint *OfnetEndpoint) error { flowId := self.agent.getEndpointIdByIpVlan(endpoint.IpAddr, endpoint.Vlan) ipFlow := self.flowDb[flowId] if ipFlow == nil { - log.Errorf("Error finding the flow for endpoint: %+v", endpoint) + log.Errorf("Error finding the flow to remove by IP and VLAN for endpoint: %+v", endpoint) return errors.New("Flow not found") } @@ -990,7 +991,7 @@ func (self *Vrouter) AddRemoteIpv6Flow(endpoint *OfnetEndpoint) error { } //set vrf id as METADATA - metadata, metadataMask := Vrfmetadata(*vrfid) + metadata, metadataMask := VrfDestMetadata(*vrfid) // Install the IP address ipv6Flow, err := self.ipTable.NewFlow(ofctrl.FlowMatch{ @@ -1040,7 +1041,7 @@ func (self *Vrouter) RemoveRemoteIpv6Flow(endpoint *OfnetEndpoint) error { flowId := self.agent.getEndpointIdByIpVlan(endpoint.Ipv6Addr, endpoint.Vlan) ipv6Flow := self.flowDb[flowId] if ipv6Flow == nil { - log.Errorf("Error finding the flow for endpoint: %+v", endpoint) + log.Errorf("Error finding the IPv6 flow for removal by IP and VLAN for endpoint: %+v", endpoint) return errors.New("Flow not found") } @@ -1302,9 +1303,24 @@ func (self *Vrouter) processArp(pkt protocol.Ethernet, inPort uint32) { } } -func Vrfmetadata(vrfid uint16) (uint64, uint64) { - metadata := uint64(vrfid) << 32 - metadataMask := uint64(0xFF00000000) +func VrfDestMetadata(vrfid uint16) (uint64, uint64) { + // 1 bit for VTEP, 16 for group + metadata := uint64(vrfid) << 17 + // 14 bits shifted 1 for vtep flag and 16 for group + // format((((1<<14))-1)<<(1+16), 'x') + metadataMask := uint64(0x7ffe0000) + metadata = metadata & metadataMask + + return metadata, metadataMask +} + +func VrfSrcMetadata(vrfid uint16) (uint64, uint64) { + // 1 bit for VTEP, 30 for dest tenant+group, 16 for group + metadata := uint64(vrfid) << 47 + // 14 bits shifted 1 for vtep flag and 30 for dest tenant+group + // and 16 for source group + // format((((1<<14))-1)<<(1+30+16), 'x') + metadataMask := uint64(0x1FFF800000000000) metadata = metadata & metadataMask return metadata, metadataMask diff --git a/vxlanBridge.go b/vxlanBridge.go index 889f7d58..168c5392 100755 --- a/vxlanBridge.go +++ b/vxlanBridge.go @@ -490,10 +490,11 @@ func (self *Vxlan) AddVtepPort(portNo uint32, remoteIp net.IP) error { return fmt.Errorf("Unable to find vrf for vlan %v", *vlan) } //set vrf id as METADATA - vrfmetadata, vrfmetadataMask := Vrfmetadata(*vrfid) + vrfmetadata, vrfmetadataMask := VrfSrcMetadata(*vrfid) + dstVrfMetadata, dstVrfMetadataMask := VrfDestMetadata(*vrfid) - metadata := METADATA_RX_VTEP | vrfmetadata - metadataMask := METADATA_RX_VTEP | vrfmetadataMask + metadata := METADATA_RX_VTEP | vrfmetadata | dstVrfMetadata + metadataMask := METADATA_RX_VTEP | vrfmetadataMask | dstVrfMetadataMask portVlanFlow.SetMetadata(metadata, metadataMask) @@ -620,10 +621,11 @@ func (self *Vxlan) AddVlan(vlanId uint16, vni uint32, vrf string) error { return fmt.Errorf("Unable to find vrf for vlan %v", *vlan) } //set vrf id as METADATA - vrfmetadata, vrfmetadataMask := Vrfmetadata(*vrfid) + vrfmetadata, vrfmetadataMask := VrfSrcMetadata(*vrfid) + dstVrfMetadata, dstVrfMetadataMask := VrfDestMetadata(*vrfid) - metadata := METADATA_RX_VTEP | vrfmetadata - metadataMask := METADATA_RX_VTEP | vrfmetadataMask + metadata := METADATA_RX_VTEP | vrfmetadata | dstVrfMetadata + metadataMask := METADATA_RX_VTEP | vrfmetadataMask | dstVrfMetadataMask portVlanFlow.SetMetadata(metadata, metadataMask)