Skip to content

Commit

Permalink
Merge pull request #48 from freehan/neg
Browse files Browse the repository at this point in the history
K8s-NEG Integration
  • Loading branch information
bowei authored Oct 27, 2017
2 parents 6cad689 + 75a3563 commit b0603c6
Show file tree
Hide file tree
Showing 26 changed files with 3,363 additions and 109 deletions.
16 changes: 12 additions & 4 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ import (
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
"k8s.io/ingress-gce/pkg/loadbalancers"
neg "k8s.io/ingress-gce/pkg/networkendpointgroup"
"k8s.io/ingress-gce/pkg/storage"
"k8s.io/ingress-gce/pkg/utils"

"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)
Expand Down Expand Up @@ -288,11 +290,10 @@ func main() {
// Create fake cluster manager
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}

ctx := context.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, false)

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
ctx := context.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, enableNEG)
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, enableNEG)
if err != nil {
glog.Fatalf("%v", err)
}
Expand All @@ -301,6 +302,13 @@ func main() {
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
}
clusterManager.Init(&controller.GCETranslator{LoadBalancerController: lbc})

// Start NEG controller
if enableNEG {
negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, *resyncPeriod)
go negController.Run(ctx.StopCh)
}

go registerHandlers(lbc)
go handleSigterm(lbc, *deleteAllOnQuit)

Expand Down
113 changes: 102 additions & 11 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/glog"

computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -76,6 +77,7 @@ const maxRPS = 1
// Backends implements BackendPool.
type Backends struct {
cloud BackendServices
negGetter NEGGetter
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
Expand All @@ -93,10 +95,14 @@ func portKey(port int64) string {

// ServicePort for tupling port and protocol
type ServicePort struct {
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
// Port is the service node port
// TODO: rename it to NodePort
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
SvcTargetPort string
NEGEnabled bool
}

// Description returns a string describing the ServicePort.
Expand All @@ -116,6 +122,7 @@ func (sp ServicePort) Description() string {
// - resyncWithCloud: if true, periodically syncs with cloud resources.
func NewBackendPool(
cloud BackendServices,
negGetter NEGGetter,
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool,
namer *utils.Namer,
Expand All @@ -128,6 +135,7 @@ func NewBackendPool(
}
backendPool := &Backends{
cloud: cloud,
negGetter: negGetter,
nodePool: nodePool,
healthChecker: healthChecker,
namer: namer,
Expand Down Expand Up @@ -171,8 +179,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
}

func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) {
hc := b.healthChecker.New(sp.Port, sp.Protocol)

hc := b.healthChecker.New(sp.Port, sp.Protocol, sp.NEGEnabled)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.Port)
if err != nil && !utils.IsNotFoundError(err) {
return "", err
Expand Down Expand Up @@ -271,7 +278,15 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
if len(be.HealthChecks) == 1 {
existingHCLink = be.HealthChecks[0]
}
if be.Protocol != string(p.Protocol) || existingHCLink != hcLink || be.Description != p.Description() {

// Compare health check name instead of health check link.
// This is because health check link contains api version.
// For NEG, the api version for health check will be alpha.
// Hence, it will cause the health check links to be always different
// TODO (mixia): compare health check link directly once NEG is GA
existingHCName := retrieveObjectName(existingHCLink)
expectedHCName := retrieveObjectName(hcLink)
if be.Protocol != string(p.Protocol) || existingHCName != expectedHCName || be.Description != p.Description() {
glog.V(2).Infof("Updating backend protocol %v (%v) for change in protocol (%v) or health check", beName, be.Protocol, string(p.Protocol))
be.Protocol = string(p.Protocol)
be.HealthChecks = []string{hcLink}
Expand All @@ -293,6 +308,10 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
return nil
}

// If NEG is enabled, do not link backend service to instance groups.
if p.NEGEnabled {
return nil
}
// Verify that backend service contains links to all backends/instance-groups
return b.edgeHop(be, igs)
}
Expand Down Expand Up @@ -352,6 +371,19 @@ func getBackendsForIGs(igs []*compute.InstanceGroup, bm BalancingMode) []*comput
return backends
}

func getBackendsForNEGs(negs []*computealpha.NetworkEndpointGroup) []*computealpha.Backend {
var backends []*computealpha.Backend
for _, neg := range negs {
b := &computealpha.Backend{
Group: neg.SelfLink,
BalancingMode: string(Rate),
MaxRatePerEndpoint: maxRPS,
}
backends = append(backends, b)
}
return backends
}

// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links.
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
Expand All @@ -369,7 +401,15 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
glog.V(2).Infof("Updating backend service %v with %d backends: expected igs %+v, current igs %+v",
be.Name, igLinks.Len(), igLinks.List(), beIGs.List())

originalBackends := be.Backends
originalIGBackends := []*compute.Backend{}
for _, backend := range be.Backends {
// Backend service is not able to point to NEG and IG at the same time.
// Filter IG backends here.
if strings.Contains(backend.Group, "instanceGroups") {
originalIGBackends = append(originalIGBackends, backend)
}
}

var addIGs []*compute.InstanceGroup
for _, ig := range igs {
if !beIGs.Has(ig.SelfLink) {
Expand All @@ -387,7 +427,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
for _, bm := range []BalancingMode{Rate, Utilization} {
// Generate backends with given instance groups with a specific mode
newBackends := getBackendsForIGs(addIGs, bm)
be.Backends = append(originalBackends, newBackends...)
be.Backends = append(originalIGBackends, newBackends...)

if err := b.cloud.UpdateGlobalBackendService(be); err != nil {
if utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
Expand Down Expand Up @@ -457,6 +497,46 @@ func (b *Backends) Status(name string) string {
return hs.HealthStatus[0].HealthState
}

func (b *Backends) Link(port ServicePort, zones []string) error {
if !port.NEGEnabled {
return nil
}
negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort)
var negs []*computealpha.NetworkEndpointGroup
var err error
for _, zone := range zones {
neg, err := b.negGetter.GetNetworkEndpointGroup(negName, zone)
if err != nil {
return err
}
negs = append(negs, neg)
}

backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port))
if err != nil {
return err
}

targetBackends := getBackendsForNEGs(negs)
oldBackends := sets.NewString()
newBackends := sets.NewString()

// WARNING: the backend link includes api version.
// API versions has to match, otherwise backend link will be always different.
for _, be := range backendService.Backends {
oldBackends.Insert(be.Group)
}
for _, be := range targetBackends {
newBackends.Insert(be.Group)
}

if !oldBackends.Equal(newBackends) {
backendService.Backends = targetBackends
return b.cloud.UpdateAlphaGlobalBackendService(backendService)
}
return nil
}

func applyLegacyHCToHC(existing *compute.HttpHealthCheck, hc *healthchecks.HealthCheck) {
hc.Description = existing.Description
hc.CheckIntervalSec = existing.CheckIntervalSec
Expand All @@ -482,10 +562,21 @@ func applyProbeSettingsToHC(p *v1.Probe, hc *healthchecks.HealthCheck) {
break
}
}

hc.RequestPath = healthPath
hc.Host = host
hc.Description = "Kubernetes L7 health check generated with readiness probe settings."
hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds())
hc.TimeoutSec = int64(p.TimeoutSeconds)
if hc.ForNEG {
// For NEG mode, we can support more aggresive healthcheck interval.
hc.CheckIntervalSec = int64(p.PeriodSeconds)
} else {
// For IG mode, short healthcheck interval may health check flooding problem.
hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds())
}
}

//retrieveObjectName takes a GCE object link and return the last part of the url as object name
func retrieveObjectName(url string) string {
splited := strings.Split(url, "/")
return splited[len(splited)-1]
}
Loading

0 comments on commit b0603c6

Please sign in to comment.