Skip to content

Commit

Permalink
Add network policy info to flow records
Browse files Browse the repository at this point in the history
Network policy info (name and namespace), both ingress and egress
policies are added.
  • Loading branch information
srikartati committed Nov 2, 2020
1 parent 60b723a commit 34cc9ca
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 57 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func run(o *Options) error {
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
proxier,
ofClient,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)
Expand Down
10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cheggaaa/pb/v3 v3.0.4
github.com/confluentinc/bincover v0.0.0-20200910210245-839e88185831
github.com/confluentinc/bincover v0.1.0
github.com/containernetworking/cni v0.7.1
github.com/containernetworking/plugins v0.8.2-0.20190724153215-ded2f1757770
github.com/contiv/libOpenflow v0.0.0-20201014051314-c1702744526c
Expand All @@ -40,14 +40,12 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.2.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
github.com/vmware/go-ipfix v0.2.3
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.18.4
Expand Down
28 changes: 16 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWc
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/confluentinc/bincover v0.0.0-20200910210245-839e88185831 h1:ywdQifxYw0VXYZfWtykHW785ueW1PgLrYuSdHA31gk4=
github.com/confluentinc/bincover v0.0.0-20200910210245-839e88185831/go.mod h1:qeI1wx0RxdGTZtrJY0HVlgJ4NqC/X2Z+fHbvy87tgHE=
github.com/confluentinc/bincover v0.1.0 h1:M4Gfj4rCXuUQVe8TqT/VXcAMjLyvN81oDRy79fjSv3o=
github.com/confluentinc/bincover v0.1.0/go.mod h1:qeI1wx0RxdGTZtrJY0HVlgJ4NqC/X2Z+fHbvy87tgHE=
github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f h1:tSNMc+rJDfmYntojat8lljbt1mgKNpTxUZJsSzJ9Y1s=
github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko=
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
Expand Down Expand Up @@ -138,6 +138,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0 h1:QvGt2nLcHH0WK9orKa+ppBPAxREcH364nPUedEpK0TY=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg=
github.com/go-openapi/jsonpointer v0.19.3 h1:gihV7YNZK1iK6Tgwwsxo2rJbD1GTbdm72325Bq8FI3w=
Expand Down Expand Up @@ -380,8 +382,12 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.2.1 h1:6Sj4/A7LPlhCiJMRsjSyn8zjkk+ZBONXMgBKZ+epFgA=
github.com/vmware/go-ipfix v0.2.1/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.2.0 h1:rRFBy3iITa2BMUnMLaE7rivtz4aYjd9z4s2L6EaEBFw=
github.com/vmware/go-ipfix v0.2.0/go.mod h1:1R0lTFofwtd3AksRJtXdLrbdT8nIxKFHObfkXBkJPyA=
github.com/vmware/go-ipfix v0.2.2 h1:GvkvDPjdA+uAp9JPn24PxEoNgLN7XQlAG7UPasrJ/Vk=
github.com/vmware/go-ipfix v0.2.2/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/vmware/go-ipfix v0.2.3 h1:El/6HuU+DTo/u+3quuhdRvhgTR+vOOoZwiv1WuNbpP4=
github.com/vmware/go-ipfix v0.2.3/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d h1:wjTew5yHsgqNXpQPIEduDLFR4pZv4iVPcRYhZGyr7Lk=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d/go.mod h1:oF9872TvzJqLzLKDGVMItRLWJHlnwXluuIuNbOP5WKM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand All @@ -407,9 +413,8 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 h1:/Tl7pH94bvbAAHBdZJT947M/+gp0+CqQXDtMRC0fseo=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 h1:I6A9Ag9FpEKOjcKrRNjQkPHawoXIhKyTGfvvjFAiiAk=
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -438,9 +443,8 @@ golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
Expand All @@ -450,9 +454,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -507,9 +510,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down Expand Up @@ -573,6 +575,8 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.3.0 h1:WmkrnW7fdrm0/DMClc+HIxtftvxVIPAhlVwMQo5yLco=
k8s.io/klog/v2 v2.3.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/kube-aggregator v0.18.4 h1:OoQD9bbA+blyhcppPMCEabngLaJ/PJNc4ksNr8tyIzY=
k8s.io/kube-aggregator v0.18.4/go.mod h1:xOVy4wqhpivXCt07Diwdms2gonG+SONVx+1e7O+GfC0=
k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6 h1:Oh3Mzx5pJ+yIumsAD0MOECPVeXsVot0UkiaCGVyfGQY=
Expand Down
45 changes: 44 additions & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"encoding/binary"
"fmt"
"sync"
"time"
Expand All @@ -40,16 +41,24 @@ type ConnectionStore struct {
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
antreaProxier proxy.Proxier
ofClient openflow.Client
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(connTrackDumper ConnTrackDumper, ifaceStore interfacestore.InterfaceStore, proxier proxy.Proxier, pollInterval time.Duration) *ConnectionStore {
func NewConnectionStore(
connTrackDumper ConnTrackDumper,
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
ofClient openflow.Client,
pollInterval time.Duration,
) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
antreaProxier: proxier,
ofClient: ofClient,
pollInterval: pollInterval,
}
}
Expand Down Expand Up @@ -118,6 +127,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName
conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace
}

// Do not export flow records of connections whose destination is local Pod and source is remote Pod.
// We export flow records only from "source node", where the connection is originated from. This is to avoid
// 2 copies of flow records at flow collector. This restriction will be removed when flow records store network policy rule ID.
Expand Down Expand Up @@ -145,6 +155,39 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
}
}
}

// Add network policy name and namespace from Connection label
if len(conn.Labels) != 0 {
// Make sure this is LittleEndian or BigEndian?
klog.V(4).Infof("connection label: %x; label masks: %x", conn.Labels, conn.LabelsMask)
ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4])
egressOfID := binary.LittleEndian.Uint32(conn.Labels[4:8])
// TODO: There's a chance that the ingressOfID is released and reused by another
// NetworkPolicy rule could be modified in-between by the time the conntrack
// table is polled, leading to incorrect network policy info.
// This is similar to the atomic requirement given in network policy
// metrics. We probably need a different solution.
if ingressOfID != 0 {
policy := cs.ofClient.GetPolicyFromConjunction(ingressOfID)
// Same as above, this may be because the NetworkPolicy is removed right after the metrics are fetched.
if policy == nil {
klog.Infof("Cannot find NetworkPolicy that has ingressOfID %v", ingressOfID)
} else {
conn.IngressNetworkPolicyName = policy.Name
conn.IngressNetworkPolicyNamespace = policy.Namespace
}
}
if egressOfID != 0 {
policy := cs.ofClient.GetPolicyFromConjunction(egressOfID)
// Same as above, this may be because the NetworkPolicy is removed right after the metrics are fetched.
if policy == nil {
klog.Infof("Cannot find NetworkPolicy that has egressOfID %v", egressOfID)
} else {
conn.EgressNetworkPolicyName = policy.Name
conn.EgressNetworkPolicyNamespace = policy.Namespace
}
}
}
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connections

import (
"fmt"
openflowtest "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing"
"net"
"strings"
"testing"
Expand Down Expand Up @@ -138,7 +139,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, mockProxier, testPollInterval)
mockOfClient := openflowtest.NewMockClient(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, mockProxier, mockOfClient, testPollInterval)

// Add flow1conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
Expand Down Expand Up @@ -221,7 +223,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the Connection store
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand Down Expand Up @@ -286,7 +288,7 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Add flows to the connection store.
for i, flow := range testFlows {
connStore.connections[*testFlowKeys[i]] = *flow
Expand All @@ -310,7 +312,7 @@ func TestConnectionStore_MetricSettingInPoll(t *testing.T) {
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, testPollInterval)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, nil, nil, testPollInterval)
// Hard-coded conntrack occupancy metrics for test
TotalConnections := 0
MaxConnections := 300000
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func netlinkFlowToAntreaConnection(conn *conntrack.Flow) *flowexporter.Connectio
DoExport: true,
Zone: conn.Zone,
Mark: conn.Mark,
Labels: conn.Labels,
LabelsMask: conn.LabelsMask,
StatusFlag: uint32(conn.Status.Value),
TupleOrig: tupleOrig,
TupleReply: tupleReply,
Expand Down
37 changes: 22 additions & 15 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ var (
"packetDeltaCount",
"octetDeltaCount",
}
// Substring "reverse" is an indication to get reverse element of go-ipfix library.
IANAReverseInfoElements = []string{
"reverse_PacketTotalCount",
"reverse_OctetTotalCount",
"reverse_PacketDeltaCount",
"reverse_OctetDeltaCount",
"packetTotalCount",
"octetTotalCount",
"packetDeltaCount",
"octetDeltaCount",
}
AntreaInfoElements = []string{
"sourcePodName",
Expand All @@ -57,8 +56,12 @@ var (
"destinationPodName",
"destinationPodNamespace",
"destinationNodeName",
"destinationClusterIP",
"destinationClusterIPv4",
"destinationServicePortName",
"ingressNetworkPolicyName",
"ingressNetworkPolicyNamespace",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
}
)

Expand Down Expand Up @@ -208,7 +211,7 @@ func (exp *flowExporter) sendTemplateRecord(templateRec ipfix.IPFIXRecord) (int,
}
}
for _, ie := range IANAReverseInfoElements {
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.ReverseEnterpriseID)
element, err := exp.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
Expand Down Expand Up @@ -244,9 +247,9 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
var err error
switch ieName := ie.Name; ieName {
case "flowStartSeconds":
_, err = dataRec.AddInfoElement(ie, record.Conn.StartTime.Unix())
_, err = dataRec.AddInfoElement(ie, uint32(record.Conn.StartTime.Unix()))
case "flowEndSeconds":
_, err = dataRec.AddInfoElement(ie, record.Conn.StopTime.Unix())
_, err = dataRec.AddInfoElement(ie, uint32(record.Conn.StopTime.Unix()))
case "sourceIPv4Address":
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.SourceAddress)
case "destinationIPv4Address":
Expand Down Expand Up @@ -323,7 +326,7 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
case "destinationClusterIP":
case "destinationClusterIPv4":
if record.Conn.DestinationServicePortName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.TupleOrig.DestinationAddress)
} else {
Expand All @@ -333,11 +336,15 @@ func (exp *flowExporter) sendDataRecord(dataRec ipfix.IPFIXRecord, record flowex
_, err = dataRec.AddInfoElement(ie, net.IP{0, 0, 0, 0})
}
case "destinationServicePortName":
if record.Conn.DestinationServicePortName != "" {
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServicePortName)
} else {
_, err = dataRec.AddInfoElement(ie, "")
}
_, err = dataRec.AddInfoElement(ie, record.Conn.DestinationServicePortName)
case "ingressNetworkPolicyName":
_, err = dataRec.AddInfoElement(ie, record.Conn.IngressNetworkPolicyName)
case "ingressNetworkPolicyNamespace":
_, err = dataRec.AddInfoElement(ie, record.Conn.IngressNetworkPolicyNamespace)
case "egressNetworkPolicyName":
_, err = dataRec.AddInfoElement(ie, record.Conn.EgressNetworkPolicyName)
case "egressNetworkPolicyNamespace":
_, err = dataRec.AddInfoElement(ie, record.Conn.EgressNetworkPolicyNamespace)
}
if err != nil {
return fmt.Errorf("error while adding info element: %s to data record: %v", ie.Name, err)
Expand Down
Loading

0 comments on commit 34cc9ca

Please sign in to comment.