From 50a81012b8595fb0fffb72c1e65291476904c3cd Mon Sep 17 00:00:00 2001
From: Slavik Panasovets
Date: Wed, 18 May 2022 10:18:36 +0000
Subject: [PATCH] Create NodePoolConfig for better NodePool constructor style

Add maxIGSize to NodePool and fix the capitalization of the IG flag
fields (EnableMultipleIgs -> EnableMultipleIGs, MaxIgSize -> MaxIGSize).
The instance group size limit is now injected through the config structs
instead of being read from a global flag inside the sync path.
---
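(Note for reviewers; this text sits between the "---" above and the diffstat
below, so git am drops it. The constructor moves from five positional
arguments to a single config struct. A minimal before/after sketch of a call
site, with illustrative variable names that are not taken from the tree:

    // Before: five positional arguments, several of which share types,
    // so call sites were easy to misorder.
    pool := instances.NewNodePool(igs, namer, recorders, basePath, zoneLister)

    // After: named fields, plus the new MaxIGSize cap that every caller
    // now provides explicitly.
    pool := instances.NewNodePool(&instances.NodePoolConfig{
        Cloud:      igs,
        Namer:      namer,
        Recorders:  recorders,
        BasePath:   basePath,
        ZoneLister: zoneLister,
        MaxIGSize:  1000, // previously read from the global flags.F.MaxIgSize
    })

Every call site converted below follows this pattern.)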
 cmd/glbc/main.go                        |  1 +
 pkg/backends/ig_linker_test.go          | 18 ++++++++--
 pkg/backends/integration_test.go        |  9 ++++-
 pkg/backends/regional_ig_linker_test.go |  9 ++++-
 pkg/context/context.go                  | 15 ++++----
 pkg/controller/controller_test.go       |  9 ++++-
 pkg/flags/flags.go                      |  8 ++---
 pkg/instances/fakes.go                  |  7 ++--
 pkg/instances/instances.go              | 46 ++++++++++++++++---------
 pkg/instances/instances_test.go         | 44 ++++++++++++++---------
 pkg/l4lb/l4netlbcontroller_test.go      | 11 ++----
 11 files changed, 116 insertions(+), 61 deletions(-)

diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go
index 17af41ed1d..a23f1cff27 100644
--- a/cmd/glbc/main.go
+++ b/cmd/glbc/main.go
@@ -198,6 +198,7 @@ func main() {
 		ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
 		ASMConfigMapName:      flags.F.ASMConfigMapBasedConfigCMName,
 		EndpointSlicesEnabled: flags.F.EnableEndpointSlices,
+		MaxIGSize:             flags.F.MaxIGSize,
 	}
 	ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, ingParamsClient, svcAttachmentClient, cloud, namer, kubeSystemUID, ctxConfig)
 	go app.RunHTTPServer(ctx.HealthCheck)
diff --git a/pkg/backends/ig_linker_test.go b/pkg/backends/ig_linker_test.go
index d5119a49b1..4d9bc93cdc 100644
--- a/pkg/backends/ig_linker_test.go
+++ b/pkg/backends/ig_linker_test.go
@@ -53,7 +53,14 @@ func TestLink(t *testing.T) {
 	fakeIGs := instances.NewEmptyFakeInstanceGroups()
 	fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
 	fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
-	fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
+	fakeNodePool := instances.NewNodePool(&instances.NodePoolConfig{
+		Cloud:      fakeIGs,
+		Namer:      defaultNamer,
+		Recorders:  &test.FakeRecorderSource{},
+		BasePath:   utils.GetBasePath(fakeGCE),
+		ZoneLister: fakeZL,
+		MaxIGSize:  1000,
+	})
 	linker := newTestIGLinker(fakeGCE, fakeNodePool)

 	sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP, BackendNamer: defaultNamer}
@@ -84,7 +91,14 @@ func TestLinkWithCreationModeError(t *testing.T) {
 	fakeIGs := instances.NewEmptyFakeInstanceGroups()
 	fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
 	fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
-	fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
+	fakeNodePool := instances.NewNodePool(&instances.NodePoolConfig{
+		Cloud:      fakeIGs,
+		Namer:      defaultNamer,
+		Recorders:  &test.FakeRecorderSource{},
+		BasePath:   utils.GetBasePath(fakeGCE),
+		ZoneLister: fakeZL,
+		MaxIGSize:  1000,
+	})
 	linker := newTestIGLinker(fakeGCE, fakeNodePool)

 	sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP, BackendNamer: defaultNamer}
diff --git a/pkg/backends/integration_test.go b/pkg/backends/integration_test.go
index 84da692860..bfa37bbc52 100644
--- a/pkg/backends/integration_test.go
+++ b/pkg/backends/integration_test.go
@@ -47,7 +47,14 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig {
 	fakeIGs := instances.NewEmptyFakeInstanceGroups()
 	fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
-	fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
+	fakeInstancePool := instances.NewNodePool(&instances.NodePoolConfig{
+		Cloud:      fakeIGs,
+		Namer:      defaultNamer,
+		Recorders:  &test.FakeRecorderSource{},
+		BasePath:   utils.GetBasePath(fakeGCE),
+		ZoneLister: fakeZL,
+		MaxIGSize:  1000,
+	})

 	// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
 	(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook

diff --git a/pkg/backends/regional_ig_linker_test.go b/pkg/backends/regional_ig_linker_test.go
index 6466b7a313..420099b5d4 100644
--- a/pkg/backends/regional_ig_linker_test.go
+++ b/pkg/backends/regional_ig_linker_test.go
@@ -45,7 +45,14 @@ func linkerTestClusterValues() gce.TestClusterValues {
 func newTestRegionalIgLinker(fakeGCE *gce.Cloud, backendPool *Backends, l4Namer *namer.L4Namer) *RegionalInstanceGroupLinker {
 	fakeIGs := instances.NewEmptyFakeInstanceGroups()
 	fakeZL := &instances.FakeZoneLister{Zones: []string{uscentralzone}}
-	fakeInstancePool := instances.NewNodePool(fakeIGs, l4Namer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
+	fakeInstancePool := instances.NewNodePool(&instances.NodePoolConfig{
+		Cloud:      fakeIGs,
+		Namer:      l4Namer,
+		Recorders:  &test.FakeRecorderSource{},
+		BasePath:   utils.GetBasePath(fakeGCE),
+		ZoneLister: fakeZL,
+		MaxIGSize:  1000,
+	})

 	(fakeGCE.Compute().(*cloud.MockGCE)).MockRegionBackendServices.UpdateHook = mock.UpdateRegionBackendServiceHook

diff --git a/pkg/context/context.go b/pkg/context/context.go
index b59b56cb83..9a4d13c975 100644
--- a/pkg/context/context.go
+++ b/pkg/context/context.go
@@ -133,6 +133,7 @@ type ControllerContextConfig struct {
 	ASMConfigMapNamespace string
 	ASMConfigMapName      string
 	EndpointSlicesEnabled bool
+	MaxIGSize             int
 }

 // NewControllerContext returns a new shared set of informers.
@@ -206,12 +207,14 @@ func NewControllerContext(
 		context.UseEndpointSlices,
 		context.KubeClient,
 	)
-	context.InstancePool = instances.NewNodePool(context.Cloud,
-		context.ClusterNamer,
-		context,
-		utils.GetBasePath(context.Cloud),
-		context.Translator,
-	)
+	context.InstancePool = instances.NewNodePool(&instances.NodePoolConfig{
+		Cloud:      context.Cloud,
+		Namer:      context.ClusterNamer,
+		Recorders:  context,
+		BasePath:   utils.GetBasePath(context.Cloud),
+		ZoneLister: context.Translator,
+		MaxIGSize:  config.MaxIGSize,
+	})

 	return context
 }
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 1817ea4e16..4c6aa45f74 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -75,7 +75,14 @@ func newLoadBalancerController() *LoadBalancerController {
 	ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
 	lbc := NewLoadBalancerController(ctx, stopCh)
 	// TODO(rramkumar): Fix this so we don't have to override with our fake
-	lbc.instancePool = instances.NewNodePool(instances.NewEmptyFakeInstanceGroups(), namer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
+	lbc.instancePool = instances.NewNodePool(&instances.NodePoolConfig{
+		Cloud:      instances.NewEmptyFakeInstanceGroups(),
+		Namer:      namer,
+		Recorders:  &test.FakeRecorderSource{},
+		BasePath:   utils.GetBasePath(fakeGCE),
+		ZoneLister: fakeZL,
+		MaxIGSize:  1000,
+	})
 	lbc.l7Pool = loadbalancers.NewLoadBalancerPool(fakeGCE, namer, events.RecorderProducerMock{}, namer_util.NewFrontendNamerFactory(namer, ""))
 	lbc.hasSynced = func() bool { return true }

diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go
index 777ce88c38..8edd9936dd 100644
--- a/pkg/flags/flags.go
+++ b/pkg/flags/flags.go
@@ -107,9 +107,9 @@ var (
 		EnableIngressGAFields bool
 		EnableTrafficScaling  bool
 		EnableEndpointSlices  bool
-		EnableMultipleIgs     bool
 		EnablePinhole         bool
-		MaxIgSize             int
+		EnableMultipleIGs     bool
+		MaxIGSize             int
 	}{
 		GCERateLimitScale: 1.0,
 	}
@@ -249,8 +249,8 @@ L7 load balancing. CSV values accepted.
 Example: -node-port-ranges=80,8080,400-5
 	flag.BoolVar(&F.EnableTrafficScaling, "enable-traffic-scaling", false, "Enable support for Service {max-rate-per-endpoint, capacity-scaler}")
 	flag.BoolVar(&F.EnableEndpointSlices, "enable-endpoint-slices", false, "Enable using Endpoint Slices API instead of Endpoints API")
 	flag.BoolVar(&F.EnablePinhole, "enable-pinhole", false, "Enable Pinhole firewall feature")
-	flag.BoolVar(&F.EnableMultipleIgs, "enable-multiple-igs", false, "Enable using unmanaged instance group management")
-	flag.IntVar(&F.MaxIgSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
+	flag.BoolVar(&F.EnableMultipleIGs, "enable-multiple-igs", false, "Enable using multiple unmanaged instance groups")
+	flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
 	flag.DurationVar(&F.MetricsExportInterval, "metrics-export-interval", 10*time.Minute, `Period for calculating and exporting metrics related to state of managed objects.`)
 }
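(Aside on the rename above: only the Go identifiers change spelling; the
command-line flags keep their names ("enable-multiple-igs", "max-ig-size"),
so existing invocations are unaffected. The limit also stops being read as a
global and is instead wired through once at startup. A sketch of the
plumbing, with the injection chain shown in comments:

    // Before: any package could read the global directly.
    size := flags.F.MaxIgSize

    // After: read once in cmd/glbc/main.go and passed down:
    // flags.F.MaxIGSize -> ControllerContextConfig.MaxIGSize
    //   -> NodePoolConfig.MaxIGSize -> Instances.maxIGSize
    size := flags.F.MaxIGSize
)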
diff --git a/pkg/instances/fakes.go b/pkg/instances/fakes.go
index 3238a62ccf..1dfb0e598a 100644
--- a/pkg/instances/fakes.go
+++ b/pkg/instances/fakes.go
@@ -24,7 +24,6 @@ import (
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
 	"google.golang.org/api/compute/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/ingress-gce/pkg/flags"
 	"k8s.io/ingress-gce/pkg/test"
 	"k8s.io/ingress-gce/pkg/utils"
 )
@@ -37,9 +36,10 @@ func NewEmptyFakeInstanceGroups() *FakeInstanceGroups {
 }

 // NewFakeInstanceGroups creates a new FakeInstanceGroups.
-func NewFakeInstanceGroups(zonesToIGsToInstances map[string]IGsToInstances) *FakeInstanceGroups {
+func NewFakeInstanceGroups(zonesToIGsToInstances map[string]IGsToInstances, maxIGSize int) *FakeInstanceGroups {
 	return &FakeInstanceGroups{
 		zonesToIGsToInstances: zonesToIGsToInstances,
+		maxIGSize:             maxIGSize,
 	}
 }

@@ -68,6 +68,7 @@ type FakeInstanceGroups struct {
 	getResult             *compute.InstanceGroup
 	calls                 []int
 	zonesToIGsToInstances map[string]IGsToInstances
+	maxIGSize             int
 }

 // getInstanceGroup implements fake getting ig by name in zone
@@ -142,7 +143,7 @@ func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, instanceNames []string) error {
 	newValue := sets.NewString(f.zonesToIGsToInstances[zone][ig].List()...)
 	newValue.Insert(instanceNames...)

-	if len(newValue) > flags.F.MaxIgSize {
+	if len(newValue) > f.maxIGSize {
 		return test.FakeGoogleAPIRequestEntityTooLargeError()
 	}
diff --git a/pkg/instances/instances.go b/pkg/instances/instances.go
index efaeb99f84..6453dac7c8 100644
--- a/pkg/instances/instances.go
+++ b/pkg/instances/instances.go
@@ -25,7 +25,6 @@ import (
 	"google.golang.org/api/compute/v1"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/ingress-gce/pkg/events"
-	"k8s.io/ingress-gce/pkg/flags"
 	"k8s.io/ingress-gce/pkg/utils/namer"

 	"k8s.io/klog"
@@ -47,22 +46,33 @@ type Instances struct {
 	namer              namer.BackendNamer
 	recorder           record.EventRecorder
 	instanceLinkFormat string
+	maxIGSize          int
 }

 type recorderSource interface {
 	Recorder(ns string) record.EventRecorder
 }

-// NewNodePool creates a new node pool.
-// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
-//   members of the cloud InstanceGroup.
-func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource, basePath string, zl ZoneLister) NodePool {
+// NodePoolConfig holds the arguments for the NodePool constructor.
+type NodePoolConfig struct {
+	// Cloud implements InstanceGroups, used to sync Kubernetes nodes with members of the cloud InstanceGroup.
+	Cloud      InstanceGroups
+	Namer      namer.BackendNamer
+	Recorders  recorderSource
+	BasePath   string
+	ZoneLister ZoneLister
+	MaxIGSize  int
+}
+
+// NewNodePool creates a new node pool using NodePoolConfig.
+func NewNodePool(config *NodePoolConfig) NodePool {
 	return &Instances{
-		cloud:              cloud,
-		namer:              namer,
-		recorder:           recorders.Recorder(""), // No namespace
-		instanceLinkFormat: basePath + "zones/%s/instances/%s",
-		ZoneLister:         zl,
+		cloud:              config.Cloud,
+		namer:              config.Namer,
+		recorder:           config.Recorders.Recorder(""), // No namespace
+		instanceLinkFormat: config.BasePath + "zones/%s/instances/%s",
+		ZoneLister:         config.ZoneLister,
+		maxIGSize:          config.MaxIGSize,
 	}
 }

@@ -337,19 +347,21 @@ func (i *Instances) Sync(nodes []string) (err error) {

 	// Individual InstanceGroup has a limit for 1000 instances in it.
 	// As a result, it's not possible to add more to it.
-	if len(kubeNodes) > flags.F.MaxIgSize {
+	if len(kubeNodes) > i.maxIGSize {
 		// List() will return a sorted list so the kubeNodesList truncation will have a stable set of nodes.
 		kubeNodesList := kubeNodes.List()

 		// Store first 10 truncated nodes for logging
-		truncatedNodesSample := kubeNodesList[flags.F.MaxIgSize:]
-		maxTruncatedNodesSampleSize := 10
-		if len(truncatedNodesSample) > maxTruncatedNodesSampleSize {
-			truncatedNodesSample = truncatedNodesSample[:maxTruncatedNodesSampleSize]
+		truncateForLogs := func(nodes []string) []string {
+			maxLogsSampleSize := 10
+			if len(nodes) <= maxLogsSampleSize {
+				return nodes
+			}
+			return nodes[:maxLogsSampleSize]
 		}

-		klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First %d truncated instances: %v", len(kubeNodesList), flags.F.MaxIgSize, igName, len(truncatedNodesSample), truncatedNodesSample)
-		kubeNodes = sets.NewString(kubeNodesList[:flags.F.MaxIgSize]...)
+		klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First truncated instances: %v", len(kubeNodesList), i.maxIGSize, igName, truncateForLogs(kubeNodesList[i.maxIGSize:]))
+		kubeNodes = sets.NewString(kubeNodesList[:i.maxIGSize]...)
 	}

 	// A node deleted via kubernetes could still exist as a gce vm. We don't
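(Aside: the truncation above works on the sorted node list, so both the nodes
that are kept and the logged sample are deterministic across syncs. The
original code sampled the unsorted "nodes" argument, which is corrected to
"kubeNodesList" above. A small worked example; names follow the patch, values
are illustrative:

    // kubeNodes = {n5, n1, n4, n2, n3}, i.maxIGSize = 3
    kubeNodesList := kubeNodes.List()                // [n1 n2 n3 n4 n5], sorted
    dropped := truncateForLogs(kubeNodesList[3:])    // [n4 n5], logged in the warning
    kubeNodes = sets.NewString(kubeNodesList[:3]...) // {n1 n2 n3} synced into the IG
)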
diff --git a/pkg/instances/instances_test.go b/pkg/instances/instances_test.go
index 1870ba69f7..10493dff8a 100644
--- a/pkg/instances/instances_test.go
+++ b/pkg/instances/instances_test.go
@@ -23,7 +23,6 @@ import (

 	"google.golang.org/api/compute/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/ingress-gce/pkg/flags"
 	"k8s.io/ingress-gce/pkg/test"
 	"k8s.io/ingress-gce/pkg/utils/namer"
 )
@@ -35,17 +34,23 @@ const (
 )

 var defaultNamer = namer.NewNamer("uid1", "fw1")

-func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
-	fakeZL := &FakeZoneLister{[]string{zone}}
-	pool := NewNodePool(f, defaultNamer, &test.FakeRecorderSource{}, basePath, fakeZL)
+func newNodePool(f *FakeInstanceGroups, zone string, maxIGSize int) NodePool {
+	pool := NewNodePool(&NodePoolConfig{
+		Cloud:      f,
+		Namer:      defaultNamer,
+		Recorders:  &test.FakeRecorderSource{},
+		BasePath:   basePath,
+		ZoneLister: &FakeZoneLister{[]string{zone}},
+		MaxIGSize:  maxIGSize,
+	})
 	return pool
 }

 func TestNodePoolSync(t *testing.T) {
-	flags.F.MaxIgSize = 1000
+	maxIGSize := 1000

-	names1001 := make([]string, flags.F.MaxIgSize+1)
-	for i := 1; i <= flags.F.MaxIgSize+1; i++ {
+	names1001 := make([]string, maxIGSize+1)
+	for i := 1; i <= maxIGSize+1; i++ {
 		names1001[i-1] = fmt.Sprintf("n%d", i)
 	}
@@ -80,13 +85,14 @@ func TestNodePoolSync(t *testing.T) {
 	for _, testCase := range testCases {
 		// create fake gce node pool with existing gceNodes
 		ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
-		fakeGCEInstanceGroups := NewFakeInstanceGroups(map[string]IGsToInstances{
+		zonesToIGs := map[string]IGsToInstances{
 			defaultZone: {
 				ig: testCase.gceNodes,
 			},
-		})
+		}
+		fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, maxIGSize)

-		pool := newNodePool(fakeGCEInstanceGroups, defaultZone)
+		pool := newNodePool(fakeGCEInstanceGroups, defaultZone, maxIGSize)

 		igName := defaultNamer.InstanceGroup()
 		ports := []int64{80}
@@ -118,10 +124,10 @@ func TestNodePoolSync(t *testing.T) {
 		}

 		expectedInstancesSize := testCase.kubeNodes.Len()
-		if testCase.kubeNodes.Len() > flags.F.MaxIgSize {
-			// If kubeNodes bigger than maximum instance group size, resulted instances
-			// should be truncated to flags.F.MaxIgSize
-			expectedInstancesSize = flags.F.MaxIgSize
+		if testCase.kubeNodes.Len() > maxIGSize {
+			// If kubeNodes is bigger than the maximum instance group size,
+			// the resulting instances should be truncated to maxIGSize.
+			expectedInstancesSize = maxIGSize
 		}
 		if instances.Len() != expectedInstancesSize {
 			t.Errorf("instances.Len() = %d not equal expectedInstancesSize = %d", instances.Len(), expectedInstancesSize)
@@ -144,12 +150,14 @@ func TestNodePoolSync(t *testing.T) {
 }

 func TestSetNamedPorts(t *testing.T) {
-	fakeIGs := NewFakeInstanceGroups(map[string]IGsToInstances{
+	maxIGSize := 1000
+	zonesToIGs := map[string]IGsToInstances{
 		defaultZone: {
 			&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
 		},
-	})
-	pool := newNodePool(fakeIGs, defaultZone)
+	}
+	fakeIGs := NewFakeInstanceGroups(zonesToIGs, maxIGSize)
+	pool := newNodePool(fakeIGs, defaultZone, maxIGSize)

 	testCases := []struct {
 		activePorts []int64
@@ -198,11 +206,13 @@ func TestSetNamedPorts(t *testing.T) {
 }

 func TestGetInstanceReferences(t *testing.T) {
-	pool := newNodePool(NewFakeInstanceGroups(map[string]IGsToInstances{
+	maxIGSize := 1000
+	zonesToIGs := map[string]IGsToInstances{
 		defaultZone: {
 			&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
 		},
-	}), defaultZone)
+	}
+	pool := newNodePool(NewFakeInstanceGroups(zonesToIGs, maxIGSize), defaultZone, maxIGSize)
 	instances := pool.(*Instances)

 	nodeNames := []string{"node-1", "node-2", "node-3", "node-4.region.zone"}
diff --git a/pkg/l4lb/l4netlbcontroller_test.go b/pkg/l4lb/l4netlbcontroller_test.go
index b84c9d2dbb..1d7f9d547f 100644
--- a/pkg/l4lb/l4netlbcontroller_test.go
+++ b/pkg/l4lb/l4netlbcontroller_test.go
@@ -27,13 +27,11 @@ import (
 	"testing"
 	"time"

-	"google.golang.org/api/googleapi"
-	"k8s.io/ingress-gce/pkg/flags"
-
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock"
 	ga "google.golang.org/api/compute/v1"
+	"google.golang.org/api/googleapi"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -226,6 +224,7 @@ func buildContext(vals gce.TestClusterValues) *ingctx.ControllerContext {
 		Namespace:    v1.NamespaceAll,
 		ResyncPeriod: 1 * time.Minute,
 		NumL4Workers: 5,
+		MaxIGSize:    1000,
 	}
 	return ingctx.NewControllerContext(nil, kubeClient, nil, nil, nil, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
 }
@@ -581,8 +580,6 @@ func TestInternalLoadBalancerShouldNotBeProcessByL4NetLBController(t *testing.T) {
 }

 func TestProcessServiceCreationFailed(t *testing.T) {
-	flags.F.MaxIgSize = 1000
-
 	for _, param := range []struct {
 		addMockFunc   func(*cloud.MockGCE)
 		expectedError string
@@ -709,8 +706,6 @@ func TestServiceStatusForSuccessSync(t *testing.T) {
 }

 func TestProcessServiceUpdate(t *testing.T) {
-	flags.F.MaxIgSize = 1000
-
 	for _, param := range []struct {
 		Update      func(*v1.Service)
 		CheckResult func(*L4NetLBController, *v1.Service) error
@@ -1126,8 +1121,6 @@ func TestShouldProcessService(t *testing.T) {
 }

 func TestPreventTargetPoolToRBSMigration(t *testing.T) {
-	flags.F.MaxIgSize = 1000
-
 	testCases := []struct {
 		desc   string
 		frHook getForwardingRuleHook