Skip to content

Commit

Permalink
support an atomic idleness manager
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed May 13, 2023
1 parent 4461ad0 commit 0e5b271
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 163 deletions.
21 changes: 14 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.mu.Unlock()

// Configure idleness support with configured idle_timeout or default.
cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
if cc.dopts.idleTimeout == 0 {
cc.idlenessMgr = newDisabledIdlenessManager()
} else {
// cc.idlenessMgr = newMutexIdlenessManager(cc, cc.dopts.idleTimeout)
cc.idlenessMgr = newAtomicIdlenessManager(cc, cc.dopts.idleTimeout)
}

// Return early for non-blocking dials.
if !cc.dopts.block {
Expand Down Expand Up @@ -392,17 +397,19 @@ func (cc *ClientConn) enterIdleMode() error {
// don't have to do this when exiting idle mode.
conns := cc.conns
cc.conns = make(map[*addrConn]struct{})
cc.csMgr.updateState(connectivity.Idle)

cc.resolverWrapper.enterIdleMode()
cc.blockingpicker.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.mu.Unlock()

cc.addTraceEvent("entering idle mode")
for ac := range conns {
ac.tearDown(errConnIdling)
}
go func() {
cc.addTraceEvent("entering idle mode")
for ac := range conns {
ac.tearDown(errConnIdling)
}
}()
return nil
}

Expand Down Expand Up @@ -599,7 +606,7 @@ type ClientConn struct {
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
idlenessMgr *idlenessManager
idlenessMgr idlenessManager

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand Down
209 changes: 165 additions & 44 deletions idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package grpc

import (
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/status"
"google.golang.org/grpc/status"
)

// For overriding in unit tests.
var newTimer = func(d time.Duration, f func()) *time.Timer {
var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
return time.AfterFunc(d, f)
}

Expand All @@ -38,16 +39,30 @@ type idlenessEnforcer interface {
enterIdleMode() error
}

// idlenessManager contains functionality to track RPC activity on the channel
// and uses this to instruct the channel to enter or exit idle mode as
// appropriate.
type idlenessManager struct {
// idlenessManager defines the functionality required to track RPC activity on a
// channel.
type idlenessManager interface {
onCallBegin() error
onCallEnd()
close()
}

type disabledIdlenessManager struct{}

func (disabledIdlenessManager) onCallBegin() error { return nil }
func (disabledIdlenessManager) onCallEnd() {}
func (disabledIdlenessManager) close() {}

func newDisabledIdlenessManager() idlenessManager { return disabledIdlenessManager{} }

// mutexIdlenessManager implements the idlenessManager interface and uses a
// mutex to synchronize access to shared state.
type mutexIdlenessManager struct {
// The following fields are set when the manager is created and are never
// written to after that. Therefore these can be accessed without a mutex.

enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
timeout int64 // Idle timeout duration nanos stored as an int64.
isDisabled bool // Disabled if idle_timeout is set to 0.
enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
timeout int64 // Idle timeout duration nanos stored as an int64.

// All state maintained by the manager is guarded by this mutex.
mu sync.Mutex
Expand All @@ -58,27 +73,19 @@ type idlenessManager struct {
timer *time.Timer // Expires when the idle_timeout fires.
}

// newIdlenessManager creates a new idleness state manager which attempts to put
// the channel in idle mode when there is no RPC activity for the configured
// idleTimeout.
//
// Idleness support can be disabled by passing a value of 0 for idleTimeout.
func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) *idlenessManager {
if idleTimeout == 0 {
logger.Infof("Channel idleness support explicitly disabled")
return &idlenessManager{isDisabled: true}
}

i := &idlenessManager{
// newMutexIdlenessManager creates a new mutexIdlennessManager. A non-zero value
// must be passed for idle timeout.
func newMutexIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
i := &mutexIdlenessManager{
enforcer: enforcer,
timeout: int64(idleTimeout),
}
i.timer = newTimer(idleTimeout, i.handleIdleTimeout)
i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout)
return i
}

// handleIdleTimeout is the timer callback when idle_timeout expires.
func (i *idlenessManager) handleIdleTimeout() {
func (i *mutexIdlenessManager) handleIdleTimeout() {
i.mu.Lock()
defer i.mu.Unlock()

Expand Down Expand Up @@ -114,33 +121,28 @@ func (i *idlenessManager) handleIdleTimeout() {
// channel is currently in idle mode, the manager asks the ClientConn to exit
// idle mode, and restarts the timer. The active calls count is incremented and
// the activeness bit is set to true.
func (i *idlenessManager) onCallBegin() error {
if i.isDisabled {
return nil
}

func (i *mutexIdlenessManager) onCallBegin() error {
i.mu.Lock()
defer i.mu.Unlock()

if i.isIdle {
if err := i.enforcer.exitIdleMode(); err != nil {
return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err)
}
i.timer = newTimer(time.Duration(i.timeout), i.handleIdleTimeout)
i.isIdle = false
}
i.activeCallsCount++
i.activeSinceLastTimerCheck = true

if !i.isIdle {
return nil
}

if err := i.enforcer.exitIdleMode(); err != nil {
return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err)
}
i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout)
i.isIdle = false
return nil
}

// onCallEnd is invoked by the ClientConn at the end of every RPC. The active
// calls count is decremented and `i.lastCallEndTime` is updated.
func (i *idlenessManager) onCallEnd() {
if i.isDisabled {
return
}

func (i *mutexIdlenessManager) onCallEnd() {
i.mu.Lock()
defer i.mu.Unlock()

Expand All @@ -151,11 +153,130 @@ func (i *idlenessManager) onCallEnd() {
i.lastCallEndTime = time.Now().UnixNano()
}

func (i *idlenessManager) close() {
if i.isDisabled {
return
}
func (i *mutexIdlenessManager) close() {
i.mu.Lock()
i.timer.Stop()
i.mu.Unlock()
}

type atomicIdlenessState struct {
activeCallsCount int // Count of active RPCs.
activeSinceLastTimerCheck bool // True if there was an RPC since the last timer callback.
lastCallEndTime int64 // Time when the most recent RPC finished, stored as unix nanos.
}

type atomicIdlenessManager struct {
enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
timeout int64 // Idle timeout duration nanos stored as an int64.
state atomic.Value // Of type `idlenessState`.
timer atomic.Value // Of type `*time.Timer`
isIdle atomic.Bool // True if the channel is in idle mode.
isClosed atomic.Bool
}

// newAtomicIdlenessManager creates a new atomicIdlenessManager. A non-zero
// value must be passed for idle timeout.
func newAtomicIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
i := &atomicIdlenessManager{
enforcer: enforcer,
timeout: int64(idleTimeout),
}
i.state.Store(atomicIdlenessState{})
i.timer.Store(timeAfterFunc(idleTimeout, i.handleIdleTimeout))
return i
}

func (i *atomicIdlenessManager) handleIdleTimeout() {
if i.isClosed.Load() {
return
}

state := i.state.Load().(atomicIdlenessState)
var timeoutDuration time.Duration
switch {
case state.activeCallsCount > 0:
timeoutDuration = time.Duration(i.timeout)
case state.activeSinceLastTimerCheck:
for {
currState := i.state.Load().(atomicIdlenessState)
newState := currState
newState.activeSinceLastTimerCheck = false
if i.state.CompareAndSwap(currState, newState) {
break
}
}
timeoutDuration = time.Duration(state.lastCallEndTime + i.timeout - time.Now().UnixNano())
default:
logger.Info("easwars: in timer callback default case")
if err := i.enforcer.enterIdleMode(); err != nil {
logger.Warningf("Failed to enter idle mode: %v", err)
return
}
logger.Info("easwars: setting isIdle to true")
i.isIdle.Store(true)
return
}

// It is safe to ignore the return value from Reset() because we are
// already in the timer callback and this is only place from where we
// reset the timer.
timer := i.timer.Load().(*time.Timer)
timer.Reset(timeoutDuration)
}

func (i *atomicIdlenessManager) onCallBegin() error {
if i.isClosed.Load() {
return nil
}

logger.Infof("easwars: in onCallBegin")

for {
currState := i.state.Load().(atomicIdlenessState)
newState := currState
newState.activeCallsCount++
newState.activeSinceLastTimerCheck = true
if !i.state.CompareAndSwap(currState, newState) {
continue
}

logger.Infof("easwars: swapped state to %v", newState)

if !i.isIdle.Load() {
logger.Infof("easwars: channel is not idle, returning")
return nil
}

if err := i.enforcer.exitIdleMode(); err != nil {
return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err)
}
i.timer.Store(timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout))
i.isIdle.Store(false)
return nil
}
}

func (i *atomicIdlenessManager) onCallEnd() {
if i.isClosed.Load() {
return
}

for {
currState := i.state.Load().(atomicIdlenessState)
newState := currState
newState.activeCallsCount--
if newState.activeCallsCount < 0 {
logger.Errorf("Number of active calls tracked by idleness manager is negative: %d", newState.activeCallsCount)
}
newState.lastCallEndTime = time.Now().UnixNano()
if i.state.CompareAndSwap(currState, newState) {
break
}
}
}

func (i *atomicIdlenessManager) close() {
i.isClosed.Store(true)
timer := i.timer.Load().(*time.Timer)
timer.Stop()
}
Loading

0 comments on commit 0e5b271

Please sign in to comment.