Skip to content

Commit

Permalink
Merge pull request #1715 from panslava/add-max-ig-size-to-node-pool
Browse files Browse the repository at this point in the history
Add maxIGSize to NodePool constructor, instead of using global flag
  • Loading branch information
k8s-ci-robot authored Jul 29, 2022
2 parents 2292c69 + 50a8101 commit 704f60d
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 61 deletions.
1 change: 1 addition & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func main() {
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
EndpointSlicesEnabled: flags.F.EnableEndpointSlices,
MaxIGSize: flags.F.MaxIGSize,
}
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, ingParamsClient, svcAttachmentClient, cloud, namer, kubeSystemUID, ctxConfig)
go app.RunHTTPServer(ctx.HealthCheck)
Expand Down
18 changes: 16 additions & 2 deletions pkg/backends/ig_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ func TestLink(t *testing.T) {
fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeNodePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})
linker := newTestIGLinker(fakeGCE, fakeNodePool)

sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP, BackendNamer: defaultNamer}
Expand Down Expand Up @@ -84,7 +91,14 @@ func TestLinkWithCreationModeError(t *testing.T) {
fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeNodePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})
linker := newTestIGLinker(fakeGCE, fakeNodePool)

sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP, BackendNamer: defaultNamer}
Expand Down
9 changes: 8 additions & 1 deletion pkg/backends/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig {

fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeZL := &instances.FakeZoneLister{Zones: []string{defaultZone}}
fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeInstancePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})

// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook
Expand Down
9 changes: 8 additions & 1 deletion pkg/backends/regional_ig_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ func linkerTestClusterValues() gce.TestClusterValues {
func newTestRegionalIgLinker(fakeGCE *gce.Cloud, backendPool *Backends, l4Namer *namer.L4Namer) *RegionalInstanceGroupLinker {
fakeIGs := instances.NewEmptyFakeInstanceGroups()
fakeZL := &instances.FakeZoneLister{Zones: []string{uscentralzone}}
fakeInstancePool := instances.NewNodePool(fakeIGs, l4Namer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
fakeInstancePool := instances.NewNodePool(&instances.NodePoolConfig{
Cloud: fakeIGs,
Namer: l4Namer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})

(fakeGCE.Compute().(*cloud.MockGCE)).MockRegionBackendServices.UpdateHook = mock.UpdateRegionBackendServiceHook

Expand Down
15 changes: 9 additions & 6 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type ControllerContextConfig struct {
ASMConfigMapNamespace string
ASMConfigMapName string
EndpointSlicesEnabled bool
MaxIGSize int
}

// NewControllerContext returns a new shared set of informers.
Expand Down Expand Up @@ -206,12 +207,14 @@ func NewControllerContext(
context.UseEndpointSlices,
context.KubeClient,
)
context.InstancePool = instances.NewNodePool(context.Cloud,
context.ClusterNamer,
context,
utils.GetBasePath(context.Cloud),
context.Translator,
)
context.InstancePool = instances.NewNodePool(&instances.NodePoolConfig{
Cloud: context.Cloud,
Namer: context.ClusterNamer,
Recorders: context,
BasePath: utils.GetBasePath(context.Cloud),
ZoneLister: context.Translator,
MaxIGSize: config.MaxIGSize,
})

return context
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ func newLoadBalancerController() *LoadBalancerController {
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, nil, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
lbc.instancePool = instances.NewNodePool(instances.NewEmptyFakeInstanceGroups(), namer, &test.FakeRecorderSource{}, utils.GetBasePath(fakeGCE), fakeZL)
lbc.instancePool = instances.NewNodePool(&instances.NodePoolConfig{
Cloud: instances.NewEmptyFakeInstanceGroups(),
Namer: namer,
Recorders: &test.FakeRecorderSource{},
BasePath: utils.GetBasePath(fakeGCE),
ZoneLister: fakeZL,
MaxIGSize: 1000,
})
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(fakeGCE, namer, events.RecorderProducerMock{}, namer_util.NewFrontendNamerFactory(namer, ""))

lbc.hasSynced = func() bool { return true }
Expand Down
8 changes: 4 additions & 4 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ var (
EnableIngressGAFields bool
EnableTrafficScaling bool
EnableEndpointSlices bool
EnableMultipleIgs bool
EnablePinhole bool
MaxIgSize int
EnableMultipleIGs bool
MaxIGSize int
}{
GCERateLimitScale: 1.0,
}
Expand Down Expand Up @@ -249,8 +249,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.BoolVar(&F.EnableTrafficScaling, "enable-traffic-scaling", false, "Enable support for Service {max-rate-per-endpoint, capacity-scaler}")
flag.BoolVar(&F.EnableEndpointSlices, "enable-endpoint-slices", false, "Enable using Endpoint Slices API instead of Endpoints API")
flag.BoolVar(&F.EnablePinhole, "enable-pinhole", false, "Enable Pinhole firewall feature")
flag.BoolVar(&F.EnableMultipleIgs, "enable-multiple-igs", false, "Enable using unmanaged instance group management")
flag.IntVar(&F.MaxIgSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
flag.BoolVar(&F.EnableMultipleIGs, "enable-multiple-igs", false, "Enable using multiple unmanaged instance groups")
flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group")
flag.DurationVar(&F.MetricsExportInterval, "metrics-export-interval", 10*time.Minute, `Period for calculating and exporting metrics related to state of managed objects.`)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/instances/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
)
Expand All @@ -37,9 +36,10 @@ func NewEmptyFakeInstanceGroups() *FakeInstanceGroups {
}

// NewFakeInstanceGroups creates a new FakeInstanceGroups.
func NewFakeInstanceGroups(zonesToIGsToInstances map[string]IGsToInstances) *FakeInstanceGroups {
func NewFakeInstanceGroups(zonesToIGsToInstances map[string]IGsToInstances, maxIGSize int) *FakeInstanceGroups {
return &FakeInstanceGroups{
zonesToIGsToInstances: zonesToIGsToInstances,
maxIGSize: maxIGSize,
}
}

Expand Down Expand Up @@ -68,6 +68,7 @@ type FakeInstanceGroups struct {
getResult *compute.InstanceGroup
calls []int
zonesToIGsToInstances map[string]IGsToInstances
maxIGSize int
}

// getInstanceGroup implements fake getting ig by name in zone
Expand Down Expand Up @@ -142,7 +143,7 @@ func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, inst
newValue := sets.NewString(f.zonesToIGsToInstances[zone][ig].List()...)
newValue.Insert(instanceNames...)

if len(newValue) > flags.F.MaxIgSize {
if len(newValue) > f.maxIGSize {
return test.FakeGoogleAPIRequestEntityTooLargeError()
}

Expand Down
46 changes: 29 additions & 17 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

Expand All @@ -47,22 +46,33 @@ type Instances struct {
namer namer.BackendNamer
recorder record.EventRecorder
instanceLinkFormat string
maxIGSize int
}

type recorderSource interface {
Recorder(ns string) record.EventRecorder
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource, basePath string, zl ZoneLister) NodePool {
// NodePoolConfig is used for NodePool constructor.
type NodePoolConfig struct {
// Cloud implements InstanceGroups, used to sync Kubernetes nodes with members of the cloud InstanceGroup.
Cloud InstanceGroups
Namer namer.BackendNamer
Recorders recorderSource
BasePath string
ZoneLister ZoneLister
MaxIGSize int
}

// NewNodePool creates a new node pool using NodePoolConfig.
func NewNodePool(config *NodePoolConfig) NodePool {
return &Instances{
cloud: cloud,
namer: namer,
recorder: recorders.Recorder(""), // No namespace
instanceLinkFormat: basePath + "zones/%s/instances/%s",
ZoneLister: zl,
cloud: config.Cloud,
namer: config.Namer,
recorder: config.Recorders.Recorder(""), // No namespace
instanceLinkFormat: config.BasePath + "zones/%s/instances/%s",
ZoneLister: config.ZoneLister,
maxIGSize: config.MaxIGSize,
}
}

Expand Down Expand Up @@ -337,19 +347,21 @@ func (i *Instances) Sync(nodes []string) (err error) {

// Individual InstanceGroup has a limit for 1000 instances in it.
// As a result, it's not possible to add more to it.
if len(kubeNodes) > flags.F.MaxIgSize {
if len(kubeNodes) > i.maxIGSize {
// List() will return a sorted list so the kubeNodesList truncation will have a stable set of nodes.
kubeNodesList := kubeNodes.List()

// Store first 10 truncated nodes for logging
truncatedNodesSample := kubeNodesList[flags.F.MaxIgSize:]
maxTruncatedNodesSampleSize := 10
if len(truncatedNodesSample) > maxTruncatedNodesSampleSize {
truncatedNodesSample = truncatedNodesSample[:maxTruncatedNodesSampleSize]
truncateForLogs := func(nodes []string) []string {
maxLogsSampleSize := 10
if len(nodes) <= maxLogsSampleSize {
return nodes
}
return nodes[:maxLogsSampleSize]
}

klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First %d truncated instances: %v", len(kubeNodesList), flags.F.MaxIgSize, igName, len(truncatedNodesSample), truncatedNodesSample)
kubeNodes = sets.NewString(kubeNodesList[:flags.F.MaxIgSize]...)
klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First truncated instances: %v", len(kubeNodesList), i.maxIGSize, igName, truncateForLogs(nodes[i.maxIGSize:]))
kubeNodes = sets.NewString(kubeNodesList[:i.maxIGSize]...)
}

// A node deleted via kubernetes could still exist as a gce vm. We don't
Expand Down
44 changes: 27 additions & 17 deletions pkg/instances/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/namer"
)
Expand All @@ -35,17 +34,23 @@ const (

var defaultNamer = namer.NewNamer("uid1", "fw1")

func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
fakeZL := &FakeZoneLister{[]string{zone}}
pool := NewNodePool(f, defaultNamer, &test.FakeRecorderSource{}, basePath, fakeZL)
func newNodePool(f *FakeInstanceGroups, zone string, maxIGSize int) NodePool {
pool := NewNodePool(&NodePoolConfig{
Cloud: f,
Namer: defaultNamer,
Recorders: &test.FakeRecorderSource{},
BasePath: basePath,
ZoneLister: &FakeZoneLister{[]string{zone}},
MaxIGSize: maxIGSize,
})
return pool
}

func TestNodePoolSync(t *testing.T) {
flags.F.MaxIgSize = 1000
maxIGSize := 1000

names1001 := make([]string, flags.F.MaxIgSize+1)
for i := 1; i <= flags.F.MaxIgSize+1; i++ {
names1001 := make([]string, maxIGSize+1)
for i := 1; i <= maxIGSize+1; i++ {
names1001[i-1] = fmt.Sprintf("n%d", i)
}

Expand Down Expand Up @@ -80,13 +85,14 @@ func TestNodePoolSync(t *testing.T) {
for _, testCase := range testCases {
// create fake gce node pool with existing gceNodes
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
fakeGCEInstanceGroups := NewFakeInstanceGroups(map[string]IGsToInstances{
zonesToIGs := map[string]IGsToInstances{
defaultZone: {
ig: testCase.gceNodes,
},
})
}
fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, maxIGSize)

pool := newNodePool(fakeGCEInstanceGroups, defaultZone)
pool := newNodePool(fakeGCEInstanceGroups, defaultZone, maxIGSize)

igName := defaultNamer.InstanceGroup()
ports := []int64{80}
Expand Down Expand Up @@ -118,10 +124,10 @@ func TestNodePoolSync(t *testing.T) {
}

expectedInstancesSize := testCase.kubeNodes.Len()
if testCase.kubeNodes.Len() > flags.F.MaxIgSize {
if testCase.kubeNodes.Len() > maxIGSize {
// If kubeNodes bigger than maximum instance group size, resulted instances
// should be truncated to flags.F.MaxIgSize
expectedInstancesSize = flags.F.MaxIgSize
expectedInstancesSize = maxIGSize
}
if instances.Len() != expectedInstancesSize {
t.Errorf("instances.Len() = %d not equal expectedInstancesSize = %d", instances.Len(), expectedInstancesSize)
Expand All @@ -144,12 +150,14 @@ func TestNodePoolSync(t *testing.T) {
}

func TestSetNamedPorts(t *testing.T) {
fakeIGs := NewFakeInstanceGroups(map[string]IGsToInstances{
maxIGSize := 1000
zonesToIGs := map[string]IGsToInstances{
defaultZone: {
&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
},
})
pool := newNodePool(fakeIGs, defaultZone)
}
fakeIGs := NewFakeInstanceGroups(zonesToIGs, maxIGSize)
pool := newNodePool(fakeIGs, defaultZone, maxIGSize)

testCases := []struct {
activePorts []int64
Expand Down Expand Up @@ -198,11 +206,13 @@ func TestSetNamedPorts(t *testing.T) {
}

func TestGetInstanceReferences(t *testing.T) {
pool := newNodePool(NewFakeInstanceGroups(map[string]IGsToInstances{
maxIGSize := 1000
zonesToIGs := map[string]IGsToInstances{
defaultZone: {
&compute.InstanceGroup{Name: "ig"}: sets.NewString("ig"),
},
}), defaultZone)
}
pool := newNodePool(NewFakeInstanceGroups(zonesToIGs, maxIGSize), defaultZone, maxIGSize)
instances := pool.(*Instances)

nodeNames := []string{"node-1", "node-2", "node-3", "node-4.region.zone"}
Expand Down
Loading

0 comments on commit 704f60d

Please sign in to comment.