Skip to content

Commit

Permalink
grpclb: backoff for RPC call if init handshake was unsucessfully
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 13, 2018
1 parent 24f3cca commit 617c849
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
3 changes: 3 additions & 0 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(),
backoff: grpc.DefaultBackoffConfig, // TODO: make backoff configurable
}

return lb
Expand All @@ -165,6 +166,8 @@ type lbBalancer struct {
manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn
// backoff for calling remote balancer.
backoff grpc.BackoffConfig

// Support client side load reporting. Each picker gets a reference to this,
// and will update its content.
Expand Down
31 changes: 23 additions & 8 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
}
}

func (lb *lbBalancer) callRemoteBalancer() error {
func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil {
return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}

// grpclb handshake on the stream.
Expand All @@ -197,31 +197,33 @@ func (lb *lbBalancer) callRemoteBalancer() error {
},
}
if err := stream.Send(initReq); err != nil {
return fmt.Errorf("grpclb: failed to send init request: %v", err)
return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
}
reply, err := stream.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv init response: %v", err)
return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
}
initResp := reply.GetInitialResponse()
if initResp == nil {
return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
if initResp.LoadBalancerDelegate != "" {
return fmt.Errorf("grpclb: Delegation is not supported")
return true, fmt.Errorf("grpclb: Delegation is not supported")
}

go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
}
}()
return lb.readServerList(stream)
// No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
}

func (lb *lbBalancer) watchRemoteBalancer() {
var retryCount int
for {
err := lb.callRemoteBalancer()
doBackoff, err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
return
Expand All @@ -231,6 +233,19 @@ func (lb *lbBalancer) watchRemoteBalancer() {
}
}

if !doBackoff {
retryCount = 0
continue
}

timer := time.NewTimer(lb.backoff.backoff(retryCount))
select {
case <-timer.C:
case <-lb.doneCh:
timer.Stop()
return
}
retryCount++
}
}

Expand Down

0 comments on commit 617c849

Please sign in to comment.