diff --git a/src/dbnode/client/errors.go b/src/dbnode/client/errors.go index 56ec98e7c2..5b49e501e3 100644 --- a/src/dbnode/client/errors.go +++ b/src/dbnode/client/errors.go @@ -54,6 +54,17 @@ func IsBadRequestError(err error) bool { return false } +// IsResourceExhaustedError determines if the error is a resource exhausted error. +func IsResourceExhaustedError(err error) bool { + for err != nil { + if e, ok := err.(*rpc.Error); ok && tterrors.IsResourceExhaustedErrorFlag(e) { //nolint:errorlint + return true + } + err = xerrors.InnerError(err) + } + return false +} + // IsConsistencyResultError determines if the error is a consistency result error. func IsConsistencyResultError(err error) bool { for err != nil { diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index 112405c525..e83cc7335f 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -32,9 +32,15 @@ enum ErrorType { BAD_REQUEST } +enum ErrorFlags { + NONE = 0x00, + RESOURCE_EXHAUSTED = 0x01 +} + exception Error { 1: required ErrorType type = ErrorType.INTERNAL_ERROR 2: required string message + 3: optional i64 flags = 0 } exception WriteBatchRawErrors { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index 52d5696af3..678aa3b183 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -162,6 +162,64 @@ func (p *ErrorType) Value() (driver.Value, error) { return int64(*p), nil } +type ErrorFlags int64 + +const ( + ErrorFlags_NONE ErrorFlags = 0 + ErrorFlags_RESOURCE_EXHAUSTED ErrorFlags = 1 +) + +func (p ErrorFlags) String() string { + switch p { + case ErrorFlags_NONE: + return "NONE" + case ErrorFlags_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED" + } + return "" +} + +func ErrorFlagsFromString(s string) (ErrorFlags, error) { + switch s { + case "NONE": + return ErrorFlags_NONE, nil + case "RESOURCE_EXHAUSTED": + return ErrorFlags_RESOURCE_EXHAUSTED, nil + } + return ErrorFlags(0), fmt.Errorf("not a valid ErrorFlags string") +} + +func ErrorFlagsPtr(v ErrorFlags) *ErrorFlags { return &v } + +func (p ErrorFlags) MarshalText() ([]byte, error) { + return []byte(p.String()), nil +} + +func (p *ErrorFlags) UnmarshalText(text []byte) error { + q, err := ErrorFlagsFromString(string(text)) + if err != nil { + return err + } + *p = q + return nil +} + +func (p *ErrorFlags) Scan(value interface{}) error { + v, ok := value.(int64) + if !ok { + return errors.New("Scan value is not int64") + } + *p = ErrorFlags(v) + return nil +} + +func (p *ErrorFlags) Value() (driver.Value, error) { + if p == nil { + return nil, nil + } + return int64(*p), nil +} + type AggregateQueryType int64 const ( @@ -223,9 +281,11 @@ func (p *AggregateQueryType) Value() (driver.Value, error) { // Attributes: // - Type // - Message +// - Flags type Error struct { Type ErrorType `thrift:"type,1,required" db:"type" json:"type"` Message string `thrift:"message,2,required" db:"message" json:"message"` + Flags int64 `thrift:"flags,3" db:"flags" json:"flags,omitempty"` } func NewError() *Error { @@ -241,6 +301,16 @@ func (p *Error) GetType() ErrorType { func (p *Error) GetMessage() string { return p.Message } + +var Error_Flags_DEFAULT int64 = 0 + +func (p *Error) GetFlags() int64 { + return p.Flags +} +func (p *Error) IsSetFlags() bool { + return p.Flags != Error_Flags_DEFAULT +} + func (p *Error) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -268,6 +338,10 @@ func (p *Error) Read(iprot thrift.TProtocol) error { return err } issetMessage = true + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -308,6 +382,15 @@ func (p *Error) ReadField2(iprot thrift.TProtocol) error { return nil } +func (p *Error) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.Flags = v + } + return nil +} + func (p *Error) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("Error"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -319,6 +402,9 @@ func (p *Error) Write(oprot thrift.TProtocol) error { if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -355,6 +441,21 @@ func (p *Error) writeField2(oprot thrift.TProtocol) (err error) { return err } +func (p *Error) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetFlags() { + if err := oprot.WriteFieldBegin("flags", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:flags: ", p), err) + } + if err := oprot.WriteI64(int64(p.Flags)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.flags (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:flags: ", p), err) + } + } + return err +} + func (p *Error) String() string { if p == nil { return "" diff --git a/src/dbnode/network/server/tchannelthrift/cluster/service.go b/src/dbnode/network/server/tchannelthrift/cluster/service.go index 40891ef573..0e2548acdd 100644 --- a/src/dbnode/network/server/tchannelthrift/cluster/service.go +++ b/src/dbnode/network/server/tchannelthrift/cluster/service.go @@ -241,10 +241,7 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR it, err := session.Fetch(nsID, tsID, start, end) if err != nil { - if client.IsBadRequestError(err) { - return nil, tterrors.NewBadRequestError(err) - } - return nil, tterrors.NewInternalError(err) + return nil, convert.ToRPCError(err) } defer it.Close() @@ -340,10 +337,7 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { tsID := s.idPool.GetStringID(ctx, req.ID) err = session.Write(nsID, tsID, ts, dp.Value, unit, dp.Annotation) if err != nil { - if client.IsBadRequestError(err) { - return tterrors.NewBadRequestError(err) - } - return tterrors.NewInternalError(err) + return convert.ToRPCError(err) } return nil } @@ -377,10 +371,7 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) err = session.WriteTagged(nsID, tsID, ident.NewTagsIterator(tags), ts, dp.Value, unit, dp.Annotation) if err != nil { - if client.IsBadRequestError(err) { - return tterrors.NewBadRequestError(err) - } - return tterrors.NewInternalError(err) + return convert.ToRPCError(err) } return nil } diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go index b2d4ac20f1..ed7b9a7fd0 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" tterrors "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/errors" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/dbnode/x/xpool" "github.com/m3db/m3/src/m3ninx/generated/proto/querypb" @@ -189,6 +190,9 @@ func ToRPCError(err error) *rpc.Error { if err == nil { return nil } + if limits.IsQueryLimitExceededError(err) { + return tterrors.NewResourceExhaustedError(err) + } if xerrors.IsInvalidParams(err) { return tterrors.NewBadRequestError(err) } diff --git a/src/dbnode/network/server/tchannelthrift/errors/errors.go b/src/dbnode/network/server/tchannelthrift/errors/errors.go index 035e3f4fb8..4313aa8357 100644 --- a/src/dbnode/network/server/tchannelthrift/errors/errors.go +++ b/src/dbnode/network/server/tchannelthrift/errors/errors.go @@ -26,10 +26,11 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" ) -func newError(errType rpc.ErrorType, err error) *rpc.Error { +func newError(errType rpc.ErrorType, err error, flags int64) *rpc.Error { rpcErr := rpc.NewError() rpcErr.Type = errType rpcErr.Message = fmt.Sprintf("%v", err) + rpcErr.Flags = flags return rpcErr } @@ -43,14 +44,24 @@ func IsBadRequestError(err *rpc.Error) bool { return err != nil && err.Type == rpc.ErrorType_BAD_REQUEST } +// IsResourceExhaustedErrorFlag returns whether error has resource exhausted flag. +func IsResourceExhaustedErrorFlag(err *rpc.Error) bool { + return err != nil && err.Flags&int64(rpc.ErrorFlags_RESOURCE_EXHAUSTED) != 0 +} + // NewInternalError creates a new internal error func NewInternalError(err error) *rpc.Error { - return newError(rpc.ErrorType_INTERNAL_ERROR, err) + return newError(rpc.ErrorType_INTERNAL_ERROR, err, int64(rpc.ErrorFlags_NONE)) } // NewBadRequestError creates a new bad request error func NewBadRequestError(err error) *rpc.Error { - return newError(rpc.ErrorType_BAD_REQUEST, err) + return newError(rpc.ErrorType_BAD_REQUEST, err, int64(rpc.ErrorFlags_NONE)) +} + +// NewResourceExhaustedError creates a new resource exhausted error. +func NewResourceExhaustedError(err error) *rpc.Error { + return newError(rpc.ErrorType_BAD_REQUEST, err, int64(rpc.ErrorFlags_RESOURCE_EXHAUSTED)) } // NewWriteBatchRawError creates a new write batch error diff --git a/src/dbnode/network/server/tchannelthrift/errors/errors_test.go b/src/dbnode/network/server/tchannelthrift/errors/errors_test.go new file mode 100644 index 0000000000..6f5f915a4a --- /dev/null +++ b/src/dbnode/network/server/tchannelthrift/errors/errors_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package errors + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestErrorsAreRecognized(t *testing.T) { + someError := errors.New("some inner error") + + tests := []struct { + name string + value bool + }{ + { + name: "internal error", + value: IsInternalError(NewInternalError(someError)), + }, + { + name: "bad request error", + value: IsBadRequestError(NewBadRequestError(someError)), + }, + { + name: "resource exhausted error", + value: IsBadRequestError(NewResourceExhaustedError(someError)), + }, + { + name: "resource exhausted flag", + value: IsResourceExhaustedErrorFlag(NewResourceExhaustedError(someError)), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.True(t, tt.value) + }) + } +} diff --git a/src/dbnode/storage/limits/errors.go b/src/dbnode/storage/limits/errors.go new file mode 100644 index 0000000000..a876dd9b7a --- /dev/null +++ b/src/dbnode/storage/limits/errors.go @@ -0,0 +1,49 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package limits + +import xerrors "github.com/m3db/m3/src/x/errors" + +type queryLimitExceededError struct { + msg string +} + +// NewQueryLimitExceededError creates a query limit exceeded error. +func NewQueryLimitExceededError(msg string) error { + return &queryLimitExceededError{ + msg: msg, + } +} + +func (err *queryLimitExceededError) Error() string { + return err.msg +} + +// IsQueryLimitExceededError returns true if the error is a query limits exceeded error. +func IsQueryLimitExceededError(err error) bool { + for err != nil { + if _, ok := err.(*queryLimitExceededError); ok { //nolint:errorlint + return true + } + err = xerrors.InnerError(err) + } + return false +} diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index 49274f83b7..44643d40ec 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -192,9 +192,9 @@ func (q *lookbackLimit) exceeded() error { func (q *lookbackLimit) checkLimit(recent int64) error { if q.options.Limit > 0 && recent > q.options.Limit { q.metrics.exceeded.Inc(1) - return xerrors.NewInvalidParamsError(fmt.Errorf( + return xerrors.NewInvalidParamsError(NewQueryLimitExceededError(fmt.Sprintf( "query aborted due to limit: name=%s, limit=%d, current=%d, within=%s", - q.name, q.options.Limit, recent, q.options.Lookback)) + q.name, q.options.Limit, recent, q.options.Lookback))) } return nil } diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 5ea202bbc1..7739812c62 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -67,6 +67,7 @@ func TestQueryLimits(t *testing.T) { err = queryLimits.AnyExceeded() require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, IsQueryLimitExceededError(err)) opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) queryLimits, err = NewQueryLimits(opts) @@ -82,6 +83,7 @@ func TestQueryLimits(t *testing.T) { err = queryLimits.AnyExceeded() require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, IsQueryLimitExceededError(err)) } func TestLookbackLimit(t *testing.T) { @@ -161,6 +163,7 @@ func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int6 } else { require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, IsQueryLimitExceededError(err)) exceededCount++ } err = limit.exceeded() @@ -169,6 +172,7 @@ func verifyLimit(t *testing.T, limit *lookbackLimit, inc int, expectedLimit int6 } else { require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, IsQueryLimitExceededError(err)) exceededCount++ } return exceededCount