Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FlowExporter memory bloat when export process is dead #3994

Merged
merged 1 commit into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// Delete the connection if it is ready to delete or it was not exported
// in the time period as specified by the stale connection timeout.
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
if removedItem := cs.expirePriorityQueue.Remove(key); removedItem != nil {
// In case ReadyToDelete is true, item should already have been removed from pq
klog.V(4).InfoS("Conn removed from cs pq due to stale timeout",
"key", key, "conn", removedItem.Conn)
}
if err := cs.deleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -239,7 +244,7 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
// If the connKey:pqItem pair does not exist in the map, it shows the
// conn was inactive, and was removed from PQ and map. Since it becomes
// active again now, we create a new pqItem and add it to PQ and map.
cs.expirePriorityQueue.AddItemToQueue(connKey, existingConn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, existingConn)
} else {
cs.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(cs.connectionStore.expirePriorityQueue.IdleFlowTimeout))
Expand All @@ -264,11 +269,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio
conn.StartTime = time.Now()
conn.StopTime = time.Now()
}
conn.LastExportTime = conn.StartTime
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
conn.IsActive = true
// Add new antrea connection to connection store and PQ.
cs.connections[connKey] = conn
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
klog.V(4).InfoS("New Antrea flow added", "connection", conn)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime,
StopTime: refTime,
LastExportTime: refTime,
FlowKey: tuple1,
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
Mark: openflow.ServiceCTMark.GetValue(),
Expand All @@ -136,6 +137,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
oldConn: &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand All @@ -156,6 +158,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime,
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xffff,
OriginalBytes: 0xbaaaaa0000000000,
ReversePackets: 0xff,
Expand All @@ -173,6 +176,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
oldConn: &flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand All @@ -195,6 +199,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
expectedConn: flowexporter.Connection{
StartTime: refTime.Add(-(time.Second * 50)),
StopTime: refTime.Add(-(time.Second * 30)),
LastExportTime: refTime.Add(-(time.Second * 50)),
OriginalPackets: 0xfff,
OriginalBytes: 0xbaaaaa00000000,
ReversePackets: 0xf,
Expand Down Expand Up @@ -249,7 +254,7 @@ func testAddNewConn(mockIfaceStore *interfacestoretest.MockInterfaceStore, mockP
func addConnToStore(cs *ConntrackConnectionStore, conn *flowexporter.Connection) {
connKey := flowexporter.NewConnectionKey(conn)
cs.AddConnToMap(&connKey, conn)
cs.expirePriorityQueue.AddItemToQueue(connKey, conn)
cs.expirePriorityQueue.WriteItemToQueue(connKey, conn)
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/flowexporter/connections/deny_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func (ds *DenyConnectionStore) RunPeriodicDeletion(stopCh <-chan struct{}) {
case <-pollTicker.C:
deleteIfStaleConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= ds.staleConnectionTimeout {
if removedItem := ds.expirePriorityQueue.Remove(key); removedItem != nil {
// In case ReadyToDelete is true, item should already have been removed from pq
klog.V(4).InfoS("Conn removed from ds pq due to stale timeout",
"key", key, "conn", removedItem.Conn)
}
if err := ds.deleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -78,7 +83,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
conn.IsActive = true
existingItem, exists := ds.expirePriorityQueue.KeyToItem[connKey]
if !exists {
ds.expirePriorityQueue.AddItemToQueue(connKey, conn)
ds.expirePriorityQueue.WriteItemToQueue(connKey, conn)
} else {
ds.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime,
time.Now().Add(ds.connectionStore.expirePriorityQueue.IdleFlowTimeout))
Expand All @@ -87,6 +92,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
} else {
conn.StartTime = timeSeen
conn.StopTime = timeSeen
conn.LastExportTime = timeSeen
conn.OriginalBytes = bytes
conn.OriginalPackets = uint64(1)
ds.fillPodInfo(conn)
Expand All @@ -96,7 +102,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti
metrics.TotalDenyConnections.Inc()
conn.IsActive = true
ds.connections[connKey] = conn
ds.expirePriorityQueue.AddItemToQueue(connKey, conn)
ds.expirePriorityQueue.WriteItemToQueue(connKey, conn)
klog.V(4).InfoS("New deny connection added", "connection", conn)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/deny_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, ok, true, "deny connection should be there in deny connection store")
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len(), "Length of the expire priority queue should be 1")
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should be set to StartTime during Add")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))

denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 10)), uint64(60))
Expand All @@ -89,6 +90,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) {
assert.Equal(t, expConn, *actualConn, "deny connections should be equal")
assert.True(t, actualConn.IsActive)
assert.Equal(t, 1, denyConnStore.connectionStore.expirePriorityQueue.Len())
assert.Equal(t, refTime.Add(-(time.Second * 20)), actualConn.LastExportTime, "LastExportTime should not be changed during Update")
checkDenyConnectionMetrics(t, len(denyConnStore.connections))
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/flowexporter/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func (pq *ExpirePriorityQueue) Update(item *flowexporter.ItemToExpire, activeExp
heap.Fix(pq, item.Index)
}

// Remove removes and returns an Item by key from priority queue if it exists.
func (pq *ExpirePriorityQueue) Remove(connKey flowexporter.ConnectionKey) *flowexporter.ItemToExpire {
item, exists := pq.KeyToItem[connKey]
if !exists {
return nil
}

removedItem := heap.Remove(pq, item.Index)
delete(pq.KeyToItem, connKey)
return removedItem.(*flowexporter.ItemToExpire)
}

// GetExpiryFromExpirePriorityQueue returns the shortest expire time duration
// from expire priority queue.
func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration {
Expand All @@ -114,13 +126,18 @@ func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration
return pq.IdleFlowTimeout
}

func (pq *ExpirePriorityQueue) AddItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
// WriteItemToQueue adds conn with connKey into the queue. If an existing item
// has the same connKey, it will be overwritten by the new item.
func (pq *ExpirePriorityQueue) WriteItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) {
currTime := time.Now()
pqItem := &flowexporter.ItemToExpire{
Conn: conn,
ActiveExpireTime: currTime.Add(pq.ActiveFlowTimeout),
IdleExpireTime: currTime.Add(pq.IdleFlowTimeout),
}
// If connKey exists in pq, it is removed first to avoid having multiple pqItems with same key
// in the queue, which can cause memory leak as the previous one can't be updated or removed.
pq.Remove(connKey)
heap.Push(pq, pqItem)
pq.KeyToItem[connKey] = pqItem
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package priorityqueue

import (
"container/heap"
"fmt"
"testing"
"time"

Expand All @@ -41,9 +42,40 @@ func TestExpirePriorityQueue(t *testing.T) {
Index: key,
}
testPriorityQueue.items = append(testPriorityQueue.items, item)
testPriorityQueue.KeyToItem[flowexporter.ConnectionKey{fmt.Sprintf("%d", key)}] = item
}
heap.Init(testPriorityQueue)

// Test WriteItemToQueue
connKey := flowexporter.ConnectionKey{"3"}
conn := flowexporter.Connection{}
testPriorityQueue.WriteItemToQueue(connKey, &conn)
assert.Equal(t, &conn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't add new conn to map")
newConn := flowexporter.Connection{}
testPriorityQueue.WriteItemToQueue(connKey, &newConn)
assert.Equal(t, &newConn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't overwrite existing conn to map")
hasOld, hasNew := false, false
for _, item := range testPriorityQueue.items {
if item.Conn == &conn {
hasOld = true
}
if item.Conn == &newConn {
hasNew = true
}
}
assert.False(t, hasOld && hasNew, "WriteItemToQueue shouldn't add two items with same key to heap")

// Test Remove
removedItem := testPriorityQueue.Remove(connKey)
assert.Equal(t, &newConn, removedItem.Conn, "Remove didn't return correct item")
_, exist := testPriorityQueue.KeyToItem[connKey]
assert.False(t, exist, "Remove didn't delete KeyToItem entry")
for _, item := range testPriorityQueue.items {
if item.Conn == &newConn {
assert.Fail(t, "Remove didn't delete item from queue")
}
}

// Add new flow to the priority queue
testFlowsWithExpire[3] = []time.Time{startTime.Add(3 * time.Second), startTime.Add(500 * time.Millisecond)}
newFlowItem := &flowexporter.ItemToExpire{
Expand Down