From 837b0c673e7e7d4799f100291ca520d22944f22a Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 13 Oct 2022 12:40:51 +0100 Subject: [PATCH] feat: fixing informer issues (#191) This completely reimplements the informer system. It now avoids the issues using the cache informer watcher which occasionally ommits errors - to my best reckoning it is due to partial object updates not deserialising properly. Signed-off-by: Alex Jones --- go.sum | 1 + .../featureflagconfiguration/clientset.go | 149 --------------- .../featureflagconfiguration.go | 77 -------- pkg/sync/kubernetes/ffclientset.go | 113 ++++++++++++ pkg/sync/kubernetes/kubernetes_sync.go | 172 ++++++++++++++---- 5 files changed, 251 insertions(+), 261 deletions(-) delete mode 100644 pkg/sync/kubernetes/featureflagconfiguration/clientset.go delete mode 100644 pkg/sync/kubernetes/featureflagconfiguration/featureflagconfiguration.go create mode 100644 pkg/sync/kubernetes/ffclientset.go diff --git a/go.sum b/go.sum index 0725287e7..07f0d0ca1 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,7 @@ github.com/bufbuild/connect-go v0.5.0 h1:JFbWPWpasBqzM5h/awoRhAXmLERZQlQ5xTn42uf github.com/bufbuild/connect-go v0.5.0/go.mod h1:ZEtBnQ7J/m7bvWOW+H8T/+hKQCzPVfhhhICuvtcnjlI= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= diff --git a/pkg/sync/kubernetes/featureflagconfiguration/clientset.go b/pkg/sync/kubernetes/featureflagconfiguration/clientset.go deleted file mode 100644 index 19636d5b7..000000000 --- a/pkg/sync/kubernetes/featureflagconfiguration/clientset.go +++ /dev/null @@ -1,149 +0,0 @@ -package featureflagconfiguration - -import ( - "context" - "errors" - "reflect" - "time" - - "github.com/open-feature/flagd/pkg/sync" - ffv1alpha1 "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" - log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -type FFCInterface interface { - FeatureFlagConfigurations(namespace string) Interface -} - -type FFCClient struct { - restClient rest.Interface -} - -func createFuncHandler(obj interface{}, object client.ObjectKey, c chan<- sync.INotify) error { - if reflect.TypeOf(obj) != reflect.TypeOf(&ffv1alpha1.FeatureFlagConfiguration{}) { - return errors.New("object is not a FeatureFlagConfiguration") - } - if obj.(*ffv1alpha1.FeatureFlagConfiguration).Name == object.Name { - c <- &sync.Notifier{ - Event: sync.Event[sync.DefaultEventType]{ - EventType: sync.DefaultEventTypeCreate, - }, - } - } - return nil -} - -func updateFuncHandler(oldObj interface{}, newObj interface{}, object client.ObjectKey, c chan<- sync.INotify) error { - if reflect.TypeOf(oldObj) != reflect.TypeOf(&ffv1alpha1.FeatureFlagConfiguration{}) { - return errors.New("old object is not a FeatureFlagConfiguration") - } - if reflect.TypeOf(newObj) != reflect.TypeOf(&ffv1alpha1.FeatureFlagConfiguration{}) { - return errors.New("new object is not a FeatureFlagConfiguration") - } - oldObjConfig := oldObj.(*ffv1alpha1.FeatureFlagConfiguration) - newObjConfig := newObj.(*ffv1alpha1.FeatureFlagConfiguration) - if object.Name == newObjConfig.Name && oldObjConfig.ResourceVersion != newObjConfig.ResourceVersion { - // Only update if there is an actual featureFlagSpec change - c <- &sync.Notifier{ - Event: sync.Event[sync.DefaultEventType]{ - EventType: sync.DefaultEventTypeModify, - }, - } - } - return nil -} - -func deleteFuncHandler(obj interface{}, object client.ObjectKey, c chan<- sync.INotify) error { - if reflect.TypeOf(obj) != reflect.TypeOf(&ffv1alpha1.FeatureFlagConfiguration{}) { - return errors.New("object is not a FeatureFlagConfiguration") - } - if obj.(*ffv1alpha1.FeatureFlagConfiguration).Name == object.Name { - c <- &sync.Notifier{ - Event: sync.Event[sync.DefaultEventType]{ - EventType: sync.DefaultEventTypeDelete, - }, - } - } - return nil -} - -// WatchResources watches FeatureFlagConfigurations resources under the given namespace with the given name -// -// - resyncPeriod if non-zero, will re-list the resources this often (you will get OnUpdate -// calls, even if nothing changed). Otherwise, re-list will be delayed as long as possible -// (until the upstream source closes the watch or times out, or you stop the controller). -func WatchResources(ctx context.Context, l log.Entry, clientSet FFCInterface, resyncPeriod time.Duration, - object client.ObjectKey, c chan<- sync.INotify, -) { - ns := "*" - if object.Namespace != "" { - ns = object.Namespace - } - _, ffConfigController := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { - return clientSet.FeatureFlagConfigurations(ns).List(lo) - }, - WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { - return clientSet.FeatureFlagConfigurations(ns).Watch(lo) - }, - }, - &ffv1alpha1.FeatureFlagConfiguration{}, - resyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if err := createFuncHandler(obj, object, c); err != nil { - l.Warn(err.Error()) - } - }, - DeleteFunc: func(obj interface{}) { - if err := deleteFuncHandler(obj, object, c); err != nil { - l.Warn(err.Error()) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - // This indicates a change to the custom resource - if err := updateFuncHandler(oldObj, newObj, object, c); err != nil { - l.Warn(err.Error()) - } - }, - }, - ) - go ffConfigController.Run(ctx.Done()) -} - -func NewForConfig(config *rest.Config) (*FFCClient, error) { - if config == nil { - return nil, errors.New("rest config is nil") - } - config.ContentConfig.GroupVersion = &schema. - GroupVersion{ - Group: ffv1alpha1.GroupVersion.Group, - Version: ffv1alpha1.GroupVersion.Version, - } - config.APIPath = "/apis" - config.UserAgent = rest.DefaultKubernetesUserAgent() - config.NegotiatedSerializer = serializer.NewCodecFactory(scheme.Scheme) - client, err := rest.RESTClientFor(config) - if err != nil { - return nil, err - } - - return &FFCClient{restClient: client}, nil -} - -func (c *FFCClient) FeatureFlagConfigurations(namespace string) Interface { - return &FeatureFlagClient{ - restClient: c.restClient, - ns: namespace, - } -} diff --git a/pkg/sync/kubernetes/featureflagconfiguration/featureflagconfiguration.go b/pkg/sync/kubernetes/featureflagconfiguration/featureflagconfiguration.go deleted file mode 100644 index 21ac0d3a9..000000000 --- a/pkg/sync/kubernetes/featureflagconfiguration/featureflagconfiguration.go +++ /dev/null @@ -1,77 +0,0 @@ -package featureflagconfiguration - -import ( - "context" - - "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" -) - -const ( - featureFlagConfigurationName = "featureflagconfigurations" -) - -type Interface interface { - List(opts metav1.ListOptions) (*v1alpha1.FeatureFlagConfigurationList, error) - Get(name string, options metav1.GetOptions) (*v1alpha1.FeatureFlagConfiguration, error) - Create(*v1alpha1.FeatureFlagConfiguration) (*v1alpha1.FeatureFlagConfiguration, error) - Watch(opts metav1.ListOptions) (watch.Interface, error) -} - -type FeatureFlagClient struct { - restClient rest.Interface - ns string -} - -func (c *FeatureFlagClient) List(opts metav1.ListOptions) (*v1alpha1.FeatureFlagConfigurationList, error) { - result := v1alpha1.FeatureFlagConfigurationList{} - err := c.restClient. - Get(). - Resource(featureFlagConfigurationName). - Do(context.Background()). - Into(&result) - - return &result, err -} - -func (c *FeatureFlagClient) Get(name string, opts metav1.GetOptions) (*v1alpha1.FeatureFlagConfiguration, error) { - result := v1alpha1.FeatureFlagConfiguration{} - err := c.restClient. - Get(). - Namespace(c.ns). - Resource(featureFlagConfigurationName). - Name(name). - VersionedParams(&opts, scheme.ParameterCodec). - Do(context.Background()). - Into(&result) - - return &result, err -} - -func (c *FeatureFlagClient) Create(project *v1alpha1.FeatureFlagConfiguration) (*v1alpha1. - FeatureFlagConfiguration, error, -) { - result := v1alpha1.FeatureFlagConfiguration{} - err := c.restClient. - Post(). - Namespace(c.ns). - Resource(featureFlagConfigurationName). - Body(project). - Do(context.Background()). - Into(&result) - - return &result, err -} - -func (c *FeatureFlagClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { - opts.Watch = true - return c.restClient. - Get(). - Namespace(c.ns). - Resource(featureFlagConfigurationName). - VersionedParams(&opts, scheme.ParameterCodec). - Watch(context.Background()) -} diff --git a/pkg/sync/kubernetes/ffclientset.go b/pkg/sync/kubernetes/ffclientset.go new file mode 100644 index 000000000..53527bc0c --- /dev/null +++ b/pkg/sync/kubernetes/ffclientset.go @@ -0,0 +1,113 @@ +package kubernetes + +import ( + "context" + "errors" + + "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +type ClientInterface interface { + List(opts metav1.ListOptions) (*v1alpha1.FeatureFlagConfigurationList, error) + Get(name string, options metav1.GetOptions) (*v1alpha1.FeatureFlagConfiguration, error) + Create(*v1alpha1.FeatureFlagConfiguration) (*v1alpha1.FeatureFlagConfiguration, error) + Watch(opts metav1.ListOptions) (watch.Interface, error) +} + +type FeatureFlagConfigurationInterface interface { + FeatureFlagConfigurations(namespace string) ClientInterface +} + +type FeatureFlagConfigurationImpl struct { + restClient rest.Interface + ns string +} + +type FeatureFlagConfigurationRestClient struct { + restClient rest.Interface +} + +func (c *FeatureFlagConfigurationImpl) List(opts metav1.ListOptions) (*v1alpha1.FeatureFlagConfigurationList, error) { + result := v1alpha1.FeatureFlagConfigurationList{} + err := c.restClient. + Get(). + Resource(featureFlagConfigurationName). + Do(context.Background()). + Into(&result) + + return &result, err +} + +func (c *FeatureFlagConfigurationImpl) Get(name string, + opts metav1.GetOptions, +) (*v1alpha1.FeatureFlagConfiguration, error) { + result := v1alpha1.FeatureFlagConfiguration{} + err := c.restClient. + Get(). + Namespace(c.ns). + Resource(featureFlagConfigurationName). + Name(name). + VersionedParams(&opts, scheme.ParameterCodec). + Do(context.Background()). + Into(&result) + + return &result, err +} + +func (c *FeatureFlagConfigurationImpl) Create(project *v1alpha1.FeatureFlagConfiguration) (*v1alpha1. + FeatureFlagConfiguration, error, +) { + result := v1alpha1.FeatureFlagConfiguration{} + err := c.restClient. + Post(). + Namespace(c.ns). + Resource(featureFlagConfigurationName). + Body(project). + Do(context.Background()). + Into(&result) + + return &result, err +} + +func (c *FeatureFlagConfigurationImpl) Watch(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.restClient. + Get(). + Namespace(c.ns). + Resource(featureFlagConfigurationName). + VersionedParams(&opts, scheme.ParameterCodec). + Watch(context.Background()) +} + +func NewForConfig(config *rest.Config) (*FeatureFlagConfigurationRestClient, error) { + if config == nil { + return nil, errors.New("rest config is nil") + } + config.ContentConfig.GroupVersion = &schema. + GroupVersion{ + Group: v1alpha1.GroupVersion.Group, + Version: v1alpha1.GroupVersion.Version, + } + config.APIPath = "/apis" + config.UserAgent = rest.DefaultKubernetesUserAgent() + config.NegotiatedSerializer = serializer.NewCodecFactory(scheme.Scheme) + client, err := rest.RESTClientFor(config) + if err != nil { + return nil, err + } + + return &FeatureFlagConfigurationRestClient{restClient: client}, nil +} + +func (c *FeatureFlagConfigurationRestClient) FeatureFlagConfigurations(namespace string) ClientInterface { + return &FeatureFlagConfigurationImpl{ + restClient: c.restClient, + ns: namespace, + } +} diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index ade18b9ce..6ecdb0741 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -2,17 +2,23 @@ package kubernetes import ( "context" + "errors" "os" "time" "github.com/open-feature/flagd/pkg/sync" - "github.com/open-feature/flagd/pkg/sync/kubernetes/featureflagconfiguration" - ffv1alpha1 "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" + "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - controllerClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -20,12 +26,12 @@ const ( featureFlagNamespaceName = "namespace" ) -var resyncPeriod time.Duration // default of 0 +var resyncPeriod = 1 * time.Minute type Sync struct { Logger *log.Entry ProviderArgs sync.ProviderArgs - client *featureflagconfiguration.FFCClient + client client.Client } func (k *Sync) Source() string { @@ -48,18 +54,29 @@ func (k *Sync) Fetch(ctx context.Context) (string, error) { return "{}", nil } - config, err := k.client.FeatureFlagConfigurations(k.ProviderArgs[featureFlagNamespaceName]). - Get(k.ProviderArgs[featureFlagConfigurationName], metav1.GetOptions{ - TypeMeta: metav1.TypeMeta{ - Kind: "FeatureFlagConfiguration", - APIVersion: "featureflag.open-feature.io/v1alpha1", - }, - }) + var ff v1alpha1.FeatureFlagConfiguration + err := k.client.Get(ctx, client.ObjectKey{ + Name: k.ProviderArgs[featureFlagConfigurationName], + Namespace: k.ProviderArgs[featureFlagNamespaceName], + }, &ff) + + return ff.Spec.FeatureFlagSpec, err +} + +func (k *Sync) buildConfiguration() (*rest.Config, error) { + kubeconfig := os.Getenv("KUBECONFIG") + var clusterConfig *rest.Config + var err error + if kubeconfig != "" { + clusterConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + clusterConfig, err = rest.InClusterConfig() + } if err != nil { - return "{}", err + return nil, err } - return config.Spec.FeatureFlagSpec, nil + return clusterConfig, nil } func (k *Sync) Notify(ctx context.Context, c chan<- sync.INotify) { @@ -72,36 +89,121 @@ func (k *Sync) Notify(ctx context.Context, c chan<- sync.INotify) { return } k.Logger.Infof("Starting kubernetes sync notifier for resource %s", k.ProviderArgs["featureflagconfiguration"]) - kubeconfig := os.Getenv("KUBECONFIG") - // Create the client configuration - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + clusterConfig, err := k.buildConfiguration() if err != nil { + k.Logger.Errorf("Error building configuration: %s", err) + } + if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil { k.Logger.Panic(err.Error()) } + k.client, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + k.Logger.Fatalln(err) + } + clusterClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + log.Fatalln(err) + } + resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations") + // The created informer will not do resyncs if the given + // defaultEventHandlerResyncPeriod is zero. + // For more details on resync implications refer to tools/cache/shared_informer.go + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, + resyncPeriod, corev1.NamespaceAll, nil) + informer := factory.ForResource(resource).Informer() + + objectKey := client.ObjectKey{ + Name: k.ProviderArgs[featureFlagConfigurationName], + Namespace: k.ProviderArgs[featureFlagNamespaceName], + } + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if err := createFuncHandler(obj, objectKey, c); err != nil { + k.Logger.Warn(err.Error()) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + if err := updateFuncHandler(oldObj, newObj, objectKey, c); err != nil { + k.Logger.Warn(err.Error()) + } + }, + DeleteFunc: func(obj interface{}) { + if err := deleteFuncHandler(obj, objectKey, c); err != nil { + k.Logger.Warn(err.Error()) + } + }, + }) + + informer.Run(ctx.Done()) +} - if k.ProviderArgs["resyncperiod"] != "" { - hr, err := time.ParseDuration(k.ProviderArgs["resyncperiod"]) - if err != nil { - k.Logger.Panic(err.Error()) +func createFuncHandler(obj interface{}, object client.ObjectKey, c chan<- sync.INotify) error { + var ffObj v1alpha1.FeatureFlagConfiguration + u := obj.(*unstructured.Unstructured) + err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffObj) + if err != nil { + return err + } + if ffObj.APIVersion != v1alpha1.GroupVersion.Version { + return errors.New("invalid api version") + } + if ffObj.Name == object.Name { + c <- &sync.Notifier{ + Event: sync.Event[sync.DefaultEventType]{ + EventType: sync.DefaultEventTypeCreate, + }, } - resyncPeriod = hr } + return nil +} - k.client, err = featureflagconfiguration.NewForConfig(config) +func updateFuncHandler(oldObj interface{}, newObj interface{}, object client.ObjectKey, c chan<- sync.INotify) error { + var ffOldObj v1alpha1.FeatureFlagConfiguration + u := oldObj.(*unstructured.Unstructured) + err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffOldObj) if err != nil { - k.Logger.Panic(err.Error()) + return err } - - if err := ffv1alpha1.AddToScheme(scheme.Scheme); err != nil { - k.Logger.Panic(err.Error()) + if ffOldObj.APIVersion != v1alpha1.GroupVersion.Version { + return errors.New("invalid api version") + } + var ffNewObj v1alpha1.FeatureFlagConfiguration + u = newObj.(*unstructured.Unstructured) + err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffNewObj) + if err != nil { + return err + } + if ffNewObj.APIVersion != v1alpha1.GroupVersion.Version { + return errors.New("invalid api version") + } + if object.Name == ffNewObj.Name && ffOldObj.ResourceVersion != ffNewObj.ResourceVersion { + // Only update if there is an actual featureFlagSpec change + c <- &sync.Notifier{ + Event: sync.Event[sync.DefaultEventType]{ + EventType: sync.DefaultEventTypeModify, + }, + } } + return nil +} - go featureflagconfiguration.WatchResources(ctx, *k.Logger.WithFields(log.Fields{ - "sync": "kubernetes", - "component": "watchresources", - }), k.client, resyncPeriod, controllerClient.ObjectKey{ - Name: k.ProviderArgs[featureFlagConfigurationName], - Namespace: k.ProviderArgs[featureFlagNamespaceName], - }, c) +func deleteFuncHandler(obj interface{}, object client.ObjectKey, c chan<- sync.INotify) error { + var ffObj v1alpha1.FeatureFlagConfiguration + u := obj.(*unstructured.Unstructured) + err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &ffObj) + if err != nil { + return err + } + if ffObj.APIVersion != v1alpha1.GroupVersion.Version { + return errors.New("invalid api version") + } + if ffObj.Name == object.Name { + c <- &sync.Notifier{ + Event: sync.Event[sync.DefaultEventType]{ + EventType: sync.DefaultEventTypeDelete, + }, + } + } + return nil }