Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: Perform initial-heartbeat validation on GRPC reconnections #22518

Merged
merged 2 commits into from
Feb 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
75 changes: 56 additions & 19 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand All @@ -65,6 +66,8 @@ const (
initialConnWindowSize = initialWindowSize * 16 // for a connection
)

// errRedialing is returned by onlyOnceDialer.dial for every dial attempt
// after the first; closing redialChan at the same time tells the
// higher-level retry loop that gRPC tried to reconnect and that a fresh,
// fully validated connection must be established instead.
var errRedialing = errors.New("redialing")

// sourceAddr is the environment-provided local address for outgoing
// connections.
var sourceAddr = func() net.Addr {
Expand Down Expand Up @@ -423,13 +426,43 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) {
return dialOpts, nil
}

// onlyOnceDialer implements the grpc.WithDialer interface but only
// allows a single connection attempt. If a reconnection is attempted,
// redialChan is closed to signal a higher-level retry loop. This
// ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
	syncutil.Mutex // protects the fields below
	dialed         bool          // set once the single permitted dial has been made
	closed         bool          // set once redialChan has been closed; guards a double close
	redialChan     chan struct{} // closed on the first reconnection attempt
}

// dial performs the one permitted connection attempt. Any subsequent call
// closes redialChan (exactly once) to wake the higher-level retry loop and
// fails with errRedialing.
func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
	ood.Lock()
	defer ood.Unlock()

	if ood.dialed {
		// A reconnection attempt: signal the retry loop, taking care to
		// never close the channel more than once.
		if !ood.closed {
			ood.closed = true
			close(ood.redialChan)
		}
		return nil, errRedialing
	}

	ood.dialed = true
	d := net.Dialer{
		Timeout:   timeout,
		LocalAddr: sourceAddr,
	}
	return d.Dial("tcp", addr)
}

// GRPCDialRaw calls grpc.Dial with options appropriate for the context.
// Unlike GRPCDial, it does not start an RPC heartbeat to validate the
// connection.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
// connection. This connection will not be reconnected automatically;
// the returned channel is closed when a reconnection is attempted.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) {
dialOpts, err := ctx.GRPCDialOptions()
if err != nil {
return nil, err
return nil, nil, err
}

// Add a stats handler to measure client network stats.
Expand All @@ -440,24 +473,22 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if sourceAddr != nil {
dialOpts = append(dialOpts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
},
))
dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
// behavior and redialChan will never be closed).
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if log.V(1) {
log.Infof(ctx.masterCtx, "dialing %s", target)
}
return grpc.DialContext(ctx.masterCtx, target, dialOpts...)
conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...)
return conn, dialer.redialChan, err
}

// GRPCDial calls grpc.Dial with options appropriate for the context.
Expand All @@ -469,7 +500,8 @@ func (ctx *Context) GRPCDial(target string) *Connection {

conn := value.(*Connection)
conn.initOnce.Do(func() {
conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target)
var redialChan <-chan struct{}
conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target)
if ctx.GetLocalInternalServerForAddr(target) != nil {
conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true})
conn.setInitialHeartbeatDone()
Expand All @@ -479,7 +511,7 @@ func (ctx *Context) GRPCDial(target string) *Connection {
if err := ctx.Stopper.RunTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) {
err := ctx.runHeartbeat(conn, target)
err := ctx.runHeartbeat(conn, target, redialChan)
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
Expand All @@ -505,7 +537,8 @@ func (ctx *Context) NewBreaker() *circuit.Breaker {
}

// ErrNotConnected is returned by ConnHealth when there is no connection to the
// host (e.g. GRPCDial was never called for that address).
// host (e.g. GRPCDial was never called for that address, or a connection has
// been closed and not reconnected).
var ErrNotConnected = errors.New("not connected")

// ErrNotHeartbeated is returned by ConnHealth when we have not yet performed
Expand All @@ -526,7 +559,9 @@ func (ctx *Context) ConnHealth(target string) error {
return ErrNotConnected
}

func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
maxOffset := ctx.LocalClock.MaxOffset()
clusterID := ctx.ClusterID.Get()

Expand All @@ -546,6 +581,8 @@ func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
everSucceeded := false
for {
select {
case <-redialChan:
return errRedialing
case <-ctx.Stopper.ShouldStop():
return nil
case <-heartbeatTimer.C:
Expand Down
14 changes: 13 additions & 1 deletion pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ func TestHeartbeatHealthTransport(t *testing.T) {
}

isUnhealthy := func(err error) bool {
// Most of the time, an unhealthy connection will get
// ErrNotConnected, but there are brief periods during which we
// could get ErrNotHeartbeated (while we're trying to start a new
// connection) or one of the grpc errors below (while the old
// connection is in the middle of closing).
if err == ErrNotConnected || err == ErrNotHeartbeated {
return true
}
// The expected code here is Unavailable, but at least on OSX you can also get
//
// rpc error: code = Internal desc = connection error: desc = "transport: authentication
Expand Down Expand Up @@ -417,12 +425,16 @@ func TestHeartbeatHealthTransport(t *testing.T) {
if timeutil.Since(then) > 45*time.Second {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}

close(done)

// Should become healthy again after GRPC reconnects.
// We can reconnect and the connection becomes healthy again.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil {
return err
}
return clientCtx.ConnHealth(remoteAddr)
})

Expand Down
31 changes: 1 addition & 30 deletions pkg/storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"bytes"
"context"
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -99,42 +98,14 @@ func (r *raftLogger) Fatalf(format string, v ...interface{}) {
// Panic logs the arguments at error severity — shouting the #14231 help
// message first when the panic text matches that known failure mode — and
// then panics with the same message.
func (r *raftLogger) Panic(v ...interface{}) {
	s := fmt.Sprint(v...)
	log.ErrorfDepth(r.ctx, 1, s)
	if is14231Error(s) {
		log.Shout(r.ctx, log.Severity_ERROR, is14231HelpMessage)
	}
	panic(s)
}

// Panicf logs the formatted message at error severity — shouting the #14231
// help message first when it matches that known failure mode — and then
// panics with the same message.
func (r *raftLogger) Panicf(format string, v ...interface{}) {
	// Format once and reuse the result: the previous body logged the message
	// twice (once via s, once via format/v...) and re-formatted it a third
	// time for the panic. Mirrors the structure of Panic above.
	s := fmt.Sprintf(format, v...)
	log.ErrorfDepth(r.ctx, 1, s)
	if is14231Error(s) {
		log.Shout(r.ctx, log.Severity_ERROR, is14231HelpMessage)
	}
	panic(s)
}

// is14231Error reports whether the provided panic message matches the error
// observed in issue #14231. Such errors are likely to be the result of wiping
// a store's data directory and bringing it back online without join flags but
// with the same address, node ID, and store ID. While we should prevent these
// errors in our next release, for now the best we can do is help users
// understand them.
//
// TODO(#14231): Remove this method once the issue is fixed (hopefully in 1.2).
func is14231Error(s string) bool {
	markers := []string{
		"is out of range [lastIndex(",
		"Was the raft log corrupted, truncated, or lost?",
	}
	for _, m := range markers {
		if !strings.Contains(s, m) {
			return false
		}
	}
	return true
}

// is14231HelpMessage is shouted to the log when a raft panic matches
// is14231Error, pointing operators at the likely cause (a wiped store
// directory reused without join flags) and at the issue tracker.
//
// TODO(#14231): Remove this once the issue is fixed (hopefully in 1.2).
const is14231HelpMessage = "Server crashing due to missing data. Was the server restarted with a\n" +
	"different store directory than before? A --join parameter must be specified\n" +
	"in order to make restarts with a new store directory safe. Please try again\n" +
	"with a new directory and a valid --join flag. If this server was started with\n" +
	"its old data or with a valid --join parameter and you are still seeing this,\n" +
	"please report an issue at https://github.com/cockroachdb/cockroach/issues/new"

func logRaftReady(ctx context.Context, ready raft.Ready) {
if log.V(5) {
var buf bytes.Buffer
Expand Down