Create NodePoolConfig for better NodePool constructor style
Add maxIGSize to NodePool, change IG flags spelling
panslava committed Jul 29, 2022
Parent: 2292c69. Commit: 50a8101.
Showing 11 changed files with 116 additions and 61 deletions.
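In brief: NewNodePool moves from five positional arguments to a single NodePoolConfig struct, which is what lets the new MaxIGSize setting reach the pool without every caller (and the pool itself) reading global flags. The diffs below apply this shape at each call site. A minimal, self-contained sketch of the pattern; PoolConfig and NewPool are illustrative stand-ins, not the repository's identifiers:

package main

import "fmt"

// PoolConfig plays the role of instances.NodePoolConfig: construction knobs
// live in one struct, so adding a field such as MaxIGSize stays backward
// compatible with existing call sites.
type PoolConfig struct {
	BasePath  string
	MaxIGSize int
}

type Pool struct{ cfg PoolConfig }

// Before: NewPool(basePath string, maxIGSize int, ...), where each added
// parameter touches every caller. After: one config argument; fields are
// named at the call site and unset fields take their zero values.
func NewPool(cfg PoolConfig) *Pool { return &Pool{cfg: cfg} }

func main() {
	pool := NewPool(PoolConfig{BasePath: "projects/p/", MaxIGSize: 1000})
	fmt.Println(pool.cfg.MaxIGSize) // 1000
}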
cmd/glbc/main.go: 1 addition, 0 deletions
@@ -198,6 +198,7 @@ func main() {
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
EndpointSlicesEnabled: flags.F.EnableEndpointSlices,
MaxIGSize: flags.F.MaxIGSize,
}
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, ingParamsClient, svcAttachmentClient, cloud, namer, kubeSystemUID, ctxConfig)
go app.RunHTTPServer(ctx.HealthCheck)
pkg/backends/ig_linker_test.go: 16 additions, 2 deletions
@@ -53,7 +53,14 @@ func TestLink(t *testing.T) {
fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeNodePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})
linker := newTestIGLinker(fakeGCE, fakeNodePool)

sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP, BackendNamer: defaultNamer}
@@ -84,7 +91,14 @@ func TestLinkWithCreationModeError(t *testing.T) {
fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeNodePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})
linker := newTestIGLinker(fakeGCE, fakeNodePool)

sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP, BackendNamer: defaultNamer}
pkg/backends/integration_test.go: 8 additions, 1 deletion
@@ -47,7 +47,14 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig {

fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeInstancePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})

// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook
pkg/backends/regional_ig_linker_test.go: 8 additions, 1 deletion
@@ -45,7 +45,14 @@ func linkerTestClusterValues() gce.TestClusterValues {
func newTestRegionalIgLinker(fakeGCE *gce.Cloud, backendPool *Backends, l4Namer *namer.L4Namer) *RegionalInstanceGroupLinker {
fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeZL := &instances.FakeZoneLister{Zones: []string{uscentralzone}}
fakeInstancePool := instances.NewNodePool(fakeIGs, l4Namer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeInstancePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: l4Namer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})

(fakeGCE.Compute().(*cloud.MockGCE)).MockRegionBackendServices.UpdateHook = mock.UpdateRegionBackendServiceHook

pkg/context/context.go: 9 additions, 6 deletions
@@ -133,6 +133,7 @@ type ControllerContextConfig struct {
ASMConfigMapNamespace string
ASMConfigMapName string
EndpointSlicesEnabled bool
MaxIGSize int
}

// NewControllerContext returns a new shared set of informers.
@@ -206,12 +207,14 @@ func NewControllerContext(
context.UseEndpointSlices,
context.KubeClient,
)
context.InstancePool = instances.NewNodePool(context.Cloud,
context.ClusterNamer,
context,
utils.GetBasePath(context.Cloud),
context.Translator,
)
context.InstancePool = instances.NewNodePool(&instances.NodePoolConfig{
Cloud: context.Cloud,
Namer: context.ClusterNamer,
Recorders: context,
BasePath: utils.GetBasePath(context.Cloud),
ZoneLister: context.Translator,
MaxIGSize: config.MaxIGSize,
})

return context
}
pkg/controller/controller_test.go: 8 additions, 1 deletion
@@ -75,7 +75,14 @@ func newLoadBalancerController() *LoadBalancerController {
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
lbc.instancePool = instances.NewNodePool(instances.NewEmptyFakeInstanceGroups(), namer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
lbc.instancePool = instances.NewNodePool(&instances.NodePoolConfig{
Cloud: instances.NewEmptyFakeInstanceGroups(),
Namer: namer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(fakeGCE, namer, events.RecorderProducerMock{}, namer_util.NewFrontendNamerFactory(namer, ""))

lbc.hasSynced = func() bool { return true }
pkg/flags/flags.go: 4 additions, 4 deletions
@@ -107,9 +107,9 @@ var (
EnableIngressGAFields bool
EnableTrafficScaling bool
EnableEndpointSlices bool
EnableMultipleIgs bool
EnablePinhole bool
MaxIgSize int
EnableMultipleIGs bool
MaxIGSize int
}{
GCERateLimitScale: 1.0,
}
@@ -249,8 +249,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.BoolVar(&F.EnableTrafficScaling, "enable-traffic-scaling", false, "Enable support for Service {max-rate-per-endpoint, capacity-scaler}")
flag.BoolVar(&F.EnableEndpointSlices, "enable-endpoint-slices", false, "Enable using Endpoint Slices API instead of Endpoints API")
flag.BoolVar(&F.EnablePinhole, "enable-pinhole", false, "Enable Pinhole firewall feature")
flag.BoolVar(&F.EnableMultipleIgs, "enable-multiple-igs", false, "Enable using unmanaged instance group management")
flag.IntVar(&F.MaxIgSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
flag.BoolVar(&F.EnableMultipleIGs, "enable-multiple-igs", false, "Enable using multiple unmanaged instance groups")
flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
flag.DurationVar(&F.MetricsExportInterval, "metrics-export-interval", 10*time.Minute, `Period for calculating and exporting metrics related to state of managed objects.`)
}

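The rename above is identifier hygiene only (Go initialism casing: EnableMultipleIGs, MaxIGSize); the command-line names and defaults are unchanged. A runnable, self-contained sketch of the same registrations, where F is a local stand-in for the package's flag struct:

package main

import (
	"flag"
	"fmt"
)

// F stands in for the repository's flags.F; only the two renamed fields are
// reproduced here. The CLI spellings stay -enable-multiple-igs and
// -max-ig-size, matching the diff above.
var F struct {
	EnableMultipleIGs bool
	MaxIGSize         int
}

func main() {
	flag.BoolVar(&F.EnableMultipleIGs, "enable-multiple-igs", false, "Enable using multiple unmanaged instance groups")
	flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
	flag.Parse()
	fmt.Printf("multiple IGs: %v, max IG size: %d\n", F.EnableMultipleIGs, F.MaxIGSize)
}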
pkg/instances/fakes.go: 4 additions, 3 deletions
@@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
)
@@ -37,9 +36,10 @@ func NewEmptyFakeInstanceGroups() *FakeInstanceGroups {
}

// NewFakeInstanceGroups creates a new FakeInstanceGroups.
func NewFakeInstanceGroups(zonesToIGsToInstances map[string]IGsToInstances) *FakeInstanceGroups {
func NewFakeInstanceGroups(zonesToIGsToInstances map[string]IGsToInstances, maxIGSize int) *FakeInstanceGroups {
return &FakeInstanceGroups{
zonesToIGsToInstances: zonesToIGsToInstances,
maxIGSize: maxIGSize,
}
}

@@ -68,6 +68,7 @@ type FakeInstanceGroups struct {
getResult *compute.InstanceGroup
calls []int
zonesToIGsToInstances map[string]IGsToInstances
maxIGSize int
}

// getInstanceGroup implements fake getting ig by name in zone
@@ -142,7 +143,7 @@ func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, inst
newValue := sets.NewString(f.zonesToIGsToInstances[zone][ig].List()...)
newValue.Insert(instanceNames...)

if len(newValue) > flags.F.MaxIgSize {
if len(newValue) > f.maxIGSize {
return test.FakeGoogleAPIRequestEntityTooLargeError()
}

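With the pkg/flags import gone, the fake's cap is injected at construction, and overflow still surfaces the way live GCE reports it: as an HTTP 413 (via test.FakeGoogleAPIRequestEntityTooLargeError above). A simplified, self-contained sketch of that guard; checkCap is a hypothetical helper, and unlike the real fake it compares raw counts rather than deduplicating names through a string set:

package main

import (
	"fmt"

	"google.golang.org/api/googleapi"
)

// checkCap mirrors the fake's new guard: if an add would push the group past
// maxIGSize, fail with an HTTP 413 so callers exercise real error handling.
func checkCap(currentSize, adding, maxIGSize int) error {
	if currentSize+adding > maxIGSize {
		return &googleapi.Error{Code: 413, Message: "Request Entity Too Large"}
	}
	return nil
}

func main() {
	fmt.Println(checkCap(999, 2, 1000)) // a googleapi error with code 413
	fmt.Println(checkCap(999, 1, 1000)) // <nil>
}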
pkg/instances/instances.go: 29 additions, 17 deletions
@@ -25,7 +25,6 @@ import (
"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

@@ -47,22 +46,33 @@ type Instances struct {
namer namer.BackendNamer
recorder record.EventRecorder
instanceLinkFormat string
maxIGSize int
}

type recorderSource interface {
Recorder(ns string) record.EventRecorder
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource, basePath string, zl ZoneLister) NodePool {
// NodePoolConfig is used for NodePool constructor.
type NodePoolConfig struct {
// Cloud implements InstanceGroups, used to sync Kubernetes nodes with members of the cloud InstanceGroup.
Cloud InstanceGroups
Namer namer.BackendNamer
Recorders recorderSource
BasePath string
ZoneLister ZoneLister
MaxIGSize int
}

// NewNodePool creates a new node pool using NodePoolConfig.
func NewNodePool(config *NodePoolConfig) NodePool {
return &Instances{
cloud: cloud,
namer: namer,
recorder: recorders.Recorder(""), // No namespace
instanceLinkFormat: basePath + "zones/%s/instances/%s",
ZoneLister: zl,
cloud: config.Cloud,
namer: config.Namer,
recorder: config.Recorders.Recorder(""), // No namespace
instanceLinkFormat: config.BasePath + "zones/%s/instances/%s",
ZoneLister: config.ZoneLister,
maxIGSize: config.MaxIGSize,
}
}

@@ -337,19 +347,21 @@ func (i *Instances) Sync(nodes []string) (err error) {

// Individual InstanceGroup has a limit for 1000 instances in it.
// As a result, it's not possible to add more to it.
if len(kubeNodes) > flags.F.MaxIgSize {
if len(kubeNodes) > i.maxIGSize {
// List() will return a sorted list so the kubeNodesList truncation will have a stable set of nodes.
kubeNodesList := kubeNodes.List()

// Store first 10 truncated nodes for logging
truncatedNodesSample := kubeNodesList[flags.F.MaxIgSize:]
maxTruncatedNodesSampleSize := 10
if len(truncatedNodesSample) > maxTruncatedNodesSampleSize {
truncatedNodesSample = truncatedNodesSample[:maxTruncatedNodesSampleSize]
truncateForLogs := func(nodes []string) []string {
maxLogsSampleSize := 10
if len(nodes) <= maxLogsSampleSize {
return nodes
}
return nodes[:maxLogsSampleSize]
}

klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First %d truncated instances: %v", len(kubeNodesList), flags.F.MaxIgSize, igName, len(truncatedNodesSample), truncatedNodesSample)
kubeNodes = sets.NewString(kubeNodesList[:flags.F.MaxIgSize]...)
klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First truncated instances: %v", len(kubeNodesList), i.maxIGSize, igName, truncateForLogs(nodes[i.maxIGSize:]))
kubeNodes = sets.NewString(kubeNodesList[:i.maxIGSize]...)
}

// A node deleted via kubernetes could still exist as a gce vm. We don't
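The Sync hunk above is the behavioral core of the commit: an instance group cannot grow past the cap (the comment cites 1000 instances), so the sorted node list is cut at maxIGSize and at most a ten-name sample of the dropped tail is logged. A runnable distillation of that rule; truncate is a hypothetical name, not the repository's:

package main

import (
	"fmt"
	"sort"
)

// truncate keeps the first maxIGSize node names from a sorted list (sorting
// keeps the retained set stable across syncs, as the diff's comment notes)
// and returns at most a 10-name sample of what was dropped, for logging.
func truncate(nodes []string, maxIGSize int) (kept, droppedSample []string) {
	sort.Strings(nodes)
	if len(nodes) <= maxIGSize {
		return nodes, nil
	}
	dropped := nodes[maxIGSize:]
	const maxLogsSampleSize = 10
	if len(dropped) > maxLogsSampleSize {
		dropped = dropped[:maxLogsSampleSize]
	}
	return nodes[:maxIGSize], dropped
}

func main() {
	nodes := make([]string, 1003)
	for i := range nodes {
		nodes[i] = fmt.Sprintf("node-%04d", i)
	}
	kept, sample := truncate(nodes, 1000)
	fmt.Println(len(kept), sample) // 1000 [node-1000 node-1001 node-1002]
}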
pkg/instances/instances_test.go: 27 additions, 17 deletions
@@ -23,7 +23,6 @@ import (

"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/namer"
)
@@ -35,17 +34,23 @@

var defaultNamer = namer.NewNamer("uid1", "fw1")

func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
fakeZL := &FakeZoneLister{[]string{zone}}
pool := NewNodePool(f, defaultNamer, &test.FakeRecorderSource{}, basePath, fakeZL)
func newNodePool(f *FakeInstanceGroups, zone string, maxIGSize int) NodePool {
pool := NewNodePool(&NodePoolConfig{
Cloud: f,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: basePath,
ZoneLister: &FakeZoneLister{[]string{zone}},
MaxIGSize: maxIGSize,
})
return pool
}

func TestNodePoolSync(t *testing.T) {
flags.F.MaxIgSize = 1000
maxIGSize := 1000

names1001 := make([]string, flags.F.MaxIgSize+1)
for i := 1; i <= flags.F.MaxIgSize+1; i++ {
names1001 := make([]string, maxIGSize+1)
for i := 1; i <= maxIGSize+1; i++ {
names1001[i-1] = fmt.Sprintf("n%d", i)
}

@@ -80,13 +85,14 @@ func TestNodePoolSync(t *testing.T) {
for _, testCase := range testCases {
// create fake gce node pool with existing gceNodes
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
fakeGCEInstanceGroups := NewFakeInstanceGroups(map[string]IGsToInstances{
zonesToIGs := map[string]IGsToInstances{
defaultZone: {
ig: testCase.gceNodes,
},
})
}
fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, maxIGSize)

pool := newNodePool(fakeGCEInstanceGroups, defaultZone)
pool := newNodePool(fakeGCEInstanceGroups, defaultZone, maxIGSize)

igName := defaultNamer.InstanceGroup()
ports := []int64{80}
@@ -118,10 +124,10 @@
}

expectedInstancesSize := testCase.kubeNodes.Len()
if testCase.kubeNodes.Len() > flags.F.MaxIgSize {
if testCase.kubeNodes.Len() > maxIGSize {
// If kubeNodes bigger than maximum instance group size, resulted instances
// should be truncated to flags.F.MaxIgSize
expectedInstancesSize = flags.F.MaxIgSize
expectedInstancesSize = maxIGSize
}
if instances.Len() != expectedInstancesSize {
t.Errorf("instances.Len() = %d not equal expectedInstancesSize = %d", instances.Len(), expectedInstancesSize)
@@ -144,12 +150,14 @@ func TestNodePoolSync(t *testing.T) {
}

func TestSetNamedPorts(t *testing.T) {
fakeIGs := NewFakeInstanceGroups(map[string]IGsToInstances{
maxIGSize := 1000
zonesToIGs := map[string]IGsToInstances{
defaultZone: {
&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
},
})
pool := newNodePool(fakeIGs, defaultZone)
}
fakeIGs := NewFakeInstanceGroups(zonesToIGs, maxIGSize)
pool := newNodePool(fakeIGs, defaultZone, maxIGSize)

testCases := []struct {
activePorts []int64
@@ -198,11 +206,13 @@ func TestSetNamedPorts(t *testing.T) {
}

func TestGetInstanceReferences(t *testing.T) {
pool := newNodePool(NewFakeInstanceGroups(map[string]IGsToInstances{
maxIGSize := 1000
zonesToIGs := map[string]IGsToInstances{
defaultZone: {
&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
},
}), defaultZone)
}
pool := newNodePool(NewFakeInstanceGroups(zonesToIGs, maxIGSize), defaultZone, maxIGSize)
instances := pool.(*Instances)

nodeNames := []string{"node-1", "node-2", "node-3", "node-4.region.zone"}