Skip to content

Commit

Permalink
add performance unit test for Flow Exporter
Browse files Browse the repository at this point in the history
Signed-off-by: zyiou <[email protected]>
  • Loading branch information
zyiou committed Jun 3, 2021
1 parent 25960a1 commit 6838a82
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 30 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ replace (
// antrea/plugins/octant/go.mod also has this replacement since replace statement in dependencies
// were ignored. We need to change antrea/plugins/octant/go.mod if there is any change here.
github.com/contiv/ofnet => github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da
github.com/vmware/go-ipfix => github.com/zyiou/go-ipfix v0.1.2-0.20210602035113-6e8071765ec2
)
8 changes: 5 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -597,8 +598,6 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.5.2 h1:5VIW+HoJoq2i/mhN8ErNSYXNpjbtALImLIH72xh2Afc=
github.com/vmware/go-ipfix v0.5.2/go.mod h1:rMDcGc5pEmG+wT2grK5ZgvsF1EOdxqHDnNkB/kFJT78=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da h1:ragN21nQa4zKuCwR2UEbTXEAh3L2YN/Id5SCVkjjwdY=
github.com/wenyingd/ofnet v0.0.0-20210318032909-171b6795a2da/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand All @@ -609,6 +608,8 @@ github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6Ut
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zyiou/go-ipfix v0.1.2-0.20210602035113-6e8071765ec2 h1:PienAu2x60V6z4GbLPcpYAl5ny512KDnGHxaqclJuQo=
github.com/zyiou/go-ipfix v0.1.2-0.20210602035113-6e8071765ec2/go.mod h1:SF6BrZTPvoVdzgmjJvshoegBVbicn4xWlkoCNADab6E=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
Expand Down Expand Up @@ -905,8 +906,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
264 changes: 264 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// +build !race

// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporter

import (
"crypto/rand"
"flag"
"fmt"
"math/big"
"net"
goruntime "runtime"
"testing"
"time"

"github.com/golang/mock/gomock"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/flowrecords"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
"antrea.io/antrea/pkg/ipfix"
queriertest "antrea.io/antrea/pkg/querier/testing"
)

const (
testPollInterval = 5 * time.Second
testConnectionsCount = 30000
testNewConnectionsCount = 10000
testDyingConnectionsCount = 5000
testZoneFilter = uint16(65520)
testDuration = 1 * time.Minute
testIdleTimeOut = 5 * time.Second
testPerfActiveFlowTimeout = 10 * time.Second
testPerfIdleFlowTimeout = 5 * time.Second
testPollCycle = testDuration / testPollInterval
)

var count = 0
var exp *flowExporter

func BenchmarkExport(b *testing.B) {
disableLogToStderr()
ctrl := gomock.NewController(b)
defer ctrl.Finish()

registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
stopChan1 := make(chan struct{})
stopChan2 := make(chan struct{})
listener := startLocalServer(b, stopChan1, stopChan2)

conns := getConnections(b, testConnectionsCount)
records := flowrecords.NewFlowRecords()
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
conntrackStore := connections.NewConntrackConnectionStore(mockConnDumper, records, mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval)
denyConnStore := connections.NewDenyConnectionStore(mockIfaceStore, mockProxier)
var err error
exp, err = NewFlowExporter(conntrackStore, records, denyConnStore, listener.String(), listener.Network(), testPerfActiveFlowTimeout, testPerfIdleFlowTimeout, true, false, nil, nil, false)
if err != nil {
b.Errorf("error in starting flow exporter: %v", err)
}
// Generate connections for Dumpflows, each round with update connections, new connections and dying connections
mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, testConnectionsCount, nil).Times(1)
for i := 0; i < int(testPollCycle); i++ {
conns = getUpdateConnections(b, conns, testNewConnectionsCount, testDyingConnectionsCount, i)
mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, len(conns), nil).Times(1)
}
conns = make([]*flowexporter.Connection, 0)
mockConnDumper.EXPECT().DumpFlows(testZoneFilter).Return(conns, len(conns), nil).AnyTimes()
mockConnDumper.EXPECT().GetMaxConnections().Return(testConnectionsCount, nil).AnyTimes()
mockIfaceStore.EXPECT().GetInterfaceByIP(gomock.Any()).Return(nil, false).AnyTimes()

var maxAlloc uint64
go statMaxMemAlloc(&maxAlloc, 500*time.Millisecond, stopChan2)
go conntrackStore.Run(stopChan1)
go exp.Run(stopChan2)
<-stopChan2
b.Logf("\nSummary metrics:\nTOTAL_CONNECTIONS: %d\nINIT_CONNECTIONS: %d\nNEW_CONNECTIONS/poll: %d\nDYING_CONNECTIONS/poll: %d\nPOLL_INTERVAL(s):%d\nMEMORY(M): %d\n", count, testConnectionsCount, testNewConnectionsCount, testDyingConnectionsCount, testPollInterval/1000000000, maxAlloc/1024/1024)
}

func startLocalServer(b *testing.B, stopChan1, stopChan2 chan struct{}) (address net.Addr) {
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
b.Fatalf("Got error when resolving UDP address: %v", err)
}
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
b.Fatalf("Got error when creating a local server: %v", err)
}
if err != nil {
b.Fatalf("Got error when creating a local server: %v", err)
}
prevCount := 0
ticker := time.NewTicker(testIdleTimeOut)
go func() {
defer conn.Close()
for {
buff := make([]byte, 272)
_, _, err := conn.ReadFromUDP(buff)
if err != nil {
b.Error(err)
return
}
count++
}
}()
go func() {
time.Sleep(testDuration)
close(stopChan1)
for {
select {
case <-ticker.C:
if count > prevCount {
prevCount = count
continue
} else {
exp.process.CloseConnToCollector()
close(stopChan2)
}
}
}
}()
return conn.LocalAddr()
}

func getConnections(b *testing.B, numberOfConn int) []*flowexporter.Connection {
connections := make([]*flowexporter.Connection, numberOfConn)
for i := 0; i < numberOfConn; i++ {
randomNum := getRandomNum(b, 255)
src := net.ParseIP(fmt.Sprintf("192.168.0.%d", randomNum))
dst := net.ParseIP(fmt.Sprintf("192.169.0.%d", randomNum))
flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(i), DestinationPort: uint16(i)}
conn := &flowexporter.Connection{
StartTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond),
StopTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond),
LastExportTime: time.Now().Add(-time.Duration(randomNum) * time.Millisecond),
IsPresent: true,
DoneExport: false,
FlowKey: flowKey,
OriginalPackets: 100,
OriginalBytes: 10,
ReversePackets: 50,
ReverseBytes: 5,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "ns2",
DestinationPodName: "pod2",
DestinationServicePortName: "service",
DestinationServiceAddress: net.ParseIP("0.0.0.0"),
TCPState: "SYN_SENT",
}
connections[i] = conn
}
return connections
}

func getUpdateConnections(b *testing.B, conns []*flowexporter.Connection, numOfNewConn int, numOfDyingConn int, index int) []*flowexporter.Connection {
currentTime := time.Now().Add(time.Duration(index) * time.Second)
connections := make([]*flowexporter.Connection, 0)

// update connections and delete connections in dying state
for i := 0; i < len(conns); i++ {
if conns[i].TCPState == "TIME_WAIT" {
continue
}
conn := *conns[i]
conn.OriginalPackets += 10
conn.OriginalBytes += 10
conn.ReversePackets += 5
conn.ReverseBytes += 5
randomNum := getRandomNum(b, 1000)
conn.StopTime = currentTime.Add(-time.Duration(randomNum) * time.Millisecond)
connections = append(connections, &conn)
}

// add new connections
for i := 0; i < numOfNewConn; i++ {
randomNum1 := getRandomNum(b, 255)
randomNum2 := getRandomNum(b, 255)
randomNum3 := getRandomNum(b, 255)
src := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3))
dst := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3))
randomPort := getRandomNum(b, 90000)
flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(randomPort), DestinationPort: uint16(randomPort)}
conn := &flowexporter.Connection{
StartTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond),
StopTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond),
LastExportTime: currentTime.Add(-time.Duration(randomNum1) * time.Millisecond),
IsPresent: true,
DoneExport: false,
FlowKey: flowKey,
OriginalPackets: 100,
OriginalBytes: 10,
ReversePackets: 50,
ReverseBytes: 5,
SourcePodNamespace: "ns1",
SourcePodName: "pod1",
DestinationPodNamespace: "ns2",
DestinationPodName: "pod2",
DestinationServicePortName: "service",
DestinationServiceAddress: net.ParseIP("0.0.0.0"),
TCPState: "SYN_SENT",
}
connections = append(connections, conn)
}

// change connections to dying state, which will be deleted in next round
randomNum := int(getRandomNum(b, int64(len(connections)-numOfDyingConn)))
for i := randomNum; i < numOfDyingConn+randomNum; i++ {
connections[i].TCPState = "TIME_WAIT"
}
return connections
}

func statMaxMemAlloc(maxAlloc *uint64, interval time.Duration, stopCh chan struct{}) {
var memStats goruntime.MemStats
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
goruntime.ReadMemStats(&memStats)
if memStats.Alloc > *maxAlloc {
*maxAlloc = memStats.Alloc
}
case <-stopCh:
return
}
}
}

func getRandomNum(b *testing.B, value int64) uint64 {
number, err := rand.Int(rand.Reader, big.NewInt(value))
if err != nil {
b.Errorf("error when generating random number: %v", err)
}
return number.Uint64()
}

func disableLogToStderr() {
klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(klogFlagSet)
klogFlagSet.Parse([]string{"-logtostderr=false"})
}
Loading

0 comments on commit 6838a82

Please sign in to comment.