Skip to content

Commit

Permalink
Clean up comments + stats
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Mason <[email protected]>
  • Loading branch information
ajm188 committed Jun 30, 2021
1 parent 4c00201 commit b5908a8
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions go/vt/vttablet/grpctmclient/cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type cachedConnDialer struct {
evict []*cachedConn
evictSorted bool
connWaitSema *sync2.Semaphore
capacity int
}

var dialerStats = struct {
Expand All @@ -82,15 +83,17 @@ var dialerStats = struct {
ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"),
ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"),
DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"),
DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"),
DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dial_timings", "timings for various dial paths", "path", "cache_fast", "sema_fast", "sema_poll"),
}

// NewCachedConnClient returns a grpc Client that caches connections to the different tablets
// NewCachedConnClient returns a grpc Client that caches connections to the
// different tablets.
func NewCachedConnClient(capacity int) *Client {
dialer := &cachedConnDialer{
conns: make(map[string]*cachedConn, capacity),
evict: make([]*cachedConn, 0, capacity),
connWaitSema: sync2.NewSemaphore(capacity, 0),
capacity: capacity,
}
return &Client{dialer}
}
Expand All @@ -115,7 +118,7 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab
addr := getTabletAddr(tablet)

if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found {
dialerStats.DialTimings.Add("rlock_fast", time.Since(start))
dialerStats.DialTimings.Add("cache_fast", time.Since(start))
return client, closer, err
}

Expand All @@ -126,7 +129,9 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab

// Check if another goroutine managed to dial a conn for the same addr
// while we were waiting for the write lock. This is identical to the
// read-lock section above.
// read-lock section above, except we release the connWaitSema if we
// are able to use the cache, allowing another goroutine to dial a new
// conn instead.
if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found {
dialer.connWaitSema.Release()
return client, closer, err
Expand Down Expand Up @@ -160,8 +165,8 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab
// if not nil, will be used to wrap the lookup and redial in that lock. This
// function can be called in situations where the conns map is locked
// externally (like in pollOnce), so we do not want to manage the locks here. In
// other cases (like in the rlock_fast path of dial()), we pass in the RLocker
// to ensure we have a read lock on the cache for the duration of the call.
// other cases (like in the cache_fast path of dial()), we pass in the dialer.m
// to ensure we have a lock on the cache for the duration of the call.
func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) {
if locker != nil {
locker.Lock()
Expand All @@ -179,11 +184,10 @@ func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (c
// pollOnce is called on each iteration of the polling loop in dial(). It:
// - locks the conns cache for writes
// - attempts to get a connection from the cache. If found, redial() it and exit.
// - locks the queue
// - peeks at the head of the eviction queue. if the peeked conn has no refs, it
// is unused, and can be evicted to make room for the new connection to addr.
// If the peeked conn has refs, exit.
// - pops the conn we just peeked from the queue, delete it from the cache, and
// - pops the conn we just peeked from the queue, deletes it from the cache, and
// close the underlying ClientConn for that conn.
// - attempt a newdial. if the newdial fails, it will release a slot on the
// connWaitSema, so another dial() call can successfully acquire it to dial
Expand Down Expand Up @@ -226,8 +230,6 @@ func (dialer *cachedConnDialer) pollOnce(ctx context.Context, addr string) (clie
// It returns the three-tuple of client-interface, closer, and error that the
// main dial func returns.
func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
dialerStats.ConnNew.Add(1)

opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name)
if err != nil {
dialer.connWaitSema.Release()
Expand All @@ -253,27 +255,31 @@ func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (table
return dialer.redialLocked(conn)
}

dialerStats.ConnNew.Add(1)

conn := &cachedConn{
TabletManagerClient: tabletmanagerservicepb.NewTabletManagerClient(cc),
cc: cc,
lastAccessTime: time.Now(),
refs: 1,
addr: addr,
}

// NOTE: we deliberately do not set dialer.evictSorted=false here. Since
// cachedConns are evicted from the front of the queue, and we are appending
// to the end, if there is already a second evictable connection, it will be
// at the front of the queue, so we can speed up the edge case where we need
// to evict multiple connections in a row.
dialer.evict = append(dialer.evict, conn)
// dialer.evictSorted = false -- no need to do this here
dialer.conns[addr] = conn

return dialer.connWithCloser(conn)
}

// redialLocked takes an already-dialed connection in the cache does all the work of
// lending that connection out to one more caller. this should only ever be
// called while holding at least the RLock on dialer.m (but the write lock is
// fine too), to prevent the connection from getting evicted out from under us.
//
// It returns the three-tuple of client-interface, closer, and error that the
// main dial func returns.
// redialLocked takes an already-dialed connection in the cache does all the
// work of lending that connection out to one more caller. It returns the
// three-tuple of client-interface, closer, and error that the main dial func
// returns.
func (dialer *cachedConnDialer) redialLocked(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
dialerStats.ConnReuse.Add(1)
conn.lastAccessTime = time.Now()
Expand All @@ -298,10 +304,13 @@ func (dialer *cachedConnDialer) connWithCloser(conn *cachedConn) (tabletmanagers
// Close closes all currently cached connections, ***regardless of whether
// those connections are in use***. Calling Close therefore will fail any RPCs
// using currently lent-out connections, and, furthermore, will invalidate the
// io.Closer that was returned for that connection from dialer.dial().
// io.Closer that was returned for that connection from dialer.dial(). When
// calling those io.Closers, they will still lock the dialer's mutex, and then
// perform needless operations that will slow down dial throughput, but not
// actually impact the correctness of the internal state of the dialer.
//
// As a result, it is not safe to reuse a cachedConnDialer after calling Close,
// and you should instead obtain a new one by calling either
// As a result, while it is safe to reuse a cachedConnDialer after calling Close,
// it will be less performant than getting a new one, either by calling
// tmclient.TabletManagerClient() with
// TabletManagerProtocol set to "grpc-cached", or by calling
// grpctmclient.NewCachedConnClient directly.
Expand All @@ -314,7 +323,7 @@ func (dialer *cachedConnDialer) Close() {
delete(dialer.conns, conn.addr)
dialer.connWaitSema.Release()
}
dialer.evict = nil
dialer.evict = make([]*cachedConn, 0, dialer.capacity)
}

func getTabletAddr(tablet *topodatapb.Tablet) string {
Expand Down

0 comments on commit b5908a8

Please sign in to comment.