Skip to content

Commit

Permalink
server: use a shared informer pod cache rather than direct apiserver …
Browse files Browse the repository at this point in the history
…access

When running in server mode we can use a shared informer to listen for
Pod events from the apiserver, and grab pod info from that cache rather
than doing direct apiserver requests each time.

This reduces apiserver load and retry latency, since multus can poll
the local cache more frequently than it should do direct apiserver
requests.

Signed-off-by: Dan Williams <[email protected]>
  • Loading branch information
dcbw committed Sep 8, 2023
1 parent f6776b5 commit bab52cb
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 110 deletions.
4 changes: 2 additions & 2 deletions cmd/multus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {

skel.PluginMain(
func(args *skel.CmdArgs) error {
result, err := multus.CmdAdd(args, nil, nil)
result, err := multus.CmdAdd(args, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -54,6 +54,6 @@ func main() {
func(args *skel.CmdArgs) error {
return multus.CmdCheck(args, nil, nil)
},
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) },
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil, nil) },
cniversion.All, "meta-plugin that delegates to other CNI plugins")
}
79 changes: 52 additions & 27 deletions pkg/multus/multus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
k8snet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

k8s "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
Expand All @@ -46,8 +48,9 @@ import (
)

const (
shortPollDuration = 250 * time.Millisecond
shortPollTimeout = 2500 * time.Millisecond
shortPollDuration = 250 * time.Millisecond
informerPollDuration = 50 * time.Millisecond
shortPollTimeout = 2500 * time.Millisecond
)

var (
Expand Down Expand Up @@ -492,7 +495,7 @@ func cmdPluginErr(k8sArgs *types.K8sArgs, confName string, format string, args .
return logging.Errorf(msg+format, args...)
}

func isCriticalRequestRetriable(err error) bool {
func isCriticalRequestRetriable(err error, retryOnNotFound bool) bool {
logging.Debugf("isCriticalRequestRetriable: %v", err)
errorTypesAllowingRetry := []func(error) bool{
errors.IsServiceUnavailable, errors.IsInternalError, k8snet.IsConnectionReset, k8snet.IsConnectionRefused}
Expand All @@ -501,12 +504,12 @@ func isCriticalRequestRetriable(err error) bool {
return true
}
}
return false
return retryOnNotFound && errors.IsNotFound(err)
}

// GetPod retrieves Kubernetes Pod object from given namespace/name in k8sArgs (i.e. cni args)
// GetPod also get pod UID, but it is not used to retrieve, but it is used for double check
func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) {
func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, isDel bool) (*v1.Pod, error) {
if kubeClient == nil {
return nil, nil
}
Expand All @@ -515,31 +518,53 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (
podName := string(k8sArgs.K8S_POD_NAME)
podUID := string(k8sArgs.K8S_POD_UID)

pod, err := kubeClient.GetPod(podNamespace, podName)
if err != nil {
// in case of a retriable error, retry 10 times with 0.25 sec interval
if isCriticalRequestRetriable(err) {
waitErr := wait.PollImmediate(shortPollDuration, shortPollTimeout, func() (bool, error) {
pod, err = kubeClient.GetPod(podNamespace, podName)
return pod != nil, err
})
// retry failed, then return error with retry out
if waitErr != nil {
return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err)
}
} else if warnOnly && errors.IsNotFound(err) {
// If not found, proceed to remove interface with cache
// Keep track of how long getting the pod takes
logging.Debugf("GetPod for [%s/%s] starting", podNamespace, podName)
start := time.Now()
defer func() {
logging.Debugf("GetPod for [%s/%s] took %v", podNamespace, podName, time.Since(start))
}()

// Standard getter grabs pod directly from the apiserver
podGetter := func(ns, name string) (*v1.Pod, error) {
return kubeClient.GetPod(ns, name)
}
// Use a fairly long 0.25 sec interval so we don't hammer the apiserver
pollDuration := shortPollDuration
retryOnNotFound := false

if podInformer != nil {
logging.Debugf("GetPod for [%s/%s] will use informer cache", podNamespace, podName)
// If we have an informer get the pod from the informer cache
podGetter = func(ns, name string) (*v1.Pod, error) {
return listers.NewPodLister(podInformer.GetIndexer()).Pods(ns).Get(name)
}
// Use short retry intervals with the informer since it's a local cache
pollDuration = informerPollDuration
// Retry NotFound on ADD since the cache may be a bit behind the apiserver
retryOnNotFound = !isDel
}

var pod *v1.Pod
if err := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) {
var getErr error
pod, getErr = podGetter(podNamespace, podName)
if isCriticalRequestRetriable(getErr, retryOnNotFound) {
return false, nil
}
return pod != nil, getErr
}); err != nil {
if isDel && errors.IsNotFound(err) {
// On DEL pod may already be gone from apiserver/informer
return nil, nil
} else {
// Other case, return error
return nil, cmdErr(k8sArgs, "error getting pod: %v", err)
}
return nil, cmdErr(k8sArgs, "error waiting for pod: %v", err)
}

// In case of static pod, UID through kube api is different because of mirror pod, hence it is expected.
if podUID != "" && string(pod.UID) != podUID && !k8s.IsStaticPod(pod) {
msg := fmt.Sprintf("expected pod UID %q but got %q from Kube API", podUID, pod.UID)
if warnOnly {
if isDel {
// On CNI DEL we just operate on the cache when these mismatch, we don't error out.
// For example: stateful sets namespace/name can remain the same while podUID changes.
logging.Verbosef("warning: %s", msg)
Expand All @@ -552,7 +577,7 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (
}

// CmdAdd ...
func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (cnitypes.Result, error) {
func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) (cnitypes.Result, error) {
n, err := types.LoadNetConf(args.StdinData)
logging.Debugf("CmdAdd: %v, %v, %v", args, exec, kubeClient)
if err != nil {
Expand All @@ -575,7 +600,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
}
}

pod, err := GetPod(kubeClient, k8sArgs, false)
pod, err := GetPod(kubeClient, podInformer, k8sArgs, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -772,7 +797,7 @@ func CmdCheck(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo)
}

// CmdDel ...
func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) error {
func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) error {
in, err := types.LoadNetConf(args.StdinData)
logging.Debugf("CmdDel: %v, %v, %v", args, exec, kubeClient)
if err != nil {
Expand Down Expand Up @@ -814,7 +839,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er
return cmdErr(nil, "error getting k8s client: %v", err)
}

pod, err := GetPod(kubeClient, k8sArgs, true)
pod, err := GetPod(kubeClient, podInformer, k8sArgs, true)
if err != nil {
// GetPod may be failed but just do print error in its log and continue to delete
logging.Errorf("Multus: GetPod failed: %v, but continue to delete", err)
Expand Down
36 changes: 18 additions & 18 deletions pkg/multus/multus_cni020_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

result, err := CmdAdd(args, fExec, nil)
result, err := CmdAdd(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())

err = CmdDel(args, fExec, nil)
err = CmdDel(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -203,14 +203,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

result, err := CmdAdd(args, fExec, nil)
result, err := CmdAdd(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())

err = CmdDel(args, fExec, nil)
err = CmdDel(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -265,7 +265,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

_, err := CmdAdd(args, fExec, nil)
_, err := CmdAdd(args, fExec, nil, nil)
Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory"))
})

Expand Down Expand Up @@ -319,10 +319,10 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

_, err := CmdAdd(args, fExec, nil)
_, err := CmdAdd(args, fExec, nil, nil)
Expect(err).To(HaveOccurred())

err = CmdDel(args, fExec, nil)
err = CmdDel(args, fExec, nil, nil)
Expect(err).To(HaveOccurred())
})

Expand Down Expand Up @@ -363,7 +363,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
err := fmt.Errorf("expected plugin failure")
fExec.addPlugin020(nil, "net1", expectedConf2, nil, err)

_, err = CmdAdd(args, fExec, nil)
_, err = CmdAdd(args, fExec, nil, nil)
Expect(fExec.addIndex).To(Equal(2))
Expect(fExec.delIndex).To(Equal(2))
Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure"))
Expand Down Expand Up @@ -409,7 +409,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
err := fmt.Errorf("expected plugin failure")
fExec.addPlugin020(nil, "net1", expectedConf2, nil, err)

_, err = CmdAdd(args, fExec, nil)
_, err = CmdAdd(args, fExec, nil, nil)
Expect(fExec.addIndex).To(Equal(1))
Expect(fExec.delIndex).To(Equal(2))
Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -491,7 +491,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3))
Expect(err).NotTo(HaveOccurred())

result, err := CmdAdd(args, fExec, clientInfo)
result, err := CmdAdd(args, fExec, clientInfo, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand Down Expand Up @@ -557,7 +557,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1))
Expect(err).NotTo(HaveOccurred())

result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand All @@ -568,7 +568,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
err = fKubeClient.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name)
Expect(err).NotTo(HaveOccurred())

err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -609,13 +609,13 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1))
Expect(err).NotTo(HaveOccurred())

result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())

err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -674,7 +674,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err = fKubeClient.AddNetAttachDef(
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1))
Expect(err).NotTo(HaveOccurred())
result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand All @@ -687,7 +687,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
Expect(err).NotTo(HaveOccurred())

By("Delete and check net count is not incremented")
err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -746,7 +746,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err = fKubeClient.AddNetAttachDef(
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1))
Expect(err).NotTo(HaveOccurred())
result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand All @@ -762,7 +762,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
Expect(err).NotTo(HaveOccurred())

By("Delete and check pod/net count is incremented")
err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down
Loading

0 comments on commit bab52cb

Please sign in to comment.