diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index e510a57a3d..0f8d4cd644 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -183,7 +183,7 @@ func (agg *aggregator) AddUntimed( metric unaggregated.MetricUnion, metadatas metadata.StagedMetadatas, ) error { - callStart := agg.nowFn() + sw := agg.metrics.addUntimed.SuccessLatencyStopwatch() if err := agg.checkMetricType(metric); err != nil { agg.metrics.addUntimed.ReportError(err) return err @@ -197,7 +197,8 @@ func (agg *aggregator) AddUntimed( agg.metrics.addUntimed.ReportError(err) return err } - agg.metrics.addUntimed.ReportSuccess(agg.nowFn().Sub(callStart)) + agg.metrics.addUntimed.ReportSuccess() + sw.Stop() return nil } @@ -205,7 +206,7 @@ func (agg *aggregator) AddTimed( metric aggregated.Metric, metadata metadata.TimedMetadata, ) error { - callStart := agg.nowFn() + sw := agg.metrics.addTimed.SuccessLatencyStopwatch() agg.metrics.timed.Inc(1) shard, err := agg.shardFor(metric.ID) if err != nil { @@ -216,7 +217,8 @@ func (agg *aggregator) AddTimed( agg.metrics.addTimed.ReportError(err) return err } - agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(callStart)) + agg.metrics.addTimed.ReportSuccess() + sw.Stop() return nil } @@ -224,7 +226,7 @@ func (agg *aggregator) AddTimedWithStagedMetadatas( metric aggregated.Metric, metas metadata.StagedMetadatas, ) error { - callStart := agg.nowFn() + sw := agg.metrics.addTimed.SuccessLatencyStopwatch() agg.metrics.timed.Inc(1) shard, err := agg.shardFor(metric.ID) if err != nil { @@ -235,7 +237,8 @@ func (agg *aggregator) AddTimedWithStagedMetadatas( agg.metrics.addTimed.ReportError(err) return err } - agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(callStart)) + agg.metrics.addTimed.ReportSuccess() + sw.Stop() return nil } @@ -243,7 +246,7 @@ func (agg *aggregator) AddForwarded( metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata, ) error { - callStart := agg.nowFn() + sw := agg.metrics.addForwarded.SuccessLatencyStopwatch() agg.metrics.forwarded.Inc(1) shard, err := agg.shardFor(metric.ID) if err != nil { @@ -255,7 +258,8 @@ func (agg *aggregator) AddForwarded( return err } callEnd := agg.nowFn() - agg.metrics.addForwarded.ReportSuccess(callEnd.Sub(callStart)) + agg.metrics.addForwarded.ReportSuccess() + sw.Stop() forwardingDelay := time.Duration(callEnd.UnixNano() - metric.TimeNanos) agg.metrics.addForwarded.ReportForwardingLatency( metadata.StoragePolicy.Resolution().Window, @@ -269,7 +273,7 @@ func (agg *aggregator) AddPassthrough( metric aggregated.Metric, storagePolicy policy.StoragePolicy, ) error { - callStart := agg.nowFn() + sw := agg.metrics.addPassthrough.SuccessLatencyStopwatch() agg.metrics.passthrough.Inc(1) if agg.electionManager.ElectionState() == FollowerState { @@ -298,7 +302,8 @@ func (agg *aggregator) AddPassthrough( agg.metrics.addPassthrough.ReportError(err) return err } - agg.metrics.addPassthrough.ReportSuccess(agg.nowFn().Sub(callStart)) + agg.metrics.addPassthrough.ReportSuccess() + sw.Stop() return nil } @@ -758,9 +763,12 @@ func newAggregatorAddMetricMetrics( } } -func (m *aggregatorAddMetricMetrics) ReportSuccess(d time.Duration) { +func (m *aggregatorAddMetricMetrics) SuccessLatencyStopwatch() tally.Stopwatch { + return m.successLatency.Start() +} + +func (m *aggregatorAddMetricMetrics) ReportSuccess() { m.success.Inc(1) - m.successLatency.Record(d) } func (m *aggregatorAddMetricMetrics) ReportError(err error) { diff --git a/src/aggregator/aggregator/aggregator_test.go b/src/aggregator/aggregator/aggregator_test.go index 217c05ffa1..fd8f53f668 100644 --- a/src/aggregator/aggregator/aggregator_test.go +++ b/src/aggregator/aggregator/aggregator_test.go @@ -946,8 +946,9 @@ func TestAggregatorOwnedShards(t *testing.T) { func TestAggregatorAddMetricMetrics(t *testing.T) { s := tally.NewTestScope("testScope", nil) - m := newAggregatorAddUntimedMetrics(s, instrument.TimerOptions{}) - m.ReportSuccess(time.Second) + m := newAggregatorAddUntimedMetrics(s, instrument.TimerOptions{StandardSampleRate: 0.001}) + m.ReportSuccess() + m.SuccessLatencyStopwatch().Stop() m.ReportError(errInvalidMetricType) m.ReportError(errShardNotOwned) m.ReportError(errAggregatorShardNotWriteable) @@ -980,9 +981,8 @@ func TestAggregatorAddMetricMetrics(t *testing.T) { for _, id := range []string{ "testScope.success-latency+", } { - ti, exists := timers[id] + _, exists := timers[id] require.True(t, exists) - require.Equal(t, []time.Duration{time.Second}, ti.Values()) } // Validate we do not have any gauges. @@ -992,7 +992,8 @@ func TestAggregatorAddMetricMetrics(t *testing.T) { func TestAggregatorAddTimedMetrics(t *testing.T) { s := tally.NewTestScope("testScope", nil) m := newAggregatorAddTimedMetrics(s, instrument.TimerOptions{}) - m.ReportSuccess(time.Second) + m.ReportSuccess() + m.SuccessLatencyStopwatch().Stop() m.ReportError(errShardNotOwned) m.ReportError(errAggregatorShardNotWriteable) m.ReportError(errWriteNewMetricRateLimitExceeded) @@ -1027,9 +1028,8 @@ func TestAggregatorAddTimedMetrics(t *testing.T) { for _, id := range []string{ "testScope.success-latency+", } { - ti, exists := timers[id] + _, exists := timers[id] require.True(t, exists) - require.Equal(t, []time.Duration{time.Second}, ti.Values()) } // Validate we do not have any gauges.