diff --git a/client/rpc.go b/client/rpc.go index 534e8a507ad..3f52a7b16c7 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -83,24 +83,22 @@ TRY: // Move off to another server, and see if we can retry. c.rpcLogger.Error("error performing RPC to server", "error", rpcErr, "rpc", method, "server", server.Addr) c.servers.NotifyFailedServer(server) - if retry := canRetry(args, rpcErr); !retry { + if retry := canRetry(args, rpcErr, firstCheck, c.config.RPCHoldTimeout); !retry { return rpcErr } // We can wait a bit and retry! - if time.Since(firstCheck) < c.config.RPCHoldTimeout { - jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) - select { - case <-time.After(jitter): - goto TRY - case <-c.shutdownCh: - } + jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) + select { + case <-time.After(jitter): + goto TRY + case <-c.shutdownCh: } return rpcErr } // canRetry returns true if the given situation is safe for a retry. -func canRetry(args interface{}, err error) bool { +func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.Duration) bool { // No leader errors are always safe to retry since no state could have // been changed. if structs.IsErrNoLeader(err) { @@ -110,7 +108,7 @@ func canRetry(args interface{}, err error) bool { // Reads are safe to retry for stream errors, such as if a server was // being shut down. info, ok := args.(structs.RPCInfo) - if ok && info.IsRead() && lib.IsErrEOF(err) { + if ok && info.IsRead() && lib.IsErrEOF(err) && !info.HasTimedOut(start, rpcHoldTimeout) { return true } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a5a8615d944..340f2e36c37 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -203,6 +203,7 @@ type RPCInfo interface { AllowStaleRead() bool IsForwarded() bool SetForwarded() + HasTimedOut(since time.Time, rpcHoldTimeout time.Duration) bool } // InternalRpcInfo allows adding internal RPC metadata to an RPC. This struct @@ -282,6 +283,13 @@ func (q QueryOptions) AllowStaleRead() bool { return q.AllowStale } +func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool { + if q.MinQueryIndex > 0 { + return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout) + } + return time.Since(start) > rpcHoldTimeout +} + // AgentPprofRequest is used to request a pprof report for a given node. type AgentPprofRequest struct { // ReqType specifies the profile to use @@ -368,6 +376,10 @@ func (w WriteRequest) AllowStaleRead() bool { return false } +func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool { + return time.Since(start) > rpcHoldTimeout +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct {