From 0e5b271fad18dac331dc031c7dd376e7f117b42f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 12 May 2023 16:58:29 -0700 Subject: [PATCH] support an atomic idleness manager --- clientconn.go | 21 ++-- idle.go | 209 +++++++++++++++++++++++++++++++--------- idle_test.go | 261 ++++++++++++++++++++++++++++---------------------- 3 files changed, 328 insertions(+), 163 deletions(-) diff --git a/clientconn.go b/clientconn.go index 8de93c1c0316..72a695a96309 100644 --- a/clientconn.go +++ b/clientconn.go @@ -288,7 +288,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.mu.Unlock() // Configure idleness support with configured idle_timeout or default. - cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout) + if cc.dopts.idleTimeout == 0 { + cc.idlenessMgr = newDisabledIdlenessManager() + } else { + // cc.idlenessMgr = newMutexIdlenessManager(cc, cc.dopts.idleTimeout) + cc.idlenessMgr = newAtomicIdlenessManager(cc, cc.dopts.idleTimeout) + } // Return early for non-blocking dials. if !cc.dopts.block { @@ -392,17 +397,19 @@ func (cc *ClientConn) enterIdleMode() error { // don't have to do this when exiting idle mode. conns := cc.conns cc.conns = make(map[*addrConn]struct{}) - cc.csMgr.updateState(connectivity.Idle) cc.resolverWrapper.enterIdleMode() cc.blockingpicker.enterIdleMode() cc.balancerWrapper.enterIdleMode() + cc.csMgr.updateState(connectivity.Idle) cc.mu.Unlock() - cc.addTraceEvent("entering idle mode") - for ac := range conns { - ac.tearDown(errConnIdling) - } + go func() { + cc.addTraceEvent("entering idle mode") + for ac := range conns { + ac.tearDown(errConnIdling) + } + }() return nil } @@ -599,7 +606,7 @@ type ClientConn struct { channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. - idlenessMgr *idlenessManager + idlenessMgr idlenessManager // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. diff --git a/idle.go b/idle.go index 642835342dbc..2415402ef29a 100644 --- a/idle.go +++ b/idle.go @@ -20,14 +20,15 @@ package grpc import ( "sync" + "sync/atomic" "time" "google.golang.org/grpc/codes" - "google.golang.org/grpc/internal/status" + "google.golang.org/grpc/status" ) // For overriding in unit tests. -var newTimer = func(d time.Duration, f func()) *time.Timer { +var timeAfterFunc = func(d time.Duration, f func()) *time.Timer { return time.AfterFunc(d, f) } @@ -38,16 +39,30 @@ type idlenessEnforcer interface { enterIdleMode() error } -// idlenessManager contains functionality to track RPC activity on the channel -// and uses this to instruct the channel to enter or exit idle mode as -// appropriate. -type idlenessManager struct { +// idlenessManager defines the functionality required to track RPC activity on a +// channel. +type idlenessManager interface { + onCallBegin() error + onCallEnd() + close() +} + +type disabledIdlenessManager struct{} + +func (disabledIdlenessManager) onCallBegin() error { return nil } +func (disabledIdlenessManager) onCallEnd() {} +func (disabledIdlenessManager) close() {} + +func newDisabledIdlenessManager() idlenessManager { return disabledIdlenessManager{} } + +// mutexIdlenessManager implements the idlenessManager interface and uses a +// mutex to synchronize access to shared state. +type mutexIdlenessManager struct { // The following fields are set when the manager is created and are never // written to after that. Therefore these can be accessed without a mutex. - enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. - timeout int64 // Idle timeout duration nanos stored as an int64. - isDisabled bool // Disabled if idle_timeout is set to 0. + enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. + timeout int64 // Idle timeout duration nanos stored as an int64. // All state maintained by the manager is guarded by this mutex. mu sync.Mutex @@ -58,27 +73,19 @@ type idlenessManager struct { timer *time.Timer // Expires when the idle_timeout fires. } -// newIdlenessManager creates a new idleness state manager which attempts to put -// the channel in idle mode when there is no RPC activity for the configured -// idleTimeout. -// -// Idleness support can be disabled by passing a value of 0 for idleTimeout. -func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) *idlenessManager { - if idleTimeout == 0 { - logger.Infof("Channel idleness support explicitly disabled") - return &idlenessManager{isDisabled: true} - } - - i := &idlenessManager{ +// newMutexIdlenessManager creates a new mutexIdlennessManager. A non-zero value +// must be passed for idle timeout. +func newMutexIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager { + i := &mutexIdlenessManager{ enforcer: enforcer, timeout: int64(idleTimeout), } - i.timer = newTimer(idleTimeout, i.handleIdleTimeout) + i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout) return i } // handleIdleTimeout is the timer callback when idle_timeout expires. -func (i *idlenessManager) handleIdleTimeout() { +func (i *mutexIdlenessManager) handleIdleTimeout() { i.mu.Lock() defer i.mu.Unlock() @@ -114,33 +121,28 @@ func (i *idlenessManager) handleIdleTimeout() { // channel is currently in idle mode, the manager asks the ClientConn to exit // idle mode, and restarts the timer. The active calls count is incremented and // the activeness bit is set to true. -func (i *idlenessManager) onCallBegin() error { - if i.isDisabled { - return nil - } - +func (i *mutexIdlenessManager) onCallBegin() error { i.mu.Lock() defer i.mu.Unlock() - if i.isIdle { - if err := i.enforcer.exitIdleMode(); err != nil { - return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err) - } - i.timer = newTimer(time.Duration(i.timeout), i.handleIdleTimeout) - i.isIdle = false - } i.activeCallsCount++ i.activeSinceLastTimerCheck = true + + if !i.isIdle { + return nil + } + + if err := i.enforcer.exitIdleMode(); err != nil { + return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err) + } + i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout) + i.isIdle = false return nil } // onCallEnd is invoked by the ClientConn at the end of every RPC. The active // calls count is decremented and `i.lastCallEndTime` is updated. -func (i *idlenessManager) onCallEnd() { - if i.isDisabled { - return - } - +func (i *mutexIdlenessManager) onCallEnd() { i.mu.Lock() defer i.mu.Unlock() @@ -151,11 +153,130 @@ func (i *idlenessManager) onCallEnd() { i.lastCallEndTime = time.Now().UnixNano() } -func (i *idlenessManager) close() { - if i.isDisabled { - return - } +func (i *mutexIdlenessManager) close() { i.mu.Lock() i.timer.Stop() i.mu.Unlock() } + +type atomicIdlenessState struct { + activeCallsCount int // Count of active RPCs. + activeSinceLastTimerCheck bool // True if there was an RPC since the last timer callback. + lastCallEndTime int64 // Time when the most recent RPC finished, stored as unix nanos. +} + +type atomicIdlenessManager struct { + enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. + timeout int64 // Idle timeout duration nanos stored as an int64. + state atomic.Value // Of type `idlenessState`. + timer atomic.Value // Of type `*time.Timer` + isIdle atomic.Bool // True if the channel is in idle mode. + isClosed atomic.Bool +} + +// newAtomicIdlenessManager creates a new atomicIdlenessManager. A non-zero +// value must be passed for idle timeout. +func newAtomicIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager { + i := &atomicIdlenessManager{ + enforcer: enforcer, + timeout: int64(idleTimeout), + } + i.state.Store(atomicIdlenessState{}) + i.timer.Store(timeAfterFunc(idleTimeout, i.handleIdleTimeout)) + return i +} + +func (i *atomicIdlenessManager) handleIdleTimeout() { + if i.isClosed.Load() { + return + } + + state := i.state.Load().(atomicIdlenessState) + var timeoutDuration time.Duration + switch { + case state.activeCallsCount > 0: + timeoutDuration = time.Duration(i.timeout) + case state.activeSinceLastTimerCheck: + for { + currState := i.state.Load().(atomicIdlenessState) + newState := currState + newState.activeSinceLastTimerCheck = false + if i.state.CompareAndSwap(currState, newState) { + break + } + } + timeoutDuration = time.Duration(state.lastCallEndTime + i.timeout - time.Now().UnixNano()) + default: + logger.Info("easwars: in timer callback default case") + if err := i.enforcer.enterIdleMode(); err != nil { + logger.Warningf("Failed to enter idle mode: %v", err) + return + } + logger.Info("easwars: setting isIdle to true") + i.isIdle.Store(true) + return + } + + // It is safe to ignore the return value from Reset() because we are + // already in the timer callback and this is only place from where we + // reset the timer. + timer := i.timer.Load().(*time.Timer) + timer.Reset(timeoutDuration) +} + +func (i *atomicIdlenessManager) onCallBegin() error { + if i.isClosed.Load() { + return nil + } + + logger.Infof("easwars: in onCallBegin") + + for { + currState := i.state.Load().(atomicIdlenessState) + newState := currState + newState.activeCallsCount++ + newState.activeSinceLastTimerCheck = true + if !i.state.CompareAndSwap(currState, newState) { + continue + } + + logger.Infof("easwars: swapped state to %v", newState) + + if !i.isIdle.Load() { + logger.Infof("easwars: channel is not idle, returning") + return nil + } + + if err := i.enforcer.exitIdleMode(); err != nil { + return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err) + } + i.timer.Store(timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout)) + i.isIdle.Store(false) + return nil + } +} + +func (i *atomicIdlenessManager) onCallEnd() { + if i.isClosed.Load() { + return + } + + for { + currState := i.state.Load().(atomicIdlenessState) + newState := currState + newState.activeCallsCount-- + if newState.activeCallsCount < 0 { + logger.Errorf("Number of active calls tracked by idleness manager is negative: %d", newState.activeCallsCount) + } + newState.lastCallEndTime = time.Now().UnixNano() + if i.state.CompareAndSwap(currState, newState) { + break + } + } +} + +func (i *atomicIdlenessManager) close() { + i.isClosed.Store(true) + timer := i.timer.Load().(*time.Timer) + timer.Stop() +} diff --git a/idle_test.go b/idle_test.go index 3fbd32e99f30..0fd8c212cd38 100644 --- a/idle_test.go +++ b/idle_test.go @@ -58,8 +58,8 @@ func overrideNewTimer(t *testing.T) <-chan struct{} { t.Helper() ch := make(chan struct{}, 1) - origNewTimer := newTimer - newTimer = func(d time.Duration, callback func()) *time.Timer { + origTimeAfterFunc := timeAfterFunc + timeAfterFunc = func(d time.Duration, callback func()) *time.Timer { return time.AfterFunc(d, func() { select { case ch <- struct{}{}: @@ -68,7 +68,7 @@ func overrideNewTimer(t *testing.T) <-chan struct{} { callback() }) } - t.Cleanup(func() { newTimer = origNewTimer }) + t.Cleanup(func() { timeAfterFunc = origTimeAfterFunc }) return ch } @@ -84,7 +84,7 @@ func (s) TestIdlenessManager_Disabled(t *testing.T) { // Create an idleness manager that is disabled because of idleTimeout being // set to `0`. enforcer := newTestIdlenessEnforcer() - mgr := newIdlenessManager(enforcer, time.Duration(0)) + mgr := newDisabledIdlenessManager() // Ensure that the timer callback does not fire within a short deadline. select { @@ -118,24 +118,34 @@ func (s) TestIdlenessManager_Disabled(t *testing.T) { // is enabled. Ensures that when there are no RPCs, the timer callback is // invoked and the enterIdleMode() method is invoked on the enforcer. func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) { - callbackCh := overrideNewTimer(t) + for _, name := range []string{"mutex", "atomic"} { + t.Run(name, func(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + var mgr idlenessManager + if name == "mutex" { + mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } else if name == "atomic" { + mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } + defer mgr.close() - enforcer := newTestIdlenessEnforcer() - mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) - defer mgr.close() + // Ensure that the timer callback fires within a appropriate amount of time. + select { + case <-callbackCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } - // Ensure that the timer callback fires within a appropriate amount of time. - select { - case <-callbackCh: - case <-time.After(2 * defaultTestIdleTimeout): - t.Fatal("Timeout waiting for idle timer callback to fire") - } + // Ensure that the channel moves to idle mode eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } - // Ensure that the channel moves to idle mode eventually. - select { - case <-enforcer.enterIdleCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout waiting for channel to move to idle") + }) } } @@ -143,43 +153,52 @@ func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) { // is enabled. Ensures that when there is an ongoing RPC, the channel does not // enter idle mode. func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) { - callbackCh := overrideNewTimer(t) - - enforcer := newTestIdlenessEnforcer() - mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) - defer mgr.close() - - // Fire up a goroutine that simulates an ongoing RPC that is terminated - // after the timer callback fires for the first time. - timerFired := make(chan struct{}) - go func() { - mgr.onCallBegin() - <-timerFired - mgr.onCallEnd() - }() - - // Ensure that the timer callback fires and unblock the above goroutine. - select { - case <-callbackCh: - close(timerFired) - case <-time.After(2 * defaultTestIdleTimeout): - t.Fatal("Timeout waiting for idle timer callback to fire") - } + for _, name := range []string{"mutex", "atomic"} { + t.Run(name, func(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + var mgr idlenessManager + if name == "mutex" { + mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } else if name == "atomic" { + mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } + defer mgr.close() + + // Fire up a goroutine that simulates an ongoing RPC that is terminated + // after the timer callback fires for the first time. + timerFired := make(chan struct{}) + go func() { + mgr.onCallBegin() + <-timerFired + mgr.onCallEnd() + }() + + // Ensure that the timer callback fires and unblock the above goroutine. + select { + case <-callbackCh: + close(timerFired) + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } - // The invocation of the timer callback should not put the channel in idle - // mode since we had an ongoing RPC. - select { - case <-enforcer.enterIdleCh: - t.Fatalf("enterIdleMode() called on enforcer when active RPC exists") - case <-time.After(defaultTestShortTimeout): - } + // The invocation of the timer callback should not put the channel in idle + // mode since we had an ongoing RPC. + select { + case <-enforcer.enterIdleCh: + t.Fatalf("enterIdleMode() called on enforcer when active RPC exists") + case <-time.After(defaultTestShortTimeout): + } - // Since we terminated the ongoing RPC and we have no other active RPCs, the - // channel must move to idle eventually. - select { - case <-enforcer.enterIdleCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout waiting for channel to move to idle") + // Since we terminated the ongoing RPC and we have no other active RPCs, the + // channel must move to idle eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } + }) } } @@ -188,49 +207,58 @@ func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) { // period (even though there is no active call when the timer fires), the // channel does not enter idle mode. func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) { - callbackCh := overrideNewTimer(t) - - enforcer := newTestIdlenessEnforcer() - mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) - defer mgr.close() - - // Fire up a goroutine that simulates unary RPCs until the timer callback - // fires. - timerFired := make(chan struct{}) - go func() { - for ; ; <-time.After(defaultTestShortTimeout) { - mgr.onCallBegin() - mgr.onCallEnd() - + for _, name := range []string{"mutex", "atomic"} { + t.Run(name, func(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + var mgr idlenessManager + if name == "mutex" { + mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } else if name == "atomic" { + mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } + defer mgr.close() + + // Fire up a goroutine that simulates unary RPCs until the timer callback + // fires. + timerFired := make(chan struct{}) + go func() { + for ; ; <-time.After(defaultTestShortTimeout) { + mgr.onCallBegin() + mgr.onCallEnd() + + select { + case <-timerFired: + return + default: + } + } + }() + + // Ensure that the timer callback fires, and that we don't enter idle as + // part of this invocation of the timer callback, since we had some RPCs in + // this period. select { - case <-timerFired: - return - default: + case <-callbackCh: + close(timerFired) + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + select { + case <-enforcer.enterIdleCh: + t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period") + case <-time.After(defaultTestShortTimeout): } - } - }() - - // Ensure that the timer callback fires, and that we don't enter idle as - // part of this invocation of the timer callback, since we had some RPCs in - // this period. - select { - case <-callbackCh: - close(timerFired) - case <-time.After(2 * defaultTestIdleTimeout): - t.Fatal("Timeout waiting for idle timer callback to fire") - } - select { - case <-enforcer.enterIdleCh: - t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period") - case <-time.After(defaultTestShortTimeout): - } - // Since the unrary RPC terminated and we have no other active RPCs, the - // channel must move to idle eventually. - select { - case <-enforcer.enterIdleCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout waiting for channel to move to idle") + // Since the unrary RPC terminated and we have no other active RPCs, the + // channel must move to idle eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } + }) } } @@ -238,26 +266,35 @@ func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) { // manager is enabled. Ensures that the channel moves out of idle when an RPC is // initiated. func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) { - overrideNewTimer(t) - - enforcer := newTestIdlenessEnforcer() - mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) - defer mgr.close() + for _, name := range []string{"mutex", "atomic"} { + t.Run(name, func(t *testing.T) { + overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + var mgr idlenessManager + if name == "mutex" { + mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } else if name == "atomic" { + mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + } + defer mgr.close() - // Ensure that the channel moves to idle since there are no RPCs. - select { - case <-enforcer.enterIdleCh: - case <-time.After(2 * defaultTestIdleTimeout): - t.Fatal("Timeout waiting for channel to move to idle mode") - } + // Ensure that the channel moves to idle since there are no RPCs. + select { + case <-enforcer.enterIdleCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for channel to move to idle mode") + } - mgr.onCallBegin() - mgr.onCallEnd() + mgr.onCallBegin() + mgr.onCallEnd() - // Ensure that the channel moves out of idle as a result of the above RPC. - select { - case <-enforcer.exitIdleCh: - case <-time.After(2 * defaultTestIdleTimeout): - t.Fatal("Timeout waiting for channel to move out of idle mode") + // Ensure that the channel moves out of idle as a result of the above RPC. + select { + case <-enforcer.exitIdleCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for channel to move out of idle mode") + } + }) } }