Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Analysis run to rollout notifications #3296

Merged
merged 6 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func NewManager(
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer())
apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), notificationSecretInformerFactory.Core().V1().Secrets().Informer(), notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer())
apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(analysisRunInformer), defaults.Namespace(), notificationSecretInformerFactory.Core().V1().Secrets().Informer(), notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer())
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, apiFactory)
notificationsController := notificationcontroller.NewControllerWithNamespaceSupport(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,
notificationcontroller.WithToUnstructured(func(obj metav1.Object) (*unstructured.Unstructured, error) {
Expand Down
2 changes: 1 addition & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (f *fixture) newManager(t *testing.T) *Manager {
Recorder: record.NewFakeEventRecorder(),
})

apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), "default", k8sI.Core().V1().Secrets().Informer(), k8sI.Core().V1().ConfigMaps().Informer())
apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(i.Argoproj().V1alpha1().AnalysisRuns()), "default", k8sI.Core().V1().Secrets().Informer(), k8sI.Core().V1().ConfigMaps().Informer())
// rolloutsInformer := rolloutinformers.NewRolloutInformer(f.client, "", time.Minute, cache.Indexers{})
cm.notificationsController = notificationcontroller.NewController(dynamicClient.Resource(v1alpha1.RolloutGVR), i.Argoproj().V1alpha1().Rollouts().Informer(), apiFactory,
notificationcontroller.WithToUnstructured(func(obj metav1.Object) (*unstructured.Unstructured, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubectl-argo-rollouts/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewCmdArgoRollouts(o *options.ArgoRolloutsOptions) *cobra.Command {
return o.UsageErr(c)
},
}

o.AddKubectlFlags(cmd)
cmd.AddCommand(create.NewCmdCreate(o))
cmd.AddCommand(get.NewCmdGet(o))
Expand All @@ -72,7 +73,7 @@ func NewCmdArgoRollouts(o *options.ArgoRolloutsOptions) *cobra.Command {
cmd.AddCommand(undo.NewCmdUndo(o))
cmd.AddCommand(dashboard.NewCmdDashboard(o))
cmd.AddCommand(status.NewCmdStatus(o))
cmd.AddCommand(notificationcmd.NewToolsCommand("notifications", "kubectl argo rollouts notifications", v1alpha1.RolloutGVR, record.NewAPIFactorySettings()))
cmd.AddCommand(notificationcmd.NewToolsCommand("notifications", "kubectl argo rollouts notifications", v1alpha1.RolloutGVR, record.NewAPIFactorySettings(nil)))
cmd.AddCommand(completion.NewCmdCompletion(o))

return cmd
Expand Down
24 changes: 19 additions & 5 deletions utils/annotations/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/defaults"
Expand Down Expand Up @@ -52,17 +53,30 @@ func GetWorkloadGenerationAnnotation(ro *v1alpha1.Rollout) (int32, bool) {
}

// GetRevisionAnnotation returns revision of rollout
func GetRevisionAnnotation(ro *v1alpha1.Rollout) (int32, bool) {
if ro == nil {
func GetRevisionAnnotation(anyObj metav1.Object) (int32, bool) {

if anyObj == nil {
return 0, false
}
var obj interface{}
switch anyObj.(type) {
case *v1alpha1.Rollout:
obj = anyObj.(*v1alpha1.Rollout)
case *v1alpha1.AnalysisRun:
obj = anyObj.(*v1alpha1.AnalysisRun)
default:
ashutosh16 marked this conversation as resolved.
Show resolved Hide resolved
log.Warnf("object not supported type: %T", anyObj)
return 0, false
}
annotationValue, ok := ro.Annotations[RevisionAnnotation]

annotationValue, ok := obj.(metav1.Object).GetAnnotations()[RevisionAnnotation]
if !ok {
return int32(0), false
return 0, false
}

intValue, err := strconv.ParseInt(annotationValue, 10, 32)
if err != nil {
log.Warnf("Cannot convert the value %q with annotation key %q for the replica set %q", annotationValue, RevisionAnnotation, ro.Name)
log.Warnf("Cannot convert the value %q with annotation key %q for the object %q", annotationValue, RevisionAnnotation, anyObj.GetName())
return int32(0), false
}
return int32(intValue), true
Expand Down
12 changes: 12 additions & 0 deletions utils/annotations/annotations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,4 +483,16 @@ func TestGetRevisionAnnotation(t *testing.T) {
})
assert.False(t, found)
assert.Equal(t, int32(0), rev)

revAR, found := GetRevisionAnnotation(&v1alpha1.AnalysisRun{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
Annotations: map[string]string{
RevisionAnnotation: "1",
},
},
})
assert.True(t, found)
assert.Equal(t, int32(1), revAR)
}
79 changes: 76 additions & 3 deletions utils/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"encoding/json"
"fmt"
"regexp"
"sort"
"strings"
"time"

argoinformers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1"
timeutil "github.com/argoproj/argo-rollouts/utils/time"

"github.com/argoproj/notifications-engine/pkg/api"
"github.com/argoproj/notifications-engine/pkg/services"
"github.com/argoproj/notifications-engine/pkg/subscriptions"
Expand All @@ -20,6 +21,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
k8sinformers "k8s.io/client-go/informers"
Expand All @@ -32,6 +34,7 @@ import (

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
rolloutscheme "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/scheme"
"github.com/argoproj/argo-rollouts/utils/annotations"
logutil "github.com/argoproj/argo-rollouts/utils/log"
)

Expand Down Expand Up @@ -235,13 +238,83 @@ func (e *EventRecorderAdapter) K8sRecorder() record.EventRecorder {
return e.Recorder
}

func NewAPIFactorySettings() api.Settings {
func getAnalysisRunsFilterWithLabels(ro v1alpha1.Rollout, arInformer argoinformers.AnalysisRunInformer) (any, error) {

set := labels.Set(map[string]string{
v1alpha1.DefaultRolloutUniqueLabelKey: ro.Status.CurrentPodHash,
})

revision, _ := annotations.GetRevisionAnnotation(&ro)
ars, err := arInformer.Lister().AnalysisRuns(ro.Namespace).List(labels.SelectorFromSet(set))
if err != nil {
return nil, fmt.Errorf("error getting analysisruns from informer for namespace: %s error: %w", ro.Namespace, err)
}
if len(ars) == 0 {
return nil, nil
}

filteredArs := make([]*v1alpha1.AnalysisRun, 0, len(ars))
for _, ar := range ars {
arRevision, _ := annotations.GetRevisionAnnotation(ar)
if arRevision == revision {
filteredArs = append(filteredArs, ar)
}
}

sort.Slice(filteredArs, func(i, j int) bool {
ts1 := filteredArs[i].ObjectMeta.CreationTimestamp.Time
ts2 := filteredArs[j].ObjectMeta.CreationTimestamp.Time
return ts1.After(ts2)
})

var arsObj any
arBytes, err := json.Marshal(filteredArs)

if err != nil {
return nil, fmt.Errorf("Failed to marshal analysisRuns for rollout revision: %s, err: %w", string(revision), err)
}

err = json.Unmarshal(arBytes, &arsObj)
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal analysisRuns for rollout revision: %s, err: %w", string(revision), err)
}

return arsObj, nil
}

func NewAPIFactorySettings(arInformer argoinformers.AnalysisRunInformer) api.Settings {
return api.Settings{
SecretName: NotificationSecret,
ConfigMapName: NotificationConfigMap,
InitGetVars: func(cfg *api.Config, configMap *corev1.ConfigMap, secret *corev1.Secret) (api.GetVars, error) {
return func(obj map[string]any, dest services.Destination) map[string]any {
return map[string]any{"rollout": obj, "time": timeExprs}

var vars = map[string]any{"rollout": obj, "time": timeExprs}

if arInformer == nil {
log.Infof("Notification is not set for analysisRun Informer: %s", dest)
return vars
}

var ro v1alpha1.Rollout
err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &ro)

if err != nil {
log.Errorf("unable to send notification: bad rollout object: %v", err)
return vars
}

arsObj, err := getAnalysisRunsFilterWithLabels(ro, arInformer)

if err != nil {
log.Errorf("Error calling getAnalysisRunsFilterWithLabels for namespace: %s",
ro.Namespace)
return vars

}

vars = map[string]any{"rollout": obj, "analysisRuns": arsObj, "time": timeExprs}
return vars
}, nil
},
}
Expand Down
Loading
Loading