Skip to content

Commit

Permalink
[dbnode] Introduce Resource Exhausted RPC Error flag for query limits (
Browse files Browse the repository at this point in the history
  • Loading branch information
vpranckaitis authored Dec 17, 2020
1 parent 6c2928c commit cf1ef64
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 17 deletions.
11 changes: 11 additions & 0 deletions src/dbnode/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ func IsBadRequestError(err error) bool {
return false
}

// IsResourceExhaustedError determines if the error is a resource exhausted error.
func IsResourceExhaustedError(err error) bool {
for err != nil {
if e, ok := err.(*rpc.Error); ok && tterrors.IsResourceExhaustedErrorFlag(e) { //nolint:errorlint
return true
}
err = xerrors.InnerError(err)
}
return false
}

// IsConsistencyResultError determines if the error is a consistency result error.
func IsConsistencyResultError(err error) bool {
for err != nil {
Expand Down
6 changes: 6 additions & 0 deletions src/dbnode/generated/thrift/rpc.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ enum ErrorType {
BAD_REQUEST
}

enum ErrorFlags {
NONE = 0x00,
RESOURCE_EXHAUSTED = 0x01
}

exception Error {
1: required ErrorType type = ErrorType.INTERNAL_ERROR
2: required string message
3: optional i64 flags = 0
}

exception WriteBatchRawErrors {
Expand Down
101 changes: 101 additions & 0 deletions src/dbnode/generated/thrift/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,64 @@ func (p *ErrorType) Value() (driver.Value, error) {
return int64(*p), nil
}

type ErrorFlags int64

const (
ErrorFlags_NONE ErrorFlags = 0
ErrorFlags_RESOURCE_EXHAUSTED ErrorFlags = 1
)

func (p ErrorFlags) String() string {
switch p {
case ErrorFlags_NONE:
return "NONE"
case ErrorFlags_RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED"
}
return "<UNSET>"
}

func ErrorFlagsFromString(s string) (ErrorFlags, error) {
switch s {
case "NONE":
return ErrorFlags_NONE, nil
case "RESOURCE_EXHAUSTED":
return ErrorFlags_RESOURCE_EXHAUSTED, nil
}
return ErrorFlags(0), fmt.Errorf("not a valid ErrorFlags string")
}

func ErrorFlagsPtr(v ErrorFlags) *ErrorFlags { return &v }

func (p ErrorFlags) MarshalText() ([]byte, error) {
return []byte(p.String()), nil
}

func (p *ErrorFlags) UnmarshalText(text []byte) error {
q, err := ErrorFlagsFromString(string(text))
if err != nil {
return err
}
*p = q
return nil
}

func (p *ErrorFlags) Scan(value interface{}) error {
v, ok := value.(int64)
if !ok {
return errors.New("Scan value is not int64")
}
*p = ErrorFlags(v)
return nil
}

func (p *ErrorFlags) Value() (driver.Value, error) {
if p == nil {
return nil, nil
}
return int64(*p), nil
}

type AggregateQueryType int64

const (
Expand Down Expand Up @@ -223,9 +281,11 @@ func (p *AggregateQueryType) Value() (driver.Value, error) {
// Attributes:
// - Type
// - Message
// - Flags
type Error struct {
Type ErrorType `thrift:"type,1,required" db:"type" json:"type"`
Message string `thrift:"message,2,required" db:"message" json:"message"`
Flags int64 `thrift:"flags,3" db:"flags" json:"flags,omitempty"`
}

func NewError() *Error {
Expand All @@ -241,6 +301,16 @@ func (p *Error) GetType() ErrorType {
func (p *Error) GetMessage() string {
return p.Message
}

var Error_Flags_DEFAULT int64 = 0

func (p *Error) GetFlags() int64 {
return p.Flags
}
func (p *Error) IsSetFlags() bool {
return p.Flags != Error_Flags_DEFAULT
}

func (p *Error) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
Expand Down Expand Up @@ -268,6 +338,10 @@ func (p *Error) Read(iprot thrift.TProtocol) error {
return err
}
issetMessage = true
case 3:
if err := p.ReadField3(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
Expand Down Expand Up @@ -308,6 +382,15 @@ func (p *Error) ReadField2(iprot thrift.TProtocol) error {
return nil
}

func (p *Error) ReadField3(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 3: ", err)
} else {
p.Flags = v
}
return nil
}

func (p *Error) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Error"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
Expand All @@ -319,6 +402,9 @@ func (p *Error) Write(oprot thrift.TProtocol) error {
if err := p.writeField2(oprot); err != nil {
return err
}
if err := p.writeField3(oprot); err != nil {
return err
}
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
Expand Down Expand Up @@ -355,6 +441,21 @@ func (p *Error) writeField2(oprot thrift.TProtocol) (err error) {
return err
}

func (p *Error) writeField3(oprot thrift.TProtocol) (err error) {
if p.IsSetFlags() {
if err := oprot.WriteFieldBegin("flags", thrift.I64, 3); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:flags: ", p), err)
}
if err := oprot.WriteI64(int64(p.Flags)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.flags (3) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 3:flags: ", p), err)
}
}
return err
}

func (p *Error) String() string {
if p == nil {
return "<nil>"
Expand Down
15 changes: 3 additions & 12 deletions src/dbnode/network/server/tchannelthrift/cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,7 @@ func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchR

it, err := session.Fetch(nsID, tsID, start, end)
if err != nil {
if client.IsBadRequestError(err) {
return nil, tterrors.NewBadRequestError(err)
}
return nil, tterrors.NewInternalError(err)
return nil, convert.ToRPCError(err)
}

defer it.Close()
Expand Down Expand Up @@ -340,10 +337,7 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error {
tsID := s.idPool.GetStringID(ctx, req.ID)
err = session.Write(nsID, tsID, ts, dp.Value, unit, dp.Annotation)
if err != nil {
if client.IsBadRequestError(err) {
return tterrors.NewBadRequestError(err)
}
return tterrors.NewInternalError(err)
return convert.ToRPCError(err)
}
return nil
}
Expand Down Expand Up @@ -377,10 +371,7 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest)
err = session.WriteTagged(nsID, tsID, ident.NewTagsIterator(tags),
ts, dp.Value, unit, dp.Annotation)
if err != nil {
if client.IsBadRequestError(err) {
return tterrors.NewBadRequestError(err)
}
return tterrors.NewInternalError(err)
return convert.ToRPCError(err)
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/network/server/tchannelthrift/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
tterrors "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/errors"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/m3ninx/generated/proto/querypb"
Expand Down Expand Up @@ -189,6 +190,9 @@ func ToRPCError(err error) *rpc.Error {
if err == nil {
return nil
}
if limits.IsQueryLimitExceededError(err) {
return tterrors.NewResourceExhaustedError(err)
}
if xerrors.IsInvalidParams(err) {
return tterrors.NewBadRequestError(err)
}
Expand Down
17 changes: 14 additions & 3 deletions src/dbnode/network/server/tchannelthrift/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
)

func newError(errType rpc.ErrorType, err error) *rpc.Error {
func newError(errType rpc.ErrorType, err error, flags int64) *rpc.Error {
rpcErr := rpc.NewError()
rpcErr.Type = errType
rpcErr.Message = fmt.Sprintf("%v", err)
rpcErr.Flags = flags
return rpcErr
}

Expand All @@ -43,14 +44,24 @@ func IsBadRequestError(err *rpc.Error) bool {
return err != nil && err.Type == rpc.ErrorType_BAD_REQUEST
}

// IsResourceExhaustedErrorFlag returns whether error has resource exhausted flag.
func IsResourceExhaustedErrorFlag(err *rpc.Error) bool {
return err != nil && err.Flags&int64(rpc.ErrorFlags_RESOURCE_EXHAUSTED) != 0
}

// NewInternalError creates a new internal error
func NewInternalError(err error) *rpc.Error {
return newError(rpc.ErrorType_INTERNAL_ERROR, err)
return newError(rpc.ErrorType_INTERNAL_ERROR, err, int64(rpc.ErrorFlags_NONE))
}

// NewBadRequestError creates a new bad request error
func NewBadRequestError(err error) *rpc.Error {
return newError(rpc.ErrorType_BAD_REQUEST, err)
return newError(rpc.ErrorType_BAD_REQUEST, err, int64(rpc.ErrorFlags_NONE))
}

// NewResourceExhaustedError creates a new resource exhausted error.
func NewResourceExhaustedError(err error) *rpc.Error {
return newError(rpc.ErrorType_BAD_REQUEST, err, int64(rpc.ErrorFlags_RESOURCE_EXHAUSTED))
}

// NewWriteBatchRawError creates a new write batch error
Expand Down
59 changes: 59 additions & 0 deletions src/dbnode/network/server/tchannelthrift/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package errors

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrorsAreRecognized(t *testing.T) {
someError := errors.New("some inner error")

tests := []struct {
name string
value bool
}{
{
name: "internal error",
value: IsInternalError(NewInternalError(someError)),
},
{
name: "bad request error",
value: IsBadRequestError(NewBadRequestError(someError)),
},
{
name: "resource exhausted error",
value: IsBadRequestError(NewResourceExhaustedError(someError)),
},
{
name: "resource exhausted flag",
value: IsResourceExhaustedErrorFlag(NewResourceExhaustedError(someError)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.True(t, tt.value)
})
}
}
49 changes: 49 additions & 0 deletions src/dbnode/storage/limits/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package limits

import xerrors "github.com/m3db/m3/src/x/errors"

type queryLimitExceededError struct {
msg string
}

// NewQueryLimitExceededError creates a query limit exceeded error.
func NewQueryLimitExceededError(msg string) error {
return &queryLimitExceededError{
msg: msg,
}
}

func (err *queryLimitExceededError) Error() string {
return err.msg
}

// IsQueryLimitExceededError returns true if the error is a query limits exceeded error.
func IsQueryLimitExceededError(err error) bool {
for err != nil {
if _, ok := err.(*queryLimitExceededError); ok { //nolint:errorlint
return true
}
err = xerrors.InnerError(err)
}
return false
}
Loading

0 comments on commit cf1ef64

Please sign in to comment.