Skip to content

Commit

Permalink
make default pickfirst step 1 and switching balancer, step 0.85
Browse files Browse the repository at this point in the history
removed two tests:
 - TestDialWithBlockErrorOnBadCertificates
 - TestDialWithBlockErrorOnNonTemporaryErrorDialer
  • Loading branch information
menghanl committed Sep 12, 2017
1 parent 6a94105 commit c248e34
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 181 deletions.
4 changes: 4 additions & 0 deletions balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,7 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
}
return connectivity.TransientFailure
}

func init() {
balancer.Register(NewBuilder())
}
8 changes: 8 additions & 0 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type ccBalancerWrapper struct {
stateChangeQueue *tupleBuffer
resolverAddrCh chan *resolverChangeTuple
done chan struct{}

subConns map[*acBalancerWrapper]struct{}
}

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
Expand All @@ -103,6 +105,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
stateChangeQueue: newTupleBuffer(),
resolverAddrCh: make(chan *resolverChangeTuple, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
Expand All @@ -128,6 +131,9 @@ func (ccb *ccBalancerWrapper) watcher() {
ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
case <-ccb.done:
ccb.balancer.Close()
for acbw := range ccb.subConns {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
return
}
}
Expand Down Expand Up @@ -166,6 +172,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
acbw := &acBalancerWrapper{ac: ac}
ac.acbw = acbw
ccb.subConns[acbw] = struct{}{}
return acbw, nil
}

Expand All @@ -175,6 +182,7 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if !ok {
return
}
delete(ccb.subConns, acbw)
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}

Expand Down
102 changes: 102 additions & 0 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpc

import (
"math"
"testing"
"time"

"golang.org/x/net/context"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/test/leakcheck"
)

func checkPickFirst(cc *ClientConn, servers []*server, t *testing.T) {
var (
req = "port"
reply string
err error
)
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
}

func checkRoundRobin(cc *ClientConn, servers []*server, t *testing.T) {
var (
req = "port"
reply string
err error
)

// Make sure connections to all servers are up.
for _, s := range servers {
for {
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == s.port {
break
}
time.Sleep(time.Millisecond)
}
}

serverCount := len(servers)
for i := 0; i < 3*serverCount; i++ {
err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
if ErrorDesc(err) != servers[i%serverCount].port {
t.Fatalf("Index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
}
}
}

func TestSwitchBalancer(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()

numServers := 2
servers, _ := startServers(t, numServers, math.MaxInt32)
defer func() {
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
// The default balancer is pickfirst.
checkPickFirst(cc, servers, t)
// Switch to roundrobin.
cc.switchBalancer("roundrobin")
checkRoundRobin(cc, servers, t)
// Switch to pickfirst.
cc.switchBalancer("pickfirst")
checkPickFirst(cc, servers, t)
}
1 change: 1 addition & 0 deletions balancer_v1_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func (bw *balancerWrapper) Close() {
close(bw.startCh)
}
bw.balancer.Close()
// TODO(bar switching) RemoveSubConn.
return
}

Expand Down
Loading

0 comments on commit c248e34

Please sign in to comment.