From 6fd03a9aa22c357c1745dbff293ffad5349ceafc Mon Sep 17 00:00:00 2001 From: Yash Nisar Date: Sat, 30 Dec 2023 14:35:02 -0600 Subject: [PATCH] Finish retries --- charts/dapr/crds/resiliency.yaml | 42 +++++- pkg/apis/resiliency/v1alpha1/types.go | 27 +++- .../v1alpha1/zz_generated.deepcopy.go | 66 +++++++++ pkg/http/api_directmessaging.go | 133 ++++++++++++++++-- pkg/resiliency/policy.go | 26 ++++ pkg/resiliency/resiliency.go | 33 +++++ pkg/retry/retry.go | 18 +++ 7 files changed, 325 insertions(+), 20 deletions(-) diff --git a/charts/dapr/crds/resiliency.yaml b/charts/dapr/crds/resiliency.yaml index ecc7a033db9..9f63c469289 100644 --- a/charts/dapr/crds/resiliency.yaml +++ b/charts/dapr/crds/resiliency.yaml @@ -1,11 +1,9 @@ - --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.5.0 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.9.2 name: resiliencies.dapr.io labels: app.kubernetes.io/part-of: "dapr" @@ -25,10 +23,14 @@ spec: openAPIV3Schema: properties: apiVersion: - description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' type: string kind: - description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' type: string metadata: type: object @@ -58,6 +60,35 @@ spec: properties: duration: type: string + matches: + properties: + errors: + items: + type: string + type: array + headers: + items: + properties: + header: + type: string + match: + properties: + exactMatch: + type: string + prefixMatch: + type: string + regexMatch: + type: string + suffixMatch: + type: string + type: object + type: object + type: array + httpStatusCodes: + items: + type: integer + type: array + type: object maxInterval: type: string maxRetries: @@ -132,4 +163,3 @@ spec: type: object served: true storage: true - diff --git a/pkg/apis/resiliency/v1alpha1/types.go b/pkg/apis/resiliency/v1alpha1/types.go index 9115417e1c4..3e4fa5051da 100644 --- a/pkg/apis/resiliency/v1alpha1/types.go +++ b/pkg/apis/resiliency/v1alpha1/types.go @@ -51,10 +51,29 @@ type Policies struct { } type Retry struct { - Policy string `json:"policy,omitempty" yaml:"policy,omitempty"` - Duration string `json:"duration,omitempty" yaml:"duration,omitempty"` - MaxInterval string `json:"maxInterval,omitempty" yaml:"maxInterval,omitempty"` - MaxRetries *int `json:"maxRetries,omitempty" yaml:"maxRetries,omitempty"` + Policy string `json:"policy,omitempty" yaml:"policy,omitempty"` + Duration string `json:"duration,omitempty" yaml:"duration,omitempty"` + MaxInterval string `json:"maxInterval,omitempty" yaml:"maxInterval,omitempty"` + MaxRetries *int `json:"maxRetries,omitempty" yaml:"maxRetries,omitempty"` + Matches *HTTPMatches `json:"matches,omitempty" yaml:"matches,omitempty"` +} + +type HTTPMatches struct { + Headers []HeaderMatch `json:"headers,omitempty" yaml:"headers,omitempty"` + HTTPStatusCodes []int `json:"httpStatusCodes,omitempty" yaml:"httpStatusCodes,omitempty"` + Errors []string `json:"errors,omitempty" yaml:"errors,omitempty"` +} + +type HeaderMatch struct { + Header string `json:"header,omitempty" yaml:"header,omitempty"` + Match ValueMatch `json:"match,omitempty" yaml:"match,omitempty"` +} + +type ValueMatch struct { + ExactMatch string `json:"exactMatch,omitempty" yaml:"exactMatch,omitempty"` + PrefixMatch string `json:"prefixMatch,omitempty" yaml:"prefixMatch,omitempty"` + SuffixMatch string `json:"suffixMatch,omitempty" yaml:"suffixMatch,omitempty"` + RegexMatch string `json:"regexMatch,omitempty" yaml:"regexMatch,omitempty"` } type CircuitBreaker struct { diff --git a/pkg/apis/resiliency/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/resiliency/v1alpha1/zz_generated.deepcopy.go index e79ac267d8e..5700627609c 100644 --- a/pkg/apis/resiliency/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/resiliency/v1alpha1/zz_generated.deepcopy.go @@ -87,6 +87,52 @@ func (in *EndpointPolicyNames) DeepCopy() *EndpointPolicyNames { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HTTPMatches) DeepCopyInto(out *HTTPMatches) { + *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make([]HeaderMatch, len(*in)) + copy(*out, *in) + } + if in.HTTPStatusCodes != nil { + in, out := &in.HTTPStatusCodes, &out.HTTPStatusCodes + *out = make([]int, len(*in)) + copy(*out, *in) + } + if in.Errors != nil { + in, out := &in.Errors, &out.Errors + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPMatches. +func (in *HTTPMatches) DeepCopy() *HTTPMatches { + if in == nil { + return nil + } + out := new(HTTPMatches) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HeaderMatch) DeepCopyInto(out *HeaderMatch) { + *out = *in + out.Match = in.Match +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HeaderMatch. +func (in *HeaderMatch) DeepCopy() *HeaderMatch { + if in == nil { + return nil + } + out := new(HeaderMatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Policies) DeepCopyInto(out *Policies) { *out = *in @@ -226,6 +272,11 @@ func (in *Retry) DeepCopyInto(out *Retry) { *out = new(int) **out = **in } + if in.Matches != nil { + in, out := &in.Matches, &out.Matches + *out = new(HTTPMatches) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Retry. @@ -273,3 +324,18 @@ func (in *Targets) DeepCopy() *Targets { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ValueMatch) DeepCopyInto(out *ValueMatch) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValueMatch. +func (in *ValueMatch) DeepCopy() *ValueMatch { + if in == nil { + return nil + } + out := new(ValueMatch) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/http/api_directmessaging.go b/pkg/http/api_directmessaging.go index 359179a4641..5c620a503c3 100644 --- a/pkg/http/api_directmessaging.go +++ b/pkg/http/api_directmessaging.go @@ -22,6 +22,7 @@ import ( "net/http" "net/url" "path" + "regexp" "strings" "sync/atomic" @@ -154,8 +155,18 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) { success := atomic.Bool{} // Since we don't want to return the actual error, we have to extract several things in order to construct our response. resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) { + + httpErrMap := make(map[string]bool) + + //Convert the list of errors to a map for faster lookup + if policyDef.HasHttpRetryErrors() { + for _, err := range policyDef.GetHttpRetryErrors() { + httpErrMap[err] = true + } + } + rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req) - if rErr != nil { + if rErr != nil && httpErrMap["reset"] { // Allowlist policies that are applied on the callee side can return a Permission Denied error. // For everything else, treat it as a gRPC transport error apiErr := messages.ErrDirectInvoke.WithFormat(targetID, rErr) @@ -181,7 +192,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) { body, rErr = invokev1.ProtobufToJSON(resStatus) rResp.WithRawDataBytes(body) resStatus.Code = statusCode - if rErr != nil { + if rErr != nil && httpErrMap["reset"] { return rResp, invokeError{ statusCode: http.StatusInternalServerError, msg: NewErrorResponse("ERR_MALFORMED_RESPONSE", rErr.Error()).JSONErrorValue(), @@ -190,14 +201,116 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) { } else { resStatus.Code = statusCode } - } else if resStatus.GetCode() < 200 || resStatus.GetCode() > 399 { - msg, _ := rResp.RawDataFull() - // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. - return rResp, codeError{ - headers: rResp.Headers(), - statusCode: int(resStatus.GetCode()), - msg: msg, - contentType: rResp.ContentType(), + } else { + //Retriable headers regardless of status code + if policyDef.HasHttpRetryHeaders() { + headers := policyDef.GetHttpRetryHeaders() + + for _, confHeader := range headers { + confHeaderKey := confHeader.Header + var confHeaderVal string + + for respKey, respVal := range rResp.Headers() { + rVal := respVal.String() + rVal = strings.TrimPrefix(rVal, "values:") + rVal = strings.Replace(rVal, "\"", "", -1) + + if confHeader.Match.ExactMatch != "" { + confHeaderVal = confHeader.Match.ExactMatch + if confHeaderKey == respKey && confHeaderVal == rVal { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } + } else if confHeader.Match.PrefixMatch != "" { + confHeaderVal = confHeader.Match.PrefixMatch + if confHeaderKey == respKey && strings.HasPrefix(rVal, confHeaderVal) { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } + } else if confHeader.Match.SuffixMatch != "" { + confHeaderVal = confHeader.Match.SuffixMatch + if confHeaderKey == respKey && strings.HasSuffix(rVal, confHeaderVal) { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } + } else if confHeader.Match.RegexMatch != "" { + confHeaderVal = confHeader.Match.RegexMatch + matched, err := regexp.MatchString(confHeaderVal, rVal) + if err != nil { + fmt.Println("Invalid regex:", err) + //fix this bruhhh + } + if confHeaderKey == respKey && matched { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } + } + } + } + } + + if (resStatus.Code < 200 || resStatus.Code > 399) { + if policyDef.HasHttpRetryErrors() || policyDef.HasHttpRetryStatusCodes() { + errCodes := policyDef.GetHttpRetryStatusCodes() + + if ((httpErrMap["5xx"] && (resStatus.Code >= 500 && resStatus.Code <= 599)) || + (httpErrMap["retriable_4xx"] && resStatus.Code == 409)) { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } + + for _, respCode := range errCodes { + if respCode == int(resStatus.Code) { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } + } + } else { + msg, _ := rResp.RawDataFull() + // Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned. + return rResp, codeError{ + headers: rResp.Headers(), + statusCode: int(resStatus.Code), + msg: msg, + contentType: rResp.ContentType(), + } + } } } diff --git a/pkg/resiliency/policy.go b/pkg/resiliency/policy.go index 035ced9d63c..012e28f6ea0 100644 --- a/pkg/resiliency/policy.go +++ b/pkg/resiliency/policy.go @@ -25,6 +25,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/dapr/dapr/pkg/resiliency/breaker" + httpRetryMatch "github.com/dapr/dapr/pkg/retry" "github.com/dapr/kit/logger" "github.com/dapr/kit/retry" ) @@ -50,6 +51,7 @@ type PolicyDefinition struct { name string t time.Duration r *retry.Config + httpRetryMatches *httpRetryMatch.HttpRetryMatch cb *breaker.CircuitBreaker addTimeoutActivatedMetric func() addRetryActivatedMetric func() @@ -80,6 +82,30 @@ func (p PolicyDefinition) HasRetries() bool { return p.r != nil && p.r.MaxRetries != 0 } +func (p PolicyDefinition) HasHttpRetryStatusCodes() bool { + return p.httpRetryMatches != nil && len(p.httpRetryMatches.HTTPStatusCodes) > 0 +} + +func (p PolicyDefinition) GetHttpRetryStatusCodes() []int { + return p.httpRetryMatches.HTTPStatusCodes +} + +func (p PolicyDefinition) HasHttpRetryErrors() bool { + return p.httpRetryMatches != nil && len(p.httpRetryMatches.Errors) > 0 +} + +func (p PolicyDefinition) GetHttpRetryErrors() []string { + return p.httpRetryMatches.Errors +} + +func (p PolicyDefinition) HasHttpRetryHeaders() bool { + return p.httpRetryMatches != nil && len(p.httpRetryMatches.Headers) > 0 +} + +func (p PolicyDefinition) GetHttpRetryHeaders() []httpRetryMatch.HeaderMatch { + return p.httpRetryMatches.Headers +} + type RunnerOpts[T any] struct { // The disposer is a function which is invoked when the operation fails, including due to timing out in a background goroutine. It receives the value returned by the operation function as long as it's non-zero (e.g. non-nil for pointer types). // The disposer can be used to perform cleanup tasks on values returned by the operation function that would otherwise leak (because they're not returned by the result of the runner). diff --git a/pkg/resiliency/resiliency.go b/pkg/resiliency/resiliency.go index 76691a07af1..b392cf88ebb 100644 --- a/pkg/resiliency/resiliency.go +++ b/pkg/resiliency/resiliency.go @@ -33,6 +33,8 @@ import ( diag "github.com/dapr/dapr/pkg/diagnostics" operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1" "github.com/dapr/dapr/pkg/resiliency/breaker" + httpRetryMatch "github.com/dapr/dapr/pkg/retry" + "github.com/dapr/dapr/utils" "github.com/dapr/kit/config" "github.com/dapr/kit/logger" "github.com/dapr/kit/retry" @@ -112,6 +114,7 @@ type ( timeouts map[string]time.Duration retries map[string]*retry.Config + httpRetryMatch map[string]*httpRetryMatch.HttpRetryMatch circuitBreakers map[string]*breaker.CircuitBreaker actorCBCaches map[string]*lru.Cache[string, *breaker.CircuitBreaker] @@ -306,6 +309,7 @@ func New(log logger.Logger) *Resiliency { log: log, timeouts: make(map[string]time.Duration), retries: make(map[string]*retry.Config), + httpRetryMatch: make(map[string]*httpRetryMatch.HttpRetryMatch), circuitBreakers: make(map[string]*breaker.CircuitBreaker), actorCBCaches: make(map[string]*lru.Cache[string, *breaker.CircuitBreaker]), serviceCBs: make(map[string]*lru.Cache[string, *breaker.CircuitBreaker]), @@ -413,6 +417,31 @@ func (r *Resiliency) decodePolicies(c *resiliencyV1alpha.Resiliency) (err error) rc.MaxRetries = 3 } + if t.Matches != nil { + if r.httpRetryMatch[name] == nil { + r.httpRetryMatch[name] = &httpRetryMatch.HttpRetryMatch{} + } + if len(t.Matches.Errors) > 0 { + r.httpRetryMatch[name].Errors = t.Matches.Errors + } + if len(t.Matches.HTTPStatusCodes) > 0 { + r.httpRetryMatch[name].HTTPStatusCodes = t.Matches.HTTPStatusCodes + } + if len(t.Matches.Headers) > 0 { + for _, header := range t.Matches.Headers { + r.httpRetryMatch[name].Headers = append(r.httpRetryMatch[name].Headers, httpRetryMatch.HeaderMatch{ + Header: header.Header, + Match: httpRetryMatch.ValueMatch{ + ExactMatch: header.Match.ExactMatch, + PrefixMatch: header.Match.PrefixMatch, + SuffixMatch: header.Match.SuffixMatch, + RegexMatch: header.Match.RegexMatch, + }, + }) + } + } + } + r.retries[name] = &rc } else { r.log.Warnf("Attempted to override protected policy %s which is not allowed. Ignoring provided policy and using default.", name) @@ -576,6 +605,10 @@ func (r *Resiliency) EndpointPolicy(app string, endpoint string) *PolicyDefiniti } if policyNames.Retry != "" { policyDef.r = r.retries[policyNames.Retry] + + if r.httpRetryMatch != nil { + policyDef.httpRetryMatches = r.httpRetryMatch[policyNames.Retry] + } } if policyNames.CircuitBreaker != "" { template, ok := r.circuitBreakers[policyNames.CircuitBreaker] diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index c4040d3e404..ef7a2f3945d 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -19,3 +19,21 @@ const ( DefaultLinearBackoffInterval = time.Second DefaultLinearRetryCount = 3 ) + +type HttpRetryMatch struct { + Headers []HeaderMatch `mapstructure:"headers"` + HTTPStatusCodes []int `mapstructure:"httpStatusCodes"` + Errors []string `mapstructure:"errors"` +} + +type HeaderMatch struct { + Header string + Match ValueMatch +} + +type ValueMatch struct { + ExactMatch string + PrefixMatch string + SuffixMatch string + RegexMatch string +}