diff --git a/clientv3/auth.go b/clientv3/auth.go index 9d981cfb1af..4d971110198 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -116,32 +116,32 @@ func NewAuth(c *Client) Auth { } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false)) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false)) return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { - resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) + resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { - resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) + resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false)) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { - resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) + resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { - resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) + resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false)) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } @@ -156,12 +156,12 @@ func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { - resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) + resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false)) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { - resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) + resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false)) return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } @@ -186,12 +186,12 @@ func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { - resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) + resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false)) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { - resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) + resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false)) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/balancer.go b/clientv3/balancer.go index b484b97561c..2a55ca83ad8 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -15,6 +15,7 @@ package clientv3 import ( + "errors" "net/url" "strings" "sync" @@ -23,6 +24,11 @@ import ( "google.golang.org/grpc" ) +// ErrNoAddrAvilable is returned by Get() when the balancer does not have +// any active connection to endpoints at the time. +// This error is returned only when opts.BlockingWait is true. +var ErrNoAddrAvilable = errors.New("there is no address available") + // simpleBalancer does the bare minimum to expose multiple eps // to the grpc reconnection code path type simpleBalancer struct { @@ -162,6 +168,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { var addr string + + // If opts.BlockingWait is false (for fail-fast RPCs), it should return + // an address it has notified via Notify immediately instead of blocking. + if !opts.BlockingWait { + b.mu.RLock() + addr = b.pinAddr + upEps := len(b.upEps) + b.mu.RUnlock() + if upEps == 0 { + return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable + } + return grpc.Address{Addr: addr}, func() {}, nil + } + for { b.mu.RLock() ch := b.upc diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go new file mode 100644 index 00000000000..1ac6154d1aa --- /dev/null +++ b/clientv3/balancer_test.go @@ -0,0 +1,106 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "errors" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +var ( + endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} +) + +func TestBalancerGetUnblocking(t *testing.T) { + sb := newSimpleBalancer(endpoints) + unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false} + + _, _, err := sb.Get(context.Background(), unblockingOpts) + if err != ErrNoAddrAvilable { + t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) + } + + down1 := sb.Up(grpc.Address{Addr: endpoints[1]}) + down2 := sb.Up(grpc.Address{Addr: endpoints[2]}) + addrFirst, putFun, err := sb.Get(context.Background(), unblockingOpts) + if err != nil { + t.Errorf("Get() with up endpoints should sucess, got %v", err) + } + if addrFirst.Addr != endpoints[1] && addrFirst.Addr != endpoints[2] { + t.Errorf("Get() didn't return expected address, got %v", addrFirst) + } + if putFun == nil { + t.Errorf("Get() returned unexpected nil put function") + } + addrSecond, _, _ := sb.Get(context.Background(), unblockingOpts) + if addrSecond.Addr != addrSecond.Addr { + t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) + } + + down1(errors.New("error")) + down2(errors.New("error")) + _, _, err = sb.Get(context.Background(), unblockingOpts) + if err != ErrNoAddrAvilable { + t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) + } +} + +func TestBalancerGetBlocking(t *testing.T) { + sb := newSimpleBalancer(endpoints) + blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} + + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100) + _, _, err := sb.Get(ctx, blockingOpts) + if err != context.DeadlineExceeded { + t.Errorf("Get() with no up endpoints should timeout, got %v", err) + } + + downC := make(chan func(error), 1) + + go func() { + // ensure sb.Up() will be called after sb.Get() to see if Up() releases blocking Get() + time.Sleep(time.Millisecond * 100) + downC <- sb.Up(grpc.Address{Addr: endpoints[1]}) + }() + addrFirst, putFun, err := sb.Get(context.Background(), blockingOpts) + if err != nil { + t.Errorf("Get() with up endpoints should sucess, got %v", err) + } + if addrFirst.Addr != endpoints[1] { + t.Errorf("Get() didn't return expected address, got %v", addrFirst) + } + if putFun == nil { + t.Errorf("Get() returned unexpected nil put function") + } + down1 := <-downC + + down2 := sb.Up(grpc.Address{Addr: endpoints[2]}) + addrSecond, _, _ := sb.Get(context.Background(), blockingOpts) + if addrSecond.Addr != addrSecond.Addr { + t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) + } + + down1(errors.New("error")) + down2(errors.New("error")) + ctx, _ = context.WithTimeout(context.Background(), time.Millisecond*100) + _, _, err = sb.Get(ctx, blockingOpts) + if err != context.DeadlineExceeded { + t.Errorf("Get() with no up endpoints should timeout, got %v", err) + } +} diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 8b981171b5a..d85f062b2cc 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -52,7 +52,7 @@ func NewCluster(c *Client) Cluster { func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.remote.MemberAdd(ctx, r) + resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberAddResponse)(resp), nil } @@ -64,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.remote.MemberRemove(ctx, r) + resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberRemoveResponse)(resp), nil } @@ -78,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin // it is safe to retry on update. for { r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r) + resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberUpdateResponse)(resp), nil } diff --git a/clientv3/kv.go b/clientv3/kv.go index 834b17d3413..1faa8f69077 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -148,14 +148,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } diff --git a/clientv3/lease.go b/clientv3/lease.go index ed8bb0a53f6..f1bc591f74e 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -148,7 +148,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err for { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(cctx, r) + resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false)) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -174,7 +174,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, for { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(cctx, r) + resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false)) if err == nil { return (*LeaseRevokeResponse)(resp), nil @@ -195,7 +195,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption for { r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(cctx, r) + resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false)) if err == nil { gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(),