From b735fee80504eb0aff09bcad71cee40a5403bb66 Mon Sep 17 00:00:00 2001 From: Jay Rauchenstein Date: Mon, 13 Dec 2021 15:08:11 -0500 Subject: [PATCH] rpc: reduce log spam while clock latencies stay above reporting threshold Previously, clock latencies would log if they exceed 150% of the running average. This would log many times if latency remains in excess of that threshold (until the average rises), or if latency alternates from slightly above to slightly below the threshold from noise. This change logs once when latency exceeds 150% of the moving average and will not log again until it drops below 140% of the moving average. Release note: None --- pkg/rpc/clock_offset.go | 66 +++++++++++++++++++++++++++--------- pkg/rpc/clock_offset_test.go | 30 ++++++++++++++-- 2 files changed, 78 insertions(+), 18 deletions(-) diff --git a/pkg/rpc/clock_offset.go b/pkg/rpc/clock_offset.go index bd2f107fab6e..0d21f7265c1a 100644 --- a/pkg/rpc/clock_offset.go +++ b/pkg/rpc/clock_offset.go @@ -59,6 +59,33 @@ var ( } ) +// A stateful trigger that fires once when exceeding a threshold, then must +// fall below another lower threshold before firing again. +type resettingMaxTrigger bool + +func (t *resettingMaxTrigger) triggers(value, resetThreshold, triggerThreshold float64) bool { + if *t { + // This is the "recently triggered" state. + // Never trigger. Transition to "normal" state if below resetThreshold. + if value < resetThreshold { + *t = false + } + } else { + // This is the "normal" state. + // Trigger and transition to "recently triggered" if above triggerThreshold. + if value > triggerThreshold { + *t = true + return true + } + } + return false +} + +type latencyInfo struct { + avgNanos ewma.MovingAverage + trigger resettingMaxTrigger +} + // RemoteClockMonitor keeps track of the most recent measurements of remote // offsets and round-trip latency from this node to connected nodes. type RemoteClockMonitor struct { @@ -67,8 +94,8 @@ type RemoteClockMonitor struct { mu struct { syncutil.RWMutex - offsets map[string]RemoteOffset - latenciesNanos map[string]ewma.MovingAverage + offsets map[string]RemoteOffset + latencyInfos map[string]*latencyInfo } metrics RemoteClockMetrics @@ -83,7 +110,7 @@ func newRemoteClockMonitor( offsetTTL: offsetTTL, } r.mu.offsets = make(map[string]RemoteOffset) - r.mu.latenciesNanos = make(map[string]ewma.MovingAverage) + r.mu.latencyInfos = make(map[string]*latencyInfo) if histogramWindowInterval == 0 { histogramWindowInterval = time.Duration(math.MaxInt64) } @@ -107,8 +134,8 @@ func (r *RemoteClockMonitor) Metrics() *RemoteClockMetrics { func (r *RemoteClockMonitor) Latency(addr string) (time.Duration, bool) { r.mu.RLock() defer r.mu.RUnlock() - if avg, ok := r.mu.latenciesNanos[addr]; ok && avg.Value() != 0.0 { - return time.Duration(int64(avg.Value())), true + if info, ok := r.mu.latencyInfos[addr]; ok && info.avgNanos.Value() != 0.0 { + return time.Duration(int64(info.avgNanos.Value())), true } return 0, false } @@ -118,9 +145,9 @@ func (r *RemoteClockMonitor) AllLatencies() map[string]time.Duration { r.mu.RLock() defer r.mu.RUnlock() result := make(map[string]time.Duration) - for addr, avg := range r.mu.latenciesNanos { - if avg.Value() != 0.0 { - result[addr] = time.Duration(int64(avg.Value())) + for addr, info := range r.mu.latencyInfos { + if info.avgNanos.Value() != 0.0 { + result[addr] = time.Duration(int64(info.avgNanos.Value())) } } return result @@ -167,21 +194,28 @@ func (r *RemoteClockMonitor) UpdateOffset( } if roundTripLatency > 0 { - latencyAvg, ok := r.mu.latenciesNanos[addr] + info, ok := r.mu.latencyInfos[addr] if !ok { - latencyAvg = ewma.NewMovingAverage(avgLatencyMeasurementAge) - r.mu.latenciesNanos[addr] = latencyAvg + info = &latencyInfo{ + avgNanos: ewma.NewMovingAverage(avgLatencyMeasurementAge), + } + r.mu.latencyInfos[addr] = info } - // If the roundtrip jumps by 50% beyond the previously recorded average, report it in logs. newLatencyf := float64(roundTripLatency.Nanoseconds()) - if prevAvg := latencyAvg.Value(); prevAvg != 0 && newLatencyf > prevAvg*1.50 { + prevAvg := info.avgNanos.Value() + info.avgNanos.Add(newLatencyf) + r.metrics.LatencyHistogramNanos.RecordValue(roundTripLatency.Nanoseconds()) + + // If the roundtrip jumps by 50% beyond the previously recorded average, report it in logs. + // Don't report it again until it falls below 40% above the average. + // (Also requires latency > 1ms to avoid trigger on noise on low-latency connections and + // the running average to be non-zero to avoid triggering on startup.) + if newLatencyf > 1e6 && prevAvg > 0.0 && + info.trigger.triggers(newLatencyf, prevAvg*1.4, prevAvg*1.5) { log.Health.Warningf(ctx, "latency jump (prev avg %.2fms, current %.2fms)", prevAvg/1e6, newLatencyf/1e6) } - - latencyAvg.Add(newLatencyf) - r.metrics.LatencyHistogramNanos.RecordValue(roundTripLatency.Nanoseconds()) } if log.V(2) { diff --git a/pkg/rpc/clock_offset_test.go b/pkg/rpc/clock_offset_test.go index 6079a0ecb328..505d3f81ed87 100644 --- a/pkg/rpc/clock_offset_test.go +++ b/pkg/rpc/clock_offset_test.go @@ -197,8 +197,8 @@ func TestLatencies(t *testing.T) { for i := 0; i < 11; i++ { monitor.UpdateOffset(context.Background(), emptyKey, RemoteOffset{}, 0) } - if l, ok := monitor.mu.latenciesNanos[emptyKey]; ok { - t.Errorf("expected no latency measurement for %q, got %v", emptyKey, l.Value()) + if l, ok := monitor.mu.latencyInfos[emptyKey]; ok { + t.Errorf("expected no latency measurement for %q, got %v", emptyKey, l.avgNanos.Value()) } testCases := []struct { @@ -225,3 +225,29 @@ func TestLatencies(t *testing.T) { } } } + +func TestResettingMaxTrigger(t *testing.T) { + defer leaktest.AfterTest(t)() + + var tr resettingMaxTrigger + testdata := []struct { + expected bool + value float64 + resetThreshold float64 + triggerThreshold float64 + }{ + {false, 5, 10, 20}, + {false, 15, 10, 20}, + {true, 25, 10, 20}, + {false, 25, 10, 20}, + {false, 15, 10, 20}, + {false, 25, 10, 20}, + {false, 5, 10, 20}, + {true, 25, 10, 20}, + } + for i, td := range testdata { + if tr.triggers(td.value, td.resetThreshold, td.triggerThreshold) != td.expected { + t.Errorf("Failed in iteration %v: %v", i, td) + } + } +}