Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Support periodic refresh of sampling strategies #2188

Merged
merged 2 commits into from
Apr 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions plugin/sampling/strategystore/static/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ func defaultStrategyResponse() *sampling.SamplingStrategyResponse {
},
}
}

func defaultStrategies() *storedStrategies {
s := &storedStrategies{
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
}
s.defaultStrategy = defaultStrategyResponse()
return s
}
8 changes: 7 additions & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,33 @@ package static

import (
"flag"
"time"

"github.com/spf13/viper"
)

const (
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
)

// Options holds configuration for the static sampling strategy store.
type Options struct {
// StrategiesFile is the path for the sampling strategies file in JSON format
StrategiesFile string
// ReloadInterval is the time interval to check and reload sampling strategies file
ReloadInterval time.Duration
}

// AddFlags adds flags for Options
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no reloading")
}

// InitFromViper initializes Options with properties from viper
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
return opts
}
87 changes: 75 additions & 12 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ package static

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"sync/atomic"
"time"

"go.uber.org/zap"

Expand All @@ -30,31 +34,89 @@ import (
type strategyStore struct {
logger *zap.Logger

storedStrategies atomic.Value // holds *storedStrategies

ctx context.Context
cancelFunc context.CancelFunc
}

type storedStrategies struct {
defaultStrategy *sampling.SamplingStrategyResponse
serviceStrategies map[string]*sampling.SamplingStrategyResponse
}

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
h := &strategyStore{
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
}
h.storedStrategies.Store(defaultStrategies())

strategies, err := loadStrategies(options.StrategiesFile)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile)
}
return h, nil
}

// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
if strategy, ok := h.serviceStrategies[serviceName]; ok {
ss := h.storedStrategies.Load().(*storedStrategies)
serviceStrategies := ss.serviceStrategies
if strategy, ok := serviceStrategies[serviceName]; ok {
return strategy, nil
}
h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName))
return h.defaultStrategy, nil
return ss.defaultStrategy, nil
}

// Close stops updating the strategies
func (h *strategyStore) Close() {
h.cancelFunc()
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) {
lastString := ""
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath))
if err != nil {
h.logger.Error("ReadFile failed", zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.logger.Error("ReadFile failed", zap.Error(err))
h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err))
continue

}
currStr := string(currBytes)
if lastString == currStr {
continue
}
if err = h.updateSamplingStrategy(currBytes); err != nil {
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
h.logger.Error("failed to update sampling strategies from file", zap.Error(err))

}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
lastString = currStr

case <-h.ctx.Done():
return
}
}
}

func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
var strategies strategies
if err := json.Unmarshal(bytes, &strategies); err != nil {
return fmt.Errorf("failed to unmarshal strategies: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("failed to unmarshal strategies: %w", err)
return fmt.Errorf("failed to unmarshal sampling strategies: %w", err)

}
h.parseStrategies(&strategies)
h.logger.Info("Updated sampling strategies:" + string(bytes))
return nil
}

// TODO good candidate for a global util function
Expand All @@ -74,40 +136,41 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
h.defaultStrategy = defaultStrategyResponse()
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
return
}
newStore := defaultStrategies()
if strategies.DefaultStrategy != nil {
h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
}

merge := true
if h.defaultStrategy.OperationSampling == nil ||
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
if newStore.defaultStrategy.OperationSampling == nil ||
newStore.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
merge = false
}

for _, s := range strategies.ServiceStrategies {
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Merge with the default operation strategies, because only merging with
// the default strategy has no effect on service strategies (the default strategy
// is not merged with and only used as a fallback).
opS := h.serviceStrategies[s.Service].OperationSampling
opS := newStore.serviceStrategies[s.Service].OperationSampling
if opS == nil {
// Service has no per-operation strategies, so just reference the default settings.
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
newStore.serviceStrategies[s.Service].OperationSampling = newStore.defaultStrategy.OperationSampling
continue
}

if merge {
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
opS.PerOperationStrategies,
h.defaultStrategy.OperationSampling.PerOperationStrategies)
newStore.defaultStrategy.OperationSampling.PerOperationStrategies)
}
}
h.storedStrategies.Store(newStore)
}

// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.
Expand Down
54 changes: 52 additions & 2 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package static

import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -79,7 +82,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
os := s.OperationSampling
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
require.Len(t, os.PerOperationStrategies, 4)
fmt.Println(os)

assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation)
assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)
Expand Down Expand Up @@ -243,3 +246,50 @@ func TestDeepCopy(t *testing.T) {
assert.False(t, copy == s)
assert.EqualValues(t, copy, s)
}

func TestAutoUpdateStrategy(t *testing.T) {
// copy from fixtures/strategies.json
tempFile, _ := ioutil.TempFile("", "for_go_test_*.json")
tempFile.Close()

srcFile, dstFile := "fixtures/strategies.json", tempFile.Name()
srcBytes, err := ioutil.ReadFile(srcFile)
require.NoError(t, err)
err = ioutil.WriteFile(dstFile, srcBytes, 0644)
require.NoError(t, err)

interval := time.Millisecond * 10
store, err := NewStrategyStore(Options{
StrategiesFile: dstFile,
ReloadInterval: interval,
}, zap.NewNop())
require.NoError(t, err)
defer store.(*strategyStore).Close()

s, err := store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// update file
newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1)
err = ioutil.WriteFile(dstFile, []byte(newStr), 0644)
require.NoError(t, err)

// wait for reloading
time.Sleep(interval * 4)

// verity reloading
s, err = store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)

// check bad file content
_ = ioutil.WriteFile(dstFile, []byte("bad value"), 0644)
time.Sleep(interval * 2)

// remove file(test read file failure)
_ = os.Remove(dstFile)
// wait for delete and update failure
time.Sleep(interval * 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this testing (and how)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose is to increase the test code coverage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is there any other comment on this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of checking the error conditions via timer updates, I recommend refactoring the logic under case <-ticker.C into a helper function reloadStrategiesFile and unit-testing that function with the desired edge cases.


}