rpc: don't close gRPC connections on heartbeat timeouts
Fixes cockroachdb#13989

Before this patch, the rpc.Context would perform heartbeats (dedicated
RPCs) to check whether a connection is healthy. If the heartbeats
failed, the connection was closed (causing in-flight RPCs to fail) and
the node was marked as unhealthy.
These heartbeats, being regular RPCs, were subject to gRPC's flow
control. This made them easy to block behind other large RPCs - in
particular large DistSQL streams - and thus too feeble a health check.

This patch moves to using gRPC's internal HTTP/2 ping frames for
checking connection health. These are not subject to flow control. The
gRPC transport-level connection is closed when they fail (so in-flight
RPCs still fail), but otherwise gRPC reconnects transparently.
Heartbeats stay for their other current uses - clock skew detection and
node health marking. Marking a node as unhealthy based on these RPCs is
debatable, given their shortcomings. However, this marking currently
doesn't have big consequences - it only affects the order in which
replicas are tried when a leaseholder is unknown.
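
For illustration, here is a minimal sketch of how client-side gRPC keepalive pings are configured on a dialed connection. The helper name dialRemote and the insecure transport option are assumptions for the example, not code from this patch; the real change threads the option through rpc.Context.GRPCDial as shown in the diff below.

package rpcexample

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// dialRemote is an illustrative helper (not from this patch) showing how
// HTTP/2 ping-based health checking is enabled on a gRPC client connection.
func dialRemote(target string, networkTimeout time.Duration) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithInsecure(), // the real code installs TLS credentials instead
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			// Send a ping frame when the connection has seen no activity
			// for this long.
			Time: networkTimeout,
			// If the ping isn't acknowledged within this window, close the
			// transport; pending RPCs fail fast and gRPC redials transparently.
			Timeout: networkTimeout,
			// Keep pinging even when no RPCs are in flight.
			PermitWithoutStream: true,
		}),
	)
}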
andreimatei committed Apr 7, 2017
1 parent 6e824ae commit 5994186
Showing 4 changed files with 987 additions and 20 deletions.
33 changes: 13 additions & 20 deletions pkg/rpc/context.go
@@ -28,8 +28,8 @@ import (
"github.com/rubyist/circuitbreaker"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -247,7 +247,7 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}

dialOpts := make([]grpc.DialOption, 0, 2+len(opts))
var dialOpts []grpc.DialOption
dialOpts = append(dialOpts, dialOpt)
dialOpts = append(dialOpts, grpc.WithBackoffMaxDelay(maxBackoff))
dialOpts = append(dialOpts, grpc.WithDecompressor(snappyDecompressor{}))
@@ -256,6 +256,17 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
if ctx.rpcCompression {
dialOpts = append(dialOpts, grpc.WithCompressor(snappyCompressor{}))
}
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
// Send periodic pings on the connection.
Time: base.NetworkTimeout,
// If the pings don't get a response within the timeout, we might be
// experiencing a network partition. gRPC will close the transport-level
// connection and all the pending RPCs (which may not have timeouts) will
// fail eagerly. gRPC will then reconnect the transport transparently.
Timeout: base.NetworkTimeout,
// Do the pings even when there are no ongoing RPCs.
PermitWithoutStream: true,
}))
dialOpts = append(dialOpts, opts...)

if SourceAddr != nil {
@@ -359,24 +370,6 @@ func (ctx *Context) runHeartbeat(meta *connMeta, remoteAddr string) error {
meta.heartbeatErr = err
ctx.conns.Unlock()

// If we got a timeout, we might be experiencing a network partition. We
// close the connection so that all other pending RPCs (which may not have
// timeouts) fail eagerly. Any other error is likely to be noticed by
// other RPCs, so it's OK to leave the connection open while grpc
// internally reconnects if necessary.
//
// NB: This check is skipped when the connection is initiated from a CLI
// client since those clients aren't sensitive to partitions, are likely
// to be invoked while the server is starting (particularly in tests), and
// are not equipped with the retry logic necessary to deal with this
// connection termination.
//
// TODO(tamird): That we rely on the zero maxOffset to indicate a CLI
// client is a hack; we should do something more explicit.
if maxOffset != 0 && grpc.Code(err) == codes.DeadlineExceeded {
return err
}

// HACK: work around https://github.com/grpc/grpc-go/issues/1026
// Getting a "connection refused" error from the "write" system call
// has confused grpc's error handling and this connection is permanently
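
A related caveat, not part of this diff: when clients send keepalive pings - especially with PermitWithoutStream - grpc-go servers enforce a minimum ping interval and may close connections of overly aggressive clients unless an enforcement policy allows it. A hedged sketch of the server-side option, assuming a grpc-go version that exposes it:

package rpcexample

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// newServerPermittingPings is an illustrative helper (not from this patch)
// showing how a gRPC server can be configured to tolerate client keepalive
// pings, including pings sent while no RPC streams are open.
func newServerPermittingPings() *grpc.Server {
	return grpc.NewServer(
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			// Allow clients to ping as often as once per second without
			// being considered abusive.
			MinTime: time.Second,
			// Don't close connections whose clients ping while they have
			// no active RPCs.
			PermitWithoutStream: true,
		}),
	)
}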
127 changes: 127 additions & 0 deletions pkg/rpc/context_test.go
@@ -17,6 +17,7 @@
package rpc

import (
"math"
"net"
"runtime"
"sync"
@@ -25,9 +26,11 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -551,3 +554,127 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
}
}
}

// This is a smoketest for gRPC Keepalives: rpc.Context asks gRPC to perform
// periodic pings on the transport to check that it's still alive. If the ping
// doesn't get a pong within a timeout, the transport is supposed to be closed -
// that's what we're testing here.
func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop()

clock := hlc.NewClock(time.Unix(0, 20).UnixNano, time.Nanosecond)
serverCtx := NewContext(
log.AmbientContext{},
testutils.NewNodeTestBaseContext(),
clock,
stopper,
)
s, ln := newTestServer(t, serverCtx, true)
remoteAddr := ln.Addr().String()

RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
})

clientCtx := NewContext(
log.AmbientContext{}, testutils.NewNodeTestBaseContext(), clock, stopper)
// Disable automatic heartbeats. We'll send them by hand.
clientCtx.heartbeatInterval = math.MaxInt64

var firstConn int32 = 1

// We're going to open RPC transport connections using a dialer that returns
// PartitionableConns. We'll partition the first opened connection.
dialerCh := make(chan *testutils.PartitionableConn, 1)
conn, err := clientCtx.GRPCDial(remoteAddr,
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) {
// If we allow gRPC to open a 2nd transport connection, then our RPCs
// might succeed if they're sent on that one. In the spirit of a
// partition, we'll return errors for the attempt to open a new
// connection (albeit for a TCP connection the error would come after
// a socket connect timeout).
return nil, errors.Errorf("No more connections for you. We're partitioned.")
}

conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
return nil, err
}
transportConn := testutils.NewPartitionableConn(conn)
dialerCh <- transportConn
return transportConn, nil
}),
// Override the keepalive settings that the rpc.Context uses to more
// aggressive ones, so that the test doesn't take long.
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
// The aggressively low timeout we set here makes the connection very
// flaky for any RPC use, particularly when running under stress with -p
// 100. This test can't expect any RPCs to succeed reliably.
Time: time.Millisecond,
Timeout: 5 * time.Millisecond,
PermitWithoutStream: false,
}),
)
if err != nil {
t.Fatal(err)
}
defer func() { _ = conn.Close() }()

// We'll expect any of the errors that testing has shown an RPC call might
// return when its transport connection is closed because the keepalive
// pings timed out.
gRPCErrorsRegex := "transport is closing|" +
"rpc error: code = Unavailable desc = grpc: the connection is unavailable|" +
"rpc error: code = Internal desc = transport: io: read/write on closed pipe|" +
"rpc error: code = Internal desc = transport: tls: use of closed connection|" +
"rpc error: code = Internal desc = transport: EOF|" +
"use of closed network connection"

// Perform an RPC so that a connection gets opened. In theory this RPC should
// succeed (and it does when running without too much stress), but we can't
// rely on that - see comment on the timeout above.
heartbeatClient := NewHeartbeatClient(conn)
request := PingRequest{}
if _, err := heartbeatClient.Ping(context.TODO(), &request); err != nil {
if !testutils.IsError(err, gRPCErrorsRegex) {
t.Fatal(err)
}
// In the rare eventuality that we got the expected error, this test
// succeeded: even though we didn't partition the connection, the low gRPC
// keepalive timeout caused our RPC to fail (happens occasionally under
// stress -p 100). We're going to let the rest of the test code run, to make
// sure it's exercised.
// If the heartbeats didn't time out (the normal case), we're going to
// simulate a network partition, and then the heartbeats must time out.
log.Infof(context.TODO(), "test returning early; no partition done")
}

// Now partition client->server and attempt to perform an RPC. We expect it to
// fail once the grpc keepalive fails to get a response from the server.

transportConn := <-dialerCh
defer transportConn.Finish()

transportConn.PartitionC2S()

if _, err := heartbeatClient.Ping(context.TODO(), &request); !testutils.IsError(
err, gRPCErrorsRegex) {
t.Fatal(err)
}

// If the DialOptions we passed to gRPC didn't prevent it from opening new
// connections, then next RPCs would succeed since gRPC reconnects the
// transport (and that would succeed here since we've only partitioned one
// connection). We could further test that the status reported by
// Context.ConnHealth() for the remote node moves to UNAVAILABLE because of
// the (application-level) heartbeats performed by rpc.Context, but the
// behaviour of our heartbeats in the face of transport failures is
// sufficiently tested in TestHeartbeatHealthTransport.
}
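
For context, the testutils.PartitionableConn used above is, roughly, a net.Conn wrapper whose traffic in a given direction can be cut off on demand. Below is a simplified, illustrative sketch of the idea; the type name and the blackholing behavior are assumptions for the example, not the repo's actual implementation.

package rpcexample

import (
	"net"
	"sync"
)

// partitionableConn is a simplified take on the idea behind
// testutils.PartitionableConn: a net.Conn wrapper whose client->server
// traffic can be cut off on demand to simulate a one-way partition.
type partitionableConn struct {
	net.Conn

	mu          sync.Mutex
	partitioned bool
}

// PartitionC2S stops client->server bytes (including HTTP/2 ping frames)
// from reaching the server.
func (c *partitionableConn) PartitionC2S() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.partitioned = true
}

func (c *partitionableConn) Write(b []byte) (int, error) {
	c.mu.Lock()
	partitioned := c.partitioned
	c.mu.Unlock()
	if partitioned {
		// Claim success so the client keeps "sending", but never deliver
		// the bytes; from the server's perspective the peer has gone silent.
		return len(b), nil
	}
	return c.Conn.Write(b)
}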