grpc: support channel idleness #6263

Merged (21 commits) on May 22, 2023

Commits
bb51629
grpc: support channel idleness
easwars May 5, 2023
968a05f
call cc.Connect only when IDLE at blocking dial time
easwars May 12, 2023
fe30e55
support an atomic idleness manager
easwars May 12, 2023
1693446
align 64-bit fields for proper atomic access
easwars May 16, 2023
0b2a531
make vet happy
easwars May 16, 2023
87ae17b
implement Doug's suggestion
easwars May 17, 2023
32875a1
minor fix + access timer under lock
easwars May 17, 2023
5c6910e
also check activity in tryEnterIdleMode
easwars May 17, 2023
fa1a004
skip error log when calls count is negative in onCallEnd
easwars May 18, 2023
2b04a5b
create and destroy resolver wrapper instead of supporting idleness in it
easwars May 18, 2023
0748ab4
start channel in idle, and kick it out of idle at the end of Dial
easwars May 18, 2023
93c990e
review comments + remove mutexIdlenessManager
easwars May 18, 2023
a0eaa5c
reset ccb.curBalancerName when exiting idle
easwars May 18, 2023
37c286a
remove the mutex idleness manager from tests
easwars May 18, 2023
0682572
todo to switch balancer/picker wrappers to the same approach as resol…
easwars May 18, 2023
460400c
remove unused consts
easwars May 18, 2023
f13ba8a
reset ccb.curBalancerName when entering idle instead of when exiting …
easwars May 18, 2023
2068ba0
refactor new idleness manager to hide implementation details
easwars May 19, 2023
d69513f
defer the call to onCallEnd()
easwars May 19, 2023
f1f091a
test cleanups
easwars May 20, 2023
f94f923
handle small flake by trying the RPC more frequently
easwars May 20, 2023
243 changes: 199 additions & 44 deletions balancer_conn_wrappers.go
@@ -32,6 +32,15 @@ import (
"google.golang.org/grpc/resolver"
)

type ccbMode int

const (
ccbModeActive = iota
ccbModeIdle
ccbModeClosed
ccbModeExitingIdle
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
@@ -46,16 +55,25 @@ import (
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
cc *ClientConn
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
cc *ClientConn
opts balancer.BuildOptions

// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
// mutually exclusive manner as they are scheduled on the
// CallbackSerializer. Fields accessed *only* in serializer callbacks, can
// therefore be accessed without a mutex.
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
balancer *gracefulswitch.Balancer
curBalancerName string
// mutually exclusive manner as they are scheduled in the serializer. Fields
// accessed *only* in these serializer callbacks, can therefore be accessed
// without a mutex.
balancer *gracefulswitch.Balancer
curBalancerName string

// mu guards access to the below fields. Access to the serializer and its
// cancel function needs to be mutex protected because they are overwritten
// when the wrapper exits idle mode.
mu sync.Mutex
serializer *grpcsync.CallbackSerializer // To serialize all outgoing calls.
serializerCancel context.CancelFunc // To close the serializer at close/enterIdle time.
mode ccbMode // Tracks the current mode of the wrapper.
}

// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
@@ -64,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
@@ -74,8 +93,12 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.mu.Lock()
errCh := make(chan error, 1)
ccb.serializer.Schedule(func(_ context.Context) {
// Here and everywhere else where Schedule() is called, it is done with the
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
// If the addresses specified in the update contain addresses of type
// "grpclb" and the selected LB policy is not "grpclb", these addresses
// will be filtered out and ccs will be modified with the updated
@@ -92,16 +115,19 @@ }
}
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
})

// If the balancer wrapper is closed when waiting for this state update to
// be handled, the callback serializer will be closed as well, and we can
// rely on its Done channel to ensure that we don't block here forever.
select {
case err := <-errCh:
return err
case <-ccb.serializer.Done:
return nil
if !ok {
// If we are unable to schedule a function with the serializer, it
// indicates that it has been closed. A serializer is only closed when
// the wrapper is closed or is in idle.
ccb.mu.Unlock()
return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
Review thread:

Member: This should only happen if a balancer that was already Closed calls, right? If so, I think we should logger.Error this.

Member: This is making me think... What happens if an LB policy (or resolver; I haven't looked at resolver_conn_wrapper.go yet) is closed due to entering idle, then the channel exits idle, and then the old LB policy (or resolver) makes calls on the ClientConn? It seems we need to either create new wrappers instead of reusing the existing ones, OR we need to ensure that ccb.balancer == <the balancer calling us, but I don't know how we could even check this>, OR we need to make a ccBalancerWrapperWrapper that is invalidated when the LB policy is closed (this is hopefully not a serious suggestion). Especially since we delay closure by calling it in a goroutine, it seems like this is a very real possibility.

Contributor (author): Based on our offline discussion, I switched the resolver wrapper to be closed on entering idle and recreated upon exiting idle (see the sketch after this function). But for the balancer and picker wrappers, I left it as is, i.e. with methods to enter and exit idle. For the balancer wrapper, once the underlying balancer is closed, the graceful switch balancer (which is always our top-level balancer) will take care of dropping calls from the old (and closed) LB policy. Also, for the balancer and picker wrappers, it is not easy to close and recreate them, because that would mean that all accesses to them would need to be guarded by cc.mu. This is not possible because the picker wrapper needs to be accessed at pick time, while the balancer wrapper needs to be accessed whenever there is a subConn state change and when cc.Connect is called.

Member: I was thinking the picker wrapper would be safe to access, since you would do so only after going through the idleness manager, which should ensure that the picker wrapper exists before returning. And a mutex around balancer accesses doesn't seem too troublesome. It would be nice to handle these two subcomponents the same way, but I'm fine with this approach if it's working and gets us idleness working.

Contributor (author): I agree that it would be nicer to follow the same approach with the other two wrappers. But given that I'm constrained for time at this point, I have left a TODO for the same.
}
ccb.mu.Unlock()

// We get here only if the above call to Schedule succeeds, in which case it
// is guaranteed that the scheduled function will run. Therefore it is safe
// to block on this channel.
return <-errCh
}
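
To make the resolution in the thread above concrete, here is a minimal, self-contained sketch of the close-on-enter-idle / recreate-on-exit-idle approach adopted for the resolver wrapper. All names are hypothetical toy stand-ins; the real changes live in resolver_conn_wrapper.go, which is not part of this file's diff:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Toy stand-in for the real resolver wrapper; names are hypothetical.
type resolverWrapper struct{ closed bool }

func newResolverWrapper() (*resolverWrapper, error) { return &resolverWrapper{}, nil }

func (rw *resolverWrapper) close() { rw.closed = true }

type clientConn struct {
	mu sync.Mutex
	rw *resolverWrapper
}

// enterIdle closes and drops the resolver wrapper, so a stale resolver
// that outlives idle entry has no live wrapper left to call back into.
func (cc *clientConn) enterIdle() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cc.rw != nil {
		cc.rw.close()
		cc.rw = nil
	}
}

// exitIdle builds a brand-new wrapper instead of reviving the old one.
func (cc *clientConn) exitIdle() error {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cc.rw != nil {
		return errors.New("not idle")
	}
	rw, err := newResolverWrapper()
	if err != nil {
		return err
	}
	cc.rw = rw
	return nil
}

func main() {
	cc := &clientConn{rw: &resolverWrapper{}}
	cc.enterIdle()
	fmt.Println(cc.exitIdle()) // <nil>
}
```

Recreating the wrapper rather than reviving it means a stale resolver cannot reach the channel after idle entry, which is exactly the hazard raised in the thread.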

// updateSubConnState is invoked by grpc to push a subConn state update to the
@@ -120,21 +146,19 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti
if sc == nil {
return
}
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
ccb.mu.Unlock()
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
ccb.balancer.ResolverError(err)
})
ccb.mu.Unlock()
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@@ -148,42 +172,149 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.buildLoadBalancingPolicy(name)
})
ccb.mu.Unlock()
}

// buildLoadBalancingPolicy performs the following:
// - retrieve a balancer builder for the given name. Use the default LB
// policy, pick_first, if no LB policy with name is found in the registry.
// - instruct the gracefulswitch balancer to switch to the above builder. This
// will actually build the new balancer.
// - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}

if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
}

func (ccb *ccBalancerWrapper) close() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
ccb.closeBalancer(ccbModeClosed)
}

// enterIdleMode is invoked by grpc when the channel enters idle mode upon
// expiry of idle_timeout. This call blocks until the balancer is closed.
func (ccb *ccBalancerWrapper) enterIdleMode() {
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
ccb.closeBalancer(ccbModeIdle)
}
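
enterIdleMode above is driven by the channel's idleness manager when idle_timeout expires. That manager is added elsewhere in this PR (see the onCallBegin/onCallEnd hooks in call.go below and the commits about tryEnterIdleMode). As a rough, self-contained sketch of its call-counting mechanics, with everything beyond the two hook names assumed:

```go
package main

import (
	"sync"
	"time"
)

// idleManager is a toy sketch: it counts in-flight calls and arms a
// timer that pushes the channel into idle after `timeout` without
// activity. The real manager is lock-free on the RPC hot path (the
// commits mention atomics and 64-bit field alignment) and rechecks for
// racing activity in tryEnterIdleMode; a plain mutex keeps this short.
type idleManager struct {
	mu        sync.Mutex
	calls     int
	timeout   time.Duration
	timer     *time.Timer
	enterIdle func()
}

func (m *idleManager) onCallBegin() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.timer != nil {
		m.timer.Stop() // activity: cancel any pending idle entry
		m.timer = nil
	}
	m.calls++
	// The real onCallBegin presumably also kicks the channel out of
	// idle, which is why it can return an error (see call.go below).
	return nil
}

func (m *idleManager) onCallEnd() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.calls--
	if m.calls == 0 {
		// Last in-flight call finished: arm the idle timer under the lock.
		m.timer = time.AfterFunc(m.timeout, m.enterIdle)
	}
}

func main() {
	m := &idleManager{timeout: time.Second, enterIdle: func() { println("entering idle") }}
	_ = m.onCallBegin()
	m.onCallEnd() // arms the timer; prints "entering idle" after 1s of no activity
	time.Sleep(2 * time.Second)
}
```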

// closeBalancer is invoked when the channel is being closed or when it enters
// idle mode upon expiry of idle_timeout.
func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
ccb.mu.Unlock()
return
}

ccb.mode = m
done := ccb.serializer.Done
b := ccb.balancer
ok := ccb.serializer.Schedule(func(_ context.Context) {
// Close the serializer to ensure that no more calls from gRPC are sent
// to the balancer.
ccb.serializerCancel()
// Empty the current balancer name because we don't have a balancer
// anymore and also so that we act on the next call to switchTo by
// creating a new balancer specified by the new resolver.
ccb.curBalancerName = ""
})
if !ok {
ccb.mu.Unlock()
return
}
ccb.mu.Unlock()

// Give enqueued callbacks a chance to finish.
<-done
// Spawn a goroutine to close the balancer (since it may block trying to
// cleanup all allocated resources) and return early.
go b.Close()
}

// exitIdleMode is invoked by grpc when the channel exits idle mode either
// because of an RPC or because of an invocation of the Connect() API. This
// recreates the balancer that was closed previously when entering idle mode.
//
// If the channel is not in idle mode, we know for a fact that we are here as a
// result of the user calling the Connect() method on the ClientConn. In this
// case, we can simply forward the call to the underlying balancer, instructing
// it to reconnect to the backends.
func (ccb *ccBalancerWrapper) exitIdleMode() {
ccb.mu.Lock()
if ccb.mode == ccbModeClosed {
// Request to exit idle is a no-op when wrapper is already closed.
ccb.mu.Unlock()
return
}

if ccb.mode == ccbModeIdle {
// Recreate the serializer which was closed when we entered idle.
ctx, cancel := context.WithCancel(context.Background())
ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
ccb.serializerCancel = cancel
}

// The ClientConn guarantees mutual exclusion between close() and
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(_ context.Context) {
defer close(done)

ccb.mu.Lock()
defer ccb.mu.Unlock()

if ccb.mode != ccbModeIdle {
ccb.balancer.ExitIdle()
return
}

// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

})
ccb.mu.Unlock()

<-done
}

func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
ccb.mu.Lock()
defer ccb.mu.Unlock()
return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if ccb.isIdleOrClosed() {
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
}

if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
@@ -200,6 +331,18 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}

func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if ccb.isIdleOrClosed() {
// It is safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveSubConn --> cc.removeAddrConn
return
}

acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@@ -208,6 +351,10 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
if ccb.isIdleOrClosed() {
return
}

acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@@ -216,6 +363,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
}

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
if ccb.isIdleOrClosed() {
return
}

// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
Expand All @@ -226,6 +377,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
}

func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
if ccb.isIdleOrClosed() {
return
}

ccb.cc.resolveNow(o)
}

12 changes: 10 additions & 2 deletions call.go
@@ -27,14 +27,22 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
return err
}

// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)

var err error
if cc.dopts.unaryInt != nil {
err = cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
} else {
err = invoke(ctx, method, args, reply, cc, opts...)
}
cc.idlenessMgr.onCallEnd()
Review thread:

Member: Optional: defer instead and leave the same?

Contributor (author): Done.
return err
}
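
As resolved in the thread above, commit d69513f later moves the onCallEnd call into a defer, so it also runs on early-return paths. A sketch of that shape — the signature and hooks are from this diff, but the exact final body is an assumption:

```go
// Invoke with the call-end bookkeeping deferred. Sketch of the
// resolution adopted in commit d69513f ("defer the call to
// onCallEnd()"); details may differ from the merged code.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	if err := cc.idlenessMgr.onCallBegin(); err != nil {
		return err
	}
	defer cc.idlenessMgr.onCallEnd()

	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
	return invoke(ctx, method, args, reply, cc, opts...)
}
```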

func combine(o1 []CallOption, o2 []CallOption) []CallOption {