From 41500ecab2392b172acdc38698cfa71cfc549f45 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Fri, 5 May 2023 16:28:32 -0700
Subject: [PATCH] grpc: support channel idleness

---
 balancer_conn_wrappers.go | 130 +++++++++++++++----
 call.go                   |   5 +
 clientconn.go             | 155 +++++++++++++++-------
 dialoptions.go            |  22 ++++
 idle.go                   | 157 +++++++++++++++++++++++
 idle_test.go              | 262 ++++++++++++++++++++++++++++++++++++++
 picker_wrapper.go         |  26 +++-
 resolver_conn_wrapper.go  |  39 ++++++
 stream.go                 |   5 +
 9 files changed, 729 insertions(+), 72 deletions(-)
 create mode 100644 idle.go
 create mode 100644 idle_test.go

diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go
index 1865a3f09c2b..a946e5d6ce6b 100644
--- a/balancer_conn_wrappers.go
+++ b/balancer_conn_wrappers.go
@@ -56,6 +56,13 @@ type ccBalancerWrapper struct {
 	serializerCancel context.CancelFunc
 	balancer         *gracefulswitch.Balancer
 	curBalancerName  string
+
+	// During the window of time when the channel is entering idle and the
+	// underlying balancer is being shut down, we keep track of whether the
+	// channel is in idle mode to ensure that calls from the underlying
+	// balancer are not forwarded to grpc.
+	idleMu sync.Mutex
+	isIdle bool
 }

 // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
@@ -125,12 +132,6 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti
 	})
 }

-func (ccb *ccBalancerWrapper) exitIdle() {
-	ccb.serializer.Schedule(func(_ context.Context) {
-		ccb.balancer.ExitIdle()
-	})
-}
-
 func (ccb *ccBalancerWrapper) resolverError(err error) {
 	ccb.serializer.Schedule(func(_ context.Context) {
 		ccb.balancer.ResolverError(err)
@@ -154,36 +155,93 @@ func (ccb *ccBalancerWrapper) switchTo(name string) {
 		if strings.EqualFold(ccb.curBalancerName, name) {
 			return
 		}
+		ccb.buildLoadBalancingPolicy(name)
+	})
+}

-		// Use the default LB policy, pick_first, if no LB policy with name is
-		// found in the registry.
-		builder := balancer.Get(name)
-		if builder == nil {
-			channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
-			builder = newPickfirstBuilder()
-		} else {
-			channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
-		}
+// buildLoadBalancingPolicy performs the following:
+// - retrieve a balancer builder for the given name. Use the default LB
+//   policy, pick_first, if no LB policy with name is found in the registry.
+// - instruct the gracefulswitch balancer to switch to the above builder. This
+//   will actually build the new balancer.
+// - update the `curBalancerName` field
+func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
+	builder := balancer.Get(name)
+	if builder == nil {
+		channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
+		builder = newPickfirstBuilder()
+	} else {
+		channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
+	}

-		if err := ccb.balancer.SwitchTo(builder); err != nil {
-			channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
-			return
-		}
-		ccb.curBalancerName = builder.Name()
-	})
+	if err := ccb.balancer.SwitchTo(builder); err != nil {
+		channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
+		return
+	}
+	ccb.curBalancerName = builder.Name()
 }

 func (ccb *ccBalancerWrapper) close() {
 	// Close the serializer to ensure that no more calls from gRPC are sent to
-	// the balancer. We don't have to worry about suppressing calls from a
-	// closed balancer because these are handled by the ClientConn (balancer
-	// wrapper is only ever closed when the ClientConn is closed).
+	// the balancer, and no more calls from the balancer are sent to gRPC.
 	ccb.serializerCancel()
 	<-ccb.serializer.Done
 	ccb.balancer.Close()
 }

+// exitIdleMode is invoked by grpc when the channel exits idle mode either
+// because of an RPC or because of an invocation of the Connect() API. This
+// recreates the balancer that was closed previously when entering idle mode.
+//
+// If the channel is not in idle mode, we know for a fact that we are here as a
+// result of the user calling the Connect() method on the ClientConn. Forward
+// the call to the underlying balancer, instructing it to reconnect to the
+// backends.
+func (ccb *ccBalancerWrapper) exitIdleMode() {
+	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
+
+	done := make(chan struct{})
+	ccb.serializer.Schedule(func(_ context.Context) {
+		defer close(done)
+
+		ccb.idleMu.Lock()
+		defer ccb.idleMu.Unlock()
+
+		if !ccb.isIdle {
+			ccb.balancer.ExitIdle()
+			return
+		}
+
+		ccb.buildLoadBalancingPolicy(ccb.curBalancerName)
+		ccb.isIdle = false
+	})
+	<-done
+}
+
+// enterIdleMode is invoked by grpc when the channel enters idle mode upon
+// expiry of idle_timeout. This call blocks until the balancer is closed.
+func (ccb *ccBalancerWrapper) enterIdleMode() {
+	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
+
+	done := make(chan struct{})
+	ccb.serializer.Schedule(func(_ context.Context) {
+		ccb.idleMu.Lock()
+		defer ccb.idleMu.Unlock()
+
+		ccb.close()
+		ccb.isIdle = true
+		close(done)
+	})
+	<-done
+}
+
 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+	ccb.idleMu.Lock()
+	defer ccb.idleMu.Unlock()
+	if ccb.isIdle {
+		return nil, fmt.Errorf("grpc: cannot create a SubConn in idle mode")
+	}
+
 	if len(addrs) <= 0 {
 		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
 	}
@@ -200,6 +258,12 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
 }

 func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
+	ccb.idleMu.Lock()
+	defer ccb.idleMu.Unlock()
+	if ccb.isIdle {
+		return
+	}
+
 	acbw, ok := sc.(*acBalancerWrapper)
 	if !ok {
 		return
@@ -208,6 +272,12 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
 }

 func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+	ccb.idleMu.Lock()
+	defer ccb.idleMu.Unlock()
+	if ccb.isIdle {
+		return
+	}
+
 	acbw, ok := sc.(*acBalancerWrapper)
 	if !ok {
 		return
@@ -216,6 +286,12 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
 }

 func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
+	ccb.idleMu.Lock()
+	defer ccb.idleMu.Unlock()
+	if ccb.isIdle {
+		return
+	}
+
 	// Update picker before updating state. Even though the ordering here does
 	// not matter, it can lead to multiple calls of Pick in the common start-up
 	// case where we wait for ready and then perform an RPC. If the picker is
@@ -226,6 +302,12 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
 }

 func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
+	ccb.idleMu.Lock()
+	defer ccb.idleMu.Unlock()
+	if ccb.isIdle {
+		return
+	}
+
 	ccb.cc.resolveNow(o)
 }
diff --git a/call.go b/call.go
index 9e20e4d385f9..e6a1dc5d75ed 100644
--- a/call.go
+++ b/call.go
@@ -27,6 +27,11 @@ import (
 //
 // All errors returned by Invoke are compatible with the status package.
 func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
+	if err := cc.idlenessMgr.onCallBegin(); err != nil {
+		return err
+	}
+	defer cc.idlenessMgr.onCallEnd()
+
 	// allow interceptor to see all applicable call options, which means those
 	// configured as defaults from dial option as well as per-call options
 	opts = combine(cc.dopts.callOptions, opts)
diff --git a/clientconn.go b/clientconn.go
index 50d08a49a205..3628f7385604 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -69,6 +69,9 @@ var (
 	errConnDrain = errors.New("grpc: the connection is drained")
 	// errConnClosing indicates that the connection is closing.
 	errConnClosing = errors.New("grpc: the connection is closing")
+	// errConnIdling indicates that the connection is being closed as the
+	// channel is moving to idle mode due to inactivity.
+	errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
 	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
 	// service config.
 	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
@@ -243,6 +246,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
 		go cc.scWatcher()
 	}

+	// Initialize the balancer wrapper.
 	var credsClone credentials.TransportCredentials
 	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
 		credsClone = creds.Clone()
@@ -257,8 +261,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
 		Target: cc.parsedTarget,
 	})

-	// Build the resolver.
-	rWrapper, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
+	// Initialize the resolver wrapper.
+	rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
 		target:  cc.parsedTarget,
 		builder: cc.resolverBuilder,
 		bOpts: resolver.BuildOptions{
@@ -272,38 +276,110 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
 	if err != nil {
 		return nil, fmt.Errorf("failed to build resolver: %v", err)
 	}
+	// Resolver implementations may report a state update or error inline when
+	// built (or right after), and this is handled in cc.updateResolverState.
+	// Also, an error from the resolver might lead to a re-resolution request
+	// from the balancer, which is handled in resolveNow() where
+	// `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here.
 	cc.mu.Lock()
-	cc.resolverWrapper = rWrapper
+	cc.resolverWrapper = rw
 	cc.mu.Unlock()

+	// Configure idleness support with the configured idle_timeout or the default.
+	cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
+
+	// Return early for non-blocking dials.
+	if !cc.dopts.block {
+		return cc, nil
+	}
+
 	// A blocking dial blocks until the clientConn is ready.
-	if cc.dopts.block {
-		for {
-			cc.Connect()
-			s := cc.GetState()
-			if s == connectivity.Ready {
-				break
-			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
-				if err = cc.connectionError(); err != nil {
-					terr, ok := err.(interface {
-						Temporary() bool
-					})
-					if ok && !terr.Temporary() {
-						return nil, err
-					}
-				}
-			}
-			if !cc.WaitForStateChange(ctx, s) {
-				// ctx got timeout or canceled.
-				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
-					return nil, err
-				}
-				return nil, ctx.Err()
-			}
-		}
-	}
-	return cc, nil
+	for {
+		cc.Connect()
+		s := cc.GetState()
+		if s == connectivity.Ready {
+			return cc, nil
+		} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
+			if err = cc.connectionError(); err != nil {
+				terr, ok := err.(interface {
+					Temporary() bool
+				})
+				if ok && !terr.Temporary() {
+					return nil, err
+				}
+			}
+		}
+		if !cc.WaitForStateChange(ctx, s) {
+			// ctx got timeout or canceled.
+			if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
+				return nil, err
+			}
+			return nil, ctx.Err()
+		}
+	}
 }

+// addTraceEvent is a helper method to add a trace event on the channel. If the
+// channel is a nested one, the same event is also added on the parent channel.
+func (cc *ClientConn) addTraceEvent(msg string) {
+	ted := &channelz.TraceEventDesc{
+		Desc:     fmt.Sprintf("Channel %s", msg),
+		Severity: channelz.CtInfo,
+	}
+	if cc.dopts.channelzParentID != nil {
+		ted.Parent = &channelz.TraceEventDesc{
+			Desc:     fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg),
+			Severity: channelz.CtInfo,
+		}
+	}
+	channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
+}
+
+// exitIdleMode moves the channel out of idle mode by recreating the name
+// resolver and load balancer.
+func (cc *ClientConn) exitIdleMode() error {
+	cc.addTraceEvent("exiting idle mode")
+
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+
+	// TODO(easwars): what if any of these fail?
+	cc.blockingpicker.exitIdleMode()
+	cc.balancerWrapper.exitIdleMode()
+	cc.firstResolveEvent = grpcsync.NewEvent()
+	cc.resolverWrapper.exitIdleMode()
+
+	// TODO(easwars): should the connectivity state be set to something?
+	return nil
+}
+
+// enterIdleMode puts the channel in idle mode, and as part of that, shuts
+// down the name resolver, load balancer and any subchannels.
+func (cc *ClientConn) enterIdleMode() error {
+	cc.addTraceEvent("entering idle mode")
+
+	cc.mu.Lock()
+	if cc.conns == nil {
+		cc.mu.Unlock()
+		return ErrClientConnClosing
+	}
+
+	// cc.conns == nil is a proxy for the ClientConn being closed. So, instead
+	// of setting it to nil here, we recreate the map. This also means that we
+	// don't have to do this when exiting idle mode.
+	conns := cc.conns
+	cc.conns = make(map[*addrConn]struct{})
+	cc.csMgr.updateState(connectivity.Idle)
+
+	cc.resolverWrapper.enterIdleMode()
+	cc.blockingpicker.enterIdleMode()
+	cc.balancerWrapper.enterIdleMode()
+	cc.mu.Unlock()
+
+	for ac := range conns {
+		ac.tearDown(errConnIdling)
+	}
+	return nil
+}

 // validateTransportCredentials performs a series of checks on the configured
@@ -350,17 +426,7 @@ func (cc *ClientConn) validateTransportCredentials() error {
 // Doesn't grab cc.mu as this method is expected to be called only at Dial time.
 func (cc *ClientConn) channelzRegistration(target string) {
 	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
-	ted := &channelz.TraceEventDesc{
-		Desc:     "Channel created",
-		Severity: channelz.CtInfo,
-	}
-	if cc.dopts.channelzParentID != nil {
-		ted.Parent = &channelz.TraceEventDesc{
-			Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
-			Severity: channelz.CtInfo,
-		}
-	}
-	channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
+	cc.addTraceEvent("created")
 	cc.csMgr.channelzID = cc.channelzID
 }
@@ -509,6 +575,7 @@ type ClientConn struct {
 	channelzID      *channelz.Identifier // Channelz identifier for the channel.
 	resolverBuilder resolver.Builder     // See parseTargetAndFindResolver().
 	balancerWrapper *ccBalancerWrapper   // Uses gracefulswitch.balancer underneath.
+	idlenessMgr     *idlenessManager

 	// The following provide their own synchronization, and therefore don't
 	// require cc.mu to be held to access them.
@@ -573,7 +640,7 @@ func (cc *ClientConn) GetState() connectivity.State {
 // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
 // release.
 func (cc *ClientConn) Connect() {
-	cc.balancerWrapper.exitIdle()
+	cc.balancerWrapper.exitIdleMode()
 }

 func (cc *ClientConn) scWatcher() {
@@ -1068,6 +1135,7 @@ func (cc *ClientConn) Close() error {
 	rWrapper := cc.resolverWrapper
 	cc.resolverWrapper = nil
 	bWrapper := cc.balancerWrapper
+	idlenessMgr := cc.idlenessMgr
 	cc.mu.Unlock()

 	// The order of closing matters here since the balancer wrapper assumes the
@@ -1079,21 +1147,14 @@ func (cc *ClientConn) Close() error {
 	if rWrapper != nil {
 		rWrapper.close()
 	}
+	if idlenessMgr != nil {
+		idlenessMgr.close()
+	}

 	for ac := range conns {
 		ac.tearDown(ErrClientConnClosing)
 	}
-	ted := &channelz.TraceEventDesc{
-		Desc:     "Channel deleted",
-		Severity: channelz.CtInfo,
-	}
-	if cc.dopts.channelzParentID != nil {
-		ted.Parent = &channelz.TraceEventDesc{
-			Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()),
-			Severity: channelz.CtInfo,
-		}
-	}
-	channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
+	cc.addTraceEvent("deleted")
 	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
 	// trace reference to the entity being deleted, and thus prevent it from being
 	// deleted right away.
diff --git a/dialoptions.go b/dialoptions.go
index cdc8263bda65..51c8997d5d18 100644
--- a/dialoptions.go
+++ b/dialoptions.go
@@ -77,6 +77,7 @@ type dialOptions struct {
 	defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
 	defaultServiceConfigRawJSON *string
 	resolvers                   []resolver.Builder
+	idleTimeout                 time.Duration
 }

 // DialOption configures how we set up the connection.
@@ -627,6 +628,7 @@ func defaultDialOptions() dialOptions {
 			ReadBufferSize: defaultReadBufSize,
 			UseProxy:       true,
 		},
+		idleTimeout: 30 * time.Minute,
 	}
 }
@@ -655,3 +657,23 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
 		o.resolvers = append(o.resolvers, rs...)
 	})
 }
+
+// WithIdleTimeout returns a DialOption that configures an idle timeout for the
+// channel. If the channel is idle for the configured timeout, i.e. there are
+// no ongoing RPCs and no new RPCs are initiated, the channel will enter idle
+// mode and as a result the name resolver and load balancer will be shut down.
+// The channel will exit idle mode when the Connect() method is called or when
+// an RPC is initiated.
+//
+// A default timeout of 30 minutes will be used if this dial option is not set
+// at dial time, and idleness can be disabled by passing a timeout of zero.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func WithIdleTimeout(d time.Duration) DialOption {
+	return newFuncDialOption(func(o *dialOptions) {
+		o.idleTimeout = d
+	})
+}
diff --git a/idle.go b/idle.go
new file mode 100644
index 000000000000..680389c9efbc
--- /dev/null
+++ b/idle.go
@@ -0,0 +1,157 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+// For overriding in unit tests.
+var newTimer = func(d time.Duration, f func()) *time.Timer {
+	return time.AfterFunc(d, f)
+}
+
+// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter
+// and exit from idle mode.
+type idlenessEnforcer interface {
+	exitIdleMode() error
+	enterIdleMode() error
+}
+
+// idlenessManager contains functionality to track RPC activity on the channel
+// and uses this to instruct the channel to enter or exit idle mode as
+// appropriate.
+type idlenessManager struct {
+	// The following fields are set when the manager is created and are never
+	// written to after that. Therefore these can be accessed without a mutex.
+	enforcer   idlenessEnforcer // Functionality provided by grpc.ClientConn.
+	timeout    int64            // Idle timeout duration in nanos, stored as an int64.
+	isDisabled bool             // Disabled if idle_timeout is set to 0.
+
+	// All state maintained by the manager is guarded by this mutex.
+	mu                        sync.Mutex
+	activeCallsCount          int         // Count of active RPCs.
+	activeSinceLastTimerCheck bool        // True if there was an RPC since the last timer callback.
+	lastCallEndTime           int64       // Time when the most recent RPC finished, stored as unix nanos.
+	isIdle                    bool        // True if the channel is in idle mode.
+	timer                     *time.Timer // Expires when the idle_timeout fires.
+}
+
+// newIdlenessManager creates a new idleness state manager which attempts to
+// put the channel in idle mode when there is no RPC activity for the
+// configured idleTimeout.
+//
+// Idleness support can be disabled by passing a value of 0 for idleTimeout.
+func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) *idlenessManager {
+	if idleTimeout == 0 {
+		logger.Infof("Channel idleness support explicitly disabled")
+		return &idlenessManager{isDisabled: true}
+	}
+
+	i := &idlenessManager{
+		enforcer: enforcer,
+		timeout:  int64(idleTimeout),
+	}
+	i.timer = newTimer(idleTimeout, i.handleIdleTimeout)
+	return i
+}
+
+// handleIdleTimeout is the timer callback invoked when idle_timeout expires.
+func (i *idlenessManager) handleIdleTimeout() {
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	// If there are ongoing RPCs, it means the channel is active. Reset the
+	// timer to fire after a duration of idle_timeout, and return early.
+	if i.activeCallsCount > 0 {
+		i.timer.Reset(time.Duration(i.timeout))
+		return
+	}
+
+	// There were some RPCs made since the last time we were here. So, the
+	// channel is still active. Reschedule the timer to fire after a duration
+	// of idle_timeout from the time the last call ended.
+	if i.activeSinceLastTimerCheck {
+		i.activeSinceLastTimerCheck = false
+		// It is safe to ignore the return value from Reset() because we are
+		// already in the timer callback and this is the only place from which
+		// we reset the timer.
+		i.timer.Reset(time.Duration(i.lastCallEndTime + i.timeout - time.Now().UnixNano()))
+		return
+	}
+
+	// There are no ongoing RPCs, and there were no RPCs since the last time
+	// we were here, so we are all set to enter idle mode.
+	if err := i.enforcer.enterIdleMode(); err != nil {
+		logger.Warningf("Failed to enter idle mode: %v", err)
+		return
+	}
+	i.isIdle = true
+}
+
+// onCallBegin is invoked by the ClientConn at the start of every RPC. If the
+// channel is currently in idle mode, the manager asks the ClientConn to exit
+// idle mode, and restarts the timer. The active calls count is incremented and
+// the activeness bit is set to true.
+func (i *idlenessManager) onCallBegin() error {
+	if i.isDisabled {
+		return nil
+	}
+
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	if i.isIdle {
+		if err := i.enforcer.exitIdleMode(); err != nil {
+			return fmt.Errorf("grpc.ClientConn failed to exit idle mode: %v", err)
+		}
+		i.timer = newTimer(time.Duration(i.timeout), i.handleIdleTimeout)
+		i.isIdle = false
+	}
+	i.activeCallsCount++
+	i.activeSinceLastTimerCheck = true
+	return nil
+}
+
+// onCallEnd is invoked by the ClientConn at the end of every RPC. The active
+// calls count is decremented and `i.lastCallEndTime` is updated.
+func (i *idlenessManager) onCallEnd() {
+	if i.isDisabled {
+		return
+	}
+
+	i.mu.Lock()
+	defer i.mu.Unlock()
+
+	i.activeCallsCount--
+	if i.activeCallsCount < 0 {
+		logger.Errorf("Number of active calls tracked by idleness manager is negative: %d", i.activeCallsCount)
+	}
+	i.lastCallEndTime = time.Now().UnixNano()
+}
+
+func (i *idlenessManager) close() {
+	if i.isDisabled {
+		return
+	}
+	i.timer.Stop()
+}
diff --git a/idle_test.go b/idle_test.go
new file mode 100644
index 000000000000..a4ae666d21a6
--- /dev/null
+++ b/idle_test.go
@@ -0,0 +1,262 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+	"testing"
+	"time"
+)
+
+const (
+	defaultTestIdleTimeout  = 500 * time.Millisecond // A short idle_timeout for tests.
+	defaultTestShortTimeout = 10 * time.Millisecond  // A small deadline to wait for events expected to not happen.
+)
+
+type testIdlenessEnforcer struct {
+	exitIdleCh  chan struct{}
+	enterIdleCh chan struct{}
+}
+
+func (ti *testIdlenessEnforcer) exitIdleMode() error {
+	ti.exitIdleCh <- struct{}{}
+	return nil
+}
+
+func (ti *testIdlenessEnforcer) enterIdleMode() error {
+	ti.enterIdleCh <- struct{}{}
+	return nil
+}
+
+func newTestIdlenessEnforcer() *testIdlenessEnforcer {
+	return &testIdlenessEnforcer{
+		exitIdleCh:  make(chan struct{}, 1),
+		enterIdleCh: make(chan struct{}, 1),
+	}
+}
+
+// overrideNewTimer overrides the `newTimer` helper for the duration of the
+// test, and returns a channel that is written to every time the timer
+// callback fires.
+func overrideNewTimer(t *testing.T) <-chan struct{} {
+	t.Helper()
+
+	ch := make(chan struct{}, 1)
+	origNewTimer := newTimer
+	newTimer = func(d time.Duration, callback func()) *time.Timer {
+		return time.AfterFunc(d, func() {
+			select {
+			case ch <- struct{}{}:
+			default:
+			}
+			callback()
+		})
+	}
+	t.Cleanup(func() { newTimer = origNewTimer })
+	return ch
+}
+
+// TestIdlenessManager_Disabled tests the case where the idleness manager is
+// disabled by passing an idle_timeout of 0. Verifies the following things:
+// - the timer callback does not fire
+// - an RPC does not trigger a call to exitIdleMode on the ClientConn
+// - more calls to onCallEnd() than to onCallBegin() do not result in an
+//   error log
+func (s) TestIdlenessManager_Disabled(t *testing.T) {
+	callbackCh := overrideNewTimer(t)
+
+	// Create an idleness manager that is disabled because of idleTimeout
+	// being set to `0`.
+	enforcer := newTestIdlenessEnforcer()
+	mgr := newIdlenessManager(enforcer, time.Duration(0))
+
+	// Ensure that the timer callback does not fire within a short deadline.
+	select {
+	case <-callbackCh:
+		t.Fatal("Idle timer callback fired when manager is disabled")
+	case <-time.After(defaultTestShortTimeout):
+	}
+
+	// The first invocation of onCallBegin() would lead to a call to
+	// exitIdleMode() on the enforcer, unless the idleness manager is disabled.
+	mgr.onCallBegin()
+	select {
+	case <-enforcer.exitIdleCh:
+		t.Fatalf("exitIdleMode() called on enforcer when manager is disabled")
+	case <-time.After(defaultTestShortTimeout):
+	}
+
+	// If the number of calls to onCallEnd() exceeds the number of calls to
+	// onCallBegin(), the idleness manager is expected to throw an error log
+	// (which will cause our TestLogger to fail the test). But since the
+	// manager is disabled, this should not happen.
+	mgr.onCallEnd()
+	mgr.onCallEnd()
+
+	// The idleness manager is explicitly not closed here. But since the
+	// manager is disabled, it does not create a timer (and therefore no
+	// goroutine), and hence we expect the leakchecker to not find any leaked
+	// goroutines.
+}
+
+// TestIdlenessManager_Enabled_TimerFires tests the case where the idle
+// manager is enabled. Ensures that when there are no RPCs, the timer callback
+// is invoked and the enterIdleMode() method is invoked on the enforcer.
+func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) {
+	callbackCh := overrideNewTimer(t)
+
+	enforcer := newTestIdlenessEnforcer()
+	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
+	defer mgr.close()
+
+	// Ensure that the timer callback fires within an appropriate amount of
+	// time.
+	select {
+	case <-callbackCh:
+	case <-time.After(2 * defaultTestIdleTimeout):
+		t.Fatal("Timeout waiting for idle timer callback to fire")
+	}
+
+	// Ensure that the channel moves to idle mode eventually.
+	select {
+	case <-enforcer.enterIdleCh:
+	case <-time.After(defaultTestTimeout):
+		t.Fatal("Timeout waiting for channel to move to idle")
+	}
+}
+
+// TestIdlenessManager_Enabled_OngoingCall tests the case where the idle
+// manager is enabled. Ensures that when there is an ongoing RPC, the channel
+// does not enter idle mode.
+func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) {
+	callbackCh := overrideNewTimer(t)
+
+	enforcer := newTestIdlenessEnforcer()
+	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
+	defer mgr.close()
+
+	// Fire up a goroutine that simulates an ongoing RPC that is terminated
+	// after the timer callback fires for the first time.
+	timerFired := make(chan struct{})
+	go func() {
+		mgr.onCallBegin()
+		<-timerFired
+		mgr.onCallEnd()
+	}()
+
+	// Ensure that the timer callback fires and unblock the above goroutine.
+	select {
+	case <-callbackCh:
+		close(timerFired)
+	case <-time.After(2 * defaultTestIdleTimeout):
+		t.Fatal("Timeout waiting for idle timer callback to fire")
+	}
+
+	// The invocation of the timer callback should not put the channel in idle
+	// mode since we had an ongoing RPC.
+	select {
+	case <-enforcer.enterIdleCh:
+		t.Fatalf("enterIdleMode() called on enforcer when active RPC exists")
+	case <-time.After(defaultTestShortTimeout):
+	}
+
+	// Since we terminated the ongoing RPC and we have no other active RPCs,
+	// the channel must move to idle eventually.
+	select {
+	case <-enforcer.enterIdleCh:
+	case <-time.After(defaultTestTimeout):
+		t.Fatal("Timeout waiting for channel to move to idle")
+	}
+}
+
+// TestIdlenessManager_Enabled_ActiveSinceLastCheck tests the case where the
+// idle manager is enabled. Ensures that when there were active RPCs in the
+// last period (even though there is no active call when the timer fires), the
+// channel does not enter idle mode.
+func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
+	callbackCh := overrideNewTimer(t)
+
+	enforcer := newTestIdlenessEnforcer()
+	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
+	defer mgr.close()
+
+	// Fire up a goroutine that simulates unary RPCs until the timer callback
+	// fires.
+	timerFired := make(chan struct{})
+	go func() {
+		for ; ; <-time.After(defaultTestShortTimeout) {
+			mgr.onCallBegin()
+			mgr.onCallEnd()
+
+			select {
+			case <-timerFired:
+				return
+			default:
+			}
+		}
+	}()
+
+	// Ensure that the timer callback fires, and that we don't enter idle as
+	// part of this invocation of the timer callback, since we had some RPCs
+	// in this period.
+	select {
+	case <-callbackCh:
+		close(timerFired)
+	case <-time.After(2 * defaultTestIdleTimeout):
+		t.Fatal("Timeout waiting for idle timer callback to fire")
+	}
+	select {
+	case <-enforcer.enterIdleCh:
+		t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period")
+	case <-time.After(defaultTestShortTimeout):
+	}
+
+	// Since the unary RPC terminated and we have no other active RPCs, the
+	// channel must move to idle eventually.
+	select {
+	case <-enforcer.enterIdleCh:
+	case <-time.After(defaultTestTimeout):
+		t.Fatal("Timeout waiting for channel to move to idle")
+	}
+}
+
+// TestIdlenessManager_Enabled_ExitIdleOnRPC tests the case where the idle
+// manager is enabled. Ensures that the channel moves out of idle when an RPC
+// is initiated.
+func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) {
+	overrideNewTimer(t)
+
+	enforcer := newTestIdlenessEnforcer()
+	mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
+	defer mgr.close()
+
+	// Ensure that the channel moves to idle since there are no RPCs.
+	select {
+	case <-enforcer.enterIdleCh:
+	case <-time.After(2 * defaultTestIdleTimeout):
+		t.Fatal("Timeout waiting for channel to move to idle mode")
+	}
+
+	mgr.onCallBegin()
+	mgr.onCallEnd()
+
+	// Ensure that the channel moves out of idle as a result of the above RPC.
+	select {
+	case <-enforcer.exitIdleCh:
+	case <-time.After(2 * defaultTestIdleTimeout):
+		t.Fatal("Timeout waiting for channel to move out of idle mode")
+	}
+}
diff --git a/picker_wrapper.go b/picker_wrapper.go
index c525dc070fc6..8e24d864986d 100644
--- a/picker_wrapper.go
+++ b/picker_wrapper.go
@@ -36,6 +36,7 @@ import (
 type pickerWrapper struct {
 	mu         sync.Mutex
 	done       bool
+	idle       bool
 	blockingCh chan struct{}
 	picker     balancer.Picker
 }
@@ -47,7 +48,11 @@ func newPickerWrapper() *pickerWrapper {
 // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
 func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
 	pw.mu.Lock()
-	if pw.done {
+	if pw.done || pw.idle {
+		// There is a small window where a picker update from the LB policy
+		// can race with the channel going to idle mode. If the picker is
+		// idle here, it is because the channel asked it to do so, and
+		// therefore it is safe to ignore the update from the LB policy.
 		pw.mu.Unlock()
 		return
 	}
@@ -187,6 +192,25 @@ func (pw *pickerWrapper) close() {
 	close(pw.blockingCh)
 }

+func (pw *pickerWrapper) enterIdleMode() {
+	pw.mu.Lock()
+	defer pw.mu.Unlock()
+	if pw.done {
+		return
+	}
+	pw.idle = true
+}
+
+func (pw *pickerWrapper) exitIdleMode() {
+	pw.mu.Lock()
+	defer pw.mu.Unlock()
+	if pw.done {
+		return
+	}
+	pw.blockingCh = make(chan struct{})
+	pw.idle = false
+}
+
 // dropError is a wrapper error that indicates the LB policy wishes to drop the
 // RPC and not retry it.
 type dropError struct {
diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go
index ce12b52ecdc0..5f31133d4292 100644
--- a/resolver_conn_wrapper.go
+++ b/resolver_conn_wrapper.go
@@ -20,6 +20,7 @@ package grpc

 import (
 	"context"
+	"fmt"
 	"strings"

 	"google.golang.org/grpc/balancer"
@@ -44,6 +45,7 @@ type ccResolverWrapper struct {
 	cc                  resolverStateUpdater
 	channelzID          *channelz.Identifier
 	ignoreServiceConfig bool
+	opts                ccResolverWrapperOpts

 	// Outgoing (gRPC --> resolver) and incoming (resolver --> gRPC) calls are
 	// guaranteed to execute in a mutually exclusive manner as they are
@@ -72,6 +74,7 @@ func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (
 		cc:                  cc,
 		channelzID:          opts.channelzID,
 		ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
+		opts:                opts,
 		serializer:          grpcsync.NewCallbackSerializer(ctx),
 		serializerCancel:    cancel,
 	}
@@ -99,6 +102,42 @@ func (ccr *ccResolverWrapper) close() {
 	ccr.resolver.Close()
 }

+// enterIdleMode is invoked by grpc when the channel enters idle mode upon
+// expiry of idle_timeout. This call blocks until the resolver is closed.
+func (ccr *ccResolverWrapper) enterIdleMode() {
+	channelz.Info(logger, ccr.channelzID, "ccResolverWrapper: entering idle mode")
+	ccr.close()
+}
+
+// exitIdleMode is invoked by grpc when the channel exits idle mode either
+// because of an RPC or because of an invocation of the Connect() API. This
+// recreates the resolver that was closed previously when entering idle mode.
+func (ccr *ccResolverWrapper) exitIdleMode() error {
+	channelz.Info(logger, ccr.channelzID, "ccResolverWrapper: exiting idle mode")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ccr.serializer = grpcsync.NewCallbackSerializer(ctx)
+	ccr.serializerCancel = cancel
+
+	// Block the serializer until Build returns and the `ccr.resolver` field
+	// is set. This is required because the resolver might report an error as
+	// part of Build, which when forwarded to the balancer might result in a
+	// re-resolution request. These should only be handled after
+	// `ccr.resolver` is set to the newly built resolver.
+	done := make(chan struct{})
+	ccr.serializer.Schedule(func(_ context.Context) {
+		<-done
+	})
+
+	r, err := ccr.opts.builder.Build(ccr.opts.target, ccr, ccr.opts.bOpts)
+	if err != nil {
+		return fmt.Errorf("failed to build resolver when exiting idle mode: %v", err)
+	}
+	ccr.resolver = r
+	close(done)
+	return nil
+}
+
 // UpdateState is called by resolver implementations to report new state to gRPC
 // which includes addresses and service config.
 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
diff --git a/stream.go b/stream.go
index f79e31c147ee..58fbadcc11da 100644
--- a/stream.go
+++ b/stream.go
@@ -155,6 +155,11 @@ type ClientStream interface {
 // If none of the above happen, a goroutine and a context will be leaked, and grpc
 // will not call the optionally-configured stats handler with a stats.End message.
 func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+	if err := cc.idlenessMgr.onCallBegin(); err != nil {
+		return nil, err
+	}
+	defer cc.idlenessMgr.onCallEnd()
+
 	// allow interceptor to see all applicable call options, which means those
 	// configured as defaults from dial option as well as per-call options
 	opts = combine(cc.dopts.callOptions, opts)
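
Reviewer note (illustrative, not part of the patch): the sketch below shows how a client could opt into the new WithIdleTimeout dial option once this change lands. The target address and the use of insecure credentials are placeholders to keep the example self-contained; with no option set, the 30-minute default from defaultDialOptions() applies.

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Enter idle mode after 5 minutes without RPC activity; the name
	// resolver and LB policy are shut down until the next RPC (or an
	// explicit Connect()) moves the channel back out of idle.
	conn, err := grpc.Dial(
		"localhost:50051", // placeholder backend address
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithIdleTimeout(5*time.Minute),
		// grpc.WithIdleTimeout(0) would disable idleness tracking entirely,
		// per the WithIdleTimeout documentation in this patch.
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Connect() moves the channel out of idle mode without issuing an RPC.
	conn.Connect()
}

Note that every RPC is bracketed by onCallBegin()/onCallEnd() in Invoke() and NewStream(), so the first RPC on an idle channel transparently rebuilds the resolver and balancer before the pick happens.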