Skip to content

Commit

Permalink
Merge pull request kubernetes#2627 from TheBeeZee/release-1.30
Browse files Browse the repository at this point in the history
[Cherry-pick kubernetes#2575 -> 1.30] Forward individual ports for NetLB with 5 or less service ports
  • Loading branch information
k8s-ci-robot authored Aug 14, 2024
2 parents 7fe0e31 + 62e2fe7 commit d5825ac
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 168 deletions.
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ var (
EnableMultiSubnetCluster bool
EnableWeightedL4ILB bool
EnableWeightedL4NetLB bool
EnableDiscretePortForwarding bool
}{
GCERateLimitScale: 1.0,
}
Expand Down Expand Up @@ -318,6 +319,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.BoolVar(&F.EnableWeightedL4NetLB, "enable-weighted-l4-netlb", false, "EnableWeighted Load balancing for L4 NetLB .")
flag.Float32Var(&F.KubeClientQPS, "kube-client-qps", 0.0, "The QPS that the controllers' kube client should adhere to through client side throttling. If zero, client will be created with default settings.")
flag.IntVar(&F.KubeClientBurst, "kube-client-burst", 0, "The burst QPS that the controllers' kube client should adhere to through client side throttling. If zero, client will be created with default settings.")
flag.BoolVar(&F.EnableDiscretePortForwarding, "enable-discrete-port-forwarding", false, "Enable forwarding of individual ports instead of port ranges.")
}

func Validate() {
Expand Down
26 changes: 23 additions & 3 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/composite"
ingctx "k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/healthchecksl4"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/metrics"
Expand Down Expand Up @@ -134,7 +135,7 @@ func deleteNetLBService(lc *L4NetLBController, svc *v1.Service) {
lc.ctx.ServiceInformer.GetIndexer().Delete(svc)
}

func checkForwardingRule(lc *L4NetLBController, svc *v1.Service, expectedPortRange string) error {
func checkForwardingRule(lc *L4NetLBController, svc *v1.Service, expectedPortRange string, expectedPorts []string) error {
if len(svc.Spec.Ports) == 0 {
return fmt.Errorf("There are no ports in service!")
}
Expand All @@ -146,6 +147,9 @@ func checkForwardingRule(lc *L4NetLBController, svc *v1.Service, expectedPortRan
if fwdRule.PortRange != expectedPortRange {
return fmt.Errorf("Port Range Mismatch %v != %v", expectedPortRange, fwdRule.PortRange)
}
if !utils.EqualStringSets(fwdRule.Ports, expectedPorts) {
return fmt.Errorf("Port List Mismatch %v != %v", expectedPorts, fwdRule.Ports)
}
return nil
}

Expand Down Expand Up @@ -386,7 +390,7 @@ func TestProcessMultipleNetLBServices(t *testing.T) {
t.Errorf("%v", err)
}
expectedPortRange := fmt.Sprintf("%d-%d", svc.Spec.Ports[0].Port, svc.Spec.Ports[0].Port)
if err := checkForwardingRule(lc, svc, expectedPortRange); err != nil {
if err := checkForwardingRule(lc, svc, expectedPortRange, nil); err != nil {
t.Errorf("Check forwarding rule error: %v", err)
}
deleteNetLBService(lc, svc)
Expand All @@ -401,6 +405,8 @@ func TestForwardingRuleWithPortRange(t *testing.T) {
for _, tc := range []struct {
svcName string
ports []int32
discretePorts bool
expectedPorts []string
expectedPortRange string
}{
{
Expand All @@ -423,7 +429,21 @@ func TestForwardingRuleWithPortRange(t *testing.T) {
ports: []int32{8081, 80, 8080, 123},
expectedPortRange: "80-8081",
},
{
svcName: "DiscretePortsLessThanMax",
ports: []int32{8081, 80, 8080, 123},
discretePorts: true,
expectedPorts: []string{"80", "123", "8080", "8081"},
expectedPortRange: "",
},
{
svcName: "DiscretePortsMoreThanMax",
ports: []int32{8081, 80, 8080, 123, 666, 555},
discretePorts: true,
expectedPortRange: "80-8081",
},
} {
flags.F.EnableDiscretePortForwarding = tc.discretePorts
svc := test.NewL4NetLBRBSServiceMultiplePorts(tc.svcName, tc.ports)
svc.UID = types.UID(svc.Name + fmt.Sprintf("-%d", rand.Intn(1001)))
addNetLBService(lc, svc)
Expand All @@ -443,7 +463,7 @@ func TestForwardingRuleWithPortRange(t *testing.T) {
if err := checkBackendService(lc, svc); err != nil {
t.Errorf("Check backend service err: %v", err)
}
if err := checkForwardingRule(lc, newSvc, tc.expectedPortRange); err != nil {
if err := checkForwardingRule(lc, newSvc, tc.expectedPortRange, tc.expectedPorts); err != nil {
t.Errorf("Check forwarding rule error: %v", err)
}
deleteNetLBService(lc, svc)
Expand Down
43 changes: 34 additions & 9 deletions pkg/loadbalancers/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package loadbalancers

import (
"fmt"
"k8s.io/klog/v2"
"net/http"
"strings"
"time"

"k8s.io/klog/v2"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/google/go-cmp/cmp"
Expand All @@ -34,14 +35,15 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
)

const (
// maxL4ILBPorts is the maximum number of ports that can be specified in an L4 ILB Forwarding Rule
maxL4ILBPorts = 5
// maxForwardedPorts is the maximum number of ports that can be specified in an Forwarding Rule
maxForwardedPorts = 5
// addressAlreadyInUseMessageExternal is the error message string returned by the compute API
// when creating an external forwarding rule that uses a conflicting IP address.
addressAlreadyInUseMessageExternal = "Specified IP address is in-use and would result in a conflict."
Expand Down Expand Up @@ -247,7 +249,7 @@ func (l4 *L4) ensureIPv4ForwardingRule(bsLink string, options gce.ILBOptions, ex
AllowGlobalAccess: options.AllowGlobalAccess,
Description: frDesc,
}
if len(ports) > maxL4ILBPorts {
if len(ports) > maxForwardedPorts {
fr.Ports = nil
fr.AllPorts = true
}
Expand Down Expand Up @@ -349,8 +351,10 @@ func (l4netlb *L4NetLB) ensureIPv4ForwardingRule(bsLink string) (*composite.Forw
}()
}

portRange, protocol := utils.MinMaxPortRangeAndProtocol(l4netlb.Service.Spec.Ports)

svcPorts := l4netlb.Service.Spec.Ports
ports := utils.GetPorts(svcPorts)
portRange := utils.MinMaxPortRange(svcPorts)
protocol := utils.GetProtocol(svcPorts)
serviceKey := utils.ServiceKeyFunc(l4netlb.Service.Namespace, l4netlb.Service.Name)
frDesc, err := utils.MakeL4LBServiceDescription(serviceKey, ipToUse, version, false, utils.XLB)
if err != nil {
Expand All @@ -361,12 +365,16 @@ func (l4netlb *L4NetLB) ensureIPv4ForwardingRule(bsLink string) (*composite.Forw
Name: frName,
Description: frDesc,
IPAddress: ipToUse,
IPProtocol: protocol,
IPProtocol: string(protocol),
PortRange: portRange,
LoadBalancingScheme: string(cloud.SchemeExternal),
BackendService: bsLink,
NetworkTier: netTier.ToGCEValue(),
}
if len(ports) <= maxForwardedPorts && flags.F.EnableDiscretePortForwarding {
fr.Ports = ports
fr.PortRange = ""
}

if existingFwdRule != nil {
if existingFwdRule.NetworkTier != fr.NetworkTier {
Expand Down Expand Up @@ -433,8 +441,7 @@ func Equal(fr1, fr2 *composite.ForwardingRule) (bool, error) {
return fr1.IPAddress == fr2.IPAddress &&
fr1.IPProtocol == fr2.IPProtocol &&
fr1.LoadBalancingScheme == fr2.LoadBalancingScheme &&
utils.EqualStringSets(fr1.Ports, fr2.Ports) &&
fr1.PortRange == fr2.PortRange &&
equalPorts(fr1.Ports, fr2.Ports, fr1.PortRange, fr2.PortRange) &&
utils.EqualCloudResourceIDs(id1, id2) &&
fr1.AllowGlobalAccess == fr2.AllowGlobalAccess &&
fr1.AllPorts == fr2.AllPorts &&
Expand All @@ -443,6 +450,24 @@ func Equal(fr1, fr2 *composite.ForwardingRule) (bool, error) {
fr1.NetworkTier == fr2.NetworkTier, nil
}

// equalPorts compares two port ranges or slices of ports. Before comparison,
// slices of ports are converted into a port range from smallest to largest
// port. This is done so we don't unnecessarily recreate forwarding rules
// when upgrading from port ranges to distinct ports, because recreating
// forwarding rules is traffic impacting.
func equalPorts(ports1, ports2 []string, portRange1, portRange2 string) bool {
if !flags.F.EnableDiscretePortForwarding {
return utils.EqualStringSets(ports1, ports2) && portRange1 == portRange2
}
if len(ports1) != 0 && portRange1 == "" {
portRange1 = utils.MinMaxPortRange(ports1)
}
if len(ports2) != 0 && portRange2 == "" {
portRange2 = utils.MinMaxPortRange(ports2)
}
return portRange1 == portRange2
}

func equalResourcePaths(rp1, rp2 string) bool {
return rp1 == rp2 || utils.EqualResourceIDs(rp1, rp2)
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/loadbalancers/forwarding_rules_ipv6.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -113,7 +114,7 @@ func (l4 *L4) buildExpectedIPv6ForwardingRule(bsLink string, options gce.ILBOpti
AllowGlobalAccess: options.AllowGlobalAccess,
NetworkTier: cloud.NetworkTierPremium.ToGCEValue(),
}
if len(ports) > maxL4ILBPorts {
if len(ports) > maxForwardedPorts {
fr.Ports = nil
fr.AllPorts = true
}
Expand Down Expand Up @@ -255,19 +256,25 @@ func (l4netlb *L4NetLB) buildExpectedIPv6ForwardingRule(bsLink, ipv6AddressToUse
}

svcPorts := l4netlb.Service.Spec.Ports
portRange, protocol := utils.MinMaxPortRangeAndProtocol(svcPorts)
ports := utils.GetPorts(svcPorts)
portRange := utils.MinMaxPortRange(svcPorts)
protocol := utils.GetProtocol(svcPorts)
fr := &composite.ForwardingRule{
Name: frName,
Description: frDesc,
IPAddress: ipv6AddressToUse,
IPProtocol: protocol,
IPProtocol: string(protocol),
PortRange: portRange,
LoadBalancingScheme: string(cloud.SchemeExternal),
BackendService: bsLink,
IpVersion: IPVersionIPv6,
NetworkTier: netTier.ToGCEValue(),
Subnetwork: subnetworkURL,
}
if len(ports) <= maxForwardedPorts && flags.F.EnableDiscretePortForwarding {
fr.Ports = utils.GetPorts(svcPorts)
fr.PortRange = ""
}

return fr, nil
}
Expand Down Expand Up @@ -296,7 +303,7 @@ func EqualIPv6ForwardingRules(fr1, fr2 *composite.ForwardingRule) (bool, error)
}
return fr1.IPProtocol == fr2.IPProtocol &&
fr1.LoadBalancingScheme == fr2.LoadBalancingScheme &&
utils.EqualStringSets(fr1.Ports, fr2.Ports) &&
equalPorts(fr1.Ports, fr2.Ports, fr1.PortRange, fr2.PortRange) &&
utils.EqualCloudResourceIDs(id1, id2) &&
fr1.AllowGlobalAccess == fr2.AllowGlobalAccess &&
fr1.AllPorts == fr2.AllPorts &&
Expand Down
Loading

0 comments on commit d5825ac

Please sign in to comment.