chore: refactoring and improve coverage for K8s Sync #466

Merged: 5 commits, Mar 8, 2023
1 change: 1 addition & 0 deletions go.mod
@@ -52,6 +52,7 @@ require (
 	github.com/cucumber/messages-go/v16 v16.0.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/emicklei/go-restful/v3 v3.10.1 // indirect
+	github.com/evanphx/json-patch v4.12.0+incompatible // indirect
 	github.com/evanphx/json-patch/v5 v5.6.0 // indirect
 	github.com/go-logr/logr v1.2.3 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
1 change: 1 addition & 0 deletions go.sum
@@ -105,6 +105,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
 github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
+github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
 github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
 github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
 github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
24 changes: 17 additions & 7 deletions pkg/runtime/from_config.go
@@ -77,9 +77,13 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
 			)
 			rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri))
 		case regCrd.Match(uriB):
+			k, err := r.newK8s(uri, logger)
+			if err != nil {
+				return err
+			}
 			r.SyncImpl = append(
 				r.SyncImpl,
-				r.newK8s(uri, logger),
+				k,
 			)
 			rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri))
 		case regURL.Match(uriB):
@@ -127,15 +131,21 @@ func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync {
 	}
 }
 
-func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync {
-	return &kubernetes.Sync{
-		Logger: logger.WithFields(
+func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
+	reader, dynamic, err := kubernetes.GetClients()
+	if err != nil {
+		return nil, err
+	}
+	return kubernetes.NewK8sSync(
+		logger.WithFields(
 			zap.String("component", "sync"),
 			zap.String("sync", "kubernetes"),
 		),
-		URI:          regCrd.ReplaceAllString(uri, ""),
-		ProviderArgs: r.config.ProviderArgs,
-	}
+		regCrd.ReplaceAllString(uri, ""),
+		r.config.ProviderArgs,
+		reader,
+		dynamic,
+	), nil
 }
 
 func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync {
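With this change, newK8s builds the cluster clients up front via kubernetes.GetClients() and fails fast during runtime configuration, instead of deferring client construction (and its errors) to Init. A minimal sketch of the resulting wiring from a caller's point of view; the import paths, the logger handling, and the placeholder URI are illustrative assumptions, not part of this diff:

package main

import (
	"fmt"
	"os"

	"github.com/open-feature/flagd/pkg/logger"
	"github.com/open-feature/flagd/pkg/sync/kubernetes"
)

func main() {
	// GetClients builds the controller-runtime reader and the dynamic client
	// from the cluster configuration; any failure surfaces here, before a
	// Sync is ever constructed.
	reader, dyn, err := kubernetes.GetClients()
	if err != nil {
		fmt.Fprintf(os.Stderr, "building kubernetes clients: %v\n", err)
		os.Exit(1)
	}

	// NewK8sSync only stores its dependencies, so it cannot fail.
	var log *logger.Logger // in flagd this comes from the runtime; construction elided
	k := kubernetes.NewK8sSync(log, "my-namespace/my-flags", nil, reader, dyn)
	_ = k // handed off to the runtime's sync loop in real code
}

Separating pure construction (NewK8sSync) from side effects (GetClients) is what makes the sync unit-testable: tests can pass fake clients instead of dialing a cluster.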
112 changes: 66 additions & 46 deletions pkg/sync/kubernetes/kubernetes_sync.go
@@ -23,20 +23,56 @@ import (
 )
 
 var (
-	resyncPeriod = 1 * time.Minute
-	apiVersion   = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version)
+	resyncPeriod                     = 1 * time.Minute
+	apiVersion                       = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version)
+	featureFlagConfigurationResource = v1alpha1.GroupVersion.WithResource("featureflagconfigurations")
 )
 
 type Sync struct {
-	Logger       *logger.Logger
-	ProviderArgs sync.ProviderArgs
-	URI          string
-
-	ready      bool
-	namespace  string
-	crdName    string
-	readClient client.Reader
-	informer   cache.SharedInformer
+	URI string
+
+	ready         bool
+	namespace     string
+	crdName       string
+	logger        *logger.Logger
+	providerArgs  sync.ProviderArgs
+	readClient    client.Reader
+	dynamicClient dynamic.Interface
+	informer      cache.SharedInformer
 }
 
+func NewK8sSync(
+	logger *logger.Logger,
+	uri string,
+	providerArgs sync.ProviderArgs,
+	reader client.Reader,
+	dynamic dynamic.Interface,
+) *Sync {
+	return &Sync{
+		logger:        logger,
+		URI:           uri,
+		providerArgs:  providerArgs,
+		readClient:    reader,
+		dynamicClient: dynamic,
+	}
+}
+
+func GetClients() (client.Reader, dynamic.Interface, error) {
+	clusterConfig, err := k8sClusterConfig()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	dynamicClient, err := dynamic.NewForConfig(clusterConfig)
+	if err != nil {
+		return nil, nil, err
+	}
+	return readClient, dynamicClient, nil
+}
+
 func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
@@ -59,28 +95,12 @@ func (k *Sync) Init(ctx context.Context) error {
 	if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil {
 		return err
 	}
-	clusterConfig, err := k8sClusterConfig()
-	if err != nil {
-		return err
-	}
-
-	k.readClient, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme})
-	if err != nil {
-		return err
-	}
-
-	dynamicClient, err := dynamic.NewForConfig(clusterConfig)
-	if err != nil {
-		return err
-	}
-
-	resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations")
 
 	// The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero.
 	// For more details on resync implications refer to tools/cache/shared_informer.go
-	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, k.namespace, nil)
+	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.dynamicClient, resyncPeriod, k.namespace, nil)
 
-	k.informer = factory.ForResource(resource).Informer()
+	k.informer = factory.ForResource(featureFlagConfigurationResource).Informer()
 
 	return nil
 }
@@ -90,12 +110,12 @@ func (k *Sync) IsReady() bool {
 }
 
 func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
-	k.Logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI))
+	k.logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI))
 
 	// Initial fetch
 	fetch, err := k.fetch(ctx)
 	if err != nil {
-		k.Logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error()))
+		k.logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error()))
 		return err
 	}
 
@@ -131,27 +151,27 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan
 		case w := <-notifies:
 			switch w.GetEvent().EventType {
 			case DefaultEventTypeCreate:
-				k.Logger.Debug("new configuration created")
+				k.logger.Debug("new configuration created")
 				msg, err := k.fetch(ctx)
 				if err != nil {
-					k.Logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error()))
+					k.logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error()))
 					continue
 				}
 
 				dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
 			case DefaultEventTypeModify:
-				k.Logger.Debug("Configuration modified")
+				k.logger.Debug("Configuration modified")
 				msg, err := k.fetch(ctx)
 				if err != nil {
-					k.Logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error()))
+					k.logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error()))
 					continue
 				}
 
 				dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL}
 			case DefaultEventTypeDelete:
-				k.Logger.Debug("configuration deleted")
+				k.logger.Debug("configuration deleted")
 			case DefaultEventTypeReady:
-				k.Logger.Debug("notifier ready")
+				k.logger.Debug("notifier ready")
 				k.ready = true
 			}
 		}
@@ -172,7 +192,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
 			return "", err
 		}
 
-		k.Logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
+		k.logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI))
 		return configuration.Spec.FeatureFlagSpec, nil
 	}
 
@@ -186,7 +206,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) {
 		return "", err
 	}
 
-	k.Logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))
+	k.logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI))
 	return ff.Spec.FeatureFlagSpec, nil
 }
 
@@ -197,25 +217,25 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) {
 	}
 	if _, err := k.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			k.Logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name))
+			k.logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name))
 			if err := commonHandler(obj, objectKey, DefaultEventTypeCreate, c); err != nil {
-				k.Logger.Warn(err.Error())
+				k.logger.Warn(err.Error())
 			}
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			k.Logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name))
+			k.logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name))
 			if err := updateFuncHandler(oldObj, newObj, objectKey, c); err != nil {
-				k.Logger.Warn(err.Error())
+				k.logger.Warn(err.Error())
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
-			k.Logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name))
+			k.logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name))
 			if err := commonHandler(obj, objectKey, DefaultEventTypeDelete, c); err != nil {
-				k.Logger.Warn(err.Error())
+				k.logger.Warn(err.Error())
 			}
 		},
 	}); err != nil {
-		k.Logger.Fatal(err.Error())
+		k.logger.Fatal(err.Error())
 	}
 
 	c <- &Notifier{
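Injecting client.Reader and dynamic.Interface through NewK8sSync is what enables the improved coverage: tests can construct a Sync against fake clients instead of a live cluster. Below is a minimal sketch of such a test, assuming the fake packages from client-go and controller-runtime and that sync.ProviderArgs accepts nil; the test name and assertions are illustrative and not taken from this PR:

package kubernetes

import (
	"testing"

	"k8s.io/apimachinery/pkg/runtime"
	dynamicfake "k8s.io/client-go/dynamic/fake"
	ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// Illustrative test: NewK8sSync should store the injected clients so that
// later calls (Init, fetch) talk to the fakes rather than a real API server.
func TestNewK8sSyncStoresInjectedClients(t *testing.T) {
	s := runtime.NewScheme()
	reader := ctrlfake.NewClientBuilder().WithScheme(s).Build()
	dyn := dynamicfake.NewSimpleDynamicClient(s)

	k := NewK8sSync(nil, "test-namespace/test-flags", nil, reader, dyn)

	if k.readClient == nil || k.dynamicClient == nil {
		t.Fatal("expected injected clients to be stored on the Sync")
	}
	if k.URI != "test-namespace/test-flags" {
		t.Fatalf("unexpected URI: %s", k.URI)
	}
}

Because the constructor takes interfaces rather than building concrete clients itself, the same pattern extends to exercising Init and the informer-backed fetch path with objects pre-loaded into the fake dynamic client.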