Skip to content

Commit

Permalink
RPC Timeout/Retries account for blocking requests
Browse files Browse the repository at this point in the history
  • Loading branch information
joel0 committed Oct 20, 2020
1 parent a3f8aa2 commit ee2f740
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 30 deletions.
14 changes: 6 additions & 8 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,18 +270,16 @@ TRY:
)
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
manager.NotifyFailedServer(server)
if retry := canRetry(args, rpcErr); !retry {
if retry := canRetry(args, rpcErr, firstCheck, c.config); !retry {
return rpcErr
}

// We can wait a bit and retry!
if time.Since(firstCheck) < c.config.RPCHoldTimeout {
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / jitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto TRY
case <-c.shutdownCh:
}
return rpcErr
}
Expand Down
41 changes: 19 additions & 22 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ import (
)

const (
// jitterFraction is a the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
// is applied to the RPCHoldTimeout
jitterFraction = 16

// Warn if the Raft command is larger than this.
// If it's over 1MB something is probably being abusive.
raftWarnSize = 1024 * 1024
Expand Down Expand Up @@ -485,7 +479,16 @@ func (c *limitedConn) Read(b []byte) (n int, err error) {
}

// canRetry returns true if the given situation is safe for a retry.
func canRetry(args interface{}, err error) bool {
func canRetry(args interface{}, err error, start time.Time, config *Config) bool {
rpcInfo, hasInfo := args.(structs.RPCInfo)
if hasInfo && rpcInfo.HasTimedOut(start, config.RPCHoldTimeout, config.MaxQueryTime, config.DefaultQueryTime) {
// RPCInfo timeout may include extra time for MaxQueryTime
return false
} else if !hasInfo && time.Since(start) > config.RPCHoldTimeout {
// When not RPCInfo, timeout is only RPCHoldTimeout
return false
}

// No leader errors are always safe to retry since no state could have
// been changed.
if structs.IsErrNoLeader(err) {
Expand All @@ -500,8 +503,7 @@ func canRetry(args interface{}, err error) bool {

// Reads are safe to retry for stream errors, such as if a server was
// being shut down.
info, ok := args.(structs.RPCInfo)
if ok && info.IsRead() && lib.IsErrEOF(err) {
if hasInfo && rpcInfo.IsRead() && lib.IsErrEOF(err) {
return true
}

Expand All @@ -511,7 +513,7 @@ func canRetry(args interface{}, err error) bool {
// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader
// Returns a bool of if forwarding was performed, as well as any error
func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
var firstCheck time.Time
firstCheck := time.Now()

// Handle DC forwarding
dc := info.RequestDatacenter()
Expand Down Expand Up @@ -563,19 +565,14 @@ CHECK_LEADER:
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
method, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY
if rpcErr == nil {
return true, nil
}
return true, rpcErr
}

RETRY:
// Gate the request until there is a leader
if firstCheck.IsZero() {
firstCheck = time.Now()
}
if time.Since(firstCheck) < s.config.RPCHoldTimeout {
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
if retry := canRetry(args, rpcErr, firstCheck, s.config); retry {
// Gate the request until there is a leader
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
select {
case <-time.After(jitter):
goto CHECK_LEADER
Expand Down Expand Up @@ -781,7 +778,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
}

// Apply a small amount of jitter to the request.
queryTimeout += lib.RandomStagger(queryTimeout / jitterFraction)
queryTimeout += lib.RandomStagger(queryTimeout / structs.JitterFraction)

// wrap the base context with a deadline
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(queryTimeout))
Expand Down Expand Up @@ -882,7 +879,7 @@ func (s *Server) consistentRead() error {
if s.isReadyForConsistentReads() {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
deadline := time.Now().Add(s.config.RPCHoldTimeout)

for time.Now().Before(deadline) {
Expand Down
25 changes: 25 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ const (
// we multiply by time.Second
lockDelayMinThreshold = 1000

// JitterFraction is a the limit to the amount of jitter we apply
// to a user specified MaxQueryTime. We divide the specified time by
// the fraction. So 16 == 6.25% limit of jitter. This same fraction
// is applied to the RPCHoldTimeout
JitterFraction = 16

// WildcardSpecifier is the string which should be used for specifying a wildcard
// The exact semantics of the wildcard is left up to the code where its used.
WildcardSpecifier = "*"
Expand Down Expand Up @@ -194,6 +200,7 @@ type RPCInfo interface {
AllowStaleRead() bool
TokenSecret() string
SetTokenSecret(string)
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool
}

// QueryOptions is used to specify various flags for read queries
Expand Down Expand Up @@ -292,6 +299,20 @@ func (q *QueryOptions) SetTokenSecret(s string) {
q.Token = s
}

func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
if q.MinQueryIndex > 0 {
if q.MaxQueryTime > maxQueryTime {
q.MaxQueryTime = maxQueryTime
} else if q.MaxQueryTime <= 0 {
q.MaxQueryTime = defaultQueryTime
}
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)

return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout)
}
return time.Since(start) > rpcHoldTimeout
}

type WriteRequest struct {
// Token is the ACL token ID. If not provided, the 'anonymous'
// token is assumed for backwards compatibility.
Expand All @@ -315,6 +336,10 @@ func (w *WriteRequest) SetTokenSecret(s string) {
w.Token = s
}

func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
return time.Since(start) > rpcHoldTimeout
}

// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
Expand Down
3 changes: 3 additions & 0 deletions lib/eof.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error()
// IsErrEOF returns true if we get an EOF error from the socket itself, or
// an EOF equivalent error from yamux.
func IsErrEOF(err error) bool {
if err == nil {
return false
}
if err == io.EOF {
return true
}
Expand Down
6 changes: 6 additions & 0 deletions proto/pbautoconf/auto_config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package pbautoconf

import "time"

func (req *AutoConfigRequest) RequestDatacenter() string {
return req.Datacenter
}
Expand All @@ -19,3 +21,7 @@ func (req *AutoConfigRequest) TokenSecret() string {
func (req *AutoConfigRequest) SetTokenSecret(token string) {
req.ConsulToken = token
}

func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
return time.Since(start) > rpcHoldTimeout
}

0 comments on commit ee2f740

Please sign in to comment.