Skip to content

Commit

Permalink
make lease label and lease namesapce configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Imran Pochi <[email protected]>
  • Loading branch information
ipochi committed Nov 7, 2024
1 parent e4d5a2d commit ea0a0d3
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 6 deletions.
18 changes: 18 additions & 0 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type GrpcProxyAgentOptions struct {
// Enables updating the server count by counting the number of valid leases
// matching the selector.
CountServerLeases bool
// Namespace where lease objects are managed.
LeaseNamespace string
// Labels on which lease objects are managed.
LeaseLabel string
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
// Content type of requests sent to apiserver.
Expand Down Expand Up @@ -132,6 +136,8 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
return flags
Expand Down Expand Up @@ -159,6 +165,9 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
}
Expand Down Expand Up @@ -216,6 +225,13 @@ func (o *GrpcProxyAgentOptions) Validate() error {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}
// Validate labels provided.
if o.CountServerLeases {
_, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -263,6 +279,8 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
SyncForever: false,
XfrChannelSize: 150,
CountServerLeases: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
KubeconfigPath: "",
APIContentType: runtime.ContentTypeProtobuf,
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/agent/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (

const (
ReadHeaderTimeout = 60 * time.Second
LeaseNamespace = "kube-system"
LeaseInformerResync = time.Second * 10
)

Expand Down Expand Up @@ -163,11 +162,11 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err)
}
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, LeaseNamespace, LeaseInformerResync)
leaseInformer := agent.NewLeaseInformerWithMetrics(k8sClient, o.LeaseNamespace, LeaseInformerResync)
go leaseInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, leaseInformer.HasSynced)
leaseLister := coordinationv1lister.NewLeaseLister(leaseInformer.GetIndexer())
serverLeaseSelector, _ := labels.Parse("k8s-app=konnectivity-server")
serverLeaseSelector, _ := labels.Parse(o.LeaseLabel)
serverLeaseCounter := agent.NewServerLeaseCounter(
clock.RealClock{},
leaseLister,
Expand Down
18 changes: 18 additions & 0 deletions cmd/server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ type ProxyRunOptions struct {

// Lease controller configuration
EnableLeaseController bool
// Lease Namespace
LeaseNamespace string
// Lease Labels
LeaseLabel string
}

func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
Expand Down Expand Up @@ -146,6 +150,8 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", o.XfrChannelSize, "The size of the two KNP server channels used in server for transferring data. One channel is for data coming from the Kubernetes API Server, and the other one is for data coming from the KNP agent.")
flags.BoolVar(&o.EnableLeaseController, "enable-lease-controller", o.EnableLeaseController, "Enable lease controller to publish and garbage collect proxy server leases.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")

Expand Down Expand Up @@ -184,6 +190,9 @@ func (o *ProxyRunOptions) Print() {
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
klog.V(1).Infof("EnableLeaseController set to %v.\n", o.EnableLeaseController)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites)
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
}
Expand Down Expand Up @@ -321,6 +330,13 @@ func (o *ProxyRunOptions) Validate() error {
}
}
}
// Validate labels provided.
if o.EnableLeaseController {
_, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -361,6 +377,8 @@ func NewProxyRunOptions() *ProxyRunOptions {
CipherSuites: make([]string, 0),
XfrChannelSize: 10,
EnableLeaseController: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
}
return &o
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const (
LeaseDuration = 30 * time.Second
LeaseRenewalInterval = 15 * time.Second
LeaseGCInterval = 15 * time.Second
LeaseNamespace = "kube-system"
)

func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
Expand Down Expand Up @@ -156,6 +155,11 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
}
defer p.agentServer.Stop()

labels, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}

if o.EnableLeaseController {
leaseController := leases.NewController(
k8sClient,
Expand All @@ -164,8 +168,8 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
LeaseRenewalInterval,
LeaseGCInterval,
fmt.Sprintf("konnectivity-proxy-server-%v", o.ServerID),
LeaseNamespace,
map[string]string{"k8s-app": "konnectivity-server"},
o.LeaseNamespace,
labels,
)
klog.V(1).Infoln("Starting lease acquisition and garbage collection controller.")
leaseController.Run(ctx)
Expand Down
27 changes: 27 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"fmt"
"strings"
)

// ParseLabels takes a comma-separated string of key-value pairs and returns a map of labels.
func ParseLabels(labelStr string) (map[string]string, error) {
labels := make(map[string]string)

if len(labelStr) == 0 {
return labels, fmt.Errorf("empty string provided")
}
pairs := strings.Split(labelStr, ",")

for _, pair := range pairs {
keyValue := strings.Split(pair, "=")
if len(keyValue) != 2 {
return nil, fmt.Errorf("invalid label format: %s", pair)
}
key := strings.TrimSpace(keyValue[0])
value := strings.TrimSpace(keyValue[1])
labels[key] = value
}
return labels, nil
}
74 changes: 74 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package util

import (
"testing"
)

func TestParseLabels(t *testing.T) {
testCases := []struct {
input string
expectedOutput map[string]string
shouldError bool
}{
{
input: "app=myapp,env=prod,version=1.0",
expectedOutput: map[string]string{
"app": "myapp",
"env": "prod",
"version": "1.0",
},
shouldError: false,
},
{
input: "app=myapp,env=prod,invalid",
expectedOutput: nil,
shouldError: true,
},
{
input: "app=myapp",
expectedOutput: map[string]string{
"app": "myapp",
},
shouldError: false,
},
{
input: "",
expectedOutput: map[string]string{},
shouldError: true,
},
{
input: " key = value , another = test ",
expectedOutput: map[string]string{
"key": "value",
"another": "test",
},
shouldError: false,
},
}

for _, tc := range testCases {
output, err := ParseLabels(tc.input)

// Check for unexpected errors or missing errors
if tc.shouldError && err == nil {
t.Errorf("expected error for input %q but got none", tc.input)
continue
}
if !tc.shouldError && err != nil {
t.Errorf("did not expect error for input %q but got: %v", tc.input, err)
continue
}

// Compare maps if there was no error
if !tc.shouldError {
if len(output) != len(tc.expectedOutput) {
t.Errorf("for input %q, expected map length %d but got %d", tc.input, len(tc.expectedOutput), len(output))
}
for key, expectedValue := range tc.expectedOutput {
if output[key] != expectedValue {
t.Errorf("for input %q, expected %q=%q but got %q=%q", tc.input, key, expectedValue, key, output[key])
}
}
}
}
}

0 comments on commit ea0a0d3

Please sign in to comment.