Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance unit test for Flow Exporter #2129

Merged
merged 1 commit into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
return nil
}

// AddConnToMap adds the connection to connections map given connection key.
// This is used only for unit tests.
func (cs *connectionStore) AddConnToMap(connKey *flowexporter.ConnectionKey, conn *flowexporter.Connection) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.connections[*connKey] = conn
}

func (cs *connectionStore) fillPodInfo(conn *flowexporter.Connection) {
if cs.ifaceStore == nil {
klog.V(4).Info("Interface store is not available to retrieve local Pods information.")
Expand Down
178 changes: 178 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections_perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// +build !race

// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connections

import (
"crypto/rand"
"flag"
"fmt"
"math/big"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/flowexporter"
connectionstest "antrea.io/antrea/pkg/agent/flowexporter/connections/testing"
"antrea.io/antrea/pkg/agent/flowexporter/flowrecords"
"antrea.io/antrea/pkg/agent/interfacestore"
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
queriertest "antrea.io/antrea/pkg/querier/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const (
testNumOfConns = 10000
testNumOfNewConns = 1000
testNumOfDeletedConns = 1000
)

/*
Sample output (10000 init connections, 1000 new connections, 1000 deleted connections):
go test -test.v -run=BenchmarkPoll -test.benchmem -bench=. -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/connections
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkPoll
BenchmarkPoll-2 116 9068998 ns/op 889713 B/op 54458 allocs/op
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/connections 3.618s
*/

func BenchmarkPoll(b *testing.B) {
disableLogToStderr()
connStore, mockConnDumper := setupConntrackConnStore(b)
conns := generateConns()
b.ResetTimer()
for n := 0; n < b.N; n++ {
mockConnDumper.EXPECT().DumpFlows(uint16(openflow.CtZone)).Return(conns, testNumOfConns, nil)
connStore.Poll()
b.StopTimer()
conns = generateUpdatedConns(conns)
b.StartTimer()
}
b.Logf("\nSummary:\nNumber of initial connections: %d\nNumber of new connections/poll: %d\nNumber of deleted connections/poll: %d\n", testNumOfConns, testNumOfNewConns, testNumOfDeletedConns)
}

func setupConntrackConnStore(b *testing.B) (*ConntrackConnectionStore, *connectionstest.MockConnTrackDumper) {
ctrl := gomock.NewController(b)
defer ctrl.Finish()
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
testInterface := &interfacestore.InterfaceConfig{
Type: interfacestore.ContainerInterface,
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
PodName: "pod",
PodNamespace: "pod-ns",
},
}
mockIfaceStore.EXPECT().GetInterfaceByIP(gomock.Any()).Return(testInterface, true).AnyTimes()

mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockConnDumper.EXPECT().GetMaxConnections().Return(100000, nil).AnyTimes()

serviceStr := "10.0.0.1:30000/TCP"
servicePortName := k8sproxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: "serviceNS1",
Name: "service1",
},
Port: "30000",
Protocol: v1.ProtocolTCP,
}
mockProxier := proxytest.NewMockProxier(ctrl)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true).AnyTimes()

npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)

return NewConntrackConnectionStore(mockConnDumper, flowrecords.NewFlowRecords(), mockIfaceStore, true, false, mockProxier, npQuerier, testPollInterval), mockConnDumper
}

func generateConns() []*flowexporter.Connection {
conns := make([]*flowexporter.Connection, testNumOfConns)
for i := 0; i < testNumOfConns; i++ {
conns[i] = getNewConn()
}
return conns
}

func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Connection {
length := len(conns) - testNumOfDeletedConns + testNumOfNewConns
updatedConns := make([]*flowexporter.Connection, length)
for i := 0; i < len(conns); i++ {
// replace deleted connection with new connection
if conns[i].DoneExport == true {
conns[i] = getNewConn()
} else { // update rest of connections
conns[i].OriginalPackets += 5
conns[i].OriginalBytes += 20
conns[i].ReversePackets += 2
conns[i].ReverseBytes += 10
}
updatedConns[i] = conns[i]
}
for i := len(conns); i < length; i++ {
updatedConns[i] = getNewConn()
}
randomNum := getRandomNum(int64(length - testNumOfDeletedConns))
for i := randomNum; i < testNumOfDeletedConns+randomNum; i++ {
// hardcode DoneExport here for testing deletion of connections
// not valid for testing update and export of records
updatedConns[i].DoneExport = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if updatedConns[i] is one of the new connections for this iteration? Does it still make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion it is still valid because the connection can be stored and will be deleted in the next round. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok since we are not testing export here. It would have been good to add a comment to this effect.

}
return updatedConns
}

func getNewConn() *flowexporter.Connection {
randomNum1 := getRandomNum(255)
randomNum2 := getRandomNum(255)
randomNum3 := getRandomNum(255)
src := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum1, randomNum2, randomNum3))
dst := net.ParseIP(fmt.Sprintf("192.%d.%d.%d", randomNum3, randomNum2, randomNum1))
flowKey := flowexporter.Tuple{SourceAddress: src, DestinationAddress: dst, Protocol: 6, SourcePort: uint16(randomNum1), DestinationPort: uint16(randomNum2)}
return &flowexporter.Connection{
StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second),
StopTime: time.Now(),
IsPresent: true,
DoneExport: false,
FlowKey: flowKey,
OriginalPackets: 10,
OriginalBytes: 100,
ReversePackets: 5,
ReverseBytes: 50,
DestinationServiceAddress: net.ParseIP("10.0.0.1"),
DestinationServicePort: 30000,
TCPState: "SYN_SENT",
}
}

func getRandomNum(value int64) uint64 {
number, _ := rand.Int(rand.Reader, big.NewInt(value))
return number.Uint64()
}

func disableLogToStderr() {
klogFlagSet := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(klogFlagSet)
klogFlagSet.Parse([]string{"-logtostderr=false"})
}
3 changes: 1 addition & 2 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti

// ResetConnStatsWithoutLock resets DeltaBytes and DeltaPackets of connection
// after exporting without grabbing the lock. Caller is expected to grab lock.
func (ds *DenyConnectionStore) ResetConnStatsWithoutLock(conn *flowexporter.Connection) {
connKey := flowexporter.NewConnectionKey(conn)
func (ds *DenyConnectionStore) ResetConnStatsWithoutLock(connKey flowexporter.ConnectionKey) {
conn, exist := ds.connections[connKey]
if !exist {
klog.Warningf("Connection with key %s does not exist in deny connection store.", connKey)
Expand Down
39 changes: 16 additions & 23 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,20 @@ type flowExporter struct {
k8sClient kubernetes.Interface
nodeRouteController *noderoute.Controller
isNetworkPolicyOnly bool
nodeName string
}

func genObservationID() (uint32, error) {
name, err := env.GetNodeName()
if err != nil {
return 0, err
}
func genObservationID(nodeName string) uint32 {
h := fnv.New32()
h.Write([]byte(name))
return h.Sum32(), nil
h.Write([]byte(nodeName))
return h.Sum32()
}

func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.ExporterInput, error) {
func prepareExporterInputArgs(collectorAddr, collectorProto, nodeName string) exporter.ExporterInput {
expInput := exporter.ExporterInput{}
var err error
// Exporting process requires domain observation ID.
expInput.ObservationDomainID, err = genObservationID()
if err != nil {
return expInput, err
}
expInput.ObservationDomainID = genObservationID(nodeName)

expInput.CollectorAddress = collectorAddr
if collectorProto == "tls" {
expInput.IsEncrypted = true
Expand All @@ -134,7 +128,7 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex
}
expInput.PathMTU = 0

return expInput, nil
return expInput
}

func NewFlowExporter(connStore *connections.ConntrackConnectionStore, records *flowrecords.FlowRecords, denyConnStore *connections.DenyConnectionStore,
Expand All @@ -146,10 +140,11 @@ func NewFlowExporter(connStore *connections.ConntrackConnectionStore, records *f
registry.LoadRegistry()

// Prepare input args for IPFIX exporting process.
expInput, err := prepareExporterInputArgs(collectorAddr, collectorProto)
nodeName, err := env.GetNodeName()
if err != nil {
return nil, err
}
expInput := prepareExporterInputArgs(collectorAddr, collectorProto, nodeName)

return &flowExporter{
conntrackConnStore: connStore,
Expand All @@ -165,6 +160,7 @@ func NewFlowExporter(connStore *connections.ConntrackConnectionStore, records *f
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
isNetworkPolicyOnly: isNetworkPolicyOnly,
nodeName: nodeName,
}, nil
}

Expand Down Expand Up @@ -334,7 +330,7 @@ func (exp *flowExporter) sendFlowRecords() error {
return err
}
exp.numDataSetsSent = exp.numDataSetsSent + 1
exp.denyConnStore.ResetConnStatsWithoutLock(conn)
exp.denyConnStore.ResetConnStatsWithoutLock(connKey)
}
if time.Since(conn.LastExportTime) >= exp.idleFlowTimeout {
if err := exp.addDenyConnToSet(conn, ipfixregistry.IdleTimeoutReason); err != nil {
Expand Down Expand Up @@ -414,8 +410,6 @@ func (exp *flowExporter) sendTemplateSet(isIPv6 bool) (int, error) {
}

func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
nodeName, _ := env.GetNodeName()

// Iterate over all infoElements in the list
eL := exp.elementsListv4
if record.IsIPv6 {
Expand Down Expand Up @@ -488,7 +482,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "sourceNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if record.Conn.SourcePodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand All @@ -499,7 +493,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "destinationNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if record.Conn.DestinationPodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand Down Expand Up @@ -570,7 +564,6 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
}

func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEndReason uint8) error {
nodeName, _ := env.GetNodeName()
exp.ipfixSet.ResetSet()

eL := exp.elementsListv4
Expand Down Expand Up @@ -622,7 +615,7 @@ func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEnd
case "sourceNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if conn.SourcePodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand All @@ -633,7 +626,7 @@ func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEnd
case "destinationNodeName":
// Add nodeName for only local pods whose pod names are resolved.
if conn.DestinationPodName != "" {
ie.Value = nodeName
ie.Value = exp.nodeName
} else {
ie.Value = ""
}
Expand Down
Loading