diff --git a/pkg/control/sidecarcontrol/util.go b/pkg/control/sidecarcontrol/util.go index 30dc8ad861..19297b8656 100644 --- a/pkg/control/sidecarcontrol/util.go +++ b/pkg/control/sidecarcontrol/util.go @@ -30,6 +30,7 @@ import ( "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" "github.com/openkruise/kruise/pkg/util/configuration" + "github.com/openkruise/kruise/pkg/util/expectations" utilfeature "github.com/openkruise/kruise/pkg/util/feature" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -67,6 +68,8 @@ var ( // SidecarIgnoredNamespaces = []string{"kube-system", "kube-public"} // SubPathExprEnvReg format: $(ODD_NAME)、$(POD_NAME)... SubPathExprEnvReg, _ = regexp.Compile(`\$\(([-._a-zA-Z][-._a-zA-Z0-9]*)\)`) + + UpdateExpectations = expectations.NewUpdateExpectations(RevisionAdapterImpl) ) type SidecarSetUpgradeSpec struct { diff --git a/pkg/controller/sidecarset/sidecarset_controller.go b/pkg/controller/sidecarset/sidecarset_controller.go index ee8ec7e5a7..db10004266 100644 --- a/pkg/controller/sidecarset/sidecarset_controller.go +++ b/pkg/controller/sidecarset/sidecarset_controller.go @@ -34,10 +34,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/control/sidecarcontrol" utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" - "github.com/openkruise/kruise/pkg/util/expectations" "github.com/openkruise/kruise/pkg/util/ratelimiter" ) @@ -66,13 +64,12 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { - expectations := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) recorder := mgr.GetEventRecorderFor("sidecarset-controller") cli := utilclient.NewClientFromManager(mgr, "sidecarset-controller") return &ReconcileSidecarSet{ Client: cli, scheme: mgr.GetScheme(), - processor: NewSidecarSetProcessor(cli, expectations, recorder), + processor: NewSidecarSetProcessor(cli, recorder), } } @@ -115,9 +112,8 @@ var _ reconcile.Reconciler = &ReconcileSidecarSet{} // ReconcileSidecarSet reconciles a SidecarSet object type ReconcileSidecarSet struct { client.Client - scheme *runtime.Scheme - updateExpectations expectations.UpdateExpectations - processor *Processor + scheme *runtime.Scheme + processor *Processor } // +kubebuilder:rbac:groups=apps.kruise.io,resources=sidecarsets,verbs=get;list;watch;create;update;patch;delete diff --git a/pkg/controller/sidecarset/sidecarset_controller_test.go b/pkg/controller/sidecarset/sidecarset_controller_test.go index 7a3c2448f2..806c2c844f 100644 --- a/pkg/controller/sidecarset/sidecarset_controller_test.go +++ b/pkg/controller/sidecarset/sidecarset_controller_test.go @@ -6,7 +6,6 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/control/sidecarcontrol" - "github.com/openkruise/kruise/pkg/util/expectations" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -182,11 +181,9 @@ func testUpdateWhenUseNotUpdateStrategy(t *testing.T, sidecarSetInput *appsv1alp } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) reconciler := ReconcileSidecarSet{ - Client: fakeClient, - updateExpectations: expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl), - processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)), + Client: fakeClient, + processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)), } if _, err := reconciler.Reconcile(context.TODO(), request); err != nil { t.Errorf("reconcile failed, err: %v", err) @@ -217,11 +214,9 @@ func testUpdateWhenSidecarSetPaused(t *testing.T, sidecarSetInput *appsv1alpha1. } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) reconciler := ReconcileSidecarSet{ - Client: fakeClient, - updateExpectations: exps, - processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)), + Client: fakeClient, + processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)), } if _, err := reconciler.Reconcile(context.TODO(), request); err != nil { t.Errorf("reconcile failed, err: %v", err) @@ -252,11 +247,9 @@ func testUpdateWhenMaxUnavailableNotZero(t *testing.T, sidecarSetInput *appsv1al } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) reconciler := ReconcileSidecarSet{ - Client: fakeClient, - updateExpectations: exps, - processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)), + Client: fakeClient, + processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)), } if _, err := reconciler.Reconcile(context.TODO(), request); err != nil { t.Errorf("reconcile failed, err: %v", err) @@ -288,11 +281,9 @@ func testUpdateWhenPartitionFinished(t *testing.T, sidecarSetInput *appsv1alpha1 } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) reconciler := ReconcileSidecarSet{ - Client: fakeClient, - updateExpectations: exps, - processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)), + Client: fakeClient, + processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)), } if _, err := reconciler.Reconcile(context.TODO(), request); err != nil { t.Errorf("reconcile failed, err: %v", err) @@ -324,11 +315,9 @@ func testRemoveSidecarSet(t *testing.T, sidecarSetInput *appsv1alpha1.SidecarSet } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSetInput, podInput).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) reconciler := ReconcileSidecarSet{ - Client: fakeClient, - updateExpectations: exps, - processor: NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)), + Client: fakeClient, + processor: NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)), } if _, err := reconciler.Reconcile(context.TODO(), request); err != nil { t.Errorf("reconcile failed, err: %v", err) diff --git a/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go b/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go index eeb2e55cbe..a7d5056526 100644 --- a/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go +++ b/pkg/controller/sidecarset/sidecarset_hotupgrade_test.go @@ -22,7 +22,6 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/control/sidecarcontrol" - "github.com/openkruise/kruise/pkg/util/expectations" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -243,13 +242,12 @@ func testUpdateHotUpgradeSidecar(t *testing.T, hotUpgradeEmptyImage string, side expectedStatus: []int32{1, 1, 1, 1}, }, } - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) for _, cs := range cases { t.Run(cs.name, func(t *testing.T) { pod := cs.getPods()[0] sidecarset := cs.getSidecarset() fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarset, pod).Build() - processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)) _, err := processor.UpdateSidecarSet(sidecarset) if err != nil { t.Errorf("processor update sidecarset failed: %s", err.Error()) diff --git a/pkg/controller/sidecarset/sidecarset_pod_event_handler.go b/pkg/controller/sidecarset/sidecarset_pod_event_handler.go index 5a9c36fea7..5ac457e67e 100644 --- a/pkg/controller/sidecarset/sidecarset_pod_event_handler.go +++ b/pkg/controller/sidecarset/sidecarset_pod_event_handler.go @@ -33,6 +33,7 @@ func (p *enqueueRequestForPod) Create(evt event.CreateEvent, q workqueue.RateLim } func (p *enqueueRequestForPod) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + p.deletePod(evt.Object) } func (p *enqueueRequestForPod) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { @@ -42,6 +43,22 @@ func (p *enqueueRequestForPod) Update(evt event.UpdateEvent, q workqueue.RateLim p.updatePod(q, evt.ObjectOld, evt.ObjectNew) } +func (p *enqueueRequestForPod) deletePod(obj runtime.Object) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return + } + + sidecarSets, err := p.getPodMatchedSidecarSets(pod) + if err != nil { + klog.Errorf("unable to get sidecarSets related with pod %s/%s, err: %v", pod.Namespace, pod.Name, err) + return + } + for _, sidecarSet := range sidecarSets { + sidecarcontrol.UpdateExpectations.DeleteObject(sidecarSet.Name, pod) + } +} + // When a pod is added, figure out what sidecarSets it will be a member of and // enqueue them. obj must have *v1.Pod type. func (p *enqueueRequestForPod) addPod(q workqueue.RateLimitingInterface, obj runtime.Object) { @@ -79,6 +96,7 @@ func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old, return } for _, sidecarSet := range matchedSidecarSets { + sidecarcontrol.UpdateExpectations.ObserveUpdated(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet), newPod) if sidecarSet.Spec.UpdateStrategy.Type == appsv1alpha1.NotUpdateSidecarSetStrategyType { continue } diff --git a/pkg/controller/sidecarset/sidecarset_processor.go b/pkg/controller/sidecarset/sidecarset_processor.go index 3c0e147638..0d50e31553 100644 --- a/pkg/controller/sidecarset/sidecarset_processor.go +++ b/pkg/controller/sidecarset/sidecarset_processor.go @@ -29,7 +29,6 @@ import ( "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" - "github.com/openkruise/kruise/pkg/util/expectations" historyutil "github.com/openkruise/kruise/pkg/util/history" webhookutil "github.com/openkruise/kruise/pkg/webhook/util" @@ -50,18 +49,16 @@ import ( ) type Processor struct { - Client client.Client - recorder record.EventRecorder - historyController history.Interface - updateExpectations expectations.UpdateExpectations + Client client.Client + recorder record.EventRecorder + historyController history.Interface } -func NewSidecarSetProcessor(cli client.Client, expectations expectations.UpdateExpectations, rec record.EventRecorder) *Processor { +func NewSidecarSetProcessor(cli client.Client, rec record.EventRecorder) *Processor { return &Processor{ - Client: cli, - updateExpectations: expectations, - recorder: rec, - historyController: historyutil.NewHistory(cli), + Client: cli, + recorder: rec, + historyController: historyutil.NewHistory(cli), } } @@ -96,9 +93,9 @@ func (p *Processor) UpdateSidecarSet(sidecarSet *appsv1alpha1.SidecarSet) (recon // in case of informer cache latency for _, pod := range pods { - p.updateExpectations.ObserveUpdated(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet), pod) + sidecarcontrol.UpdateExpectations.ObserveUpdated(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet), pod) } - allUpdated, _, inflightPods := p.updateExpectations.SatisfiedExpectations(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet)) + allUpdated, _, inflightPods := sidecarcontrol.UpdateExpectations.SatisfiedExpectations(sidecarSet.Name, sidecarcontrol.GetSidecarSetRevision(sidecarSet)) if !allUpdated { klog.V(3).Infof("sidecarset %s matched pods has some update in flight: %v, will sync later", sidecarSet.Name, inflightPods) return reconcile.Result{RequeueAfter: time.Second}, nil @@ -172,7 +169,7 @@ func (p *Processor) updatePods(control sidecarcontrol.SidecarControl, pods []*co klog.Errorf("updatePodSidecarAndHash error, s:%s, pod:%s, err:%v", sidecarset.Name, pod.Name, err) return err } - p.updateExpectations.ExpectUpdated(sidecarset.Name, sidecarcontrol.GetSidecarSetRevision(sidecarset), pod) + sidecarcontrol.UpdateExpectations.ExpectUpdated(sidecarset.Name, sidecarcontrol.GetSidecarSetRevision(sidecarset), pod) } klog.V(3).Infof("sidecarSet(%s) updated pods(%s)", sidecarset.Name, strings.Join(podNames, ",")) diff --git a/pkg/controller/sidecarset/sidecarset_processor_test.go b/pkg/controller/sidecarset/sidecarset_processor_test.go index 7f4d2b01a7..8752b928d3 100644 --- a/pkg/controller/sidecarset/sidecarset_processor_test.go +++ b/pkg/controller/sidecarset/sidecarset_processor_test.go @@ -26,7 +26,6 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util" - "github.com/openkruise/kruise/pkg/util/expectations" webhookutil "github.com/openkruise/kruise/pkg/webhook/util" apps "k8s.io/api/apps/v1" @@ -172,13 +171,12 @@ func testUpdateColdUpgradeSidecar(t *testing.T, podDemo *corev1.Pod, sidecarSetI expectedStatus: []int32{2, 2, 2, 2}, }, } - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) for _, cs := range cases { t.Run(cs.name, func(t *testing.T) { pods := cs.getPods() sidecarset := cs.getSidecarset() fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarset, pods[0], pods[1]).Build() - processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)) _, err := processor.UpdateSidecarSet(sidecarset) if err != nil { t.Errorf("processor update sidecarset failed: %s", err.Error()) @@ -253,8 +251,7 @@ func TestScopeNamespacePods(t *testing.T) { } fakeClient.Create(context.TODO(), pod) } - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) - processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)) pods, err := processor.getMatchingPods(sidecarSet) if err != nil { t.Fatalf("getMatchingPods failed: %s", err.Error()) @@ -275,7 +272,6 @@ func TestCanUpgradePods(t *testing.T) { } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet).Build() pods := factoryPodsCommon(100, 0, sidecarSet) - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) for i := range pods { pods[i].Annotations[sidecarcontrol.SidecarSetListAnnotation] = `test-sidecarset` if i < 50 { @@ -286,7 +282,7 @@ func TestCanUpgradePods(t *testing.T) { fakeClient.Create(context.TODO(), pods[i]) } - processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)) _, err := processor.UpdateSidecarSet(sidecarSet) if err != nil { t.Errorf("processor update sidecarset failed: %s", err.Error()) @@ -318,8 +314,7 @@ func TestGetActiveRevisions(t *testing.T) { kubeSysNs.SetName(webhookutil.GetNamespace()) kubeSysNs.SetNamespace(webhookutil.GetNamespace()) fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet, kubeSysNs).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) - processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)) // case 1 latestRevision, _, err := processor.registerLatestRevision(sidecarSet, nil) @@ -433,8 +428,7 @@ func TestTruncateHistory(t *testing.T) { kubeSysNs.SetName(webhookutil.GetNamespace()) //Note that util.GetKruiseManagerNamespace() return "" here kubeSysNs.SetNamespace(webhookutil.GetNamespace()) fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(sidecarSet, kubeSysNs).Build() - exps := expectations.NewUpdateExpectations(sidecarcontrol.RevisionAdapterImpl) - processor := NewSidecarSetProcessor(fakeClient, exps, record.NewFakeRecorder(10)) + processor := NewSidecarSetProcessor(fakeClient, record.NewFakeRecorder(10)) getName := func(i int) string { return "sidecar-" + strconv.Itoa(i)