101909: metric: use cumulative count instead of windowed count in tsdb r=aadityasondhi a=aadityasondhi

Previously, we wrote the windowed count of a histogram into tsdb, which
meant that the count reset on each tick. Windowed values are useful for
calculating averages and quantiles, but they make little sense to record
as a `count`.

This patch now uses the cumulative count in tsdb.

This patch also adds a `-sum` field to record the cumulative sum
alongside the cumulative count.

Fixes #98745

Release note (bug fix): Timeseries metric counts now show the cumulative
count of a histogram rather than the windowed count. A `-sum` timeseries
is also exported to keep track of the cumulative sum of all samples in
the histogram.
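
To make the windowed-versus-cumulative distinction concrete, here is a
minimal, self-contained sketch whose `Total`/`TotalWindowed` methods mirror
the new interface in the diffs below (a toy model for illustration, not the
actual `metric` package implementation):

```go
package main

import "fmt"

// histogram models the distinction described above: the cumulative
// count/sum grow monotonically, while the windowed values reset whenever
// the window ticks over.
type histogram struct {
	cumCount, winCount int64
	cumSum, winSum     float64
}

func (h *histogram) RecordValue(v float64) {
	h.cumCount++
	h.cumSum += v
	h.winCount++
	h.winSum += v
}

// tick rotates the window, as the tsdb polling interval effectively does.
func (h *histogram) tick() { h.winCount, h.winSum = 0, 0 }

func (h *histogram) Total() (int64, float64)         { return h.cumCount, h.cumSum }
func (h *histogram) TotalWindowed() (int64, float64) { return h.winCount, h.winSum }

func main() {
	h := &histogram{}
	h.RecordValue(10)
	h.RecordValue(20)
	h.tick() // the window resets; cumulative values do not
	h.RecordValue(5)

	count, sum := h.Total()
	fmt.Println(count, sum) // 3 35: monotonic, suitable for -count and -sum series
	wc, ws := h.TotalWindowed()
	fmt.Println(wc, ws) // 1 5: resets each tick, misleading when recorded as a count
}
```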

103330: roachtest: fix cdc test timestamp logging r=aliher1911 a=aliher1911

This commit fixes the format arguments of a logging message.

Current logging:
```
8.224822939s was spent validating this resolved timestamp: %!s(MISSING)
```

Fixed logging:
```
207.587058ms was spent validating this resolved timestamp: 1684170215.036471445,0
```
Epic: none

Release note: None
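
The bug is ordinary Go `fmt` behavior: a verb with no matching argument
renders as `%!s(MISSING)`. A minimal reproduction (with made-up values;
`go vet` flags exactly this class of mistake):

```go
package main

import "fmt"

func main() {
	elapsed := "8.224822939s"
	resolved := "1684170215.036471445,0"

	// Bug: two %s verbs, but only one argument.
	fmt.Printf("%s was spent validating this resolved timestamp: %s\n", elapsed)
	// Output: 8.224822939s was spent validating this resolved timestamp: %!s(MISSING)

	// Fix: pass both arguments, as the patched l.Printf call now does.
	fmt.Printf("%s was spent validating this resolved timestamp: %s\n", elapsed, resolved)
	// Output: 8.224822939s was spent validating this resolved timestamp: 1684170215.036471445,0
}
```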

103342: kvserver: deflake `TestRestoreReplicas` r=erikgrinaker a=erikgrinaker

We recently began eagerly initializing replicas on startup when using expiration-based leases. This can cause elections on startup, such that Raft leadership moves around for a bit before settling down at the leaseholder. This test expected the first store to be able to acquire Raft leadership and the lease, but that no longer holds if the second replica has already acquired leadership.

This patch changes the test to keep looking until a leaseholder is established.

Resolves #103251.
Epic: none

Release note: None
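
The fix (in the `client_raft_test.go` diff below) retries the increment
against every store until one accepts it. A simplified, standalone sketch
of that retry-until-leaseholder pattern, with `succeedsSoon` standing in
for `testutils.SucceedsSoon` and stub stores in place of the real cluster:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// succeedsSoon retries fn until it returns nil or the deadline expires,
// mirroring what testutils.SucceedsSoon does in the test.
func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil || time.Now().After(deadline) {
			return err
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	// send stands in for issuing the increment to store i; here the lease
	// settles on store 1 after a few attempts, mimicking Raft leadership
	// bouncing around after the restart.
	attempts := 0
	send := func(i int) error {
		attempts++
		if i == 1 && attempts > 3 {
			return nil
		}
		return errors.New("NotLeaseHolderError")
	}

	var leaseholder int
	if err := succeedsSoon(5*time.Second, func() error {
		var lastErr error
		for i := 0; i < 2; i++ {
			if lastErr = send(i); lastErr == nil {
				leaseholder = i
				return nil
			}
		}
		return lastErr // no leaseholder yet; keep looking
	}); err != nil {
		panic(err)
	}
	fmt.Printf("store %d holds the lease\n", leaseholder)
}
```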

103346: roachtest: disable more expiration lease metamorphism r=erikgrinaker a=erikgrinaker

Resolves #103347.
Resolves #103340.
Touches #98799.
Touches #99087.

Epic: none
Release note: None

Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
4 people committed May 15, 2023
5 parents 7b86c7c + b5bac44 + 8229505 + 2eadbcc + fb5cff9 commit f56f57e
Showing 20 changed files with 166 additions and 120 deletions.
13 changes: 8 additions & 5 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -432,7 +432,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
// Assert existing calls.
require.Equal(t, 1, dialSQLServerCount)
require.Equal(t, 1, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
count, _ := c.DialTenantLatency.Total()
require.Equal(t, count, int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))

// Invoke dial tenant with a failure to ReportFailure. Final error
@@ -453,7 +454,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
// Assert existing calls.
require.Equal(t, 2, dialSQLServerCount)
require.Equal(t, 2, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(2))
count, _ = c.DialTenantLatency.Total()
require.Equal(t, count, int64(2))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))
})

@@ -478,8 +480,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
conn, err := c.dialTenantCluster(ctx, nil /* requester */)
require.EqualError(t, err, "baz")
require.Nil(t, conn)

require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
count, _ := c.DialTenantLatency.Total()
require.Equal(t, count, int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))
})

@@ -551,7 +553,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
require.Equal(t, 3, addrLookupFnCount)
require.Equal(t, 2, dialSQLServerCount)
require.Equal(t, 1, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
count, _ := c.DialTenantLatency.Total()
require.Equal(t, count, int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(2))
})

28 changes: 16 additions & 12 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -470,7 +470,8 @@ func TestProxyAgainstSecureCRDB(t *testing.T) {
})
}
require.Equal(t, int64(4), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(4), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(4), count)
require.Equal(t, int64(2), s.metrics.AuthFailedCount.Count())
require.Equal(t, int64(1), s.metrics.RoutingErrCount.Count())
}
@@ -611,7 +612,8 @@ func TestProxyTLSClose(t *testing.T) {
_ = conn.Close(ctx)

require.Equal(t, int64(1), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(1), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(1), count)
require.Equal(t, int64(0), s.metrics.AuthFailedCount.Count())
}

@@ -718,7 +720,8 @@ func TestInsecureProxy(t *testing.T) {
})
require.Equal(t, int64(1), s.metrics.AuthFailedCount.Count())
require.Equal(t, int64(1), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(1), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(1), count)
}

func TestErroneousFrontend(t *testing.T) {
@@ -827,7 +830,8 @@ func TestProxyRefuseConn(t *testing.T) {
_ = te.TestConnectErr(ctx, t, url, codeProxyRefusedConnection, "too many attempts")
require.Equal(t, int64(1), s.metrics.RefusedConnCount.Count())
require.Equal(t, int64(0), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(0), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(0), count)
require.Equal(t, int64(0), s.metrics.AuthFailedCount.Count())
}

@@ -1643,10 +1647,10 @@ func TestConnectionMigration(t *testing.T) {
proxy.metrics.ConnMigrationErrorRecoverableCount.Count() +
proxy.metrics.ConnMigrationErrorFatalCount.Count()
require.Equal(t, totalAttempts, proxy.metrics.ConnMigrationAttemptedCount.Count())
require.Equal(t, totalAttempts,
proxy.metrics.ConnMigrationAttemptedLatency.TotalCount())
require.Equal(t, totalAttempts,
proxy.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
count, _ := proxy.metrics.ConnMigrationAttemptedLatency.Total()
require.Equal(t, totalAttempts, count)
count, _ = proxy.metrics.ConnMigrationTransferResponseMessageSize.Total()
require.Equal(t, totalAttempts, count)
}

transferConnWithRetries := func(t *testing.T, f *forwarder) error {
@@ -1966,12 +1970,12 @@ func TestConnectionMigration(t *testing.T) {
f.metrics.ConnMigrationErrorRecoverableCount.Count() +
f.metrics.ConnMigrationErrorFatalCount.Count()
require.Equal(t, totalAttempts, f.metrics.ConnMigrationAttemptedCount.Count())
require.Equal(t, totalAttempts,
f.metrics.ConnMigrationAttemptedLatency.TotalCount())
count, _ := f.metrics.ConnMigrationAttemptedLatency.Total()
require.Equal(t, totalAttempts, count)
// Here, we get a transfer timeout in response, so the message size
// should not be recorded.
require.Equal(t, totalAttempts-1,
f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
count, _ = f.metrics.ConnMigrationTransferResponseMessageSize.Total()
require.Equal(t, totalAttempts-1, count)
})

// All connections should eventually be terminated.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cdc.go
@@ -740,7 +740,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
l.Printf("%d of %d resolved timestamps validated, latest is %s behind realtime",
v.NumResolvedWithRows, requestedResolved, timeutil.Since(resolved.GoTime()))

l.Printf("%s was spent validating this resolved timestamp: %s", timeutil.Since(noteResolvedStartTime))
l.Printf("%s was spent validating this resolved timestamp: %s", timeutil.Since(noteResolvedStartTime), resolved)
l.Printf("%s was spent validating %d rows", timeSpentValidatingRows, numRowsValidated)

numRowsValidated = 0
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/multitenant_upgrade.go
@@ -32,7 +32,6 @@ func registerMultiTenantUpgrade(r registry.Registry) {
Name: "multitenant-upgrade",
Timeout: 1 * time.Hour,
Cluster: r.MakeClusterSpec(2),
Leases: registry.MetamorphicLeases,
Owner: registry.OwnerMultiTenant,
NonReleaseBlocker: false,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
2 changes: 0 additions & 2 deletions pkg/cmd/roachtest/tests/tpcc.go
@@ -521,7 +521,6 @@ func registerTPCC(r registry.Registry) {
Tags: registry.Tags(`default`),
Cluster: mixedHeadroomSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, cloud, 1)
},
@@ -534,7 +533,6 @@ func registerTPCC(r registry.Registry) {
Tags: registry.Tags(`default`),
Cluster: mixedHeadroomSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runTPCCMixedHeadroom(ctx, t, c, cloud, 2)
},
@@ -35,7 +35,6 @@ func registerValidateSystemSchemaAfterVersionUpgrade(r registry.Registry) {
Name: "systemschema/validate-after-version-upgrade",
Owner: registry.OwnerSQLFoundations,
Cluster: r.MakeClusterSpec(1),
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.IsLocal() && runtime.GOARCH == "arm64" {
t.Skip("Skip under ARM64. See https://github.com/cockroachdb/cockroach/issues/89268")
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -3623,7 +3623,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
require.Equal(t, exp.expClientRefreshFailure, metrics.ClientRefreshFail.Count() != 0, "TxnMetrics.ClientRefreshFail")
require.Equal(t, exp.expClientAutoRetryAfterRefresh, metrics.ClientRefreshAutoRetries.Count() != 0, "TxnMetrics.ClientRefreshAutoRetries")
require.Equal(t, exp.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess")
require.Equal(t, exp.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts")
_, restartsSum := metrics.Restarts.Total()
require.Equal(t, exp.expClientRestart, restartsSum != 0, "TxnMetrics.Restarts")
require.Equal(t, exp.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC")
require.Equal(t, exp.expParallelCommitAutoRetry, metrics.ParallelCommitAutoRetries.Count() != 0, "TxnMetrics.ParallelCommitAutoRetries")
}
9 changes: 6 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -1149,15 +1149,17 @@ func checkTxnMetrics(
func checkTxnMetricsOnce(
metrics kvcoord.TxnMetrics, name string, commits, commits1PC, aborts, restarts int64,
) error {
durationCounts, _ := metrics.Durations.Total()
restartsCounts, _ := metrics.Restarts.Total()
testcases := []struct {
name string
a, e int64
}{
{"commits", metrics.Commits.Count(), commits},
{"commits1PC", metrics.Commits1PC.Count(), commits1PC},
{"aborts", metrics.Aborts.Count(), aborts},
{"durations", metrics.Durations.TotalCount(), commits + aborts},
{"restarts", metrics.Restarts.TotalCount(), restarts},
{"durations", durationCounts, commits + aborts},
{"restarts", restartsCounts, restarts},
}

for _, tc := range testcases {
@@ -1374,7 +1376,8 @@ func TestTxnDurations(t *testing.T) {
// introducing spurious errors or being overly lax.
//
// TODO(cdo): look into cause of variance.
if a, e := hist.TotalCount(), int64(puts); a != e {
count, _ := hist.Total()
if a, e := count, int64(puts); a != e {
t.Fatalf("durations %d != expected %d", a, e)
}

44 changes: 23 additions & 21 deletions pkg/kv/kvserver/client_raft_test.go
@@ -373,37 +373,39 @@ func TestRestoreReplicas(t *testing.T) {

// Perform an increment before replication to ensure that commands are not
// repeated on restarts.
incArgs := incrementArgs([]byte("a"), 23)
if _, err := kv.SendWrapped(ctx,
store.TestSender(), incArgs); err != nil {
t.Fatal(err)
}
incArgs := incrementArgs(key, 23)
_, pErr := kv.SendWrapped(ctx, store.TestSender(), incArgs)
require.NoError(t, pErr.GoError())

tc.AddVotersOrFatal(t, key, tc.Target(1))

require.NoError(t, tc.Restart())

incArgs = incrementArgs([]byte("a"), 5)
failures := 0
successes := 0
// Send a command on each store. It should only succeed on the lease holder.
// Find the leaseholder and follower. The restart may cause the Raft
// leadership to bounce around a bit, since we don't fully enable Raft
// prevote, so we loop for a bit until we find the leaseholder.
incArgs = incrementArgs(key, 5)
var followerStore *kvserver.Store
for i := 0; i < len(tc.Servers); i++ {
if _, pErr := kv.SendWrapped(ctx, tc.GetFirstStoreFromServer(t, i).TestSender(), incArgs); pErr != nil {
failures++
if _, ok := pErr.GetDetail().(*kvpb.NotLeaseHolderError); !ok {
t.Fatalf("expected not lease holder error; got %s", pErr)
testutils.SucceedsSoon(t, func() error {
var pErr *kvpb.Error
for i := 0; i < tc.NumServers(); i++ {
_, pErr = kv.SendWrapped(ctx, tc.GetFirstStoreFromServer(t, i).TestSender(), incArgs)
if pErr == nil {
followerStore = tc.GetFirstStoreFromServer(t, 1-i)
break
}
followerStore = tc.GetFirstStoreFromServer(t, i)
} else {
successes++
require.IsType(t, &kvpb.NotLeaseHolderError{}, pErr.GetDetail())
}
}
require.Equal(t, 1, failures, "replica command failed (non leaseholders)")
require.Equal(t, 1, successes, "replica command succeeded (leaseholders)")
return pErr.GoError()
})

// The follower should now return a NLHE.
_, pErr = kv.SendWrapped(ctx, followerStore.TestSender(), incArgs)
require.Error(t, pErr.GoError())
require.IsType(t, &kvpb.NotLeaseHolderError{}, pErr.GetDetail())

testutils.SucceedsSoon(t, func() error {
getArgs := getArgs([]byte("a"))
getArgs := getArgs(key)
if reply, err := kv.SendWrappedWith(ctx, followerStore.TestSender(), kvpb.Header{
ReadConsistency: kvpb.INCONSISTENT,
}, getArgs); err != nil {
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/scheduler_test.go
@@ -266,7 +266,8 @@ func TestSchedulerLoop(t *testing.T) {
return nil
})

require.Equal(t, int64(3), m.RaftSchedulerLatency.TotalCount())
count, _ := m.RaftSchedulerLatency.Total()
require.Equal(t, int64(3), count)
}

// Verify that when we enqueue the same range multiple times for the same
9 changes: 6 additions & 3 deletions pkg/server/status/recorder.go
@@ -609,9 +609,12 @@ type registryRecorder struct {
func extractValue(name string, mtr interface{}, fn func(string, float64)) error {
switch mtr := mtr.(type) {
case metric.WindowedHistogram:
n := float64(mtr.TotalCountWindowed())
fn(name+"-count", n)
avg := mtr.TotalSumWindowed() / n
// Use cumulative stats here
count, sum := mtr.Total()
fn(name+"-count", float64(count))
fn(name+"-sum", sum)
// Use windowed stats for avg and quantiles
avg := mtr.MeanWindowed()
if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) {
avg = 0
}
1 change: 1 addition & 0 deletions pkg/server/status/recorder_test.go
@@ -587,6 +587,7 @@ func TestMetricsRecorder(t *testing.T) {
addExpected(reg.prefix, data.name+q.Suffix, reg.source, 100, 10, reg.isNode)
}
addExpected(reg.prefix, data.name+"-count", reg.source, 100, 1, reg.isNode)
addExpected(reg.prefix, data.name+"-sum", reg.source, 100, 9, reg.isNode)
addExpected(reg.prefix, data.name+"-avg", reg.source, 100, 9, reg.isNode)
default:
t.Fatalf("unexpected: %+v", data)
6 changes: 3 additions & 3 deletions pkg/util/asciitsdb/asciitsdb.go
@@ -103,12 +103,12 @@ func (t *TSDB) Scrape(ctx context.Context) {

switch mtr := val.(type) {
case metric.WindowedHistogram:
n := float64(mtr.TotalCountWindowed())
if _, ok := t.mu.points[name]; !ok {
return
}
t.mu.points[name+"-count"] = append(t.mu.points[name+"-count"], n)
avg := mtr.TotalSumWindowed() / n
count, _ := mtr.Total()
t.mu.points[name+"-count"] = append(t.mu.points[name+"-count"], float64(count))
avg := mtr.MeanWindowed()
if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) {
avg = 0
}
22 changes: 16 additions & 6 deletions pkg/util/metric/aggmetric/histogram.go
@@ -61,14 +61,24 @@ func (a *AggHistogram) GetMetadata() metric.Metadata { return a.h.GetMetadata()
// Inspect is part of the metric.Iterable interface.
func (a *AggHistogram) Inspect(f func(interface{})) { f(a) }

// TotalCountWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) TotalCountWindowed() int64 {
return a.h.TotalCountWindowed()
// TotalWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) TotalWindowed() (int64, float64) {
return a.h.TotalWindowed()
}

// TotalSumWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) TotalSumWindowed() float64 {
return a.h.TotalSumWindowed()
// Total is part of the metric.WindowedHistogram interface
func (a *AggHistogram) Total() (int64, float64) {
return a.h.Total()
}

// MeanWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) MeanWindowed() float64 {
return a.h.MeanWindowed()
}

// Mean is part of the metric.WindowedHistogram interface
func (a *AggHistogram) Mean() float64 {
return a.h.Mean()
}

// ValueAtQuantileWindowed is part of the metric.WindowedHistogram interface
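
Taken together, the histogram changes above imply roughly this shape for
the `metric.WindowedHistogram` interface (a reconstruction from the methods
touched in this commit, not the verbatim declaration; the
`ValueAtQuantileWindowed` signature is assumed):

```go
// WindowedHistogram, as implied by the AggHistogram and HdrHistogram
// methods in this commit.
type WindowedHistogram interface {
	// TotalWindowed returns the count and sum of samples in the current window.
	TotalWindowed() (int64, float64)
	// Total returns the cumulative count and sum of all recorded samples.
	Total() (int64, float64)
	// MeanWindowed and Mean return the windowed and cumulative averages.
	MeanWindowed() float64
	Mean() float64
	// ValueAtQuantileWindowed returns the given quantile over the current
	// window (signature assumed here).
	ValueAtQuantileWindowed(quantile float64) float64
}
```
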
27 changes: 13 additions & 14 deletions pkg/util/metric/hdrhistogram.go
@@ -105,11 +105,12 @@ func (h *HdrHistogram) RecordValue(v int64) {
}
}

// TotalCount returns the (cumulative) number of samples.
func (h *HdrHistogram) TotalCount() int64 {
// Total returns the (cumulative) number of samples and sum of samples.
func (h *HdrHistogram) Total() (int64, float64) {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.cumulative.TotalCount()
totalSum := float64(h.mu.cumulative.TotalCount()) * h.mu.cumulative.Mean()
return h.mu.cumulative.TotalCount(), totalSum
}

// Min returns the minimum.
@@ -168,14 +169,10 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {
}
}

// TotalCountWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalCountWindowed() int64 {
return int64(h.ToPrometheusMetricWindowed().Histogram.GetSampleCount())
}

// TotalSumWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalSumWindowed() float64 {
return h.ToPrometheusMetricWindowed().Histogram.GetSampleSum()
// TotalWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
@@ -240,7 +237,9 @@ func (h *HdrHistogram) Mean() float64 {
return h.mu.cumulative.Mean()
}

// TotalSum returns the (cumulative) sum of samples.
func (h *HdrHistogram) TotalSum() float64 {
return h.ToPrometheusMetric().Histogram.GetSampleSum()
func (h *HdrHistogram) MeanWindowed() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.sliding.Current.Mean()
}