diff --git a/controller/controller.go b/controller/controller.go index 9aaf8ee09f..e9509cd78e 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -304,7 +304,7 @@ func NewManager( ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses") refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer()) - apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), notificationSecretInformerFactory.Core().V1().Secrets().Informer(), notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer()) + apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(analysisRunInformer), defaults.Namespace(), notificationSecretInformerFactory.Core().V1().Secrets().Informer(), notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer()) recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, apiFactory) notificationsController := notificationcontroller.NewControllerWithNamespaceSupport(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory, notificationcontroller.WithToUnstructured(func(obj metav1.Object) (*unstructured.Unstructured, error) { diff --git a/controller/controller_test.go b/controller/controller_test.go index f724ef0712..0f94131c96 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -205,7 +205,7 @@ func (f *fixture) newManager(t *testing.T) *Manager { Recorder: record.NewFakeEventRecorder(), }) - apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), "default", k8sI.Core().V1().Secrets().Informer(), k8sI.Core().V1().ConfigMaps().Informer()) + apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(i.Argoproj().V1alpha1().AnalysisRuns()), "default", k8sI.Core().V1().Secrets().Informer(), k8sI.Core().V1().ConfigMaps().Informer()) // rolloutsInformer := rolloutinformers.NewRolloutInformer(f.client, "", time.Minute, cache.Indexers{}) cm.notificationsController = notificationcontroller.NewController(dynamicClient.Resource(v1alpha1.RolloutGVR), i.Argoproj().V1alpha1().Rollouts().Informer(), apiFactory, notificationcontroller.WithToUnstructured(func(obj metav1.Object) (*unstructured.Unstructured, error) { diff --git a/pkg/kubectl-argo-rollouts/cmd/cmd.go b/pkg/kubectl-argo-rollouts/cmd/cmd.go index 53adf27b5d..372f8c434d 100644 --- a/pkg/kubectl-argo-rollouts/cmd/cmd.go +++ b/pkg/kubectl-argo-rollouts/cmd/cmd.go @@ -56,6 +56,7 @@ func NewCmdArgoRollouts(o *options.ArgoRolloutsOptions) *cobra.Command { return o.UsageErr(c) }, } + o.AddKubectlFlags(cmd) cmd.AddCommand(create.NewCmdCreate(o)) cmd.AddCommand(get.NewCmdGet(o)) @@ -72,7 +73,7 @@ func NewCmdArgoRollouts(o *options.ArgoRolloutsOptions) *cobra.Command { cmd.AddCommand(undo.NewCmdUndo(o)) cmd.AddCommand(dashboard.NewCmdDashboard(o)) cmd.AddCommand(status.NewCmdStatus(o)) - cmd.AddCommand(notificationcmd.NewToolsCommand("notifications", "kubectl argo rollouts notifications", v1alpha1.RolloutGVR, record.NewAPIFactorySettings())) + cmd.AddCommand(notificationcmd.NewToolsCommand("notifications", "kubectl argo rollouts notifications", v1alpha1.RolloutGVR, record.NewAPIFactorySettings(nil))) cmd.AddCommand(completion.NewCmdCompletion(o)) return cmd diff --git a/utils/annotations/annotations.go b/utils/annotations/annotations.go index 80cec5ce2a..121c6c0803 100644 --- a/utils/annotations/annotations.go +++ b/utils/annotations/annotations.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/argoproj/argo-rollouts/utils/defaults" @@ -52,17 +53,30 @@ func GetWorkloadGenerationAnnotation(ro *v1alpha1.Rollout) (int32, bool) { } // GetRevisionAnnotation returns revision of rollout -func GetRevisionAnnotation(ro *v1alpha1.Rollout) (int32, bool) { - if ro == nil { +func GetRevisionAnnotation(anyObj metav1.Object) (int32, bool) { + + if anyObj == nil { + return 0, false + } + var obj interface{} + switch anyObj.(type) { + case *v1alpha1.Rollout: + obj = anyObj.(*v1alpha1.Rollout) + case *v1alpha1.AnalysisRun: + obj = anyObj.(*v1alpha1.AnalysisRun) + default: + log.Warnf("object not supported type: %T", anyObj) return 0, false } - annotationValue, ok := ro.Annotations[RevisionAnnotation] + + annotationValue, ok := obj.(metav1.Object).GetAnnotations()[RevisionAnnotation] if !ok { - return int32(0), false + return 0, false } + intValue, err := strconv.ParseInt(annotationValue, 10, 32) if err != nil { - log.Warnf("Cannot convert the value %q with annotation key %q for the replica set %q", annotationValue, RevisionAnnotation, ro.Name) + log.Warnf("Cannot convert the value %q with annotation key %q for the object %q", annotationValue, RevisionAnnotation, anyObj.GetName()) return int32(0), false } return int32(intValue), true diff --git a/utils/annotations/annotations_test.go b/utils/annotations/annotations_test.go index 3ad136fd7e..239ab83fd3 100644 --- a/utils/annotations/annotations_test.go +++ b/utils/annotations/annotations_test.go @@ -483,4 +483,16 @@ func TestGetRevisionAnnotation(t *testing.T) { }) assert.False(t, found) assert.Equal(t, int32(0), rev) + + revAR, found := GetRevisionAnnotation(&v1alpha1.AnalysisRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + Annotations: map[string]string{ + RevisionAnnotation: "1", + }, + }, + }) + assert.True(t, found) + assert.Equal(t, int32(1), revAR) } diff --git a/utils/record/record.go b/utils/record/record.go index 9e50f3ba65..801db089b6 100644 --- a/utils/record/record.go +++ b/utils/record/record.go @@ -7,11 +7,12 @@ import ( "encoding/json" "fmt" "regexp" + "sort" "strings" "time" + argoinformers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1" timeutil "github.com/argoproj/argo-rollouts/utils/time" - "github.com/argoproj/notifications-engine/pkg/api" "github.com/argoproj/notifications-engine/pkg/services" "github.com/argoproj/notifications-engine/pkg/subscriptions" @@ -20,6 +21,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" k8sinformers "k8s.io/client-go/informers" @@ -32,6 +34,7 @@ import ( "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" rolloutscheme "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/scheme" + "github.com/argoproj/argo-rollouts/utils/annotations" logutil "github.com/argoproj/argo-rollouts/utils/log" ) @@ -235,13 +238,83 @@ func (e *EventRecorderAdapter) K8sRecorder() record.EventRecorder { return e.Recorder } -func NewAPIFactorySettings() api.Settings { +func getAnalysisRunsFilterWithLabels(ro v1alpha1.Rollout, arInformer argoinformers.AnalysisRunInformer) (any, error) { + + set := labels.Set(map[string]string{ + v1alpha1.DefaultRolloutUniqueLabelKey: ro.Status.CurrentPodHash, + }) + + revision, _ := annotations.GetRevisionAnnotation(&ro) + ars, err := arInformer.Lister().AnalysisRuns(ro.Namespace).List(labels.SelectorFromSet(set)) + if err != nil { + return nil, fmt.Errorf("error getting analysisruns from informer for namespace: %s error: %w", ro.Namespace, err) + } + if len(ars) == 0 { + return nil, nil + } + + filteredArs := make([]*v1alpha1.AnalysisRun, 0, len(ars)) + for _, ar := range ars { + arRevision, _ := annotations.GetRevisionAnnotation(ar) + if arRevision == revision { + filteredArs = append(filteredArs, ar) + } + } + + sort.Slice(filteredArs, func(i, j int) bool { + ts1 := filteredArs[i].ObjectMeta.CreationTimestamp.Time + ts2 := filteredArs[j].ObjectMeta.CreationTimestamp.Time + return ts1.After(ts2) + }) + + var arsObj any + arBytes, err := json.Marshal(filteredArs) + + if err != nil { + return nil, fmt.Errorf("Failed to marshal analysisRuns for rollout revision: %s, err: %w", string(revision), err) + } + + err = json.Unmarshal(arBytes, &arsObj) + if err != nil { + return nil, fmt.Errorf("Failed to unmarshal analysisRuns for rollout revision: %s, err: %w", string(revision), err) + } + + return arsObj, nil +} + +func NewAPIFactorySettings(arInformer argoinformers.AnalysisRunInformer) api.Settings { return api.Settings{ SecretName: NotificationSecret, ConfigMapName: NotificationConfigMap, InitGetVars: func(cfg *api.Config, configMap *corev1.ConfigMap, secret *corev1.Secret) (api.GetVars, error) { return func(obj map[string]any, dest services.Destination) map[string]any { - return map[string]any{"rollout": obj, "time": timeExprs} + + var vars = map[string]any{"rollout": obj, "time": timeExprs} + + if arInformer == nil { + log.Infof("Notification is not set for analysisRun Informer: %s", dest) + return vars + } + + var ro v1alpha1.Rollout + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &ro) + + if err != nil { + log.Errorf("unable to send notification: bad rollout object: %v", err) + return vars + } + + arsObj, err := getAnalysisRunsFilterWithLabels(ro, arInformer) + + if err != nil { + log.Errorf("Error calling getAnalysisRunsFilterWithLabels for namespace: %s", + ro.Namespace) + return vars + + } + + vars = map[string]any{"rollout": obj, "analysisRuns": arsObj, "time": timeExprs} + return vars }, nil }, } diff --git a/utils/record/record_test.go b/utils/record/record_test.go index 97f4452441..2cc7afd12c 100644 --- a/utils/record/record_test.go +++ b/utils/record/record_test.go @@ -2,6 +2,7 @@ package record import ( "bytes" + "encoding/json" "errors" "fmt" "strings" @@ -9,7 +10,11 @@ import ( "time" "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + argofake "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" + argoinformersfactory "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions" + argoinformers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1" "github.com/argoproj/argo-rollouts/utils/defaults" + timeutil "github.com/argoproj/argo-rollouts/utils/time" "github.com/argoproj/notifications-engine/pkg/api" notificationapi "github.com/argoproj/notifications-engine/pkg/api" "github.com/argoproj/notifications-engine/pkg/mocks" @@ -21,13 +26,19 @@ import ( dto "github.com/prometheus/client_model/go" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" ) +var ( + noResyncPeriodFunc = func() time.Duration { return 0 } +) + func TestRecordLog(t *testing.T) { prevOutput := log.StandardLogger().Out defer func() { @@ -178,13 +189,18 @@ func TestSendNotificationsWhenConditionTime(t *testing.T) { k8sClient := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(k8sClient, 0) + + f := argofake.NewSimpleClientset() + rolloutsI := argoinformersfactory.NewSharedInformerFactory(f, noResyncPeriodFunc()) + arInformer := rolloutsI.Argoproj().V1alpha1().AnalysisRuns() + cmInformer := sharedInformers.Core().V1().ConfigMaps().Informer() secretInformer := sharedInformers.Core().V1().Secrets().Informer() secretInformer.GetIndexer().Add(secret) cmInformer.GetIndexer().Add(cm) - apiFactory := notificationapi.NewFactory(NewAPIFactorySettings(), defaults.Namespace(), secretInformer, cmInformer) + apiFactory := notificationapi.NewFactory(NewAPIFactorySettings(arInformer), defaults.Namespace(), secretInformer, cmInformer) api, err := apiFactory.GetAPI() assert.NoError(t, err) @@ -224,8 +240,11 @@ func TestSendNotificationsWhenConditionTime(t *testing.T) { secretInformer.GetIndexer().Add(secret) cmInformer.GetIndexer().Add(cm) + f := argofake.NewSimpleClientset() + rolloutsI := argoinformersfactory.NewSharedInformerFactory(f, noResyncPeriodFunc()) + arInformer := rolloutsI.Argoproj().V1alpha1().AnalysisRuns() - apiFactory := notificationapi.NewFactory(NewAPIFactorySettings(), defaults.Namespace(), secretInformer, cmInformer) + apiFactory := notificationapi.NewFactory(NewAPIFactorySettings(arInformer), defaults.Namespace(), secretInformer, cmInformer) api, err := apiFactory.GetAPI() assert.NoError(t, err) @@ -422,17 +441,162 @@ func TestSendNotificationsNoTrigger(t *testing.T) { assert.Len(t, err, 1) } +func createAnalysisRunInformer(ars []*v1alpha1.AnalysisRun) argoinformers.AnalysisRunInformer { + f := argofake.NewSimpleClientset() + rolloutsI := argoinformersfactory.NewSharedInformerFactory(f, noResyncPeriodFunc()) + arInformer := rolloutsI.Argoproj().V1alpha1().AnalysisRuns() + for _, ar := range ars { + _ = arInformer.Informer().GetStore().Add(ar) + } + return arInformer +} + func TestNewAPIFactorySettings(t *testing.T) { - settings := NewAPIFactorySettings() - assert.Equal(t, NotificationConfigMap, settings.ConfigMapName) - assert.Equal(t, NotificationSecret, settings.SecretName) - getVars, err := settings.InitGetVars(nil, nil, nil) - assert.NoError(t, err) - rollout := map[string]any{"name": "hello"} - vars := getVars(rollout, services.Destination{}) + ars := []*v1alpha1.AnalysisRun{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "analysis-run-1", + CreationTimestamp: metav1.NewTime(timeutil.Now().Add(-1 * time.Hour)), + Namespace: "default", + Labels: map[string]string{"rollouts-pod-template-hash": "85659df978"}, + Annotations: map[string]string{"rollout.argoproj.io/revision": "1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "analysis-run-2", + CreationTimestamp: metav1.NewTime(timeutil.Now().Add(-2 * time.Hour)), + Namespace: "default", + Labels: map[string]string{"rollouts-pod-template-hash": "85659df978"}, + Annotations: map[string]string{"rollout.argoproj.io/revision": "1"}, + }, + }, + } + ro := v1alpha1.Rollout{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rollout", + Namespace: "default", + Annotations: map[string]string{"rollout.argoproj.io/revision": "1"}, + }, + Status: v1alpha1.RolloutStatus{ + CurrentPodHash: "85659df978", + }, + } + + type expectedFunc func(obj map[string]interface{}, ar any) map[string]interface{} + type arInformerFunc func([]*v1alpha1.AnalysisRun) argoinformers.AnalysisRunInformer + + testcase := []struct { + name string + arInformer arInformerFunc + rollout v1alpha1.Rollout + ars []*v1alpha1.AnalysisRun + expected expectedFunc + }{ + { + name: "Send notification with rollout and analysisRun", + arInformer: func(ars []*v1alpha1.AnalysisRun) argoinformers.AnalysisRunInformer { + return createAnalysisRunInformer(ars) + }, + rollout: ro, + ars: ars, + expected: func(obj map[string]interface{}, ar any) map[string]interface{} { + return map[string]interface{}{ + "rollout": obj, + "analysisRuns": ar, + "time": timeExprs, + } + }, + }, + { + name: "Send notification rollout when revision and label doesn't match", + arInformer: func(ars []*v1alpha1.AnalysisRun) argoinformers.AnalysisRunInformer { + return createAnalysisRunInformer(ars) + }, + rollout: ro, + ars: []*v1alpha1.AnalysisRun{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "analysis-run-3", + CreationTimestamp: metav1.NewTime(timeutil.Now().Add(-2 * time.Hour)), + Namespace: "default", + Labels: map[string]string{"rollouts-pod-template-hash": "1234"}, + Annotations: map[string]string{"rollout.argoproj.io/revision": "2"}, + }, + }, + }, + expected: func(obj map[string]interface{}, ar any) map[string]interface{} { + return map[string]interface{}{ + "rollout": obj, + "analysisRuns": nil, + "time": timeExprs, + } + }, + }, + { + name: "arInformer is nil", + arInformer: func(ars []*v1alpha1.AnalysisRun) argoinformers.AnalysisRunInformer { + return nil + }, + rollout: ro, + ars: nil, + expected: func(obj map[string]interface{}, ar any) map[string]interface{} { + return map[string]interface{}{ + "rollout": obj, + "time": timeExprs, + } + }, + }, + { + name: "analysisRuns nil for no matching namespace", + arInformer: func(ars []*v1alpha1.AnalysisRun) argoinformers.AnalysisRunInformer { + return createAnalysisRunInformer(ars) + }, + rollout: ro, + ars: []*v1alpha1.AnalysisRun{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "analysis-run-1", + CreationTimestamp: metav1.NewTime(timeutil.Now().Add(-2 * time.Hour)), + Namespace: "default-1", + Labels: map[string]string{"rollouts-pod-template-hash": "1234"}, + Annotations: map[string]string{"rollout.argoproj.io/revision": "2"}, + }, + }, + }, + expected: func(obj map[string]interface{}, ar any) map[string]interface{} { + return map[string]interface{}{ + "rollout": obj, + "analysisRuns": nil, + "time": timeExprs, + } + }, + }, + } - assert.Equal(t, map[string]any{"rollout": rollout, "time": timeExprs}, vars) + for _, test := range testcase { + t.Run(test.name, func(t *testing.T) { + + settings := NewAPIFactorySettings(test.arInformer(test.ars)) + getVars, err := settings.InitGetVars(nil, nil, nil) + require.NoError(t, err) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + obj, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(&test.rollout) + + arBytes, err := json.Marshal(test.ars) + var arsObj any + _ = json.Unmarshal(arBytes, &arsObj) + vars := getVars(obj, services.Destination{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + assert.Equal(t, test.expected(obj, arsObj), vars) + }) + } } func TestWorkloadRefObjectMap(t *testing.T) {