Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #6763: Install Pod flows after PortStatus message is #6858

Open
wants to merge 1 commit into
base: release-2.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ func run(o *Options) error {
o.config.CNISocket,
o.config.HostProcPathPrefix,
nodeConfig,
localPodInformer.Get(),
k8sClient,
routeClient,
isChaining,
Expand Down
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module antrea.io/antrea

go 1.21.0
go 1.23.0

toolchain go1.23.2

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -58,7 +60,7 @@ require (
golang.org/x/mod v0.19.0
golang.org/x/net v0.27.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
golang.org/x/sys v0.25.0
golang.org/x/time v0.5.0
golang.org/x/tools v0.23.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5
Expand All @@ -73,7 +75,7 @@ require (
k8s.io/apiserver v0.29.2
k8s.io/client-go v0.29.2
k8s.io/component-base v0.29.2
k8s.io/klog/v2 v2.110.1
k8s.io/klog/v2 v2.130.1
k8s.io/kube-aggregator v0.29.2
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
k8s.io/kubectl v0.29.2
Expand Down Expand Up @@ -113,10 +115,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
27 changes: 12 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -120,14 +120,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -252,7 +250,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
Expand Down Expand Up @@ -978,8 +975,8 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -1143,8 +1140,8 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kms v0.29.2 h1:MDsbp98gSlEQs7K7dqLKNNTwKFQRYYvO4UOlBOjNy6Y=
k8s.io/kms v0.29.2/go.mod h1:s/9RC4sYRZ/6Tn6yhNjbfJuZdb8LzlXhdlBnKizeFDo=
k8s.io/kube-aggregator v0.29.2 h1:z9qJn5wlGmGaX6EfM7OEhr6fq6SBjDKR6tPRZ/qgxeY=
Expand Down
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ function generate_antrea_client_code {
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1beta1" \
--input-dirs "${ANTREA_PKG}/pkg/apis/stats" \
--input-dirs "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \
--input-dirs "${ANTREA_PKG}/pkg/agent/interfacestore" \
-O zz_generated.deepcopy \
--go-header-file hack/boilerplate/license_header.go.txt

Expand Down
155 changes: 146 additions & 9 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
package cniserver

import (
"bytes"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -61,6 +69,10 @@

var (
getNSPath = util.GetNSPath
// retryInterval is the interval to re-install Pod OpenFlow entries if any error happened.
// Note, using a variable rather than constant for retryInterval because we may use a shorter time in the
// test code.
retryInterval = 5 * time.Second
)

type podConfigurator struct {
Expand All @@ -76,9 +88,19 @@
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool

containerAccess *containerAccessArbitrator
eventBroadcaster record.EventBroadcaster
recorder record.EventRecorder
podListerSynced cache.InformerSynced
podLister v1.PodLister
kubeClient clientset.Interface
unreadyPortQueue workqueue.TypedDelayingInterface[string]

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / Build Antrea Windows binaries

undefined: workqueue.TypedDelayingInterface

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / Build Antrea and antctl binaries

undefined: workqueue.TypedDelayingInterface

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / Go benchmark test

undefined: workqueue.TypedDelayingInterface

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / Unit test (ubuntu-latest)

undefined: workqueue.TypedDelayingInterface

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / golicense

undefined: workqueue.TypedDelayingInterface

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / Integration test

undefined: workqueue.TypedDelayingInterface

Check failure on line 98 in pkg/agent/cniserver/pod_configuration.go

View workflow job for this annotation

GitHub Actions / Unit test (windows-2022)

undefined: workqueue.TypedDelayingInterface
statusCh chan *openflow15.PortStatus
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -88,20 +110,27 @@
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podInformer cache.SharedIndexInformer,
containerAccess *containerAccessArbitrator,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
return nil, err
}
return &podConfigurator{
pc := &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
}, nil
kubeClient: kubeClient,
containerAccess: containerAccess,
}
// Initiate the PortStatus message listener. This function is a no-op except on Windows.
pc.initPortStatusMonitor(podInformer)
return pc, nil
}

func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) {
Expand Down Expand Up @@ -166,13 +195,13 @@
// not created for a Pod interface.
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
if portData.ExternalIDs == nil {
klog.V(2).Infof("OVS port %s has no external_ids", portData.Name)
klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name)
return nil
}

containerID, found := portData.ExternalIDs[ovsExternalIDContainerID]
if !found {
klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID)
klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name)
return nil
}

Expand All @@ -187,8 +216,7 @@

containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC])
if err != nil {
klog.Errorf("Failed to parse MAC address from OVS external config %s: %v",
portData.ExternalIDs[ovsExternalIDMAC], err)
klog.ErrorS(err, "Failed to parse MAC address from OVS external config")
}
podName, _ := portData.ExternalIDs[ovsExternalIDPodName]
podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace]
Expand Down Expand Up @@ -279,7 +307,7 @@
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}

Expand Down Expand Up @@ -498,7 +526,7 @@
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error {
containerID := containerConfig.ContainerID
klog.V(2).Infof("Deleting Openflow entries for container %s", containerID)
klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID)
if !pc.isSecondaryNetwork {
if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil {
return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err)
Expand All @@ -513,6 +541,7 @@
if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil {
return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err)
}

// Remove container configuration from cache.
pc.ifaceStore.DeleteInterface(containerConfig)
if !pc.isSecondaryNetwork {
Expand Down Expand Up @@ -558,7 +587,7 @@
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}
for _, ip := range containerConfig.IPs {
Expand All @@ -570,3 +599,111 @@
return pc.disconnectInterfaceFromOVS(containerConfig)
// TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy??
}

func (pc *podConfigurator) processNextWorkItem() bool {
key, quit := pc.unreadyPortQueue.Get()
if quit {
return false
}
defer pc.unreadyPortQueue.Done(key)

if err := pc.updateUnreadyPod(key); err != nil {
klog.ErrorS(err, "Failed install OpenFlow entries for OVS port interface", "name", key)
// Put the item back on the workqueue to handle any transient errors.
pc.unreadyPortQueue.AddAfter(key, retryInterval)
}
return true
}

func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error {
ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
return nil
}

pc.containerAccess.lockContainer(ifConfig.ContainerID)
defer pc.containerAccess.unlockContainer(ifConfig.ContainerID)
// Get the InterfaceConfig again after the lock to avoid race conditions.
ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
return nil
}

if ifConfig.OFPort == 0 {
// Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated.
// Returns error so that we can have a retry after 5s.
pc.recordPodEvent(ifConfig, false)
return fmt.Errorf("pod's OpenFlow port is not ready yet")
}

// Install OpenFlow entries for the Pod.
klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort)
if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil {
// Add Pod not-ready event if the pod flows installation fails.
// Returns error so that we can have a retry after 5s.
pc.recordPodEvent(ifConfig, false)
return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err)
}

// Notify the Pod update event to required components.
event := agenttypes.PodUpdate{
PodName: ifConfig.PodName,
PodNamespace: ifConfig.PodNamespace,
IsAdd: true,
ContainerID: ifConfig.ContainerID,
}
pc.podUpdateNotifier.Notify(event)

pc.recordPodEvent(ifConfig, true)
return nil
}

func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConfig, installed bool) {
pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName)
if err != nil {
klog.InfoS("Unable to get Pod, skip recording Pod event", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName))
return
}

if installed {
// Add normal event to record Pod network is ready.
pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules")
return
}

pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed")
}

func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) {
// Update Pod OpenFlow entries only after the OpenFlow port state is live.
if status.Desc.State != openflow15.PS_LIVE {
return
}
ovsPort := string(bytes.Trim(status.Desc.Name, "\x00"))
ofPort := status.Desc.PortNo

ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found", "ovsPort", ovsPort)
return
}

func() {
pc.containerAccess.lockContainer(ifConfig.ContainerID)
defer pc.containerAccess.unlockContainer(ifConfig.ContainerID)
// Get the InterfaceConfig again after the lock to avoid race conditions.
ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found", "ovsPort", ovsPort)
return
}
// Update interface config with the ofPort.
newIfConfig := ifConfig.DeepCopy()
newIfConfig.OVSPortConfig.OFPort = int32(ofPort)
pc.ifaceStore.UpdateInterface(newIfConfig)
}()

pc.unreadyPortQueue.Add(ovsPort)
}
Loading
Loading