Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpclb: backoff for RPC call if init handshake was unsucessful #2077

Merged
merged 2 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/resolver"
)

Expand All @@ -46,6 +47,19 @@ const (
grpclbName = "grpclb"
)

var (
// defaultBackoffConfig configures the backoff strategy that's used when the
// init handshake in the RPC is unsuccessful. It's not for the clientconn
// reconnect backoff.
//
// It has the same value as the default grpc.DefaultBackoffConfig.
//
// TODO: make backoff configurable.
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
)

func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil {
return 0
Expand Down Expand Up @@ -147,6 +161,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(),
backoff: defaultBackoffConfig, // TODO: make backoff configurable.
}

return lb
Expand All @@ -165,6 +180,8 @@ type lbBalancer struct {
manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn
// backoff for calling remote balancer.
backoff backoff.Strategy

// Support client side load reporting. Each picker gets a reference to this,
// and will update its content.
Expand Down
31 changes: 23 additions & 8 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
}
}

func (lb *lbBalancer) callRemoteBalancer() error {
func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil {
return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}

// grpclb handshake on the stream.
Expand All @@ -197,31 +197,33 @@ func (lb *lbBalancer) callRemoteBalancer() error {
},
}
if err := stream.Send(initReq); err != nil {
return fmt.Errorf("grpclb: failed to send init request: %v", err)
return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
}
reply, err := stream.Recv()
if err != nil {
return fmt.Errorf("grpclb: failed to recv init response: %v", err)
return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
}
initResp := reply.GetInitialResponse()
if initResp == nil {
return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
}
if initResp.LoadBalancerDelegate != "" {
return fmt.Errorf("grpclb: Delegation is not supported")
return true, fmt.Errorf("grpclb: Delegation is not supported")
}

go func() {
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
}
}()
return lb.readServerList(stream)
// No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
}

func (lb *lbBalancer) watchRemoteBalancer() {
var retryCount int
for {
err := lb.callRemoteBalancer()
doBackoff, err := lb.callRemoteBalancer()
select {
case <-lb.doneCh:
return
Expand All @@ -231,6 +233,19 @@ func (lb *lbBalancer) watchRemoteBalancer() {
}
}

if !doBackoff {
retryCount = 0
continue
}

timer := time.NewTimer(lb.backoff.Backoff(retryCount))
select {
case <-timer.C:
case <-lb.doneCh:
timer.Stop()
return
}
retryCount++
}
}

Expand Down