Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Access Log Policy Controller #424

Merged
merged 10 commits into from
Oct 5, 2023
16 changes: 12 additions & 4 deletions cmd/aws-application-networking-k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
"github.com/aws/aws-application-networking-k8s/controllers"

//+kubebuilder:scaffold:imports
"github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
Expand Down Expand Up @@ -74,13 +74,16 @@ func addOptionalCRDs(scheme *runtime.Scheme) {
metav1.AddToGroupVersion(scheme, dnsEndpoint)

awsGatewayControllerCRDGroupVersion := schema.GroupVersion{
Group: v1alpha1.GroupName,
Group: anv1alpha1.GroupName,
Version: "v1alpha1",
}
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &v1alpha1.TargetGroupPolicy{}, &v1alpha1.TargetGroupPolicyList{})
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.TargetGroupPolicy{}, &anv1alpha1.TargetGroupPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &v1alpha1.VpcAssociationPolicy{}, &v1alpha1.VpcAssociationPolicyList{})
scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.VpcAssociationPolicy{}, &anv1alpha1.VpcAssociationPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)

scheme.AddKnownTypes(awsGatewayControllerCRDGroupVersion, &anv1alpha1.AccessLogPolicy{}, &anv1alpha1.AccessLogPolicyList{})
metav1.AddToGroupVersion(scheme, awsGatewayControllerCRDGroupVersion)
}

Expand Down Expand Up @@ -178,6 +181,11 @@ func main() {
setupLog.Fatalf("serviceexport controller setup failed: %s", err)
}

err = controllers.RegisterAccessLogPolicyController(ctrlLog.Named("access-log-policy"), cloud, finalizerManager, mgr)
if err != nil {
setupLog.Fatalf("accesslogpolicy controller setup failed: %s", err)
}

go latticestore.GetDefaultLatticeDataStore().ServeIntrospection()

//+kubebuilder:scaffold:builder
Expand Down
206 changes: 206 additions & 0 deletions controllers/accesslogpolicy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
Copyright 2021.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/deploy"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
lattice_runtime "github.com/aws/aws-application-networking-k8s/pkg/runtime"
"github.com/aws/aws-application-networking-k8s/pkg/utils"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)

const (
accessLogPolicyFinalizer = "accesslogpolicy.k8s.aws/resources"
)

type accessLogPolicyReconciler struct {
log gwlog.Logger
client client.Client
scheme *runtime.Scheme
finalizerManager k8s.FinalizerManager
eventRecorder record.EventRecorder
stackDeployer deploy.StackDeployer
cloud aws.Cloud
stackMarshaller deploy.StackMarshaller
}

func RegisterAccessLogPolicyController(
log gwlog.Logger,
cloud aws.Cloud,
finalizerManager k8s.FinalizerManager,
mgr ctrl.Manager,
) error {
mgrClient := mgr.GetClient()
scheme := mgr.GetScheme()
evtRec := mgr.GetEventRecorderFor("accesslogpolicy")

stackDeployer := deploy.NewServiceNetworkStackDeployer(log, cloud, mgrClient)
stackMarshaller := deploy.NewDefaultStackMarshaller()

r := &accessLogPolicyReconciler{
log: log,
client: mgrClient,
scheme: scheme,
finalizerManager: finalizerManager,
eventRecorder: evtRec,
stackDeployer: stackDeployer,
cloud: cloud,
stackMarshaller: stackMarshaller,
}

builder := ctrl.NewControllerManagedBy(mgr).
For(&anv1alpha1.AccessLogPolicy{})

return builder.Complete(r)
}

func (r *accessLogPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.log.Infow("reconcile", "name", req.Name)
recErr := r.reconcile(ctx, req)
res, retryErr := lattice_runtime.HandleReconcileError(recErr)
if res.RequeueAfter != 0 {
r.log.Infow("requeue request", "name", req.Name, "requeueAfter", res.RequeueAfter)
} else if res.Requeue {
r.log.Infow("requeue request", "name", req.Name)
} else if retryErr == nil {
r.log.Infow("reconciled", "name", req.Name)
}
return res, retryErr
}

func (r *accessLogPolicyReconciler) reconcile(ctx context.Context, req ctrl.Request) error {
alp := &anv1alpha1.AccessLogPolicy{}
if err := r.client.Get(ctx, req.NamespacedName, alp); err != nil {
return client.IgnoreNotFound(err)
}

if !alp.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, alp)
} else {
return r.reconcileUpsert(ctx, alp)
}
}

func (r *accessLogPolicyReconciler) reconcileDelete(ctx context.Context, alp *anv1alpha1.AccessLogPolicy) error {
scottlaiaws marked this conversation as resolved.
Show resolved Hide resolved
err := r.finalizerManager.RemoveFinalizers(ctx, alp, accessLogPolicyFinalizer)
if err != nil {
return err
}

return nil
}

func (r *accessLogPolicyReconciler) reconcileUpsert(ctx context.Context, alp *anv1alpha1.AccessLogPolicy) error {
if err := r.finalizerManager.AddFinalizers(ctx, alp, accessLogPolicyFinalizer); err != nil {
r.eventRecorder.Event(alp, corev1.EventTypeWarning,
k8s.AccessLogPolicyEventReasonFailedAddFinalizer, fmt.Sprintf("Failed to add finalizer due to %s", err))
return err
}

if !r.targetRefExists(ctx, alp) {
r.log.Infof("Could not find Acces Log Policy targetRef %s %s",
alp.Spec.TargetRef.Kind, alp.Spec.TargetRef.Name)
err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonTargetNotFound)
if err != nil {
return fmt.Errorf("failed to update Access Log Policy status, %w", err)
}
return nil
}

// TODO: Create VPC Lattice Access Log Subscription

err := r.updateAccessLogPolicyStatus(ctx, alp, gwv1alpha2.PolicyReasonAccepted)
if err != nil {
return fmt.Errorf("failed to update Access Log Policy status, %w", err)
}

return nil
}

func (r *accessLogPolicyReconciler) targetRefExists(ctx context.Context, alp *anv1alpha1.AccessLogPolicy) bool {
targetRefNamespace := alp.Namespace
if alp.Spec.TargetRef.Namespace != nil {
targetRefNamespace = string(*alp.Spec.TargetRef.Namespace)
}

targetRefNamespacedName := types.NamespacedName{
Name: string(alp.Spec.TargetRef.Name),
Namespace: targetRefNamespace,
}

var err error

switch alp.Spec.TargetRef.Kind {
case "Gateway":
gateway := &gwv1beta1.Gateway{}
err = r.client.Get(ctx, targetRefNamespacedName, gateway)
case "HTTPRoute":
httpRoute := &gwv1beta1.HTTPRoute{}
err = r.client.Get(ctx, targetRefNamespacedName, httpRoute)
case "GRPCRoute":
grpcRoute := &gwv1alpha2.GRPCRoute{}
err = r.client.Get(ctx, targetRefNamespacedName, grpcRoute)
default:
r.log.Infof("Access Log Policy targetRef is for an unsupported Kind: %s", alp.Spec.TargetRef.Kind)
return false
}

return err == nil
}

func (r *accessLogPolicyReconciler) updateAccessLogPolicyStatus(
ctx context.Context,
alp *anv1alpha1.AccessLogPolicy,
reason gwv1alpha2.PolicyConditionReason,
) error {
status := metav1.ConditionTrue
if reason != gwv1alpha2.PolicyReasonAccepted {
status = metav1.ConditionFalse
}

alp.Status.Conditions = utils.GetNewConditions(alp.Status.Conditions, metav1.Condition{
Type: string(gwv1alpha2.PolicyConditionAccepted),
ObservedGeneration: alp.Generation,
Message: config.LatticeGatewayControllerName,
Status: status,
Reason: string(reason),
})

if err := r.client.Status().Update(ctx, alp); err != nil {
return fmt.Errorf("failed to set Access Log Policy Accepted status to %s and reason to %s, %w",
status, reason, err)
}

return nil
}
10 changes: 10 additions & 0 deletions examples/access-log-policy-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: application-networking.k8s.aws/v1alpha1
kind: AccessLogPolicy
metadata:
name: test-policy
spec:
targetRef:
group: gateway.networking.k8s.io
kind: Gateway
name: my-hotel
destinationArn: "arn:aws:s3:::my-bucket"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a general question on the feature. How are we planning on reporting back the actual log paths? From what I recall, logs are based on unique resource ids, not names. How will folks using the controller know which logs apply to which k8s resources?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure I follow, so please correct me if I'm going down the wrong path here. When a user creates a Gateway or Route, that resource is associated to 1 VPC Lattice Service Network or Service. When they create an AccessLogPolicy with a targetRef of a Gateway or Route, it's the equivalent of creating a VPC Lattice Access Log Subscription for the corresponding ServiceNetwork or Service.

So if I create Gateway my-hotel, which in VPC Lattice is Service Network sn-12345678901234567, and then create an AccessLogPolicy which refers to Gateway my-hotel, then it will create an AccessLogSubscription with resourceIdentifier sn-12345678901234567, and any access logs flowing through that Service Network will wind up in the log destination I specified in the AccessLogPolicy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. So the path in S3 would look like [1]:

[bucket]/[prefix]/AWSLogs/[accountId]/VpcLattice/AccessLogs/[region]/[YYYY/MM/DD]/[resource-id]/[accountId]_VpcLatticeAccessLogs_[region]_[resource-id]_YYYYMMDDTHHmmZ_[hash].json.gz

Let's assume I have a number of gateways I'm managing - how will I know the Lattice resource ID for each? For example, do we patch the gateway with the Lattice service network ID? I see it as a message here: https://github.com/aws/aws-application-networking-k8s/blob/main/controllers/gateway_controller.go#L279. Ideally, I would be able to find out the ID by doing something like kubectl get gateway.

Similarly, it may be difficult to map from a k8s route to a log line, or from a log line back to the k8s objects, since these log entries also use unique IDs rather than k8s-based names. Maybe for now just enabling the logs is enough, then customers can use the Lattice API out of band to determine the ID. Longer term we should look a improving this experience.

Copy link
Contributor

@zijun726911 zijun726911 Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Patch the status(or annotation) of gateway and k8sService resource with lattice resource id and Patch AccessLogPolicy with als id seems a good approach to make mapping cleaner

Copy link
Member Author

@xWink xWink Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this would absolutely improve clarity for users. I'll look into making this an annotation for ALP (don't think status is the right place for this kind of data), and will add it as a followup once the Create workflow is done. We can also create an issue to add this behaviour to other resources, like Gateway and Route.

3 changes: 3 additions & 0 deletions pkg/k8s/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ const (
ServiceImportEventReasonFailedAddFinalizer = "FailedAddFinalizer"
ServiceImportEventReasonFailedBuildModel = "FailedBuildModel"
ServiceImportEventReasonFailedDeployModel = "FailedDeployModel"

// AccessLogPolicy events
AccessLogPolicyEventReasonFailedAddFinalizer = "FailedAddFinalizer"
)