diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 1865a3f09c2b..4f9944697dde 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -32,6 +32,15 @@ import ( "google.golang.org/grpc/resolver" ) +type ccbMode int + +const ( + ccbModeActive = iota + ccbModeIdle + ccbModeClosed + ccbModeExitingIdle +) + // ccBalancerWrapper sits between the ClientConn and the Balancer. // // ccBalancerWrapper implements methods corresponding to the ones on the @@ -46,16 +55,25 @@ import ( // It uses the gracefulswitch.Balancer internally to ensure that balancer // switches happen in a graceful manner. type ccBalancerWrapper struct { - cc *ClientConn + // The following fields are initialized when the wrapper is created and are + // read-only afterwards, and therefore can be accessed without a mutex. + cc *ClientConn + opts balancer.BuildOptions // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled on the - // CallbackSerializer. Fields accessed *only* in serializer callbacks, can - // therefore be accessed without a mutex. - serializer *grpcsync.CallbackSerializer - serializerCancel context.CancelFunc - balancer *gracefulswitch.Balancer - curBalancerName string + // mutually exclusive manner as they are scheduled in the serializer. Fields + // accessed *only* in these serializer callbacks, can therefore be accessed + // without a mutex. + balancer *gracefulswitch.Balancer + curBalancerName string + + // mu guards access to the below fields. Access to the serializer and its + // cancel function needs to be mutex protected because they are overwritten + // when the wrapper exits idle mode. + mu sync.Mutex + serializer *grpcsync.CallbackSerializer // To serialize all outoing calls. + serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time. + mode ccbMode // Tracks the current mode of the wrapper. } // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer @@ -64,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc ctx, cancel := context.WithCancel(context.Background()) ccb := &ccBalancerWrapper{ cc: cc, + opts: bopts, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } @@ -74,8 +93,12 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc // updateClientConnState is invoked by grpc to push a ClientConnState update to // the underlying balancer. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { + ccb.mu.Lock() errCh := make(chan error, 1) - ccb.serializer.Schedule(func(_ context.Context) { + // Here and everywhere else where Schedule() is called, it is done with the + // lock held. But the lock guards only the scheduling part. The actual + // callback is called asynchronously without the lock being held. 
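As an aside, a minimal standalone sketch of this "schedule under the lock, run without it" pattern (not part of the patch; the `scheduler` type below is a hypothetical stand-in for the internal CallbackSerializer):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// scheduler is a toy stand-in for the internal CallbackSerializer: Schedule
// reports whether the callback was accepted, and accepted callbacks run later
// on a separate goroutine with no caller lock held.
type scheduler struct {
	work chan func(context.Context)
}

func (s *scheduler) Schedule(f func(context.Context)) bool {
	select {
	case s.work <- f:
		return true
	default:
		return false // toy approximation of "serializer closed"
	}
}

func main() {
	s := &scheduler{work: make(chan func(context.Context), 1)}
	go func() {
		for f := range s.work {
			f(context.Background()) // callback runs without mu held
		}
	}()

	var mu sync.Mutex
	errCh := make(chan error, 1)

	mu.Lock() // the lock guards only the scheduling step
	ok := s.Schedule(func(context.Context) { errCh <- nil })
	mu.Unlock()

	if !ok {
		fmt.Println("serializer closed; not blocking on errCh")
		return
	}
	fmt.Println(<-errCh) // safe to block: the accepted callback is guaranteed to run
}
```

The patch below follows the same shape: block on the result channel only when Schedule reported success.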
+ ok := ccb.serializer.Schedule(func(_ context.Context) { // If the addresses specified in the update contain addresses of type // "grpclb" and the selected LB policy is not "grpclb", these addresses // will be filtered out and ccs will be modified with the updated @@ -92,16 +115,19 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat } errCh <- ccb.balancer.UpdateClientConnState(*ccs) }) - - // If the balancer wrapper is closed when waiting for this state update to - // be handled, the callback serializer will be closed as well, and we can - // rely on its Done channel to ensure that we don't block here forever. - select { - case err := <-errCh: - return err - case <-ccb.serializer.Done: - return nil + if !ok { + // If we are unable to schedule a function with the serializer, it + // indicates that it has been closed. A serializer is only closed when + // the wrapper is closed or is in idle. + ccb.mu.Unlock() + return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") } + ccb.mu.Unlock() + + // We get here only if the above call to Schedule succeeds, in which case it + // is guaranteed that the scheduled function will run. Therefore it is safe + // to block on this channel. + return <-errCh } // updateSubConnState is invoked by grpc to push a subConn state update to the @@ -120,21 +146,19 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti if sc == nil { return } + ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) }) -} - -func (ccb *ccBalancerWrapper) exitIdle() { - ccb.serializer.Schedule(func(_ context.Context) { - ccb.balancer.ExitIdle() - }) + ccb.mu.Unlock() } func (ccb *ccBalancerWrapper) resolverError(err error) { + ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { ccb.balancer.ResolverError(err) }) + ccb.mu.Unlock() } // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -148,42 +172,149 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { // the ccBalancerWrapper keeps track of the current LB policy name, and skips // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { + ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { // TODO: Other languages use case-sensitive balancer registries. We should // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. if strings.EqualFold(ccb.curBalancerName, name) { return } + ccb.buildLoadBalancingPolicy(name) + }) + ccb.mu.Unlock() +} - // Use the default LB policy, pick_first, if no LB policy with name is - // found in the registry. - builder := balancer.Get(name) - if builder == nil { - channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) - builder = newPickfirstBuilder() - } else { - channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) - } +// buildLoadBalancingPolicy performs the following: +// - retrieve a balancer builder for the given name. Use the default LB +// policy, pick_first, if no LB policy with name is found in the registry. +// - instruct the gracefulswitch balancer to switch to the above builder. This +// will actually build the new balancer. 
+// - update the `curBalancerName` field +// +// Must be called from a serializer callback. +func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { + builder := balancer.Get(name) + if builder == nil { + channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) + builder = newPickfirstBuilder() + } else { + channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) + } + + if err := ccb.balancer.SwitchTo(builder); err != nil { + channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) + return + } + ccb.curBalancerName = builder.Name() +} + +func (ccb *ccBalancerWrapper) close() { + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") + ccb.closeBalancer(ccbModeClosed) +} + +// enterIdleMode is invoked by grpc when the channel enters idle mode upon +// expiry of idle_timeout. This call blocks until the balancer is closed. +func (ccb *ccBalancerWrapper) enterIdleMode() { + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") + ccb.closeBalancer(ccbModeIdle) +} + +// closeBalancer is invoked when the channel is being closed or when it enters +// idle mode upon expiry of idle_timeout. +func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { + ccb.mu.Lock() + if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { + ccb.mu.Unlock() + return + } + + ccb.mode = m + done := ccb.serializer.Done + b := ccb.balancer + ok := ccb.serializer.Schedule(func(_ context.Context) { + // Close the serializer to ensure that no more calls from gRPC are sent + // to the balancer. + ccb.serializerCancel() + // Empty the current balancer name because we don't have a balancer + // anymore and also so that we act on the next call to switchTo by + // creating a new balancer specified by the new resolver. + ccb.curBalancerName = "" + }) + if !ok { + ccb.mu.Unlock() + return + } + ccb.mu.Unlock() + + // Give enqueued callbacks a chance to finish. + <-done + // Spawn a goroutine to close the balancer (since it may block trying to + // cleanup all allocated resources) and return early. + go b.Close() +} + +// exitIdleMode is invoked by grpc when the channel exits idle mode either +// because of an RPC or because of an invocation of the Connect() API. This +// recreates the balancer that was closed previously when entering idle mode. +// +// If the channel is not in idle mode, we know for a fact that we are here as a +// result of the user calling the Connect() method on the ClientConn. In this +// case, we can simply forward the call to the underlying balancer, instructing +// it to reconnect to the backends. +func (ccb *ccBalancerWrapper) exitIdleMode() { + ccb.mu.Lock() + if ccb.mode == ccbModeClosed { + // Request to exit idle is a no-op when wrapper is already closed. + ccb.mu.Unlock() + return + } - if err := ccb.balancer.SwitchTo(builder); err != nil { - channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) + if ccb.mode == ccbModeIdle { + // Recreate the serializer which was closed when we entered idle. 
+ ctx, cancel := context.WithCancel(context.Background()) + ccb.serializer = grpcsync.NewCallbackSerializer(ctx) + ccb.serializerCancel = cancel + } + + // The ClientConn guarantees that mutual exclusion between close() and + // exitIdleMode(), and since we just created a new serializer, we can be + // sure that the below function will be scheduled. + done := make(chan struct{}) + ccb.serializer.Schedule(func(_ context.Context) { + defer close(done) + + ccb.mu.Lock() + defer ccb.mu.Unlock() + + if ccb.mode != ccbModeIdle { + ccb.balancer.ExitIdle() return } - ccb.curBalancerName = builder.Name() + + // Gracefulswitch balancer does not support a switchTo operation after + // being closed. Hence we need to create a new one here. + ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) + ccb.mode = ccbModeActive + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") + }) + ccb.mu.Unlock() + + <-done } -func (ccb *ccBalancerWrapper) close() { - // Close the serializer to ensure that no more calls from gRPC are sent to - // the balancer. We don't have to worry about suppressing calls from a - // closed balancer because these are handled by the ClientConn (balancer - // wrapper is only ever closed when the ClientConn is closed). - ccb.serializerCancel() - <-ccb.serializer.Done - ccb.balancer.Close() +func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { + ccb.mu.Lock() + defer ccb.mu.Unlock() + return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + if ccb.isIdleOrClosed() { + return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") + } + if len(addrs) <= 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } @@ -200,6 +331,18 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer } func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { + if ccb.isIdleOrClosed() { + // It it safe to ignore this call when the balancer is closed or in idle + // because the ClientConn takes care of closing the connections. + // + // Not returning early from here when the balancer is closed or in idle + // leads to a deadlock though, because of the following sequence of + // calls when holding cc.mu: + // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> + // ccb.RemoveAddrConn --> cc.removeAddrConn + return + } + acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -208,6 +351,10 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + if ccb.isIdleOrClosed() { + return + } + acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -216,6 +363,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { + if ccb.isIdleOrClosed() { + return + } + // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. 
If the picker is @@ -226,6 +377,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { } func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { + if ccb.isIdleOrClosed() { + return + } + ccb.cc.resolveNow(o) } diff --git a/call.go b/call.go index 9e20e4d385f9..e6a1dc5d75ed 100644 --- a/call.go +++ b/call.go @@ -27,6 +27,11 @@ import ( // // All errors returned by Invoke are compatible with the status package. func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { + if err := cc.idlenessMgr.onCallBegin(); err != nil { + return err + } + defer cc.idlenessMgr.onCallEnd() + // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) diff --git a/clientconn.go b/clientconn.go index 50d08a49a205..1def61e5a23d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -69,6 +69,9 @@ var ( errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. errConnClosing = errors.New("grpc: the connection is closing") + // errConnIdling indicates the the connection is being closed as the channel + // is moving to an idle mode due to inactivity. + errConnIdling = errors.New("grpc: the connection is closing due to channel idleness") // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default // service config. invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" @@ -134,17 +137,29 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ - target: target, - csMgr: &connectivityStateManager{}, - conns: make(map[*addrConn]struct{}), - dopts: defaultDialOptions(), - blockingpicker: newPickerWrapper(), - czData: new(channelzData), - firstResolveEvent: grpcsync.NewEvent(), - } + target: target, + csMgr: &connectivityStateManager{}, + conns: make(map[*addrConn]struct{}), + dopts: defaultDialOptions(), + czData: new(channelzData), + } + + // We start the channel off in idle mode, but kick it out of idle at the end + // of this method, instead of waiting for the first RPC. Other gRPC + // implementations do wait for the first RPC to kick the channel out of + // idle. But doing so would be a major behavior change for our users who are + // used to seeing the channel active after Dial. + // + // Taking this approach of kicking it out of idle at the end of this method + // allows us to share the code between channel creation and exiting idle + // mode. This will also make it easy for us to switch to starting the + // channel off in idle, if at all we ever get to do that. 
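For a user-visible view of this lifecycle, here is a hedged sketch (not part of the patch; the target address and the sleep are placeholders) showing a channel that goes idle after inactivity and is then revived with Connect():

```go
package main

import (
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Placeholder target; error handling kept minimal.
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithIdleTimeout(time.Minute), // option added by this patch
	)
	if err != nil {
		log.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer conn.Close()

	// After a minute with no RPCs, the resolver, LB policy and subchannels are
	// torn down and the channel is expected to report IDLE.
	time.Sleep(90 * time.Second)
	fmt.Println("state after inactivity:", conn.GetState())

	// Connect() (or any new RPC) kicks the channel back out of idle.
	conn.Connect()
	fmt.Println("state after Connect():", conn.GetState())

	// Note: omitting WithIdleTimeout uses the 30-minute default; passing 0
	// disables idleness entirely.
}
```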
+ cc.idlenessState = ccIdlenessStateIdle + cc.retryThrottler.Store((*retryThrottler)(nil)) cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) + cc.exitIdleCond = sync.NewCond(&cc.mu) disableGlobalOpts := false for _, opt := range opts { @@ -243,67 +258,175 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * go cc.scWatcher() } - var credsClone credentials.TransportCredentials - if creds := cc.dopts.copts.TransportCredentials; creds != nil { - credsClone = creds.Clone() + // This creates the name resolver, load balancer, blocking picker etc. + if err := cc.exitIdleMode(); err != nil { + return nil, err } - cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ - DialCreds: credsClone, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - Authority: cc.authority, - CustomUserAgent: cc.dopts.copts.UserAgent, - ChannelzParentID: cc.channelzID, - Target: cc.parsedTarget, - }) - // Build the resolver. - rWrapper, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ - target: cc.parsedTarget, - builder: cc.resolverBuilder, - bOpts: resolver.BuildOptions{ - DisableServiceConfig: cc.dopts.disableServiceConfig, - DialCreds: credsClone, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - }, - channelzID: cc.channelzID, - }) - if err != nil { - return nil, fmt.Errorf("failed to build resolver: %v", err) + // Configure idleness support with configured idle timeout or default idle + // timeout duration. Idleness can be explicitly disabled by the user, by + // setting the dial option to 0. + cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout) + + // Return early for non-blocking dials. + if !cc.dopts.block { + return cc, nil } - cc.mu.Lock() - cc.resolverWrapper = rWrapper - cc.mu.Unlock() // A blocking dial blocks until the clientConn is ready. - if cc.dopts.block { - for { + for { + s := cc.GetState() + if s == connectivity.Idle { cc.Connect() - s := cc.GetState() - if s == connectivity.Ready { - break - } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { - if err = cc.connectionError(); err != nil { - terr, ok := err.(interface { - Temporary() bool - }) - if ok && !terr.Temporary() { - return nil, err - } - } - } - if !cc.WaitForStateChange(ctx, s) { - // ctx got timeout or canceled. - if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { + } + if s == connectivity.Ready { + return cc, nil + } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { + if err = cc.connectionError(); err != nil { + terr, ok := err.(interface { + Temporary() bool + }) + if ok && !terr.Temporary() { return nil, err } - return nil, ctx.Err() } } + if !cc.WaitForStateChange(ctx, s) { + // ctx got timeout or canceled. + if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { + return nil, err + } + return nil, ctx.Err() + } } +} - return cc, nil +// addTraceEvent is a helper method to add a trace event on the channel. If the +// channel is a nested one, the same event is also added on the parent channel. 
+func (cc *ClientConn) addTraceEvent(msg string) { + ted := &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel %s", msg), + Severity: channelz.CtInfo, + } + if cc.dopts.channelzParentID != nil { + ted.Parent = &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg), + Severity: channelz.CtInfo, + } + } + channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) +} + +// exitIdleMode moves the channel out of idle mode by recreating the name +// resolver and load balancer. +func (cc *ClientConn) exitIdleMode() error { + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() + return errConnClosing + } + if cc.idlenessState != ccIdlenessStateIdle { + logger.Error("ClientConn asked to exit idle mode when not in idle mode") + return nil + } + + defer func() { + // When Close() and exitIdleMode() race against each other, one of the + // following two can happen: + // - Close() wins the race and runs first. exitIdleMode() runs after, and + // sees that the ClientConn is already closed and hence returns early. + // - exitIdleMode() wins the race and runs first and recreates the balancer + // and releases the lock before recreating the resolver. If Close() runs + // in this window, it will wait for exitIdleMode to complete. + // + // We achieve this synchronization using the below condition variable. + cc.mu.Lock() + cc.idlenessState = ccIdlenessStateActive + cc.exitIdleCond.Signal() + cc.mu.Unlock() + }() + + cc.idlenessState = ccIdlenessStateExitingIdle + exitedIdle := false + if cc.blockingpicker == nil { + cc.blockingpicker = newPickerWrapper() + } else { + cc.blockingpicker.exitIdleMode() + exitedIdle = true + } + + var credsClone credentials.TransportCredentials + if creds := cc.dopts.copts.TransportCredentials; creds != nil { + credsClone = creds.Clone() + } + if cc.balancerWrapper == nil { + cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ + DialCreds: credsClone, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + Authority: cc.authority, + CustomUserAgent: cc.dopts.copts.UserAgent, + ChannelzParentID: cc.channelzID, + Target: cc.parsedTarget, + }) + } else { + cc.balancerWrapper.exitIdleMode() + } + cc.firstResolveEvent = grpcsync.NewEvent() + cc.mu.Unlock() + + // This needs to be called without cc.mu because this builds a new resolver + // which might update state or report error inline which needs to be handled + // by cc.updateResolverState() which also grabs cc.mu. + if err := cc.initResolverWrapper(credsClone); err != nil { + return err + } + + if exitedIdle { + cc.addTraceEvent("exiting idle mode") + } + return nil +} + +// enterIdleMode puts the channel in idle mode, and as part of it shuts down the +// name resolver, load balancer and any subchannels. +func (cc *ClientConn) enterIdleMode() error { + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() + return ErrClientConnClosing + } + if cc.idlenessState != ccIdlenessStateActive { + logger.Error("ClientConn asked to enter idle mode when not active") + return nil + } + + // cc.conns == nil is a proxy for the ClientConn being closed. So, instead + // of setting it to nil here, we recreate the map. This also means that we + // don't have to do this when exiting idle mode. + conns := cc.conns + cc.conns = make(map[*addrConn]struct{}) + + // TODO: Currently, we close the resolver wrapper upon entering idle mode + // and create a new one upon exiting idle mode. 
This means that the + // `cc.resolverWrapper` field would be overwritten everytime we exit idle + // mode. While this means that we need to hold `cc.mu` when accessing + // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should + // try to do the same for the balancer and picker wrappers too. + cc.resolverWrapper.close() + cc.blockingpicker.enterIdleMode() + cc.balancerWrapper.enterIdleMode() + cc.csMgr.updateState(connectivity.Idle) + cc.idlenessState = ccIdlenessStateIdle + cc.mu.Unlock() + + go func() { + cc.addTraceEvent("entering idle mode") + for ac := range conns { + ac.tearDown(errConnIdling) + } + }() + return nil } // validateTransportCredentials performs a series of checks on the configured @@ -350,17 +473,7 @@ func (cc *ClientConn) validateTransportCredentials() error { // Doesn't grab cc.mu as this method is expected to be called only at Dial time. func (cc *ClientConn) channelzRegistration(target string) { cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) - ted := &channelz.TraceEventDesc{ - Desc: "Channel created", - Severity: channelz.CtInfo, - } - if cc.dopts.channelzParentID != nil { - ted.Parent = &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()), - Severity: channelz.CtInfo, - } - } - channelz.AddTraceEvent(logger, cc.channelzID, 1, ted) + cc.addTraceEvent("created") cc.csMgr.channelzID = cc.channelzID } @@ -509,6 +622,7 @@ type ClientConn struct { channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. + idlenessMgr idlenessManager // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. @@ -529,11 +643,31 @@ type ClientConn struct { sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. + idlenessState ccIdlenessState // Tracks idleness state of the channel. + exitIdleCond *sync.Cond // Signalled when channel exits idle. lceMu sync.Mutex // protects lastConnectionError lastConnectionError error } +// ccIdlenessState tracks the idleness state of the channel. +// +// Channels start off in `active` and move to `idle` after a period of +// inactivity. When moving back to `active` upon an incoming RPC, they +// transition through `exiting_idle`. This state is useful for synchronization +// with Close(). +// +// This state tracking is mostly for self-protection. The idlenessManager is +// expected to keep track of the state as well, and is expected not to call into +// the ClientConn unnecessarily. +type ccIdlenessState int8 + +const ( + ccIdlenessStateActive ccIdlenessState = iota + ccIdlenessStateIdle + ccIdlenessStateExitingIdle +) + // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or // ctx expires. A true value is returned in former case and false in latter. // @@ -573,7 +707,7 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. 
func (cc *ClientConn) Connect() { - cc.balancerWrapper.exitIdle() + cc.balancerWrapper.exitIdleMode() } func (cc *ClientConn) scWatcher() { @@ -1061,39 +1195,40 @@ func (cc *ClientConn) Close() error { cc.mu.Unlock() return ErrClientConnClosing } + + for cc.idlenessState == ccIdlenessStateExitingIdle { + cc.exitIdleCond.Wait() + } + conns := cc.conns cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) + pWrapper := cc.blockingpicker rWrapper := cc.resolverWrapper - cc.resolverWrapper = nil bWrapper := cc.balancerWrapper + idlenessMgr := cc.idlenessMgr cc.mu.Unlock() // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. - cc.blockingpicker.close() + if pWrapper != nil { + pWrapper.close() + } if bWrapper != nil { bWrapper.close() } if rWrapper != nil { rWrapper.close() } + if idlenessMgr != nil { + idlenessMgr.close() + } for ac := range conns { ac.tearDown(ErrClientConnClosing) } - ted := &channelz.TraceEventDesc{ - Desc: "Channel deleted", - Severity: channelz.CtInfo, - } - if cc.dopts.channelzParentID != nil { - ted.Parent = &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()), - Severity: channelz.CtInfo, - } - } - channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) + cc.addTraceEvent("deleted") // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add // trace reference to the entity being deleted, and thus prevent it from being // deleted right away. @@ -1735,3 +1870,32 @@ func (cc *ClientConn) determineAuthority() error { channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) return nil } + +// initResolverWrapper creates a ccResolverWrapper, which builds the name +// resolver. This method grabs the lock to assign the newly built resolver +// wrapper to the cc.resolverWrapper field. +func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error { + rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ + target: cc.parsedTarget, + builder: cc.resolverBuilder, + bOpts: resolver.BuildOptions{ + DisableServiceConfig: cc.dopts.disableServiceConfig, + DialCreds: creds, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + }, + channelzID: cc.channelzID, + }) + if err != nil { + return fmt.Errorf("failed to build resolver: %v", err) + } + // Resolver implementations may report state update or error inline when + // built (or right after), and this is handled in cc.updateResolverState. + // Also, an error from the resolver might lead to a re-resolution request + // from the balancer, which is handled in resolveNow() where + // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. + cc.mu.Lock() + cc.resolverWrapper = rw + cc.mu.Unlock() + return nil +} diff --git a/clientconn_test.go b/clientconn_test.go index 9004f3177fdd..3cd04a743444 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -370,7 +370,7 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { }() bc := backoff.Config{ BaseDelay: 200 * time.Millisecond, - Multiplier: 1.1, + Multiplier: 2.0, Jitter: 0, MaxDelay: 120 * time.Second, } diff --git a/dialoptions.go b/dialoptions.go index cdc8263bda65..51c8997d5d18 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -77,6 +77,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. 
defaultServiceConfigRawJSON *string resolvers []resolver.Builder + idleTimeout time.Duration } // DialOption configures how we set up the connection. @@ -627,6 +628,7 @@ func defaultDialOptions() dialOptions { ReadBufferSize: defaultReadBufSize, UseProxy: true, }, + idleTimeout: 30 * time.Minute, } } @@ -655,3 +657,23 @@ func WithResolvers(rs ...resolver.Builder) DialOption { o.resolvers = append(o.resolvers, rs...) }) } + +// WithIdleTimeout returns a DialOption that configures an idle timeout for the +// channel. If the channel is idle for the configured timeout, i.e there are no +// ongoing RPCs and no new RPCs are initiated, the channel will enter idle mode +// and as a result the name resolver and load balancer will be shut down. The +// channel will exit idle mode when the Connect() method is called or when an +// RPC is initiated. +// +// A default timeout of 30 min will be used if this dial option is not set at +// dial time and idleness can be disabled by passing a timeout of zero. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithIdleTimeout(d time.Duration) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.idleTimeout = d + }) +} diff --git a/idle.go b/idle.go new file mode 100644 index 000000000000..dc3dc72f6b09 --- /dev/null +++ b/idle.go @@ -0,0 +1,287 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "fmt" + "math" + "sync" + "sync/atomic" + "time" +) + +// For overriding in unit tests. +var timeAfterFunc = func(d time.Duration, f func()) *time.Timer { + return time.AfterFunc(d, f) +} + +// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter +// and exit from idle mode. +type idlenessEnforcer interface { + exitIdleMode() error + enterIdleMode() error +} + +// idlenessManager defines the functionality required to track RPC activity on a +// channel. +type idlenessManager interface { + onCallBegin() error + onCallEnd() + close() +} + +type noopIdlenessManager struct{} + +func (noopIdlenessManager) onCallBegin() error { return nil } +func (noopIdlenessManager) onCallEnd() {} +func (noopIdlenessManager) close() {} + +// idlenessManagerImpl implements the idlenessManager interface. It uses atomic +// operations to synchronize access to shared state and a mutex to guarantee +// mutual exclusion in a critical section. +type idlenessManagerImpl struct { + // State accessed atomically. + lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed. + activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. + activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback. + closed int32 // Boolean; True when the manager is closed. + + // Can be accessed without atomics or mutex since these are set at creation + // time and read-only after that. 
+ enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. + timeout int64 // Idle timeout duration nanos stored as an int64. + + // idleMu is used to guarantee mutual exclusion in two scenarios: + // - Opposing intentions: + // - a: Idle timeout has fired and handleIdleTimeout() is trying to put + // the channel in idle mode because the channel has been inactive. + // - b: At the same time an RPC is made on the channel, and onCallBegin() + // is trying to prevent the channel from going idle. + // - Competing intentions: + // - The channel is in idle mode and there are multiple RPCs starting at + // the same time, all trying to move the channel out of idle. Only one + // of them should succeed in doing so, while the other RPCs should + // piggyback on the first one and be successfully handled. + idleMu sync.RWMutex + actuallyIdle bool + timer *time.Timer +} + +// newIdlenessManager creates a new idleness manager implementation for the +// given idle timeout. +func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager { + if idleTimeout == 0 { + return noopIdlenessManager{} + } + + i := &idlenessManagerImpl{ + enforcer: enforcer, + timeout: int64(idleTimeout), + } + i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout) + return i +} + +// resetIdleTimer resets the idle timer to the given duration. This method +// should only be called from the timer callback. +func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) { + i.idleMu.Lock() + defer i.idleMu.Unlock() + + if i.timer == nil { + // Only close sets timer to nil. We are done. + return + } + + // It is safe to ignore the return value from Reset() because this method is + // only ever called from the timer callback, which means the timer has + // already fired. + i.timer.Reset(d) +} + +// handleIdleTimeout is the timer callback that is invoked upon expiry of the +// configured idle timeout. The channel is considered inactive if there are no +// ongoing calls and no RPC activity since the last time the timer fired. +func (i *idlenessManagerImpl) handleIdleTimeout() { + if i.isClosed() { + return + } + + if atomic.LoadInt32(&i.activeCallsCount) > 0 { + i.resetIdleTimer(time.Duration(i.timeout)) + return + } + + // There has been activity on the channel since we last got here. Reset the + // timer and return. + if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 { + // Set the timer to fire after a duration of idle timeout, calculated + // from the time the most recent RPC completed. + atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0) + i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano())) + return + } + + // This CAS operation is extremely likely to succeed given that there has + // been no activity since the last time we were here. Setting the + // activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the + // channel is either in idle mode or is trying to get there. + if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) { + // This CAS operation can fail if an RPC started after we checked for + // activity at the top of this method, or one was ongoing from before + // the last time we were here. In both case, reset the timer and return. + i.resetIdleTimer(time.Duration(i.timeout)) + return + } + + // Now that we've set the active calls count to -math.MaxInt32, it's time to + // actually move to idle mode. + if i.tryEnterIdleMode() { + // Successfully entered idle mode. 
No timer needed until we exit idle. + return + } + + // Failed to enter idle mode due to a concurrent RPC that kept the channel + // active, or because of an error from the channel. Undo the attempt to + // enter idle, and reset the timer to try again later. + atomic.AddInt32(&i.activeCallsCount, math.MaxInt32) + i.resetIdleTimer(time.Duration(i.timeout)) +} + +// tryEnterIdleMode instructs the channel to enter idle mode. But before +// that, it performs a last minute check to ensure that no new RPC has come in, +// making the channel active. +// +// Return value indicates whether or not the channel moved to idle mode. +// +// Holds idleMu which ensures mutual exclusion with exitIdleMode. +func (i *idlenessManagerImpl) tryEnterIdleMode() bool { + i.idleMu.Lock() + defer i.idleMu.Unlock() + + if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 { + // We raced and lost to a new RPC. Very rare, but stop entering idle. + return false + } + if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 { + // An very short RPC could have come in (and also finished) after we + // checked for calls count and activity in handleIdleTimeout(), but + // before the CAS operation. So, we need to check for activity again. + return false + } + + // No new RPCs have come in since we last set the active calls count value + // -math.MaxInt32 in the timer callback. And since we have the lock, it is + // safe to enter idle mode now. + if err := i.enforcer.enterIdleMode(); err != nil { + logger.Errorf("Failed to enter idle mode: %v", err) + return false + } + + // Successfully entered idle mode. + i.actuallyIdle = true + return true +} + +// onCallBegin is invoked at the start of every RPC. +func (i *idlenessManagerImpl) onCallBegin() error { + if i.isClosed() { + return nil + } + + if atomic.AddInt32(&i.activeCallsCount, 1) > 0 { + // Channel is not idle now. Set the activity bit and allow the call. + atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1) + return nil + } + + // Channel is either in idle mode or is in the process of moving to idle + // mode. Attempt to exit idle mode to allow this RPC. + if err := i.exitIdleMode(); err != nil { + // Undo the increment to calls count, and return an error causing the + // RPC to fail. + atomic.AddInt32(&i.activeCallsCount, -1) + return err + } + + atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1) + return nil +} + +// exitIdleMode instructs the channel to exit idle mode. +// +// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. +func (i *idlenessManagerImpl) exitIdleMode() error { + i.idleMu.Lock() + defer i.idleMu.Unlock() + + if !i.actuallyIdle { + // This can happen in two scenarios: + // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called + // tryEnterIdleMode(). But before the latter could grab the lock, an RPC + // came in and onCallBegin() noticed that the calls count is negative. + // - Channel is in idle mode, and multiple new RPCs come in at the same + // time, all of them notice a negative calls count in onCallBegin and get + // here. The first one to get the lock would got the channel to exit idle. + // + // Either way, nothing to do here. + return nil + } + + if err := i.enforcer.exitIdleMode(); err != nil { + return fmt.Errorf("channel failed to exit idle mode: %v", err) + } + + // Undo the idle entry process. This also respects any new RPC attempts. + atomic.AddInt32(&i.activeCallsCount, math.MaxInt32) + i.actuallyIdle = false + + // Start a new timer to fire after the configured idle timeout. 
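The counter bookkeeping above is easier to follow as a worked sequence; the standalone sketch below (not part of the patch) walks activeCallsCount through a timer-driven idle entry followed by an RPC-driven exit:

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var calls int32 // 0: channel active, no RPCs in flight

	// Timer callback: claim idleness by swinging the count to -math.MaxInt32.
	fmt.Println(atomic.CompareAndSwapInt32(&calls, 0, -math.MaxInt32)) // true

	// onCallBegin: the increment leaves the count <= 0, which tells the RPC
	// that the channel is idle (or entering idle) and must be exited first.
	fmt.Println(atomic.AddInt32(&calls, 1) > 0) // false

	// exitIdleMode: undo the sentinel; the pending RPC is now counted as 1.
	fmt.Println(atomic.AddInt32(&calls, math.MaxInt32)) // 1

	// onCallEnd: back to 0, ready for the next idleness check.
	fmt.Println(atomic.AddInt32(&calls, -1)) // 0
}
```

The same arithmetic is why onCallEnd can briefly drive the count below -math.MaxInt32 without breaking the scheme.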
+ i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout) + return nil +} + +// onCallEnd is invoked at the end of every RPC. +func (i *idlenessManagerImpl) onCallEnd() { + if i.isClosed() { + return + } + + // Record the time at which the most recent call finished. + atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano()) + + // Decrement the active calls count. This count can temporarily go negative + // when the timer callback is in the process of moving the channel to idle + // mode, but one or more RPCs come in and complete before the timer callback + // can get done with the process of moving to idle mode. + atomic.AddInt32(&i.activeCallsCount, -1) +} + +func (i *idlenessManagerImpl) isClosed() bool { + return atomic.LoadInt32(&i.closed) == 1 +} + +func (i *idlenessManagerImpl) close() { + atomic.StoreInt32(&i.closed, 1) + + i.idleMu.Lock() + i.timer.Stop() + i.timer = nil + i.idleMu.Unlock() +} diff --git a/idle_test.go b/idle_test.go new file mode 100644 index 000000000000..a20b4e09947b --- /dev/null +++ b/idle_test.go @@ -0,0 +1,360 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +const ( + defaultTestIdleTimeout = 500 * time.Millisecond // A short idle_timeout for tests. + defaultTestShortTimeout = 10 * time.Millisecond // A small deadline to wait for events expected to not happen. +) + +type testIdlenessEnforcer struct { + exitIdleCh chan struct{} + enterIdleCh chan struct{} +} + +func (ti *testIdlenessEnforcer) exitIdleMode() error { + ti.exitIdleCh <- struct{}{} + return nil + +} + +func (ti *testIdlenessEnforcer) enterIdleMode() error { + ti.enterIdleCh <- struct{}{} + return nil + +} + +func newTestIdlenessEnforcer() *testIdlenessEnforcer { + return &testIdlenessEnforcer{ + exitIdleCh: make(chan struct{}, 1), + enterIdleCh: make(chan struct{}, 1), + } +} + +// overrideNewTimer overrides the new timer creation function by ensuring that a +// message is pushed on the returned channel everytime the timer fires. +func overrideNewTimer(t *testing.T) <-chan struct{} { + t.Helper() + + ch := make(chan struct{}, 1) + origTimeAfterFunc := timeAfterFunc + timeAfterFunc = func(d time.Duration, callback func()) *time.Timer { + return time.AfterFunc(d, func() { + select { + case ch <- struct{}{}: + default: + } + callback() + }) + } + t.Cleanup(func() { timeAfterFunc = origTimeAfterFunc }) + return ch +} + +// TestIdlenessManager_Disabled tests the case where the idleness manager is +// disabled by passing an idle_timeout of 0. 
Verifies the following things: +// - timer callback does not fire +// - an RPC does not trigger a call to exitIdleMode on the ClientConn +// - more calls to RPC termination (as compared to RPC initiation) does not +// result in an error log +func (s) TestIdlenessManager_Disabled(t *testing.T) { + callbackCh := overrideNewTimer(t) + + // Create an idleness manager that is disabled because of idleTimeout being + // set to `0`. + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(0)) + + // Ensure that the timer callback does not fire within a short deadline. + select { + case <-callbackCh: + t.Fatal("Idle timer callback fired when manager is disabled") + case <-time.After(defaultTestShortTimeout): + } + + // The first invocation of onCallBegin() would lead to a call to + // exitIdleMode() on the enforcer, unless the idleness manager is disabled. + mgr.onCallBegin() + select { + case <-enforcer.exitIdleCh: + t.Fatalf("exitIdleMode() called on enforcer when manager is disabled") + case <-time.After(defaultTestShortTimeout): + } + + // If the number of calls to onCallEnd() exceeds the number of calls to + // onCallBegin(), the idleness manager is expected to throw an error log + // (which will cause our TestLogger to fail the test). But since the manager + // is disabled, this should not happen. + mgr.onCallEnd() + mgr.onCallEnd() + + // The idleness manager is explicitly not closed here. But since the manager + // is disabled, it will not start the run goroutine, and hence we expect the + // leakchecker to not find any leaked goroutines. +} + +// TestIdlenessManager_Enabled_TimerFires tests the case where the idle manager +// is enabled. Ensures that when there are no RPCs, the timer callback is +// invoked and the enterIdleMode() method is invoked on the enforcer. +func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Ensure that the timer callback fires within a appropriate amount of time. + select { + case <-callbackCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + + // Ensure that the channel moves to idle mode eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } +} + +// TestIdlenessManager_Enabled_OngoingCall tests the case where the idle manager +// is enabled. Ensures that when there is an ongoing RPC, the channel does not +// enter idle mode. +func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Fire up a goroutine that simulates an ongoing RPC that is terminated + // after the timer callback fires for the first time. + timerFired := make(chan struct{}) + go func() { + mgr.onCallBegin() + <-timerFired + mgr.onCallEnd() + }() + + // Ensure that the timer callback fires and unblock the above goroutine. + select { + case <-callbackCh: + close(timerFired) + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + + // The invocation of the timer callback should not put the channel in idle + // mode since we had an ongoing RPC. 
+ select { + case <-enforcer.enterIdleCh: + t.Fatalf("enterIdleMode() called on enforcer when active RPC exists") + case <-time.After(defaultTestShortTimeout): + } + + // Since we terminated the ongoing RPC and we have no other active RPCs, the + // channel must move to idle eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } +} + +// TestIdlenessManager_Enabled_ActiveSinceLastCheck tests the case where the +// idle manager is enabled. Ensures that when there are active RPCs in the last +// period (even though there is no active call when the timer fires), the +// channel does not enter idle mode. +func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Fire up a goroutine that simulates unary RPCs until the timer callback + // fires. + timerFired := make(chan struct{}) + go func() { + for ; ; <-time.After(defaultTestShortTimeout) { + mgr.onCallBegin() + mgr.onCallEnd() + + select { + case <-timerFired: + return + default: + } + } + }() + + // Ensure that the timer callback fires, and that we don't enter idle as + // part of this invocation of the timer callback, since we had some RPCs in + // this period. + select { + case <-callbackCh: + close(timerFired) + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + select { + case <-enforcer.enterIdleCh: + t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period") + case <-time.After(defaultTestShortTimeout): + } + + // Since the unrary RPC terminated and we have no other active RPCs, the + // channel must move to idle eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } +} + +// TestIdlenessManager_Enabled_ExitIdleOnRPC tests the case where the idle +// manager is enabled. Ensures that the channel moves out of idle when an RPC is +// initiated. +func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) { + overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Ensure that the channel moves to idle since there are no RPCs. + select { + case <-enforcer.enterIdleCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for channel to move to idle mode") + } + + for i := 0; i < 100; i++ { + // A call to onCallBegin and onCallEnd simulates an RPC. + go func() { + if err := mgr.onCallBegin(); err != nil { + t.Errorf("onCallBegin() failed: %v", err) + } + mgr.onCallEnd() + }() + } + + // Ensure that the channel moves out of idle as a result of the above RPC. + select { + case <-enforcer.exitIdleCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for channel to move out of idle mode") + } + + // Ensure that only one call to exit idle mode is made to the CC. 
+ sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer sCancel() + select { + case <-enforcer.exitIdleCh: + t.Fatal("More than one call to exit idle mode on the ClientConn; only one expected") + case <-sCtx.Done(): + } +} + +type racyIdlenessState int32 + +const ( + stateInital racyIdlenessState = iota + stateEnteredIdle + stateExitedIdle + stateActiveRPCs +) + +// racyIdlnessEnforcer is a test idleness enforcer used specifically to test the +// race between idle timeout and incoming RPCs. +type racyIdlenessEnforcer struct { + state *racyIdlenessState // Accessed atomically. +} + +// exitIdleMode sets the internal state to stateExitedIdle. We should only ever +// exit idle when we are currently in idle. +func (ri *racyIdlenessEnforcer) exitIdleMode() error { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) { + return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier") + } + return nil +} + +// enterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start. +func (ri *racyIdlenessEnforcer) enterIdleMode() error { + if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInital), int32(stateEnteredIdle)) { + return fmt.Errorf("idleness enforcer asked to enter idle after rpcs started") + } + return nil +} + +// TestIdlenessManager_IdleTimeoutRacesWithOnCallBegin tests the case where +// firing of the idle timeout races with an incoming RPC. The test verifies that +// if the timer callback win the race and puts the channel in idle, the RPCs can +// kick it out of idle. And if the RPCs win the race and keep the channel +// active, then the timer callback should not attempt to put the channel in idle +// mode. +func (s) TestIdlenessManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) { + // Run multiple iterations to simulate different possibilities. + for i := 0; i < 10; i++ { + t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) { + var idlenessState racyIdlenessState + enforcer := &racyIdlenessEnforcer{state: &idlenessState} + + // Configure a large idle timeout so that we can control the + // race between the timer callback and RPCs. + mgr := newIdlenessManager(enforcer, time.Duration(10*time.Minute)) + defer mgr.close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + m := mgr.(interface{ handleIdleTimeout() }) + <-time.After(defaultTestIdleTimeout) + m.handleIdleTimeout() + }() + for j := 0; j < 100; j++ { + wg.Add(1) + go func() { + defer wg.Done() + // Wait for the configured idle timeout and simulate an RPC to + // race with the idle timeout timer callback. + <-time.After(defaultTestIdleTimeout) + if err := mgr.onCallBegin(); err != nil { + t.Errorf("onCallBegin() failed: %v", err) + } + atomic.StoreInt32((*int32)(&idlenessState), int32(stateActiveRPCs)) + mgr.onCallEnd() + }() + } + wg.Wait() + }) + } +} diff --git a/internal/grpcsync/callback_serializer.go b/internal/grpcsync/callback_serializer.go index d91f92463542..37b8d4117e77 100644 --- a/internal/grpcsync/callback_serializer.go +++ b/internal/grpcsync/callback_serializer.go @@ -20,6 +20,7 @@ package grpcsync import ( "context" + "sync" "google.golang.org/grpc/internal/buffer" ) @@ -31,19 +32,21 @@ import ( // // This type is safe for concurrent access. 
type CallbackSerializer struct { - // Done is closed once the serializer is shut down completely, i.e a - // scheduled callback, if any, that was running when the context passed to - // NewCallbackSerializer is cancelled, has completed and the serializer has - // deallocated all its resources. + // Done is closed once the serializer is shut down completely, i.e all + // scheduled callbacks are executed and the serializer has deallocated all + // its resources. Done chan struct{} callbacks *buffer.Unbounded + closedMu sync.Mutex + closed bool } // NewCallbackSerializer returns a new CallbackSerializer instance. The provided // context will be passed to the scheduled callbacks. Users should cancel the // provided context to shutdown the CallbackSerializer. It is guaranteed that no -// callbacks will be executed once this context is canceled. +// callbacks will be added once this context is canceled, and any pending un-run +// callbacks will be executed before the serializer is shut down. func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { t := &CallbackSerializer{ Done: make(chan struct{}), @@ -57,17 +60,30 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { // // Callbacks are expected to honor the context when performing any blocking // operations, and should return early when the context is canceled. -func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) { +// +// Return value indicates if the callback was successfully added to the list of +// callbacks to be executed by the serializer. It is not possible to add +// callbacks once the context passed to NewCallbackSerializer is cancelled. +func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { + t.closedMu.Lock() + defer t.closedMu.Unlock() + + if t.closed { + return false + } t.callbacks.Put(f) + return true } func (t *CallbackSerializer) run(ctx context.Context) { + var backlog []func(context.Context) + defer close(t.Done) for ctx.Err() == nil { select { case <-ctx.Done(): - t.callbacks.Close() - return + // Do nothing here. Next iteration of the for loop will not happen, + // since ctx.Err() would be non-nil. case callback, ok := <-t.callbacks.Get(): if !ok { return @@ -76,4 +92,28 @@ func (t *CallbackSerializer) run(ctx context.Context) { callback.(func(ctx context.Context))(ctx) } } + + // Fetch pending callbacks if any, and execute them before returning from + // this method and closing t.Done. + t.closedMu.Lock() + t.closed = true + backlog = t.fetchPendingCallbacks() + t.callbacks.Close() + t.closedMu.Unlock() + for _, b := range backlog { + b(ctx) + } +} + +func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { + var backlog []func(context.Context) + for { + select { + case b := <-t.callbacks.Get(): + backlog = append(backlog, b.(func(context.Context))) + t.callbacks.Load() + default: + return backlog + } + } } diff --git a/internal/grpcsync/callback_serializer_test.go b/internal/grpcsync/callback_serializer_test.go index 8c465af66aea..cdbd446f8101 100644 --- a/internal/grpcsync/callback_serializer_test.go +++ b/internal/grpcsync/callback_serializer_test.go @@ -20,7 +20,6 @@ package grpcsync import ( "context" - "fmt" "sync" "testing" "time" @@ -141,7 +140,10 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) { // are not executed once Close() returns. 
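As a usage-level summary of the new semantics (a hedged sketch written as if it lived alongside the grpcsync package's own tests, since the package is internal): Schedule reports whether the callback was accepted, callbacks queued before cancellation still run, and Done closes only after they finish.

```go
func ExampleCallbackSerializer_drainBeforeClose() {
	ctx, cancel := context.WithCancel(context.Background())
	cs := NewCallbackSerializer(ctx)

	// Accepted: the serializer has not been closed yet.
	fmt.Println(cs.Schedule(func(context.Context) { fmt.Println("queued callback ran") }))

	// Cancelling stops new callbacks from being accepted, but anything already
	// queued is executed before Done is closed.
	cancel()
	<-cs.Done

	// Rejected: the serializer has shut down.
	fmt.Println(cs.Schedule(func(context.Context) {}))
}
```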
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - cs := NewCallbackSerializer(ctx) + defer cancel() + + serializerCtx, serializerCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + cs := NewCallbackSerializer(serializerCtx) // Schedule a callback which blocks until the context passed to it is // canceled. It also closes a channel to signal that it has started. @@ -151,36 +153,54 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) { <-ctx.Done() }) - // Schedule a bunch of callbacks. These should not be exeuted since the first - // one started earlier is blocked. + // Schedule a bunch of callbacks. These should be exeuted since the are + // scheduled before the serializer is closed. const numCallbacks = 10 - errCh := make(chan error, numCallbacks) + callbackCh := make(chan int, numCallbacks) for i := 0; i < numCallbacks; i++ { - cs.Schedule(func(_ context.Context) { - errCh <- fmt.Errorf("callback %d executed when not expected to", i) - }) + num := i + if !cs.Schedule(func(context.Context) { callbackCh <- num }) { + t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed") + } } // Ensure that none of the newer callbacks are executed at this point. select { case <-time.After(defaultTestShortTimeout): - case err := <-errCh: - t.Fatal(err) + case <-callbackCh: + t.Fatal("Newer callback executed when older one is still executing") } // Wait for the first callback to start before closing the scheduler. <-firstCallbackStartedCh - // Cancel the context which will unblock the first callback. None of the + // Cancel the context which will unblock the first callback. All of the // other callbacks (which have not started executing at this point) should // be executed after this. - cancel() + serializerCancel() + + // Ensure that the newer callbacks are executed. + for i := 0; i < numCallbacks; i++ { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for callback scheduled before close to be executed") + case num := <-callbackCh: + if num != i { + t.Fatalf("Executing callback %d, want %d", num, i) + } + } + } <-cs.Done - // Ensure that the newer callbacks are not executed. + done := make(chan struct{}) + if cs.Schedule(func(context.Context) { close(done) }) { + t.Fatal("Scheduled a callback after closing the serializer") + } + + // Ensure that the lates callback is executed at this point. select { case <-time.After(defaultTestShortTimeout): - case err := <-errCh: - t.Fatal(err) + case <-done: + t.Fatal("Newer callback executed when scheduled after closing serializer") } } diff --git a/picker_wrapper.go b/picker_wrapper.go index c525dc070fc6..8e24d864986d 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -36,6 +36,7 @@ import ( type pickerWrapper struct { mu sync.Mutex done bool + idle bool blockingCh chan struct{} picker balancer.Picker } @@ -47,7 +48,11 @@ func newPickerWrapper() *pickerWrapper { // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. func (pw *pickerWrapper) updatePicker(p balancer.Picker) { pw.mu.Lock() - if pw.done { + if pw.done || pw.idle { + // There is a small window where a picker update from the LB policy can + // race with the channel going to idle mode. If the picker is idle here, + // it is because the channel asked it to do so, and therefore it is sage + // to ignore the update from the LB policy. 
pw.mu.Unlock() return } @@ -187,6 +192,25 @@ func (pw *pickerWrapper) close() { close(pw.blockingCh) } +func (pw *pickerWrapper) enterIdleMode() { + pw.mu.Lock() + defer pw.mu.Unlock() + if pw.done { + return + } + pw.idle = true +} + +func (pw *pickerWrapper) exitIdleMode() { + pw.mu.Lock() + defer pw.mu.Unlock() + if pw.done { + return + } + pw.blockingCh = make(chan struct{}) + pw.idle = false +} + // dropError is a wrapper error that indicates the LB policy wishes to drop the // RPC and not retry it. type dropError struct { diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index ce12b52ecdc0..b408b3688f2e 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -21,6 +21,7 @@ package grpc import ( "context" "strings" + "sync" "google.golang.org/grpc/balancer" "google.golang.org/grpc/internal/channelz" @@ -44,15 +45,20 @@ type ccResolverWrapper struct { cc resolverStateUpdater channelzID *channelz.Identifier ignoreServiceConfig bool - - // Outgoing (gRPC --> resolver) and incoming (resolver --> gRPC) calls are - // guaranteed to execute in a mutually exclusive manner as they are - // scheduled on the CallbackSerializer. Fields accessed *only* in serializer - // callbacks, can therefore be accessed without a mutex. - serializer *grpcsync.CallbackSerializer - serializerCancel context.CancelFunc - resolver resolver.Resolver - curState resolver.State + opts ccResolverWrapperOpts + serializer *grpcsync.CallbackSerializer // To serialize all incoming calls. + serializerCancel context.CancelFunc // To close the serializer, accessed only from close(). + + // All incoming (resolver --> gRPC) calls are guaranteed to execute in a + // mutually exclusive manner as they are scheduled on the serializer. + // Fields accessed *only* in these serializer callbacks, can therefore be + // accessed without a mutex. + curState resolver.State + + // mu guards access to the below fields. + mu sync.Mutex + closed bool + resolver resolver.Resolver // Accessed only from outgoing calls. } // ccResolverWrapperOpts wraps the arguments to be passed when creating a new @@ -72,38 +78,81 @@ func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) ( cc: cc, channelzID: opts.channelzID, ignoreServiceConfig: opts.bOpts.DisableServiceConfig, + opts: opts, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } + // Cannot hold the lock at build time because the resolver can send an + // update or error inline and these incoming calls grab the lock to schedule + // a callback in the serializer. r, err := opts.builder.Build(opts.target, ccr, opts.bOpts) if err != nil { cancel() return nil, err } + + // Any error reported by the resolver at build time that leads to a + // re-resolution request from the balancer is dropped by grpc until we + // return from this function. So, we don't have to handle pending resolveNow + // requests here. + ccr.mu.Lock() ccr.resolver = r + ccr.mu.Unlock() + return ccr, nil } func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { - ccr.serializer.Schedule(func(_ context.Context) { - ccr.resolver.ResolveNow(o) - }) + ccr.mu.Lock() + defer ccr.mu.Unlock() + + // ccr.resolver field is set only after the call to Build() returns. But in + // the process of building, the resolver may send an error update which when + // propagated to the balancer may result in a re-resolution request. 
+	if ccr.closed || ccr.resolver == nil {
+		return
+	}
+	ccr.resolver.ResolveNow(o)
 }
 
 func (ccr *ccResolverWrapper) close() {
+	ccr.mu.Lock()
+	if ccr.closed {
+		ccr.mu.Unlock()
+		return
+	}
+
+	channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
+
 	// Close the serializer to ensure that no more calls from the resolver are
-	// handled, before closing the resolver.
+	// handled, before actually closing the resolver.
 	ccr.serializerCancel()
+	ccr.closed = true
+	r := ccr.resolver
+	ccr.mu.Unlock()
+
+	// Give enqueued callbacks a chance to finish.
 	<-ccr.serializer.Done
-	ccr.resolver.Close()
+
+	// Spawn a goroutine to close the resolver (since it may block trying to
+	// clean up all allocated resources) and return early.
+	go r.Close()
+}
+
+// serializerScheduleLocked is a convenience method to schedule a function to be
+// run on the serializer while holding ccr.mu.
+func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
+	ccr.mu.Lock()
+	ccr.serializer.Schedule(f)
+	ccr.mu.Unlock()
 }
 
 // UpdateState is called by resolver implementations to report new state to gRPC
 // which includes addresses and service config.
 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
 	errCh := make(chan error, 1)
-	ccr.serializer.Schedule(func(_ context.Context) {
+	ok := ccr.serializer.Schedule(func(context.Context) {
 		ccr.addChannelzTraceEvent(s)
 		ccr.curState = s
 		if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
@@ -112,22 +161,19 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
 		}
 		errCh <- nil
 	})
-
-	// If the resolver wrapper is closed when waiting for this state update to
-	// be handled, the callback serializer will be closed as well, and we can
-	// rely on its Done channel to ensure that we don't block here forever.
-	select {
-	case err := <-errCh:
-		return err
-	case <-ccr.serializer.Done:
+	if !ok {
+		// The only time Schedule() fails to add the callback to the
+		// serializer is when the serializer is closed, and this happens only
+		// when the resolver wrapper is closed.
 		return nil
 	}
+	return <-errCh
 }
 
 // ReportError is called by resolver implementations to report errors
 // encountered during name resolution to gRPC.
 func (ccr *ccResolverWrapper) ReportError(err error) {
-	ccr.serializer.Schedule(func(_ context.Context) {
+	ccr.serializerScheduleLocked(func(_ context.Context) {
 		channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
 		ccr.cc.updateResolverState(resolver.State{}, err)
 	})
@@ -136,7 +182,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
 // NewAddress is called by the resolver implementation to send addresses to
 // gRPC.
 func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
-	ccr.serializer.Schedule(func(_ context.Context) {
+	ccr.serializerScheduleLocked(func(_ context.Context) {
 		ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
 		ccr.curState.Addresses = addrs
 		ccr.cc.updateResolverState(ccr.curState, nil)
@@ -146,7 +192,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
 // NewServiceConfig is called by the resolver implementation to send service
 // configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { - ccr.serializer.Schedule(func(_ context.Context) { + ccr.serializerScheduleLocked(func(_ context.Context) { channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc) if ccr.ignoreServiceConfig { channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config") diff --git a/stream.go b/stream.go index 06ec22cd0a9d..75ab86268ba1 100644 --- a/stream.go +++ b/stream.go @@ -155,6 +155,11 @@ type ClientStream interface { // If none of the above happen, a goroutine and a context will be leaked, and grpc // will not call the optionally-configured stats handler with a stats.End message. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { + if err := cc.idlenessMgr.onCallBegin(); err != nil { + return nil, err + } + defer cc.idlenessMgr.onCallEnd() + // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) diff --git a/test/clientconn_state_transition_test.go b/test/clientconn_state_transition_test.go index 57f932d1eb5e..a14ff4588a0f 100644 --- a/test/clientconn_state_transition_test.go +++ b/test/clientconn_state_transition_test.go @@ -537,3 +537,10 @@ func awaitNotState(ctx context.Context, t *testing.T, cc *grpc.ClientConn, state } } } + +func awaitNoStateChange(ctx context.Context, t *testing.T, cc *grpc.ClientConn, currState connectivity.State) { + t.Helper() + if cc.WaitForStateChange(ctx, currState) { + t.Fatalf("State changed from %q to %q when no state change was expected", currState, cc.GetState()) + } +} diff --git a/test/idleness_test.go b/test/idleness_test.go new file mode 100644 index 000000000000..88366ed3ae12 --- /dev/null +++ b/test/idleness_test.go @@ -0,0 +1,423 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/status" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +const defaultTestShortIdleTimeout = 500 * time.Millisecond + +// channelzTraceEventFound looks up the top-channels in channelz (expects a +// single one), and checks if there is a trace event on the channel matching the +// provided description string. 
+func channelzTraceEventFound(ctx context.Context, wantDesc string) error {
+	for ctx.Err() == nil {
+		tcs, _ := channelz.GetTopChannels(0, 0)
+		if l := len(tcs); l != 1 {
+			return fmt.Errorf("when looking for channelz trace event with description %q, found %d top-level channels, want 1", wantDesc, l)
+		}
+		if tcs[0].Trace == nil {
+			return fmt.Errorf("when looking for channelz trace event with description %q, no trace events found for top-level channel", wantDesc)
+		}
+
+		for _, e := range tcs[0].Trace.Events {
+			if strings.Contains(e.Desc, wantDesc) {
+				return nil
+			}
+		}
+	}
+	return fmt.Errorf("when looking for channelz trace event with description %q, %w", wantDesc, ctx.Err())
+}
+
+// channelzTraceEventNotFound looks up the top-channels in channelz (expects a
+// single one), and verifies that there is no trace event on the channel
+// matching the provided description string.
+func channelzTraceEventNotFound(ctx context.Context, wantDesc string) error {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+
+	err := channelzTraceEventFound(sCtx, wantDesc)
+	if err == nil {
+		return fmt.Errorf("found channelz trace event with description %q, when expected not to", wantDesc)
+	}
+	if !errors.Is(err, context.DeadlineExceeded) {
+		return err
+	}
+	return nil
+}
+
+// Tests the case where channel idleness is disabled by passing an idle_timeout
+// of 0. Verifies that a READY channel with no RPCs does not move to IDLE.
+func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
+	// Setup channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with idle_timeout set to 0.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(0), // Disable idleness.
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend and push an address update via the resolver.
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(backend.Stop)
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	awaitState(ctx, t, cc, connectivity.Ready)
+
+	// Verify that the ClientConn stays in READY.
+	sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
+	defer sCancel()
+	awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
+
+	// Verify that there are no idleness related channelz events.
+	if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+	if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
+func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
+	// Setup channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with a short idle_timeout.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend and push an address update via the resolver.
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(backend.Stop)
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	awaitState(ctx, t, cc, connectivity.Ready)
+
+	// Verify that the ClientConn moves to IDLE as there is no activity.
+	awaitState(ctx, t, cc, connectivity.Idle)
+
+	// Verify idleness related channelz events.
+	if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
+func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
+	// Setup channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with a short idle_timeout.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend which keeps a unary RPC call active by blocking on a
+	// channel that is closed by the test later on. Also push an address update
+	// via the resolver.
+	blockCh := make(chan struct{})
+	backend := &stubserver.StubServer{
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			<-blockCh
+			return &testpb.Empty{}, nil
+		},
+	}
+	if err := backend.StartServer(); err != nil {
+		t.Fatalf("Failed to start backend: %v", err)
+	}
+	t.Cleanup(backend.Stop)
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	awaitState(ctx, t, cc, connectivity.Ready)
+
+	// Spawn a goroutine which checks expected state transitions and idleness
+	// channelz trace events. It eventually closes `blockCh`, thereby unblocking
+	// the server RPC handler and the unary call below.
+	errCh := make(chan error, 1)
+	go func() {
+		// Verify that the ClientConn stays in READY.
+		sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
+		defer sCancel()
+		awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
+
+		// Verify that there are no idleness related channelz events.
+		if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
+			errCh <- err
+			return
+		}
+		if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
+			errCh <- err
+			return
+		}
+
+		// Unblock the unary RPC on the server.
+		close(blockCh)
+		errCh <- nil
+	}()
+
+	// Make a unary RPC that blocks on the server, thereby ensuring that the
+	// count of active RPCs on the client is non-zero.
+	client := testgrpc.NewTestServiceClient(cc)
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Errorf("EmptyCall RPC failed: %v", err)
+	}
+
+	select {
+	case err := <-errCh:
+		if err != nil {
+			t.Fatal(err)
+		}
+	case <-ctx.Done():
+		t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that activity on a READY channel (frequent and short
+// RPCs) keeps it from moving to IDLE.
+func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
+	// Setup channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with a short idle_timeout.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend and push an address update via the resolver.
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(backend.Stop)
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	awaitState(ctx, t, cc, connectivity.Ready)
+
+	// For a duration of three times the configured idle timeout, make RPCs
+	// every now and then and ensure that the channel does not move out of
+	// READY.
+	sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
+	defer sCancel()
+	go func() {
+		for ; sCtx.Err() == nil; <-time.After(defaultTestShortIdleTimeout / 4) {
+			client := testgrpc.NewTestServiceClient(cc)
+			if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil {
+				// While iterating through this for loop, at some point in time,
+				// the context deadline will expire. It is safe to ignore that
+				// error code.
+				if status.Code(err) != codes.DeadlineExceeded {
+					t.Errorf("EmptyCall RPC failed: %v", err)
+					return
+				}
+			}
+		}
+	}()
+
+	// Verify that the ClientConn stays in READY.
+	awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
+
+	// Verify that there are no idleness related channelz events.
+	if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+	if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. Also
+// verifies that a subsequent RPC on the IDLE channel kicks it out of IDLE.
+func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
+	// Setup channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Start a test backend and set the bootstrap state of the resolver to
+	// include this address. This will ensure that when the resolver is
+	// restarted when exiting idle, it will push the same address to grpc again.
+	r := manual.NewBuilderWithScheme("whatever")
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(backend.Stop)
+	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Create a ClientConn with a short idle_timeout.
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	awaitState(ctx, t, cc, connectivity.Ready)
+
+	// Verify that the ClientConn moves to IDLE as there is no activity.
+	awaitState(ctx, t, cc, connectivity.Idle)
+
+	// Verify idleness related channelz events.
+	if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Make an RPC and ensure that it succeeds and moves the channel back to
+	// READY.
+	client := testgrpc.NewTestServiceClient(cc)
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Fatalf("EmptyCall RPC failed: %v", err)
+	}
+	awaitState(ctx, t, cc, connectivity.Ready)
+	if err := channelzTraceEventFound(ctx, "exiting idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Simulates a race between the idle timer firing and RPCs being
+// initiated, after a period of inactivity on the channel.
+//
+// After a period of inactivity (for the configured idle timeout duration), when
+// RPCs are started, there are two possibilities:
+//   - the idle timer wins the race and puts the channel in idle. The RPCs then
+//     kick it out of idle.
+//   - the RPCs win the race, and therefore the channel never moves to idle.
+//
+// In either of these cases, all RPCs must succeed.
+func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
+	// Setup channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Start a test backend and set the bootstrap state of the resolver to
+	// include this address. This will ensure that when the resolver is
+	// restarted when exiting idle, it will push the same address to grpc again.
+	r := manual.NewBuilderWithScheme("whatever")
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(backend.Stop)
+	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Create a ClientConn with a short idle_timeout.
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	awaitState(ctx, t, cc, connectivity.Ready)
+
+	// Make an RPC every defaultTestShortTimeout duration so as to race with the
+	// idle timeout. Whether the idle timeout wins the race or the RPC wins the
+	// race, RPCs must succeed.
+	client := testgrpc.NewTestServiceClient(cc)
+	for i := 0; i < 20; i++ {
+		<-time.After(defaultTestShortTimeout)
+		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+			t.Errorf("EmptyCall RPC failed: %v", err)
+		}
+	}
+}
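For readers approaching this change from the application side, the tests above exercise the new behavior entirely through the public dial-option surface. The sketch below is illustrative only and not part of this change: it shows how a client would opt in to channel idleness using the grpc.WithIdleTimeout option used throughout the tests; the target address and the five-minute timeout are placeholders chosen for the example.

// Illustrative sketch, not part of this change: enabling channel idleness from
// an application. The address and the idle timeout value are placeholders.
package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	cc, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Move the channel to IDLE after 5 minutes without RPC activity.
		// Passing 0 disables idleness, as in TestChannelIdleness_Disabled_NoActivity.
		grpc.WithIdleTimeout(5*time.Minute),
	)
	if err != nil {
		log.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	// Every RPC on cc is bracketed by idlenessMgr.onCallBegin/onCallEnd (see the
	// stream.go hunk above), so in-flight and recent RPCs keep the channel out of
	// IDLE. The first RPC issued after the channel has gone idle kicks it back out
	// of IDLE, restarting the name resolver and the LB policy.
}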