From cee675a62a34035081ca7cfe9593cda4a1c7afb8 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 10 Sep 2021 10:28:29 +0000 Subject: [PATCH] rpc: avoid network IO in `Dialer.ConnHealth` `Dialer.ConnHealth` is used to check whether a healthy RPC connection exists to a given node, in order to avoid interacting with unavailable nodes. However, this actually attempted to dial the node if no connection was found, which can block for tens of seconds in the case of an unresponsive node. This is problematic since it is used in performance-critical code paths, including Raft command application. This patch changes `Dialer.ConnHealth` to avoid dialing the node, and adds a `Context.ConnHealth` helper with access to the RPC connection registry. Because `DistSQLPlanner` relied on `ConnHealth` to dial the remote node, it also adds `Dialer.ConnHealthTryDial` which retains the old behavior for use in DistSQL until a better solution can be implemented. Release note (bug fix): Avoid dialing nodes in performance-critical code paths, which could cause substantial latency when encountering unresponsive nodes (e.g. when a VM or server is shut down). --- pkg/rpc/context.go | 17 ++ pkg/rpc/nodedialer/nodedialer.go | 38 +++-- pkg/rpc/nodedialer/nodedialer_test.go | 220 +++++++++++++++++++++++++- pkg/sql/distsql_physical_planner.go | 2 +- 4 files changed, 258 insertions(+), 19 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 99dd0fe558d1..8d604dfdb997 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -685,6 +685,19 @@ func (ctx *Context) removeConn(conn *Connection, keys ...connKey) { } } +// ConnHealth returns nil if we have an open connection of the request +// class to the given node that succeeded on its most recent heartbeat. +func (ctx *Context) ConnHealth(target string, nodeID roachpb.NodeID, class ConnectionClass) error { + // The local client is always considered healthy. + if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil { + return nil + } + if value, ok := ctx.conns.Load(connKey{target, nodeID, class}); ok { + return value.(*Connection).Health() + } + return ErrNoConnection +} + // GRPCDialOptions returns the minimal `grpc.DialOption`s necessary to connect // to a server created with `NewServer`. // @@ -1134,6 +1147,10 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker { // the first heartbeat. var ErrNotHeartbeated = errors.New("not yet heartbeated") +// ErrNoConnection is returned by ConnHealth when no connection exists to +// the node. +var ErrNoConnection = errors.New("no connection found") + func (ctx *Context) runHeartbeat( conn *Connection, target string, redialChan <-chan struct{}, ) (retErr error) { diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 1a4614b43fff..2b7869e65e44 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -201,27 +201,45 @@ func (n *Dialer) dial( } // ConnHealth returns nil if we have an open connection of the request -// class to the given node that succeeded on its most recent heartbeat. See the -// method of the same name on rpc.Context for more details. +// class to the given node that succeeded on its most recent heartbeat. +// Returns circuit.ErrBreakerOpen if the breaker is tripped, otherwise +// ErrNoConnection if no connection to the node currently exists. func (n *Dialer) ConnHealth(nodeID roachpb.NodeID, class rpc.ConnectionClass) error { if n == nil || n.resolver == nil { return errors.New("no node dialer configured") } - if !n.getBreaker(nodeID, class).Ready() { + // NB: Don't call Ready(). The breaker protocol would require us to follow + // that up with a dial, which we won't do as this is called in hot paths. + if n.getBreaker(nodeID, class).Tripped() { return circuit.ErrBreakerOpen } addr, err := n.resolver(nodeID) if err != nil { return err } - // TODO(bdarnell): GRPCDialNode should detect local addresses and return - // a dummy connection instead of requiring callers to do this check. - if n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID) != nil { - // The local client is always considered healthy. - return nil + return n.rpcContext.ConnHealth(addr.String(), nodeID, class) +} + +// ConnHealthTryDial returns nil if we have an open connection of the request +// class to the given node that succeeded on its most recent heartbeat. If no +// healthy connection is found, it will attempt to dial the node. +// +// This exists for components that do not themselves actively maintain RPC +// connections to remote nodes, e.g. DistSQL. However, it can cause significant +// latency if the remote node is unresponsive (e.g. if the server/VM is shut +// down), and should be avoided in latency-sensitive code paths. Preferably, +// this should be replaced by some other mechanism to maintain RPC connections. +// See also: https://github.com/cockroachdb/cockroach/issues/70111 +func (n *Dialer) ConnHealthTryDial(nodeID roachpb.NodeID, class rpc.ConnectionClass) error { + err := n.ConnHealth(nodeID, class) + if err == nil || !n.getBreaker(nodeID, class).Ready() { + return err + } + addr, err := n.resolver(nodeID) + if err != nil { + return err } - conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class) - return conn.Health() + return n.rpcContext.GRPCDialNode(addr.String(), nodeID, class).Health() } // GetCircuitBreaker retrieves the circuit breaker for connections to the diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index d64374b22859..a8858ff8b866 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -67,14 +67,17 @@ func TestDialNoBreaker(t *testing.T) { _, ln, _ := newTestServer(t, clock, stopper, true /* useHeartbeat */) defer stopper.Stop(ctx) - // Test that DialNoBreaker is successful normally. nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) + _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass) + require.NoError(t, err) testutils.SucceedsSoon(t, func() error { return nd.ConnHealth(staticNodeID, rpc.DefaultClass) }) breaker := nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass) assert.True(t, breaker.Ready()) - _, err := nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) + + // Test that DialNoBreaker is successful normally. + _, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) assert.Nil(t, err, "failed to dial") assert.True(t, breaker.Ready()) assert.Equal(t, breaker.Failures(), int64(0)) @@ -117,6 +120,144 @@ func TestDialNoBreaker(t *testing.T) { assert.Equal(t, breaker.Failures(), int64(1)) } +func TestConnHealth(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcCtx := newTestContext(clock, stopper) + rpcCtx.NodeID.Set(ctx, staticNodeID) + _, ln, hb := newTestServer(t, clock, stopper, true /* useHeartbeat */) + defer stopper.Stop(ctx) + nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) + + // When no connection exists, we expect ConnHealth to return ErrNoConnection. + require.Equal(t, rpc.ErrNoConnection, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // After dialing the node, ConnHealth should return nil. + _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass) + require.NoError(t, err) + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // ConnHealth should still error for other node ID and class. + require.Error(t, nd.ConnHealth(9, rpc.DefaultClass)) + require.Equal(t, rpc.ErrNoConnection, nd.ConnHealth(staticNodeID, rpc.SystemClass)) + + // When the heartbeat errors, ConnHealth should eventually error too. + hb.setErr(errors.New("boom")) + require.Eventually(t, func() bool { + return nd.ConnHealth(staticNodeID, rpc.DefaultClass) != nil + }, time.Second, 10*time.Millisecond) + + // When the heartbeat recovers, ConnHealth should too. + hb.setErr(nil) + require.Eventually(t, func() bool { + return nd.ConnHealth(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) + + // Tripping the breaker should return ErrBreakerOpen. + br := nd.getBreaker(staticNodeID, rpc.DefaultClass) + br.Trip() + require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // Resetting the breaker should recover ConnHealth. + br.Reset() + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // Closing the remote connection should fail ConnHealth. + require.NoError(t, ln.popConn().Close()) + require.Eventually(t, func() bool { + return nd.ConnHealth(staticNodeID, rpc.DefaultClass) != nil + }, time.Second, 10*time.Millisecond) +} + +func TestConnHealthTryDial(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcCtx := newTestContext(clock, stopper) + rpcCtx.NodeID.Set(ctx, staticNodeID) + _, ln, hb := newTestServer(t, clock, stopper, true /* useHeartbeat */) + defer stopper.Stop(ctx) + nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) + + // When no connection exists, we expect ConnHealthTryDial to dial the node, + // which will return ErrNoHeartbeat at first but eventually succeed. + require.Equal(t, rpc.ErrNotHeartbeated, nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass)) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) + + // But it should error for other node ID. + require.Error(t, nd.ConnHealthTryDial(9, rpc.DefaultClass)) + + // When the heartbeat errors, ConnHealthTryDial should eventually error too. + hb.setErr(errors.New("boom")) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) != nil + }, time.Second, 10*time.Millisecond) + + // When the heartbeat recovers, ConnHealthTryDial should too. + hb.setErr(nil) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) + + // Tripping the breaker should return ErrBreakerOpen. + br := nd.getBreaker(staticNodeID, rpc.DefaultClass) + br.Trip() + require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass)) + + // But it should eventually recover, when the breaker allows it. + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, 5*time.Second, 10*time.Millisecond) + + // Closing the remote connection should eventually recover. + require.NoError(t, ln.popConn().Close()) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) +} + +func TestConnHealthInternal(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + stopper := stop.NewStopper() + localAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 26657} + + // Set up an internal server and relevant configuration. The RPC connection + // will then be considered internal, and we don't have to dial it. + rpcCtx := newTestContext(clock, stopper) + rpcCtx.SetLocalInternalServer(&internalServer{}) + rpcCtx.NodeID.Set(ctx, staticNodeID) + rpcCtx.Config.AdvertiseAddr = localAddr.String() + + nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, localAddr)) + defer stopper.Stop(ctx) + + // Even though we haven't dialed the node yet, the internal connection is + // always healthy. + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.SystemClass)) + + // However, it does respect the breaker. + br := nd.getBreaker(staticNodeID, rpc.DefaultClass) + br.Trip() + require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + br.Reset() + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // Other nodes still fail though. + require.Error(t, nd.ConnHealth(7, rpc.DefaultClass)) +} + func TestConcurrentCancellationAndTimeout(t *testing.T) { defer leaktest.AfterTest(t)() stopper, _, _, _, nd := setUpNodedialerTest(t, staticNodeID) @@ -182,7 +323,7 @@ func TestDisconnectsTrip(t *testing.T) { // in to the breaker are interesting ones as determined by shouldTrip. hb.setErr(fmt.Errorf("boom")) underlyingNetConn := ln.popConn() - assert.Nil(t, underlyingNetConn.Close()) + require.NoError(t, underlyingNetConn.Close()) const N = 1000 breakerEventChan := make(chan circuit.ListenerEvent, N) breaker.AddListener(breakerEventChan) @@ -226,12 +367,15 @@ func TestDisconnectsTrip(t *testing.T) { } } // Ensure that all of the interesting errors were seen by the breaker. - assert.Equal(t, errorsSeen, failsSeen) + require.Equal(t, errorsSeen, failsSeen) - // Ensure that the connection becomes healthy soon now that the heartbeat - // service is not returning errors. - hb.setErr(nil) // reset in case there were no errors + // Ensure that the connection eventually becomes healthy if we fix the + // heartbeat and keep dialing. + hb.setErr(nil) testutils.SucceedsSoon(t, func() error { + if _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass); err != nil { + return err + } return nd.ConnHealth(staticNodeID, rpc.DefaultClass) }) } @@ -252,6 +396,8 @@ func setUpNodedialerTest( rpcCtx.NodeID.Set(context.Background(), nodeID) _, ln, hb = newTestServer(t, clock, stopper, true /* useHeartbeat */) nd = New(rpcCtx, newSingleNodeResolver(nodeID, ln.Addr())) + _, err := nd.Dial(context.Background(), nodeID, rpc.DefaultClass) + require.NoError(t, err) testutils.SucceedsSoon(t, func() error { return nd.ConnHealth(nodeID, rpc.DefaultClass) }) @@ -296,7 +442,7 @@ func newTestServer( func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context { cfg := testutils.NewNodeTestBaseContext() cfg.Insecure = true - cfg.RPCHeartbeatInterval = 10 * time.Millisecond + cfg.RPCHeartbeatInterval = 100 * time.Millisecond rctx := rpc.NewContext(rpc.ContextOptions{ TenantID: roachpb.SystemTenantID, AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, @@ -392,3 +538,61 @@ func (hb *heartbeatService) Ping( ServerVersion: hb.serverVersion, }, nil } + +var _ roachpb.InternalServer = &internalServer{} + +type internalServer struct{} + +func (*internalServer) Batch( + context.Context, *roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return nil, nil +} + +func (*internalServer) RangeLookup( + context.Context, *roachpb.RangeLookupRequest, +) (*roachpb.RangeLookupResponse, error) { + panic("unimplemented") +} + +func (*internalServer) RangeFeed( + *roachpb.RangeFeedRequest, roachpb.Internal_RangeFeedServer, +) error { + panic("unimplemented") +} + +func (*internalServer) GossipSubscription( + *roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer, +) error { + panic("unimplemented") +} + +func (*internalServer) ResetQuorum( + context.Context, *roachpb.ResetQuorumRequest, +) (*roachpb.ResetQuorumResponse, error) { + panic("unimplemented") +} + +func (*internalServer) Join( + context.Context, *roachpb.JoinNodeRequest, +) (*roachpb.JoinNodeResponse, error) { + panic("unimplemented") +} + +func (*internalServer) TokenBucket( + ctx context.Context, in *roachpb.TokenBucketRequest, +) (*roachpb.TokenBucketResponse, error) { + panic("unimplemented") +} + +func (*internalServer) GetSpanConfigs( + context.Context, *roachpb.GetSpanConfigsRequest, +) (*roachpb.GetSpanConfigsResponse, error) { + panic("unimplemented") +} + +func (*internalServer) UpdateSpanConfigs( + context.Context, *roachpb.UpdateSpanConfigsRequest, +) (*roachpb.UpdateSpanConfigsResponse, error) { + panic("unimplemented") +} diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 56a40bc20c0e..2d8ef63f70f5 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -165,7 +165,7 @@ func NewDistSQLPlanner( nodeDialer: nodeDialer, nodeHealth: distSQLNodeHealth{ gossip: gw, - connHealth: nodeDialer.ConnHealth, + connHealth: nodeDialer.ConnHealthTryDial, isAvailable: isAvailable, }, distSender: distSender,