Skip to content

Commit

Permalink
Merge #70017
Browse files Browse the repository at this point in the history
70017: rpc: avoid network IO in `Dialer.ConnHealth` r=knz,tbg a=erikgrinaker

`Dialer.ConnHealth` is used to check whether a healthy RPC connection
exists to a given node, in order to avoid interacting with unavailable
nodes. However, this actually attempted to dial the node if no
connection was found, which can block for tens of seconds in the case of
an unresponsive node. This is problematic since it is used in
performance-critical code paths, including Raft command application.

This patch changes `Dialer.ConnHealth` to avoid dialing the node, and
adds a `Context.ConnHealth` helper with access to the RPC connection
registry. Because `DistSQLPlanner` relied on `ConnHealth` to dial the
remote node, it also adds `Dialer.ConnHealthTryDial` which retains
the old behavior for use in DistSQL until a better solution can be
implemented.

Resolves #69888.

Release note (bug fix): Avoid dialing nodes in performance-critical code
paths, which could cause substantial latency when encountering
unresponsive nodes (e.g. when a VM or server is shut down).

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Sep 21, 2021
2 parents 2c36f59 + cee675a commit 73ae236
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 19 deletions.
17 changes: 17 additions & 0 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,19 @@ func (ctx *Context) removeConn(conn *Connection, keys ...connKey) {
}
}

// ConnHealth returns nil if we have an open connection of the request
// class to the given node that succeeded on its most recent heartbeat.
func (ctx *Context) ConnHealth(target string, nodeID roachpb.NodeID, class ConnectionClass) error {
// The local client is always considered healthy.
if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil {
return nil
}
if value, ok := ctx.conns.Load(connKey{target, nodeID, class}); ok {
return value.(*Connection).Health()
}
return ErrNoConnection
}

// GRPCDialOptions returns the minimal `grpc.DialOption`s necessary to connect
// to a server created with `NewServer`.
//
Expand Down Expand Up @@ -1136,6 +1149,10 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
// the first heartbeat.
var ErrNotHeartbeated = errors.New("not yet heartbeated")

// ErrNoConnection is returned by ConnHealth when no connection exists to
// the node.
var ErrNoConnection = errors.New("no connection found")

func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) (retErr error) {
Expand Down
38 changes: 28 additions & 10 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,27 +201,45 @@ func (n *Dialer) dial(
}

// ConnHealth returns nil if we have an open connection of the request
// class to the given node that succeeded on its most recent heartbeat. See the
// method of the same name on rpc.Context for more details.
// class to the given node that succeeded on its most recent heartbeat.
// Returns circuit.ErrBreakerOpen if the breaker is tripped, otherwise
// ErrNoConnection if no connection to the node currently exists.
func (n *Dialer) ConnHealth(nodeID roachpb.NodeID, class rpc.ConnectionClass) error {
if n == nil || n.resolver == nil {
return errors.New("no node dialer configured")
}
if !n.getBreaker(nodeID, class).Ready() {
// NB: Don't call Ready(). The breaker protocol would require us to follow
// that up with a dial, which we won't do as this is called in hot paths.
if n.getBreaker(nodeID, class).Tripped() {
return circuit.ErrBreakerOpen
}
addr, err := n.resolver(nodeID)
if err != nil {
return err
}
// TODO(bdarnell): GRPCDialNode should detect local addresses and return
// a dummy connection instead of requiring callers to do this check.
if n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID) != nil {
// The local client is always considered healthy.
return nil
return n.rpcContext.ConnHealth(addr.String(), nodeID, class)
}

// ConnHealthTryDial returns nil if we have an open connection of the request
// class to the given node that succeeded on its most recent heartbeat. If no
// healthy connection is found, it will attempt to dial the node.
//
// This exists for components that do not themselves actively maintain RPC
// connections to remote nodes, e.g. DistSQL. However, it can cause significant
// latency if the remote node is unresponsive (e.g. if the server/VM is shut
// down), and should be avoided in latency-sensitive code paths. Preferably,
// this should be replaced by some other mechanism to maintain RPC connections.
// See also: https://github.com/cockroachdb/cockroach/issues/70111
func (n *Dialer) ConnHealthTryDial(nodeID roachpb.NodeID, class rpc.ConnectionClass) error {
err := n.ConnHealth(nodeID, class)
if err == nil || !n.getBreaker(nodeID, class).Ready() {
return err
}
addr, err := n.resolver(nodeID)
if err != nil {
return err
}
conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class)
return conn.Health()
return n.rpcContext.GRPCDialNode(addr.String(), nodeID, class).Health()
}

// GetCircuitBreaker retrieves the circuit breaker for connections to the
Expand Down
220 changes: 212 additions & 8 deletions pkg/rpc/nodedialer/nodedialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,17 @@ func TestDialNoBreaker(t *testing.T) {
_, ln, _ := newTestServer(t, clock, stopper, true /* useHeartbeat */)
defer stopper.Stop(ctx)

// Test that DialNoBreaker is successful normally.
nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr()))
_, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass)
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
return nd.ConnHealth(staticNodeID, rpc.DefaultClass)
})
breaker := nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass)
assert.True(t, breaker.Ready())
_, err := nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)

// Test that DialNoBreaker is successful normally.
_, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass)
assert.Nil(t, err, "failed to dial")
assert.True(t, breaker.Ready())
assert.Equal(t, breaker.Failures(), int64(0))
Expand Down Expand Up @@ -117,6 +120,144 @@ func TestDialNoBreaker(t *testing.T) {
assert.Equal(t, breaker.Failures(), int64(1))
}

func TestConnHealth(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcCtx := newTestContext(clock, stopper)
rpcCtx.NodeID.Set(ctx, staticNodeID)
_, ln, hb := newTestServer(t, clock, stopper, true /* useHeartbeat */)
defer stopper.Stop(ctx)
nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr()))

// When no connection exists, we expect ConnHealth to return ErrNoConnection.
require.Equal(t, rpc.ErrNoConnection, nd.ConnHealth(staticNodeID, rpc.DefaultClass))

// After dialing the node, ConnHealth should return nil.
_, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass)
require.NoError(t, err)
require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass))

// ConnHealth should still error for other node ID and class.
require.Error(t, nd.ConnHealth(9, rpc.DefaultClass))
require.Equal(t, rpc.ErrNoConnection, nd.ConnHealth(staticNodeID, rpc.SystemClass))

// When the heartbeat errors, ConnHealth should eventually error too.
hb.setErr(errors.New("boom"))
require.Eventually(t, func() bool {
return nd.ConnHealth(staticNodeID, rpc.DefaultClass) != nil
}, time.Second, 10*time.Millisecond)

// When the heartbeat recovers, ConnHealth should too.
hb.setErr(nil)
require.Eventually(t, func() bool {
return nd.ConnHealth(staticNodeID, rpc.DefaultClass) == nil
}, time.Second, 10*time.Millisecond)

// Tripping the breaker should return ErrBreakerOpen.
br := nd.getBreaker(staticNodeID, rpc.DefaultClass)
br.Trip()
require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealth(staticNodeID, rpc.DefaultClass))

// Resetting the breaker should recover ConnHealth.
br.Reset()
require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass))

// Closing the remote connection should fail ConnHealth.
require.NoError(t, ln.popConn().Close())
require.Eventually(t, func() bool {
return nd.ConnHealth(staticNodeID, rpc.DefaultClass) != nil
}, time.Second, 10*time.Millisecond)
}

func TestConnHealthTryDial(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcCtx := newTestContext(clock, stopper)
rpcCtx.NodeID.Set(ctx, staticNodeID)
_, ln, hb := newTestServer(t, clock, stopper, true /* useHeartbeat */)
defer stopper.Stop(ctx)
nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr()))

// When no connection exists, we expect ConnHealthTryDial to dial the node,
// which will return ErrNoHeartbeat at first but eventually succeed.
require.Equal(t, rpc.ErrNotHeartbeated, nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass))
require.Eventually(t, func() bool {
return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil
}, time.Second, 10*time.Millisecond)

// But it should error for other node ID.
require.Error(t, nd.ConnHealthTryDial(9, rpc.DefaultClass))

// When the heartbeat errors, ConnHealthTryDial should eventually error too.
hb.setErr(errors.New("boom"))
require.Eventually(t, func() bool {
return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) != nil
}, time.Second, 10*time.Millisecond)

// When the heartbeat recovers, ConnHealthTryDial should too.
hb.setErr(nil)
require.Eventually(t, func() bool {
return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil
}, time.Second, 10*time.Millisecond)

// Tripping the breaker should return ErrBreakerOpen.
br := nd.getBreaker(staticNodeID, rpc.DefaultClass)
br.Trip()
require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass))

// But it should eventually recover, when the breaker allows it.
require.Eventually(t, func() bool {
return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil
}, 5*time.Second, 10*time.Millisecond)

// Closing the remote connection should eventually recover.
require.NoError(t, ln.popConn().Close())
require.Eventually(t, func() bool {
return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil
}, time.Second, 10*time.Millisecond)
}

func TestConnHealthInternal(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
stopper := stop.NewStopper()
localAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 26657}

// Set up an internal server and relevant configuration. The RPC connection
// will then be considered internal, and we don't have to dial it.
rpcCtx := newTestContext(clock, stopper)
rpcCtx.SetLocalInternalServer(&internalServer{})
rpcCtx.NodeID.Set(ctx, staticNodeID)
rpcCtx.Config.AdvertiseAddr = localAddr.String()

nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, localAddr))
defer stopper.Stop(ctx)

// Even though we haven't dialed the node yet, the internal connection is
// always healthy.
require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass))
require.NoError(t, nd.ConnHealth(staticNodeID, rpc.SystemClass))

// However, it does respect the breaker.
br := nd.getBreaker(staticNodeID, rpc.DefaultClass)
br.Trip()
require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealth(staticNodeID, rpc.DefaultClass))

br.Reset()
require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass))

// Other nodes still fail though.
require.Error(t, nd.ConnHealth(7, rpc.DefaultClass))
}

func TestConcurrentCancellationAndTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, _, _, _, nd := setUpNodedialerTest(t, staticNodeID)
Expand Down Expand Up @@ -182,7 +323,7 @@ func TestDisconnectsTrip(t *testing.T) {
// in to the breaker are interesting ones as determined by shouldTrip.
hb.setErr(fmt.Errorf("boom"))
underlyingNetConn := ln.popConn()
assert.Nil(t, underlyingNetConn.Close())
require.NoError(t, underlyingNetConn.Close())
const N = 1000
breakerEventChan := make(chan circuit.ListenerEvent, N)
breaker.AddListener(breakerEventChan)
Expand Down Expand Up @@ -226,12 +367,15 @@ func TestDisconnectsTrip(t *testing.T) {
}
}
// Ensure that all of the interesting errors were seen by the breaker.
assert.Equal(t, errorsSeen, failsSeen)
require.Equal(t, errorsSeen, failsSeen)

// Ensure that the connection becomes healthy soon now that the heartbeat
// service is not returning errors.
hb.setErr(nil) // reset in case there were no errors
// Ensure that the connection eventually becomes healthy if we fix the
// heartbeat and keep dialing.
hb.setErr(nil)
testutils.SucceedsSoon(t, func() error {
if _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass); err != nil {
return err
}
return nd.ConnHealth(staticNodeID, rpc.DefaultClass)
})
}
Expand All @@ -252,6 +396,8 @@ func setUpNodedialerTest(
rpcCtx.NodeID.Set(context.Background(), nodeID)
_, ln, hb = newTestServer(t, clock, stopper, true /* useHeartbeat */)
nd = New(rpcCtx, newSingleNodeResolver(nodeID, ln.Addr()))
_, err := nd.Dial(context.Background(), nodeID, rpc.DefaultClass)
require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
return nd.ConnHealth(nodeID, rpc.DefaultClass)
})
Expand Down Expand Up @@ -296,7 +442,7 @@ func newTestServer(
func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context {
cfg := testutils.NewNodeTestBaseContext()
cfg.Insecure = true
cfg.RPCHeartbeatInterval = 10 * time.Millisecond
cfg.RPCHeartbeatInterval = 100 * time.Millisecond
rctx := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Expand Down Expand Up @@ -392,3 +538,61 @@ func (hb *heartbeatService) Ping(
ServerVersion: hb.serverVersion,
}, nil
}

var _ roachpb.InternalServer = &internalServer{}

type internalServer struct{}

func (*internalServer) Batch(
context.Context, *roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
return nil, nil
}

func (*internalServer) RangeLookup(
context.Context, *roachpb.RangeLookupRequest,
) (*roachpb.RangeLookupResponse, error) {
panic("unimplemented")
}

func (*internalServer) RangeFeed(
*roachpb.RangeFeedRequest, roachpb.Internal_RangeFeedServer,
) error {
panic("unimplemented")
}

func (*internalServer) GossipSubscription(
*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer,
) error {
panic("unimplemented")
}

func (*internalServer) ResetQuorum(
context.Context, *roachpb.ResetQuorumRequest,
) (*roachpb.ResetQuorumResponse, error) {
panic("unimplemented")
}

func (*internalServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

func (*internalServer) TokenBucket(
ctx context.Context, in *roachpb.TokenBucketRequest,
) (*roachpb.TokenBucketResponse, error) {
panic("unimplemented")
}

func (*internalServer) GetSpanConfigs(
context.Context, *roachpb.GetSpanConfigsRequest,
) (*roachpb.GetSpanConfigsResponse, error) {
panic("unimplemented")
}

func (*internalServer) UpdateSpanConfigs(
context.Context, *roachpb.UpdateSpanConfigsRequest,
) (*roachpb.UpdateSpanConfigsResponse, error) {
panic("unimplemented")
}
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func NewDistSQLPlanner(
nodeDialer: nodeDialer,
nodeHealth: distSQLNodeHealth{
gossip: gw,
connHealth: nodeDialer.ConnHealth,
connHealth: nodeDialer.ConnHealthTryDial,
isAvailable: isAvailable,
},
distSender: distSender,
Expand Down

0 comments on commit 73ae236

Please sign in to comment.