diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go
index f2e5ac2d7fff..01a3102fd6c0 100644
--- a/pkg/rpc/context.go
+++ b/pkg/rpc/context.go
@@ -28,8 +28,8 @@ import (
 	"github.com/rubyist/circuitbreaker"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/keepalive"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -247,7 +247,7 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
 		dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
 	}
 
-	dialOpts := make([]grpc.DialOption, 0, 2+len(opts))
+	var dialOpts []grpc.DialOption
 	dialOpts = append(dialOpts, dialOpt)
 	dialOpts = append(dialOpts, grpc.WithBackoffMaxDelay(maxBackoff))
 	dialOpts = append(dialOpts, grpc.WithDecompressor(snappyDecompressor{}))
@@ -256,6 +256,16 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
 	if ctx.rpcCompression {
 		dialOpts = append(dialOpts, grpc.WithCompressor(snappyCompressor{}))
 	}
+	dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
+		// Send periodic pings on the connection.
+		Time: base.NetworkTimeout,
+		// If the pings don't get a response within the timeout, the connection
+		// will be closed: we might be experiencing a network partition. All the
+		// pending RPCs (which may not have timeouts) will fail eagerly.
+		Timeout: base.NetworkTimeout,
+		// Do the pings even when there are no ongoing RPCs.
+		PermitWithoutStream: true,
+	}))
 	dialOpts = append(dialOpts, opts...)
 
 	if SourceAddr != nil {
@@ -359,24 +369,6 @@ func (ctx *Context) runHeartbeat(meta *connMeta, remoteAddr string) error {
 		meta.heartbeatErr = err
 		ctx.conns.Unlock()
 
-		// If we got a timeout, we might be experiencing a network partition. We
-		// close the connection so that all other pending RPCs (which may not have
-		// timeouts) fail eagerly. Any other error is likely to be noticed by
-		// other RPCs, so it's OK to leave the connection open while grpc
-		// internally reconnects if necessary.
-		//
-		// NB: This check is skipped when the connection is initiated from a CLI
-		// client since those clients aren't sensitive to partitions, are likely
-		// to be invoked while the server is starting (particularly in tests), and
-		// are not equipped with the retry logic necessary to deal with this
-		// connection termination.
-		//
-		// TODO(tamird): That we rely on the zero maxOffset to indicate a CLI
-		// client is a hack; we should do something more explicit.
-		if maxOffset != 0 && grpc.Code(err) == codes.DeadlineExceeded {
-			return err
-		}
-
 		// HACK: work around https://github.com/grpc/grpc-go/issues/1026
 		// Getting a "connection refused" error from the "write" system call
 		// has confused grpc's error handling and this connection is permanently
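The options added above are gRPC's stock client-side keepalive knobs: Time is the ping period, Timeout is how long to wait for a ping acknowledgment before the transport is torn down, and PermitWithoutStream sends pings even on connections with no active RPCs. A minimal standalone sketch of the same wiring (not part of this patch; the address and the 3s durations are placeholders for base.NetworkTimeout):

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	// Dial with the same keepalive shape as GRPCDial above; placeholder
	// address and durations.
	conn, err := grpc.Dial("localhost:26257",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                3 * time.Second, // ping period
			Timeout:             3 * time.Second, // wait this long for the ping's ack
			PermitWithoutStream: true,            // ping even while idle
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// RPCs issued on conn will fail once pings go unacknowledged.
}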
diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index a0f38a41a64e..077d0b1da59f 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -24,9 +24,11 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/keepalive"
 
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util"
@@ -546,3 +548,96 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
 		}
 	}
 }
+
+// This is a smoketest for gRPC Keepalives: rpc.Context asks gRPC to perform
+// periodic pings on the transport to check that it's still alive. If the ping
+// doesn't get a pong within a timeout, the transport is supposed to be closed;
+// that's what we're testing here.
+func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	if testing.Short() {
+		t.Skip("short flag")
+	}
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop()
+
+	clock := hlc.NewClock(time.Unix(0, 20).UnixNano, time.Nanosecond)
+	serverCtx := NewContext(
+		log.AmbientContext{},
+		testutils.NewNodeTestBaseContext(),
+		clock,
+		stopper,
+	)
+	s, ln := newTestServer(t, serverCtx, true)
+	remoteAddr := ln.Addr().String()
+
+	RegisterHeartbeatServer(s, &HeartbeatService{
+		clock:              clock,
+		remoteClockMonitor: serverCtx.RemoteClocks,
+	})
+
+	clientCtx := NewContext(
+		log.AmbientContext{}, testutils.NewNodeTestBaseContext(), clock, stopper)
+	// Disable automatic heartbeats. We'll send them by hand.
+	clientCtx.heartbeatInterval = time.Hour
+
+	// We're going to open RPC transport connections using a dialer that returns
+	// PartitionableConns. We'll partition the first opened connection.
+	dialerCh := make(chan *testutils.PartitionableConn, 1)
+	conn, err := clientCtx.GRPCDial(remoteAddr,
+		grpc.WithDialer(
+			func(addr string, timeout time.Duration) (net.Conn, error) {
+				dialer := net.Dialer{
+					Timeout: timeout,
+				}
+				conn, err := dialer.Dial("tcp", addr)
+				if err != nil {
+					return nil, err
+				}
+				transportConn := testutils.NewPartitionableConn(conn)
+				dialerCh <- transportConn
+				return transportConn, nil
+			}),
+		// Override the keepalive settings that the rpc.Context uses with more
+		// aggressive ones.
+		grpc.WithKeepaliveParams(
+			keepalive.ClientParameters{
+				Time:    time.Millisecond,
+				Timeout: time.Millisecond,
+				// Do the pings even when there are no ongoing RPCs.
+				PermitWithoutStream: true,
+			}),
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that we can perform a heartbeat.
+	heartbeatClient := NewHeartbeatClient(conn)
+	request := PingRequest{}
+	if _, err := heartbeatClient.Ping(context.TODO(), &request); err != nil {
+		t.Fatal(err)
+	}
+
+	// Now partition client->server and attempt to perform an RPC. We expect it
+	// to fail once the gRPC keepalive fails to get a response from the server.
+	transportConn := <-dialerCh
+	transportConn.PartitionC2S()
+
+	if _, err := heartbeatClient.Ping(context.TODO(), &request); !testutils.IsError(
+		err, "transport is closing") {
+		t.Fatal(err)
+	}
+
+	// Subsequent RPCs would succeed since gRPC reconnects the transport (and
+	// that would succeed here since we've only partitioned one connection). We
+	// could find a way to simulate a partition more realistically by not
+	// accepting new connections, and test that the status reported by
+	// Context.ConnHealth() for the remote node moves to UNAVAILABLE, but the
+	// behaviour of our heartbeats in the face of transport failures is
+	// sufficiently tested in TestHeartbeatHealthTransport.
+
+	transportConn.Finish()
+}
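The dialer override in the test above is a general pattern: grpc.WithDialer lets a test interpose an arbitrary net.Conn wrapper underneath the gRPC transport. A sketch of just that pattern, with a hypothetical wrap function standing in for testutils.NewPartitionableConn (dialIntercepted is illustrative, not part of this patch):

package rpctestutil // hypothetical package, for illustration only

import (
	"net"
	"time"

	"google.golang.org/grpc"
)

// dialIntercepted dials a gRPC connection whose underlying TCP connections
// are each passed through wrap before gRPC sees them.
func dialIntercepted(addr string, wrap func(net.Conn) net.Conn) (*grpc.ClientConn, error) {
	return grpc.Dial(addr,
		grpc.WithInsecure(),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			raw, err := net.DialTimeout("tcp", addr, timeout)
			if err != nil {
				return nil, err
			}
			return wrap(raw), nil
		}))
}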
diff --git a/pkg/testutils/net.go b/pkg/testutils/net.go
new file mode 100644
index 000000000000..b744bc8b8f18
--- /dev/null
+++ b/pkg/testutils/net.go
@@ -0,0 +1,242 @@
+// Copyright 2017 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Andrei Matei (andreimatei1@gmail.com)
+
+package testutils
+
+import (
+	"io"
+	"net"
+	"sync"
+
+	"golang.org/x/net/context"
+
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+// PartitionableConn is an implementation of net.Conn that allows the
+// client->server and/or the server->client directions to be temporarily
+// partitioned.
+//
+// A PartitionableConn wraps a provided net.Conn (the serverConn member) and
+// pipes every read and write to it.
+//
+// While a direction is partitioned, data sent in that direction doesn't flow.
+// A write done while partitioned may block. Data written to the conn after
+// the partition has been established is not delivered to the remote party
+// until the partition is lifted. Data written before the partition is
+// established may or may not be delivered: every write that returned before
+// the partition was established will be delivered, except possibly the last
+// one; use application-level ACKs if delivery matters.
+type PartitionableConn struct {
+	// We embed a net.Conn so that we inherit the interface. Note that we
+	// override Read() and Write() though.
+	// This embedded Conn is half of a net.Pipe(). The other half is clientConn.
+	net.Conn
+
+	clientConn net.Conn
+	serverConn net.Conn
+
+	mu struct {
+		syncutil.Mutex
+
+		// err, if set, is returned by any subsequent call to Read or Write.
+		err error
+
+		// Is either direction (client-to-server, server-to-client) currently
+		// partitioned?
+		c2sPartitioned bool
+		s2cPartitioned bool
+
+		c2sWaiter *sync.Cond
+		s2cWaiter *sync.Cond
+	}
+}
+
+// NewPartitionableConn wraps serverConn in a PartitionableConn.
+func NewPartitionableConn(serverConn net.Conn) *PartitionableConn {
+	clientEnd, clientConn := net.Pipe()
+	c := &PartitionableConn{
+		Conn:       clientEnd,
+		clientConn: clientConn,
+		serverConn: serverConn,
+	}
+	c.mu.c2sWaiter = sync.NewCond(&c.mu.Mutex)
+	c.mu.s2cWaiter = sync.NewCond(&c.mu.Mutex)
+
+	// Start copying from client to server.
+	go func() {
+		err := c.copy(copyArgs{
+			src:         c.clientConn,
+			dst:         c.serverConn,
+			mu:          &c.mu.Mutex,
+			partitioned: &c.mu.c2sPartitioned,
+			wait:        c.mu.c2sWaiter,
+		})
+		c.mu.Lock()
+		c.mu.err = err
+		c.mu.Unlock()
+		if err := c.clientConn.Close(); err != nil {
+			log.Fatalf(context.TODO(), "unexpected error closing internal pipe: %s", err)
+		}
+		if err := c.serverConn.Close(); err != nil {
+			log.Errorf(context.TODO(), "error closing server conn: %s", err)
+		}
+	}()
+
+	// Start copying from server to client.
+	go func() {
+		err := c.copy(copyArgs{
+			src:         c.serverConn,
+			dst:         c.clientConn,
+			mu:          &c.mu.Mutex,
+			partitioned: &c.mu.s2cPartitioned,
+			wait:        c.mu.s2cWaiter,
+		})
+		c.mu.Lock()
+		c.mu.err = err
+		c.mu.Unlock()
+		if err := c.clientConn.Close(); err != nil {
+			log.Fatalf(context.TODO(), "unexpected error closing internal pipe: %s", err)
+		}
+		if err := c.serverConn.Close(); err != nil {
+			log.Errorf(context.TODO(), "error closing server conn: %s", err)
+		}
+	}()
+
+	return c
+}
+
+// Finish removes any partitions that may exist so that blocked goroutines can
+// finish.
+// Finish() must be called if a connection may have been left in a partitioned
+// state.
+func (c *PartitionableConn) Finish() {
+	c.mu.Lock()
+	c.mu.c2sPartitioned = false
+	c.mu.c2sWaiter.Signal()
+	c.mu.s2cPartitioned = false
+	c.mu.s2cWaiter.Signal()
+	c.mu.Unlock()
+}
+
+// PartitionC2S partitions the client-to-server direction.
+// If UnpartitionC2S() is not called, Finish() must be called.
+func (c *PartitionableConn) PartitionC2S() {
+	c.mu.Lock()
+	c.mu.c2sPartitioned = true
+	c.mu.Unlock()
+}
+
+// UnpartitionC2S lifts an existing client-to-server partition.
+func (c *PartitionableConn) UnpartitionC2S() {
+	c.mu.Lock()
+	if !c.mu.c2sPartitioned {
+		panic("not partitioned")
+	}
+	c.mu.c2sPartitioned = false
+	c.mu.c2sWaiter.Signal()
+	c.mu.Unlock()
+}
+
+// PartitionS2C partitions the server-to-client direction.
+// If UnpartitionS2C() is not called, Finish() must be called.
+func (c *PartitionableConn) PartitionS2C() {
+	c.mu.Lock()
+	c.mu.s2cPartitioned = true
+	c.mu.Unlock()
+}
+
+// UnpartitionS2C lifts an existing server-to-client partition.
+func (c *PartitionableConn) UnpartitionS2C() {
+	c.mu.Lock()
+	if !c.mu.s2cPartitioned {
+		panic("not partitioned")
+	}
+	c.mu.s2cPartitioned = false
+	c.mu.s2cWaiter.Signal()
+	c.mu.Unlock()
+}
+
+// Read is part of the net.Conn interface.
+func (c *PartitionableConn) Read(b []byte) (n int, err error) {
+	c.mu.Lock()
+	err = c.mu.err
+	c.mu.Unlock()
+	if err != nil {
+		return 0, err
+	}
+
+	// Forward to the embedded connection.
+	return c.Conn.Read(b)
+}
+
+// Write is part of the net.Conn interface.
+func (c *PartitionableConn) Write(b []byte) (n int, err error) {
+	c.mu.Lock()
+	err = c.mu.err
+	c.mu.Unlock()
+	if err != nil {
+		return 0, err
+	}
+
+	// Forward to the embedded connection.
+	return c.Conn.Write(b)
+}
+
+type copyArgs struct {
+	src net.Conn
+	dst net.Conn
+
+	mu          *syncutil.Mutex
+	partitioned *bool
+	// When partitioned is set, wait is signaled once the partition is lifted.
+	// mu must be held while waiting on wait.
+	wait *sync.Cond
+}
+
+// copy copies data from args.src to args.dst until args.src.Read() returns
+// EOF. That EOF is returned (i.e. the return value is always != nil), because
+// the PartitionableConn wants to hold on to any error, including EOF.
+func (c *PartitionableConn) copy(args copyArgs) error {
+	buf := make([]byte, 1000)
+	var written int64
+	for {
+		nr, err := args.src.Read(buf)
+
+		args.mu.Lock()
+		for *args.partitioned {
+			args.wait.Wait()
+		}
+		args.mu.Unlock()
+
+		if nr > 0 {
+			nw, ew := args.dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+			}
+		}
+		if err != nil {
+			return err
+		}
+	}
+}
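Outside of gRPC, PartitionableConn can wrap any net.Conn directly. A hypothetical sketch of the lifecycle (the address is a placeholder for a running echo server; recall from the type's comment that writes issued while a partition is up may block):

package main

import (
	"fmt"
	"net"

	"github.com/cockroachdb/cockroach/pkg/testutils"
)

func main() {
	raw, err := net.Dial("tcp", "127.0.0.1:9999") // placeholder echo server
	if err != nil {
		panic(err)
	}
	conn := testutils.NewPartitionableConn(raw)
	// Lift any leftover partition so the copy goroutines can exit.
	defer conn.Finish()

	fmt.Fprintln(conn, "hello") // flows through to the server

	conn.PartitionC2S()
	fmt.Fprintln(conn, "held") // accepted by the copy goroutine, not delivered yet
	conn.UnpartitionC2S()      // "held" is delivered now
}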
diff --git a/pkg/testutils/net_test.go b/pkg/testutils/net_test.go
new file mode 100644
index 000000000000..8b125e5f83af
--- /dev/null
+++ b/pkg/testutils/net_test.go
@@ -0,0 +1,290 @@
+// Copyright 2017 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Andrei Matei (andreimatei1@gmail.com)
+
+package testutils
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"net"
+	"testing"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/netutil"
+	"github.com/pkg/errors"
+)
+
+// RunEchoServer runs a network server that accepts all the connections from ln
+// and echoes the data sent on them.
+//
+// If serverSideCh != nil, every slice of data received by the server is also
+// sent on this channel before being echoed back on the connection it came on.
+// Useful to observe what the server has received when this server is used
+// with partitioned connections.
+func RunEchoServer(ln net.Listener, serverSideCh chan<- []byte) {
+	for {
+		conn, err := ln.Accept()
+		netutil.FatalIfUnexpected(err)
+		if err != nil {
+			return
+		}
+		go handleEchoConnection(conn, serverSideCh)
+	}
+}
+
+func handleEchoConnection(conn net.Conn, connSideCh chan<- []byte) {
+	_, err := copyWithSideChan(conn, conn, connSideCh)
+
+	if err != nil {
+		log.Warning(context.TODO(), err)
+	}
+}
+
+// copyWithSideChan is like io.Copy(), but also takes a channel on which data
+// read from src is sent before being written to dst.
+func copyWithSideChan(dst io.Writer, src io.Reader, ch chan<- []byte) (written int64, err error) {
+	buf := make([]byte, 32*1024)
+	for {
+		nr, er := src.Read(buf)
+		if nr > 0 {
+			if ch != nil {
+				ch <- buf[:nr]
+			}
+
+			nw, ew := dst.Write(buf[0:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+			if ew != nil {
+				err = ew
+				break
+			}
+			if nr != nw {
+				err = io.ErrShortWrite
+				break
+			}
+		}
+		if er != nil {
+			if er != io.EOF {
+				err = er
+			}
+			break
+		}
+	}
+	return written, err
+}
+
+func TestPartitionableConnBasic(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	addr := util.TestAddr
+	ln, err := net.Listen(addr.Network(), addr.String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	go func() {
+		RunEchoServer(ln, nil)
+	}()
+	defer func() {
+		netutil.FatalIfUnexpected(ln.Close())
+	}()
+
+	serverConn, err := net.Dial("tcp", ln.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pConn := NewPartitionableConn(serverConn)
+
+	exp := "let's see if this value comes back\n"
+	fmt.Fprintf(pConn, exp)
+	got, err := bufio.NewReader(pConn).ReadString('\n')
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != exp {
+		t.Fatalf("expecting: %q , got %q", exp, got)
+	}
+	pConn.Close()
+}
+
+func TestPartitionableConnPartitionC2S(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	if testing.Short() {
+		t.Skip("short flag")
+	}
+
+	addr := util.TestAddr
+	ln, err := net.Listen(addr.Network(), addr.String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	serverSideCh := make(chan []byte)
+	go func() {
+		RunEchoServer(ln, serverSideCh)
+	}()
+	defer func() {
+		netutil.FatalIfUnexpected(ln.Close())
+	}()
+
+	serverConn, err := net.Dial("tcp", ln.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pConn := NewPartitionableConn(serverConn)
+
+	// Partition the client->server connection. Afterwards, we're going to send
+	// something and assert that the server doesn't get it (within a timeout) by
+	// snooping on the server's side channel. Then we'll resolve the partition
+	// and expect that the server gets the message that was pending and echoes
+	// it back.
+
+	pConn.PartitionC2S()
+
+	// Client sends data.
+	exp := "let's see when this value comes back\n"
+	fmt.Fprintf(pConn, exp)
+
+	// In the background, the client waits on a read.
+	clientDoneCh := make(chan error)
+	go func() {
+		got, err := bufio.NewReader(pConn).ReadString('\n')
+		if err != nil {
+			clientDoneCh <- err
+			return
+		}
+		if got != exp {
+			clientDoneCh <- errors.Errorf("expecting: %q , got %q", exp, got)
+			return
+		}
+		clientDoneCh <- nil
+	}()
+
+	timerDoneCh := make(chan error)
+	time.AfterFunc(3*time.Millisecond, func() {
+		var err error
+		select {
+		case err = <-clientDoneCh:
+			err = errors.Errorf("unexpected reply while partitioned: %s", err)
+		case buf := <-serverSideCh:
+			err = errors.Errorf("server was not supposed to have received data while partitioned: %q", buf)
+		default:
+		}
+		timerDoneCh <- err
+	})
+
+	if err := <-timerDoneCh; err != nil {
+		t.Fatal(err)
+	}
+
+	// Now unpartition and expect the pending data to be sent and a reply to be
+	// received.
+
+	pConn.UnpartitionC2S()
+
+	// Expect the server to receive the data.
+	<-serverSideCh
+
+	if err := <-clientDoneCh; err != nil {
+		t.Fatal(err)
+	}
+
+	pConn.Close()
+}
+
+func TestPartitionableConnPartitionS2C(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	if testing.Short() {
+		t.Skip("short flag")
+	}
+
+	addr := util.TestAddr
+	ln, err := net.Listen(addr.Network(), addr.String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	serverSideCh := make(chan []byte)
+	go func() {
+		RunEchoServer(ln, serverSideCh)
+	}()
+	defer func() {
+		netutil.FatalIfUnexpected(ln.Close())
+	}()
+
+	serverConn, err := net.Dial("tcp", ln.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pConn := NewPartitionableConn(serverConn)
+
+	// We're going to partition the server->client connection. Then we'll send
+	// some data and assert that the server gets it (by snooping on the server's
+	// side channel). Then we'll assert that the client doesn't get the reply
+	// (with a timeout). Then we resolve the partition and assert that the
+	// client gets the reply.
+
+	pConn.PartitionS2C()
+
+	// Client sends data.
+	exp := "let's see when this value comes back\n"
+	fmt.Fprintf(pConn, exp)
+
+	if s := <-serverSideCh; string(s) != exp {
+		t.Fatalf("expected server to receive %q, got %q", exp, s)
+	}
+
+	// In the background, the client waits on a read.
+	clientDoneCh := make(chan error)
+	go func() {
+		got, err := bufio.NewReader(pConn).ReadString('\n')
+		if err != nil {
+			clientDoneCh <- err
+			return
+		}
+		if got != exp {
+			clientDoneCh <- errors.Errorf("expecting: %q , got %q", exp, got)
+			return
+		}
+		clientDoneCh <- nil
+	}()
+
+	// Check that the client does not get the server's response.
+	time.AfterFunc(3*time.Millisecond, func() {
+		select {
+		case err := <-clientDoneCh:
+			t.Fatalf("unexpected reply while partitioned: %s", err)
+		default:
+		}
+	})
+
+	// Now unpartition and expect the pending data to be sent and a reply to be
+	// received.
+
+	pConn.UnpartitionS2C()
+
+	if err := <-clientDoneCh; err != nil {
+		t.Fatal(err)
+	}
+
+	pConn.Close()
+}
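The partition gate inside copy() is the standard sync.Cond idiom: re-check the condition in a loop while holding the mutex, with Wait() releasing the mutex while blocked and reacquiring it on wakeup. A self-contained sketch of just that idiom (the names are illustrative, not from this patch):

package main

import (
	"fmt"
	"sync"
	"time"
)

// gate blocks Pass() while closed, mirroring how copy() waits on
// *args.partitioned with args.wait.
type gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	closed bool
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Close makes subsequent Pass() calls block.
func (g *gate) Close() {
	g.mu.Lock()
	g.closed = true
	g.mu.Unlock()
}

// Open unblocks a waiter, like Unpartition*() signaling the Cond.
func (g *gate) Open() {
	g.mu.Lock()
	g.closed = false
	g.cond.Signal()
	g.mu.Unlock()
}

// Pass returns once the gate is open.
func (g *gate) Pass() {
	g.mu.Lock()
	for g.closed {
		g.cond.Wait() // releases g.mu while blocked
	}
	g.mu.Unlock()
}

func main() {
	g := newGate()
	g.Close()
	go func() {
		time.Sleep(10 * time.Millisecond)
		g.Open()
	}()
	g.Pass() // blocks until Open() runs
	fmt.Println("passed the gate")
}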