Skip to content

Commit

Permalink
Finish retries
Browse files Browse the repository at this point in the history
  • Loading branch information
yash-nisar committed Dec 30, 2023
1 parent 9e2bcdc commit 6fd03a9
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 20 deletions.
42 changes: 36 additions & 6 deletions charts/dapr/crds/resiliency.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.5.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.9.2
name: resiliencies.dapr.io
labels:
app.kubernetes.io/part-of: "dapr"
Expand All @@ -25,10 +23,14 @@ spec:
openAPIV3Schema:
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
Expand Down Expand Up @@ -58,6 +60,35 @@ spec:
properties:
duration:
type: string
matches:
properties:
errors:
items:
type: string
type: array
headers:
items:
properties:
header:
type: string
match:
properties:
exactMatch:
type: string
prefixMatch:
type: string
regexMatch:
type: string
suffixMatch:
type: string
type: object
type: object
type: array
httpStatusCodes:
items:
type: integer
type: array
type: object
maxInterval:
type: string
maxRetries:
Expand Down Expand Up @@ -132,4 +163,3 @@ spec:
type: object
served: true
storage: true

27 changes: 23 additions & 4 deletions pkg/apis/resiliency/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,29 @@ type Policies struct {
}

type Retry struct {
Policy string `json:"policy,omitempty" yaml:"policy,omitempty"`
Duration string `json:"duration,omitempty" yaml:"duration,omitempty"`
MaxInterval string `json:"maxInterval,omitempty" yaml:"maxInterval,omitempty"`
MaxRetries *int `json:"maxRetries,omitempty" yaml:"maxRetries,omitempty"`
Policy string `json:"policy,omitempty" yaml:"policy,omitempty"`
Duration string `json:"duration,omitempty" yaml:"duration,omitempty"`
MaxInterval string `json:"maxInterval,omitempty" yaml:"maxInterval,omitempty"`
MaxRetries *int `json:"maxRetries,omitempty" yaml:"maxRetries,omitempty"`
Matches *HTTPMatches `json:"matches,omitempty" yaml:"matches,omitempty"`
}

type HTTPMatches struct {
Headers []HeaderMatch `json:"headers,omitempty" yaml:"headers,omitempty"`
HTTPStatusCodes []int `json:"httpStatusCodes,omitempty" yaml:"httpStatusCodes,omitempty"`
Errors []string `json:"errors,omitempty" yaml:"errors,omitempty"`
}

type HeaderMatch struct {
Header string `json:"header,omitempty" yaml:"header,omitempty"`
Match ValueMatch `json:"match,omitempty" yaml:"match,omitempty"`
}

type ValueMatch struct {
ExactMatch string `json:"exactMatch,omitempty" yaml:"exactMatch,omitempty"`
PrefixMatch string `json:"prefixMatch,omitempty" yaml:"prefixMatch,omitempty"`
SuffixMatch string `json:"suffixMatch,omitempty" yaml:"suffixMatch,omitempty"`
RegexMatch string `json:"regexMatch,omitempty" yaml:"regexMatch,omitempty"`
}

type CircuitBreaker struct {
Expand Down
66 changes: 66 additions & 0 deletions pkg/apis/resiliency/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 123 additions & 10 deletions pkg/http/api_directmessaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/url"
"path"
"regexp"
"strings"
"sync/atomic"

Expand Down Expand Up @@ -154,8 +155,18 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
success := atomic.Bool{}
// Since we don't want to return the actual error, we have to extract several things in order to construct our response.
resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {

httpErrMap := make(map[string]bool)

//Convert the list of errors to a map for faster lookup
if policyDef.HasHttpRetryErrors() {
for _, err := range policyDef.GetHttpRetryErrors() {
httpErrMap[err] = true
}
}

rResp, rErr := a.directMessaging.Invoke(ctx, targetID, req)
if rErr != nil {
if rErr != nil && httpErrMap["reset"] {
// Allowlist policies that are applied on the callee side can return a Permission Denied error.
// For everything else, treat it as a gRPC transport error
apiErr := messages.ErrDirectInvoke.WithFormat(targetID, rErr)
Expand All @@ -181,7 +192,7 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
body, rErr = invokev1.ProtobufToJSON(resStatus)
rResp.WithRawDataBytes(body)
resStatus.Code = statusCode
if rErr != nil {
if rErr != nil && httpErrMap["reset"] {
return rResp, invokeError{
statusCode: http.StatusInternalServerError,
msg: NewErrorResponse("ERR_MALFORMED_RESPONSE", rErr.Error()).JSONErrorValue(),
Expand All @@ -190,14 +201,116 @@ func (a *api) onDirectMessage(w http.ResponseWriter, r *http.Request) {
} else {
resStatus.Code = statusCode
}
} else if resStatus.GetCode() < 200 || resStatus.GetCode() > 399 {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.GetCode()),
msg: msg,
contentType: rResp.ContentType(),
} else {
//Retriable headers regardless of status code
if policyDef.HasHttpRetryHeaders() {
headers := policyDef.GetHttpRetryHeaders()

for _, confHeader := range headers {
confHeaderKey := confHeader.Header
var confHeaderVal string

for respKey, respVal := range rResp.Headers() {
rVal := respVal.String()
rVal = strings.TrimPrefix(rVal, "values:")
rVal = strings.Replace(rVal, "\"", "", -1)

if confHeader.Match.ExactMatch != "" {
confHeaderVal = confHeader.Match.ExactMatch
if confHeaderKey == respKey && confHeaderVal == rVal {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}
} else if confHeader.Match.PrefixMatch != "" {
confHeaderVal = confHeader.Match.PrefixMatch
if confHeaderKey == respKey && strings.HasPrefix(rVal, confHeaderVal) {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}
} else if confHeader.Match.SuffixMatch != "" {
confHeaderVal = confHeader.Match.SuffixMatch
if confHeaderKey == respKey && strings.HasSuffix(rVal, confHeaderVal) {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}
} else if confHeader.Match.RegexMatch != "" {
confHeaderVal = confHeader.Match.RegexMatch
matched, err := regexp.MatchString(confHeaderVal, rVal)
if err != nil {
fmt.Println("Invalid regex:", err)
//fix this bruhhh
}
if confHeaderKey == respKey && matched {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}
}
}
}
}

if (resStatus.Code < 200 || resStatus.Code > 399) {
if policyDef.HasHttpRetryErrors() || policyDef.HasHttpRetryStatusCodes() {
errCodes := policyDef.GetHttpRetryStatusCodes()

if ((httpErrMap["5xx"] && (resStatus.Code >= 500 && resStatus.Code <= 599)) ||
(httpErrMap["retriable_4xx"] && resStatus.Code == 409)) {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}

for _, respCode := range errCodes {
if respCode == int(resStatus.Code) {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}
}
} else {
msg, _ := rResp.RawDataFull()
// Returning a `codeError` here will cause Resiliency to retry the request (if retries are enabled), but if the request continues to fail, the response is sent to the user with whatever status code the app returned.
return rResp, codeError{
headers: rResp.Headers(),
statusCode: int(resStatus.Code),
msg: msg,
contentType: rResp.ContentType(),
}
}
}
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/resiliency/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cenkalti/backoff/v4"

"github.com/dapr/dapr/pkg/resiliency/breaker"
httpRetryMatch "github.com/dapr/dapr/pkg/retry"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
Expand All @@ -50,6 +51,7 @@ type PolicyDefinition struct {
name string
t time.Duration
r *retry.Config
httpRetryMatches *httpRetryMatch.HttpRetryMatch
cb *breaker.CircuitBreaker
addTimeoutActivatedMetric func()
addRetryActivatedMetric func()
Expand Down Expand Up @@ -80,6 +82,30 @@ func (p PolicyDefinition) HasRetries() bool {
return p.r != nil && p.r.MaxRetries != 0
}

func (p PolicyDefinition) HasHttpRetryStatusCodes() bool {
return p.httpRetryMatches != nil && len(p.httpRetryMatches.HTTPStatusCodes) > 0
}

func (p PolicyDefinition) GetHttpRetryStatusCodes() []int {
return p.httpRetryMatches.HTTPStatusCodes
}

func (p PolicyDefinition) HasHttpRetryErrors() bool {
return p.httpRetryMatches != nil && len(p.httpRetryMatches.Errors) > 0
}

func (p PolicyDefinition) GetHttpRetryErrors() []string {
return p.httpRetryMatches.Errors
}

func (p PolicyDefinition) HasHttpRetryHeaders() bool {
return p.httpRetryMatches != nil && len(p.httpRetryMatches.Headers) > 0
}

func (p PolicyDefinition) GetHttpRetryHeaders() []httpRetryMatch.HeaderMatch {
return p.httpRetryMatches.Headers
}

type RunnerOpts[T any] struct {
// The disposer is a function which is invoked when the operation fails, including due to timing out in a background goroutine. It receives the value returned by the operation function as long as it's non-zero (e.g. non-nil for pointer types).
// The disposer can be used to perform cleanup tasks on values returned by the operation function that would otherwise leak (because they're not returned by the result of the runner).
Expand Down
Loading

0 comments on commit 6fd03a9

Please sign in to comment.