Skip to content

Commit

Permalink
chore: refactoring and improve coverage for K8s Sync (#466)
Browse files Browse the repository at this point in the history
Signed-off-by: Giovanni Liva <[email protected]>
  • Loading branch information
thisthat authored Mar 8, 2023
1 parent 5b85b2a commit 6dc441e
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 62 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
github.com/cucumber/messages-go/v16 v16.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
Expand Down
24 changes: 17 additions & 7 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
)
rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
case regCrd.Match(uriB):
k, err := r.newK8s(uri, logger)
if err != nil {
return err
}
r.SyncImpl = append(
r.SyncImpl,
r.newK8s(uri, logger),
k,
)
rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
case regURL.Match(uriB):
Expand Down Expand Up @@ -127,15 +131,21 @@ func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
}
}

func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync {
return &kubernetes.Sync{
Logger: logger.WithFields(
func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
reader, dynamic, err := kubernetes.GetClients()
if err != nil {
return nil, err
}
return kubernetes.NewK8sSync(
logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
URI: regCrd.ReplaceAllString(uri, ""),
ProviderArgs: r.config.ProviderArgs,
}
regCrd.ReplaceAllString(uri, ""),
r.config.ProviderArgs,
reader,
dynamic,
), nil
}

func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
Expand Down
112 changes: 66 additions & 46 deletions pkg/sync/kubernetes/kubernetes_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,56 @@ import (
)

var (
resyncPeriod = 1 * time.Minute
apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version)
resyncPeriod = 1 * time.Minute
apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version)
featureFlagConfigurationResource = v1alpha1.GroupVersion.WithResource("featureflagconfigurations")
)

type Sync struct {
Logger *logger.Logger
ProviderArgs sync.ProviderArgs
URI string

ready bool
namespace string
crdName string
readClient client.Reader
informer cache.SharedInformer
URI string

ready bool
namespace string
crdName string
logger *logger.Logger
providerArgs sync.ProviderArgs
readClient client.Reader
dynamicClient dynamic.Interface
informer cache.SharedInformer
}

func NewK8sSync(
logger *logger.Logger,
uri string,
providerArgs sync.ProviderArgs,
reader client.Reader,
dynamic dynamic.Interface,
) *Sync {
return &Sync{
logger: logger,
URI: uri,
providerArgs: providerArgs,
readClient: reader,
dynamicClient: dynamic,
}
}

func GetClients() (client.Reader, dynamic.Interface, error) {
clusterConfig, err := k8sClusterConfig()
if err != nil {
return nil, nil, err
}

readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, nil, err
}

dynamicClient, err := dynamic.NewForConfig(clusterConfig)
if err != nil {
return nil, nil, err
}
return readClient, dynamicClient, nil
}

func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
Expand All @@ -59,28 +95,12 @@ func (k *Sync) Init(ctx context.Context) error {
if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
return err
}
clusterConfig, err := k8sClusterConfig()
if err != nil {
return err
}

k.readClient, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return err
}

dynamicClient, err := dynamic.NewForConfig(clusterConfig)
if err != nil {
return err
}

resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations")

// The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero.
// For more details on resync implications refer to tools/cache/shared_informer.go
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, k.namespace, nil)
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.dynamicClient, resyncPeriod, k.namespace, nil)

k.informer = factory.ForResource(resource).Informer()
k.informer = factory.ForResource(featureFlagConfigurationResource).Informer()

return nil
}
Expand All @@ -90,12 +110,12 @@ func (k *Sync) IsReady() bool {
}

func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
k.Logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI))
k.logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI))

// Initial fetch
fetch, err := k.fetch(ctx)
if err != nil {
k.Logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error()))
k.logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error()))
return err
}

Expand Down Expand Up @@ -131,27 +151,27 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
case w := <-notifies:
switch w.GetEvent().EventType {
case DefaultEventTypeCreate:
k.Logger.Debug("new configuration created")
k.logger.Debug("new configuration created")
msg, err := k.fetch(ctx)
if err != nil {
k.Logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error()))
k.logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error()))
continue
}

dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
case DefaultEventTypeModify:
k.Logger.Debug("Configuration modified")
k.logger.Debug("Configuration modified")
msg, err := k.fetch(ctx)
if err != nil {
k.Logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error()))
k.logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error()))
continue
}

dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
case DefaultEventTypeDelete:
k.Logger.Debug("configuration deleted")
k.logger.Debug("configuration deleted")
case DefaultEventTypeReady:
k.Logger.Debug("notifier ready")
k.logger.Debug("notifier ready")
k.ready = true
}
}
Expand All @@ -172,7 +192,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
return "", err
}

k.Logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
k.logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
return configuration.Spec.FeatureFlagSpec, nil
}

Expand All @@ -186,7 +206,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
return "", err
}

k.Logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))
k.logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))
return ff.Spec.FeatureFlagSpec, nil
}

Expand All @@ -197,25 +217,25 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) {
}
if _, err := k.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
k.Logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name))
k.logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name))
if err := commonHandler(obj, objectKey, DefaultEventTypeCreate, c); err != nil {
k.Logger.Warn(err.Error())
k.logger.Warn(err.Error())
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
k.Logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name))
k.logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name))
if err := updateFuncHandler(oldObj, newObj, objectKey, c); err != nil {
k.Logger.Warn(err.Error())
k.logger.Warn(err.Error())
}
},
DeleteFunc: func(obj interface{}) {
k.Logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name))
k.logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name))
if err := commonHandler(obj, objectKey, DefaultEventTypeDelete, c); err != nil {
k.Logger.Warn(err.Error())
k.logger.Warn(err.Error())
}
},
}); err != nil {
k.Logger.Fatal(err.Error())
k.logger.Fatal(err.Error())
}

c <- &Notifier{
Expand Down
Loading

0 comments on commit 6dc441e

Please sign in to comment.