Wait until the discover controller cache is synced #738
Conversation
pkg/k8sapi/discovery.go
Outdated
@@ -185,6 +185,11 @@ func (d *Controller) K8SGetLocalPodIPs() ([]*K8SPodInfo, error) {
	return localPods, nil
}

func (d *Controller) WaitUntilReady() {
	stopCh := make(chan struct{})
	cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced)
This sounds like something we should do, but the Go docs say WaitForNamedCacheSync should be preferred. Could you change to use that function instead? Also, I'll try to test this some before it gets merged.
main.go
Outdated
@@ -53,6 +53,10 @@ func _main() int {
	discoverController := k8sapi.NewController(kubeClient)
	go discoverController.DiscoverK8SPods()
If you want to wait for pods to be discovered before moving on, then why not just move the DiscoverK8SPods() call into the NewController function and perform this initialization in the controller's construction instead of in a separate goroutine here? Wouldn't that be simpler?
I thought briefly about this before making the change. This is used by the DiscoveryController today, but it might be too strong an assumption that it will stay that way going forward. I see this as initialization logic: why start the RPC server, or any other process, when the required initialization is not done?
pkg/k8sapi/discovery.go
Outdated
@@ -185,6 +185,11 @@ func (d *Controller) K8SGetLocalPodIPs() ([]*K8SPodInfo, error) {
	return localPods, nil
}

func (d *Controller) WaitUntilReady() {
	stopCh := make(chan struct{})
	cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced)
What about the Controller.synced field? In Controller.run, the synced field is set to true:
go d.controller.informer.Run(stopCh)
log.Info("Waiting for controller cache sync")
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced) {
log.Error("Timed out waiting for caches to sync!")
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
log.Info("Synced successfully with APIServer")
d.synced = true
It seems to me that the simplest solution to this problem is to constrain all this setup/cache-prep logic to the construction of the Controller object itself, by creating the SharedInformer structs in the NewController function and then returning the ready-to-run Controller object. You could remove the Controller.synced field entirely if you did this, and the code would become a whole lot easier to read.
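As a rough sketch of that idea (assuming the informer is built in NewController; the Synced method name is hypothetical), readiness could then be read straight off the informer instead of being tracked in a separate flag:

// Sketch: once the informer is constructed in NewController, callers can ask
// the informer itself whether its cache has synced, so no synced bool needs
// to be maintained on the Controller struct.
func (d *Controller) Synced() bool {
	return d.controller.informer.HasSynced()
}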
Force-pushed from 39ab3c8 to 0df2e37
Force-pushed from 0df2e37 to 0108430
	log.Info("Waiting for controller cache sync")
	// Wait for all involved caches to be synced, before processing items from the queue is started
	if !cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced) {
On a future PR (not this one), we should use the WaitForNamedCacheSync method that @mogren alluded to in an earlier comment, which would basically remove the need for the log lines here.
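For reference, a minimal sketch of what that could look like, assuming a client-go version that ships cache.WaitForNamedCacheSync; the "eni-pod" name and the wrapper function are illustrative only. The named helper logs its own waiting/synced messages, which is what makes the explicit log lines above unnecessary:

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// waitForPodCache blocks until the informer reports that its cache has
// synced, delegating the progress logging to client-go's named helper.
func waitForPodCache(stopCh <-chan struct{}, hasSynced cache.InformerSynced) bool {
	if !cache.WaitForNamedCacheSync("eni-pod", stopCh, hasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for pod cache to sync"))
		return false
	}
	return true
}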
@@ -68,15 +68,31 @@ type Controller struct {
	controller *controller
	kubeClient kubernetes.Interface
	myNodeName string
	synced     bool
}

// NewController creates a new DiscoveryController
Please update the docstring comment here to indicate that the function constructs the caches (and connects to the k8s API server to do so) before returning a fully-constructed controller object.
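A possible wording, sketched here as a suggestion only (the function signature is assumed, not taken from the diff):

// NewController creates a new DiscoveryController. It builds the pod
// list/watch, work queue, and informer, connecting to the Kubernetes API
// server to do so, and returns a fully constructed controller that is
// ready to run.
func NewController(clientset kubernetes.Interface) *Controller {
	// ...
}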
		log.Errorf("Failed to create new Discover Controller: %v", err)
		return 1
	}

	go discoverController.DiscoverK8SPods()
This needs to go inside the NewController() function and be synchronously executed (see below).
	log.Info("Starting Pod informer")

	go d.controller.informer.Run(stopCh)
d.controller will be nil here, since d.controller is created in the DiscoverK8sPods() method:
amazon-vpc-cni-k8s/pkg/k8sapi/discovery.go, line 155 in 7c6dec7:
d.controller = newController(queue, indexer, informer)
What you need to do is basically get rid of the DiscoverK8SPods() method entirely and move the code that is in there into the NewController() function.
The only reason the DiscoverK8SPods() method even exists as a separate struct method, apparently, is to "start the controller" by doing:
select {}
but that's really kinda hacky IMHO. A cleaner solution would be to simply move all this setup code into NewController():
// create the pod watcher
podListWatcher := cache.NewListWatchFromClient(d.kubeClient.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", d.myNodeName))
// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
d.controller = newController(queue, indexer, informer)
and move this code:
// Now let's start the controller
stop := make(chan struct{})
defer close(stop)
go d.run(1, stop)
// Wait forever
select {}
to a separate Controller.Listen() method that you can call after returning successfully from NewController().
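A minimal sketch of what that separate method could look like (the Listen name follows the suggestion above; the run signature and stop-channel handling are assumptions based on the snippets shown):

// Listen starts the worker loop and blocks until stopCh is closed. It is
// intended to be called only after NewController() has returned a fully
// constructed Controller.
func (d *Controller) Listen(stopCh chan struct{}) {
	go d.run(1, stopCh)
	<-stopCh
}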
Just noticed the original bug report was for an RC version of 1.6, so answering my own question with a yes.
Replaced by changes in #972
#711
This change makes sure we don't have a race condition when the ipamD agent initially starts. In the current state, without these changes, there is a possibility that ipamD would start assigning IPs to pods before its internal state is synced.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
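For context, a minimal sketch of the startup ordering this change is after in _main() (names follow the diff above; the gRPC step is a placeholder):

discoverController := k8sapi.NewController(kubeClient)
go discoverController.DiscoverK8SPods()
// Block until the pod cache has synced with the API server, so ipamD never
// assigns IPs from a partially populated view of local pods.
discoverController.WaitUntilReady()
// ...only now start the gRPC server and the rest of ipamD...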