From d5bf49a8328fc03b7e3f619dbd5195bdecfeefa6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 26 Aug 2021 10:18:06 +0200 Subject: [PATCH] kvserver: remove extraneous circuit breaker check in Raft transport See https://github.com/cockroachdb/cockroach/issues/68419. Release justification: bug fix Release note (bug fix): Previously, after a temporary node outage, other nodes in the cluster could fail to connect to the restarted node due to their circuit breakers not resetting. This would manifest in the logs via messages "unable to dial nXX: breaker open", where `XX` is the ID of the restarted node. (Note that such errors are expected for nodes that are truly unreachable, and may still occur around the time of the restart, but for no longer than a few seconds). --- pkg/kv/kvserver/raft_transport.go | 6 +++++- pkg/rpc/nodedialer/nodedialer.go | 18 ++++++++++++------ pkg/rpc/nodedialer/nodedialer_test.go | 24 ++++++++++++++++++------ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index ff92d4ede2e8..7eb0bb452be6 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -614,11 +614,15 @@ func (t *RaftTransport) startProcessNewQueue( } defer cleanup(ch) defer t.queues[class].Delete(int64(toNodeID)) - conn, err := t.dialer.Dial(ctx, toNodeID, class) + // NB: we dial without a breaker here because the caller has already + // checked the breaker. Checking it again can cause livelock, see: + // https://github.com/cockroachdb/cockroach/issues/68419 + conn, err := t.dialer.DialNoBreaker(ctx, toNodeID, class) if err != nil { // DialNode already logs sufficiently, so just return. return } + client := NewMultiRaftClient(conn) batchCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 1d42d308d557..1182a16ee713 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -88,11 +88,13 @@ func (n *Dialer) Dial( breaker.Fail(err) return nil, err } - return n.dial(ctx, nodeID, addr, breaker, class) + return n.dial(ctx, nodeID, addr, breaker, true /* checkBreaker */, class) } -// DialNoBreaker ignores the breaker if there is an error dialing. This function -// should only be used when there is good reason to believe that the node is reachable. +// DialNoBreaker is like Dial, but will not check the circuit breaker before +// trying to connect. The breaker is notified of the outcome. This function +// should only be used when there is good reason to believe that the node is +// reachable. func (n *Dialer) DialNoBreaker( ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass, ) (_ *grpc.ClientConn, err error) { @@ -101,9 +103,12 @@ func (n *Dialer) DialNoBreaker( } addr, err := n.resolver(nodeID) if err != nil { + if ctx.Err() == nil { + n.getBreaker(nodeID, class).Fail(err) + } return nil, err } - return n.dial(ctx, nodeID, addr, nil /* breaker */, class) + return n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), false /* checkBreaker */, class) } // DialInternalClient is a specialization of DialClass for callers that @@ -131,7 +136,7 @@ func (n *Dialer) DialInternalClient( return localCtx, localClient, nil } log.VEventf(ctx, 2, "sending request to %s", addr) - conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), class) + conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class) if err != nil { return nil, nil, err } @@ -145,13 +150,14 @@ func (n *Dialer) dial( nodeID roachpb.NodeID, addr net.Addr, breaker *wrappedBreaker, + checkBreaker bool, class rpc.ConnectionClass, ) (_ *grpc.ClientConn, err error) { // Don't trip the breaker if we're already canceled. if ctxErr := ctx.Err(); ctxErr != nil { return nil, ctxErr } - if breaker != nil && !breaker.Ready() { + if checkBreaker && !breaker.Ready() { err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID) return nil, err } diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index 8b1c5e4bc6fe..d64374b22859 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -78,7 +79,17 @@ func TestDialNoBreaker(t *testing.T) { assert.True(t, breaker.Ready()) assert.Equal(t, breaker.Failures(), int64(0)) - // Test that resolver errors don't trip the breaker. + // Now trip the breaker and check that DialNoBreaker will go ahead + // and dial anyway, and on top of that open the breaker again (since + // the dial will succeed). + breaker.Trip() + require.True(t, breaker.Tripped()) + _, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) + require.NoError(t, err) + require.False(t, breaker.Tripped()) + + // Test that resolver errors also trip the breaker, just like + // they would for regular Dial. boom := fmt.Errorf("boom") nd = New(rpcCtx, func(roachpb.NodeID) (net.Addr, error) { return nil, boom @@ -86,10 +97,10 @@ func TestDialNoBreaker(t *testing.T) { breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass) _, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) assert.Equal(t, errors.Cause(err), boom) - assert.True(t, breaker.Ready()) - assert.Equal(t, breaker.Failures(), int64(0)) + assert.Equal(t, breaker.Failures(), int64(1)) - // Test that connection errors don't trip the breaker either. + // Test that connection errors are reported to the breaker even + // with DialNoBreaker. // To do this, we have to trick grpc into never successfully dialing // the server, because if it succeeds once then it doesn't try again // to perform a connection. To trick grpc in this way, we have to @@ -99,10 +110,11 @@ func TestDialNoBreaker(t *testing.T) { _, ln, _ = newTestServer(t, clock, stopper, false /* useHeartbeat */) nd = New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass) - _, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) - assert.NotNil(t, err, "expected dial error") assert.True(t, breaker.Ready()) assert.Equal(t, breaker.Failures(), int64(0)) + _, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) + assert.NotNil(t, err, "expected dial error") + assert.Equal(t, breaker.Failures(), int64(1)) } func TestConcurrentCancellationAndTimeout(t *testing.T) {