diff --git a/pkg/agent/flowexporter/connections/conntrack_connections.go b/pkg/agent/flowexporter/connections/conntrack_connections.go index c4aa09361ea..2af08e87463 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections.go @@ -135,6 +135,11 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) { // Delete the connection if it is ready to delete or it was not exported // in the time period as specified by the stale connection timeout. if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout { + if removedItem := cs.expirePriorityQueue.Remove(key); removedItem != nil { + // In case of ReadyToDelete is true, item should already have been removed from pq + klog.V(4).InfoS("Conn removed from cs pq due to stale timeout", + "key", key, "conn", removedItem.Conn) + } if err := cs.deleteConnWithoutLock(key); err != nil { return err } @@ -239,7 +244,7 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio // If the connKey:pqItem pair does not exist in the map, it shows the // conn was inactive, and was removed from PQ and map. Since it becomes // active again now, we create a new pqItem and add it to PQ and map. - cs.expirePriorityQueue.AddItemToQueue(connKey, existingConn) + cs.expirePriorityQueue.WriteItemToQueue(connKey, existingConn) } else { cs.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime, time.Now().Add(cs.connectionStore.expirePriorityQueue.IdleFlowTimeout)) @@ -264,11 +269,12 @@ func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connectio conn.StartTime = time.Now() conn.StopTime = time.Now() } + conn.LastExportTime = conn.StartTime metrics.TotalAntreaConnectionsInConnTrackTable.Inc() conn.IsActive = true // Add new antrea connection to connection store and PQ. cs.connections[connKey] = conn - cs.expirePriorityQueue.AddItemToQueue(connKey, conn) + cs.expirePriorityQueue.WriteItemToQueue(connKey, conn) klog.V(4).InfoS("New Antrea flow added", "connection", conn) } } diff --git a/pkg/agent/flowexporter/connections/conntrack_connections_test.go b/pkg/agent/flowexporter/connections/conntrack_connections_test.go index b2cfdebf243..d5894376153 100644 --- a/pkg/agent/flowexporter/connections/conntrack_connections_test.go +++ b/pkg/agent/flowexporter/connections/conntrack_connections_test.go @@ -115,6 +115,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { expectedConn: flowexporter.Connection{ StartTime: refTime, StopTime: refTime, + LastExportTime: refTime, FlowKey: tuple1, Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}, Mark: openflow.ServiceCTMark.GetValue(), @@ -136,6 +137,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { oldConn: &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime.Add(-(time.Second * 30)), + LastExportTime: refTime.Add(-(time.Second * 50)), OriginalPackets: 0xfff, OriginalBytes: 0xbaaaaa00000000, ReversePackets: 0xf, @@ -156,6 +158,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { expectedConn: flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime, + LastExportTime: refTime.Add(-(time.Second * 50)), OriginalPackets: 0xffff, OriginalBytes: 0xbaaaaa0000000000, ReversePackets: 0xff, @@ -173,6 +176,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { oldConn: &flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime.Add(-(time.Second * 30)), + LastExportTime: refTime.Add(-(time.Second * 50)), OriginalPackets: 0xfff, OriginalBytes: 0xbaaaaa00000000, ReversePackets: 0xf, @@ -195,6 +199,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) { expectedConn: flowexporter.Connection{ StartTime: refTime.Add(-(time.Second * 50)), StopTime: refTime.Add(-(time.Second * 30)), + LastExportTime: refTime.Add(-(time.Second * 50)), OriginalPackets: 0xfff, OriginalBytes: 0xbaaaaa00000000, ReversePackets: 0xf, @@ -249,7 +254,7 @@ func testAddNewConn(mockIfaceStore *interfacestoretest.MockInterfaceStore, mockP func addConnToStore(cs *ConntrackConnectionStore, conn *flowexporter.Connection) { connKey := flowexporter.NewConnectionKey(conn) cs.AddConnToMap(&connKey, conn) - cs.expirePriorityQueue.AddItemToQueue(connKey, conn) + cs.expirePriorityQueue.WriteItemToQueue(connKey, conn) metrics.TotalAntreaConnectionsInConnTrackTable.Inc() } diff --git a/pkg/agent/flowexporter/connections/deny_connections.go b/pkg/agent/flowexporter/connections/deny_connections.go index 61cb2e3717b..f0a4ca52109 100644 --- a/pkg/agent/flowexporter/connections/deny_connections.go +++ b/pkg/agent/flowexporter/connections/deny_connections.go @@ -49,6 +49,11 @@ func (ds *DenyConnectionStore) RunPeriodicDeletion(stopCh <-chan struct{}) { case <-pollTicker.C: deleteIfStaleConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error { if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= ds.staleConnectionTimeout { + if removedItem := ds.expirePriorityQueue.Remove(key); removedItem != nil { + // In case of ReadyToDelete is true, item should already been removed from pq + klog.V(4).InfoS("Conn removed from ds pq due to stale timeout", + "key", key, "conn", removedItem.Conn) + } if err := ds.deleteConnWithoutLock(key); err != nil { return err } @@ -78,7 +83,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti conn.IsActive = true existingItem, exists := ds.expirePriorityQueue.KeyToItem[connKey] if !exists { - ds.expirePriorityQueue.AddItemToQueue(connKey, conn) + ds.expirePriorityQueue.WriteItemToQueue(connKey, conn) } else { ds.connectionStore.expirePriorityQueue.Update(existingItem, existingItem.ActiveExpireTime, time.Now().Add(ds.connectionStore.expirePriorityQueue.IdleFlowTimeout)) @@ -87,6 +92,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti } else { conn.StartTime = timeSeen conn.StopTime = timeSeen + conn.LastExportTime = timeSeen conn.OriginalBytes = bytes conn.OriginalPackets = uint64(1) ds.fillPodInfo(conn) @@ -96,7 +102,7 @@ func (ds *DenyConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection, ti metrics.TotalDenyConnections.Inc() conn.IsActive = true ds.connections[connKey] = conn - ds.expirePriorityQueue.AddItemToQueue(connKey, conn) + ds.expirePriorityQueue.WriteItemToQueue(connKey, conn) klog.V(4).InfoS("New deny connection added", "connection", conn) } } diff --git a/pkg/agent/flowexporter/connections/deny_connections_test.go b/pkg/agent/flowexporter/connections/deny_connections_test.go index 55c5508907c..8a47ec8bb21 100644 --- a/pkg/agent/flowexporter/connections/deny_connections_test.go +++ b/pkg/agent/flowexporter/connections/deny_connections_test.go @@ -74,6 +74,7 @@ func TestDenyConnectionStore_AddOrUpdateConn(t *testing.T) { denyConnStore.AddOrUpdateConn(&testFlow, refTime.Add(-(time.Second * 20)), uint64(60)) expConn := testFlow expConn.DestinationServicePortName = servicePortName.String() + expConn.LastExportTime = refTime.Add(-(time.Second * 20)) actualConn, ok := denyConnStore.GetConnByKey(flowexporter.NewConnectionKey(&testFlow)) assert.Equal(t, ok, true, "deny connection should be there in deny connection store") assert.Equal(t, expConn, *actualConn, "deny connections should be equal") diff --git a/pkg/agent/flowexporter/priorityqueue/priorityqueue.go b/pkg/agent/flowexporter/priorityqueue/priorityqueue.go index 33952def4e4..d6928fd3afa 100644 --- a/pkg/agent/flowexporter/priorityqueue/priorityqueue.go +++ b/pkg/agent/flowexporter/priorityqueue/priorityqueue.go @@ -96,6 +96,18 @@ func (pq *ExpirePriorityQueue) Update(item *flowexporter.ItemToExpire, activeExp heap.Fix(pq, item.Index) } +// Remove removes and returns an Item by key from priority queue if it exists. +func (pq *ExpirePriorityQueue) Remove(connKey flowexporter.ConnectionKey) *flowexporter.ItemToExpire { + item, exists := pq.KeyToItem[connKey] + if !exists { + return nil + } + + removedItem := heap.Remove(pq, item.Index) + delete(pq.KeyToItem, connKey) + return removedItem.(*flowexporter.ItemToExpire) +} + // GetExpiryFromExpirePriorityQueue returns the shortest expire time duration // from expire priority queue. func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration { @@ -114,13 +126,18 @@ func (pq *ExpirePriorityQueue) GetExpiryFromExpirePriorityQueue() time.Duration return pq.IdleFlowTimeout } -func (pq *ExpirePriorityQueue) AddItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) { +// WriteItemToQueue adds conn with connKey into the queue. If an existing item +// has the same connKey, it will be overwritten by the new item. +func (pq *ExpirePriorityQueue) WriteItemToQueue(connKey flowexporter.ConnectionKey, conn *flowexporter.Connection) { currTime := time.Now() pqItem := &flowexporter.ItemToExpire{ Conn: conn, ActiveExpireTime: currTime.Add(pq.ActiveFlowTimeout), IdleExpireTime: currTime.Add(pq.IdleFlowTimeout), } + // If connKey exists in pq, it is removed first to avoid having multiple pqItems with same key + // in the queue, which can cause memory leak as the previous one can't be updated or removed. + pq.Remove(connKey) heap.Push(pq, pqItem) pq.KeyToItem[connKey] = pqItem } diff --git a/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go b/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go index fa1540a26c6..be4c58fc7ff 100644 --- a/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go +++ b/pkg/agent/flowexporter/priorityqueue/priorityqueue_test.go @@ -15,6 +15,7 @@ package priorityqueue import ( "container/heap" + "fmt" "testing" "time" @@ -41,9 +42,40 @@ func TestExpirePriorityQueue(t *testing.T) { Index: key, } testPriorityQueue.items = append(testPriorityQueue.items, item) + testPriorityQueue.KeyToItem[flowexporter.ConnectionKey{fmt.Sprintf("%d", key)}] = item } heap.Init(testPriorityQueue) + // Test WriteItemToQueue + connKey := flowexporter.ConnectionKey{"3"} + conn := flowexporter.Connection{} + testPriorityQueue.WriteItemToQueue(connKey, &conn) + assert.Equal(t, &conn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't add new conn to map") + newConn := flowexporter.Connection{} + testPriorityQueue.WriteItemToQueue(connKey, &newConn) + assert.Equal(t, &newConn, testPriorityQueue.KeyToItem[connKey].Conn, "WriteItemToQueue didn't overwrite existing conn to map") + hasOld, hasNew := false, false + for _, item := range testPriorityQueue.items { + if item.Conn == &conn { + hasOld = true + } + if item.Conn == &newConn { + hasNew = true + } + } + assert.False(t, hasOld && hasNew, "WriteItemToQueue shouldn't add two items with same key to heap") + + // Test Remove + removedItem := testPriorityQueue.Remove(connKey) + assert.Equal(t, &newConn, removedItem.Conn, "Remove didn't return correct item") + _, exist := testPriorityQueue.KeyToItem[connKey] + assert.False(t, exist, "Remove didn't delete KeyToItem entry") + for _, item := range testPriorityQueue.items { + if item.Conn == &newConn { + assert.Fail(t, "Remove didn't delete item from queue") + } + } + // Add new flow to the priority queue testFlowsWithExpire[3] = []time.Time{startTime.Add(3 * time.Second), startTime.Add(500 * time.Millisecond)} newFlowItem := &flowexporter.ItemToExpire{