Skip to content

Commit

Permalink
upgrade client-go to 1.19.4
Browse files Browse the repository at this point in the history
  • Loading branch information
luthermonson committed Nov 16, 2020
1 parent 74beccc commit ba8c216
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 29 deletions.
19 changes: 10 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func usage() {
os.Exit(0)
}

func newSubnetManager() (subnet.Manager, error) {
func newSubnetManager(ctx context.Context) (subnet.Manager, error) {
if opts.kubeSubnetMgr {
return kube.NewSubnetManager(opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath)
return kube.NewSubnetManager(ctx, opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath)
}

cfg := &etcdv2.EtcdConfig{
Expand Down Expand Up @@ -238,7 +238,14 @@ func main() {
}
}

sm, err := newSubnetManager()
// This is the main context that everything should run in.
// All spawned goroutines should exit when cancel is called on this context.
// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
// to block until all the goroutines return. If those goroutines spawn other goroutines then they are responsible for
// blocking and returning only when cancel() is called.
ctx, cancel := context.WithCancel(context.Background())

sm, err := newSubnetManager(ctx)
if err != nil {
log.Error("Failed to create SubnetManager: ", err)
os.Exit(1)
Expand All @@ -250,12 +257,6 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

// This is the main context that everything should run in.
// All spawned goroutines should exit when cancel is called on this context.
// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
// to block until all the goroutines return. If those goroutines spawn other goroutines then they are responsible for
// blocking and returning only when cancel() is called.
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}

wg.Add(1)
Expand Down
33 changes: 13 additions & 20 deletions subnet/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

"github.com/coreos/flannel/pkg/ip"
"github.com/coreos/flannel/subnet"

"github.com/golang/glog"
"golang.org/x/net/context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,8 +36,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -62,8 +60,7 @@ type kubeSubnetManager struct {
events chan subnet.Event
}

func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Manager, error) {

func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Manager, error) {
var cfg *rest.Config
var err error
// Try to build kubernetes config from a master url or a kubeconfig filepath. If neither masterUrl
Expand All @@ -90,7 +87,7 @@ func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Ma
return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
}

pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
}
Expand All @@ -110,7 +107,7 @@ func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Ma
return nil, fmt.Errorf("error parsing subnet config: %s", err)
}

sm, err := newKubeSubnetManager(c, sc, nodeName, prefix)
sm, err := newKubeSubnetManager(ctx, c, sc, nodeName, prefix)
if err != nil {
return nil, fmt.Errorf("error creating network manager: %s", err)
}
Expand All @@ -128,7 +125,7 @@ func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Ma
return sm, nil
}

func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
func newKubeSubnetManager(ctx context.Context, c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
var err error
var ksm kubeSubnetManager
ksm.annotations, err = newAnnotations(prefix)
Expand All @@ -142,10 +139,10 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, pr
indexer, controller := cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ksm.client.CoreV1().Nodes().List(options)
return ksm.client.CoreV1().Nodes().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ksm.client.CoreV1().Nodes().Watch(options)
return ksm.client.CoreV1().Nodes().Watch(ctx, options)
},
},
&v1.Node{},
Expand Down Expand Up @@ -224,12 +221,8 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.Le
if err != nil {
return nil, err
}
nobj, err := api.Scheme.DeepCopy(cachedNode)
if err != nil {
return nil, err
}
n := nobj.(*v1.Node)

n := cachedNode.DeepCopy()
if n.Spec.PodCIDR == "" {
return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
}
Expand Down Expand Up @@ -275,12 +268,12 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.Le
return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
}

_, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
_, err = ksm.client.CoreV1().Nodes().Patch(ctx, ksm.nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
return nil, err
}
}
err = ksm.setNodeNetworkUnavailableFalse()
err = ksm.setNodeNetworkUnavailableFalse(ctx)
if err != nil {
glog.Errorf("Unable to set NetworkUnavailable to False for %q: %v", ksm.nodeName, err)
}
Expand All @@ -298,7 +291,7 @@ func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{
Events: []subnet.Event{event},
}, nil
case <-ctx.Done():
return subnet.LeaseWatchResult{}, nil
return subnet.LeaseWatchResult{}, context.Canceled
}
}

Expand Down Expand Up @@ -340,7 +333,7 @@ func (ksm *kubeSubnetManager) Name() string {

// Set Kubernetes NodeNetworkUnavailable to false when starting
// https://kubernetes.io/docs/concepts/architecture/nodes/#condition
func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse() error {
func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse(ctx context.Context) error {
condition := v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionFalse,
Expand All @@ -354,6 +347,6 @@ func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse() error {
return err
}
patch := []byte(fmt.Sprintf(`{"status":{"conditions":%s}}`, raw))
_, err = ksm.client.CoreV1().Nodes().PatchStatus(ksm.nodeName, patch)
_, err = ksm.client.CoreV1().Nodes().PatchStatus(ctx, ksm.nodeName, patch)
return err
}

0 comments on commit ba8c216

Please sign in to comment.