Skip to content

Commit

Permalink
fix: SidecarSet Expectations Leakage Bug (openkruise#1301)
Browse files Browse the repository at this point in the history
* fix: SidecarSet Expectations Leakage Bug

Signed-off-by: wangwenchao7 <[email protected]>

* fix: Modify the code as suggested

Signed-off-by: wangwenchao7 <[email protected]>

* fix: use ObserveUpdated func when update

Signed-off-by: wangwenchao7 <[email protected]>

* fix: observeUpdated before type check
Signed-off-by: wangwenchao7 <[email protected]>

---------

Signed-off-by: wangwenchao7 <[email protected]>
Co-authored-by: wangwenchao7 <[email protected]>
  • Loading branch information
2 people authored and 李龙峰 committed Sep 12, 2023
1 parent a155440 commit f2ac7be
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 55 deletions.
3 changes: 3 additions & 0 deletions pkg/control/sidecarcontrol/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/configuration"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -67,6 +68,8 @@ var (
// SidecarIgnoredNamespaces = []string{"kube-system", "kube-public"}
// SubPathExprEnvReg format: $(ODD_NAME)、$(POD_NAME)...
SubPathExprEnvReg, _ = regexp.Compile(`\$\(([-._a-zA-Z][-._a-zA-Z0-9]*)\)`)

UpdateExpectations = expectations.NewUpdateExpectations(RevisionAdapterImpl)
)

type SidecarSetUpgradeSpec struct {
Expand Down
10 changes: 3 additions & 7 deletions pkg/controller/sidecarset/sidecarset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
)

Expand Down Expand Up @@ -66,13 +64,12 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
expectations := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
recorder := mgr.GetEventRecorderFor("sidecarset-controller")
cli := utilclient.NewClientFromManager(mgr, "sidecarset-controller")
return &ReconcileSidecarSet{
Client: cli,
scheme: mgr.GetScheme(),
processor: NewSidecarSetProcessor(cli, expectations, recorder),
processor: NewSidecarSetProcessor(cli, recorder),
}
}

Expand Down Expand Up @@ -115,9 +112,8 @@ var _ reconcile.Reconciler = &ReconcileSidecarSet{}
// ReconcileSidecarSet reconciles a SidecarSet object
type ReconcileSidecarSet struct {
client.Client
scheme *runtime.Scheme
updateExpectations expectations.UpdateExpectations
processor *Processor
scheme *runtime.Scheme
processor *Processor
}

// +kubebuilder:rbac:groups=apps.kruise.io,resources=sidecarsets,verbs=get;list;watch;create;update;patch;delete
Expand Down
31 changes: 10 additions & 21 deletions pkg/controller/sidecarset/sidecarset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util/expectations"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -182,11 +181,9 @@ func testUpdateWhenUseNotUpdateStrategy(t *testing.T, sidecarSetInput *appsv1alp
}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
reconciler := ReconcileSidecarSet{
Client: fakeClient,
updateExpectations: expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl),
processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)),
Client: fakeClient,
processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)),
}
if _, err := reconciler.Reconcile(context.TODO(), request); err != nil {
t.Errorf("reconcile failed, err: %v", err)
Expand Down Expand Up @@ -217,11 +214,9 @@ func testUpdateWhenSidecarSetPaused(t *testing.T, sidecarSetInput *appsv1alpha1.
}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
reconciler := ReconcileSidecarSet{
Client: fakeClient,
updateExpectations: exps,
processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)),
Client: fakeClient,
processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)),
}
if _, err := reconciler.Reconcile(context.TODO(), request); err != nil {
t.Errorf("reconcile failed, err: %v", err)
Expand Down Expand Up @@ -252,11 +247,9 @@ func testUpdateWhenMaxUnavailableNotZero(t *testing.T, sidecarSetInput *appsv1al
}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
reconciler := ReconcileSidecarSet{
Client: fakeClient,
updateExpectations: exps,
processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)),
Client: fakeClient,
processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)),
}
if _, err := reconciler.Reconcile(context.TODO(), request); err != nil {
t.Errorf("reconcile failed, err: %v", err)
Expand Down Expand Up @@ -288,11 +281,9 @@ func testUpdateWhenPartitionFinished(t *testing.T, sidecarSetInput *appsv1alpha1
}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
reconciler := ReconcileSidecarSet{
Client: fakeClient,
updateExpectations: exps,
processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)),
Client: fakeClient,
processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)),
}
if _, err := reconciler.Reconcile(context.TODO(), request); err != nil {
t.Errorf("reconcile failed, err: %v", err)
Expand Down Expand Up @@ -324,11 +315,9 @@ func testRemoveSidecarSet(t *testing.T, sidecarSetInput *appsv1alpha1.SidecarSet
}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
reconciler := ReconcileSidecarSet{
Client: fakeClient,
updateExpectations: exps,
processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)),
Client: fakeClient,
processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)),
}
if _, err := reconciler.Reconcile(context.TODO(), request); err != nil {
t.Errorf("reconcile failed, err: %v", err)
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/sidecarset/sidecarset_hotupgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util/expectations"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -243,13 +242,12 @@ func testUpdateHotUpgradeSidecar(t *testing.T, hotUpgradeEmptyImage string, side
expectedStatus: []int32{1, 1, 1, 1},
},
}
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
pod := cs.getPods()[0]
sidecarset := cs.getSidecarset()
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarset, pod).Build()
processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10))
processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10))
_, err := processor.UpdateSidecarSet(sidecarset)
if err != nil {
t.Errorf("processor update sidecarset failed: %s", err.Error())
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller/sidecarset/sidecarset_pod_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (p *enqueueRequestForPod) Create(evt event.CreateEvent, q workqueue.RateLim
}

func (p *enqueueRequestForPod) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
p.deletePod(evt.Object)
}

func (p *enqueueRequestForPod) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
Expand All @@ -42,6 +43,22 @@ func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLim
p.updatePod(q, evt.ObjectOld, evt.ObjectNew)
}

func (p *enqueueRequestForPod) deletePod(obj runtime.Object) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}

sidecarSets, err := p.getPodMatchedSidecarSets(pod)
if err != nil {
klog.Errorf("unable to get sidecarSets related with pod %s/%s, err: %v", pod.Namespace, pod.Name, err)
return
}
for _, sidecarSet := range sidecarSets {
sidecarcontrol.UpdateExpectations.DeleteObject(sidecarSet.Name, pod)
}
}

// When a pod is added, figure out what sidecarSets it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (p *enqueueRequestForPod) addPod(q workqueue.RateLimitingInterface, obj runtime.Object) {
Expand Down Expand Up @@ -79,6 +96,7 @@ func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old,
return
}
for _, sidecarSet := range matchedSidecarSets {
sidecarcontrol.UpdateExpectations.ObserveUpdated(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet), newPod)
if sidecarSet.Spec.UpdateStrategy.Type == appsv1alpha1.NotUpdateSidecarSetStrategyType {
continue
}
Expand Down
23 changes: 10 additions & 13 deletions pkg/controller/sidecarset/sidecarset_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
historyutil "github.com/openkruise/kruise/pkg/util/history"
webhookutil "github.com/openkruise/kruise/pkg/webhook/util"

Expand All @@ -50,18 +49,16 @@ import (
)

type Processor struct {
Client client.Client
recorder record.EventRecorder
historyController history.Interface
updateExpectations expectations.UpdateExpectations
Client client.Client
recorder record.EventRecorder
historyController history.Interface
}

func NewSidecarSetProcessor(cli client.Client, expectations expectations.UpdateExpectations, rec record.EventRecorder) *Processor {
func NewSidecarSetProcessor(cli client.Client, rec record.EventRecorder) *Processor {
return &Processor{
Client: cli,
updateExpectations: expectations,
recorder: rec,
historyController: historyutil.NewHistory(cli),
Client: cli,
recorder: rec,
historyController: historyutil.NewHistory(cli),
}
}

Expand Down Expand Up @@ -96,9 +93,9 @@ func (p *Processor) UpdateSidecarSet(sidecarSet *appsv1alpha1.SidecarSet) (recon

// in case of informer cache latency
for _, pod := range pods {
p.updateExpectations.ObserveUpdated(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet), pod)
sidecarcontrol.UpdateExpectations.ObserveUpdated(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet), pod)
}
allUpdated, _, inflightPods := p.updateExpectations.SatisfiedExpectations(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet))
allUpdated, _, inflightPods := sidecarcontrol.UpdateExpectations.SatisfiedExpectations(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet))
if !allUpdated {
klog.V(3).Infof("sidecarset %s matched pods has some update in flight: %v, will sync later", sidecarSet.Name, inflightPods)
return reconcile.Result{RequeueAfter: time.Second}, nil
Expand Down Expand Up @@ -172,7 +169,7 @@ func (p *Processor) updatePods(control sidecarcontrol.SidecarControl, pods []*co
klog.Errorf("updatePodSidecarAndHash error, s:%s, pod:%s, err:%v", sidecarset.Name, pod.Name, err)
return err
}
p.updateExpectations.ExpectUpdated(sidecarset.Name, sidecarcontrol.GetSidecarSetRevision(sidecarset), pod)
sidecarcontrol.UpdateExpectations.ExpectUpdated(sidecarset.Name, sidecarcontrol.GetSidecarSetRevision(sidecarset), pod)
}

klog.V(3).Infof("sidecarSet(%s) updated pods(%s)", sidecarset.Name, strings.Join(podNames, ","))
Expand Down
16 changes: 5 additions & 11 deletions pkg/controller/sidecarset/sidecarset_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
webhookutil "github.com/openkruise/kruise/pkg/webhook/util"

apps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -172,13 +171,12 @@ func testUpdateColdUpgradeSidecar(t *testing.T, podDemo *corev1.Pod, sidecarSetI
expectedStatus: []int32{2, 2, 2, 2},
},
}
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
pods := cs.getPods()
sidecarset := cs.getSidecarset()
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarset, pods[0], pods[1]).Build()
processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10))
processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10))
_, err := processor.UpdateSidecarSet(sidecarset)
if err != nil {
t.Errorf("processor update sidecarset failed: %s", err.Error())
Expand Down Expand Up @@ -253,8 +251,7 @@ func TestScopeNamespacePods(t *testing.T) {
}
fakeClient.Create(context.TODO(), pod)
}
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10))
processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10))
pods, err := processor.getMatchingPods(sidecarSet)
if err != nil {
t.Fatalf("getMatchingPods failed: %s", err.Error())
Expand All @@ -275,7 +272,6 @@ func TestCanUpgradePods(t *testing.T) {
}
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet).Build()
pods := factoryPodsCommon(100, 0, sidecarSet)
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
for i := range pods {
pods[i].Annotations[sidecarcontrol.SidecarSetListAnnotation] = `test-sidecarset`
if i < 50 {
Expand All @@ -286,7 +282,7 @@ func TestCanUpgradePods(t *testing.T) {
fakeClient.Create(context.TODO(), pods[i])
}

processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10))
processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10))
_, err := processor.UpdateSidecarSet(sidecarSet)
if err != nil {
t.Errorf("processor update sidecarset failed: %s", err.Error())
Expand Down Expand Up @@ -318,8 +314,7 @@ func TestGetActiveRevisions(t *testing.T) {
kubeSysNs.SetName(webhookutil.GetNamespace())
kubeSysNs.SetNamespace(webhookutil.GetNamespace())
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet, kubeSysNs).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10))
processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10))

// case 1
latestRevision, _, err := processor.registerLatestRevision(sidecarSet, nil)
Expand Down Expand Up @@ -433,8 +428,7 @@ func TestTruncateHistory(t *testing.T) {
kubeSysNs.SetName(webhookutil.GetNamespace()) //Note that util.GetKruiseManagerNamespace() return "" here
kubeSysNs.SetNamespace(webhookutil.GetNamespace())
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet, kubeSysNs).Build()
exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl)
processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10))
processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10))

getName := func(i int) string {
return "sidecar-" + strconv.Itoa(i)
Expand Down

0 comments on commit f2ac7be

Please sign in to comment.