From 70ebf1b82411f6284accd8bef3a5782292457d0b Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Sat, 13 Jun 2020 00:14:54 -0400
Subject: [PATCH] [query/dbnode] Fix propagation of require exhaustive option
 through RPC (#2409)

---
 src/dbnode/client/fetch_state.go                   |  8 ++++
 .../fetch_tagged_results_accumulator.go            | 17 +++++++--
 src/dbnode/client/session.go                       |  6 +++
 src/dbnode/client/write_state.go                   |  8 +++-
 .../server/tchannelthrift/convert/convert.go       | 16 ++++----
 src/dbnode/storage/index.go                        |  5 ++-
 .../handleroptions/fetch_options.go                | 37 ++++++++++++++++++-
 .../prometheus/handleroptions/headers.go           |  4 ++
 8 files changed, 86 insertions(+), 15 deletions(-)

diff --git a/src/dbnode/client/fetch_state.go b/src/dbnode/client/fetch_state.go
index 59e101a6ec..c991e19db5 100644
--- a/src/dbnode/client/fetch_state.go
+++ b/src/dbnode/client/fetch_state.go
@@ -31,6 +31,7 @@ import (
 	"github.com/m3db/m3/src/dbnode/storage/index"
 	"github.com/m3db/m3/src/dbnode/topology"
 	"github.com/m3db/m3/src/dbnode/x/xpool"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/ident"
 	"github.com/m3db/m3/src/x/serialize"
 )
@@ -136,6 +137,13 @@ func (f *fetchState) completionFn(
 	result interface{},
 	resultErr error,
 ) {
+	if IsBadRequestError(resultErr) {
+		// Wrap with invalid params and non-retryable so it is
+		// not retried.
+		resultErr = xerrors.NewInvalidParamsError(resultErr)
+		resultErr = xerrors.NewNonRetryableError(resultErr)
+	}
+
 	f.Lock()
 	defer func() {
 		f.Unlock()
diff --git a/src/dbnode/client/fetch_tagged_results_accumulator.go b/src/dbnode/client/fetch_tagged_results_accumulator.go
index 8e6935f7c1..32f3b3aa2d 100644
--- a/src/dbnode/client/fetch_tagged_results_accumulator.go
+++ b/src/dbnode/client/fetch_tagged_results_accumulator.go
@@ -63,7 +63,7 @@ type fetchTaggedResultAccumulator struct {
 	numHostsPending  int32
 	numShardsPending int32

-	errors         xerrors.Errors
+	errors         []error
 	fetchResponses fetchTaggedIDResults
 	aggResponses   aggregateResults
 	exhaustive     bool
@@ -203,9 +203,18 @@ func (accum *fetchTaggedResultAccumulator) accumulatedResult(
 		// all shards, so we need to fail
 		if accum.numHostsPending == 0 && accum.numShardsPending != 0 {
 			doneAccumulating := true
-			return doneAccumulating, fmt.Errorf(
-				"unable to satisfy consistency requirements for %d shards [ err = %s ]",
-				accum.numShardsPending, accum.errors.Error())
+			// NB(r): Wrap the combined error so it keeps the underlying
+			// (invalid params/non-retryable) error type.
+			err := fmt.Errorf("unable to satisfy consistency requirements: shards=%d, err=%v",
+				accum.numShardsPending, accum.errors)
+			for i := range accum.errors {
+				if IsBadRequestError(accum.errors[i]) {
+					err = xerrors.NewInvalidParamsError(err)
+					err = xerrors.NewNonRetryableError(err)
+					break
+				}
+			}
+			return doneAccumulating, err
 		}

 		doneAccumulating := false
diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go
index 704fdce7e5..c85fe493cf 100644
--- a/src/dbnode/client/session.go
+++ b/src/dbnode/client/session.go
@@ -1658,6 +1658,12 @@ func (s *session) fetchIDsAttempt(
 	completionFn := func(result interface{}, err error) {
 		var snapshotSuccess int32
 		if err != nil {
+			if IsBadRequestError(err) {
+				// Wrap with invalid params and non-retryable so it is
+				// not retried.
+				err = xerrors.NewInvalidParamsError(err)
+				err = xerrors.NewNonRetryableError(err)
+			}
 			atomic.AddInt32(&errs, 1)
 			// NB(r): reuse the error lock here as we do not want to create
 			// a whole lot of locks for every single ID fetched due to size
diff --git a/src/dbnode/client/write_state.go b/src/dbnode/client/write_state.go
index 1b91bb181c..3af5d518f6 100644
--- a/src/dbnode/client/write_state.go
+++ b/src/dbnode/client/write_state.go
@@ -26,10 +26,10 @@ import (

 	"github.com/m3db/m3/src/cluster/shard"
 	"github.com/m3db/m3/src/dbnode/topology"
-	"github.com/m3db/m3/src/x/serialize"
 	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/ident"
 	"github.com/m3db/m3/src/x/pool"
+	"github.com/m3db/m3/src/x/serialize"
 )

 // writeOp represents a generic write operation
@@ -115,6 +115,12 @@ func (w *writeState) completionFn(result interface{}, err error) {
 	var wErr error

 	if err != nil {
+		if IsBadRequestError(err) {
+			// Wrap with invalid params and non-retryable so it is
+			// not retried.
+			err = xerrors.NewInvalidParamsError(err)
+			err = xerrors.NewNonRetryableError(err)
+		}
 		wErr = xerrors.NewRenamedError(err, fmt.Errorf("error writing to host %s: %v", hostID, err))
 	} else if hostShardSet, ok := w.topoMap.LookupHostShardSet(hostID); !ok {
 		errStr := "missing host shard in writeState completionFn: %s"
diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go
index ee3e777c20..3cadd06855 100644
--- a/src/dbnode/network/server/tchannelthrift/convert/convert.go
+++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go
@@ -219,8 +219,9 @@ func FromRPCFetchTaggedRequest(
 	}

 	opts := index.QueryOptions{
-		StartInclusive: start,
-		EndExclusive:   end,
+		StartInclusive:    start,
+		EndExclusive:      end,
+		RequireExhaustive: req.RequireExhaustive,
 	}
 	if l := req.Limit; l != nil {
 		opts.Limit = int(*l)
@@ -264,11 +265,12 @@ func ToRPCFetchTaggedRequest(
 	}

 	request := rpc.FetchTaggedRequest{
-		NameSpace:  ns.Bytes(),
-		RangeStart: rangeStart,
-		RangeEnd:   rangeEnd,
-		FetchData:  fetchData,
-		Query:      query,
+		NameSpace:         ns.Bytes(),
+		RangeStart:        rangeStart,
+		RangeEnd:          rangeEnd,
+		FetchData:         fetchData,
+		Query:             query,
+		RequireExhaustive: opts.RequireExhaustive,
 	}

 	if opts.Limit > 0 {
diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go
index 4f0fbc3d91..b8730924fb 100644
--- a/src/dbnode/storage/index.go
+++ b/src/dbnode/storage/index.go
@@ -1227,10 +1227,13 @@ func (i *nsIndex) query(
 		// If exhaustive results are required but the limit was hit, return an error.
 		if opts.RequireExhaustive {
 			i.metrics.queryNonExhaustiveLimitError.Inc(1)
-			return exhaustive, fmt.Errorf("query matched too many time series: require_exhaustive=%v, limit=%d, matched=%d",
+			err := fmt.Errorf(
+				"query matched too many time series: require_exhaustive=%v, limit=%d, matched=%d",
 				opts.RequireExhaustive, opts.Limit, results.Size())
+			// NB(r): Make sure the error is not retried and surfaces as a bad request.
+			return exhaustive, xerrors.NewInvalidParamsError(err)
 		}

 		// Otherwise the result is non-exhaustive, but was not required to be.
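Each completionFn change above applies the same wrap-and-mark step before an error reaches the client's retrier. The following is a minimal, self-contained sketch of that pattern, assuming the x/errors constructors already used in the diff together with that package's IsInvalidParams/IsNonRetryableError predicates; the markBadRequest helper and its isBadRequest argument are hypothetical stand-ins for the client package's IsBadRequestError:

package main

import (
	"errors"
	"fmt"

	xerrors "github.com/m3db/m3/src/x/errors"
)

// markBadRequest wraps err as invalid params and non-retryable so that a
// retrier surfaces it immediately instead of retrying a request that can
// never succeed. Nil and non-bad-request errors pass through untouched.
func markBadRequest(err error, isBadRequest func(error) bool) error {
	if err == nil || !isBadRequest(err) {
		return err
	}
	err = xerrors.NewInvalidParamsError(err)
	return xerrors.NewNonRetryableError(err)
}

func main() {
	base := errors.New("query matched too many time series")
	wrapped := markBadRequest(base, func(error) bool { return true })

	// Both predicates now report true, so retry loops bail out early.
	fmt.Println(xerrors.IsInvalidParams(wrapped))     // true
	fmt.Println(xerrors.IsNonRetryableError(wrapped)) // true
}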
diff --git a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go
index 6abeaa229a..9969619d55 100644
--- a/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go
+++ b/src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go
@@ -79,7 +79,6 @@ func ParseLimit(req *http.Request, defaultLimit int) (int, error) {
 				"could not parse limit: input=%s, err=%v", str, err)
 			return 0, err
 		}
-
 		return n, nil
 	}

@@ -90,24 +89,58 @@ func ParseLimit(req *http.Request, defaultLimit int) (int, error) {
 				"could not parse limit: input=%s, err=%v", str, err)
 			return 0, err
 		}
-
 		return n, nil
 	}

 	return defaultLimit, nil
 }

+// ParseRequireExhaustive parses the require-exhaustive limit option from the
+// request header or query string.
+func ParseRequireExhaustive(req *http.Request, defaultValue bool) (bool, error) {
+	if str := req.Header.Get(LimitRequireExhaustiveHeader); str != "" {
+		v, err := strconv.ParseBool(str)
+		if err != nil {
+			err = fmt.Errorf(
+				"could not parse require exhaustive: input=%s, err=%v", str, err)
+			return false, err
+		}
+		return v, nil
+	}
+
+	if str := req.FormValue("requireExhaustive"); str != "" {
+		v, err := strconv.ParseBool(str)
+		if err != nil {
+			err = fmt.Errorf(
+				"could not parse require exhaustive: input=%s, err=%v", str, err)
+			return false, err
+		}
+		return v, nil
+	}
+
+	return defaultValue, nil
+}
+
 // NewFetchOptions parses an http request into fetch options.
 func (b fetchOptionsBuilder) NewFetchOptions(
 	req *http.Request,
 ) (*storage.FetchOptions, *xhttp.ParseError) {
 	fetchOpts := storage.NewFetchOptions()
+
 	limit, err := ParseLimit(req, b.opts.Limit)
 	if err != nil {
 		return nil, xhttp.NewParseError(err, http.StatusBadRequest)
 	}

 	fetchOpts.Limit = limit
+
+	requireExhaustive, err := ParseRequireExhaustive(req, b.opts.RequireExhaustive)
+	if err != nil {
+		return nil, xhttp.NewParseError(err, http.StatusBadRequest)
+	}
+
+	fetchOpts.RequireExhaustive = requireExhaustive
+
 	if str := req.Header.Get(MetricsTypeHeader); str != "" {
 		mt, err := storagemetadata.ParseMetricsType(str)
 		if err != nil {
diff --git a/src/query/api/v1/handler/prometheus/handleroptions/headers.go b/src/query/api/v1/handler/prometheus/handleroptions/headers.go
index 4d40631850..22092106dd 100644
--- a/src/query/api/v1/handler/prometheus/handleroptions/headers.go
+++ b/src/query/api/v1/handler/prometheus/handleroptions/headers.go
@@ -82,6 +82,10 @@ const (
 	// the number of time series returned by each storage node.
 	LimitMaxSeriesHeader = M3HeaderPrefix + "Limit-Max-Series"

+	// LimitRequireExhaustiveHeader is the M3 header that, when set, makes M3
+	// return an error if the result set is not exhaustive.
+	LimitRequireExhaustiveHeader = M3HeaderPrefix + "Limit-Require-Exhaustive"
+
 	// UnaggregatedStoragePolicy specifies the unaggregated storage policy.
 	UnaggregatedStoragePolicy = "unaggregated"
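End to end, the new option is exercised by setting the header added in headers.go, or the requireExhaustive form value parsed in fetch_options.go. Below is a minimal sketch, assuming M3HeaderPrefix expands the constants to M3-Limit-Max-Series and M3-Limit-Require-Exhaustive and that a coordinator is listening on the default localhost:7201; the endpoint and values are illustrative only:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	// Hypothetical query against a local coordinator; adjust to your deployment.
	params := url.Values{}
	params.Set("query", "up")

	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:7201/api/v1/query?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}

	// Cap the series each storage node may match, and require the result to
	// be exhaustive: if the limit truncates the result, the query now fails
	// as a bad request instead of silently returning partial data.
	req.Header.Set("M3-Limit-Max-Series", "10000")
	req.Header.Set("M3-Limit-Require-Exhaustive", "true")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}

Because the value is parsed with strconv.ParseBool, the 1/t/true and 0/f/false spellings are all accepted.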