Skip to content

Commit

Permalink
Handle backend service and firewall for NEG
Browse files Browse the repository at this point in the history
  • Loading branch information
freehan committed Oct 26, 2017
1 parent eea130b commit e47e18a
Show file tree
Hide file tree
Showing 11 changed files with 477 additions and 59 deletions.
104 changes: 95 additions & 9 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/glog"

computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -76,6 +77,7 @@ const maxRPS = 1
// Backends implements BackendPool.
type Backends struct {
cloud BackendServices
negGetter NEGGetter
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
Expand All @@ -93,10 +95,14 @@ func portKey(port int64) string {

// ServicePort for tupling port and protocol
type ServicePort struct {
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
// Port is the service node port
// TODO: rename it to NodePort
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
SvcTargetPort string
NEGEnabled bool
}

// Description returns a string describing the ServicePort.
Expand All @@ -116,6 +122,7 @@ func (sp ServicePort) Description() string {
// - resyncWithCloud: if true, periodically syncs with cloud resources.
func NewBackendPool(
cloud BackendServices,
negGetter NEGGetter,
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool,
namer *utils.Namer,
Expand All @@ -128,6 +135,7 @@ func NewBackendPool(
}
backendPool := &Backends{
cloud: cloud,
negGetter: negGetter,
nodePool: nodePool,
healthChecker: healthChecker,
namer: namer,
Expand Down Expand Up @@ -171,8 +179,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
}

func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) {
hc := b.healthChecker.New(sp.Port, sp.Protocol, false)

hc := b.healthChecker.New(sp.Port, sp.Protocol, sp.NEGEnabled)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.Port)
if err != nil && !utils.IsNotFoundError(err) {
return "", err
Expand Down Expand Up @@ -271,7 +278,15 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
if len(be.HealthChecks) == 1 {
existingHCLink = be.HealthChecks[0]
}
if be.Protocol != string(p.Protocol) || existingHCLink != hcLink || be.Description != p.Description() {

// Compare health check name instead of health check link.
// This is because health check link contains api version.
// For NEG, the api version for health check will be alpha.
// Hence, it will cause the health check links to be always different
// TODO (mixia): compare health check link directly once NEG is GA
existingHCName := retrieveObjectName(existingHCLink)
expectedHCName := retrieveObjectName(hcLink)
if be.Protocol != string(p.Protocol) || existingHCName != expectedHCName || be.Description != p.Description() {
glog.V(2).Infof("Updating backend protocol %v (%v) for change in protocol (%v) or health check", beName, be.Protocol, string(p.Protocol))
be.Protocol = string(p.Protocol)
be.HealthChecks = []string{hcLink}
Expand All @@ -293,6 +308,10 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
return nil
}

// If NEG is enabled, do not link backend service to instance groups.
if p.NEGEnabled {
return nil
}
// Verify that backend service contains links to all backends/instance-groups
return b.edgeHop(be, igs)
}
Expand Down Expand Up @@ -352,6 +371,19 @@ func getBackendsForIGs(igs []*compute.InstanceGroup, bm BalancingMode) []*comput
return backends
}

func getBackendsForNEGs(negs []*computealpha.NetworkEndpointGroup) []*computealpha.Backend {
var backends []*computealpha.Backend
for _, neg := range negs {
b := &computealpha.Backend{
Group: neg.SelfLink,
BalancingMode: string(Rate),
MaxRatePerEndpoint: maxRPS,
}
backends = append(backends, b)
}
return backends
}

// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links.
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
Expand All @@ -369,7 +401,15 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
glog.V(2).Infof("Updating backend service %v with %d backends: expected igs %+v, current igs %+v",
be.Name, igLinks.Len(), igLinks.List(), beIGs.List())

originalBackends := be.Backends
originalIGBackends := []*compute.Backend{}
for _, backend := range be.Backends {
// Backend service is not able to point to NEG and IG at the same time.
// Filter IG backends here.
if strings.Contains(backend.Group, "instanceGroups") {
originalIGBackends = append(originalIGBackends, backend)
}
}

var addIGs []*compute.InstanceGroup
for _, ig := range igs {
if !beIGs.Has(ig.SelfLink) {
Expand All @@ -387,7 +427,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
for _, bm := range []BalancingMode{Rate, Utilization} {
// Generate backends with given instance groups with a specific mode
newBackends := getBackendsForIGs(addIGs, bm)
be.Backends = append(originalBackends, newBackends...)
be.Backends = append(originalIGBackends, newBackends...)

if err := b.cloud.UpdateGlobalBackendService(be); err != nil {
if utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
Expand Down Expand Up @@ -457,6 +497,46 @@ func (b *Backends) Status(name string) string {
return hs.HealthStatus[0].HealthState
}

func (b *Backends) Link(port ServicePort, zones []string) error {
if !port.NEGEnabled {
return nil
}
negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort)
var negs []*computealpha.NetworkEndpointGroup
var err error
for _, zone := range zones {
neg, err := b.negGetter.GetNetworkEndpointGroup(negName, zone)
if err != nil {
return err
}
negs = append(negs, neg)
}

backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port))
if err != nil {
return err
}

targetBackends := getBackendsForNEGs(negs)
oldBackends := sets.NewString()
newBackends := sets.NewString()

// WARNING: the backend link includes api version.
// API versions has to match, otherwise backend link will be always different.
for _, be := range backendService.Backends {
oldBackends.Insert(be.Group)
}
for _, be := range targetBackends {
newBackends.Insert(be.Group)
}

if !oldBackends.Equal(newBackends) {
backendService.Backends = targetBackends
return b.cloud.UpdateAlphaGlobalBackendService(backendService)
}
return nil
}

func applyLegacyHCToHC(existing *compute.HttpHealthCheck, hc *healthchecks.HealthCheck) {
hc.Description = existing.Description
hc.CheckIntervalSec = existing.CheckIntervalSec
Expand Down Expand Up @@ -494,3 +574,9 @@ func applyProbeSettingsToHC(p *v1.Probe, hc *healthchecks.HealthCheck) {
hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds())
}
}

//retrieveObjectName takes a GCE object link and return the last part of the url as object name
func retrieveObjectName(url string) string {
splited := strings.Split(url, "/")
return splited[len(splited)-1]
}
Loading

0 comments on commit e47e18a

Please sign in to comment.