Skip to content

Commit

Permalink
kvserver: remove extraneous circuit breaker check in Raft transport
Browse files Browse the repository at this point in the history
See #68419. We now use
`DialNoBreaker` for the raft transport, taking into account the previous
`Ready()` check.

`DialNoBreaker` was previously bypassing the breaker as it ought to but
was also *not reporting to the breaker* the result of the operation;
this is not ideal and was caught by the tests. This commit changes
`DialNoBreaker` to report the result (i.e. fail or success).

Release justification: bug fix
Release note (bug fix): Previously, after a temporary node outage, other
nodes in the cluster could fail to connect to the restarted node due to
their circuit breakers not resetting. This would manifest in the logs
via messages "unable to dial nXX: breaker open", where `XX` is the ID
of the restarted node. (Note that such errors are expected for nodes
that are truly unreachable, and may still occur around the time of
the restart, but for no longer than a few seconds).
  • Loading branch information
tbg committed Aug 26, 2021
1 parent 2a16eed commit 4304289
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,15 @@ func (t *RaftTransport) startProcessNewQueue(
}
defer cleanup(ch)
defer t.queues[class].Delete(int64(toNodeID))
conn, err := t.dialer.Dial(ctx, toNodeID, class)
// NB: we dial without a breaker here because the caller has already
// checked the breaker. Checking it again can cause livelock, see:
// https://github.com/cockroachdb/cockroach/issues/68419
conn, err := t.dialer.DialNoBreaker(ctx, toNodeID, class)
if err != nil {
// DialNode already logs sufficiently, so just return.
return
}

client := NewMultiRaftClient(conn)
batchCtx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions pkg/rpc/nodedialer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
)
18 changes: 12 additions & 6 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ func (n *Dialer) Dial(
breaker.Fail(err)
return nil, err
}
return n.dial(ctx, nodeID, addr, breaker, class)
return n.dial(ctx, nodeID, addr, breaker, true /* checkBreaker */, class)
}

// DialNoBreaker ignores the breaker if there is an error dialing. This function
// should only be used when there is good reason to believe that the node is reachable.
// DialNoBreaker is like Dial, but will not check the circuit breaker before
// trying to connect. The breaker is notified of the outcome. This function
// should only be used when there is good reason to believe that the node is
// reachable.
func (n *Dialer) DialNoBreaker(
ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
Expand All @@ -101,9 +103,12 @@ func (n *Dialer) DialNoBreaker(
}
addr, err := n.resolver(nodeID)
if err != nil {
if ctx.Err() == nil {
n.getBreaker(nodeID, class).Fail(err)
}
return nil, err
}
return n.dial(ctx, nodeID, addr, nil /* breaker */, class)
return n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), false /* checkBreaker */, class)
}

// DialInternalClient is a specialization of DialClass for callers that
Expand Down Expand Up @@ -131,7 +136,7 @@ func (n *Dialer) DialInternalClient(
return localCtx, localClient, nil
}
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), class)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class)
if err != nil {
return nil, nil, err
}
Expand All @@ -145,13 +150,14 @@ func (n *Dialer) dial(
nodeID roachpb.NodeID,
addr net.Addr,
breaker *wrappedBreaker,
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
}
if breaker != nil && !breaker.Ready() {
if checkBreaker && !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
return nil, err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/rpc/nodedialer/nodedialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -78,18 +79,28 @@ func TestDialNoBreaker(t *testing.T) {
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))

// Test that resolver errors don't trip the breaker.
// Now trip the breaker and check that DialNoBreaker will go ahead
// and dial anyway, and on top of that open the breaker again (since
// the dial will succeed).
breaker.Trip()
require.True(t, breaker.Tripped())
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
require.NoError(t, err)
require.False(t, breaker.Tripped())

// Test that resolver errors also trip the breaker, just like
// they would for regular Dial.
boom := fmt.Errorf("boom")
nd = New(rpcCtx, func(roachpb.NodeID) (net.Addr, error) {
return nil, boom
})
breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.Equal(t, errors.Cause(err), boom)
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
assert.Equal(t, breaker.Failures(), int64(1))

// Test that connection errors don't trip the breaker either.
// Test that connection errors are reported to the breaker even
// with DialNoBreaker.
// To do this, we have to trick grpc into never successfully dialing
// the server, because if it succeeds once then it doesn't try again
// to perform a connection. To trick grpc in this way, we have to
Expand All @@ -99,10 +110,11 @@ func TestDialNoBreaker(t *testing.T) {
_, ln, _ = newTestServer(t, clock, stopper, false /* useHeartbeat */)
nd = New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr()))
breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.NotNil(t, err, "expected dial error")
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.NotNil(t, err, "expected dial error")
assert.Equal(t, breaker.Failures(), int64(1))
}

func TestConcurrentCancellationAndTimeout(t *testing.T) {
Expand Down

0 comments on commit 4304289

Please sign in to comment.