Skip to content

Commit

Permalink
clientv3: clean up logging, clarify var/field names
Browse files Browse the repository at this point in the history
Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Oct 19, 2017
1 parent ad24700 commit dade08b
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 61 deletions.
30 changes: 15 additions & 15 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type balancer interface {
grpc.Balancer
ConnectNotify() <-chan struct{}

endpoint(host string) string
endpoint(hostPort string) string
endpoints() []string
// pinned returns the current pinned endpoint.
pinned() string
// endpointError handles error from server-side.
endpointError(addr string, err error)
hostPortError(hostPort string, err error)

// up is Up but includes whether the balancer will use the connection.
up(addr grpc.Address) (func(error), bool)
Expand Down Expand Up @@ -95,7 +95,7 @@ type simpleBalancer struct {
// grpc issues TLS cert checks using the string passed into dial so
// that string must be the host. To recover the full scheme://host URL,
// have a map from hosts to the original endpoint.
host2ep map[string]string
hostPort2ep map[string]string

// pinAddr is the currently pinned address; set to the empty string on
// initialization and shutdown.
Expand All @@ -117,7 +117,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
downc: make(chan struct{}),
donec: make(chan struct{}),
updateAddrsC: make(chan notifyMsg),
host2ep: getHost2ep(eps),
hostPort2ep: getHostPort2ep(eps),
}
close(sb.downc)
go sb.updateNotifyLoop()
Expand All @@ -134,10 +134,10 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {

func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }

func (b *simpleBalancer) endpoint(host string) string {
func (b *simpleBalancer) endpoint(hostPort string) string {
b.mu.Lock()
defer b.mu.Unlock()
return b.host2ep[host]
return b.hostPort2ep[hostPort]
}

func (b *simpleBalancer) endpoints() []string {
Expand All @@ -152,9 +152,9 @@ func (b *simpleBalancer) pinned() string {
return b.pinAddr
}

func (b *simpleBalancer) endpointError(addr string, err error) { return }
func (b *simpleBalancer) hostPortError(hostPort string, err error) { return }

func getHost2ep(eps []string) map[string]string {
func getHostPort2ep(eps []string) map[string]string {
hm := make(map[string]string, len(eps))
for i := range eps {
_, host, _ := parseEndpoint(eps[i])
Expand All @@ -164,13 +164,13 @@ func getHost2ep(eps []string) map[string]string {
}

func (b *simpleBalancer) updateAddrs(eps ...string) {
np := getHost2ep(eps)
np := getHostPort2ep(eps)

b.mu.Lock()

match := len(np) == len(b.host2ep)
match := len(np) == len(b.hostPort2ep)
for k, v := range np {
if b.host2ep[k] != v {
if b.hostPort2ep[k] != v {
match = false
break
}
Expand All @@ -181,7 +181,7 @@ func (b *simpleBalancer) updateAddrs(eps ...string) {
return
}

b.host2ep = np
b.hostPort2ep = np
b.addrs, b.eps = eps2addrs(eps), eps

// updating notifyCh can trigger new connections,
Expand Down Expand Up @@ -316,7 +316,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
}
if b.pinAddr != "" {
if logger.V(4) {
logger.Infof("clientv3/balancer: %s is up but not pinned (already pinned %s)", addr.Addr, b.pinAddr)
logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
}
return func(err error) {}, false
}
Expand All @@ -325,7 +325,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
if logger.V(4) {
logger.Infof("clientv3/balancer: pin %s", addr.Addr)
logger.Infof("clientv3/balancer: pin %q", addr.Addr)
}
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
Expand All @@ -336,7 +336,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
b.pinAddr = ""
b.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/balancer: unpin %s (%v)", addr.Addr, err)
logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
}
}, true
}
Expand Down
30 changes: 15 additions & 15 deletions clientv3/health_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type healthBalancer struct {
stopc chan struct{}
stopOnce sync.Once

host2ep map[string]string
hostPort2ep map[string]string

wg sync.WaitGroup
}
Expand All @@ -65,7 +65,7 @@ func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *h
healthCheck: hc,
eps: b.endpoints(),
addrs: eps2addrs(b.endpoints()),
host2ep: getHost2ep(b.endpoints()),
hostPort2ep: getHostPort2ep(b.endpoints()),
unhealthy: make(map[string]time.Time),
stopc: make(chan struct{}),
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
hb.mu.Unlock()
f(err)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
}
}
}
Expand All @@ -117,9 +117,9 @@ func (hb *healthBalancer) Close() error {
}

func (hb *healthBalancer) updateAddrs(eps ...string) {
addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
addrs, hostPort2ep := eps2addrs(eps), getHostPort2ep(eps)
hb.mu.Lock()
hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep
hb.unhealthy = make(map[string]time.Time)
hb.mu.Unlock()
hb.balancer.updateAddrs(eps...)
Expand All @@ -128,7 +128,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) {
func (hb *healthBalancer) endpoint(host string) string {
hb.mu.RLock()
defer hb.mu.RUnlock()
return hb.host2ep[host]
return hb.hostPort2ep[host]
}

func (hb *healthBalancer) endpoints() []string {
Expand All @@ -143,7 +143,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
case <-time.After(timeout):
hb.mu.Lock()
for k, v := range hb.unhealthy {
if _, ok := hb.host2ep[k]; !ok {
if _, ok := hb.hostPort2ep[k]; !ok {
delete(hb.unhealthy, k)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: removes stale host:port %q from unhealthy", k)
Expand All @@ -153,7 +153,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
if time.Since(v) > timeout {
delete(hb.unhealthy, k)
if logger.V(4) {
logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
}
}
}
Expand Down Expand Up @@ -185,18 +185,18 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
return addrs
}

func (hb *healthBalancer) endpointError(addr string, err error) {
func (hb *healthBalancer) hostPortError(hostPort string, err error) {
hb.mu.Lock()
hb.unhealthy[addr] = time.Now()
hb.unhealthy[hostPort] = time.Now()
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
}
}

func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
hb.mu.RLock()
if _, ok := hb.host2ep[addr.Addr]; !ok { // stale endpoint
if _, ok := hb.hostPort2ep[addr.Addr]; !ok { // stale endpoint
hb.mu.RUnlock()
return false
}
Expand All @@ -214,7 +214,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
// instead, return before grpc-healthcheck if failed within healthcheck timeout
if elapsed := time.Since(failedTime); elapsed < dur {
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
}
return false
}
Expand All @@ -223,15 +223,15 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
delete(hb.unhealthy, addr.Addr)
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
}
return true
}
hb.mu.Lock()
hb.unhealthy[addr.Addr] = time.Now()
hb.mu.Unlock()
if logger.V(4) {
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
}
return false
}
Expand Down
Loading

0 comments on commit dade08b

Please sign in to comment.