Skip to content

Commit

Permalink
Bug fix in Flow Exporter related to deletion of connections
Browse files Browse the repository at this point in the history
If flows are not exported to any collector, we do not clear the connections
in the connection store. This applies to both conntrack and deny
connections, and it unnecessarily increases memory usage in the Antrea agent.

Signed-off-by: Srikar Tati <[email protected]>
  • Loading branch information
srikartati committed Aug 3, 2021
1 parent 028393d commit 9629ec0
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 8 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func run(o *Options) error {
var denyConnStore *connections.DenyConnectionStore
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
denyConnStore = connections.NewDenyConnectionStore(ifaceStore, proxier)
go denyConnStore.RunPeriodicDeletion(stopCh)
}
networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
Expand Down
10 changes: 9 additions & 1 deletion cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/flowexporter/connections"
"antrea.io/antrea/pkg/apis"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/features"
Expand Down Expand Up @@ -254,10 +255,17 @@ func (o *Options) validateFlowExporterConfig() error {
return fmt.Errorf("IdleFlowExportTimeout is not provided in right format")
}
if o.idleFlowTimeout < o.pollInterval {
o.activeFlowTimeout = o.pollInterval
o.idleFlowTimeout = o.pollInterval
klog.Warningf("IdleFlowExportTimeout must be greater than or equal to FlowPollInterval")
}
}
if (o.activeFlowTimeout > connections.StaleConnectionTimeout) || (o.idleFlowTimeout > connections.StaleConnectionTimeout) {
if o.activeFlowTimeout > o.idleFlowTimeout {
connections.StaleConnectionTimeout = 2 * o.activeFlowTimeout
} else {
connections.StaleConnectionTimeout = 2 * o.idleFlowTimeout
}
}
}
return nil
}
9 changes: 9 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package connections
import (
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
Expand All @@ -26,6 +27,14 @@ import (
"antrea.io/antrea/pkg/agent/proxy"
)

const (
// periodicDeleteInterval is the period of the ticker that drives the
// periodic deletion of stale connections in the deny connection store.
periodicDeleteInterval = time.Minute
)

var (
// StaleConnectionTimeout is the age, measured from a connection's last
// export time, at or beyond which the connection is treated as stale and
// removed from the connection store. It defaults to five minutes. It is a
// variable rather than a constant because the agent's flow exporter option
// validation raises it to twice the larger of the active and idle flow
// timeouts when either of them exceeds the current value.
StaleConnectionTimeout = 5 * time.Minute
)

type connectionStore struct {
connections map[flowexporter.ConnectionKey]*flowexporter.Connection
ifaceStore interfacestore.InterfaceStore
Expand Down
26 changes: 21 additions & 5 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,28 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) {
// TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export.
func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
klog.V(2).Infof("Polling conntrack")
// Reset IsPresent flag for all connections in connection map before dumping flows in conntrack module.
// if the connection does not exist in conntrack table and has been exported, we will delete it from connection map.
// Reset IsPresent flag for all connections in connection map before dumping
// flows in conntrack module. If the connection does not exist in conntrack
// table and has been exported, then we will delete it from connection map.
// In addition, if the connection was not exported for a specific time period,
// then we consider it to be stale and delete it.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent && conn.DyingAndDoneExport {
if err := cs.DeleteConnWithoutLock(key); err != nil {
return err
if !conn.IsPresent {
if conn.DyingAndDoneExport {
if err := cs.DeleteConnWithoutLock(key); err != nil {
return err
}
} else {
record, exists := cs.flowRecords.GetFlowRecordFromMap(&key)
if exists {
// Delete the connection if it was not exported for the time
// period as specified by the stale connection timeout.
if time.Since(record.LastExportTime) >= StaleConnectionTimeout {
// Ignore error if flow record not found.
cs.flowRecords.DeleteFlowRecordFromMap(&key)
delete(cs.connections, key)
}
}
}
} else {
conn.IsPresent = false
Expand Down
22 changes: 22 additions & 0 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ func NewDenyConnectionStore(ifaceStore interfacestore.InterfaceStore,
}
}

// RunPeriodicDeletion periodically removes stale connections from the deny
// connection store. Every periodicDeleteInterval it visits all connections and
// deletes those whose LastExportTime is at least StaleConnectionTimeout in the
// past, so that connections which are never exported (for example, when no
// collector is available) do not accumulate in memory.
//
// The method blocks until stopCh is closed or signalled and is meant to be
// started in its own goroutine.
func (ds *DenyConnectionStore) RunPeriodicDeletion(stopCh <-chan struct{}) {
	pollTicker := time.NewTicker(periodicDeleteInterval)
	defer pollTicker.Stop()

	// The callback does not depend on the tick, so build it once instead of on
	// every iteration.
	// NOTE(review): it mutates ds.connections directly; this presumes that
	// ForAllConnectionsDo holds the store lock while invoking it — confirm.
	deleteIfStaleConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
		// Delete the connection if it was not exported within the stale
		// connection timeout.
		if time.Since(conn.LastExportTime) >= StaleConnectionTimeout {
			delete(ds.connections, key)
		}
		return nil
	}

	for {
		select {
		case <-stopCh:
			// A bare break would only leave the select statement, not the
			// enclosing for loop, and the closed channel would then make the
			// loop spin forever. Return to actually stop the goroutine.
			return
		case <-pollTicker.C:
			ds.ForAllConnectionsDo(deleteIfStaleConn)
			klog.V(2).Infof("Stale connections in the Deny Connection Store are successfully deleted.")
		}
	}
}

// AddOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc.,
// or adds a new connection with the resolved K8s metadata.
func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, timeSeen time.Time, bytes uint64) {
Expand Down
38 changes: 38 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,44 @@ func BenchmarkExportDenyConns(b *testing.B) {

}

/*
go test -test.v -run=None -test.benchmem -bench=BenchmarkGetRecordAndDeleteFromMap -memprofile memprofile.out -cpuprofile profile.out
goos: linux
goarch: amd64
pkg: antrea.io/antrea/pkg/agent/flowexporter/exporter
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
BenchmarkGetRecordAndDeleteFromMap
exporter_perf_test.go:150:
Summary:
Number of conntrack connections: 100000
Number of dying conntrack connections: 5000
BenchmarkGetRecordAndDeleteFromMap-2 168049232 6.737 ns/op 0 B/op 0 allocs/op
PASS
ok antrea.io/antrea/pkg/agent/flowexporter/exporter 8.707s
Reference value:
#conns
20000 168049232 6.737 ns/op 0 B/op 0 allocs/op
*/
// BenchmarkGetRecordAndDeleteFromMap measures the cost of looking up each flow
// record by its connection key and deleting it from the flow record map. The
// records come from a conntrack connection store populated by
// addConnsAndGetRecords.
//
// NOTE(review): the first full pass appears to delete every record, so later
// iterations of the timed loop would range over an empty map — confirm whether
// the reported ns/op is representative.
func BenchmarkGetRecordAndDeleteFromMap(b *testing.B) {
	disableLogToStderr()
	mockCtrl := gomock.NewController(b)
	defer mockCtrl.Finish()

	store := connections.NewConntrackConnectionStore(nil, flowrecords.NewFlowRecords(), nil, true, false, nil, nil, 1)
	flowRecs := addConnsAndGetRecords(store)

	// Look the record up first and delete it only when it is present; a
	// missing record is not treated as an error.
	lookupThenDelete := func(key flowexporter.ConnectionKey, _ flowexporter.FlowRecord) error {
		_, found := flowRecs.GetFlowRecordFromMap(&key)
		if !found {
			return nil
		}
		return flowRecs.DeleteFlowRecordFromMap(&key)
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// The lock-free iterator is used because the callback itself calls
		// methods that take the flow records mutex.
		flowRecs.ForAllFlowRecordsDoWithoutLock(lookupThenDelete)
	}
	b.Logf("\nSummary:\nNumber of conntrack connections: %d\n Number of dying conntrack connections: %d\n", testNumOfConns, testNumOfDyingConns)
}

func setupExporter(isConntrackConn bool) (*flowExporter, error) {
var err error
collectorAddr, err := startLocalServer()
Expand Down
29 changes: 27 additions & 2 deletions pkg/agent/flowexporter/flowrecords/flow_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package flowrecords

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -100,15 +101,26 @@ func (fr *FlowRecords) AddFlowRecordWithoutLock(connKey *flowexporter.Connection
fr.recordsMap[*connKey] = *record
}

// GetFlowRecordFromMap looks up the flow record stored under the given
// connection key while holding the flow records mutex. It returns a pointer to
// a copy of the stored record together with a flag reporting whether the key
// was present. When the key is absent the pointer refers to a zero-value
// record, so callers must check the flag before using the result.
func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) (*flowexporter.FlowRecord, bool) {
	key := *connKey
	fr.mutex.Lock()
	defer fr.mutex.Unlock()
	rec, found := fr.recordsMap[key]
	return &rec, found
}

// DeleteFlowRecordFromMap removes the flow record stored under the given
// connection key while holding the flow records mutex. It returns an error if
// no record exists for that key, and nil once the record has been removed.
func (fr *FlowRecords) DeleteFlowRecordFromMap(connKey *flowexporter.ConnectionKey) error {
	fr.mutex.Lock()
	defer fr.mutex.Unlock()
	if _, found := fr.recordsMap[*connKey]; found {
		delete(fr.recordsMap, *connKey)
		return nil
	}
	return fmt.Errorf("record with key %v doesn't exist in map", connKey)
}

// ValidateAndUpdateStats validates and updates the flow record given the connection
// key. Caller is expected to grab lock.
func (fr *FlowRecords) ValidateAndUpdateStats(connKey flowexporter.ConnectionKey, record flowexporter.FlowRecord) {
Expand Down Expand Up @@ -136,3 +148,16 @@ func (fr *FlowRecords) ForAllFlowRecordsDo(callback flowexporter.FlowRecordCallB
}
return nil
}

// ForAllFlowRecordsDoWithoutLock invokes the callback once for every entry in
// the flow record map, passing the connection key and a copy of the record. It
// does not acquire the flow records mutex itself. Iteration stops at the first
// callback error, which is logged and returned; nil is returned when every
// callback succeeds. This is used in the perf testing.
func (fr *FlowRecords) ForAllFlowRecordsDoWithoutLock(callback flowexporter.FlowRecordCallBack) error {
	for connKey, rec := range fr.recordsMap {
		if err := callback(connKey, rec); err != nil {
			klog.Errorf("Error when executing callback for flow record: %v", err)
			return err
		}
	}
	return nil
}

0 comments on commit 9629ec0

Please sign in to comment.