diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 6e5b7e4b151..2e55e62a79f 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -71,6 +71,7 @@ type cachedConnDialer struct { evict []*cachedConn evictSorted bool connWaitSema *sync2.Semaphore + capacity int } var dialerStats = struct { @@ -82,15 +83,17 @@ var dialerStats = struct { ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"), ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"), DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"), - DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"), + DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dial_timings", "timings for various dial paths", "path", "cache_fast", "sema_fast", "sema_poll"), } -// NewCachedConnClient returns a grpc Client that caches connections to the different tablets +// NewCachedConnClient returns a grpc Client that caches connections to the +// different tablets. func NewCachedConnClient(capacity int) *Client { dialer := &cachedConnDialer{ conns: make(map[string]*cachedConn, capacity), evict: make([]*cachedConn, 0, capacity), connWaitSema: sync2.NewSemaphore(capacity, 0), + capacity: capacity, } return &Client{dialer} } @@ -115,7 +118,7 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab addr := getTabletAddr(tablet) if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found { - dialerStats.DialTimings.Add("rlock_fast", time.Since(start)) + dialerStats.DialTimings.Add("cache_fast", time.Since(start)) return client, closer, err } @@ -126,7 +129,9 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab // Check if another goroutine managed to dial a conn for the same addr // while we were waiting for the write lock. This is identical to the - // read-lock section above. + // read-lock section above, except we release the connWaitSema if we + // are able to use the cache, allowing another goroutine to dial a new + // conn instead. if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found { dialer.connWaitSema.Release() return client, closer, err @@ -160,8 +165,8 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab // if not nil, will be used to wrap the lookup and redial in that lock. This // function can be called in situations where the conns map is locked // externally (like in pollOnce), so we do not want to manage the locks here. In -// other cases (like in the rlock_fast path of dial()), we pass in the RLocker -// to ensure we have a read lock on the cache for the duration of the call. +// other cases (like in the cache_fast path of dial()), we pass in the dialer.m +// to ensure we have a lock on the cache for the duration of the call. func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) { if locker != nil { locker.Lock() @@ -179,11 +184,10 @@ func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (c // pollOnce is called on each iteration of the polling loop in dial(). It: // - locks the conns cache for writes // - attempts to get a connection from the cache. If found, redial() it and exit. -// - locks the queue // - peeks at the head of the eviction queue. if the peeked conn has no refs, it // is unused, and can be evicted to make room for the new connection to addr. // If the peeked conn has refs, exit. -// - pops the conn we just peeked from the queue, delete it from the cache, and +// - pops the conn we just peeked from the queue, deletes it from the cache, and // close the underlying ClientConn for that conn. // - attempt a newdial. if the newdial fails, it will release a slot on the // connWaitSema, so another dial() call can successfully acquire it to dial @@ -226,8 +230,6 @@ func (dialer *cachedConnDialer) pollOnce(ctx context.Context, addr string) (clie // It returns the three-tuple of client-interface, closer, and error that the // main dial func returns. func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { - dialerStats.ConnNew.Add(1) - opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name) if err != nil { dialer.connWaitSema.Release() @@ -253,6 +255,8 @@ func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (table return dialer.redialLocked(conn) } + dialerStats.ConnNew.Add(1) + conn := &cachedConn{ TabletManagerClient: tabletmanagerservicepb.NewTabletManagerClient(cc), cc: cc, @@ -260,20 +264,22 @@ func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (table refs: 1, addr: addr, } + + // NOTE: we deliberately do not set dialer.evictSorted=false here. Since + // cachedConns are evicted from the front of the queue, and we are appending + // to the end, if there is already a second evictable connection, it will be + // at the front of the queue, so we can speed up the edge case where we need + // to evict multiple connections in a row. dialer.evict = append(dialer.evict, conn) - // dialer.evictSorted = false -- no need to do this here dialer.conns[addr] = conn return dialer.connWithCloser(conn) } -// redialLocked takes an already-dialed connection in the cache does all the work of -// lending that connection out to one more caller. this should only ever be -// called while holding at least the RLock on dialer.m (but the write lock is -// fine too), to prevent the connection from getting evicted out from under us. -// -// It returns the three-tuple of client-interface, closer, and error that the -// main dial func returns. +// redialLocked takes an already-dialed connection in the cache does all the +// work of lending that connection out to one more caller. It returns the +// three-tuple of client-interface, closer, and error that the main dial func +// returns. func (dialer *cachedConnDialer) redialLocked(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { dialerStats.ConnReuse.Add(1) conn.lastAccessTime = time.Now() @@ -298,10 +304,13 @@ func (dialer *cachedConnDialer) connWithCloser(conn *cachedConn) (tabletmanagers // Close closes all currently cached connections, ***regardless of whether // those connections are in use***. Calling Close therefore will fail any RPCs // using currently lent-out connections, and, furthermore, will invalidate the -// io.Closer that was returned for that connection from dialer.dial(). +// io.Closer that was returned for that connection from dialer.dial(). When +// calling those io.Closers, they will still lock the dialer's mutex, and then +// perform needless operations that will slow down dial throughput, but not +// actually impact the correctness of the internal state of the dialer. // -// As a result, it is not safe to reuse a cachedConnDialer after calling Close, -// and you should instead obtain a new one by calling either +// As a result, while it is safe to reuse a cachedConnDialer after calling Close, +// it will be less performant than getting a new one, either by calling // tmclient.TabletManagerClient() with // TabletManagerProtocol set to "grpc-cached", or by calling // grpctmclient.NewCachedConnClient directly. @@ -314,7 +323,7 @@ func (dialer *cachedConnDialer) Close() { delete(dialer.conns, conn.addr) dialer.connWaitSema.Release() } - dialer.evict = nil + dialer.evict = make([]*cachedConn, 0, dialer.capacity) } func getTabletAddr(tablet *topodatapb.Tablet) string {