Skip to content

Commit

Permalink
grpclb: backoff for RPC call if init handshake was unsucessful (#2077)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Jun 14, 2018
1 parent e218c92 commit 692f13a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
17 changes: 17 additions & 0 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/resolver"
)

Expand All @@ -46,6 +47,19 @@ const (
grpclbName = "grpclb"
)

var (
// defaultBackoffConfig configures the backoff strategy that's used when the
// init handshake in the RPC is unsuccessful. It's not for the clientconn
// reconnect backoff.
//
// It has the same value as the default grpc.DefaultBackoffConfig.
//
// TODO: make backoff configurable.
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
)

func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil {
return 0
Expand Down Expand Up @@ -147,6 +161,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(),
backoff: defaultBackoffConfig, // TODO: make backoff configurable.
}

return lb
Expand All @@ -165,6 +180,8 @@ type lbBalancer struct {
manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn
// backoff for calling remote balancer.
backoff backoff.Strategy

// Support client side load reporting. Each picker gets a reference to this,
// and will update its content.
Expand Down
31 changes: 23 additions & 8 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
}
}

func (lb *lbBalancer) callRemoteBalancer() error {
func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil {
return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}

// grpclb handshake on the stream.
Expand All @@ -197,31 +197,33 @@ func (lb *lbBalancer) callRemoteBalancer() error {
},
}
if err := stream.Send(initReq); err != nil {
return fmt.Errorf("grpclb: failed to send init request: %v", err)
return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
}
reply, err := stream.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv init response: %v", err)
return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
}
initResp := reply.GetInitialResponse()
if initResp == nil {
return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
if initResp.LoadBalancerDelegate != "" {
return fmt.Errorf("grpclb: Delegation is not supported")
return true, fmt.Errorf("grpclb: Delegation is not supported")
}

go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
}
}()
return lb.readServerList(stream)
// No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
}

func (lb *lbBalancer) watchRemoteBalancer() {
var retryCount int
for {
err := lb.callRemoteBalancer()
doBackoff, err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
return
Expand All @@ -231,6 +233,19 @@ func (lb *lbBalancer) watchRemoteBalancer() {
}
}

if !doBackoff {
retryCount = 0
continue
}

timer := time.NewTimer(lb.backoff.Backoff(retryCount))
select {
case <-timer.C:
case <-lb.doneCh:
timer.Stop()
return
}
retryCount++
}
}

Expand Down

0 comments on commit 692f13a

Please sign in to comment.