Commit 70ebf1b

[query/dbnode] Fix propagation of require exhaustive option through RPC (#2409)

robskillington authored Jun 13, 2020
1 parent b770a20 commit 70ebf1b
Showing 8 changed files with 86 additions and 15 deletions.
8 changes: 8 additions & 0 deletions src/dbnode/client/fetch_state.go
@@ -31,6 +31,7 @@ import (
 	"github.com/m3db/m3/src/dbnode/storage/index"
 	"github.com/m3db/m3/src/dbnode/topology"
 	"github.com/m3db/m3/src/dbnode/x/xpool"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/ident"
 	"github.com/m3db/m3/src/x/serialize"
 )
@@ -136,6 +137,13 @@ func (f *fetchState) completionFn(
 	result interface{},
 	resultErr error,
 ) {
+	if IsBadRequestError(resultErr) {
+		// Wrap with invalid params and non-retryable so it is
+		// not retried.
+		resultErr = xerrors.NewInvalidParamsError(resultErr)
+		resultErr = xerrors.NewNonRetryableError(resultErr)
+	}
+
 	f.Lock()
 	defer func() {
 		f.Unlock()
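For readers unfamiliar with the m3 x/errors helpers, here is a minimal, illustrative sketch of why this double wrap matters, assuming the IsInvalidParams and IsNonRetryableError predicates behave as their names suggest:

package main

import (
	"errors"
	"fmt"

	xerrors "github.com/m3db/m3/src/x/errors"
)

func main() {
	cause := errors.New("bad request from RPC peer")

	// Same double wrap as completionFn above: mark the error as the
	// caller's fault (invalid params) and tell retriers to give up.
	err := xerrors.NewInvalidParamsError(cause)
	err = xerrors.NewNonRetryableError(err)

	fmt.Println(xerrors.IsInvalidParams(err))     // expected: true
	fmt.Println(xerrors.IsNonRetryableError(err)) // expected: true
}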
17 changes: 13 additions & 4 deletions src/dbnode/client/fetch_tagged_results_accumulator.go
@@ -63,7 +63,7 @@ type fetchTaggedResultAccumulator struct {
 	numHostsPending  int32
 	numShardsPending int32

-	errors         xerrors.Errors
+	errors         []error
 	fetchResponses fetchTaggedIDResults
 	aggResponses   aggregateResults
 	exhaustive     bool
@@ -203,9 +203,18 @@ func (accum *fetchTaggedResultAccumulator) accumulatedResult(
 	// all shards, so we need to fail
 	if accum.numHostsPending == 0 && accum.numShardsPending != 0 {
 		doneAccumulating := true
-		return doneAccumulating, fmt.Errorf(
-			"unable to satisfy consistency requirements for %d shards [ err = %s ]",
-			accum.numShardsPending, accum.errors.Error())
+		// NB(r): If any error was a bad request error, wrap the combined
+		// error so it keeps the invalid params, non-retryable type.
+		err := fmt.Errorf("unable to satisfy consistency requirements: shards=%d, err=%v",
+			accum.numShardsPending, accum.errors)
+		for i := range accum.errors {
+			if IsBadRequestError(accum.errors[i]) {
+				err = xerrors.NewInvalidParamsError(err)
+				err = xerrors.NewNonRetryableError(err)
+				break
+			}
+		}
+		return doneAccumulating, err
 	}

 	doneAccumulating := false
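A standalone sketch of the promotion logic above; xerrors.IsInvalidParams stands in for the client package's IsBadRequestError, whose implementation is not shown in this diff:

package main

import (
	"errors"
	"fmt"

	xerrors "github.com/m3db/m3/src/x/errors"
)

// combineErrors mirrors the accumulator: build one combined error, then
// promote it to invalid params / non-retryable if any member error was
// the caller's fault.
func combineErrors(shards int, errs []error) error {
	err := fmt.Errorf("unable to satisfy consistency requirements: shards=%d, err=%v",
		shards, errs)
	for _, e := range errs {
		if xerrors.IsInvalidParams(e) { // stand-in for IsBadRequestError
			err = xerrors.NewInvalidParamsError(err)
			err = xerrors.NewNonRetryableError(err)
			break
		}
	}
	return err
}

func main() {
	errs := []error{
		errors.New("host timeout"),
		xerrors.NewInvalidParamsError(errors.New("malformed query")),
	}
	fmt.Println(xerrors.IsInvalidParams(combineErrors(2, errs))) // expected: true
}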
6 changes: 6 additions & 0 deletions src/dbnode/client/session.go
@@ -1658,6 +1658,12 @@ func (s *session) fetchIDsAttempt(
 	completionFn := func(result interface{}, err error) {
 		var snapshotSuccess int32
 		if err != nil {
+			if IsBadRequestError(err) {
+				// Wrap with invalid params and non-retryable so it is
+				// not retried.
+				err = xerrors.NewInvalidParamsError(err)
+				err = xerrors.NewNonRetryableError(err)
+			}
 			atomic.AddInt32(&errs, 1)
 			// NB(r): reuse the error lock here as we do not want to create
 			// a whole lot of locks for every single ID fetched due to size
8 changes: 7 additions & 1 deletion src/dbnode/client/write_state.go
@@ -26,10 +26,10 @@ import (

 	"github.com/m3db/m3/src/cluster/shard"
 	"github.com/m3db/m3/src/dbnode/topology"
-	"github.com/m3db/m3/src/x/serialize"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/ident"
 	"github.com/m3db/m3/src/x/pool"
+	"github.com/m3db/m3/src/x/serialize"
 )

 // writeOp represents a generic write operation
@@ -115,6 +115,12 @@ func (w *writeState) completionFn(result interface{}, err error) {
 	var wErr error

 	if err != nil {
+		if IsBadRequestError(err) {
+			// Wrap with invalid params and non-retryable so it is
+			// not retried.
+			err = xerrors.NewInvalidParamsError(err)
+			err = xerrors.NewNonRetryableError(err)
+		}
 		wErr = xerrors.NewRenamedError(err, fmt.Errorf("error writing to host %s: %v", hostID, err))
 	} else if hostShardSet, ok := w.topoMap.LookupHostShardSet(hostID); !ok {
 		errStr := "missing host shard in writeState completionFn: %s"
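Note the ordering: the bad-request wrap is applied to err before xerrors.NewRenamedError re-words it for the per-host message. A hedged sketch of the behavior this relies on, assuming a renamed error preserves the inner error's classification:

package main

import (
	"errors"
	"fmt"

	xerrors "github.com/m3db/m3/src/x/errors"
)

func main() {
	inner := xerrors.NewNonRetryableError(
		xerrors.NewInvalidParamsError(errors.New("bad request")))

	// Re-word the message for context while keeping the inner type.
	renamed := xerrors.NewRenamedError(inner,
		fmt.Errorf("error writing to host host01: %v", inner))

	fmt.Println(renamed.Error())                      // re-worded message
	fmt.Println(xerrors.IsInvalidParams(renamed))     // expected: true
	fmt.Println(xerrors.IsNonRetryableError(renamed)) // expected: true
}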
16 changes: 9 additions & 7 deletions src/dbnode/network/server/tchannelthrift/convert/convert.go
@@ -219,8 +219,9 @@ func FromRPCFetchTaggedRequest(
 	}

 	opts := index.QueryOptions{
-		StartInclusive: start,
-		EndExclusive:   end,
+		StartInclusive:    start,
+		EndExclusive:      end,
+		RequireExhaustive: req.RequireExhaustive,
 	}
 	if l := req.Limit; l != nil {
 		opts.Limit = int(*l)
@@ -264,11 +265,12 @@ func ToRPCFetchTaggedRequest(
 	}

 	request := rpc.FetchTaggedRequest{
-		NameSpace:  ns.Bytes(),
-		RangeStart: rangeStart,
-		RangeEnd:   rangeEnd,
-		FetchData:  fetchData,
-		Query:      query,
+		NameSpace:         ns.Bytes(),
+		RangeStart:        rangeStart,
+		RangeEnd:          rangeEnd,
+		FetchData:         fetchData,
+		Query:             query,
+		RequireExhaustive: opts.RequireExhaustive,
 	}

 	if opts.Limit > 0 {
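A hedged sketch of the round trip these two conversions now preserve; the field names come from the diff, the rpc import path is assumed from the repository layout, and all other required request fields and time conversions are omitted:

package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
	"github.com/m3db/m3/src/dbnode/storage/index"
)

func main() {
	now := time.Now()
	opts := index.QueryOptions{
		StartInclusive:    now.Add(-time.Hour),
		EndExclusive:      now,
		RequireExhaustive: true, // the option this commit propagates
	}

	// Client side: ToRPCFetchTaggedRequest copies the option onto the
	// thrift request (times, query and limit are handled elsewhere).
	req := rpc.FetchTaggedRequest{RequireExhaustive: opts.RequireExhaustive}

	// Server side: FromRPCFetchTaggedRequest restores it when decoding.
	decoded := index.QueryOptions{RequireExhaustive: req.RequireExhaustive}
	fmt.Println(decoded.RequireExhaustive) // true
}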
5 changes: 4 additions & 1 deletion src/dbnode/storage/index.go
@@ -1227,10 +1227,13 @@ func (i *nsIndex) query(
 	// If the query must be exhaustive but the results are not, return an error.
 	if opts.RequireExhaustive {
 		i.metrics.queryNonExhaustiveLimitError.Inc(1)
-		return exhaustive, fmt.Errorf("query matched too many time series: require_exhaustive=%v, limit=%d, matched=%d",
+		err := fmt.Errorf(
+			"query matched too many time series: require_exhaustive=%v, limit=%d, matched=%d",
 			opts.RequireExhaustive,
 			opts.Limit,
 			results.Size())
+		// NB(r): Make sure error is not retried and returns as bad request.
+		return exhaustive, xerrors.NewInvalidParamsError(err)
 	}

 	// Otherwise non-exhaustive but not required to be.
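Downstream handlers can now key off the classification rather than the message. A minimal sketch, assuming xerrors.IsInvalidParams recognizes the wrapped error:

package main

import (
	"errors"
	"fmt"
	"net/http"

	xerrors "github.com/m3db/m3/src/x/errors"
)

// statusForQueryError maps caller errors, such as the require_exhaustive
// limit error above, to 400 instead of 500.
func statusForQueryError(err error) int {
	if xerrors.IsInvalidParams(err) {
		return http.StatusBadRequest
	}
	return http.StatusInternalServerError
}

func main() {
	err := xerrors.NewInvalidParamsError(
		errors.New("query matched too many time series"))
	fmt.Println(statusForQueryError(err)) // 400
}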
src/query/api/v1/handler/prometheus/handleroptions/fetch_options.go
@@ -79,7 +79,6 @@ func ParseLimit(req *http.Request, defaultLimit int) (int, error) {
 				"could not parse limit: input=%s, err=%v", str, err)
 			return 0, err
 		}
-
 		return n, nil
 	}

@@ -90,24 +89,58 @@ func ParseLimit(req *http.Request, defaultLimit int) (int, error) {
 				"could not parse limit: input=%s, err=%v", str, err)
 			return 0, err
 		}
-
 		return n, nil
 	}

 	return defaultLimit, nil
 }

+// ParseRequireExhaustive parses the require exhaustive limit option from
+// the request header or query string.
+func ParseRequireExhaustive(req *http.Request, defaultValue bool) (bool, error) {
+	if str := req.Header.Get(LimitRequireExhaustiveHeader); str != "" {
+		v, err := strconv.ParseBool(str)
+		if err != nil {
+			err = fmt.Errorf(
+				"could not parse require exhaustive: input=%s, err=%v", str, err)
+			return false, err
+		}
+		return v, nil
+	}
+
+	if str := req.FormValue("requireExhaustive"); str != "" {
+		v, err := strconv.ParseBool(str)
+		if err != nil {
+			err = fmt.Errorf(
+				"could not parse require exhaustive: input=%s, err=%v", str, err)
+			return false, err
+		}
+		return v, nil
+	}
+
+	return defaultValue, nil
+}
+
 // NewFetchOptions parses an http request into fetch options.
 func (b fetchOptionsBuilder) NewFetchOptions(
 	req *http.Request,
 ) (*storage.FetchOptions, *xhttp.ParseError) {
 	fetchOpts := storage.NewFetchOptions()
+
 	limit, err := ParseLimit(req, b.opts.Limit)
 	if err != nil {
 		return nil, xhttp.NewParseError(err, http.StatusBadRequest)
 	}
+
 	fetchOpts.Limit = limit
+
+	requireExhaustive, err := ParseRequireExhaustive(req, b.opts.RequireExhaustive)
+	if err != nil {
+		return nil, xhttp.NewParseError(err, http.StatusBadRequest)
+	}
+
+	fetchOpts.RequireExhaustive = requireExhaustive

 	if str := req.Header.Get(MetricsTypeHeader); str != "" {
 		mt, err := storagemetadata.ParseMetricsType(str)
 		if err != nil {
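A usage sketch of the new parser using only the standard library; parseRequireExhaustive is a hypothetical stand-in that mirrors the precedence above (header, then query string, then the configured default), with the header name assumed to resolve to "M3-Limit-Require-Exhaustive":

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// parseRequireExhaustive mirrors ParseRequireExhaustive for illustration;
// the real function also wraps parse errors with more context.
func parseRequireExhaustive(req *http.Request, defaultValue bool) (bool, error) {
	if str := req.Header.Get("M3-Limit-Require-Exhaustive"); str != "" {
		return strconv.ParseBool(str)
	}
	if str := req.FormValue("requireExhaustive"); str != "" {
		return strconv.ParseBool(str)
	}
	return defaultValue, nil
}

func main() {
	req := httptest.NewRequest(http.MethodGet, "/query?requireExhaustive=true", nil)
	v, err := parseRequireExhaustive(req, false)
	fmt.Println(v, err) // true <nil>
}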
4 changes: 4 additions & 0 deletions src/query/api/v1/handler/prometheus/handleroptions/headers.go
@@ -82,6 +82,10 @@ const (
 	// the number of time series returned by each storage node.
 	LimitMaxSeriesHeader = M3HeaderPrefix + "Limit-Max-Series"

+	// LimitRequireExhaustiveHeader is the M3 limit exhaustive header that will
+	// ensure M3 returns an error if the results set is not exhaustive.
+	LimitRequireExhaustiveHeader = M3HeaderPrefix + "Limit-Require-Exhaustive"
+
 	// UnaggregatedStoragePolicy specifies the unaggregated storage policy.
 	UnaggregatedStoragePolicy = "unaggregated"
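From the client side, opting into exhaustive-or-error behavior is then a matter of setting this header. A hedged sketch, assuming M3HeaderPrefix resolves to "M3-" and a coordinator listening on the default query port:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:7201/api/v1/query_range?query=up", nil)
	if err != nil {
		panic(err)
	}
	// Wire form of the LimitRequireExhaustiveHeader constant above.
	req.Header.Set("M3-Limit-Require-Exhaustive", "true")
	fmt.Println(req.Header.Get("M3-Limit-Require-Exhaustive")) // true
}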
