From 6a6547b0b63b7415c3651a0b5da363e577d2bbb8 Mon Sep 17 00:00:00 2001 From: Benjamin Buzbee Date: Mon, 30 Nov 2020 12:11:10 -0800 Subject: [PATCH] Fix RPC retry logic in nomad client's rpc.go for blocking queries (#9266) --- client/rpc.go | 56 +++++++++++++++++++++++++++++++--------- nomad/rpc.go | 7 +---- nomad/structs/structs.go | 51 +++++++++++++++++++++--------------- 3 files changed, 75 insertions(+), 39 deletions(-) diff --git a/client/rpc.go b/client/rpc.go index 3f52a7b16c7..83a404b6524 100644 --- a/client/rpc.go +++ b/client/rpc.go @@ -53,12 +53,20 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { return c.config.RPCHandler.RPC(method, args, reply) } - // This is subtle but we start measuring the time on the client side - // right at the time of the first request, vs. on the first retry as - // is done on the server side inside forward(). This is because the - // servers may already be applying the RPCHoldTimeout up there, so by - // starting the timer here we won't potentially double up the delay. - firstCheck := time.Now() + // We will try to automatically retry requests that fail due to things like server unavailability + // but instead of retrying forever, lets have a solid upper-bound + deadline := time.Now() + + // A reasonable amount of time for leader election. Note when servers forward() our RPC requests + // to the leader they may also allow for an RPCHoldTimeout while waiting for leader election. + // That's OK, we won't double up because we are using it here not as a sleep but + // as a hint to give up + deadline = deadline.Add(c.config.RPCHoldTimeout) + + // If its a blocking query, allow the time specified by the request + if info, ok := args.(structs.RPCInfo); ok { + deadline = deadline.Add(info.TimeToBlock()) + } TRY: server := c.servers.FindServer() @@ -68,6 +76,7 @@ TRY: // Make the request. rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply) + if rpcErr == nil { c.fireRpcRetryWatcher() return nil @@ -83,14 +92,37 @@ TRY: // Move off to another server, and see if we can retry. c.rpcLogger.Error("error performing RPC to server", "error", rpcErr, "rpc", method, "server", server.Addr) c.servers.NotifyFailedServer(server) - if retry := canRetry(args, rpcErr, firstCheck, c.config.RPCHoldTimeout); !retry { + + if !canRetry(args, rpcErr) { + c.rpcLogger.Error("error performing RPC to server which is not safe to automatically retry", "error", rpcErr, "rpc", method, "server", server.Addr) + return rpcErr + } + if time.Now().After(deadline) { + // Blocking queries are tricky. jitters and rpcholdtimes in multiple places can result in our server call taking longer than we wanted it to. For example: + // a block time of 5s may easily turn into the server blocking for 10s since it applies its own RPCHoldTime. If the server dies at t=7s we still want to retry + // so before we give up on blocking queries make one last attempt for an immediate answer + if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 { + info.SetTimeToBlock(0) + return c.RPC(method, args, reply) + } + c.rpcLogger.Error("error performing RPC to server, deadline exceeded, cannot retry", "error", rpcErr, "rpc", method, "server", server.Addr) return rpcErr } - // We can wait a bit and retry! - jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) + // Wait to avoid thundering herd select { - case <-time.After(jitter): + case <-time.After(lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)): + // If we are going to retry a blocking query we need to update the time to block so it finishes by our deadline. + if info, ok := args.(structs.RPCInfo); ok && info.TimeToBlock() > 0 { + newBlockTime := deadline.Sub(time.Now()) + // We can get below 0 here on slow computers because we slept for jitter so at least try to get an immediate response + if newBlockTime < 0 { + newBlockTime = 0 + } + info.SetTimeToBlock(newBlockTime) + return c.RPC(method, args, reply) + } + goto TRY case <-c.shutdownCh: } @@ -98,7 +130,7 @@ TRY: } // canRetry returns true if the given situation is safe for a retry. -func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time.Duration) bool { +func canRetry(args interface{}, err error) bool { // No leader errors are always safe to retry since no state could have // been changed. if structs.IsErrNoLeader(err) { @@ -108,7 +140,7 @@ func canRetry(args interface{}, err error, start time.Time, rpcHoldTimeout time. // Reads are safe to retry for stream errors, such as if a server was // being shut down. info, ok := args.(structs.RPCInfo) - if ok && info.IsRead() && lib.IsErrEOF(err) && !info.HasTimedOut(start, rpcHoldTimeout) { + if ok && info.IsRead() && lib.IsErrEOF(err) { return true } diff --git a/nomad/rpc.go b/nomad/rpc.go index b2d1ad9207a..aa549b6e2ea 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -780,12 +780,7 @@ func (r *rpcHandler) blockingRPC(opts *blockingOptions) error { goto RUN_QUERY } - // Restrict the max query time, and ensure there is always one - if opts.queryOpts.MaxQueryTime > structs.MaxBlockingRPCQueryTime { - opts.queryOpts.MaxQueryTime = structs.MaxBlockingRPCQueryTime - } else if opts.queryOpts.MaxQueryTime <= 0 { - opts.queryOpts.MaxQueryTime = structs.DefaultBlockingRPCQueryTime - } + opts.queryOpts.MaxQueryTime = opts.queryOpts.TimeToBlock() // Apply a small amount of jitter to the request opts.queryOpts.MaxQueryTime += lib.RandomStagger(opts.queryOpts.MaxQueryTime / structs.JitterFraction) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0bc0d42cd57..95963af11ca 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -25,7 +25,6 @@ import ( "strings" "time" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/cronexpr" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-multierror" @@ -232,7 +231,11 @@ type RPCInfo interface { AllowStaleRead() bool IsForwarded() bool SetForwarded() - HasTimedOut(since time.Time, rpcHoldTimeout time.Duration) bool + TimeToBlock() time.Duration + // TimeToBlock sets how long this request can block. The requested time may not be possible, + // so Callers should readback TimeToBlock. E.g. you cannot set time to block at all on WriteRequests + // and it cannot exceed MaxBlockingRPCQueryTime + SetTimeToBlock(t time.Duration) } // InternalRpcInfo allows adding internal RPC metadata to an RPC. This struct @@ -287,6 +290,24 @@ type QueryOptions struct { InternalRpcInfo } +// TimeToBlock returns MaxQueryTime adjusted for maximums and defaults +// it will return 0 if this is not a blocking query +func (q QueryOptions) TimeToBlock() time.Duration { + if q.MinQueryIndex == 0 { + return 0 + } + if q.MaxQueryTime > MaxBlockingRPCQueryTime { + return MaxBlockingRPCQueryTime + } else if q.MaxQueryTime <= 0 { + return DefaultBlockingRPCQueryTime + } + return q.MaxQueryTime +} + +func (q QueryOptions) SetTimeToBlock(t time.Duration) { + q.MaxQueryTime = t +} + func (q QueryOptions) RequestRegion() string { return q.Region } @@ -312,21 +333,6 @@ func (q QueryOptions) AllowStaleRead() bool { return q.AllowStale } -func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool { - if q.MinQueryIndex > 0 { - // Restrict the max query time, and ensure there is always one - if q.MaxQueryTime > MaxBlockingRPCQueryTime { - q.MaxQueryTime = MaxBlockingRPCQueryTime - } else if q.MaxQueryTime <= 0 { - q.MaxQueryTime = DefaultBlockingRPCQueryTime - } - q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction) - - return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout) - } - return time.Since(start) > rpcHoldTimeout -} - // AgentPprofRequest is used to request a pprof report for a given node. type AgentPprofRequest struct { // ReqType specifies the profile to use @@ -387,6 +393,13 @@ type WriteRequest struct { InternalRpcInfo } +func (w WriteRequest) TimeToBlock() time.Duration { + return 0 +} + +func (w WriteRequest) SetTimeToBlock(_ time.Duration) { +} + func (w WriteRequest) RequestRegion() string { // The target region for this request return w.Region @@ -413,10 +426,6 @@ func (w WriteRequest) AllowStaleRead() bool { return false } -func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration) bool { - return time.Since(start) > rpcHoldTimeout -} - // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct {