Skip to content

Commit

Permalink
add performance unit test for Flow Exporter
Browse files Browse the repository at this point in the history
This commit adds performance benchmarking for Flow Exporter. It
evaluates Export() function under different number of conntrack
connections, dying connections, idle records, deny connections and
idle deny connections. A local server will receive the records and
count number. It also evaluates Poll() for adding and updating
connections. CPU and memory profile is collected and visualized
using pprof.

Also from benchmarking, we discovered and removed redundant calls
like GetNodeName(), which is called every time when exporting a
record and ResetConnStatsWithoutLock, which unnecessarily calls
NewConnectionKey each time.

Signed-off-by: zyiou <[email protected]>
  • Loading branch information
zyiou committed Jun 30, 2021
1 parent 4405aaa commit f7ecb65
Show file tree
Hide file tree
Showing 7 changed files with 499 additions and 52 deletions.
8 changes: 8 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
return nil
}

// AddConnToMap adds the connection to connections map given connection key.
// This is used only for unit tests.
func (cs *connectionStore) AddConnToMap(connKey *flowexporter.ConnectionKey, conn *flowexporter.Connection) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.connections[*connKey] = conn
}

func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) {
if cs.ifaceStore == nil {
klog.V(4).Info("Interface store is not available to retrieve local Pods information.")
Expand Down
176 changes: 176 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// +build !race

// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connections

import (
"crypto/rand"
"flag"
"fmt"
"math/big"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/flowrecords"
"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
queriertest "antrea.io/antrea/pkg/querier/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const (
testNumOfConns = 10000
testNumOfNewConns = 1000
testNumOfDeletedConns = 1000
)

/*
Sample output (10000 init connections, 1000 new connections, 1000 deleted connections):
go test -test.v -run=BenchmarkPoll -test.benchmem -bench=. -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/connections
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkPoll
BenchmarkPoll-2 116 9068998 ns/op 889713 B/op 54458 allocs/op
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/connections 3.618s
*/

func BenchmarkPoll(b *testing.B) {
disableLogToStderr()
connStore, mockConnDumper := setupConntrackConnStore(b)
conns := generateConns()
b.ResetTimer()
for n := 0; n < b.N; n++ {
mockConnDumper.EXPECT().DumpFlows(uint16(openflow.CtZone)).Return(conns, testNumOfConns, nil)
connStore.Poll()
b.StopTimer()
conns = generateUpdatedConns(conns)
b.StartTimer()
}
b.Logf("\nSummary:\nNumber of initial connections: %d\nNumber of new connections/poll: %d\nNumber of deleted connections/poll: %d\n", testNumOfConns, testNumOfNewConns, testNumOfDeletedConns)
}

func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connectionstest.MockConnTrackDumper) {
ctrl := gomock.NewController(b)
defer ctrl.Finish()
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
testInterface := &interfacestore.InterfaceConfig{
Type: interfacestore.ContainerInterface,
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
PodName: "pod",
PodNamespace: "pod-ns",
},
}
mockIfaceStore.EXPECT().GetInterfaceByIP(gomock.Any()).Return(testInterface, true).AnyTimes()

mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockConnDumper.EXPECT().GetMaxConnections().Return(100000, nil).AnyTimes()

serviceStr := "10.0.0.1:30000/TCP"
servicePortName := k8sproxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: "serviceNS1",
Name: "service1",
},
Port: "30000",
Protocol: v1.ProtocolTCP,
}
mockProxier := proxytest.NewMockProxier(ctrl)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true).AnyTimes()

npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)

return NewConntrackConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval), mockConnDumper
}

func generateConns() []*flowexporter.Connection {
conns := make([]*flowexporter.Connection, testNumOfConns)
for i := 0; i < testNumOfConns; i++ {
conns[i] = getNewConn()
}
return conns
}

func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Connection {
length := len(conns) - testNumOfDeletedConns + testNumOfNewConns
updatedConns := make([]*flowexporter.Connection, length)
for i := 0; i < len(conns); i++ {
// replace deleted connection with new connection
if conns[i].DoneExport == true {
conns[i] = getNewConn()
} else { // update rest of connections
conns[i].OriginalPackets += 5
conns[i].OriginalBytes += 20
conns[i].ReversePackets += 2
conns[i].ReverseBytes += 10
}
updatedConns[i] = conns[i]
}
for i := len(conns); i < length; i++ {
updatedConns[i] = getNewConn()
}
randomNum := getRandomNum(int64(length - testNumOfDeletedConns))
for i := randomNum; i < testNumOfDeletedConns+randomNum; i++ {
updatedConns[i].DoneExport = true
}
return updatedConns
}

func getNewConn() *flowexporter.Connection {
randomNum1 := getRandomNum(255)
randomNum2 := getRandomNum(255)
randomNum3 := getRandomNum(255)
src := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3))
dst := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum3, randomNum2, randomNum1))
flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(randomNum1), DestinationPort: uint16(randomNum2)}
return &flowexporter.Connection{
StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second),
StopTime: time.Now(),
IsPresent: true,
DoneExport: false,
FlowKey: flowKey,
OriginalPackets: 10,
OriginalBytes: 100,
ReversePackets: 5,
ReverseBytes: 50,
DestinationServiceAddress: net.ParseIP("10.0.0.1"),
DestinationServicePort: 30000,
TCPState: "SYN_SENT",
}
}

func getRandomNum(value int64) uint64 {
number, _ := rand.Int(rand.Reader, big.NewInt(value))
return number.Uint64()
}

func disableLogToStderr() {
klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(klogFlagSet)
klogFlagSet.Parse([]string{"-logtostderr=false"})
}
3 changes: 1 addition & 2 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti

// ResetConnStatsWithoutLock resets DeltaBytes and DeltaPackets of connection
// after exporting without grabbing the lock. Caller is expected to grab lock.
func (ds *DenyConnectionStore) ResetConnStatsWithoutLock(conn *flowexporter.Connection) {
connKey := flowexporter.NewConnectionKey(conn)
func (ds *DenyConnectionStore) ResetConnStatsWithoutLock(connKey flowexporter.ConnectionKey) {
conn, exist := ds.connections[connKey]
if !exist {
klog.Warningf("Connection with key %s does not exist in deny connection store.", connKey)
Expand Down
39 changes: 16 additions & 23 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,20 @@ type flowExporter struct {
k8sClient kubernetes.Interface
nodeRouteController *noderoute.Controller
isNetworkPolicyOnly bool
nodeName string
}

func genObservationID() (uint32, error) {
name, err := env.GetNodeName()
if err != nil {
return 0, err
}
func genObservationID(nodeName string) uint32 {
h := fnv.New32()
h.Write([]byte(name))
return h.Sum32(), nil
h.Write([]byte(nodeName))
return h.Sum32()
}

func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.ExporterInput, error) {
func prepareExporterInputArgs(collectorAddr, collectorProto, nodeName string) exporter.ExporterInput {
expInput := exporter.ExporterInput{}
var err error
// Exporting process requires domain observation ID.
expInput.ObservationDomainID, err = genObservationID()
if err != nil {
return expInput, err
}
expInput.ObservationDomainID = genObservationID(nodeName)

expInput.CollectorAddress = collectorAddr
if collectorProto == "tls" {
expInput.IsEncrypted = true
Expand All @@ -134,7 +128,7 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex
}
expInput.PathMTU = 0

return expInput, nil
return expInput
}

func NewFlowExporter(connStore *connections.ConntrackConnectionStore, records *flowrecords.FlowRecords, denyConnStore *connections.DenyConnectionStore,
Expand All @@ -146,10 +140,11 @@ func NewFlowExporter(connStore *connections.ConntrackConnectionStore, records *f
registry.LoadRegistry()

// Prepare input args for IPFIX exporting process.
expInput, err := prepareExporterInputArgs(collectorAddr, collectorProto)
nodeName, err := env.GetNodeName()
if err != nil {
return nil, err
}
expInput := prepareExporterInputArgs(collectorAddr, collectorProto, nodeName)

return &flowExporter{
conntrackConnStore: connStore,
Expand All @@ -165,6 +160,7 @@ func NewFlowExporter(connStore *connections.ConntrackConnectionStore, records *f
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
isNetworkPolicyOnly: isNetworkPolicyOnly,
nodeName: nodeName,
}, nil
}

Expand Down Expand Up @@ -334,7 +330,7 @@ func (exp *flowExporter) sendFlowRecords() error {
return err
}
exp.numDataSetsSent = exp.numDataSetsSent + 1
exp.denyConnStore.ResetConnStatsWithoutLock(conn)
exp.denyConnStore.ResetConnStatsWithoutLock(connKey)
}
if time.Since(conn.LastExportTime) >= exp.idleFlowTimeout {
if err := exp.addDenyConnToSet(conn, ipfixregistry.IdleTimeoutReason); err != nil {
Expand Down Expand Up @@ -414,8 +410,6 @@ func (exp *flowExporter) sendTemplateSet(isIPv6 bool) (int, error) {
}

func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
nodeName, _ := env.GetNodeName()

// Iterate over all infoElements in the list
eL := exp.elementsListv4
if record.IsIPv6 {
Expand Down Expand Up @@ -488,7 +482,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "sourceNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if record.Conn.SourcePodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand All @@ -499,7 +493,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "destinationNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if record.Conn.DestinationPodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand Down Expand Up @@ -570,7 +564,6 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
}

func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEndReason uint8) error {
nodeName, _ := env.GetNodeName()
exp.ipfixSet.ResetSet()

eL := exp.elementsListv4
Expand Down Expand Up @@ -622,7 +615,7 @@ func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEnd
case "sourceNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if conn.SourcePodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand All @@ -633,7 +626,7 @@ func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEnd
case "destinationNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if conn.DestinationPodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand Down
Loading

0 comments on commit f7ecb65

Please sign in to comment.