Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure NEG controller #489

Merged
merged 1 commit into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/backends/neg_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
computebeta "google.golang.org/api/compute/v0.beta"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/neg"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/mock"
)

func newTestNEGLinker(fakeNEG neg.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker {
func newTestNEGLinker(fakeNEG negtypes.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker {
fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)

// Add standard hooks for mocking update calls. Each test can set a update different hook if it chooses to.
Expand All @@ -40,7 +40,7 @@ func newTestNEGLinker(fakeNEG neg.NetworkEndpointGroupCloud, fakeGCE *gce.GCEClo

func TestLinkBackendServiceToNEG(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
fakeNEG := negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
linker := newTestNEGLinker(fakeNEG, fakeGCE)

zones := []GroupKey{{"zone1"}, {"zone2"}}
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/cloudprovideradapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"strings"

computebeta "google.golang.org/api/compute/v0.beta"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)

// NewAdapter takes a GCECloud and returns a NetworkEndpointGroupCloud.
func NewAdapter(g *gce.GCECloud) NetworkEndpointGroupCloud {
func NewAdapter(g *gce.GCECloud) negtypes.NetworkEndpointGroupCloud {
return &cloudProviderAdapter{
c: g.Compute(),
networkURL: g.NetworkURL(),
Expand Down
27 changes: 12 additions & 15 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,25 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
)

func init() {
// register prometheus metrics
registerMetrics()
metrics.RegisterMetrics()
}

// Controller is network endpoint group controller.
// It determines whether NEG for a service port is needed, then signals negSyncerManager to sync it.
// It determines whether NEG for a service port is needed, then signals NegSyncerManager to sync it.
type Controller struct {
manager negSyncerManager
manager negtypes.NegSyncerManager
resyncPeriod time.Duration
gcPeriod time.Duration
recorder record.EventRecorder
namer networkEndpointGroupNamer
zoneGetter zoneGetter
namer negtypes.NetworkEndpointGroupNamer
zoneGetter negtypes.ZoneGetter

ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
Expand All @@ -71,10 +72,10 @@ type Controller struct {

// NewController returns a network endpoint group controller.
func NewController(
cloud NetworkEndpointGroupCloud,
cloud negtypes.NetworkEndpointGroupCloud,
ctx *context.ControllerContext,
zoneGetter zoneGetter,
namer networkEndpointGroupNamer,
zoneGetter negtypes.ZoneGetter,
namer negtypes.NetworkEndpointGroupNamer,
resyncPeriod time.Duration,
gcPeriod time.Duration) *Controller {
// init event recorder
Expand Down Expand Up @@ -202,7 +203,7 @@ func (c *Controller) stop() {
func (c *Controller) processEndpoint(obj interface{}) {
defer func() {
now := c.syncTracker.Track()
lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
metrics.LastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
}()

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
Expand Down Expand Up @@ -235,7 +236,7 @@ func (c *Controller) serviceWorker() {
func (c *Controller) processService(key string) error {
defer func() {
now := c.syncTracker.Track()
lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
metrics.LastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
Expand Down Expand Up @@ -438,16 +439,12 @@ func gatherIngressServiceKeys(ing *extensions.Ingress) sets.String {
return set
}
utils.TraverseIngressBackends(ing, func(id utils.ServicePortID) bool {
set.Insert(serviceKeyFunc(id.Service.Namespace, id.Service.Name))
set.Insert(utils.ServiceKeyFunc(id.Service.Namespace, id.Service.Name))
return false
})
return set
}

func serviceKeyFunc(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func getIngressServicesFromStore(store cache.Store, svc *apiv1.Service) (ings []extensions.Ingress) {
for _, m := range store.List() {
ing := *m.(*extensions.Ingress)
Expand Down
22 changes: 14 additions & 8 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
testServiceNamespace = "test-ns"
testServiceName = "test-Name"
testNamedPort = "named-Port"
)

var (
defaultBackend = utils.ServicePortID{Service: types.NamespacedName{Name: "default-http-backend", Namespace: "kube-system"}, Port: intstr.FromString("http")}
)
Expand All @@ -55,9 +61,9 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
}
context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig)
controller := NewController(
NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
context,
NewFakeZoneGetter(),
negtypes.NewFakeZoneGetter(),
namer,
1*time.Second,
1*time.Second,
Expand Down Expand Up @@ -95,7 +101,7 @@ func TestNewNonNEGService(t *testing.T) {
defer controller.stop()
controller.serviceLister.Add(newTestService(controller, false, []int32{}))
controller.ingressLister.Add(newTestIngress())
err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName))
err := controller.processService(utils.ServiceKeyFunc(testServiceNamespace, testServiceName))
if err != nil {
t.Fatalf("Failed to process service: %v", err)
}
Expand Down Expand Up @@ -154,7 +160,7 @@ func TestNewNEGService(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
svcKey := serviceKeyFunc(testServiceNamespace, testServiceName)
svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName)
controller.serviceLister.Add(newTestService(controller, tc.ingress, tc.svcPorts))

if tc.ingress {
Expand Down Expand Up @@ -193,7 +199,7 @@ func TestEnableNEGServiceWithIngress(t *testing.T) {
controller.serviceLister.Add(newTestService(controller, false, []int32{}))
controller.ingressLister.Add(newTestIngress())
svcClient := controller.client.CoreV1().Services(testServiceNamespace)
svcKey := serviceKeyFunc(testServiceNamespace, testServiceName)
svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName)
err := controller.processService(svcKey)
if err != nil {
t.Fatalf("Failed to process service: %v", err)
Expand Down Expand Up @@ -225,14 +231,14 @@ func TestDisableNEGServiceWithIngress(t *testing.T) {
defer controller.stop()
controller.serviceLister.Add(newTestService(controller, true, []int32{}))
controller.ingressLister.Add(newTestIngress())
err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName))
err := controller.processService(utils.ServiceKeyFunc(testServiceNamespace, testServiceName))
if err != nil {
t.Fatalf("Failed to process service: %v", err)
}
validateSyncers(t, controller, 3, false)

controller.serviceLister.Update(newTestService(controller, false, []int32{}))
err = controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName))
err = controller.processService(utils.ServiceKeyFunc(testServiceNamespace, testServiceName))
if err != nil {
t.Fatalf("Failed to process service: %v", err)
}
Expand Down Expand Up @@ -436,7 +442,7 @@ func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts [
}
}

zoneGetter := NewFakeZoneGetter()
zoneGetter := negtypes.NewFakeZoneGetter()
zones, _ := zoneGetter.ListZones()
for _, zone := range zones {
if !strings.Contains(v, zone) {
Expand Down
47 changes: 24 additions & 23 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/types"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncer"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
)

type serviceKey struct {
Expand All @@ -35,10 +36,10 @@ type serviceKey struct {

// syncerManager contains all the active syncer goroutines and manage their lifecycle.
type syncerManager struct {
namer networkEndpointGroupNamer
namer negtypes.NetworkEndpointGroupNamer
recorder record.EventRecorder
cloud NetworkEndpointGroupCloud
zoneGetter zoneGetter
cloud negtypes.NetworkEndpointGroupCloud
zoneGetter negtypes.ZoneGetter

serviceLister cache.Indexer
endpointLister cache.Indexer
Expand All @@ -48,33 +49,33 @@ type syncerManager struct {
// svcPortMap is the canonical indicator for whether a service needs NEG.
// key consists of service namespace and name. Value is a map of ServicePort
// Port:TargetPort, which represents ports that require NEG
svcPortMap map[serviceKey]types.PortNameMap
svcPortMap map[serviceKey]negtypes.PortNameMap
// syncerMap stores the NEG syncer
// key consists of service namespace, name and targetPort. Value is the corresponding syncer.
syncerMap map[servicePort]negSyncer
syncerMap map[negsyncer.NegSyncerKey]negtypes.NegSyncer
}

func newSyncerManager(namer networkEndpointGroupNamer, recorder record.EventRecorder, cloud NetworkEndpointGroupCloud, zoneGetter zoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncerManager {
func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncerManager {
return &syncerManager{
namer: namer,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
serviceLister: serviceLister,
endpointLister: endpointLister,
svcPortMap: make(map[serviceKey]types.PortNameMap),
syncerMap: make(map[servicePort]negSyncer),
svcPortMap: make(map[serviceKey]negtypes.PortNameMap),
syncerMap: make(map[negsyncer.NegSyncerKey]negtypes.NegSyncer),
}
}

// EnsureSyncer starts and stops syncers based on the input service ports.
func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts types.PortNameMap) error {
func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts negtypes.PortNameMap) error {
manager.mu.Lock()
defer manager.mu.Unlock()
key := getServiceKey(namespace, name)
currentPorts, ok := manager.svcPortMap[key]
if !ok {
currentPorts = make(types.PortNameMap)
currentPorts = make(negtypes.PortNameMap)
}

removes := currentPorts.Difference(newPorts)
Expand All @@ -95,12 +96,12 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts typ
for svcPort, targetPort := range adds {
syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)]
if !ok {
syncer = newSyncer(
servicePort{
namespace: namespace,
name: name,
port: svcPort,
targetPort: targetPort,
syncer = negsyncer.NewBatchSyncer(
negsyncer.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: svcPort,
TargetPort: targetPort,
},
manager.namer.NEG(namespace, name, svcPort),
manager.recorder,
Expand Down Expand Up @@ -242,12 +243,12 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
func getSyncerKey(namespace, name string, port int32, targetPort string) servicePort {
return servicePort{
namespace: namespace,
name: name,
port: port,
targetPort: targetPort,
func getSyncerKey(namespace, name string, port int32, targetPort string) negsyncer.NegSyncerKey {
return negsyncer.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: port,
TargetPort: targetPort,
}
}

Expand Down
Loading