-
Notifications
You must be signed in to change notification settings - Fork 4.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
grpc: support channel idleness #6263
Changes from 15 commits
bb51629
968a05f
fe30e55
1693446
0b2a531
87ae17b
32875a1
5c6910e
fa1a004
2b04a5b
0748ab4
93c990e
a0eaa5c
37c286a
0682572
460400c
f13ba8a
2068ba0
d69513f
f1f091a
f94f923
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,15 @@ import ( | |
"google.golang.org/grpc/resolver" | ||
) | ||
|
||
type ccbMode int | ||
|
||
const ( | ||
ccbModeActive = iota | ||
ccbModeIdle | ||
ccbModeClosed | ||
ccbModeExitingIdle | ||
) | ||
|
||
// ccBalancerWrapper sits between the ClientConn and the Balancer. | ||
// | ||
// ccBalancerWrapper implements methods corresponding to the ones on the | ||
|
@@ -46,16 +55,25 @@ import ( | |
// It uses the gracefulswitch.Balancer internally to ensure that balancer | ||
// switches happen in a graceful manner. | ||
type ccBalancerWrapper struct { | ||
cc *ClientConn | ||
// The following fields are initialized when the wrapper is created and are | ||
// read-only afterwards, and therefore can be accessed without a mutex. | ||
cc *ClientConn | ||
opts balancer.BuildOptions | ||
|
||
// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a | ||
// mutually exclusive manner as they are scheduled on the | ||
// CallbackSerializer. Fields accessed *only* in serializer callbacks, can | ||
// therefore be accessed without a mutex. | ||
serializer *grpcsync.CallbackSerializer | ||
serializerCancel context.CancelFunc | ||
balancer *gracefulswitch.Balancer | ||
curBalancerName string | ||
// mutually exclusive manner as they are scheduled in the serializer. Fields | ||
// accessed *only* in these serializer callbacks, can therefore be accessed | ||
// without a mutex. | ||
balancer *gracefulswitch.Balancer | ||
curBalancerName string | ||
|
||
// mu guards access to the below fields. Access to the serializer and its | ||
// cancel function needs to be mutex protected because they are overwritten | ||
// when the wrapper exits idle mode. | ||
mu sync.Mutex | ||
serializer *grpcsync.CallbackSerializer // To serialize all outgoing calls. | ||
serializerCancel context.CancelFunc // To close the serializer at close/enterIdle time. | ||
mode ccbMode // Tracks the current mode of the wrapper. | ||
} | ||
|
||
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer | ||
|
@@ -64,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc | |
ctx, cancel := context.WithCancel(context.Background()) | ||
ccb := &ccBalancerWrapper{ | ||
cc: cc, | ||
opts: bopts, | ||
serializer: grpcsync.NewCallbackSerializer(ctx), | ||
serializerCancel: cancel, | ||
} | ||
|
@@ -74,8 +93,12 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc | |
// updateClientConnState is invoked by grpc to push a ClientConnState update to | ||
// the underlying balancer. | ||
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { | ||
ccb.mu.Lock() | ||
errCh := make(chan error, 1) | ||
ccb.serializer.Schedule(func(_ context.Context) { | ||
// Here and everywhere else where Schedule() is called, it is done with the | ||
// lock held. But the lock guards only the scheduling part. The actual | ||
// callback is called asynchronously without the lock being held. | ||
ok := ccb.serializer.Schedule(func(_ context.Context) { | ||
// If the addresses specified in the update contain addresses of type | ||
// "grpclb" and the selected LB policy is not "grpclb", these addresses | ||
// will be filtered out and ccs will be modified with the updated | ||
|
@@ -92,16 +115,19 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat | |
} | ||
errCh <- ccb.balancer.UpdateClientConnState(*ccs) | ||
}) | ||
|
||
// If the balancer wrapper is closed when waiting for this state update to | ||
// be handled, the callback serializer will be closed as well, and we can | ||
// rely on its Done channel to ensure that we don't block here forever. | ||
select { | ||
case err := <-errCh: | ||
return err | ||
case <-ccb.serializer.Done: | ||
return nil | ||
if !ok { | ||
// If we are unable to schedule a function with the serializer, it | ||
// indicates that it has been closed. A serializer is only closed when | ||
// the wrapper is closed or is in idle. | ||
ccb.mu.Unlock() | ||
return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") | ||
} | ||
ccb.mu.Unlock() | ||
|
||
// We get here only if the above call to Schedule succeeds, in which case it | ||
// is guaranteed that the scheduled function will run. Therefore it is safe | ||
// to block on this channel. | ||
return <-errCh | ||
} | ||
|
||
// updateSubConnState is invoked by grpc to push a subConn state update to the | ||
|
@@ -120,21 +146,19 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti | |
if sc == nil { | ||
return | ||
} | ||
ccb.mu.Lock() | ||
ccb.serializer.Schedule(func(_ context.Context) { | ||
ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) | ||
}) | ||
} | ||
|
||
func (ccb *ccBalancerWrapper) exitIdle() { | ||
ccb.serializer.Schedule(func(_ context.Context) { | ||
ccb.balancer.ExitIdle() | ||
}) | ||
ccb.mu.Unlock() | ||
} | ||
|
||
func (ccb *ccBalancerWrapper) resolverError(err error) { | ||
ccb.mu.Lock() | ||
ccb.serializer.Schedule(func(_ context.Context) { | ||
ccb.balancer.ResolverError(err) | ||
}) | ||
ccb.mu.Unlock() | ||
} | ||
|
||
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the | ||
|
@@ -148,42 +172,146 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { | |
// the ccBalancerWrapper keeps track of the current LB policy name, and skips | ||
// the graceful balancer switching process if the name does not change. | ||
func (ccb *ccBalancerWrapper) switchTo(name string) { | ||
ccb.mu.Lock() | ||
ccb.serializer.Schedule(func(_ context.Context) { | ||
// TODO: Other languages use case-sensitive balancer registries. We should | ||
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288. | ||
if strings.EqualFold(ccb.curBalancerName, name) { | ||
return | ||
} | ||
ccb.buildLoadBalancingPolicy(name) | ||
}) | ||
ccb.mu.Unlock() | ||
} | ||
|
||
// Use the default LB policy, pick_first, if no LB policy with name is | ||
// found in the registry. | ||
builder := balancer.Get(name) | ||
if builder == nil { | ||
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) | ||
builder = newPickfirstBuilder() | ||
} else { | ||
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) | ||
} | ||
// buildLoadBalancingPolicy performs the following: | ||
// - retrieve a balancer builder for the given name. Use the default LB | ||
// policy, pick_first, if no LB policy with name is found in the registry. | ||
// - instruct the gracefulswitch balancer to switch to the above builder. This | ||
// will actually build the new balancer. | ||
// - update the `curBalancerName` field | ||
// | ||
// Must be called from a serializer callback. | ||
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { | ||
builder := balancer.Get(name) | ||
if builder == nil { | ||
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) | ||
builder = newPickfirstBuilder() | ||
} else { | ||
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) | ||
} | ||
|
||
if err := ccb.balancer.SwitchTo(builder); err != nil { | ||
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) | ||
return | ||
} | ||
ccb.curBalancerName = builder.Name() | ||
} | ||
|
||
func (ccb *ccBalancerWrapper) close() { | ||
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") | ||
ccb.closeBalancer(ccbModeClosed) | ||
} | ||
|
||
// enterIdleMode is invoked by grpc when the channel enters idle mode upon | ||
// expiry of idle_timeout. This call blocks until the balancer is closed. | ||
func (ccb *ccBalancerWrapper) enterIdleMode() { | ||
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") | ||
ccb.closeBalancer(ccbModeIdle) | ||
} | ||
|
||
// closeBalancer is invoked when the channel is being closed or when it enters | ||
// idle mode upon expiry of idle_timeout. | ||
// | ||
// This call is not scheduled on the serializer because we need to ensure that | ||
// the current serializer is completely shutdown before the next one is created | ||
// (when exiting idle). | ||
func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { | ||
ccb.mu.Lock() | ||
if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { | ||
ccb.mu.Unlock() | ||
return | ||
} | ||
|
||
// Close the serializer to ensure that no more calls from gRPC are sent | ||
// to the balancer. | ||
ccb.serializerCancel() | ||
ccb.mode = m | ||
done := ccb.serializer.Done | ||
b := ccb.balancer | ||
ccb.mu.Unlock() | ||
|
||
// Give enqueued callbacks a chance to finish. | ||
<-done | ||
// Spawn a goroutine to close the balancer (since it may block trying to | ||
// cleanup all allocated resources) and return early. | ||
go b.Close() | ||
} | ||
|
||
// exitIdleMode is invoked by grpc when the channel exits idle mode either | ||
// because of an RPC or because of an invocation of the Connect() API. This | ||
// recreates the balancer that was closed previously when entering idle mode. | ||
// | ||
// If the channel is not in idle mode, we know for a fact that we are here as a | ||
// result of the user calling the Connect() method on the ClientConn. In this | ||
// case, we can simply forward the call to the underlying balancer, instructing | ||
// it to reconnect to the backends. | ||
func (ccb *ccBalancerWrapper) exitIdleMode() { | ||
ccb.mu.Lock() | ||
if ccb.mode == ccbModeClosed { | ||
// Request to exit idle is a no-op when wrapper is already closed. | ||
ccb.mu.Unlock() | ||
return | ||
} | ||
|
||
if ccb.mode == ccbModeIdle { | ||
// Recreate the serializer which was closed when we entered idle. | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
ccb.serializer = grpcsync.NewCallbackSerializer(ctx) | ||
ccb.serializerCancel = cancel | ||
} | ||
|
||
if err := ccb.balancer.SwitchTo(builder); err != nil { | ||
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) | ||
// The ClientConn guarantees mutual exclusion between close() and | ||
// exitIdleMode(), and since we just created a new serializer, we can be | ||
// sure that the below function will be scheduled. | ||
done := make(chan struct{}) | ||
ccb.serializer.Schedule(func(_ context.Context) { | ||
defer close(done) | ||
|
||
ccb.mu.Lock() | ||
defer ccb.mu.Unlock() | ||
|
||
if ccb.mode != ccbModeIdle { | ||
ccb.balancer.ExitIdle() | ||
return | ||
} | ||
ccb.curBalancerName = builder.Name() | ||
|
||
// Gracefulswitch balancer does not support a switchTo operation after | ||
// being closed. Hence we need to create a new one here. | ||
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) | ||
// Reset the current balancer name so that we act on the next call to | ||
// switchTo by creating a new balancer specified by the new resolver. | ||
ccb.curBalancerName = "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be better to do this when entering idle instead of exiting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
ccb.mode = ccbModeActive | ||
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") | ||
|
||
}) | ||
ccb.mu.Unlock() | ||
|
||
<-done | ||
} | ||
|
||
func (ccb *ccBalancerWrapper) close() { | ||
// Close the serializer to ensure that no more calls from gRPC are sent to | ||
// the balancer. We don't have to worry about suppressing calls from a | ||
// closed balancer because these are handled by the ClientConn (balancer | ||
// wrapper is only ever closed when the ClientConn is closed). | ||
ccb.serializerCancel() | ||
<-ccb.serializer.Done | ||
ccb.balancer.Close() | ||
func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { | ||
ccb.mu.Lock() | ||
defer ccb.mu.Unlock() | ||
return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed | ||
} | ||
|
||
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { | ||
if ccb.isIdleOrClosed() { | ||
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") | ||
} | ||
|
||
if len(addrs) <= 0 { | ||
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") | ||
} | ||
|
@@ -200,6 +328,18 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer | |
} | ||
|
||
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { | ||
if ccb.isIdleOrClosed() { | ||
// It is safe to ignore this call when the balancer is closed or in idle | ||
// because the ClientConn takes care of closing the connections. | ||
// | ||
// Not returning early from here when the balancer is closed or in idle | ||
// leads to a deadlock though, because of the following sequence of | ||
// calls when holding cc.mu: | ||
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> | ||
// ccb.RemoveAddrConn --> cc.removeAddrConn | ||
return | ||
} | ||
|
||
acbw, ok := sc.(*acBalancerWrapper) | ||
if !ok { | ||
return | ||
|
@@ -208,6 +348,10 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { | |
} | ||
|
||
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { | ||
if ccb.isIdleOrClosed() { | ||
return | ||
} | ||
|
||
acbw, ok := sc.(*acBalancerWrapper) | ||
if !ok { | ||
return | ||
|
@@ -216,6 +360,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol | |
} | ||
|
||
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { | ||
if ccb.isIdleOrClosed() { | ||
return | ||
} | ||
|
||
// Update picker before updating state. Even though the ordering here does | ||
// not matter, it can lead to multiple calls of Pick in the common start-up | ||
// case where we wait for ready and then perform an RPC. If the picker is | ||
|
@@ -226,6 +374,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { | |
} | ||
|
||
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { | ||
if ccb.isIdleOrClosed() { | ||
return | ||
} | ||
|
||
ccb.cc.resolveNow(o) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,14 +27,22 @@ import ( | |
// | ||
// All errors returned by Invoke are compatible with the status package. | ||
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { | ||
if err := cc.idlenessMgr.onCallBegin(); err != nil { | ||
return err | ||
} | ||
|
||
// allow interceptor to see all applicable call options, which means those | ||
// configured as defaults from dial option as well as per-call options | ||
opts = combine(cc.dopts.callOptions, opts) | ||
|
||
var err error | ||
if cc.dopts.unaryInt != nil { | ||
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) | ||
err = cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) | ||
} else { | ||
err = invoke(ctx, method, args, reply, cc, opts...) | ||
} | ||
return invoke(ctx, method, args, reply, cc, opts...) | ||
cc.idlenessMgr.onCallEnd() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optional: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
return err | ||
} | ||
|
||
func combine(o1 []CallOption, o2 []CallOption) []CallOption { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should only happen if a balancer that was already
Closed
calls, right? If so, I think we should
logger.Error
this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is making me think...
What happens if an LB policy (or resolver; I haven't looked at
resolver_conn_wrapper.go
yet) is closed due to entering idle, then the channel exits idle, and then the old LB policy (or resolver) makes calls on theClientConn
. It seems we need to either create new wrappers instead of reusing the existing ones, OR we need to ensure thatccb.balancer == <the balancer calling us but I don't know how we could even check this>
, OR we need to make accBalancerWrapperWrapper
that is invalidated when the LB policy is closed (this is hopefully not a serious suggestion).Especially since we delay closure by calling it in a goroutine, it seems like this is a very real possibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on our offline discussion, I switched the resolver wrapper to be closed on entering idle and recreated upon exiting idle.
But for the balancer and picker wrappers, I left it as is, i.e with methods to enter and exit idle. For the balancer wrapper, once the underlying balancer is closed, the graceful switch balancer (which is always our top-level balancer) will take care of dropping calls from the old (and closed) LB policy.
Also, for the balancer and picker wrappers, it is not easy to close and recreate them because that would mean that all accesses to them would need to be guarded with
cc.mu
. This is not possible because the picker wrapper needs to be accessed at pick time, while the balancer wrapper needs to be accessed whenever there is a subConn state change and whencc.Connect
is called.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking the picker wrapper would be safe to access, since you would do so only after going through the idleness manager, which should ensure that the picker wrapper exists before returning. And a mutex around balancer accesses doesn't seem too troublesome. It would be nice to handle these two subcomponents the same way, but I'm fine with this approach if it's working and gets us idleness working.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that it would be nicer to follow the same approach with the other two wrappers. But given that I'm constrained for time at this point, I have left a TODO for the same.