Skip to content

Commit

Permalink
add performance unit test for sendFlowRecords
Browse files Browse the repository at this point in the history
  • Loading branch information
zyiou committed May 21, 2021
1 parent fa54190 commit 6604dbd
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 9 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.5.2
github.com/vmware/go-ipfix v0.2.1-0.20210519004359-604fc6c4914c
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/mod v0.4.0
Expand All @@ -58,6 +58,7 @@ require (
k8s.io/apiserver v0.21.0
k8s.io/client-go v0.21.0
k8s.io/component-base v0.21.0
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.8.0
k8s.io/kube-aggregator v0.21.0
k8s.io/kube-openapi v0.0.0-20210305164622-f622666832c1
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.5.2 h1:5VIW+HoJoq2i/mhN8ErNSYXNpjbtALImLIH72xh2Afc=
github.com/vmware/go-ipfix v0.5.2/go.mod h1:rMDcGc5pEmG+wT2grK5ZgvsF1EOdxqHDnNkB/kFJT78=
github.com/vmware/go-ipfix v0.2.1-0.20210519004359-604fc6c4914c h1:tnSPF1e69v/DCKSqdtiyvyIkusSslhLX8fpHbWGni6s=
github.com/vmware/go-ipfix v0.2.1-0.20210519004359-604fc6c4914c/go.mod h1:rMDcGc5pEmG+wT2grK5ZgvsF1EOdxqHDnNkB/kFJT78=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down Expand Up @@ -972,8 +972,9 @@ k8s.io/component-base v0.21.0/go.mod h1:qvtjz6X0USWXbgmbfXR+Agik4RZ3jv2Bgr5QnZzd
k8s.io/component-helpers v0.21.0/go.mod h1:tezqefP7lxfvJyR+0a+6QtVrkZ/wIkyMLK4WcQ3Cj8U=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts=
Expand Down
269 changes: 269 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// +build !race

// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter

import (
"flag"
"fmt"
"math/rand"
"net"
goruntime "runtime"
"testing"
"time"

"github.com/golang/mock/gomock"
"k8s.io/klog"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/flowrecords"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
"antrea.io/antrea/pkg/ipfix"
queriertest "antrea.io/antrea/pkg/querier/testing"
)

const (
testPollInterval = 2 * time.Second
testConnectionsCount = 30000
testUpdateConnectionsCount = 20000
testNewConnectionsCount = 4000
testDyingConnectionsCount = 1000
testZoneFilter = uint16(65520)
testDuration = 15 * time.Second
testIdleTimeOut = 5 * time.Second
testPerfActiveFlowTimeout = 2 * time.Second
testPerfIdleFlowTimeout = 1 * time.Second
)

var count = 0
var exp *flowExporter

func BenchmarkExport(b *testing.B) {
disableLogToStderr()
ctrl := gomock.NewController(b)
defer ctrl.Finish()

registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
stopChan1 := make(chan struct{})
stopChan2 := make(chan struct{})
listener := startLocalServer(b, stopChan1, stopChan2)

conns := getConnections(testConnectionsCount)
records := flowrecords.NewFlowRecords()
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
connStore := connections.NewConnectionStore(mockConnDumper, records, mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval)
var err error
exp, err = NewFlowExporter(connStore, records, listener.String(), listener.Network(), testPerfActiveFlowTimeout, testPerfIdleFlowTimeout, false, true, false, nil, nil, false)
if err != nil {
b.Errorf("error in starting flow exporter: %v", err)
}
// Generate connections for Dumpflows, each round with update connections, new connections and dying connections
mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, testConnectionsCount, nil).Times(1)
for i := 0; i < 5; i++ {
conns = getUpdateConnections(conns, testNewConnectionsCount, testUpdateConnectionsCount, testDyingConnectionsCount, i)
mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, len(conns), nil).Times(1)
}
mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, len(conns), nil).AnyTimes()
mockConnDumper.EXPECT().GetMaxConnections().Return(testConnectionsCount, nil).AnyTimes()
mockIfaceStore.EXPECT().GetInterfaceByIP(gomock.Any()).Return(nil, false).AnyTimes()

var maxAlloc uint64
go statMaxMemAlloc(&maxAlloc, 500*time.Millisecond, stopChan2)
go connStore.Run(stopChan1)
go exp.Run(stopChan2)
<-stopChan2
b.Logf(`Summary metrics:
TOTAL_CONNECTIONS INIT_CONNECTIONS UPDATE_CONNECTIONS/poll NEW_CONNECTIONS/poll DYING_CONNECTIONS/poll POLL_INTERVAL(s) MEMORY(M)
%d %d %d %d %d %d %-12d
`, count, testConnectionsCount, testUpdateConnectionsCount, testNewConnectionsCount, testDyingConnectionsCount, testPollInterval/1000000, maxAlloc/1024/1024)
}

func startLocalServer(b *testing.B, stopChan1, stopChan2 chan struct{}) (address net.Addr) {
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
b.Fatalf("Got error when resolving UDP address: %v", err)
}
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
b.Fatalf("Got error when creating a local server: %v", err)
}
if err != nil {
b.Fatalf("Got error when creating a local server: %v", err)
}
prevCount := 0
ticker := time.NewTicker(testIdleTimeOut)
go func() {
defer conn.Close()
for {
buff := make([]byte, 198)
_, _, err := conn.ReadFromUDP(buff)
if err != nil {
b.Error(err)
return
}
count++
}
}()
go func() {
time.Sleep(testDuration)
close(stopChan1)
for {
select {
case <-ticker.C:
if count > prevCount {
prevCount = count
continue
} else {
exp.process.CloseConnToCollector()
close(stopChan2)
}
}
}
}()
return conn.LocalAddr()
}

func getConnections(numberOfConn int) []*flowexporter.Connection {
connections := make([]*flowexporter.Connection, numberOfConn)
for i := 0; i < numberOfConn; i++ {
randomNum := rand.Intn(255)
src := net.ParseIP(fmt.Sprintf("192.168.0.%d", randomNum))
dst := net.ParseIP(fmt.Sprintf("192.169.0.%d", randomNum))
tuple, revTuple := makeTuple(&src, &dst, 6, uint16(i), uint16(i))
conn := &flowexporter.Connection{
StartTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond),
StopTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond),
IsPresent: true,
DoneExport: false,
TupleOrig: tuple,
TupleReply: revTuple,
OriginalPackets: 100,
OriginalBytes: 10,
ReversePackets: 50,
ReverseBytes: 5,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "ns2",
DestinationPodName: "pod2",
DestinationServicePortName: "service",
TCPState: "SYN_SENT",
}
connections[i] = conn
}
return connections
}

func getUpdateConnections(conns []*flowexporter.Connection, numOfNewConn int, numOfUpdateConn int, numOfDyingConn int, index int) []*flowexporter.Connection {
currentTime := time.Now().Add(time.Duration(index) * time.Second)
connections := make([]*flowexporter.Connection, 0)
randomNum := rand.Intn(len(conns) - numOfUpdateConn)

// update connections and delete connections in dying state
for i := 0; i < randomNum; i++ {
if conns[i].TCPState == "TIME_WAIT" {
continue
}
conn := *conns[i]
connections = append(connections, &conn)
}
for i := randomNum; i < numOfUpdateConn+randomNum; i++ {
if conns[i].TCPState == "TIME_WAIT" {
continue
}
conn := *conns[i]
conn.OriginalPackets += 10
conn.OriginalBytes += 10
conn.ReversePackets += 5
conn.ReverseBytes += 5
rand := rand.Intn(1000)
conn.StopTime = currentTime.Add(-time.Duration(rand) * time.Millisecond)
connections = append(connections, &conn)
}
for i := numOfUpdateConn + randomNum; i < len(conns); i++ {
if conns[i].TCPState == "TIME_WAIT" {
continue
}
conn := *conns[i]
connections = append(connections, &conn)
}

// add new connections
for i := 0; i < numOfNewConn; i++ {
randomNum1 := rand.Intn(255)
randomNum2 := rand.Intn(255)
randomNum3 := rand.Intn(255)
src := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3))
dst := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3))
randomPort := rand.Intn(90000)
tuple, revTuple := makeTuple(&src, &dst, 6, uint16(randomPort), uint16(randomPort))
conn := &flowexporter.Connection{
StartTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond),
StopTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond),
IsPresent: true,
DoneExport: false,
TupleOrig: tuple,
TupleReply: revTuple,
OriginalPackets: 100,
OriginalBytes: 10,
ReversePackets: 50,
ReverseBytes: 5,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "ns2",
DestinationPodName: "pod2",
DestinationServicePortName: "service",
TCPState: "SYN_SENT",
}
connections = append(connections, conn)
}

// change connections to dying state, which will be deleted in next round
randomNum = rand.Intn(len(connections) - numOfDyingConn)
for i := randomNum; i < numOfDyingConn+randomNum; i++ {
connections[i].TCPState = "TIME_WAIT"
}
return connections
}

func statMaxMemAlloc(maxAlloc *uint64, interval time.Duration, stopCh chan struct{}) {
var memStats goruntime.MemStats
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
goruntime.ReadMemStats(&memStats)
if memStats.Alloc > *maxAlloc {
*maxAlloc = memStats.Alloc
}
case <-stopCh:
return
}
}
}

func disableLogToStderr() {
klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(klogFlagSet)
klogFlagSet.Parse([]string{"-logtostderr=false"})
}
25 changes: 21 additions & 4 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,20 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip

// Following consists of all elements that are in IANAInfoElements and AntreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
var elemList = make([]*ipfixentities.InfoElementWithValue, 0)
elemList := getElementList(isIPv6)
ianaIE := IANAInfoElementsIPv4
antreaIE := AntreaInfoElementsIPv4
if isIPv6 {
ianaIE = IANAInfoElementsIPv6
antreaIE = AntreaInfoElementsIPv6
}
for i, ie := range ianaIE {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].Element, nil)
}
for i, ie := range IANAReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaIE)].Element, nil)
}
for i, ie := range antreaIE {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaIE)+len(IANAReverseInfoElements)].Element, nil)
}
if !isIPv6 {
Expand All @@ -135,6 +132,26 @@ func sendTemplateSet(t *testing.T, ctrl *gomock.Controller, mockIPFIXExpProc *ip
assert.Len(t, eL, len(ianaIE)+len(IANAReverseInfoElements)+len(antreaIE), "flowExp.elementsList and template record should have same number of elements")
}

func getElementList(isIPv6 bool) []*ipfixentities.InfoElementWithValue {
elemList := make([]*ipfixentities.InfoElementWithValue, 0)
ianaIE := IANAInfoElementsIPv4
antreaIE := AntreaInfoElementsIPv4
if isIPv6 {
ianaIE = IANAInfoElementsIPv6
antreaIE = AntreaInfoElementsIPv6
}
for _, ie := range ianaIE {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
}
for _, ie := range IANAReverseInfoElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAReversedEnterpriseID, 0), nil))
}
for _, ie := range antreaIE {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
}
return elemList
}

// TestFlowExporter_sendDataRecord tests essentially if element names in the switch-case matches globals
// IANAInfoElements and AntreaInfoElements.
func TestFlowExporter_sendDataSet(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/flowrecords/flow_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (fr *FlowRecords) ForAllFlowRecordsDo(callback flowexporter.FlowRecordCallB
for k, v := range fr.recordsMap {
err := callback(k, v)
if err != nil {
klog.Errorf("Error when executing callback for flow record")
klog.Errorf("Error when executing callback for flow record: %v", err)
return err
}
}
Expand Down

0 comments on commit 6604dbd

Please sign in to comment.