diff --git a/go.mod b/go.mod index d55d7f9b42f..83a36729ec8 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/ti-mo/conntrack v0.3.0 github.com/vishvananda/netlink v1.1.0 - github.com/vmware/go-ipfix v0.5.2 + github.com/vmware/go-ipfix v0.2.1-0.20210519004359-604fc6c4914c golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 golang.org/x/mod v0.4.0 @@ -58,6 +58,7 @@ require ( k8s.io/apiserver v0.21.0 k8s.io/client-go v0.21.0 k8s.io/component-base v0.21.0 + k8s.io/klog v1.0.0 k8s.io/klog/v2 v2.8.0 k8s.io/kube-aggregator v0.21.0 k8s.io/kube-openapi v0.0.0-20210305164622-f622666832c1 diff --git a/go.sum b/go.sum index d7c4c5fb79e..1a74dc7e61c 100644 --- a/go.sum +++ b/go.sum @@ -597,8 +597,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vmware/go-ipfix v0.5.2 h1:5VIW+HoJoq2i/mhN8ErNSYXNpjbtALImLIH72xh2Afc= -github.com/vmware/go-ipfix v0.5.2/go.mod h1:rMDcGc5pEmG+wT2grK5ZgvsF1EOdxqHDnNkB/kFJT78= +github.com/vmware/go-ipfix v0.2.1-0.20210519004359-604fc6c4914c h1:tnSPF1e69v/DCKSqdtiyvyIkusSslhLX8fpHbWGni6s= +github.com/vmware/go-ipfix v0.2.1-0.20210519004359-604fc6c4914c/go.mod h1:rMDcGc5pEmG+wT2grK5ZgvsF1EOdxqHDnNkB/kFJT78= github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY= github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= @@ -972,8 +972,9 @@ k8s.io/component-base v0.21.0/go.mod h1:qvtjz6X0USWXbgmbfXR+Agik4RZ3jv2Bgr5QnZzd k8s.io/component-helpers v0.21.0/go.mod h1:tezqefP7lxfvJyR+0a+6QtVrkZ/wIkyMLK4WcQ3Cj8U= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= -k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= +k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= diff --git a/pkg/agent/flowexporter/exporter/exporter_perf_test.go b/pkg/agent/flowexporter/exporter/exporter_perf_test.go new file mode 100644 index 00000000000..57a0852efb3 --- /dev/null +++ b/pkg/agent/flowexporter/exporter/exporter_perf_test.go @@ -0,0 +1,269 @@ +// +build !race + +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "flag" + "fmt" + "math/rand" + "net" + goruntime "runtime" + "testing" + "time" + + "github.com/golang/mock/gomock" + "k8s.io/klog" + + "antrea.io/antrea/pkg/agent/flowexporter" + "antrea.io/antrea/pkg/agent/flowexporter/connections" + connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing" + "antrea.io/antrea/pkg/agent/flowexporter/flowrecords" + interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + proxytest "antrea.io/antrea/pkg/agent/proxy/testing" + "antrea.io/antrea/pkg/ipfix" + queriertest "antrea.io/antrea/pkg/querier/testing" +) + +const ( + testPollInterval = 2 * time.Second + testConnectionsCount = 30000 + testUpdateConnectionsCount = 20000 + testNewConnectionsCount = 4000 + testDyingConnectionsCount = 1000 + testZoneFilter = uint16(65520) + testDuration = 15 * time.Second + testIdleTimeOut = 5 * time.Second + testPerfActiveFlowTimeout = 2 * time.Second + testPerfIdleFlowTimeout = 1 * time.Second +) + +var count = 0 +var exp *flowExporter + +func BenchmarkExport(b *testing.B) { + disableLogToStderr() + ctrl := gomock.NewController(b) + defer ctrl.Finish() + + registry := ipfix.NewIPFIXRegistry() + registry.LoadRegistry() + stopChan1 := make(chan struct{}) + stopChan2 := make(chan struct{}) + listener := startLocalServer(b, stopChan1, stopChan2) + + conns := getConnections(testConnectionsCount) + records := flowrecords.NewFlowRecords() + mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) + mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl) + mockProxier := proxytest.NewMockProxier(ctrl) + npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) + connStore := connections.NewConnectionStore(mockConnDumper, records, mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval) + var err error + exp, err = NewFlowExporter(connStore, records, listener.String(), listener.Network(), testPerfActiveFlowTimeout, testPerfIdleFlowTimeout, false, true, false, nil, nil, false) + if err != nil { + b.Errorf("error in starting flow exporter: %v", err) + } + // Generate connections for Dumpflows, each round with update connections, new connections and dying connections + mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, testConnectionsCount, nil).Times(1) + for i := 0; i < 5; i++ { + conns = getUpdateConnections(conns, testNewConnectionsCount, testUpdateConnectionsCount, testDyingConnectionsCount, i) + mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, len(conns), nil).Times(1) + } + mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, len(conns), nil).AnyTimes() + mockConnDumper.EXPECT().GetMaxConnections().Return(testConnectionsCount, nil).AnyTimes() + mockIfaceStore.EXPECT().GetInterfaceByIP(gomock.Any()).Return(nil, false).AnyTimes() + + var maxAlloc uint64 + go statMaxMemAlloc(&maxAlloc, 500*time.Millisecond, stopChan2) + go connStore.Run(stopChan1) + go exp.Run(stopChan2) + <-stopChan2 + b.Logf(`Summary metrics: + TOTAL_CONNECTIONS INIT_CONNECTIONS UPDATE_CONNECTIONS/poll NEW_CONNECTIONS/poll DYING_CONNECTIONS/poll POLL_INTERVAL(s) MEMORY(M) +%d %d %d %d %d %d %-12d +`, count, testConnectionsCount, testUpdateConnectionsCount, testNewConnectionsCount, testDyingConnectionsCount, testPollInterval/1000000, maxAlloc/1024/1024) +} + +func startLocalServer(b *testing.B, stopChan1, stopChan2 chan struct{}) (address net.Addr) { + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + if err != nil { + b.Fatalf("Got error when resolving UDP address: %v", err) + } + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + b.Fatalf("Got error when creating a local server: %v", err) + } + if err != nil { + b.Fatalf("Got error when creating a local server: %v", err) + } + prevCount := 0 + ticker := time.NewTicker(testIdleTimeOut) + go func() { + defer conn.Close() + for { + buff := make([]byte, 198) + _, _, err := conn.ReadFromUDP(buff) + if err != nil { + b.Error(err) + return + } + count++ + } + }() + go func() { + time.Sleep(testDuration) + close(stopChan1) + for { + select { + case <-ticker.C: + if count > prevCount { + prevCount = count + continue + } else { + exp.process.CloseConnToCollector() + close(stopChan2) + } + } + } + }() + return conn.LocalAddr() +} + +func getConnections(numberOfConn int) []*flowexporter.Connection { + connections := make([]*flowexporter.Connection, numberOfConn) + for i := 0; i < numberOfConn; i++ { + randomNum := rand.Intn(255) + src := net.ParseIP(fmt.Sprintf("192.168.0.%d", randomNum)) + dst := net.ParseIP(fmt.Sprintf("192.169.0.%d", randomNum)) + tuple, revTuple := makeTuple(&src, &dst, 6, uint16(i), uint16(i)) + conn := &flowexporter.Connection{ + StartTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond), + StopTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond), + IsPresent: true, + DoneExport: false, + TupleOrig: tuple, + TupleReply: revTuple, + OriginalPackets: 100, + OriginalBytes: 10, + ReversePackets: 50, + ReverseBytes: 5, + SourcePodNamespace: "ns1", + SourcePodName: "pod1", + DestinationPodNamespace: "ns2", + DestinationPodName: "pod2", + DestinationServicePortName: "service", + TCPState: "SYN_SENT", + } + connections[i] = conn + } + return connections +} + +func getUpdateConnections(conns []*flowexporter.Connection, numOfNewConn int, numOfUpdateConn int, numOfDyingConn int, index int) []*flowexporter.Connection { + currentTime := time.Now().Add(time.Duration(index) * time.Second) + connections := make([]*flowexporter.Connection, 0) + randomNum := rand.Intn(len(conns) - numOfUpdateConn) + + // update connections and delete connections in dying state + for i := 0; i < randomNum; i++ { + if conns[i].TCPState == "TIME_WAIT" { + continue + } + conn := *conns[i] + connections = append(connections, &conn) + } + for i := randomNum; i < numOfUpdateConn+randomNum; i++ { + if conns[i].TCPState == "TIME_WAIT" { + continue + } + conn := *conns[i] + conn.OriginalPackets += 10 + conn.OriginalBytes += 10 + conn.ReversePackets += 5 + conn.ReverseBytes += 5 + rand := rand.Intn(1000) + conn.StopTime = currentTime.Add(-time.Duration(rand) * time.Millisecond) + connections = append(connections, &conn) + } + for i := numOfUpdateConn + randomNum; i < len(conns); i++ { + if conns[i].TCPState == "TIME_WAIT" { + continue + } + conn := *conns[i] + connections = append(connections, &conn) + } + + // add new connections + for i := 0; i < numOfNewConn; i++ { + randomNum1 := rand.Intn(255) + randomNum2 := rand.Intn(255) + randomNum3 := rand.Intn(255) + src := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3)) + dst := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3)) + randomPort := rand.Intn(90000) + tuple, revTuple := makeTuple(&src, &dst, 6, uint16(randomPort), uint16(randomPort)) + conn := &flowexporter.Connection{ + StartTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond), + StopTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond), + IsPresent: true, + DoneExport: false, + TupleOrig: tuple, + TupleReply: revTuple, + OriginalPackets: 100, + OriginalBytes: 10, + ReversePackets: 50, + ReverseBytes: 5, + SourcePodNamespace: "ns1", + SourcePodName: "pod1", + DestinationPodNamespace: "ns2", + DestinationPodName: "pod2", + DestinationServicePortName: "service", + TCPState: "SYN_SENT", + } + connections = append(connections, conn) + } + + // change connections to dying state, which will be deleted in next round + randomNum = rand.Intn(len(connections) - numOfDyingConn) + for i := randomNum; i < numOfDyingConn+randomNum; i++ { + connections[i].TCPState = "TIME_WAIT" + } + return connections +} + +func statMaxMemAlloc(maxAlloc *uint64, interval time.Duration, stopCh chan struct{}) { + var memStats goruntime.MemStats + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + goruntime.ReadMemStats(&memStats) + if memStats.Alloc > *maxAlloc { + *maxAlloc = memStats.Alloc + } + case <-stopCh: + return + } + } +} + +func disableLogToStderr() { + klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError) + klog.InitFlags(klogFlagSet) + klogFlagSet.Parse([]string{"-logtostderr=false"}) +} diff --git a/pkg/agent/flowexporter/exporter/exporter_test.go b/pkg/agent/flowexporter/exporter/exporter_test.go index 0614906eef2..61cd484a622 100644 --- a/pkg/agent/flowexporter/exporter/exporter_test.go +++ b/pkg/agent/flowexporter/exporter/exporter_test.go @@ -98,7 +98,7 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip // Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals) // Only the element name is needed, other arguments have dummy values. - var elemList = make([]*ipfixentities.InfoElementWithValue, 0) + elemList := getElementList(isIPv6) ianaIE := IANAInfoElementsIPv4 antreaIE := AntreaInfoElementsIPv4 if isIPv6 { @@ -106,15 +106,12 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip antreaIE = AntreaInfoElementsIPv6 } for i, ie := range ianaIE { - elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil)) mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil) } for i, ie := range IANAReverseInfoElements { - elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil)) mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaIE)].Element, nil) } for i, ie := range antreaIE { - elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil)) mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)].Element, nil) } if !isIPv6 { @@ -135,6 +132,26 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip assert.Len(t, eL, len(ianaIE)+len(IANAReverseInfoElements)+len(antreaIE), "flowExp.elementsList and template record should have same number of elements") } +func getElementList(isIPv6 bool) []*ipfixentities.InfoElementWithValue { + elemList := make([]*ipfixentities.InfoElementWithValue, 0) + ianaIE := IANAInfoElementsIPv4 + antreaIE := AntreaInfoElementsIPv4 + if isIPv6 { + ianaIE = IANAInfoElementsIPv6 + antreaIE = AntreaInfoElementsIPv6 + } + for _, ie := range ianaIE { + elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil)) + } + for _, ie := range IANAReverseInfoElements { + elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil)) + } + for _, ie := range antreaIE { + elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil)) + } + return elemList +} + // TestFlowExporter_sendDataRecord tests essentially if element names in the switch-case matches globals // IANAInfoElements and AntreaInfoElements. func TestFlowExporter_sendDataSet(t *testing.T) { diff --git a/pkg/agent/flowexporter/flowrecords/flow_records.go b/pkg/agent/flowexporter/flowrecords/flow_records.go index aa77dcf7a6d..5861193d4a6 100644 --- a/pkg/agent/flowexporter/flowrecords/flow_records.go +++ b/pkg/agent/flowexporter/flowrecords/flow_records.go @@ -125,7 +125,7 @@ func (fr *FlowRecords) ForAllFlowRecordsDo(callback flowexporter.FlowRecordCallB for k, v := range fr.recordsMap { err := callback(k, v) if err != nil { - klog.Errorf("Error when executing callback for flow record") + klog.Errorf("Error when executing callback for flow record: %v", err) return err } }