Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand central config support #648

Merged
merged 1 commit into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions capturebody.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ func (t *Tracer) CaptureHTTPRequestBody(req *http.Request) *BodyCapturer {
if req.Body == nil {
return nil
}
t.captureBodyMu.RLock()
captureBody := t.captureBody
t.captureBodyMu.RUnlock()
captureBody := t.instrumentationConfig().captureBody
if captureBody == CaptureBodyOff {
return nil
}
Expand Down
158 changes: 157 additions & 1 deletion env.go → config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"runtime"
"strconv"
"strings"
"sync/atomic"
"time"
"unsafe"

"github.com/pkg/errors"

Expand Down Expand Up @@ -203,6 +205,10 @@ func initialCaptureBody() (CaptureBodyMode, error) {
if value == "" {
return defaultCaptureBody, nil
}
return parseCaptureBody(envCaptureBody, value)
}

func parseCaptureBody(name, value string) (CaptureBodyMode, error) {
switch strings.TrimSpace(strings.ToLower(value)) {
case "all":
return CaptureBodyAll, nil
Expand All @@ -213,7 +219,7 @@ func initialCaptureBody() (CaptureBodyMode, error) {
case "off":
return CaptureBodyOff, nil
}
return -1, errors.Errorf("invalid %s value %q", envCaptureBody, value)
return -1, errors.Errorf("invalid %s value %q", name, value)
}

func initialService() (name, version, environment string) {
Expand Down Expand Up @@ -261,3 +267,153 @@ func initialCentralConfigEnabled() (bool, error) {
func initialBreakdownMetricsEnabled() (bool, error) {
return configutil.ParseBoolEnv(envBreakdownMetrics, true)
}

// updateRemoteConfig updates t and cfg with changes held in "attrs", and reverts to local
// config for config attributes that have been removed (exist in old but not in attrs).
//
// On return from updateRemoteConfig, unapplied config will have been removed from attrs.
func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]string) {
warningf := func(string, ...interface{}) {}
debugf := func(string, ...interface{}) {}
errorf := func(string, ...interface{}) {}
if logger != nil {
warningf = logger.Warningf
debugf = logger.Debugf
errorf = logger.Errorf
}
envName := func(k string) string {
return "ELASTIC_APM_" + strings.ToUpper(k)
}

var updates []func(cfg *instrumentationConfig)
for k, v := range attrs {
if oldv, ok := old[k]; ok && oldv == v {
continue
}
switch envName(k) {
case envCaptureBody:
value, err := parseCaptureBody(k, v)
if err != nil {
errorf("central config failure: %s", err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.captureBody = value
})
}
case envMaxSpans:
value, err := strconv.Atoi(v)
if err != nil {
errorf("central config failure: failed to parse %s: %s", k, err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.maxSpans = value
})
}
case envTransactionSampleRate:
sampler, err := parseSampleRate(k, v)
if err != nil {
errorf("central config failure: %s", err)
delete(attrs, k)
continue
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.sampler = sampler
})
}
default:
warningf("central config failure: unsupported config: %s", k)
delete(attrs, k)
continue
}
debugf("central config update: updated %s to %s", k, v)
}
for k := range old {
if _, ok := attrs[k]; ok {
continue
}
updates = append(updates, func(cfg *instrumentationConfig) {
if f, ok := cfg.local[envName(k)]; ok {
f(&cfg.instrumentationConfigValues)
}
})
debugf("central config update: reverted %s to local config", k)
}
if updates != nil {
remote := make(map[string]struct{})
for k := range attrs {
remote[envName(k)] = struct{}{}
}
t.updateInstrumentationConfig(func(cfg *instrumentationConfig) {
cfg.remote = remote
for _, update := range updates {
update(cfg)
}
})
}
}

// instrumentationConfig returns the current instrumentationConfig.
//
// The returned value is immutable.
func (t *Tracer) instrumentationConfig() *instrumentationConfig {
config := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.instrumentationConfigInternal)))
return (*instrumentationConfig)(config)
}

// setLocalInstrumentationConfig sets local transaction configuration with
// the specified environment variable key.
func (t *Tracer) setLocalInstrumentationConfig(envKey string, f func(cfg *instrumentationConfigValues)) {
t.updateInstrumentationConfig(func(cfg *instrumentationConfig) {
cfg.local[envKey] = f
if _, ok := cfg.remote[envKey]; !ok {
f(&cfg.instrumentationConfigValues)
}
})
}

func (t *Tracer) updateInstrumentationConfig(f func(cfg *instrumentationConfig)) {
for {
oldConfig := t.instrumentationConfig()
newConfig := *oldConfig
f(&newConfig)
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&t.instrumentationConfigInternal)),
unsafe.Pointer(oldConfig),
unsafe.Pointer(&newConfig),
) {
return
}
}
}

// instrumentationConfig holds current configuration values, as well as information
// required to revert from remote to local configuration.
type instrumentationConfig struct {
instrumentationConfigValues

// local holds functions for setting instrumentationConfigValues to the most
// recently, locally specified configuration.
local map[string]func(*instrumentationConfigValues)

// remote holds the environment variable keys for applied remote config.
remote map[string]struct{}
}

// instrumentationConfigValues holds configuration that is accessible outside of the
// tracer loop, for instrumentation: StartTransaction, StartSpan, CaptureError, etc.
//
// NOTE(axw) when adding configuration here, you must also update `newTracer` to
// set the initial entry in instrumentationConfig.local, in order to properly reset
// to the local value, even if the default is the zero value.
type instrumentationConfigValues struct {
captureBody CaptureBodyMode
captureHeaders bool
maxSpans int
sampler Sampler
spanFramesMinDuration time.Duration
stackTraceLimit int
}
Loading