Skip to content

Commit

Permalink
Fix FlowExporter memory bloat when export process is dead
Browse files Browse the repository at this point in the history
Signed-off-by: Shawn Wang <[email protected]>
  • Loading branch information
wsquan171 committed Jul 13, 2022
1 parent b9f3426 commit 16e4907
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 6 deletions.
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if removedItem := cs.expirePriorityQueue.Remove(key); removedItem != nil {
// In case ReadyToDelete is true, item should already have been removed from pq
klog.V(4).InfoS("Conn removed from cs pq due to stale timeout",
"key", key, "conn", removedItem.Conn)
}
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -239,7 +244,7 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
// If the connKey:pqItem pair does not exist in the map, it shows the
// conn was inactive, and was removed from PQ and map. Since it becomes
// active again now, we create a new pqItem and add it to PQ and map.
cs.expirePriorityQueue.AddItemToQueue(connKey, existingConn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, existingConn)
} else {
cs.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(cs.connectionStore.expirePriorityQueue.IdleFlowTimeout))
Expand All @@ -264,11 +269,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
conn.StartTime = time.Now()
conn.StopTime = time.Now()
}
conn.LastExportTime = conn.StartTime
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
conn.IsActive = true
// Add new antrea connection to connection store and PQ.
cs.connections[connKey] = conn
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
klog.V(4).InfoS("New Antrea flow added", "connection", conn)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime,
StopTime: refTime,
LastExportTime: refTime,
FlowKey: tuple1,
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
Mark: openflow.ServiceCTMark.GetValue(),
Expand All @@ -136,6 +137,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
oldConn: &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand All @@ -156,6 +158,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xffff,
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
Expand All @@ -173,6 +176,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
oldConn: &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand All @@ -195,6 +199,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand Down Expand Up @@ -249,7 +254,7 @@ func testAddNewConn(mockIfaceStore *interfacestoretest.MockInterfaceStore, mockP
func addConnToStore(cs *ConntrackConnectionStore, conn *flowexporter.Connection) {
connKey := flowexporter.NewConnectionKey(conn)
cs.AddConnToMap(&connKey, conn)
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (ds *DenyConnectionStore) RunPeriodicDeletion(stopCh <-chan struct{}) {
case <-pollTicker.C:
deleteIfStaleConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= ds.staleConnectionTimeout {
if removedItem := ds.expirePriorityQueue.Remove(key); removedItem != nil {
// In case ReadyToDelete is true, item should already have been removed from pq
klog.V(4).InfoS("Conn removed from ds pq due to stale timeout",
"key", key, "conn", removedItem.Conn)
}
if err := ds.deleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -78,7 +83,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.IsActive = true
existingItem, exists := ds.expirePriorityQueue.KeyToItem[connKey]
if !exists {
ds.expirePriorityQueue.AddItemToQueue(connKey, conn)
ds.expirePriorityQueue.WriteItemToQueue(connKey, conn)
} else {
ds.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(ds.connectionStore.expirePriorityQueue.IdleFlowTimeout))
Expand All @@ -87,6 +92,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
} else {
conn.StartTime = timeSeen
conn.StopTime = timeSeen
conn.LastExportTime = timeSeen
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
Expand All @@ -96,7 +102,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
metrics.TotalDenyConnections.Inc()
conn.IsActive = true
ds.connections[connKey] = conn
ds.expirePriorityQueue.AddItemToQueue(connKey, conn)
ds.expirePriorityQueue.WriteItemToQueue(connKey, conn)
klog.V(4).InfoS("New deny connection added", "connection", conn)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
Expand All @@ -89,6 +90,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/flowexporter/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func (pq *ExpirePriorityQueue) Update(item *flowexporter.ItemToExpire, activeExp
heap.Fix(pq, item.Index)
}

// Remove removes and returns an Item by key from priority queue if it exists.
func (pq *ExpirePriorityQueue) Remove(connKey flowexporter.ConnectionKey) *flowexporter.ItemToExpire {
item, exists := pq.KeyToItem[connKey]
if !exists {
return nil
}

removedItem := heap.Remove(pq, item.Index)
delete(pq.KeyToItem, connKey)
return removedItem.(*flowexporter.ItemToExpire)
}

// GetExpiryFromExpirePriorityQueue returns the shortest expire time duration
// from expire priority queue.
func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration {
Expand All @@ -114,13 +126,18 @@ func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration
return pq.IdleFlowTimeout
}

func (pq *ExpirePriorityQueue) AddItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
// WriteItemToQueue adds conn with connKey into the queue. If an existing item
// has the same connKey, it will be overwritten by the new item.
func (pq *ExpirePriorityQueue) WriteItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
currTime := time.Now()
pqItem := &flowexporter.ItemToExpire{
Conn: conn,
ActiveExpireTime: currTime.Add(pq.ActiveFlowTimeout),
IdleExpireTime: currTime.Add(pq.IdleFlowTimeout),
}
// If connKey exists in pq, it is removed first to avoid having multiple pqItems with same key
// in the queue, which can cause memory leak as the previous one can't be updated or removed.
pq.Remove(connKey)
heap.Push(pq, pqItem)
pq.KeyToItem[connKey] = pqItem
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package priorityqueue

import (
"container/heap"
"fmt"
"testing"
"time"

Expand All @@ -41,9 +42,40 @@ func TestExpirePriorityQueue(t *testing.T) {
Index: key,
}
testPriorityQueue.items = append(testPriorityQueue.items, item)
testPriorityQueue.KeyToItem[flowexporter.ConnectionKey{fmt.Sprintf("%d", key)}] = item
}
heap.Init(testPriorityQueue)

// Test WriteItemToQueue
connKey := flowexporter.ConnectionKey{"3"}
conn := flowexporter.Connection{}
testPriorityQueue.WriteItemToQueue(connKey, &conn)
assert.Equal(t, &conn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't add new conn to map")
newConn := flowexporter.Connection{}
testPriorityQueue.WriteItemToQueue(connKey, &newConn)
assert.Equal(t, &newConn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't overwrite existing conn to map")
hasOld, hasNew := false, false
for _, item := range testPriorityQueue.items {
if item.Conn == &conn {
hasOld = true
}
if item.Conn == &newConn {
hasNew = true
}
}
assert.False(t, hasOld && hasNew, "WriteItemToQueue shouldn't add two items with same key to heap")

// Test Remove
removedItem := testPriorityQueue.Remove(connKey)
assert.Equal(t, &newConn, removedItem.Conn, "Remove didn't return correct item")
_, exist := testPriorityQueue.KeyToItem[connKey]
assert.False(t, exist, "Remove didn't delete KeyToItem entry")
for _, item := range testPriorityQueue.items {
if item.Conn == &newConn {
assert.Fail(t, "Remove didn't delete item from queue")
}
}

// Add new flow to the priority queue
testFlowsWithExpire[3] = []time.Time{startTime.Add(3 * time.Second), startTime.Add(500 * time.Millisecond)}
newFlowItem := &flowexporter.ItemToExpire{
Expand Down

0 comments on commit 16e4907

Please sign in to comment.