Skip to content

Commit

Permalink
Fix unexpected reporting of empty MulticastGroupInfo
Browse files Browse the repository at this point in the history
The empty MulticastGroupInfo will be reported unexpectedly during a single report session when:
1. Statistics of NP, ANP, or ANCP are getting updated.
2. MulticastGroupInfo is not refreshed.
Fix: Always report updated MulticastGroupInfo when reporting is necessary.

Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
Ruochen authored and ceclinux committed Sep 21, 2022
1 parent b36a884 commit 2081a30
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 18 deletions.
39 changes: 21 additions & 18 deletions pkg/agent/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,25 +204,12 @@ func isIdenticalMulticastGroupMap(a, b map[string][]cpv1beta.PodReference) bool
return true
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
// If multicast feature gate is enabled, it also sends the full multicast group and IGMP report stats to the antrea-controller.
func (m *Collector) report(curStatsCollection *statsCollection) error {
func (m *Collector) calculateStatsSummary(curStatsCollection *statsCollection) *cpv1beta.NodeStatsSummary {
npStats := calculateDiff(curStatsCollection.networkPolicyStats, m.lastStatsCollection.networkPolicyStats)
acnpStats := calculateRuleDiff(curStatsCollection.antreaClusterNetworkPolicyStats, m.lastStatsCollection.antreaClusterNetworkPolicyStats)
anpStats := calculateRuleDiff(curStatsCollection.antreaNetworkPolicyStats, m.lastStatsCollection.antreaNetworkPolicyStats)

var multicastGroups []cpv1beta.MulticastGroupInfo
multicastGroupsUpdated := false
if m.multicastEnabled {
// multicastGroups should be reported if the multicast group Pod membership has changed since the last collect.
if !isIdenticalMulticastGroupMap(m.lastStatsCollection.multicastGroups, curStatsCollection.multicastGroups) {
multicastGroupsUpdated = true
multicastGroups = make([]cpv1beta.MulticastGroupInfo, 0, len(curStatsCollection.multicastGroups))
for group, pods := range curStatsCollection.multicastGroups {
multicastGroups = append(multicastGroups, cpv1beta.MulticastGroupInfo{Group: group, Pods: pods})
}
}

// Collect statistics of IGMP report messages hit by ANP or ACNP, and merge them to anpStats and acnpStats.
// Note IGMP reports statistics may be lost if NodeStatsSummary is not reported successfully.
multicastANPStatsMap, multicastACNPStatsMap := m.multicastQuerier.CollectIGMPReportNPStats()
Expand Down Expand Up @@ -250,12 +237,19 @@ func (m *Collector) report(curStatsCollection *statsCollection) error {
anpStats = mergeReportStats(multicastANPStatsMap, anpStats)
}

if len(npStats) == 0 && len(acnpStats) == 0 && len(anpStats) == 0 && !multicastGroupsUpdated {
klog.V(4).Info("No stats to report, skip reporting")
if len(npStats) == 0 && len(acnpStats) == 0 && len(anpStats) == 0 && isIdenticalMulticastGroupMap(m.lastStatsCollection.multicastGroups, curStatsCollection.multicastGroups) {
return nil
}

summary := &cpv1beta.NodeStatsSummary{
var multicastGroups []cpv1beta.MulticastGroupInfo
if m.multicastEnabled {
multicastGroups = make([]cpv1beta.MulticastGroupInfo, 0, len(curStatsCollection.multicastGroups))
for group, pods := range curStatsCollection.multicastGroups {
multicastGroups = append(multicastGroups, cpv1beta.MulticastGroupInfo{Group: group, Pods: pods})
}
}

return &cpv1beta.NodeStatsSummary{
ObjectMeta: metav1.ObjectMeta{
Name: m.nodeName,
},
Expand All @@ -264,8 +258,17 @@ func (m *Collector) report(curStatsCollection *statsCollection) error {
AntreaNetworkPolicies: anpStats,
Multicast: multicastGroups,
}
klog.V(6).Infof("Reporting NodeStatsSummary: %v", summary)
}

// report calculates the delta of the stats and pushes it to the antrea-controller summary API.
// If multicast feature gate is enabled, it also sends the full multicast group and IGMP report stats to the antrea-controller.
func (m *Collector) report(curStatsCollection *statsCollection) error {
summary := m.calculateStatsSummary(curStatsCollection)
if summary == nil {
klog.V(4).Info("No stats to report, skip reporting")
return nil
}
klog.V(4).Infof("Reporting NodeStatsSummary: %v", summary)
antreaClient, err := m.antreaClientProvider.GetAntreaClient()
if err != nil {
return err
Expand Down
87 changes: 87 additions & 0 deletions pkg/agent/stats/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,93 @@ func TestCalculateDiff(t *testing.T) {
}
}

func TestCalculateStatsSummary(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
name string
lastStatsCollection *statsCollection
curStatsCollection *statsCollection
expectedSummary *cpv1beta.NodeStatsSummary
}{
{
name: "only multicaststats",
lastStatsCollection: &statsCollection{
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar2", Namespace: "foo2"},
},
},
},
curStatsCollection: &statsCollection{
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar2", Namespace: "foo2"},
},
},
},
expectedSummary: nil,
},
{
name: "anp and multicaststats",
lastStatsCollection: &statsCollection{
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar3", Namespace: "foo3"},
},
},
},
curStatsCollection: &statsCollection{
networkPolicyStats: map[types.UID]*statsv1alpha1.TrafficStats{
np1.UID: {
Bytes: 25,
Packets: 3,
Sessions: 2,
},
},
multicastGroups: map[string][]cpv1beta.PodReference{
"225.3.4.5": {
{Name: "bar3", Namespace: "foo3"},
},
},
},
expectedSummary: &cpv1beta.NodeStatsSummary{
Multicast: []cpv1beta.MulticastGroupInfo{
{
Group: "225.3.4.5", Pods: []cpv1beta.PodReference{
{Name: "bar3", Namespace: "foo3"}},
},
},
NetworkPolicies: []cpv1beta.NetworkPolicyStats{
{
NetworkPolicy: cpv1beta.NetworkPolicyReference{UID: "uid1"},
TrafficStats: statsv1alpha1.TrafficStats{
Bytes: 25,
Packets: 3,
Sessions: 2,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ofClient := oftest.NewMockClient(ctrl)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
mcQuerier := queriertest.NewMockAgentMulticastInfoQuerier(ctrl)
ofClient.EXPECT().NetworkPolicyMetrics().AnyTimes()
npQuerier.EXPECT().GetRuleByFlowID(gomock.Any()).AnyTimes()
mcQuerier.EXPECT().CollectIGMPReportNPStats().AnyTimes()

m := &Collector{ofClient: ofClient, networkPolicyQuerier: npQuerier, multicastQuerier: mcQuerier, lastStatsCollection: tt.lastStatsCollection, multicastEnabled: true}
summary := m.calculateStatsSummary(tt.curStatsCollection)
assert.Equal(t, tt.expectedSummary, summary)
})
}
}

func TestCalculateRuleDiff(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 2081a30

Please sign in to comment.