Skip to content

Commit

Permalink
rpc: reduce log spam while clock latencies stay above reporting thres…
Browse files Browse the repository at this point in the history
…hold

Previously, clock latencies would log if they exceed 150% of the running
average.  This would log many times if latency remains in excess of that
threshold (until the average rises), or if latency alternates from
slightly above to slightly below the threshold from noise.  This change
logs once when latency exceeds 150% of the moving average and will not
log again until it drops below 140% of the moving average.

Release note: None
  • Loading branch information
rauchenstein committed Dec 21, 2021
1 parent 40b9a86 commit b735fee
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 18 deletions.
66 changes: 50 additions & 16 deletions pkg/rpc/clock_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ var (
}
)

// A stateful trigger that fires once when exceeding a threshold, then must
// fall below another lower threshold before firing again.
type resettingMaxTrigger bool

func (t *resettingMaxTrigger) triggers(value, resetThreshold, triggerThreshold float64) bool {
if *t {
// This is the "recently triggered" state.
// Never trigger. Transition to "normal" state if below resetThreshold.
if value < resetThreshold {
*t = false
}
} else {
// This is the "normal" state.
// Trigger and transition to "recently triggered" if above triggerThreshold.
if value > triggerThreshold {
*t = true
return true
}
}
return false
}

type latencyInfo struct {
avgNanos ewma.MovingAverage
trigger resettingMaxTrigger
}

// RemoteClockMonitor keeps track of the most recent measurements of remote
// offsets and round-trip latency from this node to connected nodes.
type RemoteClockMonitor struct {
Expand All @@ -67,8 +94,8 @@ type RemoteClockMonitor struct {

mu struct {
syncutil.RWMutex
offsets map[string]RemoteOffset
latenciesNanos map[string]ewma.MovingAverage
offsets map[string]RemoteOffset
latencyInfos map[string]*latencyInfo
}

metrics RemoteClockMetrics
Expand All @@ -83,7 +110,7 @@ func newRemoteClockMonitor(
offsetTTL: offsetTTL,
}
r.mu.offsets = make(map[string]RemoteOffset)
r.mu.latenciesNanos = make(map[string]ewma.MovingAverage)
r.mu.latencyInfos = make(map[string]*latencyInfo)
if histogramWindowInterval == 0 {
histogramWindowInterval = time.Duration(math.MaxInt64)
}
Expand All @@ -107,8 +134,8 @@ func (r *RemoteClockMonitor) Metrics() *RemoteClockMetrics {
func (r *RemoteClockMonitor) Latency(addr string) (time.Duration, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
if avg, ok := r.mu.latenciesNanos[addr]; ok && avg.Value() != 0.0 {
return time.Duration(int64(avg.Value())), true
if info, ok := r.mu.latencyInfos[addr]; ok && info.avgNanos.Value() != 0.0 {
return time.Duration(int64(info.avgNanos.Value())), true
}
return 0, false
}
Expand All @@ -118,9 +145,9 @@ func (r *RemoteClockMonitor) AllLatencies() map[string]time.Duration {
r.mu.RLock()
defer r.mu.RUnlock()
result := make(map[string]time.Duration)
for addr, avg := range r.mu.latenciesNanos {
if avg.Value() != 0.0 {
result[addr] = time.Duration(int64(avg.Value()))
for addr, info := range r.mu.latencyInfos {
if info.avgNanos.Value() != 0.0 {
result[addr] = time.Duration(int64(info.avgNanos.Value()))
}
}
return result
Expand Down Expand Up @@ -167,21 +194,28 @@ func (r *RemoteClockMonitor) UpdateOffset(
}

if roundTripLatency > 0 {
latencyAvg, ok := r.mu.latenciesNanos[addr]
info, ok := r.mu.latencyInfos[addr]
if !ok {
latencyAvg = ewma.NewMovingAverage(avgLatencyMeasurementAge)
r.mu.latenciesNanos[addr] = latencyAvg
info = &latencyInfo{
avgNanos: ewma.NewMovingAverage(avgLatencyMeasurementAge),
}
r.mu.latencyInfos[addr] = info
}

// If the roundtrip jumps by 50% beyond the previously recorded average, report it in logs.
newLatencyf := float64(roundTripLatency.Nanoseconds())
if prevAvg := latencyAvg.Value(); prevAvg != 0 && newLatencyf > prevAvg*1.50 {
prevAvg := info.avgNanos.Value()
info.avgNanos.Add(newLatencyf)
r.metrics.LatencyHistogramNanos.RecordValue(roundTripLatency.Nanoseconds())

// If the roundtrip jumps by 50% beyond the previously recorded average, report it in logs.
// Don't report it again until it falls below 40% above the average.
// (Also requires latency > 1ms to avoid trigger on noise on low-latency connections and
// the running average to be non-zero to avoid triggering on startup.)
if newLatencyf > 1e6 && prevAvg > 0.0 &&
info.trigger.triggers(newLatencyf, prevAvg*1.4, prevAvg*1.5) {
log.Health.Warningf(ctx, "latency jump (prev avg %.2fms, current %.2fms)",
prevAvg/1e6, newLatencyf/1e6)
}

latencyAvg.Add(newLatencyf)
r.metrics.LatencyHistogramNanos.RecordValue(roundTripLatency.Nanoseconds())
}

if log.V(2) {
Expand Down
30 changes: 28 additions & 2 deletions pkg/rpc/clock_offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func TestLatencies(t *testing.T) {
for i := 0; i < 11; i++ {
monitor.UpdateOffset(context.Background(), emptyKey, RemoteOffset{}, 0)
}
if l, ok := monitor.mu.latenciesNanos[emptyKey]; ok {
t.Errorf("expected no latency measurement for %q, got %v", emptyKey, l.Value())
if l, ok := monitor.mu.latencyInfos[emptyKey]; ok {
t.Errorf("expected no latency measurement for %q, got %v", emptyKey, l.avgNanos.Value())
}

testCases := []struct {
Expand All @@ -225,3 +225,29 @@ func TestLatencies(t *testing.T) {
}
}
}

func TestResettingMaxTrigger(t *testing.T) {
defer leaktest.AfterTest(t)()

var tr resettingMaxTrigger
testdata := []struct {
expected bool
value float64
resetThreshold float64
triggerThreshold float64
}{
{false, 5, 10, 20},
{false, 15, 10, 20},
{true, 25, 10, 20},
{false, 25, 10, 20},
{false, 15, 10, 20},
{false, 25, 10, 20},
{false, 5, 10, 20},
{true, 25, 10, 20},
}
for i, td := range testdata {
if tr.triggers(td.value, td.resetThreshold, td.triggerThreshold) != td.expected {
t.Errorf("Failed in iteration %v: %v", i, td)
}
}
}

0 comments on commit b735fee

Please sign in to comment.