Skip to content

Commit

Permalink
Register and use default balancers and resolvers (grpc#1551)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 25, 2017
1 parent f164acd commit 0c4b6f9
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 252 deletions.
7 changes: 2 additions & 5 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
var (
// m is a map from name to balancer builder.
m = make(map[string]Builder)
// defaultBuilder is the default balancer to use.
defaultBuilder Builder // TODO(bar) install pickfirst as default.
)

// Register registers the balancer builder to the balancer map.
Expand All @@ -44,13 +42,12 @@ func Register(b Builder) {
}

// Get returns the resolver builder registered with the given name.
// If no builder is register with the name, the default pickfirst will
// be used.
// If no builder is register with the name, nil will be returned.
func Get(name string) Builder {
if b, ok := m[name]; ok {
return b
}
return defaultBuilder
return nil
}

// SubConn represents a gRPC sub connection.
Expand Down
19 changes: 11 additions & 8 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package roundrobin
package roundrobin_test

import (
"fmt"
Expand All @@ -27,6 +27,7 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/peer"
Expand All @@ -36,6 +37,8 @@ import (
"google.golang.org/grpc/test/leakcheck"
)

var rr = balancer.Get("roundrobin")

type testServer struct {
testpb.TestServiceServer
}
Expand Down Expand Up @@ -99,7 +102,7 @@ func TestOneBackend(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -131,7 +134,7 @@ func TestBackendsRoundRobin(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -190,7 +193,7 @@ func TestAddressesRemoved(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -232,7 +235,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -266,7 +269,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -315,7 +318,7 @@ func TestOneServerDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -408,7 +411,7 @@ func TestAllServersDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
24 changes: 23 additions & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (b *scStateUpdateBuffer) load() {
}
}

// get returns the channel that receives a recvMsg in the buffer.
// get returns the channel that the scStateUpdate will be sent to.
//
// Upon receiving, the caller should call load to send another
// scStateChangeTuple onto the channel if there is any.
Expand All @@ -96,6 +96,8 @@ type ccBalancerWrapper struct {
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolverUpdate
done chan struct{}

subConns map[*acBalancerWrapper]struct{}
}

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
Expand All @@ -104,6 +106,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolverUpdate, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
Expand All @@ -117,15 +120,30 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
case t := <-ccb.resolverUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
case <-ccb.done:
}

select {
case <-ccb.done:
ccb.balancer.Close()
for acbw := range ccb.subConns {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
return
default:
}
Expand Down Expand Up @@ -171,7 +189,10 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
ccb.subConns[acbw] = struct{}{}
return acbw, nil
}

Expand All @@ -181,6 +202,7 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if !ok {
return
}
delete(ccb.subConns, acbw)
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}

Expand Down
133 changes: 133 additions & 0 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpc

import (
"fmt"
"math"
"testing"
"time"

"golang.org/x/net/context"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/test/leakcheck"
)

func checkPickFirst(cc *ClientConn, servers []*server) error {
var (
req = "port"
reply string
err error
)
connected := false
for i := 0; i < 1000; i++ {
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == servers[0].port {
if connected {
// connected is set to false if peer is not server[0]. So if
// connected is true here, this is the second time we saw
// server[0] in a row. Break because pickfirst is in effect.
break
}
connected = true
} else {
connected = false
}
time.Sleep(time.Millisecond)
}
if !connected {
return fmt.Errorf("pickfirst is not in effect after 1 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
}
// The following RPCs should all succeed with the first server.
for i := 0; i < 3; i++ {
err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
if ErrorDesc(err) != servers[0].port {
return fmt.Errorf("Index %d: want peer %v, got peer %v", i, servers[0].port, err)
}
}
return nil
}

func checkRoundRobin(cc *ClientConn, servers []*server) error {
var (
req = "port"
reply string
err error
)

// Make sure connections to all servers are up.
for i := 0; i < 2; i++ {
// Do this check twice, otherwise the first RPC's transport may still be
// picked by the closing pickfirst balancer, and the test becomes flaky.
for _, s := range servers {
var up bool
for i := 0; i < 1000; i++ {
if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == s.port {
up = true
break
}
time.Sleep(time.Millisecond)
}
if !up {
return fmt.Errorf("server %v is not up within 1 second", s.port)
}
}
}

serverCount := len(servers)
for i := 0; i < 3*serverCount; i++ {
err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
if ErrorDesc(err) != servers[i%serverCount].port {
return fmt.Errorf("Index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
}
}
return nil
}

func TestSwitchBalancer(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

numServers := 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin.
cc.switchBalancer("roundrobin")
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.switchBalancer("pickfirst")
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
}
Loading

0 comments on commit 0c4b6f9

Please sign in to comment.