Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
dcbw committed Sep 8, 2023
1 parent 235783a commit 55fd344
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 36 deletions.
59 changes: 23 additions & 36 deletions pkg/multus/multus.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,24 +495,21 @@ func cmdPluginErr(k8sArgs *types.K8sArgs, confName string, format string, args .
return logging.Errorf(msg+format, args...)
}

func isCriticalRequestRetriable(err error, otherFn func(error) bool) bool {
func isCriticalRequestRetriable(err error, retryOnNotFound bool) bool {
logging.Debugf("isCriticalRequestRetriable: %v", err)
errorTypesAllowingRetry := []func(error) bool{
errors.IsServiceUnavailable, errors.IsInternalError, k8snet.IsConnectionReset, k8snet.IsConnectionRefused}
if otherFn != nil {
errorTypesAllowingRetry = append(errorTypesAllowingRetry, otherFn)
}
for _, f := range errorTypesAllowingRetry {
if f(err) {
return true
}
}
return false
return retryOnNotFound && errors.IsNotFound(err)
}

// GetPod retrieves the Kubernetes Pod object for the namespace/name given in k8sArgs (i.e. the CNI args).
// The pod UID from k8sArgs is not used for the lookup; it is only used to double-check the retrieved pod.
func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) {
func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, isDel bool) (*v1.Pod, error) {
if kubeClient == nil {
return nil, nil
}
Expand All @@ -528,56 +525,46 @@ func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k
logging.Debugf("GetPod for [%s/%s] took %v", podNamespace, podName, time.Since(start))
}()

var pod *v1.Pod
var err error
var retryErrFunc func(error) bool

// Default to direct apiserver request
// Standard getting grabs pod directly from the apiserver
podGetter := func(ns, name string) (*v1.Pod, error) {
return kubeClient.GetPod(ns, name)
}
// Use a fairly long 0.25 sec interval so we don't hammer the apiserver
pollDuration := shortPollDuration
retryOnNotFound := false

if podInformer != nil {
logging.Debugf("GetPod for [%s/%s] will use informer cache", podNamespace, podName)
// Use the shared informer cache to reduce apiserver load
// If we have an informer get the pod from the informer cache
podGetter = func(ns, name string) (*v1.Pod, error) {
return listers.NewPodLister(podInformer.GetIndexer()).Pods(ns).Get(name)
}
// NotFound is a retriable error since the cache may be a bit behind the apiserver
retryErrFunc = errors.IsNotFound
// We can poll the informer cache more frequently since it's local
// Use short retry intervals with the informer since it's a local cache
pollDuration = informerPollDuration
// Retry NotFound on ADD since the cache may be a bit behind the apiserver
retryOnNotFound = !isDel
}

pod, err = podGetter(podNamespace, podName)
if err != nil {
// in case of a retriable error, retry 10 times with 0.25 sec interval
if isCriticalRequestRetriable(err, retryErrFunc) {
waitErr := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) {
pod, err = podGetter(podNamespace, podName)
if retryErrFunc != nil && retryErrFunc(err) {
return false, nil
}
return pod != nil, err
})
// retry failed, then return error with retry out
if waitErr != nil {
return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err)
}
} else if warnOnly && errors.IsNotFound(err) {
// If not found, proceed to remove interface with cache
var pod *v1.Pod
if err := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) {
var getErr error
pod, getErr = podGetter(podNamespace, podName)
if isCriticalRequestRetriable(getErr, retryOnNotFound) {
return false, nil
}
return pod != nil, getErr
}); err != nil {
if isDel && errors.IsNotFound(err) {
// On DEL pod may already be gone from apiserver/informer
return nil, nil
} else {
// Other case, return error
return nil, cmdErr(k8sArgs, "error getting pod: %v", err)
}
return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err)
}

// In case of static pod, UID through kube api is different because of mirror pod, hence it is expected.
if podUID != "" && string(pod.UID) != podUID && !k8s.IsStaticPod(pod) {
msg := fmt.Sprintf("expected pod UID %q but got %q from Kube API", podUID, pod.UID)
if warnOnly {
if isDel {
// On CNI DEL we just operate on the cache when these mismatch, we don't error out.
// For example: stateful sets namespace/name can remain the same while podUID changes.
logging.Verbosef("warning: %s", msg)
Expand Down
58 changes: 58 additions & 0 deletions pkg/multus/multus_cni100_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"os"
"reflect"
"sync"
"time"

"github.com/containernetworking/cni/pkg/skel"
Expand Down Expand Up @@ -923,6 +924,63 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() {
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})

// Exercises CmdAdd followed by CmdDel with a shared pod informer when the pod
// is created only after a delay, i.e. the pod is expected to be missing when
// CmdAdd starts looking for it.
It("executes clusterNetwork delegate with a shared informer if pod is not immediately found", func() {
fakePod := testhelpers.NewFakePod("testpod", "", "kube-system/net1")
// Delegate network configuration; also used as the body of the net1 NetworkAttachmentDefinition below.
net1 := `{
"name": "net1",
"type": "mynet",
"cniVersion": "1.0.0"
}`
// Result the fake plugin returns for net1, and therefore what CmdAdd should return.
expectedResult1 := &cni100.Result{
CNIVersion: "1.0.0",
IPs: []*cni100.IPConfig{{
Address: *testhelpers.EnsureCIDR("1.1.1.2/24"),
},
},
}
// CNI arguments identify the pod by name/namespace only; no pod UID is passed.
args := &skel.CmdArgs{
ContainerID: "123456789",
Netns: testNS.Path(),
IfName: "eth0",
Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace),
StdinData: []byte(`{
"name": "node-cni-network",
"type": "multus",
"kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml",
"defaultNetworks": [],
"clusterNetwork": "net1",
"delegates": []
}`),
}

// Fake plugin executor with a single registered plugin for net1 on eth0.
fExec := newFakeExec()
fExec.addPlugin100(nil, "eth0", net1, expectedResult1, nil)

// Only the NetworkAttachmentDefinition is registered up front; the pod is deliberately NOT added yet.
fKubeClient := NewFakeClientInfo()
_, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1))
Expect(err).NotTo(HaveOccurred())

// NOTE(review): ctx and newPodInformer are defined outside this view — presumably
// the informer is backed by the fake clientset; confirm against the helper.
podInformer := newPodInformer(ctx, fKubeClient.Client)

// The WaitGroup only guarantees that the goroutine has started: Done is called
// before the sleep, so Wait returns roughly one second before the pod is added.
// Nothing waits for the goroutine to finish; the test relies on CmdAdd
// succeeding only once the pod exists.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
time.Sleep(1 * time.Second)
fKubeClient.AddPod(fakePod)
}()
wg.Wait()

// CmdAdd must succeed despite the delayed pod, invoke every registered plugin,
// and return the fake plugin's result.
result, err := CmdAdd(args, fExec, fKubeClient, podInformer)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(reflect.DeepEqual(result, expectedResult1)).To(BeTrue())

// CmdDel must succeed and invoke every registered plugin as well.
err = CmdDel(args, fExec, fKubeClient, podInformer)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})

It("Verify the cache is created in dataDir", func() {
tmpCNIDir := tmpDir + "/cniData"
err := os.Mkdir(tmpCNIDir, 0777)
Expand Down

0 comments on commit 55fd344

Please sign in to comment.