From e447d3745f0d63bd1c7a92abc9ea30c4b0ffa3a6 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 15 Nov 2023 22:36:36 -0800 Subject: [PATCH 1/4] add target group policy controller and status updates --- cmd/aws-application-networking-k8s/main.go | 5 + ...etworking.k8s.aws_targetgrouppolicies.yaml | 12 +- controllers/eventhandlers/policy.go | 1 + controllers/targetgrouppolicy_controller.go | 158 ++++++++++++++++++ ...etworking.k8s.aws_targetgrouppolicies.yaml | 12 +- .../v1alpha1/targetgrouppolicy_types.go | 4 + pkg/gateway/model_build_targetgroup.go | 36 ++-- pkg/k8s/policyhelper/policy.go | 31 +++- pkg/k8s/policyhelper/policy_test.go | 47 ++++++ 9 files changed, 290 insertions(+), 16 deletions(-) create mode 100644 controllers/targetgrouppolicy_controller.go diff --git a/cmd/aws-application-networking-k8s/main.go b/cmd/aws-application-networking-k8s/main.go index 3c5d99d7..41ed9f71 100644 --- a/cmd/aws-application-networking-k8s/main.go +++ b/cmd/aws-application-networking-k8s/main.go @@ -187,6 +187,11 @@ func main() { setupLog.Fatalf("iam auth policy controller setup failed: %s", err) } + err = controllers.RegisterTargetGroupPolicyController(ctrlLog.Named("target-group-policy"), mgr) + if err != nil { + setupLog.Fatalf("target group policy controller setup failed: %s", err) + } + err = controllers.RegisterVpcAssociationPolicyController(ctrlLog.Named("vpc-association-policy"), mgr, cloud) if err != nil { setupLog.Fatalf("vpc association policy controller setup failed: %s", err) diff --git a/config/crds/bases/application-networking.k8s.aws_targetgrouppolicies.yaml b/config/crds/bases/application-networking.k8s.aws_targetgrouppolicies.yaml index b04f792e..17fbd81f 100644 --- a/config/crds/bases/application-networking.k8s.aws_targetgrouppolicies.yaml +++ b/config/crds/bases/application-networking.k8s.aws_targetgrouppolicies.yaml @@ -157,7 +157,14 @@ spec: - targetRef type: object status: - description: TargetGroupPolicyStatus defines the observed state of AccessLogPolicy. + default: + conditions: + - lastTransitionTime: "1970-01-01T00:00:00Z" + message: Waiting for controller + reason: NotReconciled + status: Unknown + type: Accepted + description: Status defines the current state of TargetGroupPolicy. properties: conditions: default: @@ -254,4 +261,5 @@ spec: type: object served: true storage: true - subresources: {} + subresources: + status: {} diff --git a/controllers/eventhandlers/policy.go b/controllers/eventhandlers/policy.go index ec2b1270..20d769a6 100644 --- a/controllers/eventhandlers/policy.go +++ b/controllers/eventhandlers/policy.go @@ -27,6 +27,7 @@ func (h *policyEventHandler[T]) MapObjectToPolicy() handler.EventHandler { func (h *policyEventHandler[T]) mapObjectToPolicy(eventObj client.Object) []reconcile.Request { var requests []reconcile.Request + policies, err := policyhelper.GetAttachedPolicies(context.Background(), h.client, k8s.NamespacedName(eventObj), *new(T)) if err != nil { h.log.Errorf("Failed calling k8s operation: %s", err.Error()) diff --git a/controllers/targetgrouppolicy_controller.go b/controllers/targetgrouppolicy_controller.go new file mode 100644 index 00000000..1aa9b550 --- /dev/null +++ b/controllers/targetgrouppolicy_controller.go @@ -0,0 +1,158 @@ +package controllers + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" + gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" +) + +type TargetGroupPolicyController struct { + log gwlog.Logger + client client.Client +} + +func RegisterTargetGroupPolicyController(log gwlog.Logger, mgr ctrl.Manager) error { + controller := &TargetGroupPolicyController{ + log: log, + client: mgr.GetClient(), + } + mapfn := targetGroupPolicyMapFunc(mgr.GetClient(), log) + return ctrl.NewControllerManagedBy(mgr). + For(&anv1alpha1.TargetGroupPolicy{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(mapfn)). + Complete(controller) +} + +func (c *TargetGroupPolicyController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + tgPolicy := &anv1alpha1.TargetGroupPolicy{} + err := c.client.Get(ctx, req.NamespacedName, tgPolicy) + if err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + c.log.Infow("reconcile target group policy", "req", req, "targetRef", tgPolicy.Spec.TargetRef) + + validationErr := c.validateSpec(ctx, tgPolicy) + reason := validationErrToStatusReason(validationErr) + msg := "" + if validationErr != nil { + msg = validationErr.Error() + } + c.updatePolicyCondition(tgPolicy, reason, msg) + err = c.client.Status().Update(ctx, tgPolicy) + if err != nil { + return ctrl.Result{}, err + } + + c.log.Infow("reconciled target group policy", + "req", req, + "targetRef", tgPolicy.Spec.TargetRef, + ) + return ctrl.Result{}, nil +} + +func (c *TargetGroupPolicyController) validateSpec(ctx context.Context, tgPolicy *anv1alpha1.TargetGroupPolicy) error { + tr := tgPolicy.Spec.TargetRef + if tr.Group != corev1.GroupName { + return fmt.Errorf("%w: %s", GroupNameError, tr.Group) + } + if string(tr.Kind) != "Service" { + return fmt.Errorf("%w: %s", KindError, tr.Kind) + } + conflictingPolicies, err := c.findConflictingPolicies(ctx, tgPolicy) + if err != nil { + return err + } + if len(conflictingPolicies) > 0 { + return fmt.Errorf("%w, policies: %v", TargetRefConflict, conflictingPolicies) + } + refExists, err := c.targetRefExists(ctx, tgPolicy) + if err != nil { + return err + } + if !refExists { + return fmt.Errorf("%w: %s", TargetRefNotFound, tr.Name) + } + return nil +} + +func (c *TargetGroupPolicyController) targetRefExists(ctx context.Context, tgPolicy *anv1alpha1.TargetGroupPolicy) (bool, error) { + tr := tgPolicy.Spec.TargetRef + var obj client.Object + switch tr.Kind { + case "Service": + obj = &corev1.Service{} + default: + panic("unexpected targetRef Kind=" + tr.Kind) + } + return k8s.ObjExists(ctx, c.client, types.NamespacedName{ + Namespace: tgPolicy.Namespace, + Name: string(tr.Name), + }, obj) +} + +func (c *TargetGroupPolicyController) findConflictingPolicies(ctx context.Context, tgPolicy *anv1alpha1.TargetGroupPolicy) ([]string, error) { + var out []string + policies := &anv1alpha1.TargetGroupPolicyList{} + err := c.client.List(ctx, policies, &client.ListOptions{ + Namespace: tgPolicy.Namespace, + }) + if err != nil { + return out, err + } + for _, p := range policies.Items { + if tgPolicy.Name == p.Name { + continue + } + if *tgPolicy.Spec.TargetRef == *p.Spec.TargetRef { + out = append(out, p.Name) + } + } + return out, nil +} + +func (c *TargetGroupPolicyController) updatePolicyCondition(tgPolicy *anv1alpha1.TargetGroupPolicy, reason gwv1alpha2.PolicyConditionReason, msg string) { + status := metav1.ConditionTrue + if reason != gwv1alpha2.PolicyReasonAccepted { + status = metav1.ConditionFalse + } + cnd := metav1.Condition{ + Type: string(gwv1alpha2.PolicyConditionAccepted), + Status: status, + Reason: string(reason), + Message: msg, + } + meta.SetStatusCondition(&tgPolicy.Status.Conditions, cnd) +} + +func targetGroupPolicyMapFunc(c client.Client, log gwlog.Logger) handler.MapFunc { + return func(obj client.Object) []ctrl.Request { + requests := []ctrl.Request{} + policies := &anv1alpha1.TargetGroupPolicyList{} + err := c.List(context.Background(), policies, &client.ListOptions{Namespace: obj.GetNamespace()}) + if err != nil { + log.Error(err) + return requests + } + for _, policy := range policies.Items { + if obj.GetName() == string(policy.Spec.TargetRef.Name) { + requests = append(requests, ctrl.Request{NamespacedName: policy.GetNamespacedName()}) + } + } + return requests + } +} diff --git a/helm/crds/application-networking.k8s.aws_targetgrouppolicies.yaml b/helm/crds/application-networking.k8s.aws_targetgrouppolicies.yaml index b04f792e..17fbd81f 100644 --- a/helm/crds/application-networking.k8s.aws_targetgrouppolicies.yaml +++ b/helm/crds/application-networking.k8s.aws_targetgrouppolicies.yaml @@ -157,7 +157,14 @@ spec: - targetRef type: object status: - description: TargetGroupPolicyStatus defines the observed state of AccessLogPolicy. + default: + conditions: + - lastTransitionTime: "1970-01-01T00:00:00Z" + message: Waiting for controller + reason: NotReconciled + status: Unknown + type: Accepted + description: Status defines the current state of TargetGroupPolicy. properties: conditions: default: @@ -254,4 +261,5 @@ spec: type: object served: true storage: true - subresources: {} + subresources: + status: {} diff --git a/pkg/apis/applicationnetworking/v1alpha1/targetgrouppolicy_types.go b/pkg/apis/applicationnetworking/v1alpha1/targetgrouppolicy_types.go index d9855bc6..a822cec4 100644 --- a/pkg/apis/applicationnetworking/v1alpha1/targetgrouppolicy_types.go +++ b/pkg/apis/applicationnetworking/v1alpha1/targetgrouppolicy_types.go @@ -19,12 +19,16 @@ const ( // +kubebuilder:resource:categories=gateway-api,shortName=tgp // +kubebuilder:storageversion // +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` +// +kubebuilder:subresource:status type TargetGroupPolicy struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec TargetGroupPolicySpec `json:"spec"` + // Status defines the current state of TargetGroupPolicy. + // + // +kubebuilder:default={conditions: {{type: "Accepted", status: "Unknown", reason:"NotReconciled", message:"Waiting for controller", lastTransitionTime: "1970-01-01T00:00:00Z"}}} Status TargetGroupPolicyStatus `json:"status,omitempty"` } diff --git a/pkg/gateway/model_build_targetgroup.go b/pkg/gateway/model_build_targetgroup.go index cea39d2b..9d4c42d7 100644 --- a/pkg/gateway/model_build_targetgroup.go +++ b/pkg/gateway/model_build_targetgroup.go @@ -5,16 +5,12 @@ import ( "errors" "fmt" - apierrors "k8s.io/apimachinery/pkg/api/errors" - - "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" - + "github.com/aws/aws-sdk-go/service/vpclattice" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/aws/aws-sdk-go/service/vpclattice" + "sigs.k8s.io/gateway-api/apis/v1alpha2" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" "github.com/aws/aws-application-networking-k8s/pkg/config" @@ -22,6 +18,7 @@ import ( "github.com/aws/aws-application-networking-k8s/pkg/k8s/policyhelper" "github.com/aws/aws-application-networking-k8s/pkg/model/core" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" + "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" ) type InvalidBackendRefError struct { @@ -147,7 +144,19 @@ func (t *svcExportTargetGroupModelBuildTask) buildTargetGroup(ctx context.Contex } } - tgps, err := policyhelper.GetAttachedPolicies(ctx, t.client, k8s.NamespacedName(t.serviceExport), &anv1alpha1.TargetGroupPolicy{}) + policyConditionFilter := policyhelper.PolicyConditionFilter{ + Reasons: []v1alpha2.PolicyConditionReason{ + v1alpha2.PolicyReasonAccepted, + v1alpha2.PolicyReasonTargetNotFound, // policy was created before service + }, + } + + tgps, err := policyhelper.GetAttachedPoliciesConditionFilter( + ctx, t.client, + k8s.NamespacedName(t.serviceExport), + &anv1alpha1.TargetGroupPolicy{}, + policyConditionFilter, + ) if err != nil { return nil, err } @@ -156,7 +165,6 @@ func (t *svcExportTargetGroupModelBuildTask) buildTargetGroup(ctx context.Contex protocolVersion := vpclattice.TargetGroupProtocolVersionHttp1 var healthCheckConfig *vpclattice.HealthCheckConfig if len(tgps) > 0 { - // TODO: TGP conflicts should be handled correctly w/ status update, for now just picking up one tgp := tgps[0] if tgp.Spec.Protocol != nil { protocol = *tgp.Spec.Protocol @@ -316,7 +324,15 @@ func (t *backendRefTargetGroupModelBuildTask) buildTargetGroupSpec(ctx context.C } } - tgps, err := policyhelper.GetAttachedPolicies(ctx, t.client, backendRefNsName, &anv1alpha1.TargetGroupPolicy{}) + policyConditionFilter := policyhelper.PolicyConditionFilter{ + Reasons: []v1alpha2.PolicyConditionReason{ + v1alpha2.PolicyReasonAccepted, + v1alpha2.PolicyReasonTargetNotFound, // policy was created before service + }, + } + + tgps, err := policyhelper.GetAttachedPoliciesConditionFilter(ctx, t.client, backendRefNsName, + &anv1alpha1.TargetGroupPolicy{}, policyConditionFilter) if err != nil { return model.TargetGroupSpec{}, err } diff --git a/pkg/k8s/policyhelper/policy.go b/pkg/k8s/policyhelper/policy.go index b6237d75..6eb30a99 100644 --- a/pkg/k8s/policyhelper/policy.go +++ b/pkg/k8s/policyhelper/policy.go @@ -6,8 +6,10 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + apimachineryv1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" @@ -20,7 +22,11 @@ type policyInfo struct { kind gwv1beta1.Kind } -func GetAttachedPolicies[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) ([]T, error) { +type PolicyConditionFilter struct { + Reasons []gwv1alpha2.PolicyConditionReason +} + +func GetAttachedPoliciesConditionFilter[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T, filter PolicyConditionFilter) ([]T, error) { var policies []T info, err := getPolicyInfo(policy) if err != nil { @@ -51,13 +57,18 @@ func GetAttachedPolicies[T core.Policy](ctx context.Context, k8sClient client.Cl retrievedNamespace = string(*targetRef.Namespace) } namespaceMatch := retrievedNamespace == searchTargetRef.Namespace - if groupKindMatch && nameMatch && namespaceMatch { + filterMatch := policyConditionMatch(p.GetStatusConditions(), filter.Reasons) + if groupKindMatch && nameMatch && namespaceMatch && filterMatch { policies = append(policies, p.(T)) } } return policies, nil } +func GetAttachedPolicies[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) ([]T, error) { + return GetAttachedPoliciesConditionFilter(ctx, k8sClient, searchTargetRef, policy, PolicyConditionFilter{}) +} + func getPolicyInfo(policyType core.Policy) (policyInfo, error) { switch policyType.(type) { case *anv1alpha1.VpcAssociationPolicy: @@ -76,3 +87,19 @@ func getPolicyInfo(policyType core.Policy) (policyInfo, error) { return policyInfo{}, fmt.Errorf("unsupported policy type %T", policyType) } } + +func policyConditionMatch(cnds []apimachineryv1.Condition, filter []gwv1alpha2.PolicyConditionReason) bool { + if cnds == nil || filter == nil || len(filter) == 0 { + return true + } + cnd := meta.FindStatusCondition(cnds, string(gwv1alpha2.PolicyConditionAccepted)) + if cnd == nil { + return false + } + for _, reason := range filter { + if cnd.Reason == string(reason) { + return true + } + } + return false +} diff --git a/pkg/k8s/policyhelper/policy_test.go b/pkg/k8s/policyhelper/policy_test.go index 7b140921..ec47c14d 100644 --- a/pkg/k8s/policyhelper/policy_test.go +++ b/pkg/k8s/policyhelper/policy_test.go @@ -7,6 +7,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + apimachineryv1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" @@ -207,3 +208,49 @@ func Test_GetAttachedPolicies(t *testing.T) { }) } } + +func TestPolicyConditionMatch(t *testing.T) { + type Condition = apimachineryv1.Condition + type Reason = gwv1alpha2.PolicyConditionReason + + type Test struct { + name string + cnds []Condition + filter []Reason + want bool + } + + tests := []Test{ + {"nil", nil, nil, true}, + {"nil filter", []Condition{{}}, nil, true}, + {"0 len filter", []Condition{{}}, []Reason{}, true}, + {"condition not found", []Condition{{}}, []Reason{"Accepted"}, false}, + { + "condition does not match", + []Condition{{Type: string(gwv1alpha2.PolicyConditionAccepted), Reason: "NotReconciled"}}, + []Reason{"Accepted"}, false, + }, + { + "condtion match", + []Condition{{Type: string(gwv1alpha2.PolicyConditionAccepted), Reason: "Accepted"}}, + []Reason{"Accepted"}, + true, + }, + { + "condtion match mutli-filter", + []Condition{{Type: string(gwv1alpha2.PolicyConditionAccepted), Reason: "Accepted"}}, + []Reason{"NotFound", "Accepted"}, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + match := policyConditionMatch(tt.cnds, tt.filter) + if match != tt.want { + t.Errorf("filter does not match, cnds=%v, filter=%v, want=%t", tt.cnds, tt.filter, tt.want) + } + }) + } + +} From 8edbceed0f58f71b5ca164b89ad89f9dfa292d6e Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 15 Nov 2023 23:00:49 -0800 Subject: [PATCH 2/4] controller v0.16 update --- controllers/targetgrouppolicy_controller.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/controllers/targetgrouppolicy_controller.go b/controllers/targetgrouppolicy_controller.go index 1aa9b550..30e95186 100644 --- a/controllers/targetgrouppolicy_controller.go +++ b/controllers/targetgrouppolicy_controller.go @@ -13,7 +13,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/source" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" @@ -34,7 +33,7 @@ func RegisterTargetGroupPolicyController(log gwlog.Logger, mgr ctrl.Manager) err mapfn := targetGroupPolicyMapFunc(mgr.GetClient(), log) return ctrl.NewControllerManagedBy(mgr). For(&anv1alpha1.TargetGroupPolicy{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). - Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(mapfn)). + Watches(&corev1.Service{}, handler.EnqueueRequestsFromMapFunc(mapfn)). Complete(controller) } @@ -140,10 +139,10 @@ func (c *TargetGroupPolicyController) updatePolicyCondition(tgPolicy *anv1alpha1 } func targetGroupPolicyMapFunc(c client.Client, log gwlog.Logger) handler.MapFunc { - return func(obj client.Object) []ctrl.Request { + return func(ctx context.Context, obj client.Object) []ctrl.Request { requests := []ctrl.Request{} policies := &anv1alpha1.TargetGroupPolicyList{} - err := c.List(context.Background(), policies, &client.ListOptions{Namespace: obj.GetNamespace()}) + err := c.List(ctx, policies, &client.ListOptions{Namespace: obj.GetNamespace()}) if err != nil { log.Error(err) return requests From a67254a6bf822216d8d8e84e9d9a99fbe1516670 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 15 Nov 2023 23:17:29 -0800 Subject: [PATCH 3/4] update policy filter condition match --- pkg/k8s/policyhelper/policy.go | 2 +- pkg/k8s/policyhelper/policy_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/k8s/policyhelper/policy.go b/pkg/k8s/policyhelper/policy.go index 6eb30a99..6d92feb8 100644 --- a/pkg/k8s/policyhelper/policy.go +++ b/pkg/k8s/policyhelper/policy.go @@ -89,7 +89,7 @@ func getPolicyInfo(policyType core.Policy) (policyInfo, error) { } func policyConditionMatch(cnds []apimachineryv1.Condition, filter []gwv1alpha2.PolicyConditionReason) bool { - if cnds == nil || filter == nil || len(filter) == 0 { + if filter == nil || len(filter) == 0 { return true } cnd := meta.FindStatusCondition(cnds, string(gwv1alpha2.PolicyConditionAccepted)) diff --git a/pkg/k8s/policyhelper/policy_test.go b/pkg/k8s/policyhelper/policy_test.go index ec47c14d..07e83ea8 100644 --- a/pkg/k8s/policyhelper/policy_test.go +++ b/pkg/k8s/policyhelper/policy_test.go @@ -223,6 +223,7 @@ func TestPolicyConditionMatch(t *testing.T) { tests := []Test{ {"nil", nil, nil, true}, {"nil filter", []Condition{{}}, nil, true}, + {"nil conditions with filter", nil, []Reason{"x"}, false}, {"0 len filter", []Condition{{}}, []Reason{}, true}, {"condition not found", []Condition{{}}, []Reason{"Accepted"}, false}, { From 6236a9feef8ada038750a08d071fa4eb8431914e Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Thu, 16 Nov 2023 15:53:29 -0800 Subject: [PATCH 4/4] change conflict resolution for target-group-policy --- cmd/aws-application-networking-k8s/main.go | 2 +- controllers/targetgrouppolicy_controller.go | 33 +++----- .../vpcassociationpolicy_controller.go | 56 +++++++------ pkg/gateway/model_build_targetgroup.go | 33 ++------ pkg/k8s/policyhelper/policy.go | 64 +++++++++------ pkg/k8s/policyhelper/policy_test.go | 78 ++++++++----------- pkg/model/core/policy.go | 2 + 7 files changed, 116 insertions(+), 152 deletions(-) diff --git a/cmd/aws-application-networking-k8s/main.go b/cmd/aws-application-networking-k8s/main.go index 7f59fec3..a8ba2f64 100644 --- a/cmd/aws-application-networking-k8s/main.go +++ b/cmd/aws-application-networking-k8s/main.go @@ -198,7 +198,7 @@ func main() { setupLog.Fatalf("target group policy controller setup failed: %s", err) } - err = controllers.RegisterVpcAssociationPolicyController(ctrlLog.Named("vpc-association-policy"), mgr, cloud) + err = controllers.RegisterVpcAssociationPolicyController(ctrlLog.Named("vpc-association-policy"), cloud, finalizerManager, mgr) if err != nil { setupLog.Fatalf("vpc association policy controller setup failed: %s", err) } diff --git a/controllers/targetgrouppolicy_controller.go b/controllers/targetgrouppolicy_controller.go index 30e95186..053e5353 100644 --- a/controllers/targetgrouppolicy_controller.go +++ b/controllers/targetgrouppolicy_controller.go @@ -17,6 +17,7 @@ import ( anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" "github.com/aws/aws-application-networking-k8s/pkg/k8s" + "github.com/aws/aws-application-networking-k8s/pkg/k8s/policyhelper" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" ) @@ -72,12 +73,16 @@ func (c *TargetGroupPolicyController) validateSpec(ctx context.Context, tgPolicy if string(tr.Kind) != "Service" { return fmt.Errorf("%w: %s", KindError, tr.Kind) } - conflictingPolicies, err := c.findConflictingPolicies(ctx, tgPolicy) + tgref := types.NamespacedName{ + Namespace: tgPolicy.Namespace, + Name: string(tgPolicy.Spec.TargetRef.Name), + } + valid, err := policyhelper.GetValidPolicy(ctx, c.client, tgref, tgPolicy) if err != nil { - return err + return nil } - if len(conflictingPolicies) > 0 { - return fmt.Errorf("%w, policies: %v", TargetRefConflict, conflictingPolicies) + if valid != nil && valid.GetNamespacedName() != tgPolicy.GetNamespacedName() { + return fmt.Errorf("%w, with policy %s", TargetRefConflict, valid.GetName()) } refExists, err := c.targetRefExists(ctx, tgPolicy) if err != nil { @@ -104,26 +109,6 @@ func (c *TargetGroupPolicyController) targetRefExists(ctx context.Context, tgPol }, obj) } -func (c *TargetGroupPolicyController) findConflictingPolicies(ctx context.Context, tgPolicy *anv1alpha1.TargetGroupPolicy) ([]string, error) { - var out []string - policies := &anv1alpha1.TargetGroupPolicyList{} - err := c.client.List(ctx, policies, &client.ListOptions{ - Namespace: tgPolicy.Namespace, - }) - if err != nil { - return out, err - } - for _, p := range policies.Items { - if tgPolicy.Name == p.Name { - continue - } - if *tgPolicy.Spec.TargetRef == *p.Spec.TargetRef { - out = append(out, p.Name) - } - } - return out, nil -} - func (c *TargetGroupPolicyController) updatePolicyCondition(tgPolicy *anv1alpha1.TargetGroupPolicy, reason gwv1alpha2.PolicyConditionReason, msg string) { status := metav1.ConditionTrue if reason != gwv1alpha2.PolicyReasonAccepted { diff --git a/controllers/vpcassociationpolicy_controller.go b/controllers/vpcassociationpolicy_controller.go index f7a74c5c..c05ea17f 100644 --- a/controllers/vpcassociationpolicy_controller.go +++ b/controllers/vpcassociationpolicy_controller.go @@ -11,31 +11,37 @@ import ( "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" "github.com/aws/aws-application-networking-k8s/controllers/eventhandlers" + "github.com/aws/aws-application-networking-k8s/pkg/k8s" "github.com/aws/aws-application-networking-k8s/pkg/utils" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) +const ( + finalizer = "vpcassociationpolicies.application-networking.k8s.aws/resources" +) + type vpcAssociationPolicyReconciler struct { - log gwlog.Logger - client client.Client - cloud pkg_aws.Cloud - manager deploy.ServiceNetworkManager + log gwlog.Logger + client client.Client + cloud pkg_aws.Cloud + finalizerManager k8s.FinalizerManager + manager deploy.ServiceNetworkManager } -func RegisterVpcAssociationPolicyController(log gwlog.Logger, mgr ctrl.Manager, cloud pkg_aws.Cloud) error { +func RegisterVpcAssociationPolicyController(log gwlog.Logger, cloud pkg_aws.Cloud, finalizerManager k8s.FinalizerManager, mgr ctrl.Manager) error { controller := &vpcAssociationPolicyReconciler{ - log: log, - client: mgr.GetClient(), - cloud: cloud, - manager: deploy.NewDefaultServiceNetworkManager(log, cloud), + log: log, + client: mgr.GetClient(), + cloud: cloud, + finalizerManager: finalizerManager, + manager: deploy.NewDefaultServiceNetworkManager(log, cloud), } eh := eventhandlers.NewPolicyEventHandler(log, mgr.GetClient(), &anv1alpha1.VpcAssociationPolicy{}) @@ -77,11 +83,6 @@ func (c *vpcAssociationPolicyReconciler) Reconcile(ctx context.Context, req ctrl return ctrl.Result{RequeueAfter: time.Second * 30}, nil } - err = c.handleFinalizer(ctx, k8sPolicy) - if err != nil { - return ctrl.Result{}, err - } - c.log.Infow("reconciled vpc association policy", "req", req, "targetRef", k8sPolicy.Spec.TargetRef, @@ -90,21 +91,11 @@ func (c *vpcAssociationPolicyReconciler) Reconcile(ctx context.Context, req ctrl return ctrl.Result{}, nil } -func (c *vpcAssociationPolicyReconciler) handleFinalizer(ctx context.Context, k8sPolicy *anv1alpha1.VpcAssociationPolicy) error { - finalizer := "vpcassociationpolicies.application-networking.k8s.aws/resources" - if k8sPolicy.DeletionTimestamp.IsZero() { - if !controllerutil.ContainsFinalizer(k8sPolicy, finalizer) { - controllerutil.AddFinalizer(k8sPolicy, finalizer) - } - } else { - if controllerutil.ContainsFinalizer(k8sPolicy, finalizer) { - controllerutil.RemoveFinalizer(k8sPolicy, finalizer) - } - } - return c.client.Update(ctx, k8sPolicy) -} - func (c *vpcAssociationPolicyReconciler) upsert(ctx context.Context, k8sPolicy *anv1alpha1.VpcAssociationPolicy) error { + err := c.finalizerManager.AddFinalizers(ctx, k8sPolicy, finalizer) + if err != nil { + return err + } snName := string(k8sPolicy.Spec.TargetRef.Name) sgIds := utils.SliceMap(k8sPolicy.Spec.SecurityGroupIds, func(sg anv1alpha1.SecurityGroupId) *string { str := string(sg) @@ -132,6 +123,10 @@ func (c *vpcAssociationPolicyReconciler) delete(ctx context.Context, k8sPolicy * if err != nil { return c.handleDeleteError(err) } + err = c.finalizerManager.RemoveFinalizers(ctx, k8sPolicy, finalizer) + if err != nil { + return err + } return nil } @@ -169,6 +164,9 @@ func (c *vpcAssociationPolicyReconciler) handleDeleteError(err error) error { } func (c *vpcAssociationPolicyReconciler) updateLatticeAnnotation(ctx context.Context, k8sPolicy *anv1alpha1.VpcAssociationPolicy, resArn string) error { + if k8sPolicy.Annotations == nil { + k8sPolicy.Annotations = make(map[string]string) + } k8sPolicy.Annotations["application-networking.k8s.aws/resourceArn"] = resArn err := c.client.Update(ctx, k8sPolicy) return err diff --git a/pkg/gateway/model_build_targetgroup.go b/pkg/gateway/model_build_targetgroup.go index 3ea51a92..443e6c00 100644 --- a/pkg/gateway/model_build_targetgroup.go +++ b/pkg/gateway/model_build_targetgroup.go @@ -10,7 +10,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/gateway-api/apis/v1alpha2" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" "github.com/aws/aws-application-networking-k8s/pkg/config" @@ -146,19 +145,8 @@ func (t *svcExportTargetGroupModelBuildTask) buildTargetGroup(ctx context.Contex } } - policyConditionFilter := policyhelper.PolicyConditionFilter{ - Reasons: []v1alpha2.PolicyConditionReason{ - v1alpha2.PolicyReasonAccepted, - v1alpha2.PolicyReasonTargetNotFound, // policy was created before service - }, - } - - tgps, err := policyhelper.GetAttachedPoliciesConditionFilter( - ctx, t.client, - k8s.NamespacedName(t.serviceExport), - &anv1alpha1.TargetGroupPolicy{}, - policyConditionFilter, - ) + tgp, err := policyhelper.GetValidPolicy(ctx, t.client, + k8s.NamespacedName(t.serviceExport), &anv1alpha1.TargetGroupPolicy{}) if err != nil { return nil, err } @@ -166,8 +154,7 @@ func (t *svcExportTargetGroupModelBuildTask) buildTargetGroup(ctx context.Contex protocol := "HTTP" protocolVersion := vpclattice.TargetGroupProtocolVersionHttp1 var healthCheckConfig *vpclattice.HealthCheckConfig - if len(tgps) > 0 { - tgp := tgps[0] + if tgp != nil { if tgp.Spec.Protocol != nil { protocol = *tgp.Spec.Protocol } @@ -326,15 +313,7 @@ func (t *backendRefTargetGroupModelBuildTask) buildTargetGroupSpec(ctx context.C } } - policyConditionFilter := policyhelper.PolicyConditionFilter{ - Reasons: []v1alpha2.PolicyConditionReason{ - v1alpha2.PolicyReasonAccepted, - v1alpha2.PolicyReasonTargetNotFound, // policy was created before service - }, - } - - tgps, err := policyhelper.GetAttachedPoliciesConditionFilter(ctx, t.client, backendRefNsName, - &anv1alpha1.TargetGroupPolicy{}, policyConditionFilter) + tgp, err := policyhelper.GetValidPolicy(ctx, t.client, backendRefNsName, &anv1alpha1.TargetGroupPolicy{}) if err != nil { return model.TargetGroupSpec{}, err } @@ -342,9 +321,7 @@ func (t *backendRefTargetGroupModelBuildTask) buildTargetGroupSpec(ctx context.C protocol := "HTTP" protocolVersion := vpclattice.TargetGroupProtocolVersionHttp1 var healthCheckConfig *vpclattice.HealthCheckConfig - if len(tgps) > 0 { - // TODO: TGP conflicts should be handled correctly w/ status update, for now just picking up one - tgp := tgps[0] + if tgp != nil { if tgp.Spec.Protocol != nil { protocol = *tgp.Spec.Protocol } diff --git a/pkg/k8s/policyhelper/policy.go b/pkg/k8s/policyhelper/policy.go index 6d92feb8..d52bd12f 100644 --- a/pkg/k8s/policyhelper/policy.go +++ b/pkg/k8s/policyhelper/policy.go @@ -3,13 +3,13 @@ package policyhelper import ( "context" "fmt" + "strings" + "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" - apimachineryv1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" @@ -22,11 +22,20 @@ type policyInfo struct { kind gwv1beta1.Kind } -type PolicyConditionFilter struct { - Reasons []gwv1alpha2.PolicyConditionReason +func GetValidPolicy[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) (T, error) { + var empty T + policies, err := GetAttachedPolicies(ctx, k8sClient, searchTargetRef, policy) + conflictResolutionSort(policies) + if err != nil { + return empty, err + } + if len(policies) == 0 { + return empty, nil + } + return policies[0], nil } -func GetAttachedPoliciesConditionFilter[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T, filter PolicyConditionFilter) ([]T, error) { +func GetAttachedPolicies[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) ([]T, error) { var policies []T info, err := getPolicyInfo(policy) if err != nil { @@ -57,18 +66,13 @@ func GetAttachedPoliciesConditionFilter[T core.Policy](ctx context.Context, k8sC retrievedNamespace = string(*targetRef.Namespace) } namespaceMatch := retrievedNamespace == searchTargetRef.Namespace - filterMatch := policyConditionMatch(p.GetStatusConditions(), filter.Reasons) - if groupKindMatch && nameMatch && namespaceMatch && filterMatch { + if groupKindMatch && nameMatch && namespaceMatch { policies = append(policies, p.(T)) } } return policies, nil } -func GetAttachedPolicies[T core.Policy](ctx context.Context, k8sClient client.Client, searchTargetRef types.NamespacedName, policy T) ([]T, error) { - return GetAttachedPoliciesConditionFilter(ctx, k8sClient, searchTargetRef, policy, PolicyConditionFilter{}) -} - func getPolicyInfo(policyType core.Policy) (policyInfo, error) { switch policyType.(type) { case *anv1alpha1.VpcAssociationPolicy: @@ -88,18 +92,30 @@ func getPolicyInfo(policyType core.Policy) (policyInfo, error) { } } -func policyConditionMatch(cnds []apimachineryv1.Condition, filter []gwv1alpha2.PolicyConditionReason) bool { - if filter == nil || len(filter) == 0 { - return true - } - cnd := meta.FindStatusCondition(cnds, string(gwv1alpha2.PolicyConditionAccepted)) - if cnd == nil { - return false - } - for _, reason := range filter { - if cnd.Reason == string(reason) { - return true +// sort in-place for policy conflict resolution +// 1. older policy (CreationTimeStamp) has precedence +// 2. alphabetical order namespace, then name +func conflictResolutionSort[T core.Policy](policies []T) { + slices.SortFunc(policies, func(a, b T) int { + tsA := a.GetCreationTimestamp().Time + tsB := b.GetCreationTimestamp().Time + switch { + case tsA.Before(tsB): + return -1 + case tsA.After(tsB): + return 1 + default: + nsnA := a.GetNamespacedName() + nsnB := b.GetNamespacedName() + nsA := nsnA.Namespace + nsB := nsnB.Namespace + nsCmp := strings.Compare(nsA, nsB) + if nsCmp != 0 { + return nsCmp + } + nA := nsnA.Name + nB := nsnB.Name + return strings.Compare(nA, nB) } - } - return false + }) } diff --git a/pkg/k8s/policyhelper/policy_test.go b/pkg/k8s/policyhelper/policy_test.go index 07e83ea8..8b942361 100644 --- a/pkg/k8s/policyhelper/policy_test.go +++ b/pkg/k8s/policyhelper/policy_test.go @@ -3,11 +3,11 @@ package policyhelper import ( "context" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - apimachineryv1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" @@ -209,49 +209,35 @@ func Test_GetAttachedPolicies(t *testing.T) { } } -func TestPolicyConditionMatch(t *testing.T) { - type Condition = apimachineryv1.Condition - type Reason = gwv1alpha2.PolicyConditionReason - - type Test struct { - name string - cnds []Condition - filter []Reason - want bool - } - - tests := []Test{ - {"nil", nil, nil, true}, - {"nil filter", []Condition{{}}, nil, true}, - {"nil conditions with filter", nil, []Reason{"x"}, false}, - {"0 len filter", []Condition{{}}, []Reason{}, true}, - {"condition not found", []Condition{{}}, []Reason{"Accepted"}, false}, - { - "condition does not match", - []Condition{{Type: string(gwv1alpha2.PolicyConditionAccepted), Reason: "NotReconciled"}}, - []Reason{"Accepted"}, false, - }, - { - "condtion match", - []Condition{{Type: string(gwv1alpha2.PolicyConditionAccepted), Reason: "Accepted"}}, - []Reason{"Accepted"}, - true, - }, - { - "condtion match mutli-filter", - []Condition{{Type: string(gwv1alpha2.PolicyConditionAccepted), Reason: "Accepted"}}, - []Reason{"NotFound", "Accepted"}, - true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - match := policyConditionMatch(tt.cnds, tt.filter) - if match != tt.want { - t.Errorf("filter does not match, cnds=%v, filter=%v, want=%t", tt.cnds, tt.filter, tt.want) - } - }) - } - +func TestConflictResolutionSort(t *testing.T) { + + t.Run("CreationTimestamp", func(t *testing.T) { + tnow := metav1.Now() + policies := []*anv1alpha1.TargetGroupPolicy{ + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: tnow.Add(time.Hour)}}}, + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.Time{Time: tnow.Add(time.Second)}}}, + {ObjectMeta: metav1.ObjectMeta{CreationTimestamp: tnow}}, + } + conflictResolutionSort(policies) + if policies[0].CreationTimestamp != tnow { + t.Errorf("wrong order, expect ts=%s, got ts=%s", tnow, policies[0].CreationTimestamp) + } + }) + + t.Run("Namespace and Name", func(t *testing.T) { + policies := []*anv1alpha1.TargetGroupPolicy{ + {ObjectMeta: metav1.ObjectMeta{Namespace: "a", Name: "b"}}, + {ObjectMeta: metav1.ObjectMeta{Namespace: "a", Name: "c"}}, + {ObjectMeta: metav1.ObjectMeta{Namespace: "a", Name: "a"}}, + {ObjectMeta: metav1.ObjectMeta{Namespace: "b", Name: "z"}}, + {ObjectMeta: metav1.ObjectMeta{Namespace: "b", Name: "y"}}, + } + conflictResolutionSort(policies) + if policies[0].Name != "a" { + t.Errorf("expect 'a' being first, got %s", policies[0].Name) + } + if policies[4].Name != "z" { + t.Errorf("expect 'z' being last, got %s", policies[4].Name) + } + }) } diff --git a/pkg/model/core/policy.go b/pkg/model/core/policy.go index 3216911f..4f675e2f 100644 --- a/pkg/model/core/policy.go +++ b/pkg/model/core/policy.go @@ -3,10 +3,12 @@ package core import ( apimachineryv1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" ) type Policy interface { + client.Object GetNamespacedName() types.NamespacedName GetTargetRef() *gwv1alpha2.PolicyTargetReference GetStatusConditions() []apimachineryv1.Condition