Skip to content

Commit

Permalink
catalog: adding a controller to reconcile FailoverPolicy resources
Browse files Browse the repository at this point in the history
  • Loading branch information
rboyer committed Aug 8, 2023
1 parent b035889 commit eb39989
Show file tree
Hide file tree
Showing 8 changed files with 887 additions and 0 deletions.
19 changes: 19 additions & 0 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ package catalog
import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/endpoints"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/failover"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth"
"github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/failovermapper"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper"
"github.com/hashicorp/consul/internal/catalog/internal/mappers/selectiontracker"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)

var (
Expand Down Expand Up @@ -73,6 +76,9 @@ var (
EndpointsStatusConditionEndpointsManaged = endpoints.StatusConditionEndpointsManaged
EndpointsStatusConditionManaged = endpoints.ConditionManaged
EndpointsStatusConditionUnmanaged = endpoints.ConditionUnmanaged

FailoverStatusKey = failover.StatusKey
FailoverStatusConditionOK = failover.ConditionOK
)

// RegisterTypes adds all resource types within the "catalog" API group
Expand All @@ -87,6 +93,7 @@ func DefaultControllerDependencies() ControllerDependencies {
return ControllerDependencies{
WorkloadHealthNodeMapper: nodemapper.New(),
EndpointsWorkloadMapper: selectiontracker.New(),
FailoverMapper: failovermapper.New(),
}
}

Expand All @@ -101,3 +108,15 @@ func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) {
func SimplifyFailoverPolicy(svc *pbcatalog.Service, failover *pbcatalog.FailoverPolicy) *pbcatalog.FailoverPolicy {
	// Exported pass-through: the actual work lives in the catalog-internal
	// types package, which callers outside of catalog cannot import directly.
	simplified := types.SimplifyFailoverPolicy(svc, failover)
	return simplified
}

// FailoverPolicyMapper maintains the bidirectional tracking relationship of a
// FailoverPolicy to the Services related to it.
//
// NewFailoverPolicyMapper returns the implementation provided by the
// failovermapper package.
type FailoverPolicyMapper interface {
// TrackFailover records the relationship between the provided decoded
// FailoverPolicy and the Services related to it.
TrackFailover(failover *resource.DecodedResource[pbcatalog.FailoverPolicy, *pbcatalog.FailoverPolicy])
// UntrackFailover drops whatever was recorded for the FailoverPolicy with
// the given ID.
UntrackFailover(failoverID *pbresource.ID)
// FailoverIDsByService is the reverse lookup: presumably it returns the IDs
// of the tracked FailoverPolicies related to svcID -- confirm against the
// failovermapper implementation.
FailoverIDsByService(svcID *pbresource.ID) []*pbresource.ID
}

// NewFailoverPolicyMapper returns a new FailoverPolicyMapper backed by the
// implementation from the failovermapper package.
func NewFailoverPolicyMapper() FailoverPolicyMapper {
	var mapper FailoverPolicyMapper = failovermapper.New()
	return mapper
}
298 changes: 298 additions & 0 deletions internal/catalog/internal/controllers/failover/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package failover

import (
	"context"
	"sort"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/consul/internal/catalog/internal/types"
	"github.com/hashicorp/consul/internal/controller"
	"github.com/hashicorp/consul/internal/resource"
	pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// FailoverMapper tracks the relationship between a FailoverPolicy and a Service
// it references, whether due to name-alignment or from a reference in a
// FailoverDestination leg.
type FailoverMapper interface {
// TrackFailover extracts all Service references from the provided
// FailoverPolicy and indexes them so that MapService can turn Service
// events into FailoverPolicy events properly.
TrackFailover(failover *resource.DecodedResource[pbcatalog.FailoverPolicy, *pbcatalog.FailoverPolicy])

// UntrackFailover forgets the links inserted by TrackFailover for the
// provided FailoverPolicyID.
UntrackFailover(failoverID *pbresource.ID)

// MapService will take a Service resource and return controller requests
// for all FailoverPolicies associated with the Service.
//
// FailoverPolicyController registers this method as the watch mapper for
// the Service type.
MapService(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error)
}

// FailoverPolicyController builds the controller that manages FailoverPolicy
// resources. It additionally watches Service resources, using
// mapper.MapService to translate Service events into FailoverPolicy requests,
// and installs a reconciler that shares the same mapper.
//
// It panics when mapper is nil.
func FailoverPolicyController(mapper FailoverMapper) controller.Controller {
	if mapper == nil {
		panic("No FailoverMapper was provided to the FailoverPolicyController constructor")
	}

	base := controller.ForType(types.FailoverPolicyType)
	watched := base.WithWatch(types.ServiceType, mapper.MapService)
	return watched.WithReconciler(newFailoverPolicyReconciler(mapper))
}

// failoverPolicyReconciler is the reconciler installed by
// FailoverPolicyController. Its Reconcile method keeps the mapper informed of
// which FailoverPolicies exist and writes their computed status.
type failoverPolicyReconciler struct {
// mapper is told to track a policy when Reconcile finds it and to untrack
// it when the policy cannot be found.
mapper FailoverMapper
}

// newFailoverPolicyReconciler returns a reconciler that uses mapper to track
// and untrack the FailoverPolicies it processes.
func newFailoverPolicyReconciler(mapper FailoverMapper) *failoverPolicyReconciler {
	rec := new(failoverPolicyReconciler)
	rec.mapper = mapper
	return rec
}

// Reconcile processes a single FailoverPolicy request:
//
//  1. Reads the policy; if it does not exist the mapper is told to untrack it
//     and the request is done.
//  2. Tracks the policy in the mapper.
//  3. Reads the name-aligned Service and every Service referenced by the
//     policy's destinations.
//  4. Computes the policy status and writes it only when it differs from the
//     status currently stored under StatusKey.
//
// Any read or write error is logged and returned to the caller.
func (r *failoverPolicyReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
	// rt is received by value, so decorating its logger here only lasts for
	// this invocation and does not leak into later requests.
	rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", StatusKey)
	rt.Logger.Trace("reconciling failover policies")

	policyID := req.ID

	policy, err := getFailoverPolicy(ctx, rt, policyID)
	switch {
	case err != nil:
		rt.Logger.Error("error retrieving failover policy", "error", err)
		return err
	case policy == nil:
		// Either the failover policy was deleted, or it never existed and a
		// Service update mapped to it; in both cases there is nothing to do
		// beyond forgetting any stale links.
		r.mapper.UntrackFailover(policyID)
		return nil
	}

	r.mapper.TrackFailover(policy)

	// A FailoverPolicy shares its name and tenancy with the Service it
	// controls.
	svcID := &pbresource.ID{
		Type:    types.ServiceType,
		Tenancy: policyID.Tenancy,
		Name:    policyID.Name,
	}

	svc, err := getService(ctx, rt, svcID)
	if err != nil {
		rt.Logger.Error("error retrieving corresponding service", "error", err)
		return err
	}

	// fetched collects every Service that was found, keyed by reference.
	fetched := make(map[resource.ReferenceKey]*resource.DecodedResource[pbcatalog.Service, *pbcatalog.Service])
	if svc != nil {
		fetched[resource.NewReferenceKey(svcID)] = svc

		// Denormalize the ports using the name-aligned service. After this
		// we have no empty ports.
		policy.Data = types.SimplifyFailoverPolicy(svc.Data, policy.Data)
	}

	// Fetch the services referenced by the failover destinations.
	for _, dest := range policy.Data.GetUnderlyingDestinations() {
		ref := dest.Ref
		if ref == nil || !isServiceType(ref.Type) || ref.Section != "" {
			continue // invalid, not possible due to validation hook
		}

		refKey := resource.NewReferenceKey(ref)
		if _, seen := fetched[refKey]; seen {
			continue
		}

		destSvc, err := getService(ctx, rt, resource.IDFromReference(ref))
		if err != nil {
			rt.Logger.Error("error retrieving destination service", "service", refKey, "error", err)
			return err
		}
		if destSvc != nil {
			fetched[refKey] = destSvc
		}
	}

	desired := computeNewStatus(policy, svc, fetched)

	if resource.EqualStatus(policy.Resource.Status[StatusKey], desired, false) {
		rt.Logger.Trace("resource's failover policy status is unchanged",
			"conditions", desired.Conditions)
		return nil
	}

	writeReq := &pbresource.WriteStatusRequest{
		Id:     policy.Resource.Id,
		Key:    StatusKey,
		Status: desired,
	}
	if _, err := rt.Client.WriteStatus(ctx, writeReq); err != nil {
		rt.Logger.Error("error encountered when attempting to update the resource's failover policy status", "error", err)
		return err
	}

	rt.Logger.Trace("resource's failover policy status was updated",
		"conditions", desired.Conditions)
	return nil
}

// getFailoverPolicy reads the resource with the given id and decodes it as a
// FailoverPolicy. It returns (nil, nil) when the resource does not exist.
func getFailoverPolicy(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*resource.DecodedResource[pbcatalog.FailoverPolicy, *pbcatalog.FailoverPolicy], error) {
	raw, err := getResource(ctx, rt, id)
	if err != nil || raw == nil {
		// Either the read failed (err is set) or nothing was found (err is
		// nil); in both cases there is nothing to decode.
		return nil, err
	}
	return resource.Decode[pbcatalog.FailoverPolicy, *pbcatalog.FailoverPolicy](raw)
}

// getService reads the resource with the given id and decodes it as a
// Service. It returns (nil, nil) when the resource does not exist.
func getService(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*resource.DecodedResource[pbcatalog.Service, *pbcatalog.Service], error) {
	raw, err := getResource(ctx, rt, id)
	if err != nil || raw == nil {
		// Either the read failed (err is set) or nothing was found (err is
		// nil); in both cases there is nothing to decode.
		return nil, err
	}
	return resource.Decode[pbcatalog.Service, *pbcatalog.Service](raw)
}

// getResource reads a single resource by id through the runtime's client.
// A gRPC NotFound error is translated into a (nil, nil) result; any other
// error is returned unchanged.
func getResource(ctx context.Context, rt controller.Runtime, id *pbresource.ID) (*pbresource.Resource, error) {
	rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: id})
	if status.Code(err) == codes.NotFound {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return rsp.Resource, nil
}

// computeNewStatus derives the status to record for failoverPolicy.
//
// Parameters:
//   - failoverPolicy: the decoded policy being reconciled.
//   - service: the name-aligned Service, or nil if it does not exist.
//   - otherServices: every Service that was successfully fetched, keyed by
//     reference; a destination whose key is absent is reported as missing.
//
// The result always carries the policy's Generation as ObservedGeneration and
// holds one of:
//   - ConditionMissingService alone, when service is nil;
//   - one condition per detected problem (unknown port on the name-aligned
//     service, missing destination service, unknown destination port);
//   - ConditionOK alone, when no problem was detected.
//
// Conditions are emitted in a deterministic order: destinations of the
// top-level Config first, then each PortConfigs entry in ascending port-name
// order. Go randomizes map iteration order, so ranging over PortConfigs
// directly could yield a differently ordered (but otherwise identical) status
// on each reconcile, which would look like a change to any order-sensitive
// comparison of the stored and computed status.
func computeNewStatus(
	failoverPolicy *resource.DecodedResource[pbcatalog.FailoverPolicy, *pbcatalog.FailoverPolicy],
	service *resource.DecodedResource[pbcatalog.Service, *pbcatalog.Service],
	otherServices map[resource.ReferenceKey]*resource.DecodedResource[pbcatalog.Service, *pbcatalog.Service],
) *pbresource.Status {
	if service == nil {
		return &pbresource.Status{
			ObservedGeneration: failoverPolicy.Resource.Generation,
			Conditions: []*pbresource.Condition{
				ConditionMissingService,
			},
		}
	}

	// Target ports of the name-aligned service that a port config may refer
	// to; mesh ports are excluded.
	allowedPortProtocols := make(map[string]pbcatalog.Protocol)
	for _, port := range service.Data.Ports {
		if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
			continue // skip
		}
		allowedPortProtocols[port.TargetPort] = port.Protocol
	}

	// serviceHasPort returns nil when the destination's service was fetched
	// and exposes the requested non-mesh target port, or the condition
	// describing what is wrong otherwise.
	serviceHasPort := func(dest *pbcatalog.FailoverDestination) *pbresource.Condition {
		destService, ok := otherServices[resource.NewReferenceKey(dest.Ref)]
		if !ok {
			return ConditionMissingDestinationService(dest.Ref)
		}
		for _, port := range destService.Data.Ports {
			if port.Protocol == pbcatalog.Protocol_PROTOCOL_MESH {
				continue // skip
			}
			if port.TargetPort == dest.Port {
				return nil
			}
		}
		return ConditionUnknownDestinationPort(dest.Ref, dest.Port)
	}

	var conditions []*pbresource.Condition

	// checkDestinations appends a condition for every problematic
	// destination, in the order the destinations are listed.
	checkDestinations := func(dests []*pbcatalog.FailoverDestination) {
		for _, dest := range dests {
			// We know from validation that a Ref must be set, and the type it
			// points to is a Service.
			//
			// Rather than do additional validation, just do a quick
			// belt-and-suspenders check-and-skip if something looks weird.
			if dest.Ref == nil || !isServiceType(dest.Ref.Type) {
				continue
			}

			if cond := serviceHasPort(dest); cond != nil {
				conditions = append(conditions, cond)
			}
		}
	}

	if failoverPolicy.Data.Config != nil {
		checkDestinations(failoverPolicy.Data.Config.Destinations)
		// TODO: validate that referenced sameness groups exist
	}

	// Visit the port configs in sorted order so the resulting conditions are
	// stable from one reconcile to the next.
	ports := make([]string, 0, len(failoverPolicy.Data.PortConfigs))
	for port := range failoverPolicy.Data.PortConfigs {
		ports = append(ports, port)
	}
	sort.Strings(ports)

	for _, port := range ports {
		pc := failoverPolicy.Data.PortConfigs[port]

		if _, ok := allowedPortProtocols[port]; !ok {
			conditions = append(conditions, ConditionUnknownPort(port))
		}

		checkDestinations(pc.Destinations)

		// TODO: validate that referenced sameness groups exist
	}

	if len(conditions) == 0 {
		conditions = []*pbresource.Condition{ConditionOK}
	}

	return &pbresource.Status{
		ObservedGeneration: failoverPolicy.Resource.Generation,
		Conditions:         conditions,
	}
}

// isServiceType reports whether typ is the catalog Service resource type.
func isServiceType(typ *pbresource.Type) bool {
	return resource.EqualType(typ, types.ServiceType)
}
Loading

0 comments on commit eb39989

Please sign in to comment.