Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: isolate flag state management to store #383

Merged
merged 3 commits into from
Feb 9, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor: isolate flag state management to store
Signed-off-by: Skye Gill <[email protected]>
skyerus committed Feb 9, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 4f403ec5dfd0b787a44593cda283614cd2dfd64f
28 changes: 12 additions & 16 deletions pkg/eval/fractional_evaluation_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package eval

import (
"sync"
"testing"

"github.com/open-feature/flagd/pkg/store"

"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/model"
"google.golang.org/protobuf/types/known/structpb"
)

func TestFractionalEvaluation(t *testing.T) {
flags := Flags{
mx: &sync.RWMutex{},
Flags: map[string]Flag{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
@@ -115,8 +115,7 @@ func TestFractionalEvaluation(t *testing.T) {
},
"non even split": {
flags: Flags{
mx: &sync.RWMutex{},
Flags: map[string]Flag{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
@@ -167,8 +166,7 @@ func TestFractionalEvaluation(t *testing.T) {
},
"fallback to default variant if no email provided": {
flags: Flags{
mx: &sync.RWMutex{},
Flags: map[string]Flag{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
@@ -210,8 +208,7 @@ func TestFractionalEvaluation(t *testing.T) {
},
"fallback to default variant if invalid variant as result of fractional evaluation": {
flags: Flags{
mx: &sync.RWMutex{},
Flags: map[string]Flag{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
@@ -245,8 +242,7 @@ func TestFractionalEvaluation(t *testing.T) {
},
"fallback to default variant if percentages don't sum to 100": {
flags: Flags{
mx: &sync.RWMutex{},
Flags: map[string]Flag{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
@@ -287,10 +283,10 @@ func TestFractionalEvaluation(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
je := NewJSONEvaluator(logger.NewLogger(nil, false))
je.state = tt.flags
je.store.Flags = tt.flags.Flags

value, variant, reason, err := resolve[string](
reqID, tt.flagKey, tt.context, je.evaluateVariant, je.state.Flags[tt.flagKey].Variants,
reqID, tt.flagKey, tt.context, je.evaluateVariant, je.store.Flags[tt.flagKey].Variants,
)

if value != tt.expectedValue {
@@ -314,7 +310,7 @@ func TestFractionalEvaluation(t *testing.T) {

func BenchmarkFractionalEvaluation(b *testing.B) {
flags := Flags{
Flags: map[string]Flag{
Flags: map[string]model.Flag{
"headerColor": {
State: "ENABLED",
DefaultVariant: "red",
@@ -419,10 +415,10 @@ func BenchmarkFractionalEvaluation(b *testing.B) {
reqID := "test"
for name, tt := range tests {
b.Run(name, func(b *testing.B) {
je := JSONEvaluator{state: tt.flags}
je := JSONEvaluator{store: &store.Flags{Flags: tt.flags.Flags}}
for i := 0; i < b.N; i++ {
value, variant, reason, err := resolve[string](
reqID, tt.flagKey, tt.context, je.evaluateVariant, je.state.Flags[tt.flagKey].Variants,
reqID, tt.flagKey, tt.context, je.evaluateVariant, je.store.Flags[tt.flagKey].Variants,
)

if value != tt.expectedValue {
14 changes: 0 additions & 14 deletions pkg/eval/ievaluator.go
Original file line number Diff line number Diff line change
@@ -5,20 +5,6 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)

type StateChangeNotificationType string

const (
NotificationDelete StateChangeNotificationType = "delete"
NotificationCreate StateChangeNotificationType = "write"
NotificationUpdate StateChangeNotificationType = "update"
)

type StateChangeNotification struct {
Type StateChangeNotificationType `json:"type"`
Source string `json:"source"`
FlagKey string `json:"flagKey"`
}

type AnyValue struct {
Value interface{}
Variant string
63 changes: 25 additions & 38 deletions pkg/eval/json_evaluator.go
Original file line number Diff line number Diff line change
@@ -8,8 +8,8 @@ import (
"regexp"
"strconv"
"strings"
mxSync "sync"

"github.com/open-feature/flagd/pkg/store"
"github.com/open-feature/flagd/pkg/sync"

"github.com/diegoholiveira/jsonlogic/v3"
@@ -28,7 +28,7 @@ func init() {
}

type JSONEvaluator struct {
state Flags
store *store.Flags
Logger *logger.Logger
}

@@ -46,21 +46,14 @@ func NewJSONEvaluator(logger *logger.Logger) *JSONEvaluator {
zap.String("component", "evaluator"),
zap.String("evaluator", "json"),
),
state: Flags{
Flags: map[string]Flag{},
mx: &mxSync.RWMutex{},
},
store: store.NewFlags(),
}
jsonlogic.AddOperator("fractionalEvaluation", ev.fractionalEvaluation)
return &ev
}

func (je *JSONEvaluator) GetState() (string, error) {
data, err := json.Marshal(&je.state)
if err != nil {
return "", err
}
return string(data), nil
return je.store.String()
}

func (je *JSONEvaluator) SetState(payload sync.DataSync) (map[string]interface{}, error) {
@@ -72,13 +65,13 @@ func (je *JSONEvaluator) SetState(payload sync.DataSync) (map[string]interface{}

switch payload.Type {
case sync.ALL:
return je.state.Merge(je.Logger, payload.Source, newFlags), nil
return je.store.Merge(je.Logger, payload.Source, newFlags.Flags), nil
case sync.ADD:
return je.state.Add(je.Logger, payload.Source, newFlags), nil
return je.store.Add(je.Logger, payload.Source, newFlags.Flags), nil
case sync.UPDATE:
return je.state.Update(je.Logger, payload.Source, newFlags), nil
return je.store.Update(je.Logger, payload.Source, newFlags.Flags), nil
case sync.DELETE:
return je.state.Delete(je.Logger, payload.Source, newFlags), nil
return je.store.DeleteFlags(je.Logger, payload.Source, newFlags.Flags), nil
default:
return nil, fmt.Errorf("unsupported sync type: %d", payload.Type)
}
@@ -112,9 +105,8 @@ func (je *JSONEvaluator) ResolveAllValues(reqID string, context *structpb.Struct
var variant string
var reason string
var err error
je.state.mx.RLock()
defer je.state.mx.RUnlock()
for flagKey, flag := range je.state.Flags {
allFlags := je.store.GetAll()
for flagKey, flag := range allFlags {
defaultValue := flag.Variants[flag.DefaultVariant]
switch defaultValue.(type) {
case bool:
@@ -123,31 +115,31 @@ func (je *JSONEvaluator) ResolveAllValues(reqID string, context *structpb.Struct
flagKey,
context,
je.evaluateVariant,
je.state.Flags[flagKey].Variants,
allFlags[flagKey].Variants,
)
case string:
value, variant, reason, err = resolve[string](
reqID,
flagKey,
context,
je.evaluateVariant,
je.state.Flags[flagKey].Variants,
allFlags[flagKey].Variants,
)
case float64:
value, variant, reason, err = resolve[float64](
reqID,
flagKey,
context,
je.evaluateVariant,
je.state.Flags[flagKey].Variants,
allFlags[flagKey].Variants,
)
case map[string]any:
value, variant, reason, err = resolve[map[string]any](
reqID,
flagKey,
context,
je.evaluateVariant,
je.state.Flags[flagKey].Variants,
allFlags[flagKey].Variants,
)
}
if err != nil {
@@ -165,10 +157,9 @@ func (je *JSONEvaluator) ResolveBooleanValue(reqID string, flagKey string, conte
reason string,
err error,
) {
je.state.mx.RLock()
defer je.state.mx.RUnlock()
je.Logger.DebugWithID(reqID, fmt.Sprintf("evaluating boolean flag: %s", flagKey))
return resolve[bool](reqID, flagKey, context, je.evaluateVariant, je.state.Flags[flagKey].Variants)
flag, _ := je.store.Get(flagKey)
return resolve[bool](reqID, flagKey, context, je.evaluateVariant, flag.Variants)
}

func (je *JSONEvaluator) ResolveStringValue(reqID string, flagKey string, context *structpb.Struct) (
@@ -177,10 +168,9 @@ func (je *JSONEvaluator) ResolveStringValue(reqID string, flagKey string, contex
reason string,
err error,
) {
je.state.mx.RLock()
defer je.state.mx.RUnlock()
je.Logger.DebugWithID(reqID, fmt.Sprintf("evaluating string flag: %s", flagKey))
return resolve[string](reqID, flagKey, context, je.evaluateVariant, je.state.Flags[flagKey].Variants)
flag, _ := je.store.Get(flagKey)
return resolve[string](reqID, flagKey, context, je.evaluateVariant, flag.Variants)
}

func (je *JSONEvaluator) ResolveFloatValue(reqID string, flagKey string, context *structpb.Struct) (
@@ -189,11 +179,10 @@ func (je *JSONEvaluator) ResolveFloatValue(reqID string, flagKey string, context
reason string,
err error,
) {
je.state.mx.RLock()
defer je.state.mx.RUnlock()
je.Logger.DebugWithID(reqID, fmt.Sprintf("evaluating float flag: %s", flagKey))
flag, _ := je.store.Get(flagKey)
value, variant, reason, err = resolve[float64](
reqID, flagKey, context, je.evaluateVariant, je.state.Flags[flagKey].Variants)
reqID, flagKey, context, je.evaluateVariant, flag.Variants)
return
}

@@ -203,12 +192,11 @@ func (je *JSONEvaluator) ResolveIntValue(reqID string, flagKey string, context *
reason string,
err error,
) {
je.state.mx.RLock()
defer je.state.mx.RUnlock()
je.Logger.DebugWithID(reqID, fmt.Sprintf("evaluating int flag: %s", flagKey))
flag, _ := je.store.Get(flagKey)
var val float64
val, variant, reason, err = resolve[float64](
reqID, flagKey, context, je.evaluateVariant, je.state.Flags[flagKey].Variants)
reqID, flagKey, context, je.evaluateVariant, flag.Variants)
value = int64(val)
return
}
@@ -219,10 +207,9 @@ func (je *JSONEvaluator) ResolveObjectValue(reqID string, flagKey string, contex
reason string,
err error,
) {
je.state.mx.RLock()
defer je.state.mx.RUnlock()
je.Logger.DebugWithID(reqID, fmt.Sprintf("evaluating object flag: %s", flagKey))
return resolve[map[string]any](reqID, flagKey, context, je.evaluateVariant, je.state.Flags[flagKey].Variants)
flag, _ := je.store.Get(flagKey)
return resolve[map[string]any](reqID, flagKey, context, je.evaluateVariant, flag.Variants)
}

// runs the rules (if defined) to determine the variant, otherwise falling through to the default
@@ -231,7 +218,7 @@ func (je *JSONEvaluator) evaluateVariant(
flagKey string,
context *structpb.Struct,
) (variant string, reason string, err error) {
flag, ok := je.state.Flags[flagKey]
flag, ok := je.store.Get(flagKey)
if !ok {
// flag not found
je.Logger.DebugWithID(reqID, fmt.Sprintf("requested flag could not be found: %s", flagKey))
172 changes: 2 additions & 170 deletions pkg/eval/json_evaluator_model.go
Original file line number Diff line number Diff line change
@@ -2,182 +2,14 @@ package eval

import (
"encoding/json"
"fmt"
"reflect"
"sync"

"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/model"
)

type Flag struct {
State string `json:"state"`
DefaultVariant string `json:"defaultVariant"`
Variants map[string]any `json:"variants"`
Targeting json.RawMessage `json:"targeting,omitempty"`
Source string `json:"source"`
}

type Evaluators struct {
Evaluators map[string]json.RawMessage `json:"$evaluators"`
}

type Flags struct {
mx *sync.RWMutex
Flags map[string]Flag `json:"flags"`
}

// Add new flags from source. The implementation is not thread safe
func (f Flags) Add(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

for k, newFlag := range ff.Flags {
f.mx.RLock()
storedFlag, ok := f.Flags[k]
f.mx.RUnlock()
if ok && storedFlag.Source != source {
logger.Warn(fmt.Sprintf(
"flag with key %s from source %s already exist, overriding this with flag from source %s",
k,
storedFlag.Source,
source,
))
}

notifications[k] = map[string]interface{}{
"type": string(NotificationCreate),
"source": source,
}

// Store the new version of the flag
newFlag.Source = source
f.mx.Lock()
f.Flags[k] = newFlag
f.mx.Unlock()
}

return notifications
}

// Update existing flags from source. The implementation is not thread safe
func (f Flags) Update(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

for k, flag := range ff.Flags {
f.mx.RLock()
storedFlag, ok := f.Flags[k]
f.mx.RUnlock()
if !ok {
logger.Warn(
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exist.",
k,
source))

continue
}
if storedFlag.Source != source {
logger.Warn(fmt.Sprintf(
"flag with key %s from source %s already exist, overriding this with flag from source %s",
k,
storedFlag.Source,
source,
))
}

notifications[k] = map[string]interface{}{
"type": string(NotificationUpdate),
"source": source,
}

flag.Source = source
f.mx.Lock()
f.Flags[k] = flag
f.mx.Unlock()
}

return notifications
}

// Delete matching flags from source. The implementation is not thread safe
func (f Flags) Delete(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

for k := range ff.Flags {
f.mx.RLock()
_, ok := f.Flags[k]
f.mx.RUnlock()
if ok {
notifications[k] = map[string]interface{}{
"type": string(NotificationDelete),
"source": source,
}

f.mx.Lock()
delete(f.Flags, k)
f.mx.Unlock()
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exisit.",
k,
source))
}
}

return notifications
}

// Merge provided flags from source with currently stored flags. The implementation is not thread safe
func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

f.mx.Lock()
for k, v := range f.Flags {
if v.Source == source {
if _, ok := ff.Flags[k]; !ok {
// flag has been deleted
delete(f.Flags, k)
notifications[k] = map[string]interface{}{
"type": string(NotificationDelete),
"source": source,
}
continue
}
}
}
f.mx.Unlock()

for k, newFlag := range ff.Flags {
newFlag.Source = source

f.mx.RLock()
storedFlag, ok := f.Flags[k]
f.mx.RUnlock()
if !ok {
notifications[k] = map[string]interface{}{
"type": string(NotificationCreate),
"source": source,
}
} else if !reflect.DeepEqual(storedFlag, newFlag) {
if storedFlag.Source != source {
logger.Warn(
fmt.Sprintf(
"key value: %s is duplicated across multiple sources this can lead to unexpected behavior: %s, %s",
k,
storedFlag.Source,
source,
),
)
}
notifications[k] = map[string]interface{}{
"type": string(NotificationUpdate),
"source": source,
}
}

f.mx.Lock()
// Store the new version of the flag
f.Flags[k] = newFlag
f.mx.Unlock()
}

return notifications
Flags map[string]model.Flag `json:"flags"`
}
15 changes: 15 additions & 0 deletions pkg/model/flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package model

import "encoding/json"

type Flag struct {
State string `json:"state"`
DefaultVariant string `json:"defaultVariant"`
Variants map[string]any `json:"variants"`
Targeting json.RawMessage `json:"targeting,omitempty"`
Source string `json:"source"`
}

type Evaluators struct {
Evaluators map[string]json.RawMessage `json:"$evaluators"`
}
15 changes: 15 additions & 0 deletions pkg/model/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package model

type StateChangeNotificationType string

const (
NotificationDelete StateChangeNotificationType = "delete"
NotificationCreate StateChangeNotificationType = "write"
NotificationUpdate StateChangeNotificationType = "update"
)

type StateChangeNotification struct {
Type StateChangeNotificationType `json:"type"`
Source string `json:"source"`
FlagKey string `json:"flagKey"`
}
205 changes: 205 additions & 0 deletions pkg/store/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package store

import (
"encoding/json"
"fmt"
"reflect"
"sync"

"github.com/open-feature/flagd/pkg/logger"

"github.com/open-feature/flagd/pkg/model"
)

type Flags struct {
mx sync.RWMutex
Flags map[string]model.Flag `json:"flags"`
}

func NewFlags() *Flags {
return &Flags{Flags: map[string]model.Flag{}}
}

func (f *Flags) Set(key string, flag model.Flag) {
f.mx.Lock()
defer f.mx.Unlock()
f.Flags[key] = flag
}

func (f *Flags) Get(key string) (model.Flag, bool) {
f.mx.RLock()
defer f.mx.RUnlock()
flag, ok := f.Flags[key]

return flag, ok
}

func (f *Flags) Delete(key string) {
f.mx.Lock()
defer f.mx.Unlock()
delete(f.Flags, key)
}

func (f *Flags) String() (string, error) {
f.mx.RLock()
defer f.mx.RUnlock()
bytes, err := json.Marshal(f)
if err != nil {
return "", err
}

return string(bytes), nil
}

// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (f *Flags) GetAll() map[string]model.Flag {
f.mx.RLock()
defer f.mx.RUnlock()
state := make(map[string]model.Flag, len(f.Flags))

for key, flag := range f.Flags {
state[key] = flag
}

return state
}

// Add new flags from source.
func (f *Flags) Add(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
notifications := map[string]interface{}{}

for k, newFlag := range flags {
storedFlag, ok := f.Get(k)
if ok && storedFlag.Source != source {
logger.Warn(fmt.Sprintf(
"flag with key %s from source %s already exist, overriding this with flag from source %s",
k,
storedFlag.Source,
source,
))
}

notifications[k] = map[string]interface{}{
"type": string(model.NotificationCreate),
"source": source,
}

// Store the new version of the flag
newFlag.Source = source
f.Set(k, newFlag)
}

return notifications
}

// Update existing flags from source.
func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
notifications := map[string]interface{}{}

for k, flag := range flags {
storedFlag, ok := f.Get(k)
if !ok {
logger.Warn(
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exist.",
k,
source))

continue
}
if storedFlag.Source != source {
logger.Warn(fmt.Sprintf(
"flag with key %s from source %s already exist, overriding this with flag from source %s",
k,
storedFlag.Source,
source,
))
}

notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}

flag.Source = source
f.Set(k, flag)
}

return notifications
}

// DeleteFlags matching flags from source.
func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
notifications := map[string]interface{}{}

for k := range flags {
_, ok := f.Get(k)
if ok {
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}

f.Delete(k)
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exisit.",
k,
source))
}
}

return notifications
}

// Merge provided flags from source with currently stored flags.
func (f *Flags) Merge(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} {
notifications := map[string]interface{}{}

f.mx.Lock()
for k, v := range f.Flags {
if v.Source == source {
if _, ok := flags[k]; !ok {
// flag has been deleted
delete(f.Flags, k)
notifications[k] = map[string]interface{}{
"type": string(model.NotificationDelete),
"source": source,
}
continue
}
}
}
f.mx.Unlock()

for k, newFlag := range flags {
newFlag.Source = source

storedFlag, ok := f.Get(k)
if !ok {
notifications[k] = map[string]interface{}{
"type": string(model.NotificationCreate),
"source": source,
}
} else if !reflect.DeepEqual(storedFlag, newFlag) {
if storedFlag.Source != source {
logger.Warn(
fmt.Sprintf(
"key value: %s is duplicated across multiple sources this can lead to unexpected behavior: %s, %s",
k,
storedFlag.Source,
source,
),
)
}
notifications[k] = map[string]interface{}{
"type": string(model.NotificationUpdate),
"source": source,
}
}

// Store the new version of the flag
f.Set(k, newFlag)
}

return notifications
}
244 changes: 101 additions & 143 deletions pkg/eval/json_evaluator_model_test.go → pkg/store/flags_test.go

Large diffs are not rendered by default.