Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
Merge pull request etcd-io#6656 from yudai/balancer_fast_fail
Browse files Browse the repository at this point in the history
clientv3: make balancer respect FastFail
  • Loading branch information
xiang90 authored Oct 17, 2016
2 parents 24c2841 + 6a33f0f commit 5c60478
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 18 deletions.
20 changes: 10 additions & 10 deletions clientv3/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,32 +116,32 @@ func NewAuth(c *Client) Auth {
}

func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{})
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false))
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}

func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{})
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false))
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}

func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password})
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}

func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name})
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false))
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}

func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password})
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false))
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}

func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role})
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false))
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}

Expand All @@ -156,12 +156,12 @@ func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
}

func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role})
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false))
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}

func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name})
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false))
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}

Expand All @@ -186,12 +186,12 @@ func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
}

func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd})
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false))
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}

func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role})
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false))
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}

Expand Down
20 changes: 20 additions & 0 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package clientv3

import (
"errors"
"net/url"
"strings"
"sync"
Expand All @@ -23,6 +24,11 @@ import (
"google.golang.org/grpc"
)

// ErrNoAddrAvilable is returned by Get() when the balancer does not have
// any active connection to endpoints at the time.
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = errors.New("there is no address available")

// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
Expand Down Expand Up @@ -162,6 +168,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
var addr string

// If opts.BlockingWait is false (for fail-fast RPCs), it should return
// an address it has notified via Notify immediately instead of blocking.
if !opts.BlockingWait {
b.mu.RLock()
addr = b.pinAddr
upEps := len(b.upEps)
b.mu.RUnlock()
if upEps == 0 {
return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
}
return grpc.Address{Addr: addr}, func() {}, nil
}

for {
b.mu.RLock()
ch := b.upc
Expand Down
106 changes: 106 additions & 0 deletions clientv3/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
"errors"
"testing"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

var (
endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
)

func TestBalancerGetUnblocking(t *testing.T) {
sb := newSimpleBalancer(endpoints)
unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false}

_, _, err := sb.Get(context.Background(), unblockingOpts)
if err != ErrNoAddrAvilable {
t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err)
}

down1 := sb.Up(grpc.Address{Addr: endpoints[1]})
down2 := sb.Up(grpc.Address{Addr: endpoints[2]})
addrFirst, putFun, err := sb.Get(context.Background(), unblockingOpts)
if err != nil {
t.Errorf("Get() with up endpoints should sucess, got %v", err)
}
if addrFirst.Addr != endpoints[1] && addrFirst.Addr != endpoints[2] {
t.Errorf("Get() didn't return expected address, got %v", addrFirst)
}
if putFun == nil {
t.Errorf("Get() returned unexpected nil put function")
}
addrSecond, _, _ := sb.Get(context.Background(), unblockingOpts)
if addrSecond.Addr != addrSecond.Addr {
t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond)
}

down1(errors.New("error"))
down2(errors.New("error"))
_, _, err = sb.Get(context.Background(), unblockingOpts)
if err != ErrNoAddrAvilable {
t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err)
}
}

func TestBalancerGetBlocking(t *testing.T) {
sb := newSimpleBalancer(endpoints)
blockingOpts := grpc.BalancerGetOptions{BlockingWait: true}

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
_, _, err := sb.Get(ctx, blockingOpts)
if err != context.DeadlineExceeded {
t.Errorf("Get() with no up endpoints should timeout, got %v", err)
}

downC := make(chan func(error), 1)

go func() {
// ensure sb.Up() will be called after sb.Get() to see if Up() releases blocking Get()
time.Sleep(time.Millisecond * 100)
downC <- sb.Up(grpc.Address{Addr: endpoints[1]})
}()
addrFirst, putFun, err := sb.Get(context.Background(), blockingOpts)
if err != nil {
t.Errorf("Get() with up endpoints should sucess, got %v", err)
}
if addrFirst.Addr != endpoints[1] {
t.Errorf("Get() didn't return expected address, got %v", addrFirst)
}
if putFun == nil {
t.Errorf("Get() returned unexpected nil put function")
}
down1 := <-downC

down2 := sb.Up(grpc.Address{Addr: endpoints[2]})
addrSecond, _, _ := sb.Get(context.Background(), blockingOpts)
if addrSecond.Addr != addrSecond.Addr {
t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond)
}

down1(errors.New("error"))
down2(errors.New("error"))
ctx, _ = context.WithTimeout(context.Background(), time.Millisecond*100)
_, _, err = sb.Get(ctx, blockingOpts)
if err != context.DeadlineExceeded {
t.Errorf("Get() with no up endpoints should timeout, got %v", err)
}
}
6 changes: 3 additions & 3 deletions clientv3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewCluster(c *Client) Cluster {

func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
resp, err := c.remote.MemberAdd(ctx, r)
resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberAddResponse)(resp), nil
}
Expand All @@ -64,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd

func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.remote.MemberRemove(ctx, r)
resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberRemoveResponse)(resp), nil
}
Expand All @@ -78,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
// it is safe to retry on update.
for {
r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
resp, err := c.remote.MemberUpdate(ctx, r)
resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false))
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
Expand Down
4 changes: 2 additions & 2 deletions clientv3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
resp, err = kv.remote.Put(ctx, r)
resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false))
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r)
resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false))
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err

for {
r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.remote.LeaseGrant(cctx, r)
resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false))
if err == nil {
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
Expand All @@ -174,7 +174,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,

for {
r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.remote.LeaseRevoke(cctx, r)
resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false))

if err == nil {
return (*LeaseRevokeResponse)(resp), nil
Expand All @@ -195,7 +195,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption

for {
r := toLeaseTimeToLiveRequest(id, opts...)
resp, err := l.remote.LeaseTimeToLive(cctx, r)
resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false))
if err == nil {
gresp := &LeaseTimeToLiveResponse{
ResponseHeader: resp.GetHeader(),
Expand Down

0 comments on commit 5c60478

Please sign in to comment.