From 0c7f78a95fa4ad2a8b2afe2f6023b9c6d4fd48ed Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 7 Dec 2023 06:59:17 +0100 Subject: [PATCH] chore: refactoring component structure (#1044) This PR refactors the core package by restructuring the components responsible for managing the subscriptions, as well as the creation of sync sources. The following changes have been made: - Renamed `sync-store` package to `subscriptions`. This has been done to avoid confusion because we already have a `sync`, and a `store`. package. Within the package, the `SyncStore`. has been renamed to `subscriptions.Manager`, which should reflect its responsibility in a better way. Also, the `syncHandler` has been renamed to `multiplexer`, as this one is responsible for sending updates of a certain target to all subscribers - `syncHandler` was a bit too generic in my opinion. - Moved the `GetSyncSourceFromURI` method to a new package, `sync/builder`, to remove the responsibility of building concrete sync sources from the subscription manager - Moved the logic of retrieving the K8s client config to the sync builder. Previously, this was done in the `runtime` package by calling the respective methods for the config retrieval provided by the `sync/kubernetes` package and then handing that config back to the initialization of the `K8sSync`. Note: This step can potentially be done in a separate PR, if so desired. --------- Signed-off-by: Florian Bacher Co-authored-by: Giovanni Liva --- Makefile | 1 + core/pkg/runtime/from_config.go | 205 +----------- core/pkg/runtime/from_config_test.go | 299 ------------------ core/pkg/service/sync/handler.go | 4 +- core/pkg/service/sync/server.go | 4 +- .../manager.go} | 180 ++++------- .../manager_test.go} | 66 ++-- core/pkg/subscriptions/multiplexer.go | 45 +++ core/pkg/sync-store/interface.go | 26 -- core/pkg/sync/builder/mock/syncbuilder.go | 107 +++++++ core/pkg/sync/builder/syncbuilder.go | 218 +++++++++++++ core/pkg/sync/builder/syncbuilder_test.go | 240 ++++++++++++++ core/pkg/sync/builder/utils.go | 68 ++++ core/pkg/sync/builder/utils_test.go | 210 ++++++++++++ core/pkg/sync/file/filepath_sync.go | 8 + core/pkg/sync/grpc/mock/grpc.go | 4 +- core/pkg/sync/isync.go | 17 +- core/pkg/sync/kubernetes/kubernetes_sync.go | 53 +--- .../sync/kubernetes/kubernetes_sync_test.go | 25 -- flagd-proxy/cmd/start.go | 4 +- flagd/cmd/start.go | 8 +- 21 files changed, 1035 insertions(+), 757 deletions(-) rename core/pkg/{sync-store/sync_store.go => subscriptions/manager.go} (54%) rename core/pkg/{sync-store/sync_store_test.go => subscriptions/manager_test.go} (85%) create mode 100644 core/pkg/subscriptions/multiplexer.go delete mode 100644 core/pkg/sync-store/interface.go create mode 100644 core/pkg/sync/builder/mock/syncbuilder.go create mode 100644 core/pkg/sync/builder/syncbuilder.go create mode 100644 core/pkg/sync/builder/syncbuilder_test.go create mode 100644 core/pkg/sync/builder/utils.go create mode 100644 core/pkg/sync/builder/utils_test.go diff --git a/Makefile b/Makefile index 0e78e7178..0e618f073 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,7 @@ mockgen: install-mockgen cd core; mockgen -source=pkg/sync/grpc/credentials/builder.go -destination=pkg/sync/grpc/credentials/mock/builder.go -package=credendialsmock cd core; mockgen -source=pkg/eval/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock cd core; mockgen -source=pkg/service/middleware/interface.go -destination=pkg/service/middleware/mock/interface.go -package=middlewaremock + cd core; mockgen -source=pkg/sync/builder/syncbuilder.go -destination=pkg/sync/builder/mock/syncbuilder.go -package=middlewaremocksyncbuildermock generate-docs: cd flagd; go run ./cmd/doc/main.go diff --git a/core/pkg/runtime/from_config.go b/core/pkg/runtime/from_config.go index 164c8ba71..d49ae4374 100644 --- a/core/pkg/runtime/from_config.go +++ b/core/pkg/runtime/from_config.go @@ -2,13 +2,7 @@ package runtime import ( "context" - "encoding/json" - "errors" "fmt" - "net/http" - "regexp" - msync "sync" - "time" "github.com/open-feature/flagd/core/pkg/eval" "github.com/open-feature/flagd/core/pkg/logger" @@ -16,46 +10,14 @@ import ( flageval "github.com/open-feature/flagd/core/pkg/service/flag-evaluation" "github.com/open-feature/flagd/core/pkg/store" "github.com/open-feature/flagd/core/pkg/sync" - "github.com/open-feature/flagd/core/pkg/sync/file" - "github.com/open-feature/flagd/core/pkg/sync/grpc" - "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" - httpSync "github.com/open-feature/flagd/core/pkg/sync/http" - "github.com/open-feature/flagd/core/pkg/sync/kubernetes" + syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder" "github.com/open-feature/flagd/core/pkg/telemetry" - "github.com/robfig/cron" "go.uber.org/zap" ) // from_config is a collection of structures and parsers responsible for deriving flagd runtime -const ( - syncProviderFile = "file" - syncProviderGrpc = "grpc" - syncProviderKubernetes = "kubernetes" - syncProviderHTTP = "http" - svcName = "flagd" -) - -var ( - regCrd *regexp.Regexp - regURL *regexp.Regexp - regGRPC *regexp.Regexp - regGRPCSecure *regexp.Regexp - regFile *regexp.Regexp -) - -// SourceConfig is configuration option for flagd. This maps to startup parameter sources -type SourceConfig struct { - URI string `json:"uri"` - Provider string `json:"provider"` - - BearerToken string `json:"bearerToken,omitempty"` - CertPath string `json:"certPath,omitempty"` - TLS bool `json:"tls,omitempty"` - ProviderID string `json:"providerID,omitempty"` - Selector string `json:"selector,omitempty"` - Interval uint32 `json:"interval,omitempty"` -} +const svcName = "flagd" // Config is the configuration structure derived from startup arguments. type Config struct { @@ -67,18 +29,10 @@ type Config struct { ServicePort uint16 ServiceSocketPath string - SyncProviders []SourceConfig + SyncProviders []sync.SourceConfig CORS []string } -func init() { - regCrd = regexp.MustCompile("^core.openfeature.dev/") - regURL = regexp.MustCompile("^https?://") - regGRPC = regexp.MustCompile("^" + grpc.Prefix) - regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure) - regFile = regexp.MustCompile("^file:") -} - // FromConfig builds a runtime from startup configurations // nolint: funlen func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime, error) { @@ -176,155 +130,12 @@ func setupJSONEvaluator(logger *logger.Logger, s *store.Flags) *eval.JSONEvaluat } // syncProvidersFromConfig is a helper to build ISync implementations from SourceConfig -func syncProvidersFromConfig(logger *logger.Logger, sources []SourceConfig) ([]sync.ISync, error) { - syncImpls := []sync.ISync{} - - for _, syncProvider := range sources { - switch syncProvider.Provider { - case syncProviderFile: - syncImpls = append(syncImpls, NewFile(syncProvider, logger)) - logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI)) - case syncProviderKubernetes: - k, err := NewK8s(syncProvider.URI, logger) - if err != nil { - return nil, err - } - syncImpls = append(syncImpls, k) - logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI)) - case syncProviderHTTP: - syncImpls = append(syncImpls, NewHTTP(syncProvider, logger)) - logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI)) - case syncProviderGrpc: - syncImpls = append(syncImpls, NewGRPC(syncProvider, logger)) - logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", syncProvider.URI)) - - default: - return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'", - syncProvider.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes) - } - } - return syncImpls, nil -} - -func NewGRPC(config SourceConfig, logger *logger.Logger) *grpc.Sync { - return &grpc.Sync{ - URI: config.URI, - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - CredentialBuilder: &credentials.CredentialBuilder{}, - CertPath: config.CertPath, - ProviderID: config.ProviderID, - Secure: config.TLS, - Selector: config.Selector, - } -} - -func NewHTTP(config SourceConfig, logger *logger.Logger) *httpSync.Sync { - // Default to 5 seconds - var interval uint32 = 5 - if config.Interval != 0 { - interval = config.Interval - } - - return &httpSync.Sync{ - URI: config.URI, - Client: &http.Client{ - Timeout: time.Second * 10, - }, - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "remote"), - ), - BearerToken: config.BearerToken, - Interval: interval, - Cron: cron.New(), - } -} - -func NewK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { - reader, dynamic, err := kubernetes.GetClients() +func syncProvidersFromConfig(logger *logger.Logger, sources []sync.SourceConfig) ([]sync.ISync, error) { + builder := syncbuilder.NewSyncBuilder() + syncs, err := builder.SyncsFromConfig(sources, logger) if err != nil { - return nil, fmt.Errorf("error creating kubernetes clients: %w", err) - } - return kubernetes.NewK8sSync( - logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - ), - regCrd.ReplaceAllString(uri, ""), - reader, - dynamic, - ), nil -} - -func NewFile(config SourceConfig, logger *logger.Logger) *file.Sync { - return &file.Sync{ - URI: config.URI, - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "filepath"), - ), - Mux: &msync.RWMutex{}, - } -} - -// ParseSources parse a json formatted SourceConfig array string and performs validations on the content -func ParseSources(sourcesFlag string) ([]SourceConfig, error) { - syncProvidersParsed := []SourceConfig{} - - if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil { - return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err) - } - for _, sp := range syncProvidersParsed { - if sp.URI == "" { - return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field") - } - if sp.Provider == "" { - return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field") - } + return nil, fmt.Errorf("could not create sync sources from config: %w", err) } - return syncProvidersParsed, nil -} - -// ParseSyncProviderURIs uri flag based sync sources to SourceConfig array. Replaces uri prefixes where necessary to -// derive SourceConfig -func ParseSyncProviderURIs(uris []string) ([]SourceConfig, error) { - syncProvidersParsed := []SourceConfig{} - for _, uri := range uris { - switch uriB := []byte(uri); { - case regFile.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, SourceConfig{ - URI: regFile.ReplaceAllString(uri, ""), - Provider: syncProviderFile, - }) - case regCrd.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, SourceConfig{ - URI: regCrd.ReplaceAllString(uri, ""), - Provider: syncProviderKubernetes, - }) - case regURL.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, SourceConfig{ - URI: uri, - Provider: syncProviderHTTP, - }) - case regGRPC.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, SourceConfig{ - URI: regGRPC.ReplaceAllString(uri, ""), - Provider: syncProviderGrpc, - }) - case regGRPCSecure.Match(uriB): - syncProvidersParsed = append(syncProvidersParsed, SourceConfig{ - URI: regGRPCSecure.ReplaceAllString(uri, ""), - Provider: syncProviderGrpc, - TLS: true, - }) - default: - return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+ - "'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri) - } - } - return syncProvidersParsed, nil + return syncs, nil } diff --git a/core/pkg/runtime/from_config_test.go b/core/pkg/runtime/from_config_test.go index 5e41eb18e..e2ad2da32 100644 --- a/core/pkg/runtime/from_config_test.go +++ b/core/pkg/runtime/from_config_test.go @@ -1,7 +1,6 @@ package runtime import ( - "reflect" "testing" "github.com/open-feature/flagd/core/pkg/logger" @@ -9,304 +8,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestParseSource(t *testing.T) { - test := map[string]struct { - in string - expectErr bool - out []SourceConfig - }{ - "simple": { - in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]", - expectErr: false, - out: []SourceConfig{ - { - URI: "config/samples/example_flags.json", - Provider: syncProviderFile, - }, - }, - }, - "multiple-syncs": { - in: `[ - {"uri":"config/samples/example_flags.json","provider":"file"}, - {"uri":"http://test.com","provider":"http","bearerToken":":)"}, - {"uri":"host:port","provider":"grpc"}, - {"uri":"default/my-crd","provider":"kubernetes"} - ]`, - expectErr: false, - out: []SourceConfig{ - { - URI: "config/samples/example_flags.json", - Provider: syncProviderFile, - }, - { - URI: "http://test.com", - Provider: syncProviderHTTP, - BearerToken: ":)", - }, - { - URI: "host:port", - Provider: syncProviderGrpc, - }, - { - URI: "default/my-crd", - Provider: syncProviderKubernetes, - }, - }, - }, - "multiple-syncs-with-options": { - in: `[{"uri":"config/samples/example_flags.json","provider":"file"}, - {"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"}, - {"uri":"https://secure-remote","provider":"http","bearerToken":"bearer-dji34ld2l"}, - {"uri":"http://site.com","provider":"http","interval":77 }, - {"uri":"default/my-flag-config","provider":"kubernetes"}, - {"uri":"grpc-source:8080","provider":"grpc"}, - {"uri":"my-flag-source:8080","provider":"grpc", "tls":true, "certPath": "/certs/ca.cert", "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}] - `, - expectErr: false, - out: []SourceConfig{ - { - URI: "config/samples/example_flags.json", - Provider: syncProviderFile, - }, - { - URI: "http://my-flag-source.json", - Provider: syncProviderHTTP, - BearerToken: "bearer-dji34ld2l", - }, - { - URI: "https://secure-remote", - Provider: syncProviderHTTP, - BearerToken: "bearer-dji34ld2l", - }, - { - URI: "http://site.com", - Provider: syncProviderHTTP, - Interval: 77, - }, - { - URI: "default/my-flag-config", - Provider: syncProviderKubernetes, - }, - { - URI: "grpc-source:8080", - Provider: syncProviderGrpc, - }, - { - URI: "my-flag-source:8080", - Provider: syncProviderGrpc, - TLS: true, - CertPath: "/certs/ca.cert", - ProviderID: "flagd-weatherapp-sidecar", - Selector: "source=database,app=weatherapp", - }, - }, - }, - "empty": { - in: `[]`, - expectErr: false, - out: []SourceConfig{}, - }, - "parse-failure": { - in: ``, - expectErr: true, - out: []SourceConfig{}, - }, - } - - for name, tt := range test { - t.Run(name, func(t *testing.T) { - out, err := ParseSources(tt.in) - if tt.expectErr { - if err == nil { - t.Error("expected error, got none") - } - } else if err != nil { - t.Errorf("did not expect error: %s", err.Error()) - } - if !reflect.DeepEqual(out, tt.out) { - t.Errorf("unexpected output, expected %v, got %v", tt.out, out) - } - }) - } -} - -func TestParseSyncProviderURIs(t *testing.T) { - test := map[string]struct { - in []string - expectErr bool - out []SourceConfig - }{ - "simple": { - in: []string{ - "file:my-file.json", - }, - expectErr: false, - out: []SourceConfig{ - { - URI: "my-file.json", - Provider: "file", - }, - }, - }, - "multiple-uris": { - in: []string{ - "file:my-file.json", - "https://test.com", - "grpc://host:port", - "grpcs://secure-grpc", - "core.openfeature.dev/default/my-crd", - }, - expectErr: false, - out: []SourceConfig{ - { - URI: "my-file.json", - Provider: "file", - }, - { - URI: "https://test.com", - Provider: "http", - }, - { - URI: "host:port", - Provider: "grpc", - TLS: false, - }, - { - URI: "secure-grpc", - Provider: "grpc", - TLS: true, - }, - { - URI: "default/my-crd", - Provider: "kubernetes", - }, - }, - }, - "empty": { - in: []string{}, - expectErr: false, - out: []SourceConfig{}, - }, - "parse-failure": { - in: []string{"care.openfeature.dev/will/fail"}, - expectErr: true, - out: []SourceConfig{}, - }, - } - - for name, tt := range test { - t.Run(name, func(t *testing.T) { - out, err := ParseSyncProviderURIs(tt.in) - if tt.expectErr { - if err == nil { - t.Error("expected error, got none") - } - } else if err != nil { - t.Errorf("did not expect error: %s", err.Error()) - } - if !reflect.DeepEqual(out, tt.out) { - t.Errorf("unexpected output, expected %v, got %v", tt.out, out) - } - }) - } -} - -// Note - K8s configuration require K8s client, hence do not use K8s sync provider in this test -func Test_syncProvidersFromConfig(t *testing.T) { - lg := logger.NewLogger(nil, false) - - type args struct { - logger *logger.Logger - sources []SourceConfig - } - - tests := []struct { - name string - args args - wantSyncs int // simply check the count of ISync providers yield from configurations - wantErr bool - }{ - { - name: "Empty", - args: args{ - logger: lg, - sources: []SourceConfig{}, - }, - wantSyncs: 0, - wantErr: false, - }, - { - name: "Error", - args: args{ - logger: lg, - sources: []SourceConfig{ - { - URI: "fake", - Provider: "disk", - }, - }, - }, - wantSyncs: 0, - wantErr: true, - }, - { - name: "single", - args: args{ - logger: lg, - sources: []SourceConfig{ - { - URI: "grpc://host:port", - Provider: syncProviderGrpc, - ProviderID: "myapp", - CertPath: "/tmp/ca.cert", - Selector: "source=database", - }, - }, - }, - wantSyncs: 1, - wantErr: false, - }, - { - name: "combined", - args: args{ - logger: lg, - sources: []SourceConfig{ - { - URI: "grpc://host:port", - Provider: syncProviderGrpc, - ProviderID: "myapp", - CertPath: "/tmp/ca.cert", - Selector: "source=database", - }, - { - URI: "https://host:port", - Provider: syncProviderHTTP, - BearerToken: "token", - }, - { - URI: "/tmp/flags.json", - Provider: syncProviderFile, - }, - }, - }, - wantSyncs: 3, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - syncs, err := syncProvidersFromConfig(tt.args.logger, tt.args.sources) - if (err != nil) != tt.wantErr { - t.Errorf("syncProvidersFromConfig() error = %v, wantErr %v", err, tt.wantErr) - return - } - if tt.wantSyncs != len(syncs) { - t.Errorf("syncProvidersFromConfig() yielded = %v, but expected %v", len(syncs), tt.wantSyncs) - } - }) - } -} - func Test_setupJSONEvaluator(t *testing.T) { lg := logger.NewLogger(nil, false) diff --git a/core/pkg/service/sync/handler.go b/core/pkg/service/sync/handler.go index d9c530615..4dee36909 100644 --- a/core/pkg/service/sync/handler.go +++ b/core/pkg/service/sync/handler.go @@ -7,13 +7,13 @@ import ( rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" syncv1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/subscriptions" "github.com/open-feature/flagd/core/pkg/sync" - syncStore "github.com/open-feature/flagd/core/pkg/sync-store" ) type handler struct { rpc.UnimplementedFlagSyncServiceServer - syncStore syncStore.ISyncStore + syncStore subscriptions.Manager logger *logger.Logger } diff --git a/core/pkg/service/sync/server.go b/core/pkg/service/sync/server.go index 20dd84f8b..5642dd281 100644 --- a/core/pkg/service/sync/server.go +++ b/core/pkg/service/sync/server.go @@ -12,7 +12,7 @@ import ( rpc "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" "github.com/open-feature/flagd/core/pkg/logger" iservice "github.com/open-feature/flagd/core/pkg/service" - syncStore "github.com/open-feature/flagd/core/pkg/sync-store" + "github.com/open-feature/flagd/core/pkg/subscriptions" "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" @@ -32,7 +32,7 @@ type Server struct { metricServerReady bool } -func NewServer(logger *logger.Logger, store syncStore.ISyncStore) *Server { +func NewServer(logger *logger.Logger, store subscriptions.Manager) *Server { return &Server{ handler: &handler{ logger: logger, diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/subscriptions/manager.go similarity index 54% rename from core/pkg/sync-store/sync_store.go rename to core/pkg/subscriptions/manager.go index b852fe79e..0f009e330 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/subscriptions/manager.go @@ -1,44 +1,45 @@ //nolint:contextcheck -package store +package subscriptions import ( "context" "errors" "fmt" - "regexp" "sync" "time" "github.com/open-feature/flagd/core/pkg/logger" - "github.com/open-feature/flagd/core/pkg/runtime" isync "github.com/open-feature/flagd/core/pkg/sync" - "go.uber.org/zap" + syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder" ) -var ( - regCrd *regexp.Regexp - regFile *regexp.Regexp -) - -func init() { - regCrd = regexp.MustCompile("^core.openfeature.dev/") - regFile = regexp.MustCompile("^file:") +// Manager defines the interface for the subscription management +type Manager interface { + FetchAllFlags( + ctx context.Context, + key interface{}, + target string, + ) (isync.DataSync, error) + RegisterSubscription( + ctx context.Context, + target string, + key interface{}, + dataSync chan isync.DataSync, + errChan chan error, + ) + + // metrics hooks + GetActiveSubscriptionsInt64() int64 } -type SyncStore struct { +// Coordinator coordinates subscriptions by aggregating subscribers for the same target, and keeping them up to date +// for any updates that have happened for those targets. +type Coordinator struct { ctx context.Context - syncHandlers map[string]*syncHandler + multiplexers map[string]*multiplexer logger *logger.Logger mu *sync.RWMutex - syncBuilder SyncBuilderInterface -} - -type syncHandler struct { - subs map[interface{}]storedChannels - dataSync chan isync.DataSync - cancelFunc context.CancelFunc - syncRef isync.ISync - mu *sync.RWMutex + syncBuilder syncbuilder.ISyncBuilder } type storedChannels struct { @@ -46,27 +47,27 @@ type storedChannels struct { dataSync chan isync.DataSync } -// NewSyncStore returns a new sync store -func NewSyncStore(ctx context.Context, logger *logger.Logger) *SyncStore { - ss := SyncStore{ +// NewManager returns a new subscription manager +func NewManager(ctx context.Context, logger *logger.Logger) *Coordinator { + mgr := Coordinator{ ctx: ctx, - syncHandlers: map[string]*syncHandler{}, + multiplexers: map[string]*multiplexer{}, logger: logger, mu: &sync.RWMutex{}, - syncBuilder: &SyncBuilder{}, + syncBuilder: &syncbuilder.SyncBuilder{}, } - go ss.cleanup() - return &ss + go mgr.cleanup() + return &mgr } -// FetchAllFlags returns a DataSync containing the full set of flag configurations from the SyncStore. +// FetchAllFlags returns a DataSync containing the full set of flag configurations from the Coordinator. // This will either occur via triggering a resync, or through setting up a new subscription to the resource -func (s *SyncStore) FetchAllFlags(ctx context.Context, key interface{}, target string) (isync.DataSync, error) { +func (s *Coordinator) FetchAllFlags(ctx context.Context, key interface{}, target string) (isync.DataSync, error) { s.logger.Debug(fmt.Sprintf("fetching all flags for target %s", target)) dataSyncChan := make(chan isync.DataSync, 1) errChan := make(chan error, 1) s.mu.RLock() - syncHandler, ok := s.syncHandlers[target] + syncHandler, ok := s.multiplexers[target] s.mu.RUnlock() if !ok { s.logger.Debug(fmt.Sprintf("sync handler does not exist for target %s, registering a new subscription", target)) @@ -95,7 +96,7 @@ func (s *SyncStore) FetchAllFlags(ctx context.Context, key interface{}, target s // RegisterSubscription starts a new subscription to the target resource. // Once the subscription is set an ALL sync event will be received via the DataSync chan. -func (s *SyncStore) RegisterSubscription( +func (s *Coordinator) RegisterSubscription( ctx context.Context, target string, key interface{}, @@ -105,16 +106,16 @@ func (s *SyncStore) RegisterSubscription( s.mu.Lock() defer s.mu.Unlock() // is there a currently active subscription for this target? - sh, ok := s.syncHandlers[target] + sh, ok := s.multiplexers[target] if !ok { // we need to start a sync for this s.logger.Debug( fmt.Sprintf( - "sync handler does not exist for target %s, registering syncHandler with sub %p", + "sync handler does not exist for target %s, registering multiplexer with sub %p", target, key, )) - s.syncHandlers[target] = &syncHandler{ + s.multiplexers[target] = &multiplexer{ dataSync: make(chan isync.DataSync), subs: map[interface{}]storedChannels{ key: { @@ -137,7 +138,7 @@ func (s *SyncStore) RegisterSubscription( go func() { s.mu.RLock() defer s.mu.RUnlock() - if _, ok := s.syncHandlers[target]; ok { + if _, ok := s.multiplexers[target]; ok { s.logger.Debug(fmt.Sprintf("sync handler exists for target %s, triggering a resync", target)) if err := sh.syncRef.ReSync(ctx, dataSync); err != nil { errChan <- err @@ -151,28 +152,28 @@ func (s *SyncStore) RegisterSubscription( <-ctx.Done() s.mu.Lock() defer s.mu.Unlock() - if s.syncHandlers[target] != nil && s.syncHandlers[target].subs != nil { + if s.multiplexers[target] != nil && s.multiplexers[target].subs != nil { s.logger.Debug(fmt.Sprintf("removing sync subscription due to context cancellation %p", key)) - delete(s.syncHandlers[target].subs, key) + delete(s.multiplexers[target].subs, key) } }() } -func (s *SyncStore) watchResource(target string) { +func (s *Coordinator) watchResource(target string) { s.logger.Debug(fmt.Sprintf("watching resource %s", target)) ctx, cancel := context.WithCancel(s.ctx) defer cancel() - sh, ok := s.syncHandlers[target] + sh, ok := s.multiplexers[target] if !ok { s.logger.Error(fmt.Sprintf("no sync handler exists for target %s", target)) return } - // this cancel is accessed by the cleanup method shutdown the listener + delete the syncHandler + // this cancel is accessed by the cleanup method shutdown the listener + delete the multiplexer sh.cancelFunc = cancel go func() { <-ctx.Done() s.mu.Lock() - delete(s.syncHandlers, target) + delete(s.multiplexers, target) s.mu.Unlock() }() // broadcast any data passed through the core channel to all subscribing channels @@ -182,73 +183,47 @@ func (s *SyncStore) watchResource(target string) { case <-ctx.Done(): return case d := <-sh.dataSync: - sh.writeData(s.logger, d) + sh.broadcastData(s.logger, d) } } }() // setup sync, if this fails an error is broadcasted, and the defer results in cleanup - sync, err := s.syncBuilder.SyncFromURI(target, s.logger) + syncSource, err := s.syncBuilder.SyncFromURI(target, s.logger) if err != nil { s.logger.Error(fmt.Sprintf("unable to build sync from URI for target %s: %s", target, err.Error())) - sh.writeError(s.logger, err) + sh.broadcastError(s.logger, err) return } // init sync, if this fails an error is broadcasted, and the defer results in cleanup - err = sync.Init(ctx) + err = syncSource.Init(ctx) if err != nil { s.logger.Error(fmt.Sprintf("unable to initiate sync for target %s: %s", target, err.Error())) - sh.writeError(s.logger, err) + sh.broadcastError(s.logger, err) return } - // sync ref is used to trigger a resync on a single channel when a new subscription is started + // syncSource ref is used to trigger a resync on a single channel when a new subscription is started // but the associated SyncHandler already exists, i.e. this function is not run - sh.syncRef = sync - err = sync.Sync(ctx, sh.dataSync) + sh.syncRef = syncSource + err = syncSource.Sync(ctx, sh.dataSync) if err != nil { s.logger.Error(fmt.Sprintf("error from sync for target %s: %s", target, err.Error())) - sh.writeError(s.logger, err) + sh.broadcastError(s.logger, err) } } -func (h *syncHandler) writeError(logger *logger.Logger, err error) { - h.mu.RLock() - defer h.mu.RUnlock() - for k, ec := range h.subs { - select { - case ec.errChan <- err: - continue - default: - logger.Error(fmt.Sprintf("unable to write error to channel for key %p", k)) - } - } -} - -func (h *syncHandler) writeData(logger *logger.Logger, data isync.DataSync) { - h.mu.RLock() - defer h.mu.RUnlock() - for k, ds := range h.subs { - select { - case ds.dataSync <- data: - continue - default: - logger.Error(fmt.Sprintf("unable to write data to channel for key %p", k)) - } - } -} - -func (s *SyncStore) cleanup() { +func (s *Coordinator) cleanup() { for { select { case <-s.ctx.Done(): return case <-time.After(5 * time.Second): s.mu.Lock() - for k, v := range s.syncHandlers { - // delete any syncHandlers with 0 active subscriptions through cancelling its context - s.logger.Debug(fmt.Sprintf("syncHandler for target %s has %d subscriptions", k, len(v.subs))) + for k, v := range s.multiplexers { + // delete any multiplexers with 0 active subscriptions through cancelling its context + s.logger.Debug(fmt.Sprintf("multiplexer for target %s has %d subscriptions", k, len(v.subs))) if len(v.subs) == 0 { - s.logger.Debug(fmt.Sprintf("shutting down syncHandler %s", k)) - s.syncHandlers[k].cancelFunc() + s.logger.Debug(fmt.Sprintf("shutting down multiplexer %s", k)) + s.multiplexers[k].cancelFunc() } } s.mu.Unlock() @@ -256,45 +231,14 @@ func (s *SyncStore) cleanup() { } } -func (s *SyncStore) GetActiveSubscriptionsInt64() int64 { +func (s *Coordinator) GetActiveSubscriptionsInt64() int64 { s.mu.RLock() defer s.mu.RUnlock() syncs := 0 - for _, v := range s.syncHandlers { + for _, v := range s.multiplexers { syncs += len(v.subs) } return int64(syncs) } - -type SyncBuilderInterface interface { - SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error) -} - -type SyncBuilder struct{} - -// SyncFromURI builds an ISync interface from the input uri string -func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error) { - switch uriB := []byte(uri); { - // filepath may be used for debugging, not recommended in deployment - case regFile.Match(uriB): - return runtime.NewFile(runtime.SourceConfig{ - URI: regFile.ReplaceAllString(uri, ""), - }, logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "filepath"), - zap.String("target", "target"), - )), nil - case regCrd.Match(uriB): - s, err := runtime.NewK8s(uri, logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - )) - if err != nil { - return nil, fmt.Errorf("error creating k8s sync: %w", err) - } - return s, nil - } - return nil, fmt.Errorf("unrecognized URI: %s", uri) -} diff --git a/core/pkg/sync-store/sync_store_test.go b/core/pkg/subscriptions/manager_test.go similarity index 85% rename from core/pkg/sync-store/sync_store_test.go rename to core/pkg/subscriptions/manager_test.go index f725de2b6..514a553b5 100644 --- a/core/pkg/sync-store/sync_store_test.go +++ b/core/pkg/subscriptions/manager_test.go @@ -1,4 +1,4 @@ -package store +package subscriptions import ( "context" @@ -59,17 +59,21 @@ type syncBuilderMock struct { initError error } +func (s *syncBuilderMock) SyncsFromConfig(_ []isync.SourceConfig, _ *logger.Logger) ([]isync.ISync, error) { + return nil, nil +} + func (s *syncBuilderMock) SyncFromURI(_ string, _ *logger.Logger) (isync.ISync, error) { return s.mock, s.initError } -func newSyncHandler() (*syncHandler, string) { +func newSyncHandler() (*multiplexer, string) { coreDataSyncChan := make(chan isync.DataSync, 1) dataSyncChan := make(chan isync.DataSync, 1) errChan := make(chan error, 1) key := "key" - return &syncHandler{ + return &multiplexer{ dataSync: coreDataSyncChan, subs: map[interface{}]storedChannels{ key: { @@ -83,7 +87,7 @@ func newSyncHandler() (*syncHandler, string) { func Test_watchResource(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() syncStore.syncBuilder = &syncBuilderMock{ mock: syncMock, @@ -92,7 +96,7 @@ func Test_watchResource(t *testing.T) { target := "test-target" syncHandler, key := newSyncHandler() - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler go syncStore.watchResource(target) @@ -128,17 +132,17 @@ func Test_watchResource(t *testing.T) { // no context cancellation should have occurred, and there should still be registered sync sub syncStore.mu.Lock() if len(syncHandler.subs) != 1 { - t.Error("incorrect number of subs in syncHandler", syncHandler.subs) + t.Error("incorrect number of subs in multiplexer", syncHandler.subs) } syncStore.mu.Unlock() - // cancellation of context will result in the syncHandler being deleted + // cancellation of context will result in the multiplexer being deleted cancel() // allow for the goroutine to catch the lock first time.Sleep(1 * time.Second) syncStore.mu.Lock() - if syncStore.syncHandlers[target] != nil { - t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs) + if syncStore.multiplexers[target] != nil { + t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs) } syncStore.mu.Unlock() } @@ -146,7 +150,7 @@ func Test_watchResource(t *testing.T) { func Test_watchResource_initFail(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() // return an error on startup @@ -158,7 +162,7 @@ func Test_watchResource_initFail(t *testing.T) { target := "test-target" syncHandler, key := newSyncHandler() - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler go syncStore.watchResource(target) @@ -175,8 +179,8 @@ func Test_watchResource_initFail(t *testing.T) { // this should then close the internal context and the watcher should be removed time.Sleep(1 * time.Second) syncStore.mu.Lock() - if syncStore.syncHandlers[target] != nil { - t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs) + if syncStore.multiplexers[target] != nil { + t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs) } syncStore.mu.Unlock() } @@ -184,7 +188,7 @@ func Test_watchResource_initFail(t *testing.T) { func Test_watchResource_SyncFromURIFail(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() // return an error on startup @@ -197,7 +201,7 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) { target := "test-target" syncHandler, key := newSyncHandler() - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler go syncStore.watchResource(target) @@ -214,8 +218,8 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) { // this should then close the internal context and the watcher should be removed time.Sleep(1 * time.Second) syncStore.mu.Lock() - if syncStore.syncHandlers[target] != nil { - t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs) + if syncStore.multiplexers[target] != nil { + t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs) } syncStore.mu.Unlock() } @@ -223,7 +227,7 @@ func Test_watchResource_SyncFromURIFail(t *testing.T) { func Test_watchResource_SyncErrorOnClose(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() // return an error on startup @@ -235,7 +239,7 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) { target := "test-target" syncHandler, key := newSyncHandler() - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler go syncStore.watchResource(target) cancel() @@ -252,8 +256,8 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) { // this should then close the internal context and the watcher should be removed time.Sleep(1 * time.Second) syncStore.mu.Lock() - if syncStore.syncHandlers[target] != nil { - t.Error("syncHandler has not been closed down after cancellation", syncHandler.subs) + if syncStore.multiplexers[target] != nil { + t.Error("multiplexer has not been closed down after cancellation", syncHandler.subs) } syncStore.mu.Unlock() } @@ -261,7 +265,7 @@ func Test_watchResource_SyncErrorOnClose(t *testing.T) { func Test_watchResource_SyncHandlerDoesNotExist(_ *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() // return an error on startup @@ -279,7 +283,7 @@ func Test_watchResource_SyncHandlerDoesNotExist(_ *testing.T) { func Test_watchResource_Cleanup(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() // return an error on startup @@ -297,7 +301,7 @@ func Test_watchResource_Cleanup(t *testing.T) { doneChan <- struct{}{} } syncStore.mu.Lock() - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler syncStore.mu.Unlock() go func() { syncStore.cleanup() @@ -307,7 +311,7 @@ func Test_watchResource_Cleanup(t *testing.T) { case <-doneChan: return case <-time.After(10 * time.Second): - t.Error("syncHandlers not being cleaned up, timed out after 10 seconds") + t.Error("multiplexers not being cleaned up, timed out after 10 seconds") } } @@ -351,7 +355,7 @@ func Test_FetchAllFlags(t *testing.T) { t.Run(name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() syncMock.resyncData = tt.mockData syncMock.resyncError = tt.mockError @@ -365,7 +369,7 @@ func Test_FetchAllFlags(t *testing.T) { syncHandler.syncRef = syncMock } if tt.setHandler { - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler } data, err := syncStore.FetchAllFlags(ctx, key, target) @@ -406,7 +410,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) { t.Run(name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() syncMock.resyncData = tt.data @@ -420,7 +424,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) { syncHandler, _ := newSyncHandler() syncHandler.syncRef = syncMock key := struct{}{} - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler dataChan := make(chan isync.DataSync, 1) errChan := make(chan error, 1) @@ -445,7 +449,7 @@ func Test_registerSubscriptionResyncPath(t *testing.T) { func Test_syncMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncStore := NewManager(ctx, logger.NewLogger(nil, false)) syncMock := newMockSync() syncStore.syncBuilder = &syncBuilderMock{ mock: syncMock, @@ -459,7 +463,7 @@ func Test_syncMetrics(t *testing.T) { target := "test-target" syncHandler, _ := newSyncHandler() - syncStore.syncHandlers[target] = syncHandler + syncStore.multiplexers[target] = syncHandler subs = syncStore.GetActiveSubscriptionsInt64() if subs != 1 { diff --git a/core/pkg/subscriptions/multiplexer.go b/core/pkg/subscriptions/multiplexer.go new file mode 100644 index 000000000..5d53a280a --- /dev/null +++ b/core/pkg/subscriptions/multiplexer.go @@ -0,0 +1,45 @@ +package subscriptions + +import ( + "context" + "fmt" + "sync" + + "github.com/open-feature/flagd/core/pkg/logger" + sourceSync "github.com/open-feature/flagd/core/pkg/sync" +) + +// multiplexer distributes updates for a target to all of its subscribers +type multiplexer struct { + subs map[interface{}]storedChannels + dataSync chan sourceSync.DataSync + cancelFunc context.CancelFunc + syncRef sourceSync.ISync + mu *sync.RWMutex +} + +func (h *multiplexer) broadcastError(logger *logger.Logger, err error) { + h.mu.RLock() + defer h.mu.RUnlock() + for k, ec := range h.subs { + select { + case ec.errChan <- err: + continue + default: + logger.Error(fmt.Sprintf("unable to write error to channel for key %p", k)) + } + } +} + +func (h *multiplexer) broadcastData(logger *logger.Logger, data sourceSync.DataSync) { + h.mu.RLock() + defer h.mu.RUnlock() + for k, ds := range h.subs { + select { + case ds.dataSync <- data: + continue + default: + logger.Error(fmt.Sprintf("unable to write data to channel for key %p", k)) + } + } +} diff --git a/core/pkg/sync-store/interface.go b/core/pkg/sync-store/interface.go deleted file mode 100644 index 3e5a71977..000000000 --- a/core/pkg/sync-store/interface.go +++ /dev/null @@ -1,26 +0,0 @@ -package store - -import ( - "context" - - isync "github.com/open-feature/flagd/core/pkg/sync" -) - -// ISyncStore defines the interface for the sync store -type ISyncStore interface { - FetchAllFlags( - ctx context.Context, - key interface{}, - target string, - ) (isync.DataSync, error) - RegisterSubscription( - ctx context.Context, - target string, - key interface{}, - dataSync chan isync.DataSync, - errChan chan error, - ) - - // metrics hooks - GetActiveSubscriptionsInt64() int64 -} diff --git a/core/pkg/sync/builder/mock/syncbuilder.go b/core/pkg/sync/builder/mock/syncbuilder.go new file mode 100644 index 000000000..623e1c3a5 --- /dev/null +++ b/core/pkg/sync/builder/mock/syncbuilder.go @@ -0,0 +1,107 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/sync/builder/syncbuilder.go + +// Package middlewaremocksyncbuildermock is a generated GoMock package. +package middlewaremocksyncbuildermock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + logger "github.com/open-feature/flagd/core/pkg/logger" + sync "github.com/open-feature/flagd/core/pkg/sync" + dynamic "k8s.io/client-go/dynamic" + client "sigs.k8s.io/controller-runtime/pkg/client" +) + +// MockISyncBuilder is a mock of ISyncBuilder interface. +type MockISyncBuilder struct { + ctrl *gomock.Controller + recorder *MockISyncBuilderMockRecorder +} + +// MockISyncBuilderMockRecorder is the mock recorder for MockISyncBuilder. +type MockISyncBuilderMockRecorder struct { + mock *MockISyncBuilder +} + +// NewMockISyncBuilder creates a new mock instance. +func NewMockISyncBuilder(ctrl *gomock.Controller) *MockISyncBuilder { + mock := &MockISyncBuilder{ctrl: ctrl} + mock.recorder = &MockISyncBuilderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockISyncBuilder) EXPECT() *MockISyncBuilderMockRecorder { + return m.recorder +} + +// SyncFromURI mocks base method. +func (m *MockISyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncFromURI", uri, logger) + ret0, _ := ret[0].(sync.ISync) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncFromURI indicates an expected call of SyncFromURI. +func (mr *MockISyncBuilderMockRecorder) SyncFromURI(uri, logger interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncFromURI", reflect.TypeOf((*MockISyncBuilder)(nil).SyncFromURI), uri, logger) +} + +// SyncsFromConfig mocks base method. +func (m *MockISyncBuilder) SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncsFromConfig", sourceConfig, logger) + ret0, _ := ret[0].([]sync.ISync) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncsFromConfig indicates an expected call of SyncsFromConfig. +func (mr *MockISyncBuilderMockRecorder) SyncsFromConfig(sourceConfig, logger interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncsFromConfig", reflect.TypeOf((*MockISyncBuilder)(nil).SyncsFromConfig), sourceConfig, logger) +} + +// MockIK8sClientBuilder is a mock of IK8sClientBuilder interface. +type MockIK8sClientBuilder struct { + ctrl *gomock.Controller + recorder *MockIK8sClientBuilderMockRecorder +} + +// MockIK8sClientBuilderMockRecorder is the mock recorder for MockIK8sClientBuilder. +type MockIK8sClientBuilderMockRecorder struct { + mock *MockIK8sClientBuilder +} + +// NewMockIK8sClientBuilder creates a new mock instance. +func NewMockIK8sClientBuilder(ctrl *gomock.Controller) *MockIK8sClientBuilder { + mock := &MockIK8sClientBuilder{ctrl: ctrl} + mock.recorder = &MockIK8sClientBuilderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockIK8sClientBuilder) EXPECT() *MockIK8sClientBuilderMockRecorder { + return m.recorder +} + +// GetK8sClients mocks base method. +func (m *MockIK8sClientBuilder) GetK8sClients() (client.Reader, dynamic.Interface, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetK8sClients") + ret0, _ := ret[0].(client.Reader) + ret1, _ := ret[1].(dynamic.Interface) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetK8sClients indicates an expected call of GetK8sClients. +func (mr *MockIK8sClientBuilderMockRecorder) GetK8sClients() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetK8sClients", reflect.TypeOf((*MockIK8sClientBuilder)(nil).GetK8sClients)) +} diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go new file mode 100644 index 000000000..c40acbb5d --- /dev/null +++ b/core/pkg/sync/builder/syncbuilder.go @@ -0,0 +1,218 @@ +package builder + +import ( + "fmt" + "net/http" + "os" + "regexp" + msync "sync" + "time" + + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/sync" + "github.com/open-feature/flagd/core/pkg/sync/file" + "github.com/open-feature/flagd/core/pkg/sync/grpc" + "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" + httpSync "github.com/open-feature/flagd/core/pkg/sync/http" + "github.com/open-feature/flagd/core/pkg/sync/kubernetes" + "github.com/robfig/cron" + "go.uber.org/zap" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + syncProviderFile = "file" + syncProviderGrpc = "grpc" + syncProviderKubernetes = "kubernetes" + syncProviderHTTP = "http" +) + +var ( + regCrd *regexp.Regexp + regURL *regexp.Regexp + regGRPC *regexp.Regexp + regGRPCSecure *regexp.Regexp + regFile *regexp.Regexp +) + +func init() { + regCrd = regexp.MustCompile("^core.openfeature.dev/") + regURL = regexp.MustCompile("^https?://") + regGRPC = regexp.MustCompile("^" + grpc.Prefix) + regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure) + regFile = regexp.MustCompile("^file:") +} + +type ISyncBuilder interface { + SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) + SyncsFromConfig(sourceConfig []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) +} + +type SyncBuilder struct { + k8sClientBuilder IK8sClientBuilder +} + +func NewSyncBuilder() *SyncBuilder { + return &SyncBuilder{ + k8sClientBuilder: &KubernetesClientBuilder{}, + } +} + +func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (sync.ISync, error) { + switch uriB := []byte(uri); { + // filepath may be used for debugging, not recommended in deployment + case regFile.Match(uriB): + return sb.newFile(uri, logger), nil + case regCrd.Match(uriB): + return sb.newK8s(uri, logger) + } + return nil, fmt.Errorf("unrecognized URI: %s", uri) +} + +func (sb *SyncBuilder) SyncsFromConfig(sourceConfigs []sync.SourceConfig, logger *logger.Logger) ([]sync.ISync, error) { + syncImpls := make([]sync.ISync, len(sourceConfigs)) + for i, syncProvider := range sourceConfigs { + syncImpl, err := sb.syncFromConfig(syncProvider, logger) + if err != nil { + return nil, fmt.Errorf("could not create sync provider: %w", err) + } + syncImpls[i] = syncImpl + } + return syncImpls, nil +} + +func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *logger.Logger) (sync.ISync, error) { + switch sourceConfig.Provider { + case syncProviderFile: + logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", sourceConfig.URI)) + return sb.newFile(sourceConfig.URI, logger), nil + case syncProviderKubernetes: + logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", sourceConfig.URI)) + return sb.newK8s(sourceConfig.URI, logger) + case syncProviderHTTP: + logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", sourceConfig.URI)) + return sb.newHTTP(sourceConfig, logger), nil + case syncProviderGrpc: + logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", sourceConfig.URI)) + return sb.newGRPC(sourceConfig, logger), nil + + default: + return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'", + sourceConfig.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes) + } +} + +func (sb *SyncBuilder) newFile(uri string, logger *logger.Logger) *file.Sync { + return &file.Sync{ + URI: regFile.ReplaceAllString(uri, ""), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "filepath"), + ), + Mux: &msync.RWMutex{}, + } +} + +func (sb *SyncBuilder) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { + reader, dynamicClient, err := sb.k8sClientBuilder.GetK8sClients() + if err != nil { + return nil, fmt.Errorf("error creating kubernetes clients: %w", err) + } + + return kubernetes.NewK8sSync( + logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "kubernetes"), + ), + regCrd.ReplaceAllString(uri, ""), + reader, + dynamicClient, + ), nil +} + +func (sb *SyncBuilder) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { + // Default to 5 seconds + var interval uint32 = 5 + if config.Interval != 0 { + interval = config.Interval + } + + return &httpSync.Sync{ + URI: config.URI, + Client: &http.Client{ + Timeout: time.Second * 10, + }, + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "remote"), + ), + BearerToken: config.BearerToken, + Interval: interval, + Cron: cron.New(), + } +} + +func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { + return &grpc.Sync{ + URI: config.URI, + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + CredentialBuilder: &credentials.CredentialBuilder{}, + CertPath: config.CertPath, + ProviderID: config.ProviderID, + Secure: config.TLS, + Selector: config.Selector, + } +} + +type IK8sClientBuilder interface { + GetK8sClients() (client.Reader, dynamic.Interface, error) +} + +type KubernetesClientBuilder struct{} + +func (kcb KubernetesClientBuilder) GetK8sClients() (client.Reader, dynamic.Interface, error) { + clusterConfig, err := k8sClusterConfig() + if err != nil { + return nil, nil, err + } + + readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return nil, nil, fmt.Errorf("unable to create readClient: %w", err) + } + + dynamicClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + return nil, nil, fmt.Errorf("unable to create dynamicClient: %w", err) + } + return readClient, dynamicClient, nil +} + +// k8sClusterConfig build K8s connection config based available configurations +func k8sClusterConfig() (*rest.Config, error) { + cfg := os.Getenv("KUBECONFIG") + + var clusterConfig *rest.Config + var err error + + if cfg != "" { + clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg) + if err != nil { + return nil, fmt.Errorf("error building cluster config from flags: %w", err) + } + } else { + clusterConfig, err = rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("error fetching cluster config: %w", err) + } + } + + return clusterConfig, nil +} diff --git a/core/pkg/sync/builder/syncbuilder_test.go b/core/pkg/sync/builder/syncbuilder_test.go new file mode 100644 index 000000000..4d30e6e7c --- /dev/null +++ b/core/pkg/sync/builder/syncbuilder_test.go @@ -0,0 +1,240 @@ +package builder + +import ( + "errors" + "testing" + + "github.com/golang/mock/gomock" + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/sync" + buildermock "github.com/open-feature/flagd/core/pkg/sync/builder/mock" + "github.com/open-feature/flagd/core/pkg/sync/file" + "github.com/open-feature/flagd/core/pkg/sync/grpc" + "github.com/open-feature/flagd/core/pkg/sync/http" + "github.com/open-feature/flagd/core/pkg/sync/kubernetes" + "github.com/stretchr/testify/require" +) + +func TestSyncBuilder_SyncFromURI(t *testing.T) { + type args struct { + uri string + logger *logger.Logger + } + tests := []struct { + name string + args args + injectFunc func(builder *SyncBuilder) + want sync.ISync + wantErr bool + }{ + { + name: "kubernetes sync", + args: args{ + uri: "core.openfeature.dev/ff-config", + logger: logger.NewLogger(nil, false), + }, + injectFunc: func(builder *SyncBuilder) { + ctrl := gomock.NewController(t) + + mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl) + mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, nil) + + builder.k8sClientBuilder = mockClientBuilder + }, + want: &kubernetes.Sync{}, + wantErr: false, + }, + { + name: "kubernetes sync - error when retrieving config", + args: args{ + uri: "core.openfeature.dev/ff-config", + logger: logger.NewLogger(nil, false), + }, + injectFunc: func(builder *SyncBuilder) { + ctrl := gomock.NewController(t) + + mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl) + mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, errors.New("oops")) + + builder.k8sClientBuilder = mockClientBuilder + }, + want: nil, + wantErr: true, + }, + { + name: "file sync", + args: args{ + uri: "file:my-file", + logger: logger.NewLogger(nil, false), + }, + want: &file.Sync{}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sb := NewSyncBuilder() + + if tt.injectFunc != nil { + tt.injectFunc(sb) + } + + got, err := sb.SyncFromURI(tt.args.uri, tt.args.logger) + if tt.wantErr { + require.NotNil(t, err) + require.Nil(t, got) + } else { + require.Nil(t, err) + require.IsType(t, tt.want, got) + } + }) + } +} + +func Test_k8sClusterConfig(t *testing.T) { + t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) { + tt.Setenv("KUBECONFIG", "") + _, err := k8sClusterConfig() + if err == nil { + tt.Error("Expected error but got none") + } + }) + t.Run("KUBECONFIG file not existing", func(tt *testing.T) { + tt.Setenv("KUBECONFIG", "value") + _, err := k8sClusterConfig() + if err == nil { + tt.Error("Expected error but got none") + } + }) + t.Run("Default REST Config and missing svc account", func(tt *testing.T) { + tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1") + tt.Setenv("KUBERNETES_SERVICE_PORT", "8080") + _, err := k8sClusterConfig() + if err == nil { + tt.Error("Expected error but got none") + } + }) +} + +func Test_SyncsFromFromConfig(t *testing.T) { + lg := logger.NewLogger(nil, false) + + type args struct { + logger *logger.Logger + sources []sync.SourceConfig + } + + tests := []struct { + name string + args args + injectFunc func(builder *SyncBuilder) + wantSyncs []sync.ISync + wantErr bool + }{ + { + name: "Empty", + args: args{ + logger: lg, + sources: []sync.SourceConfig{}, + }, + wantSyncs: nil, + wantErr: false, + }, + { + name: "Error", + args: args{ + logger: lg, + sources: []sync.SourceConfig{ + { + URI: "fake", + Provider: "disk", + }, + }, + }, + wantSyncs: nil, + wantErr: true, + }, + { + name: "single", + args: args{ + logger: lg, + sources: []sync.SourceConfig{ + { + URI: "grpc://host:port", + Provider: syncProviderGrpc, + ProviderID: "myapp", + CertPath: "/tmp/ca.cert", + Selector: "source=database", + }, + }, + }, + wantSyncs: []sync.ISync{ + &grpc.Sync{}, + }, + wantErr: false, + }, + { + name: "combined", + injectFunc: func(builder *SyncBuilder) { + ctrl := gomock.NewController(t) + + mockClientBuilder := buildermock.NewMockIK8sClientBuilder(ctrl) + mockClientBuilder.EXPECT().GetK8sClients().Times(1).Return(nil, nil, nil) + + builder.k8sClientBuilder = mockClientBuilder + }, + args: args{ + logger: lg, + sources: []sync.SourceConfig{ + { + URI: "grpc://host:port", + Provider: syncProviderGrpc, + ProviderID: "myapp", + CertPath: "/tmp/ca.cert", + Selector: "source=database", + }, + { + URI: "https://host:port", + Provider: syncProviderHTTP, + BearerToken: "token", + }, + { + URI: "/tmp/flags.json", + Provider: syncProviderFile, + }, + { + URI: "my-namespace/my-flags", + Provider: syncProviderKubernetes, + }, + }, + }, + wantSyncs: []sync.ISync{ + &grpc.Sync{}, + &http.Sync{}, + &file.Sync{}, + &kubernetes.Sync{}, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sb := NewSyncBuilder() + + if tt.injectFunc != nil { + tt.injectFunc(sb) + } + syncs, err := sb.SyncsFromConfig(tt.args.sources, tt.args.logger) + if (err != nil) != tt.wantErr { + t.Errorf("syncProvidersFromConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + require.Len(t, syncs, len(tt.wantSyncs)) + + // check if we got the expected sync types + for index, wantType := range tt.wantSyncs { + require.IsType(t, wantType, syncs[index]) + } + }) + } +} diff --git a/core/pkg/sync/builder/utils.go b/core/pkg/sync/builder/utils.go new file mode 100644 index 000000000..a24c98873 --- /dev/null +++ b/core/pkg/sync/builder/utils.go @@ -0,0 +1,68 @@ +package builder + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/open-feature/flagd/core/pkg/sync" +) + +// ParseSources parse a json formatted SourceConfig array string and performs validations on the content +func ParseSources(sourcesFlag string) ([]sync.SourceConfig, error) { + syncProvidersParsed := []sync.SourceConfig{} + + if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil { + return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err) + } + for _, sp := range syncProvidersParsed { + if sp.URI == "" { + return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field") + } + if sp.Provider == "" { + return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field") + } + } + return syncProvidersParsed, nil +} + +// ParseSyncProviderURIs uri flag based sync sources to SourceConfig array. Replaces uri prefixes where necessary to +// derive SourceConfig +func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) { + syncProvidersParsed := []sync.SourceConfig{} + + for _, uri := range uris { + switch uriB := []byte(uri); { + case regFile.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + URI: regFile.ReplaceAllString(uri, ""), + Provider: syncProviderFile, + }) + case regCrd.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + URI: regCrd.ReplaceAllString(uri, ""), + Provider: syncProviderKubernetes, + }) + case regURL.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + URI: uri, + Provider: syncProviderHTTP, + }) + case regGRPC.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + URI: regGRPC.ReplaceAllString(uri, ""), + Provider: syncProviderGrpc, + }) + case regGRPCSecure.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + URI: regGRPCSecure.ReplaceAllString(uri, ""), + Provider: syncProviderGrpc, + TLS: true, + }) + default: + return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+ + "'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri) + } + } + return syncProvidersParsed, nil +} diff --git a/core/pkg/sync/builder/utils_test.go b/core/pkg/sync/builder/utils_test.go new file mode 100644 index 000000000..3f914cfa4 --- /dev/null +++ b/core/pkg/sync/builder/utils_test.go @@ -0,0 +1,210 @@ +package builder + +import ( + "reflect" + "testing" + + "github.com/open-feature/flagd/core/pkg/sync" +) + +func TestParseSource(t *testing.T) { + test := map[string]struct { + in string + expectErr bool + out []sync.SourceConfig + }{ + "simple": { + in: "[{\"uri\":\"config/samples/example_flags.json\",\"provider\":\"file\"}]", + expectErr: false, + out: []sync.SourceConfig{ + { + URI: "config/samples/example_flags.json", + Provider: syncProviderFile, + }, + }, + }, + "multiple-syncs": { + in: `[ + {"uri":"config/samples/example_flags.json","provider":"file"}, + {"uri":"http://test.com","provider":"http","bearerToken":":)"}, + {"uri":"host:port","provider":"grpc"}, + {"uri":"default/my-crd","provider":"kubernetes"} + ]`, + expectErr: false, + out: []sync.SourceConfig{ + { + URI: "config/samples/example_flags.json", + Provider: syncProviderFile, + }, + { + URI: "http://test.com", + Provider: syncProviderHTTP, + BearerToken: ":)", + }, + { + URI: "host:port", + Provider: syncProviderGrpc, + }, + { + URI: "default/my-crd", + Provider: syncProviderKubernetes, + }, + }, + }, + "multiple-syncs-with-options": { + in: `[{"uri":"config/samples/example_flags.json","provider":"file"}, + {"uri":"http://my-flag-source.json","provider":"http","bearerToken":"bearer-dji34ld2l"}, + {"uri":"https://secure-remote","provider":"http","bearerToken":"bearer-dji34ld2l"}, + {"uri":"http://site.com","provider":"http","interval":77 }, + {"uri":"default/my-flag-config","provider":"kubernetes"}, + {"uri":"grpc-source:8080","provider":"grpc"}, + {"uri":"my-flag-source:8080","provider":"grpc", "tls":true, "certPath": "/certs/ca.cert", "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}] + `, + expectErr: false, + out: []sync.SourceConfig{ + { + URI: "config/samples/example_flags.json", + Provider: syncProviderFile, + }, + { + URI: "http://my-flag-source.json", + Provider: syncProviderHTTP, + BearerToken: "bearer-dji34ld2l", + }, + { + URI: "https://secure-remote", + Provider: syncProviderHTTP, + BearerToken: "bearer-dji34ld2l", + }, + { + URI: "http://site.com", + Provider: syncProviderHTTP, + Interval: 77, + }, + { + URI: "default/my-flag-config", + Provider: syncProviderKubernetes, + }, + { + URI: "grpc-source:8080", + Provider: syncProviderGrpc, + }, + { + URI: "my-flag-source:8080", + Provider: syncProviderGrpc, + TLS: true, + CertPath: "/certs/ca.cert", + ProviderID: "flagd-weatherapp-sidecar", + Selector: "source=database,app=weatherapp", + }, + }, + }, + "empty": { + in: `[]`, + expectErr: false, + out: []sync.SourceConfig{}, + }, + "parse-failure": { + in: ``, + expectErr: true, + out: []sync.SourceConfig{}, + }, + } + + for name, tt := range test { + t.Run(name, func(t *testing.T) { + out, err := ParseSources(tt.in) + if tt.expectErr { + if err == nil { + t.Error("expected error, got none") + } + } else if err != nil { + t.Errorf("did not expect error: %s", err.Error()) + } + if !reflect.DeepEqual(out, tt.out) { + t.Errorf("unexpected output, expected %v, got %v", tt.out, out) + } + }) + } +} + +func TestParseSyncProviderURIs(t *testing.T) { + test := map[string]struct { + in []string + expectErr bool + out []sync.SourceConfig + }{ + "simple": { + in: []string{ + "file:my-file.json", + }, + expectErr: false, + out: []sync.SourceConfig{ + { + URI: "my-file.json", + Provider: "file", + }, + }, + }, + "multiple-uris": { + in: []string{ + "file:my-file.json", + "https://test.com", + "grpc://host:port", + "grpcs://secure-grpc", + "core.openfeature.dev/default/my-crd", + }, + expectErr: false, + out: []sync.SourceConfig{ + { + URI: "my-file.json", + Provider: "file", + }, + { + URI: "https://test.com", + Provider: "http", + }, + { + URI: "host:port", + Provider: "grpc", + TLS: false, + }, + { + URI: "secure-grpc", + Provider: "grpc", + TLS: true, + }, + { + URI: "default/my-crd", + Provider: "kubernetes", + }, + }, + }, + "empty": { + in: []string{}, + expectErr: false, + out: []sync.SourceConfig{}, + }, + "parse-failure": { + in: []string{"care.openfeature.dev/will/fail"}, + expectErr: true, + out: []sync.SourceConfig{}, + }, + } + + for name, tt := range test { + t.Run(name, func(t *testing.T) { + out, err := ParseSyncProviderURIs(tt.in) + if tt.expectErr { + if err == nil { + t.Error("expected error, got none") + } + } else if err != nil { + t.Errorf("did not expect error: %s", err.Error()) + } + if !reflect.DeepEqual(out, tt.out) { + t.Errorf("unexpected output, expected %v, got %v", tt.out, out) + } + }) + } +} diff --git a/core/pkg/sync/file/filepath_sync.go b/core/pkg/sync/file/filepath_sync.go index 57c2bd051..dd7ab58cc 100644 --- a/core/pkg/sync/file/filepath_sync.go +++ b/core/pkg/sync/file/filepath_sync.go @@ -25,6 +25,14 @@ type Sync struct { Mux *msync.RWMutex } +func NewFileSync(uri string, logger *logger.Logger) *Sync { + return &Sync{ + URI: uri, + Logger: logger, + Mux: &msync.RWMutex{}, + } +} + // default state is used to prevent EOF errors when handling filepath delete events + empty files const defaultState = "{}" diff --git a/core/pkg/sync/grpc/mock/grpc.go b/core/pkg/sync/grpc/mock/grpc.go index f5eacf54d..7d72a4df5 100644 --- a/core/pkg/sync/grpc/mock/grpc.go +++ b/core/pkg/sync/grpc/mock/grpc.go @@ -160,7 +160,7 @@ func (mr *MockFlagSyncServiceClientResponseMockRecorder) Recv() *gomock.Call { } // RecvMsg mocks base method. -func (m_2 *MockFlagSyncServiceClientResponse) RecvMsg(m interface{}) error { +func (m_2 *MockFlagSyncServiceClientResponse) RecvMsg(m any) error { m_2.ctrl.T.Helper() ret := m_2.ctrl.Call(m_2, "RecvMsg", m) ret0, _ := ret[0].(error) @@ -174,7 +174,7 @@ func (mr *MockFlagSyncServiceClientResponseMockRecorder) RecvMsg(m interface{}) } // SendMsg mocks base method. -func (m_2 *MockFlagSyncServiceClientResponse) SendMsg(m interface{}) error { +func (m_2 *MockFlagSyncServiceClientResponse) SendMsg(m any) error { m_2.ctrl.T.Helper() ret := m_2.ctrl.Call(m_2, "SendMsg", m) ret0, _ := ret[0].(error) diff --git a/core/pkg/sync/isync.go b/core/pkg/sync/isync.go index 9eb80af84..dfdd59535 100644 --- a/core/pkg/sync/isync.go +++ b/core/pkg/sync/isync.go @@ -1,6 +1,8 @@ package sync -import "context" +import ( + "context" +) type Type int @@ -57,3 +59,16 @@ type DataSync struct { Source string Type } + +// SourceConfig is configuration option for flagd. This maps to startup parameter sources +type SourceConfig struct { + URI string `json:"uri"` + Provider string `json:"provider"` + + BearerToken string `json:"bearerToken,omitempty"` + CertPath string `json:"certPath,omitempty"` + TLS bool `json:"tls,omitempty"` + ProviderID string `json:"providerID,omitempty"` + Selector string `json:"selector,omitempty"` + Interval uint32 `json:"interval,omitempty"` +} diff --git a/core/pkg/sync/kubernetes/kubernetes_sync.go b/core/pkg/sync/kubernetes/kubernetes_sync.go index 730771854..8c6f73dc1 100644 --- a/core/pkg/sync/kubernetes/kubernetes_sync.go +++ b/core/pkg/sync/kubernetes/kubernetes_sync.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os" "strings" msync "sync" "time" @@ -17,9 +16,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -29,6 +26,8 @@ var ( featureFlagResource = v1beta1.GroupVersion.WithResource("featureflags") ) +type SyncOption func(s *Sync) + type Sync struct { URI string @@ -45,34 +44,16 @@ func NewK8sSync( logger *logger.Logger, uri string, reader client.Reader, - dynamic dynamic.Interface, + dynamicClient dynamic.Interface, ) *Sync { return &Sync{ logger: logger, URI: uri, readClient: reader, - dynamicClient: dynamic, + dynamicClient: dynamicClient, } } -func GetClients() (client.Reader, dynamic.Interface, error) { - clusterConfig, err := k8sClusterConfig() - if err != nil { - return nil, nil, err - } - - readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) - if err != nil { - return nil, nil, fmt.Errorf("unable to create readClient: %w", err) - } - - dynamicClient, err := dynamic.NewForConfig(clusterConfig) - if err != nil { - return nil, nil, fmt.Errorf("unable to create dynamicClient: %w", err) - } - return readClient, dynamicClient, nil -} - func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { fetch, err := k.fetch(ctx) if err != nil { @@ -325,32 +306,6 @@ func parseURI(uri string) (string, string, error) { return s[0], s[1], nil } -// k8sClusterConfig build K8s connection config based available configurations -func k8sClusterConfig() (*rest.Config, error) { - cfg := os.Getenv("KUBECONFIG") - - var clusterConfig *rest.Config - var err error - - if cfg != "" { - clusterConfig, err = clientcmd.BuildConfigFromFlags("", cfg) - if err != nil { - err = fmt.Errorf("error building cluster config from flags: %w", err) - } - } else { - clusterConfig, err = rest.InClusterConfig() - if err != nil { - err = fmt.Errorf("error fetch cluster config: %w", err) - } - } - - if err != nil { - return nil, err - } - - return clusterConfig, nil -} - func marshallFeatureFlagSpec(ff *v1beta1.FeatureFlag) (string, error) { b, err := json.Marshal(ff.Spec.FlagSpec) if err != nil { diff --git a/core/pkg/sync/kubernetes/kubernetes_sync_test.go b/core/pkg/sync/kubernetes/kubernetes_sync_test.go index b81f44074..9494c15e4 100644 --- a/core/pkg/sync/kubernetes/kubernetes_sync_test.go +++ b/core/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -786,31 +786,6 @@ func TestNotify(t *testing.T) { } } -func Test_k8sClusterConfig(t *testing.T) { - t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) { - tt.Setenv("KUBECONFIG", "") - _, err := k8sClusterConfig() - if err == nil { - tt.Error("Expected error but got none") - } - }) - t.Run("KUBECONFIG file not existing", func(tt *testing.T) { - tt.Setenv("KUBECONFIG", "value") - _, err := k8sClusterConfig() - if err == nil { - tt.Error("Expected error but got none") - } - }) - t.Run("Default REST Config and missing svc account", func(tt *testing.T) { - tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1") - tt.Setenv("KUBERNETES_SERVICE_PORT", "8080") - _, err := k8sClusterConfig() - if err == nil { - tt.Error("Expected error but got none") - } - }) -} - func Test_NewK8sSync(t *testing.T) { l, err := logger.NewZapLogger(zapcore.FatalLevel, "console") if err != nil { diff --git a/flagd-proxy/cmd/start.go b/flagd-proxy/cmd/start.go index 76b963a57..0e2ea472b 100644 --- a/flagd-proxy/cmd/start.go +++ b/flagd-proxy/cmd/start.go @@ -13,7 +13,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/service" syncServer "github.com/open-feature/flagd/core/pkg/service/sync" - syncStore "github.com/open-feature/flagd/core/pkg/sync-store" + "github.com/open-feature/flagd/core/pkg/subscriptions" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap/zapcore" @@ -70,7 +70,7 @@ var startCmd = &cobra.Command{ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - syncStore := syncStore.NewSyncStore(ctx, logger) + syncStore := subscriptions.NewManager(ctx, logger) s := syncServer.NewServer(logger, syncStore) // If --management-port is set use that value. If not and diff --git a/flagd/cmd/start.go b/flagd/cmd/start.go index 11660df85..b8a1fda0a 100644 --- a/flagd/cmd/start.go +++ b/flagd/cmd/start.go @@ -7,6 +7,8 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/runtime" + "github.com/open-feature/flagd/core/pkg/sync" + syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" @@ -140,14 +142,14 @@ var startCmd = &cobra.Command{ docsLinkConfiguration) } - syncProviders, err := runtime.ParseSyncProviderURIs(viper.GetStringSlice(uriFlagName)) + syncProviders, err := syncbuilder.ParseSyncProviderURIs(viper.GetStringSlice(uriFlagName)) if err != nil { log.Fatal(err) } - syncProvidersFromConfig := []runtime.SourceConfig{} + syncProvidersFromConfig := []sync.SourceConfig{} if cfgFile == "" && viper.GetString(sourcesFlagName) != "" { - syncProvidersFromConfig, err = runtime.ParseSources(viper.GetString(sourcesFlagName)) + syncProvidersFromConfig, err = syncbuilder.ParseSources(viper.GetString(sourcesFlagName)) if err != nil { log.Fatal(err) }