Skip to content

Commit

Permalink
Merge pull request #70353 from tbg/backport20.2-69405
Browse files Browse the repository at this point in the history
release-20.2: kvserver: remove extraneous circuit breaker check in Raft transport
  • Loading branch information
tbg authored Sep 17, 2021
2 parents ce1977c + dcda901 commit 657f686
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,11 +618,15 @@ func (t *RaftTransport) startProcessNewQueue(
}
defer cleanup(ch)
defer t.queues[class].Delete(int64(toNodeID))
conn, err := t.dialer.Dial(ctx, toNodeID, class)
// NB: we dial without a breaker here because the caller has already
// checked the breaker. Checking it again can cause livelock, see:
// https://github.com/cockroachdb/cockroach/issues/68419
conn, err := t.dialer.DialNoBreaker(ctx, toNodeID, class)
if err != nil {
// The dialer already logs sufficiently, so just return.
return
}

client := NewMultiRaftClient(conn)
batchCtx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
18 changes: 12 additions & 6 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ func (n *Dialer) Dial(
breaker.Fail(err)
return nil, err
}
return n.dial(ctx, nodeID, addr, breaker, class)
return n.dial(ctx, nodeID, addr, breaker, true /* checkBreaker */, class)
}

// DialNoBreaker ignores the breaker if there is an error dialing. This function
// should only be used when there is good reason to believe that the node is reachable.
// DialNoBreaker is like Dial, but will not check the circuit breaker before
// trying to connect. The breaker is notified of the outcome. This function
// should only be used when there is good reason to believe that the node is
// reachable.
func (n *Dialer) DialNoBreaker(
ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
Expand All @@ -100,9 +102,12 @@ func (n *Dialer) DialNoBreaker(
}
addr, err := n.resolver(nodeID)
if err != nil {
if ctx.Err() == nil {
n.getBreaker(nodeID, class).Fail(err)
}
return nil, err
}
return n.dial(ctx, nodeID, addr, nil /* breaker */, class)
return n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), false /* checkBreaker */, class)
}

// DialInternalClient is a specialization of DialClass for callers that
Expand Down Expand Up @@ -130,7 +135,7 @@ func (n *Dialer) DialInternalClient(
return localCtx, localClient, nil
}
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), class)
conn, err := n.dial(ctx, nodeID, addr, n.getBreaker(nodeID, class), true /* checkBreaker */, class)
if err != nil {
return nil, nil, err
}
Expand All @@ -144,13 +149,14 @@ func (n *Dialer) dial(
nodeID roachpb.NodeID,
addr net.Addr,
breaker *wrappedBreaker,
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, err error) {
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
}
if breaker != nil && !breaker.Ready() {
if checkBreaker && !breaker.Ready() {
err = errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", nodeID)
return nil, err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/rpc/nodedialer/nodedialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -78,18 +79,28 @@ func TestDialNoBreaker(t *testing.T) {
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))

// Test that resolver errors don't trip the breaker.
// Now trip the breaker and check that DialNoBreaker will go ahead
// and dial anyway, and on top of that reset the breaker again (since
// the dial will succeed).
breaker.Trip()
require.True(t, breaker.Tripped())
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
require.NoError(t, err)
require.False(t, breaker.Tripped())

// Test that resolver errors also trip the breaker, just like
// they would for regular Dial.
boom := fmt.Errorf("boom")
nd = New(rpcCtx, func(roachpb.NodeID) (net.Addr, error) {
return nil, boom
})
breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.Equal(t, errors.Cause(err), boom)
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
assert.Equal(t, breaker.Failures(), int64(1))

// Test that connection errors don't trip the breaker either.
// Test that connection errors are reported to the breaker even
// with DialNoBreaker.
// To do this, we have to trick grpc into never successfully dialing
// the server, because if it succeeds once then it doesn't try again
// to perform a connection. To trick grpc in this way, we have to
Expand All @@ -99,10 +110,11 @@ func TestDialNoBreaker(t *testing.T) {
_, ln, _ = newTestServer(t, clock, stopper, false /* useHeartbeat */)
nd = New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr()))
breaker = nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.NotNil(t, err, "expected dial error")
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.NotNil(t, err, "expected dial error")
assert.Equal(t, breaker.Failures(), int64(1))
}

func TestConcurrentCancellationAndTimeout(t *testing.T) {
Expand Down

0 comments on commit 657f686

Please sign in to comment.