Skip to content

Commit

Permalink
kvserver: remove extraneous circuit breaker check in Raft transport
Browse files Browse the repository at this point in the history
See #68419.

Release justification: bug fix
Release note (bug fix): Previously, after a temporary node outage, other
nodes in the cluster could fail to connect to the restarted node due to
their circuit breakers not resetting. This would manifest in the logs
via messages "unable to dial nXX: breaker open", where `XX` is the ID
of the restarted node. (Note that such errors are expected for nodes
that are truly unreachable, and may still occur around the time of
the restart, but for no longer than a few seconds).
  • Loading branch information
tbg committed Aug 26, 2021
1 parent 2a16eed commit d5bf49a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,15 @@ func (t *RaftTransport) startProcessNewQueue(
}
defer cleanup(ch)
defer t.queues[class].Delete(int64(toNodeID))
conn, err := t.dialer.Dial(ctx, toNodeID, class)
// NB: we dial without a breaker here because the caller has already
// checked the breaker. Checking it again can cause livelock, see:
// https://github.com/cockroachdb/cockroach/issues/68419
conn, err := t.dialer.DialNoBreaker(ctx, toNodeID, class)
if err != nil {
// The dialer already logs sufficiently, so just return.
return
}

client := NewMultiRaftClient(conn)
batchCtx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
18 changes: 12 additions & 6 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ func (n *Dialer) Dial(
breaker.Fail(err)
return nil, err
}
return n.dial(ctx, nodeID, addr, breaker, class)
return n.dial(ctx, nodeID, addr, breaker, true /* checkBreaker */, class)
}

// DialNoBreaker ignores the breaker if there is an error dialing. This function
// should only be used when there is good reason to believe that the node is reachable.
// DialNoBreaker is like Dial, but will not check the circuit breaker before
// trying to connect. The breaker is notified of the outcome. This function
// should only be used when there is good reason to believe that the node is
// reachable.
func (n *Dialer) DialNoBreaker(
ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
Expand All @@ -101,9 +103,12 @@ func (n *Dialer) DialNoBreaker(
}
addr, err := n.resolver(nodeID)
if err != nil {
if ctx.Err() == nil {
n.getBreaker(nodeID, class).Fail(err)
}
return nil, err
}
return n.dial(ctx, nodeID, addr, nil /* breaker */, class)
return n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), false /* checkBreaker */, class)
}

// DialInternalClient is a specialization of DialClass for callers that
Expand Down Expand Up @@ -131,7 +136,7 @@ func (n *Dialer) DialInternalClient(
return localCtx, localClient, nil
}
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), class)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class)
if err != nil {
return nil, nil, err
}
Expand All @@ -145,13 +150,14 @@ func (n *Dialer) dial(
nodeID roachpb.NodeID,
addr net.Addr,
breaker *wrappedBreaker,
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
}
if breaker != nil && !breaker.Ready() {
if checkBreaker && !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
return nil, err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/rpc/nodedialer/nodedialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -78,18 +79,28 @@ func TestDialNoBreaker(t *testing.T) {
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))

// Test that resolver errors don't trip the breaker.
// Now trip the breaker and check that DialNoBreaker will go ahead
// and dial anyway, and on top of that reset (close) the breaker again
// (since the dial will succeed).
breaker.Trip()
require.True(t, breaker.Tripped())
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
require.NoError(t, err)
require.False(t, breaker.Tripped())

// Test that resolver errors also trip the breaker, just like
// they would for regular Dial.
boom := fmt.Errorf("boom")
nd = New(rpcCtx, func(roachpb.NodeID) (net.Addr, error) {
return nil, boom
})
breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.Equal(t, errors.Cause(err), boom)
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
assert.Equal(t, breaker.Failures(), int64(1))

// Test that connection errors don't trip the breaker either.
// Test that connection errors are reported to the breaker even
// with DialNoBreaker.
// To do this, we have to trick grpc into never successfully dialing
// the server, because if it succeeds once then it doesn't try again
// to perform a connection. To trick grpc in this way, we have to
Expand All @@ -99,10 +110,11 @@ func TestDialNoBreaker(t *testing.T) {
_, ln, _ = newTestServer(t, clock, stopper, false /* useHeartbeat */)
nd = New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr()))
breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.NotNil(t, err, "expected dial error")
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.NotNil(t, err, "expected dial error")
assert.Equal(t, breaker.Failures(), int64(1))
}

func TestConcurrentCancellationAndTimeout(t *testing.T) {
Expand Down

0 comments on commit d5bf49a

Please sign in to comment.