diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 507c8da95573..89b4f6ba3486 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -689,6 +689,19 @@ func (ctx *Context) removeConn(conn *Connection, keys ...connKey) { } } +// ConnHealth returns nil if we have an open connection of the request +// class to the given node that succeeded on its most recent heartbeat. +func (ctx *Context) ConnHealth(target string, nodeID roachpb.NodeID, class ConnectionClass) error { + // The local client is always considered healthy. + if ctx.GetLocalInternalClientForAddr(target, nodeID) != nil { + return nil + } + if value, ok := ctx.conns.Load(connKey{target, nodeID, class}); ok { + return value.(*Connection).Health() + } + return ErrNoConnection +} + // GRPCDialOptions returns the minimal `grpc.DialOption`s necessary to connect // to a server created with `NewServer`. // @@ -1136,6 +1149,10 @@ func (ctx *Context) NewBreaker(name string) *circuit.Breaker { // the first heartbeat. var ErrNotHeartbeated = errors.New("not yet heartbeated") +// ErrNoConnection is returned by ConnHealth when no connection exists to +// the node. +var ErrNoConnection = errors.New("no connection found") + func (ctx *Context) runHeartbeat( conn *Connection, target string, redialChan <-chan struct{}, ) (retErr error) { diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 38c7ea6cafb7..ef772eae036f 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -201,27 +201,45 @@ func (n *Dialer) dial( } // ConnHealth returns nil if we have an open connection of the request -// class to the given node that succeeded on its most recent heartbeat. See the -// method of the same name on rpc.Context for more details. +// class to the given node that succeeded on its most recent heartbeat. +// Returns circuit.ErrBreakerOpen if the breaker is tripped, otherwise +// ErrNoConnection if no connection to the node currently exists. func (n *Dialer) ConnHealth(nodeID roachpb.NodeID, class rpc.ConnectionClass) error { if n == nil || n.resolver == nil { return errors.New("no node dialer configured") } - if !n.getBreaker(nodeID, class).Ready() { + // NB: Don't call Ready(). The breaker protocol would require us to follow + // that up with a dial, which we won't do as this is called in hot paths. + if n.getBreaker(nodeID, class).Tripped() { return circuit.ErrBreakerOpen } addr, err := n.resolver(nodeID) if err != nil { return err } - // TODO(bdarnell): GRPCDialNode should detect local addresses and return - // a dummy connection instead of requiring callers to do this check. - if n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID) != nil { - // The local client is always considered healthy. - return nil + return n.rpcContext.ConnHealth(addr.String(), nodeID, class) +} + +// ConnHealthTryDial returns nil if we have an open connection of the request +// class to the given node that succeeded on its most recent heartbeat. If no +// healthy connection is found, it will attempt to dial the node. +// +// This exists for components that do not themselves actively maintain RPC +// connections to remote nodes, e.g. DistSQL. However, it can cause significant +// latency if the remote node is unresponsive (e.g. if the server/VM is shut +// down), and should be avoided in latency-sensitive code paths. Preferably, +// this should be replaced by some other mechanism to maintain RPC connections. +// See also: https://github.com/cockroachdb/cockroach/issues/70111 +func (n *Dialer) ConnHealthTryDial(nodeID roachpb.NodeID, class rpc.ConnectionClass) error { + err := n.ConnHealth(nodeID, class) + if err == nil || !n.getBreaker(nodeID, class).Ready() { + return err + } + addr, err := n.resolver(nodeID) + if err != nil { + return err } - conn := n.rpcContext.GRPCDialNode(addr.String(), nodeID, class) - return conn.Health() + return n.rpcContext.GRPCDialNode(addr.String(), nodeID, class).Health() } // GetCircuitBreaker retrieves the circuit breaker for connections to the diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index d64374b22859..a8858ff8b866 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -67,14 +67,17 @@ func TestDialNoBreaker(t *testing.T) { _, ln, _ := newTestServer(t, clock, stopper, true /* useHeartbeat */) defer stopper.Stop(ctx) - // Test that DialNoBreaker is successful normally. nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) + _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass) + require.NoError(t, err) testutils.SucceedsSoon(t, func() error { return nd.ConnHealth(staticNodeID, rpc.DefaultClass) }) breaker := nd.GetCircuitBreaker(staticNodeID, rpc.DefaultClass) assert.True(t, breaker.Ready()) - _, err := nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) + + // Test that DialNoBreaker is successful normally. + _, err = nd.DialNoBreaker(ctx, staticNodeID, rpc.DefaultClass) assert.Nil(t, err, "failed to dial") assert.True(t, breaker.Ready()) assert.Equal(t, breaker.Failures(), int64(0)) @@ -117,6 +120,144 @@ func TestDialNoBreaker(t *testing.T) { assert.Equal(t, breaker.Failures(), int64(1)) } +func TestConnHealth(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcCtx := newTestContext(clock, stopper) + rpcCtx.NodeID.Set(ctx, staticNodeID) + _, ln, hb := newTestServer(t, clock, stopper, true /* useHeartbeat */) + defer stopper.Stop(ctx) + nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) + + // When no connection exists, we expect ConnHealth to return ErrNoConnection. + require.Equal(t, rpc.ErrNoConnection, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // After dialing the node, ConnHealth should return nil. + _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass) + require.NoError(t, err) + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // ConnHealth should still error for other node ID and class. + require.Error(t, nd.ConnHealth(9, rpc.DefaultClass)) + require.Equal(t, rpc.ErrNoConnection, nd.ConnHealth(staticNodeID, rpc.SystemClass)) + + // When the heartbeat errors, ConnHealth should eventually error too. + hb.setErr(errors.New("boom")) + require.Eventually(t, func() bool { + return nd.ConnHealth(staticNodeID, rpc.DefaultClass) != nil + }, time.Second, 10*time.Millisecond) + + // When the heartbeat recovers, ConnHealth should too. + hb.setErr(nil) + require.Eventually(t, func() bool { + return nd.ConnHealth(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) + + // Tripping the breaker should return ErrBreakerOpen. + br := nd.getBreaker(staticNodeID, rpc.DefaultClass) + br.Trip() + require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // Resetting the breaker should recover ConnHealth. + br.Reset() + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // Closing the remote connection should fail ConnHealth. + require.NoError(t, ln.popConn().Close()) + require.Eventually(t, func() bool { + return nd.ConnHealth(staticNodeID, rpc.DefaultClass) != nil + }, time.Second, 10*time.Millisecond) +} + +func TestConnHealthTryDial(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcCtx := newTestContext(clock, stopper) + rpcCtx.NodeID.Set(ctx, staticNodeID) + _, ln, hb := newTestServer(t, clock, stopper, true /* useHeartbeat */) + defer stopper.Stop(ctx) + nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, ln.Addr())) + + // When no connection exists, we expect ConnHealthTryDial to dial the node, + // which will return ErrNoHeartbeat at first but eventually succeed. + require.Equal(t, rpc.ErrNotHeartbeated, nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass)) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) + + // But it should error for other node ID. + require.Error(t, nd.ConnHealthTryDial(9, rpc.DefaultClass)) + + // When the heartbeat errors, ConnHealthTryDial should eventually error too. + hb.setErr(errors.New("boom")) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) != nil + }, time.Second, 10*time.Millisecond) + + // When the heartbeat recovers, ConnHealthTryDial should too. + hb.setErr(nil) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) + + // Tripping the breaker should return ErrBreakerOpen. + br := nd.getBreaker(staticNodeID, rpc.DefaultClass) + br.Trip() + require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass)) + + // But it should eventually recover, when the breaker allows it. + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, 5*time.Second, 10*time.Millisecond) + + // Closing the remote connection should eventually recover. + require.NoError(t, ln.popConn().Close()) + require.Eventually(t, func() bool { + return nd.ConnHealthTryDial(staticNodeID, rpc.DefaultClass) == nil + }, time.Second, 10*time.Millisecond) +} + +func TestConnHealthInternal(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + stopper := stop.NewStopper() + localAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 26657} + + // Set up an internal server and relevant configuration. The RPC connection + // will then be considered internal, and we don't have to dial it. + rpcCtx := newTestContext(clock, stopper) + rpcCtx.SetLocalInternalServer(&internalServer{}) + rpcCtx.NodeID.Set(ctx, staticNodeID) + rpcCtx.Config.AdvertiseAddr = localAddr.String() + + nd := New(rpcCtx, newSingleNodeResolver(staticNodeID, localAddr)) + defer stopper.Stop(ctx) + + // Even though we haven't dialed the node yet, the internal connection is + // always healthy. + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.SystemClass)) + + // However, it does respect the breaker. + br := nd.getBreaker(staticNodeID, rpc.DefaultClass) + br.Trip() + require.Equal(t, circuit.ErrBreakerOpen, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + br.Reset() + require.NoError(t, nd.ConnHealth(staticNodeID, rpc.DefaultClass)) + + // Other nodes still fail though. + require.Error(t, nd.ConnHealth(7, rpc.DefaultClass)) +} + func TestConcurrentCancellationAndTimeout(t *testing.T) { defer leaktest.AfterTest(t)() stopper, _, _, _, nd := setUpNodedialerTest(t, staticNodeID) @@ -182,7 +323,7 @@ func TestDisconnectsTrip(t *testing.T) { // in to the breaker are interesting ones as determined by shouldTrip. hb.setErr(fmt.Errorf("boom")) underlyingNetConn := ln.popConn() - assert.Nil(t, underlyingNetConn.Close()) + require.NoError(t, underlyingNetConn.Close()) const N = 1000 breakerEventChan := make(chan circuit.ListenerEvent, N) breaker.AddListener(breakerEventChan) @@ -226,12 +367,15 @@ func TestDisconnectsTrip(t *testing.T) { } } // Ensure that all of the interesting errors were seen by the breaker. - assert.Equal(t, errorsSeen, failsSeen) + require.Equal(t, errorsSeen, failsSeen) - // Ensure that the connection becomes healthy soon now that the heartbeat - // service is not returning errors. - hb.setErr(nil) // reset in case there were no errors + // Ensure that the connection eventually becomes healthy if we fix the + // heartbeat and keep dialing. + hb.setErr(nil) testutils.SucceedsSoon(t, func() error { + if _, err := nd.Dial(ctx, staticNodeID, rpc.DefaultClass); err != nil { + return err + } return nd.ConnHealth(staticNodeID, rpc.DefaultClass) }) } @@ -252,6 +396,8 @@ func setUpNodedialerTest( rpcCtx.NodeID.Set(context.Background(), nodeID) _, ln, hb = newTestServer(t, clock, stopper, true /* useHeartbeat */) nd = New(rpcCtx, newSingleNodeResolver(nodeID, ln.Addr())) + _, err := nd.Dial(context.Background(), nodeID, rpc.DefaultClass) + require.NoError(t, err) testutils.SucceedsSoon(t, func() error { return nd.ConnHealth(nodeID, rpc.DefaultClass) }) @@ -296,7 +442,7 @@ func newTestServer( func newTestContext(clock *hlc.Clock, stopper *stop.Stopper) *rpc.Context { cfg := testutils.NewNodeTestBaseContext() cfg.Insecure = true - cfg.RPCHeartbeatInterval = 10 * time.Millisecond + cfg.RPCHeartbeatInterval = 100 * time.Millisecond rctx := rpc.NewContext(rpc.ContextOptions{ TenantID: roachpb.SystemTenantID, AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, @@ -392,3 +538,61 @@ func (hb *heartbeatService) Ping( ServerVersion: hb.serverVersion, }, nil } + +var _ roachpb.InternalServer = &internalServer{} + +type internalServer struct{} + +func (*internalServer) Batch( + context.Context, *roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return nil, nil +} + +func (*internalServer) RangeLookup( + context.Context, *roachpb.RangeLookupRequest, +) (*roachpb.RangeLookupResponse, error) { + panic("unimplemented") +} + +func (*internalServer) RangeFeed( + *roachpb.RangeFeedRequest, roachpb.Internal_RangeFeedServer, +) error { + panic("unimplemented") +} + +func (*internalServer) GossipSubscription( + *roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer, +) error { + panic("unimplemented") +} + +func (*internalServer) ResetQuorum( + context.Context, *roachpb.ResetQuorumRequest, +) (*roachpb.ResetQuorumResponse, error) { + panic("unimplemented") +} + +func (*internalServer) Join( + context.Context, *roachpb.JoinNodeRequest, +) (*roachpb.JoinNodeResponse, error) { + panic("unimplemented") +} + +func (*internalServer) TokenBucket( + ctx context.Context, in *roachpb.TokenBucketRequest, +) (*roachpb.TokenBucketResponse, error) { + panic("unimplemented") +} + +func (*internalServer) GetSpanConfigs( + context.Context, *roachpb.GetSpanConfigsRequest, +) (*roachpb.GetSpanConfigsResponse, error) { + panic("unimplemented") +} + +func (*internalServer) UpdateSpanConfigs( + context.Context, *roachpb.UpdateSpanConfigsRequest, +) (*roachpb.UpdateSpanConfigsResponse, error) { + panic("unimplemented") +} diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 8dba44107238..9d6628642ca8 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -165,7 +165,7 @@ func NewDistSQLPlanner( nodeDialer: nodeDialer, nodeHealth: distSQLNodeHealth{ gossip: gw, - connHealth: nodeDialer.ConnHealth, + connHealth: nodeDialer.ConnHealthTryDial, isAvailable: isAvailable, }, distSender: distSender,