rpc: don't close gRPC connections on heartbeat timeouts
Fixes cockroachdb#13989

Before this patch, the rpc.Context would perform heartbeats (a dedicated
RPC) to see if a connection is healthy. If the heartbeats failed, the
connection was closed (causing in-flight RPCs to fail) and the node was
marked as unhealthy.
These heartbeats, being regular RPCs, were subject to gRPC's flow
control. This meant they could easily be blocked by other large RPCs - in
particular, large DistSQL streams - which made them an unreliable signal
of connection health.

This patch moves to using gRPC's internal HTTP/2 ping frames for checking
connection health. These are not subject to flow control. The gRPC
transport-level connection is closed when they fail (and so in-flight
RPCs still fail), but otherwise gRPC reconnects transparently.
Heartbeats stay for the other current uses - clock skew detection and
node health marking. Marking a node as unhealthy is debatable, given the
shortcomings of these RPCs. However, this marking currently doesn't have
big consequences - it only affects the order in which replicas are tried
when a leaseholder is unknown.
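
For reference, the keepalive mechanism used here is configured through gRPC
dial options. Below is a minimal, standalone sketch of that API (hypothetical
target address and intervals; the patch itself uses base.NetworkTimeout for
both Time and Timeout and wires the option into rpc.Context.GRPCDial):

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// dialWithKeepalive opens a gRPC connection that sends HTTP/2 ping frames
// on the transport; these pings bypass flow control, unlike regular RPCs.
func dialWithKeepalive(target string) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithInsecure(), // sketch only; the real rpc.Context may use TLS credentials
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			// Send a ping after this much idle time.
			Time: 3 * time.Second,
			// If no ack arrives within Timeout, the transport is closed and
			// in-flight RPCs fail; gRPC then reconnects transparently.
			Timeout: 3 * time.Second,
			// Ping even when no RPCs are in flight.
			PermitWithoutStream: true,
		}),
	)
}

func main() {
	conn, err := dialWithKeepalive("localhost:26257")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}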
andreimatei committed Mar 30, 2017
1 parent aa70666 commit 08244a7
Showing 4 changed files with 639 additions and 20 deletions.
32 changes: 12 additions & 20 deletions pkg/rpc/context.go
@@ -28,8 +28,8 @@ import (
	"github.com/rubyist/circuitbreaker"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -247,7 +247,7 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
		dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
	}

	dialOpts := make([]grpc.DialOption, 0, 2+len(opts))
	var dialOpts []grpc.DialOption
	dialOpts = append(dialOpts, dialOpt)
	dialOpts = append(dialOpts, grpc.WithBackoffMaxDelay(maxBackoff))
	dialOpts = append(dialOpts, grpc.WithDecompressor(snappyDecompressor{}))
@@ -256,6 +256,16 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
	if ctx.rpcCompression {
		dialOpts = append(dialOpts, grpc.WithCompressor(snappyCompressor{}))
	}
	dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
		// Send periodic pings on the connection.
		Time: base.NetworkTimeout,
		// If the pings don't get a response within the timeout, the connection
		// will be closed: we might be experiencing a network partition. All the
		// pending RPCs (which may not have timeouts) will fail eagerly.
		Timeout: base.NetworkTimeout,
		// Do the pings even when there are no ongoing RPCs.
		PermitWithoutStream: true,
	}))
	dialOpts = append(dialOpts, opts...)

	if SourceAddr != nil {
@@ -359,24 +369,6 @@ func (ctx *Context) runHeartbeat(meta *connMeta, remoteAddr string) error {
		meta.heartbeatErr = err
		ctx.conns.Unlock()

		// If we got a timeout, we might be experiencing a network partition. We
		// close the connection so that all other pending RPCs (which may not have
		// timeouts) fail eagerly. Any other error is likely to be noticed by
		// other RPCs, so it's OK to leave the connection open while grpc
		// internally reconnects if necessary.
		//
		// NB: This check is skipped when the connection is initiated from a CLI
		// client since those clients aren't sensitive to partitions, are likely
		// to be invoked while the server is starting (particularly in tests), and
		// are not equipped with the retry logic necessary to deal with this
		// connection termination.
		//
		// TODO(tamird): That we rely on the zero maxOffset to indicate a CLI
		// client is a hack; we should do something more explicit.
		if maxOffset != 0 && grpc.Code(err) == codes.DeadlineExceeded {
			return err
		}

		// HACK: work around https://github.com/grpc/grpc-go/issues/1026
		// Getting a "connection refused" error from the "write" system call
		// has confused grpc's error handling and this connection is permanently
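A related note on the mechanism (not part of this diff, and only relevant for
grpc-go versions that ship server-side keepalive enforcement): a gRPC server
may reject clients that ping too frequently unless it is configured to allow
them. A hedged sketch of what such a server-side configuration looks like; the
one-second MinTime is an arbitrary illustrative value:

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// newKeepaliveFriendlyServer builds a gRPC server that tolerates frequent
// client keepalive pings instead of answering them with a GOAWAY.
func newKeepaliveFriendlyServer() *grpc.Server {
	return grpc.NewServer(
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			// Accept pings as often as once per second...
			MinTime: time.Second,
			// ...even from clients with no active RPC streams.
			PermitWithoutStream: true,
		}),
	)
}

func main() {
	srv := newKeepaliveFriendlyServer()
	defer srv.Stop()
	// ... register services and call srv.Serve(listener) here ...
}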
95 changes: 95 additions & 0 deletions pkg/rpc/context_test.go
@@ -24,9 +24,11 @@ import (
	"time"

	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"

	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/util"
@@ -546,3 +548,96 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
		}
	}
}

// This is a smoke test for gRPC keepalives: rpc.Context asks gRPC to perform
// periodic pings on the transport to check that it's still alive. If the ping
// doesn't get a pong within a timeout, the transport is supposed to be closed -
// that's what we're testing here.
func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) {
	defer leaktest.AfterTest(t)()
	if testing.Short() {
		t.Skip("short flag")
	}

	stopper := stop.NewStopper()
	defer stopper.Stop()

	clock := hlc.NewClock(time.Unix(0, 20).UnixNano, time.Nanosecond)
	serverCtx := NewContext(
		log.AmbientContext{},
		testutils.NewNodeTestBaseContext(),
		clock,
		stopper,
	)
	s, ln := newTestServer(t, serverCtx, true)
	remoteAddr := ln.Addr().String()

	RegisterHeartbeatServer(s, &HeartbeatService{
		clock:              clock,
		remoteClockMonitor: serverCtx.RemoteClocks,
	})

	clientCtx := NewContext(
		log.AmbientContext{}, testutils.NewNodeTestBaseContext(), clock, stopper)
	// Disable automatic heartbeats. We'll send them by hand.
	clientCtx.heartbeatInterval = time.Hour

	// We're going to open RPC transport connections using a dialer that returns
	// PartitionableConns. We'll partition the first opened connection.
	dialerCh := make(chan *testutils.PartitionableConn, 1)
	conn, err := clientCtx.GRPCDial(remoteAddr,
		grpc.WithDialer(
			func(addr string, timeout time.Duration) (net.Conn, error) {
				dialer := net.Dialer{
					Timeout: timeout,
				}
				conn, err := dialer.Dial("tcp", addr)
				if err != nil {
					return nil, err
				}
				transportConn := testutils.NewPartitionableConn(conn)
				dialerCh <- transportConn
				return transportConn, nil
			}),
		// Override the keepalive settings that the rpc.Context uses with more
		// aggressive ones.
		grpc.WithKeepaliveParams(
			keepalive.ClientParameters{
				Time:    time.Millisecond,
				Timeout: time.Millisecond,
				// Do the pings even when there are no ongoing RPCs.
				PermitWithoutStream: true,
			}),
	)
	if err != nil {
		t.Fatal(err)
	}

	// Check that we can perform a heartbeat.
	heartbeatClient := NewHeartbeatClient(conn)
	request := PingRequest{}
	if _, err := heartbeatClient.Ping(context.TODO(), &request); err != nil {
		t.Fatal(err)
	}

	// Now partition client->server and attempt to perform an RPC. We expect it to
	// fail once the grpc keepalive fails to get a response from the server.

	transportConn := <-dialerCh
	transportConn.PartitionC2S()

	if _, err := heartbeatClient.Ping(context.TODO(), &request); !testutils.IsError(
		err, "transport is closing") {
		t.Fatal(err)
	}

	// Next RPCs would succeed since gRPC reconnects the transport (and that
	// would succeed here since we've only partitioned one connection). We could
	// find a way to simulate a partition more realistically by not accepting new
	// connections, and test that the status reported by Context.ConnHealth() for the
	// remote node moves to UNAVAILABLE, but the behaviour of our heartbeats in
	// the face of transport failures is sufficiently tested in
	// TestHeartbeatHealthTransport.

	transportConn.Finish()
}
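
The test above relies on testutils.PartitionableConn, which is added elsewhere
in this commit (in the files not shown here). The following is a deliberately
simplified, illustrative stand-in for the same idea - a net.Conn wrapper whose
client->server direction can be stalled so that keepalive pings never reach the
server. The names mirror the test's usage, but the actual implementation
buffers data and copies it through background goroutines rather than blocking
writers:

package netutil

import (
	"net"
	"sync"
)

// PartitionableConn wraps a net.Conn and allows the client->server
// direction to be stalled on demand (a crude simulation of a partition).
type PartitionableConn struct {
	net.Conn

	mu          sync.Mutex
	cond        *sync.Cond
	partitioned bool
}

// NewPartitionableConn wraps conn.
func NewPartitionableConn(conn net.Conn) *PartitionableConn {
	c := &PartitionableConn{Conn: conn}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// PartitionC2S stalls the client->server direction: subsequent Write calls
// block until Finish is called, so pings sent by the client are never
// delivered to the server.
func (c *PartitionableConn) PartitionC2S() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.partitioned = true
}

// Finish lifts the partition and unblocks any stalled writers.
func (c *PartitionableConn) Finish() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.partitioned = false
	c.cond.Broadcast()
}

// Write blocks while the connection is partitioned, then forwards the data
// to the underlying connection.
func (c *PartitionableConn) Write(b []byte) (int, error) {
	c.mu.Lock()
	for c.partitioned {
		c.cond.Wait()
	}
	c.mu.Unlock()
	return c.Conn.Write(b)
}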
