Skip to content

Commit

Permalink
Merge pull request #10981 from gyuho/deprecate-grpc-clientconnclosing
Browse files Browse the repository at this point in the history
Deprecate "grpc.ErrClientConnClosing"
  • Loading branch information
gyuho authored Aug 5, 2019
2 parents 4bf5848 + a0cabb5 commit a494e06
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 25 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.0...v3.4.0) and
- v3.5 will deprecate `etcd --debug` flag in favor of `etcd --log-level=debug`.
- Change v3 `etcdctl snapshot` exit codes with [`snapshot` package](https://github.com/etcd-io/etcd/pull/9118/commits/df689f4280e1cce4b9d61300be13ca604d41670a).
- Exit on error with exit code 1 (no more exit code 5 or 6 on `snapshot save/restore` commands).
- Deprecated [`grpc.ErrClientConnClosing`](https://github.com/etcd-io/etcd/pull/10981).
- `clientv3` and `proxy/grpcproxy` now does not return `grpc.ErrClientConnClosing`.
- `grpc.ErrClientConnClosing` has been [deprecated in gRPC >= 1.10](https://github.com/grpc/grpc-go/pull/1854).
- Use `clientv3.IsConnCanceled(error)` or `google.golang.org/grpc/status.FromError(error)` instead.
- Migrate dependency management tool from `glide` to [`golang/dep`](https://github.com/etcd-io/etcd/pull/9155).
- <= 3.3 puts `vendor` directory under `cmd/vendor` directory to [prevent conflicting transitive dependencies](https://github.com/etcd-io/etcd/issues/4913).
- 3.4 moves `cmd/vendor` directory to `vendor` at repository root.
Expand Down Expand Up @@ -449,6 +453,10 @@ Note: **v3.5 will deprecate `etcd --log-package-levels` flag for `capnslog`**; `
- `PermitWithoutStream` is set to false by default.
- Fix logic on [release lock key if cancelled](https://github.com/etcd-io/etcd/pull/10153) in `clientv3/concurrency` package.
- Fix [`(*Client).Endpoints()` method race condition](https://github.com/etcd-io/etcd/pull/10595).
- Deprecated [`grpc.ErrClientConnClosing`](https://github.com/etcd-io/etcd/pull/10981).
- `clientv3` and `proxy/grpcproxy` now does not return `grpc.ErrClientConnClosing`.
- `grpc.ErrClientConnClosing` has been [deprecated in gRPC >= 1.10](https://github.com/grpc/grpc-go/pull/1854).
- Use `clientv3.IsConnCanceled(error)` or `google.golang.org/grpc/status.FromError(error)` instead.

### etcdctl v3

Expand Down
23 changes: 23 additions & 0 deletions Documentation/upgrades/upgrade_3_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ OK
+etcd --peer-trusted-ca-file ca-peer.crt
```

#### Deprecated `grpc.ErrClientConnClosing` error

`grpc.ErrClientConnClosing` has been [deprecated in gRPC >= 1.10](https://github.com/grpc/grpc-go/pull/1854).

```diff
import (
+ "go.etcd.io/etcd/clientv3"

"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)

_, err := kvc.Get(ctx, "a")
-if err == grpc.ErrClientConnClosing {
+if clientv3.IsConnCanceled(err) {

// or
+s, ok := status.FromError(err)
+if ok {
+ if s.Code() == codes.Canceled
```

#### Deprecating `etcd_debugging_mvcc_db_total_size_in_bytes` Prometheus metrics

v3.4 promotes `etcd_debugging_mvcc_db_total_size_in_bytes` Prometheus metrics to `etcd_mvcc_db_total_size_in_bytes`, in order to encourage etcd storage monitoring.
Expand Down
19 changes: 11 additions & 8 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,13 @@ func isUnavailableErr(ctx context.Context, err error) bool {
if err == nil {
return false
}
ev, _ := status.FromError(err)
// Unavailable codes mean the system will be right back.
// (e.g., can't connect, lost leader)
return ev.Code() == codes.Unavailable
ev, ok := status.FromError(err)
if ok {
// Unavailable codes mean the system will be right back.
// (e.g., can't connect, lost leader)
return ev.Code() == codes.Unavailable
}
return false
}

func toErr(ctx context.Context, err error) error {
Expand All @@ -610,9 +613,6 @@ func toErr(ctx context.Context, err error) error {
if ctx.Err() != nil {
err = ctx.Err()
}
case codes.Unavailable:
case codes.FailedPrecondition:
err = grpc.ErrClientConnClosing
}
}
return err
Expand All @@ -632,16 +632,19 @@ func IsConnCanceled(err error) bool {
if err == nil {
return false
}
// >= gRPC v1.10.x

// >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
}

// >= gRPC v1.10.x
if err == context.Canceled {
return true
}

// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}
2 changes: 1 addition & 1 deletion clientv3/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
// // with etcd clientv3 <= v3.3
// if err == context.Canceled {
// // grpc balancer calls 'Get' with an inflight client.Close
// } else if err == grpc.ErrClientConnClosing {
// } else if err == grpc.ErrClientConnClosing { // <= gRCP v1.7.x
// // grpc balancer calls 'Get' after client.Close.
// }
// // with etcd clientv3 >= v3.4
Expand Down
4 changes: 2 additions & 2 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if !clientv3.IsConnCanceled(err) {
t.Errorf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
t.Errorf("expected %v, got %v", context.Canceled, err)
}
}()

Expand All @@ -490,7 +490,7 @@ func TestKVNewAfterClose(t *testing.T) {
go func() {
_, err := cli.Get(context.TODO(), "foo")
if !clientv3.IsConnCanceled(err) {
t.Errorf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
t.Errorf("expected %v, got %v", context.Canceled, err)
}
close(donec)
}()
Expand Down
9 changes: 3 additions & 6 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"

"google.golang.org/grpc"
)

func TestLeaseNotFoundError(t *testing.T) {
Expand Down Expand Up @@ -300,9 +298,8 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
defer close(donec)
_, err := cli.Grant(context.TODO(), 5)
if !clientv3.IsConnCanceled(err) {
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
t.Errorf("expected %v, or server unavailable, got %v", context.Canceled, err)
}
}()

Expand Down Expand Up @@ -372,7 +369,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
go func() {
_, err := cli.Grant(context.TODO(), 5)
if !clientv3.IsConnCanceled(err) {
t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
t.Errorf("expected %v or server unavailable, got %v", context.Canceled, err)
}
close(donec)
}()
Expand Down Expand Up @@ -405,7 +402,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
go func() {
_, err := cli.Revoke(context.TODO(), leaseID)
if !clientv3.IsConnCanceled(err) {
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
t.Fatalf("expected %v or server unavailable, got %v", context.Canceled, err)
}
close(donec)
}()
Expand Down
3 changes: 0 additions & 3 deletions functional/tester/stresser_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,6 @@ func (s *keyStresser) isRetryableError(err error) bool {
case context.Canceled.Error():
// from stresser.Cancel method:
return false
case grpc.ErrClientConnClosing.Error():
// from stresser.Cancel method:
return false
}

if status.Convert(err).Code() == codes.Unavailable {
Expand Down
4 changes: 3 additions & 1 deletion proxy/grpcproxy/adapter/chan_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// chanServerStream implements grpc.ServerStream with a chanStream
Expand Down Expand Up @@ -120,7 +122,7 @@ func (s *chanStream) RecvMsg(m interface{}) error {
select {
case msg, ok := <-s.recvc:
if !ok {
return grpc.ErrClientConnClosing
return status.Error(codes.Canceled, "the client connection is closing")
}
if err, ok := msg.(error); ok {
return err
Expand Down
4 changes: 3 additions & 1 deletion proxy/grpcproxy/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type leaseProxy struct {
Expand Down Expand Up @@ -214,7 +216,7 @@ func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error
case <-lostLeaderC:
return rpctypes.ErrNoLeader
case <-lp.leader.disconnectNotify():
return grpc.ErrClientConnClosing
return status.Error(codes.Canceled, "the client connection is closing")
default:
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type watchProxy struct {
Expand Down Expand Up @@ -80,7 +81,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Unlock()
select {
case <-wp.leader.disconnectNotify():
return grpc.ErrClientConnClosing
return status.Error(codes.Canceled, "the client connection is closing")
default:
return wp.ctx.Err()
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
case <-lostLeaderC:
return rpctypes.ErrNoLeader
case <-wp.leader.disconnectNotify():
return grpc.ErrClientConnClosing
return status.Error(codes.Canceled, "the client connection is closing")
default:
return wps.ctx.Err()
}
Expand Down

0 comments on commit a494e06

Please sign in to comment.