Merge pull request #106 from bowei/sync-one-ingress
Sync one ingress
bowei authored Jan 29, 2018
2 parents 6bfdb93 + 4a831f3 commit 572e8e5
Showing 24 changed files with 806 additions and 563 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
@@ -28,7 +28,7 @@ import (

"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
neg "k8s.io/ingress-gce/pkg/networkendpointgroup"
neg "k8s.io/ingress-gce/pkg/neg"

"k8s.io/ingress-gce/cmd/glbc/app"
"k8s.io/ingress-gce/pkg/flags"
8 changes: 4 additions & 4 deletions pkg/backends/backends_test.go
@@ -33,7 +33,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/networkendpointgroup"
"k8s.io/ingress-gce/pkg/neg"
"k8s.io/ingress-gce/pkg/storage"
"k8s.io/ingress-gce/pkg/utils"
)
@@ -58,7 +58,7 @@ var (
)

func newTestJig(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) (*Backends, healthchecks.HealthCheckProvider) {
negGetter := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
negGetter := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
healthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
@@ -333,7 +333,7 @@ func TestBackendPoolSync(t *testing.T) {
func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
negGetter := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
negGetter := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
@@ -507,7 +507,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
namespace, name, port := "ns", "name", "port"
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
fakeNEG := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network")
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
63 changes: 58 additions & 5 deletions pkg/context/context.go
@@ -19,35 +19,88 @@ package context
import (
"time"

"github.com/golang/glog"

apiv1 "k8s.io/api/core/v1"
informerv1 "k8s.io/client-go/informers/core/v1"
informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)

// ControllerContext holds the kube client, the shared informers, and the per-namespace event recorders used by the controller.
type ControllerContext struct {
kubeClient kubernetes.Interface

IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer

// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder
}

// NewControllerContext returns a new shared set of informers.
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
newIndexer := func() cache.Indexers {
return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
}
context := &ControllerContext{
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
kubeClient: kubeClient,
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, newIndexer()),
recorders: map[string]record.EventRecorder{},
}
if enableEndpointsInformer {
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer())
}

return context
}

// HasSynced returns true if all relevant informers have been synced.
func (ctx *ControllerContext) HasSynced() bool {

funcs := []func() bool{
ctx.IngressInformer.HasSynced,
ctx.ServiceInformer.HasSynced,
ctx.PodInformer.HasSynced,
ctx.NodeInformer.HasSynced,
}
if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
}
for _, f := range funcs {
if !f() {
return false
}
}
return true
}

// Recorder returns an event recorder for the given namespace, creating and caching one on first use.
func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder {
if rec, ok := ctx.recorders[ns]; ok {
return rec
}

broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
Interface: ctx.kubeClient.Core().Events(ns),
})
rec := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"})
ctx.recorders[ns] = rec

return rec
}

// Start all of the informers.
func (ctx *ControllerContext) Start(stopCh chan struct{}) {
go ctx.IngressInformer.Run(stopCh)
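For orientation, here is a minimal usage sketch of the reworked ControllerContext (not part of this diff): build a client, start the informers, wait for HasSynced, then emit an event through the per-namespace Recorder. The in-cluster client construction, the 30s resync period, the ingctx alias, and the placeholder Ingress object are assumptions for illustration only.

package main

import (
	"time"

	"github.com/golang/glog"
	apiv1 "k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	ingctx "k8s.io/ingress-gce/pkg/context"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		glog.Fatalf("error building config: %v", err)
	}
	kubeClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		glog.Fatalf("error building client: %v", err)
	}

	// Watch all namespaces, resync every 30s, and enable the Endpoints
	// informer (the final argument), which the NEG code path relies on.
	ctx := ingctx.NewControllerContext(kubeClient, apiv1.NamespaceAll, 30*time.Second, true)

	stopCh := make(chan struct{})
	ctx.Start(stopCh)

	// Block until every informer cache (including Endpoints) has synced.
	if !cache.WaitForCacheSync(stopCh, ctx.HasSynced) {
		glog.Fatalf("timed out waiting for caches to sync")
	}

	// Recorder(ns) lazily creates and caches an event recorder per namespace.
	ing := &extensions.Ingress{} // placeholder object, for illustration only
	ing.Namespace, ing.Name = "default", "example-ingress"
	ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "Sync", "Scheduled for sync")
}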
12 changes: 8 additions & 4 deletions pkg/controller/cluster_manager.go
@@ -105,6 +105,8 @@ func (c *ClusterManager) shutdown() error {
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, firewallPorts []int64) ([]*compute.InstanceGroup, error) {
glog.V(4).Infof("Checkpoint %q, len(lbs)=%v, len(nodeNames)=%v, lne(backendServicePorts)=%v, len(namedPorts)=%v, len(firewallPorts)=%v", len(lbs), len(nodeNames), len(backendServicePorts), len(namedPorts), len(firewallPorts))

if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
@@ -172,15 +174,17 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort)
}

// TODO(ingress#120): Move this to the backend pool so it mirrors creation
var igErr error
if len(lbNames) == 0 {
igName := c.ClusterNamer.InstanceGroup()
glog.Infof("Deleting instance group %v", igName)
igErr = c.instancePool.DeleteInstanceGroup(igName)
if err := c.instancePool.DeleteInstanceGroup(igName); err != nil {
return err
}
}
if igErr != nil {
return igErr
if len(lbNames) == 0 {
c.firewallPool.Shutdown()
}

return nil
}

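The comment above notes that Checkpoint surfaces quota exhaustion as a googleapi 403. Below is a hedged, standalone sketch of how a caller might detect that condition; the error value is constructed directly here for illustration, whereas a real caller would receive it from ClusterManager.Checkpoint.

package main

import (
	"net/http"

	"github.com/golang/glog"
	"google.golang.org/api/googleapi"
)

// isQuotaError reports whether err looks like the googleapi 403 that
// Checkpoint returns when the cluster manager runs out of quota.
func isQuotaError(err error) bool {
	apiErr, ok := err.(*googleapi.Error)
	return ok && apiErr.Code == http.StatusForbidden
}

func main() {
	// Stand-in for an error returned by a call such as
	//   igs, err := clusterManager.Checkpoint(lbs, nodeNames, svcPorts, namedPorts, fwPorts)
	var err error = &googleapi.Error{Code: http.StatusForbidden, Message: "Quota exceeded"}
	if isQuotaError(err) {
		glog.Warningf("quota exhausted during checkpoint, backing off: %v", err)
	}
}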
