Skip to content

Commit

Permalink
vendor: upgrade grpc from 1.13.0 to 1.21.2
Browse files Browse the repository at this point in the history
This PR upgrades gRPC from 1.13.0 to 1.21.2. The primary motivation for this
upgrade is to eliminate the disconnections caused by
grpc/grpc-go#1882. These failures manifest themselves
as the following set of errors:

```
ajwerner-test-0001> I190722 22:15:01.203008 12054 vendor/github.com/cockroachdb/circuitbreaker/circuitbreaker.go:322  [n1] circuitbreaker: rpc [::]:26257 [n2] tripped: failed to check for ready connection to n2 at ajwerner-test-0002:26257: connection not ready: TRANSIENT_FAILURE
```
Which then lead to tripped breakers and general badness. I suspect that there
are several other good bug fixes in here, including some purported leaks and
correctness fixes on shutdown.

I have verified that with this upgrade I no longer see connections break in
overload scenarios which reliably reproduced the situation in the above log.

This commit removes one condition from grpcutil.IsClosedConnection which should
be subsumed by the status check above. The `transport` subpackage has not been
around for many releases.

This does not upgrade to the current release 1.22.0 because the maintainer
mentions that it contains a bug
(grpc/grpc-go#2663 (comment)).

This change also unfortunately updates the keepalive behavior to be more spec
compliant (grpc/grpc-go#2642). This change mandates
a minimum ping time of 10s to the client. Given grpc/grpc-go#2638
this means that the rpc test for keepalives now takes over 20s.

I would be okay skipping it but leave that discussion for review.

Also updated the acceptance test to look out for an HTTP/2.0 header because
grpc now does not send RPCs until after the HTTP handshake has completed
(see grpc/grpc-go#2406).

Release note (bug fix): Upgrade grpc library to fix connection state management
bug.
  • Loading branch information
ajwerner committed Jul 25, 2019
1 parent a678fea commit 172255f
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 45 deletions.
16 changes: 11 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ ignored = [
name = "google.golang.org/genproto"
branch = "master"

[[constraint]]
name = "google.golang.org/grpc"
version = "=v1.21.2"

[prune]
go-tests = true
unused-packages = true
Expand Down
5 changes: 3 additions & 2 deletions pkg/cli/interactive_tests/netcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@

while True:
c = client_socket.recv(1)
sys.stdout.write("%c" % c)
sys.stdout.flush()
if c:
sys.stdout.write("%c" % c)
sys.stdout.flush()
4 changes: 2 additions & 2 deletions pkg/cli/interactive_tests/test_error_hints.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ eexpect "ready"
set spawn_id $client_spawn_id
send "$argv quit --insecure\r"
eexpect "insecure\r\n"
# In the first shell, stop the server.
# Wait to see an HTTP/2.0 header on the fake server, then stop the server.
set spawn_id $shell_spawn_id
eexpect "connected"
eexpect ":26257"
eexpect "PRI * HTTP/2.0"
interrupt
eexpect ":/# "
# Check that cockroach quit becomes suitably confused.
Expand Down
9 changes: 3 additions & 6 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,23 +682,21 @@ func init() {
// ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
ctx context.Context
syncutil.Mutex
dialed bool
closed bool
redialChan chan struct{}
}

func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
func (ood *onlyOnceDialer) dial(ctx context.Context, addr string) (net.Conn, error) {
ood.Lock()
defer ood.Unlock()
if !ood.dialed {
ood.dialed = true
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.DialContext(ood.ctx, "tcp", addr)
return dialer.DialContext(ctx, "tcp", addr)
} else if !ood.closed {
ood.closed = true
close(ood.redialChan)
Expand Down Expand Up @@ -726,10 +724,9 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{
grpc.WithInitialConnWindowSize(initialConnWindowSize))

dialer := onlyOnceDialer{
ctx: ctx.masterCtx,
redialChan: make(chan struct{}),
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))
dialOpts = append(dialOpts, grpc.WithContextDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
Expand Down
74 changes: 50 additions & 24 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,8 +1041,8 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
// PartitionableConns. We'll partition the first opened connection.
dialerCh := make(chan *testutils.PartitionableConn, 1)
clientCtx.AddTestingDialOpts(
grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
grpc.WithContextDialer(
func(_ context.Context, addr string) (net.Conn, error) {
if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) {
// If we allow gRPC to open a 2nd transport connection, then our RPCs
// might succeed if they're sent on that one. In the spirit of a
Expand All @@ -1052,7 +1052,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
return nil, errors.Errorf("No more connections for you. We're partitioned.")
}

conn, err := net.DialTimeout("tcp", addr, timeout)
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1116,10 +1116,8 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
// Now partition either client->server, server->client, or both, and attempt
// to perform an RPC. We expect it to fail once the grpc keepalive fails to
// get a response from the server.

transportConn := <-dialerCh
defer transportConn.Finish()

if c.partitionC2S {
log.Infof(ctx, "partition C2S")
transportConn.PartitionC2S()
Expand All @@ -1129,38 +1127,65 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
transportConn.PartitionS2C()
}

// We want to start a goroutine that keeps trying to send requests and reports
// the error from the send call. In cases where there are no keep-alives this
// request may get blocked if flow control blocks it.
errChan := make(chan error)
sendCtx, cancel := context.WithCancel(ctx)
r := retry.StartWithCtx(sendCtx, retry.Options{
InitialBackoff: 10 * time.Millisecond,
MaxBackoff: 500 * time.Millisecond,
})
defer cancel()
go func() {
for r.Next() {
err := heartbeatClient.Send(&request)
isClosed := err != nil && grpcutil.IsClosedConnection(err)
log.Infof(ctx, "heartbeat Send got error %+v (closed=%v)", err, isClosed)
select {
case errChan <- err:
case <-sendCtx.Done():
return
}
if isClosed {
return
}
}
}()
// Check whether the connection eventually closes. We may need to
// adjust this duration if the test gets flaky.
const retryDur = 3 * time.Second
errNotClosed := errors.New("conn not closed")
closedErr := retry.ForDuration(retryDur, func() error {
err := heartbeatClient.Send(&request)
if err == nil {
log.Infof(ctx, "expected send error, got no error")
return errNotClosed
}
if !grpcutil.IsClosedConnection(err) {
newErr := fmt.Errorf("expected closed connection error, found %v", err)
log.Infof(ctx, "%+v", newErr)
return newErr
// This unfortunately massive amount of time is required due to gRPC's
// minimum timeout of 10s and the below issue whereby keepalives are sent
// at half the expected rate.
// https://github.com/grpc/grpc-go/issues/2638
const timeoutDur = 21 * time.Second
timeout := time.After(timeoutDur)
// sendErr will hold the last error we saw from an attempt to send a
// heartbeat. Initialize it with a dummy error which will fail the test if
// it is not overwritten.
sendErr := fmt.Errorf("not a real error")
for done := false; !done; {
select {
case <-timeout:
cancel()
done = true
case sendErr = <-errChan:
}
return nil
})
}
if c.expClose {
if closedErr != nil {
newErr := fmt.Errorf("expected closed connection, found %v", closedErr)
if sendErr == nil || !grpcutil.IsClosedConnection(sendErr) {
newErr := fmt.Errorf("expected closed connection, found %v", sendErr)
log.Infof(ctx, "%+v", newErr)
return newErr
}
} else {
if closedErr != errNotClosed {
newErr := fmt.Errorf("expected unclosed connection, found %v", closedErr)
if sendErr != nil {
newErr := fmt.Errorf("expected unclosed connection, found %v", sendErr)
log.Infof(ctx, "%+v", newErr)
return newErr
}
}

log.Infof(ctx, "test done")
// If the DialOptions we passed to gRPC didn't prevent it from opening new
// connections, then next RPCs would succeed since gRPC reconnects the
// transport (and that would succeed here since we've only partitioned one
Expand All @@ -1169,6 +1194,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
// the (application-level) heartbeats performed by rpc.Context, but the
// behavior of our heartbeats in the face of transport failures is
// sufficiently tested in TestHeartbeatHealthTransport.
log.Infof(ctx, "test done")
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/rpc/stats_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -168,6 +169,8 @@ func TestStatsHandlerWithHeartbeats(t *testing.T) {
if s, c := serverVal.(*Stats).Outgoing(), clientVal.(*Stats).Incoming(); s == 0 || c == 0 || s > c {
return fmt.Errorf("expected server.outgoing < client.incoming; got %d, %d", s, c)
}
log.Infof(context.TODO(), "server incoming = %v, server outgoing = %v, client incoming = %v, client outgoing = %v",
serverVal.(*Stats).Incoming(), serverVal.(*Stats).Outgoing(), clientVal.(*Stats).Incoming(), clientVal.(*Stats).Outgoing())
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func (s *Server) Start(ctx context.Context) error {
}
conn, err := grpc.DialContext(ctx, s.cfg.AdvertiseAddr, append(
dialOpts,
grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return c2, nil
}),
)...)
Expand Down
4 changes: 0 additions & 4 deletions pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)

// ErrCannotReuseClientConn is returned when a failed connection is
Expand Down Expand Up @@ -64,9 +63,6 @@ func IsClosedConnection(err error) bool {
strings.Contains(err.Error(), "node unavailable") {
return true
}
if streamErr, ok := err.(transport.StreamError); ok && streamErr.Code == codes.Canceled {
return true
}
return netutil.IsClosedConnection(err)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/util/grpcutil/grpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (hs healthServer) Check(
return nil, errors.New("no one should see this")
}

func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error {
panic("not implemented")
}

func TestRequestDidNotStart(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
2 changes: 1 addition & 1 deletion vendor
Submodule vendor updated 92 files
+28 −13 google.golang.org/grpc/.travis.yml
+41 −17 google.golang.org/grpc/CONTRIBUTING.md
+36 −24 google.golang.org/grpc/Makefile
+88 −12 google.golang.org/grpc/README.md
+1 −1 google.golang.org/grpc/backoff.go
+1 −26 google.golang.org/grpc/balancer.go
+82 −10 google.golang.org/grpc/balancer/balancer.go
+22 −52 google.golang.org/grpc/balancer/base/balancer.go
+12 −0 google.golang.org/grpc/balancer/base/base.go
+10 −6 google.golang.org/grpc/balancer/roundrobin/roundrobin.go
+35 −20 google.golang.org/grpc/balancer_conn_wrappers.go
+44 −38 google.golang.org/grpc/balancer_v1_wrapper.go
+900 −0 google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
+9 −28 google.golang.org/grpc/call.go
+676 −855 google.golang.org/grpc/clientconn.go
+2 −1 google.golang.org/grpc/codes/codes.go
+3 −2 google.golang.org/grpc/connectivity/connectivity.go
+125 −7 google.golang.org/grpc/credentials/credentials.go
+0 −60 google.golang.org/grpc/credentials/credentials_util_go17.go
+0 −57 google.golang.org/grpc/credentials/credentials_util_pre_go17.go
+61 −0 google.golang.org/grpc/credentials/internal/syscallconn.go
+10 −9 google.golang.org/grpc/credentials/internal/syscallconn_appengine.go
+8 −16 google.golang.org/grpc/credentials/tls13.go
+558 −0 google.golang.org/grpc/dialoptions.go
+3 −3 google.golang.org/grpc/encoding/encoding.go
+20 −0 google.golang.org/grpc/go.mod
+35 −0 google.golang.org/grpc/go.sum
+0 −70 google.golang.org/grpc/go16.go
+0 −71 google.golang.org/grpc/go17.go
+1 −1 google.golang.org/grpc/grpclog/grpclog.go
+122 −22 google.golang.org/grpc/health/grpc_health_v1/health.pb.go
+6 −0 google.golang.org/grpc/install_gae.sh
+1 −1 google.golang.org/grpc/interceptor.go
+46 −0 google.golang.org/grpc/internal/balancerload/load.go
+167 −0 google.golang.org/grpc/internal/binarylog/binarylog.go
+42 −0 google.golang.org/grpc/internal/binarylog/binarylog_testutil.go
+210 −0 google.golang.org/grpc/internal/binarylog/env_config.go
+423 −0 google.golang.org/grpc/internal/binarylog/method_logger.go
+33 −0 google.golang.org/grpc/internal/binarylog/regenerate.sh
+162 −0 google.golang.org/grpc/internal/binarylog/sink.go
+17 −18 google.golang.org/grpc/internal/binarylog/util.go
+193 −39 google.golang.org/grpc/internal/channelz/funcs.go
+301 −17 google.golang.org/grpc/internal/channelz/types.go
+53 −0 google.golang.org/grpc/internal/channelz/types_linux.go
+44 −0 google.golang.org/grpc/internal/channelz/types_nonlinux.go
+16 −14 google.golang.org/grpc/internal/channelz/util_linux.go
+7 −9 google.golang.org/grpc/internal/channelz/util_nonlinux.go
+64 −0 google.golang.org/grpc/internal/envconfig/envconfig.go
+61 −0 google.golang.org/grpc/internal/grpcsync/event.go
+28 −10 google.golang.org/grpc/internal/internal.go
+114 −0 google.golang.org/grpc/internal/syscall/syscall_linux.go
+73 −0 google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
+4 −3 google.golang.org/grpc/internal/transport/bdp_estimator.go
+74 −18 google.golang.org/grpc/internal/transport/controlbuf.go
+49 −0 google.golang.org/grpc/internal/transport/defaults.go
+0 −24 google.golang.org/grpc/internal/transport/flowcontrol.go
+17 −38 google.golang.org/grpc/internal/transport/handler_server.go
+230 −130 google.golang.org/grpc/internal/transport/http2_client.go
+205 −133 google.golang.org/grpc/internal/transport/http2_server.go
+145 −61 google.golang.org/grpc/internal/transport/http_util.go
+0 −6 google.golang.org/grpc/internal/transport/log.go
+136 −81 google.golang.org/grpc/internal/transport/transport.go
+42 −22 google.golang.org/grpc/keepalive/keepalive.go
+1 −2 google.golang.org/grpc/metadata/metadata.go
+7 −4 google.golang.org/grpc/naming/dns_resolver.go
+0 −34 google.golang.org/grpc/naming/go17.go
+1 −1 google.golang.org/grpc/naming/naming.go
+1 −1 google.golang.org/grpc/peer/peer.go
+27 −166 google.golang.org/grpc/picker_wrapper.go
+3 −1 google.golang.org/grpc/pickfirst.go
+64 −0 google.golang.org/grpc/preloader.go
+37 −15 google.golang.org/grpc/proxy.go
+115 −39 google.golang.org/grpc/resolver/dns/dns_resolver.go
+0 −35 google.golang.org/grpc/resolver/dns/go17.go
+1 −1 google.golang.org/grpc/resolver/passthrough/passthrough.go
+36 −2 google.golang.org/grpc/resolver/resolver.go
+59 −52 google.golang.org/grpc/resolver_conn_wrapper.go
+170 −48 google.golang.org/grpc/rpc_util.go
+291 −252 google.golang.org/grpc/server.go
+157 −17 google.golang.org/grpc/service_config.go
+1 −2 google.golang.org/grpc/stats/handlers.go
+5 −1 google.golang.org/grpc/stats/stats.go
+0 −44 google.golang.org/grpc/status/go17.go
+23 −2 google.golang.org/grpc/status/status.go
+0 −97 google.golang.org/grpc/stickiness_linkedmap.go
+988 −254 google.golang.org/grpc/stream.go
+1 −1 google.golang.org/grpc/tap/tap.go
+13 −0 google.golang.org/grpc/trace.go
+0 −51 google.golang.org/grpc/transport/go16.go
+0 −52 google.golang.org/grpc/transport/go17.go
+1 −1 google.golang.org/grpc/version.go
+84 −49 google.golang.org/grpc/vet.sh

0 comments on commit 172255f

Please sign in to comment.