Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Add client bad request/internal error distinction for metrics and sampled logs #2201

Merged
merged 5 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cfg Configuration) newOptions(
if cfg.LogSampleRate != nil {
logSampleRate = *cfg.LogSampleRate
}
sampler, err := sampler.NewSampler(logSampleRate)
sampler, err := sampler.NewSampler(sampler.Rate(logSampleRate))
if err != nil {
return Options{}, err
}
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func TestConfiguration(t *testing.T) {
maxRetries: 3
forever: null
jitter: true
logErrorSampleRate: 0
backgroundHealthCheckFailLimit: 4
backgroundHealthCheckFailThrottleFactor: 0.5
hashing:
Expand Down
57 changes: 57 additions & 0 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
xsync "github.com/m3db/m3/src/x/sync"
)

Expand Down Expand Up @@ -76,6 +77,9 @@ type Configuration struct {
// FetchRetry is the fetch retry config.
FetchRetry *retry.Configuration `yaml:"fetchRetry"`

// LogErrorSampleRate is the log error sample rate.
LogErrorSampleRate sampler.Rate `yaml:"logErrorSampleRate"`

// BackgroundHealthCheckFailLimit is the amount of times a background check
// must fail before a connection is taken out of consideration.
BackgroundHealthCheckFailLimit *int `yaml:"backgroundHealthCheckFailLimit"`
Expand Down Expand Up @@ -158,6 +162,10 @@ func (c *Configuration) Validate() error {
return fmt.Errorf("m3db client connectTimeout was: %d but must be >= 0", *c.ConnectTimeout)
}

if err := c.LogErrorSampleRate.Validate(); err != nil {
return fmt.Errorf("m3db client error validating log error sample rate: %v", err)
}

if c.BackgroundHealthCheckFailLimit != nil &&
(*c.BackgroundHealthCheckFailLimit < 0 || *c.BackgroundHealthCheckFailLimit > 10) {
return fmt.Errorf(
Expand Down Expand Up @@ -296,7 +304,8 @@ func (c Configuration) NewAdminClient(
v := NewAdminOptions().
SetTopologyInitializer(syncTopoInit).
SetAsyncTopologyInitializers(asyncTopoInits).
SetInstrumentOptions(iopts)
SetInstrumentOptions(iopts).
SetLogErrorSampleRate(c.LogErrorSampleRate)

if c.UseV2BatchAPIs != nil {
v = v.SetUseV2BatchAPIs(*c.UseV2BatchAPIs)
Expand Down
19 changes: 17 additions & 2 deletions src/dbnode/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xretry "github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"

Expand Down Expand Up @@ -214,6 +215,7 @@ type options struct {
runtimeOptsMgr m3dbruntime.OptionsManager
clockOpts clock.Options
instrumentOpts instrument.Options
logErrorSampleRate sampler.Rate
topologyInitializer topology.Initializer
readConsistencyLevel topology.ReadConsistencyLevel
writeConsistencyLevel topology.ConsistencyLevel
Expand Down Expand Up @@ -393,9 +395,12 @@ func validate(opts *options) error {
); err != nil {
return err
}
return topology.ValidateConnectConsistencyLevel(
if err := topology.ValidateConnectConsistencyLevel(
opts.clusterConnectConsistencyLevel,
)
); err != nil {
return err
}
return opts.logErrorSampleRate.Validate()
}

func (o *options) Validate() error {
Expand Down Expand Up @@ -454,6 +459,16 @@ func (o *options) InstrumentOptions() instrument.Options {
return o.instrumentOpts
}

func (o *options) SetLogErrorSampleRate(value sampler.Rate) Options {
opts := *o
opts.logErrorSampleRate = value
return &opts
}

func (o *options) LogErrorSampleRate() sampler.Rate {
return o.logErrorSampleRate
}

func (o *options) SetTopologyInitializer(value topology.Initializer) Options {
opts := *o
opts.topologyInitializer = value
Expand Down
61 changes: 52 additions & 9 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xretry "github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"
Expand Down Expand Up @@ -141,6 +142,8 @@ type session struct {
scope tally.Scope
nowFn clock.NowFn
log *zap.Logger
logWriteErrorSampler *sampler.Sampler
logFetchErrorSampler *sampler.Sampler
newHostQueueFn newHostQueueFn
writeRetrier xretry.Retrier
fetchRetrier xretry.Retrier
Expand All @@ -167,12 +170,14 @@ type shardMetricsKey struct {
type sessionMetrics struct {
sync.RWMutex
writeSuccess tally.Counter
writeErrors tally.Counter
writeErrorsBadRequest tally.Counter
writeErrorsInternalError tally.Counter
writeLatencyHistogram tally.Histogram
writeNodesRespondingErrors []tally.Counter
writeNodesRespondingBadRequestErrors []tally.Counter
fetchSuccess tally.Counter
fetchErrors tally.Counter
fetchErrorsBadRequest tally.Counter
fetchErrorsInternalError tally.Counter
fetchLatencyHistogram tally.Histogram
fetchNodesRespondingErrors []tally.Counter
fetchNodesRespondingBadRequestErrors []tally.Counter
Expand All @@ -183,11 +188,21 @@ type sessionMetrics struct {

func newSessionMetrics(scope tally.Scope) sessionMetrics {
return sessionMetrics{
writeSuccess: scope.Counter("write.success"),
writeErrors: scope.Counter("write.errors"),
writeLatencyHistogram: histogramWithDurationBuckets(scope, "write.latency"),
fetchSuccess: scope.Counter("fetch.success"),
fetchErrors: scope.Counter("fetch.errors"),
writeSuccess: scope.Counter("write.success"),
writeErrorsBadRequest: scope.Tagged(map[string]string{
"error_type": "bad_request",
}).Counter("write.errors"),
writeErrorsInternalError: scope.Tagged(map[string]string{
"error_type": "internal_error",
}).Counter("write.errors"),
writeLatencyHistogram: histogramWithDurationBuckets(scope, "write.latency"),
fetchSuccess: scope.Counter("fetch.success"),
fetchErrorsBadRequest: scope.Tagged(map[string]string{
"error_type": "bad_request",
}).Counter("fetch.errors"),
fetchErrorsInternalError: scope.Tagged(map[string]string{
"error_type": "internal_error",
}).Counter("fetch.errors"),
fetchLatencyHistogram: histogramWithDurationBuckets(scope, "fetch.latency"),
topologyUpdatedSuccess: scope.Counter("topology.updated-success"),
topologyUpdatedError: scope.Counter("topology.updated-error"),
Expand Down Expand Up @@ -239,6 +254,16 @@ func newSession(opts Options) (clientSession, error) {
return nil, err
}

logWriteErrorSampler, err := sampler.NewSampler(opts.LogErrorSampleRate())
if err != nil {
return nil, err
}

logFetchErrorSampler, err := sampler.NewSampler(opts.LogErrorSampleRate())
if err != nil {
return nil, err
}

scope := opts.InstrumentOptions().MetricsScope()

s := &session{
Expand All @@ -252,6 +277,8 @@ func newSession(opts Options) (clientSession, error) {
scope: scope,
nowFn: opts.ClockOptions().NowFn(),
log: opts.InstrumentOptions().Logger(),
logWriteErrorSampler: logWriteErrorSampler,
logFetchErrorSampler: logFetchErrorSampler,
newHostQueueFn: newHostQueue,
fetchBatchSize: opts.FetchBatchSize(),
newPeerBlocksQueueFn: newPeerBlocksQueue,
Expand Down Expand Up @@ -425,10 +452,18 @@ func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32,
}
if consistencyResultErr == nil {
s.metrics.writeSuccess.Inc(1)
} else if IsBadRequestError(consistencyResultErr) {
s.metrics.writeErrorsBadRequest.Inc(1)
} else {
s.metrics.writeErrors.Inc(1)
s.metrics.writeErrorsInternalError.Inc(1)
}
s.metrics.writeLatencyHistogram.RecordDuration(s.nowFn().Sub(start))

if consistencyResultErr != nil && s.logWriteErrorSampler.Sample() {
s.log.Error("m3db client write error occurred",
zap.Float64("sampleRateLog", s.logWriteErrorSampler.SampleRate().Value()),
zap.Error(consistencyResultErr))
}
}

func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32, start time.Time) {
Expand All @@ -441,10 +476,18 @@ func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32,
}
if consistencyResultErr == nil {
s.metrics.fetchSuccess.Inc(1)
} else if IsBadRequestError(consistencyResultErr) {
s.metrics.fetchErrorsBadRequest.Inc(1)
} else {
s.metrics.fetchErrors.Inc(1)
s.metrics.fetchErrorsInternalError.Inc(1)
}
s.metrics.fetchLatencyHistogram.RecordDuration(s.nowFn().Sub(start))

if consistencyResultErr != nil && s.logFetchErrorSampler.Sample() {
s.log.Error("m3db client fetch error occurred",
zap.Float64("sampleRateLog", s.logFetchErrorSampler.SampleRate().Value()),
zap.Error(consistencyResultErr))
}
}

func (s *session) nodesRespondingErrorsMetricIndex(respErrs int32) int32 {
Expand Down
7 changes: 7 additions & 0 deletions src/dbnode/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xretry "github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"
Expand Down Expand Up @@ -272,6 +273,12 @@ type Options interface {
// InstrumentOptions returns the instrumentation options.
InstrumentOptions() instrument.Options

// SetLogErrorSampleRate sets the log error sample rate between [0,1.0].
SetLogErrorSampleRate(value sampler.Rate) Options

// LogErrorSampleRate returns the log error sample rate between [0,1.0].
LogErrorSampleRate() sampler.Rate

// SetTopologyInitializer sets the TopologyInitializer.
SetTopologyInitializer(value topology.Initializer) Options

Expand Down
Loading