Skip to content

Commit

Permalink
Patch after adding async rule cache
Browse files Browse the repository at this point in the history
  • Loading branch information
srikartati committed Nov 11, 2020
1 parent 101b82d commit 3198f96
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 64 deletions.
5 changes: 3 additions & 2 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func run(o *Options) error {
ifaceStore,
nodeConfig.Name,
podUpdates,
features.DefaultFeatureGate.Enabled(features.AntreaPolicy))
features.DefaultFeatureGate.Enabled(features.AntreaPolicy),
o.pollInterval)
if err != nil {
return fmt.Errorf("error creating new NetworkPolicy controller: %v", err)
}
Expand Down Expand Up @@ -282,7 +283,7 @@ func run(o *Options) error {
connections.InitializeConnTrackDumper(nodeConfig, serviceCIDRNet, o.config.OVSDatapathType, features.DefaultFeatureGate.Enabled(features.AntreaProxy)),
ifaceStore,
proxier,
ofClient,
networkPolicyController,
o.pollInterval)
pollDone := make(chan struct{})
go connStore.Run(stopCh, pollDone)
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ github.com/containernetworking/cni v0.7.1 h1:fE3r16wpSEyaqY4Z4oFrLMmIGfBYIKpPrHK
github.com/containernetworking/cni v0.7.1/go.mod h1:LGwApLUm2FpoOfxTDEeq8T9ipbpZ61X79hmU3w8FmsY=
github.com/containernetworking/plugins v0.8.2-0.20190724153215-ded2f1757770 h1:AuFzUXPFhLlTfxhHMGdCIr7wZOQ0J0dEZLoffU0KfRM=
github.com/containernetworking/plugins v0.8.2-0.20190724153215-ded2f1757770/go.mod h1:AlmXjbiLJBqvZ4vxkWAqjx1CKHVEoQf0/Ugrvc6Cv70=
github.com/contiv/libOpenflow v0.0.0-20200728044739-7c6534390721/go.mod h1:DtsPlJOByJZ+MO9YITEGUlbJ/jfh/ef0qeNyBYaeNR4=
github.com/contiv/libOpenflow v0.0.0-20201014051314-c1702744526c h1:JroumMoYWz73Oxwmy9JAXwii8jsayVw0HxKnbfxj/0o=
github.com/contiv/libOpenflow v0.0.0-20201014051314-c1702744526c/go.mod h1:DtsPlJOByJZ+MO9YITEGUlbJ/jfh/ef0qeNyBYaeNR4=
github.com/contiv/libovsdb v0.0.0-20170227191248-d0061a53e358 h1:AiA9SKyNXulsU7aAnyka3UFHYOIH00A9HvdIRnDXlg0=
Expand Down Expand Up @@ -394,10 +393,10 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.2.3 h1:El/6HuU+DTo/u+3quuhdRvhgTR+vOOoZwiv1WuNbpP4=
github.com/vmware/go-ipfix v0.2.3/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d h1:wjTew5yHsgqNXpQPIEduDLFR4pZv4iVPcRYhZGyr7Lk=
github.com/wenyingd/ofnet v0.0.0-20201015012029-21df99f8161d/go.mod h1:oF9872TvzJqLzLKDGVMItRLWJHlnwXluuIuNbOP5WKM=
github.com/vmware/go-ipfix v0.2.4 h1:zlf+GTDh/Q+t0z5dkUQBr/7bU0fxR9WjtX0WopU3GSg=
github.com/vmware/go-ipfix v0.2.4/go.mod h1:8suqePBGCX20vEh/4/ekuRjX4BsZ2zYWcD22NpAWHVU=
github.com/wenyingd/ofnet v0.0.0-20201109024835-6fd225d8c8d1 h1:jBQJP2829C09r07Rc5EWS0+LefZhY51BJG0v3pLlGGA=
github.com/wenyingd/ofnet v0.0.0-20201109024835-6fd225d8c8d1/go.mod h1:8mMMWAYBNUeTGXYKizOLETfN3WIbu3P5DgvS2jiXKdI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/handysort v0.0.0-20150421192137-fb3537ed64a1/go.mod h1:QcJo0QPSfTONNIgpN5RA8prR7fF8nkF6cTWTcNerRO8=
Expand All @@ -422,8 +421,9 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975 h1:/Tl7pH94bvbAAHBdZJT947M/+gp0+CqQXDtMRC0fseo=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495 h1:I6A9Ag9FpEKOjcKrRNjQkPHawoXIhKyTGfvvjFAiiAk=
golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
7 changes: 6 additions & 1 deletion pkg/agent/controller/networkpolicy/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,18 @@ func asyncRuleCacheKeyFunc(obj interface{}) (string, error) {

// newIDAllocator returns a new *idAllocator.
// It takes a list of allocated IDs, which can be used for the restart case.
func newIDAllocator(allocatedIDs ...uint32) *idAllocator {
func newIDAllocator(flowPollInterval time.Duration, allocatedIDs ...uint32) *idAllocator {
allocator := &idAllocator{
availableSet: make(map[uint32]struct{}),
asyncRuleCache: cache.NewStore(asyncRuleCacheKeyFunc),
deleteQueue: workqueue.NewNamedDelayingQueue("async_delete_networkpolicyrule"),
}

// Set the asyncDeleteInterval based on flowPollInterval value.
if asyncDeleteInterval < flowPollInterval {
asyncDeleteInterval = flowPollInterval
}

var maxID uint32
allocatedSet := make(map[uint32]struct{}, len(allocatedIDs))
for _, id := range allocatedIDs {
Expand Down
43 changes: 31 additions & 12 deletions pkg/agent/controller/networkpolicy/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/vmware-tanzu/antrea/pkg/apis/controlplane/v1beta2"
)

var (
testFlowPollInterval = 5 * time.Millisecond
)

func TestNewIDAllocator(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -59,7 +63,7 @@ func TestNewIDAllocator(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newIDAllocator(tt.args...)
got := newIDAllocator(testFlowPollInterval, tt.args...)
assert.Equalf(t, tt.expectedLastAllocatedID, got.lastAllocatedID, "Got lastAllocatedID %v, expected %v", got.lastAllocatedID, tt.expectedLastAllocatedID)
assert.Equalf(t, tt.expectedAvailableSets, got.availableSet, "Got availableSet %v, expected %v", got.availableSet, tt.expectedAvailableSets)
assert.Equalf(t, tt.expectedAvailableSlice, got.availableSlice, "Got availableSlice %v, expected %v", got.availableSlice, tt.expectedAvailableSlice)
Expand Down Expand Up @@ -105,7 +109,7 @@ func TestAllocateForRule(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(tt.args...)
a := newIDAllocator(testFlowPollInterval, tt.args...)
actualErr := a.allocateForRule(tt.rule)
if actualErr != tt.expectedErr {
t.Fatalf("Got error %v, expected %v", actualErr, tt.expectedErr)
Expand Down Expand Up @@ -155,7 +159,7 @@ func TestRelease(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(tt.newArgs...)
a := newIDAllocator(testFlowPollInterval, tt.newArgs...)
actualErr := a.release(tt.releaseArgs)
assert.Equalf(t, tt.expectedErr, actualErr, "Got error %v, expected %v", actualErr, tt.expectedErr)
assert.Equalf(t, tt.expectedAvailableSets, a.availableSet, "Got availableSet %v, expected %v", a.availableSet, tt.expectedAvailableSets)
Expand All @@ -172,23 +176,34 @@ func TestWorker(t *testing.T) {
Service: nil,
}
tests := []struct {
name string
args []uint32
rule *types.PolicyRule
expectedID uint32
expectedErr error
name string
args []uint32
deleteInterval time.Duration
rule *types.PolicyRule
expectedID uint32
expectedErr error
}{
{
"delete-rule-from-async-rule-cache",
"delete-rule-with-async-delete-interval",
nil,
5 * time.Millisecond,
rule,
1,
nil,
},
{
"delete-rule-with-flow-poll-interval",
nil,
1 * time.Millisecond,
rule,
1,
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newIDAllocator(tt.args...)
asyncDeleteInterval = tt.deleteInterval
a := newIDAllocator(testFlowPollInterval, tt.args...)
actualErr := a.allocateForRule(tt.rule)
if actualErr != tt.expectedErr {
t.Fatalf("Got error %v, expected %v", actualErr, tt.expectedErr)
Expand All @@ -197,7 +212,8 @@ func TestWorker(t *testing.T) {
defer close(stopCh)
go wait.Until(a.worker, time.Millisecond, stopCh)

a.forgetRule(tt.rule.FlowID, 5*time.Millisecond)
start := time.Now()
a.forgetRule(tt.rule.FlowID, asyncDeleteInterval)
conditionFunc := func() (bool, error) {
a.Lock()
defer a.Unlock()
Expand All @@ -206,12 +222,15 @@ func TestWorker(t *testing.T) {
}
return false, nil
}
if err := wait.Poll(time.Millisecond, time.Millisecond*10, conditionFunc); err != nil {
if err := wait.PollImmediate(time.Millisecond, time.Millisecond*10, conditionFunc); err != nil {
t.Fatalf("Expect the rule with id %v to be deleted from async rule cache", tt.expectedID)
}
_, exists, err := a.getRuleFromAsyncCache(tt.expectedID)
assert.Falsef(t, exists, "Expect rule to be not present in asyncRuleCache")
assert.NoErrorf(t, err, "getRuleFromAsyncCache should not return any error")

elapsedTime := time.Since(start)
assert.GreaterOrEqualf(t, int64(elapsedTime)/int64(time.Millisecond), int64(5), "rule should be there for at least 5ms")
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
ifaceStore interfacestore.InterfaceStore,
nodeName string,
podUpdates <-chan v1beta2.PodReference,
antreaPolicyEnabled bool) (*Controller, error) {
antreaPolicyEnabled bool,
flowPollInterval time.Duration) (*Controller, error) {
c := &Controller{
antreaClientProvider: antreaClientGetter,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"),
reconciler: newReconciler(ofClient, ifaceStore),
reconciler: newReconciler(ofClient, ifaceStore, flowPollInterval),
ofClient: ofClient,
antreaPolicyEnabled: antreaPolicyEnabled,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (g *antreaClientGetter) GetAntreaClient() (versioned.Interface, error) {
func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}
ch := make(chan v1beta2.PodReference, 100)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, true)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", ch, true, testFlowPollInterval)
reconciler := newMockReconciler()
controller.reconciler = reconciler
return controller, clientset, reconciler
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/controller/networkpolicy/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type reconciler struct {
}

// newReconciler returns a new *reconciler.
func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore) *reconciler {
func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, flowPollInterval time.Duration) *reconciler {
priorityAssigners := map[binding.TableIDType]*tablePriorityAssigner{}
for _, table := range openflow.GetAntreaPolicyBaselineTierTables() {
priorityAssigners[table] = &tablePriorityAssigner{
Expand All @@ -205,7 +205,7 @@ func newReconciler(ofClient openflow.Client, ifaceStore interfacestore.Interface
ofClient: ofClient,
ifaceStore: ifaceStore,
lastRealizeds: sync.Map{},
idAllocator: newIDAllocator(),
idAllocator: newIDAllocator(flowPollInterval),
priorityAssigners: priorityAssigners,
}
return reconciler
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/controller/networkpolicy/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestReconcilerForget(t *testing.T) {
mockOFClient.EXPECT().UninstallPolicyRuleFlows(ofID)
}
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
for key, value := range tt.lastRealizeds {
r.lastRealizeds.Store(key, value)
}
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestReconcilerReconcile(t *testing.T) {
for i := 0; i < len(tt.expectedOFRules); i++ {
mockOFClient.EXPECT().InstallPolicyRuleFlows(gomock.Any())
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
if err := r.Reconcile(tt.args); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestReconcilerBatchReconcile(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()
mockOFClient := openflowtest.NewMockClient(controller)
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
if tt.numInstalledRules > 0 {
// BatchInstall should skip rules already installed
r.lastRealizeds.Store(tt.args[0].ID, newLastRealized(tt.args[0]))
Expand Down Expand Up @@ -833,7 +833,7 @@ func TestReconcilerUpdate(t *testing.T) {
if len(tt.expectedDeletedTo) > 0 {
mockOFClient.EXPECT().DeletePolicyRuleAddress(gomock.Any(), types.DstAddress, gomock.Eq(tt.expectedDeletedTo), priority)
}
r := newReconciler(mockOFClient, ifaceStore)
r := newReconciler(mockOFClient, ifaceStore, testFlowPollInterval)
if err := r.Reconcile(tt.originalRule); (err != nil) != tt.wantErr {
t.Fatalf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
43 changes: 23 additions & 20 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
"github.com/vmware-tanzu/antrea/pkg/querier"
)

var serviceProtocolMap = map[uint8]corev1.Protocol{
Expand All @@ -37,29 +38,29 @@ var serviceProtocolMap = map[uint8]corev1.Protocol{
}

type ConnectionStore struct {
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
antreaProxier proxy.Proxier
ofClient openflow.Client
pollInterval time.Duration
mutex sync.Mutex
connections map[flowexporter.ConnectionKey]flowexporter.Connection
connDumper ConnTrackDumper
ifaceStore interfacestore.InterfaceStore
antreaProxier proxy.Proxier
networkPolicyQuerier querier.AgentNetworkPolicyInfoQuerier
pollInterval time.Duration
mutex sync.Mutex
}

func NewConnectionStore(
connTrackDumper ConnTrackDumper,
ifaceStore interfacestore.InterfaceStore,
proxier proxy.Proxier,
ofClient openflow.Client,
npQuerier querier.AgentNetworkPolicyInfoQuerier,
pollInterval time.Duration,
) *ConnectionStore {
return &ConnectionStore{
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
antreaProxier: proxier,
ofClient: ofClient,
pollInterval: pollInterval,
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection),
connDumper: connTrackDumper,
ifaceStore: ifaceStore,
antreaProxier: proxier,
networkPolicyQuerier: npQuerier,
pollInterval: pollInterval,
}
}

Expand Down Expand Up @@ -163,20 +164,22 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4])
egressOfID := binary.LittleEndian.Uint32(conn.Labels[4:8])
if ingressOfID != 0 {
policy := cs.ofClient.GetPolicyFromConjunction(ingressOfID)
policy := cs.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ingressOfID)
if policy == nil {
// We should not hit this case at all. Log the warning and continue.
klog.Warningf("Cannot find NetworkPolicy that has ingressOfID %v", ingressOfID)
// This should not happen because the rule flow ID to rule mapping is
// preserved for max(5s, flowPollInterval) even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has rule with ingressOfID %v", ingressOfID)
} else {
conn.IngressNetworkPolicyName = policy.Name
conn.IngressNetworkPolicyNamespace = policy.Namespace
}
}
if egressOfID != 0 {
policy := cs.ofClient.GetPolicyFromConjunction(egressOfID)
policy := cs.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(egressOfID)
if policy == nil {
// We should not hit this case at all. Log the warning and continue.
klog.Warningf("Cannot find NetworkPolicy that has egressOfID %v", egressOfID)
// This should not happen because the rule flow ID to rule mapping is
// preserved for max(5s, flowPollInterval) even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has rule with egressOfID %v", egressOfID)
} else {
conn.EgressNetworkPolicyName = policy.Name
conn.EgressNetworkPolicyNamespace = policy.Namespace
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package connections

import (
"fmt"
openflowtest "github.com/vmware-tanzu/antrea/pkg/agent/openflow/testing"
"net"
"strings"
"testing"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
queriertest "github.com/vmware-tanzu/antrea/pkg/querier/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)

Expand Down Expand Up @@ -139,8 +139,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
mockProxier := proxytest.NewMockProxier(ctrl)
mockOfClient := openflowtest.NewMockClient(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, mockProxier, mockOfClient, testPollInterval)
npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
connStore := NewConnectionStore(mockConnDumper, mockIfaceStore, mockProxier, npQuerier, testPollInterval)

// Add flow1conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type Connection struct {
// IsActive flag helps in cleaning up connections when they are not in conntrack any module more.
IsActive bool
// DoExport flag helps in tagging connections that can be exported by Flow Exporter
DoExport bool
Zone uint16
Mark uint32
StatusFlag uint32
DoExport bool
Zone uint16
Mark uint32
StatusFlag uint32
Labels, LabelsMask []byte
// TODO: Have a separate field for protocol. No need to keep it in Tuple.
TupleOrig, TupleReply Tuple
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (m *Collector) collect() *statsCollection {
policyRef := m.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ofID)
if policyRef == nil {
// This should not happen because the rule flow ID to rule mapping is
// preserved for 5 seconds even after the rule deletion.
// preserved for at least 5 seconds even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has ofID %v", ofID)
continue
}
Expand Down
Loading

0 comments on commit 3198f96

Please sign in to comment.