From 380a203eb8de2fbdaa6f532e99d76a26c5e08389 Mon Sep 17 00:00:00 2001 From: Matthew Todd Date: Wed, 8 Jun 2022 13:35:19 -0400 Subject: [PATCH 1/3] outliers: extract a Registry interface. This is a pure mechanical refactoring, preparing us for #81021, where we'll move outlier processing off of the hot execution path. The idea is that the outside world will continue to talk to us as a Registry, but we'll now have a seam into which we can insert some asynchrony. Release note: None --- pkg/sql/sqlstats/outliers/BUILD.bazel | 1 + pkg/sql/sqlstats/outliers/outliers.go | 126 +++-------------- pkg/sql/sqlstats/outliers/registry.go | 127 ++++++++++++++++++ pkg/sql/sqlstats/sslocal/sql_stats.go | 2 +- .../sqlstats/ssmemstorage/ss_mem_storage.go | 4 +- 5 files changed, 151 insertions(+), 109 deletions(-) create mode 100644 pkg/sql/sqlstats/outliers/registry.go diff --git a/pkg/sql/sqlstats/outliers/BUILD.bazel b/pkg/sql/sqlstats/outliers/BUILD.bazel index 1f02859441f4..c4831ea1c17e 100644 --- a/pkg/sql/sqlstats/outliers/BUILD.bazel +++ b/pkg/sql/sqlstats/outliers/BUILD.bazel @@ -7,6 +7,7 @@ go_library( srcs = [ "detector.go", "outliers.go", + "registry.go", ], embed = [":outliers_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/outliers", diff --git a/pkg/sql/sqlstats/outliers/outliers.go b/pkg/sql/sqlstats/outliers/outliers.go index 26d75d462d43..0f8c8a557843 100644 --- a/pkg/sql/sqlstats/outliers/outliers.go +++ b/pkg/sql/sqlstats/outliers/outliers.go @@ -18,10 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" prometheus "github.com/prometheus/client_model/go" ) 
@@ -121,114 +118,31 @@ func NewMetrics() Metrics { } } -// maxCacheSize is the number of detected outliers we will retain in memory. -// We choose a small value for the time being to allow us to iterate without -// worrying about memory usage. See #79450. -const ( - maxCacheSize = 10 -) +// Reader offers read-only access to the currently retained set of outliers. +type Reader interface { + // IterateOutliers calls visitor with each of the currently retained set of outliers. + IterateOutliers(context.Context, func(context.Context, *Outlier)) +} // Registry is the central object in the outliers subsystem. It observes // statement execution to determine which statements are outliers and // exposes the set of currently retained outliers. -type Registry struct { - detector detector - - // Note that this single mutex places unnecessary constraints on outlier - // detection and reporting. We will develop a higher-throughput system - // before enabling the outliers subsystem by default. - mu struct { - syncutil.RWMutex - statements map[clusterunique.ID][]*Outlier_Statement - outliers *cache.UnorderedCache - } +type Registry interface { + // ObserveStatement notifies the registry of a statement execution. + ObserveStatement( + sessionID clusterunique.ID, + statementID clusterunique.ID, + statementFingerprintID roachpb.StmtFingerprintID, + latencyInSeconds float64, + ) + + // ObserveTransaction notifies the registry of the end of a transaction. + ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) + + Reader } // New builds a new Registry. 
-func New(st *cluster.Settings, metrics Metrics) *Registry { - config := cache.Config{ - Policy: cache.CacheFIFO, - ShouldEvict: func(size int, key, value interface{}) bool { - return size > maxCacheSize - }, - } - r := &Registry{ - detector: anyDetector{detectors: []detector{ - latencyThresholdDetector{st: st}, - newLatencyQuantileDetector(st, metrics), - }}} - r.mu.statements = make(map[clusterunique.ID][]*Outlier_Statement) - r.mu.outliers = cache.NewUnorderedCache(config) - return r -} - -// ObserveStatement notifies the registry of a statement execution. -func (r *Registry) ObserveStatement( - sessionID clusterunique.ID, - statementID clusterunique.ID, - statementFingerprintID roachpb.StmtFingerprintID, - latencyInSeconds float64, -) { - if !r.enabled() { - return - } - r.mu.Lock() - defer r.mu.Unlock() - r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Outlier_Statement{ - ID: statementID.GetBytes(), - FingerprintID: statementFingerprintID, - LatencyInSeconds: latencyInSeconds, - }) -} - -// ObserveTransaction notifies the registry of the end of a transaction. -func (r *Registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) { - if !r.enabled() { - return - } - r.mu.Lock() - defer r.mu.Unlock() - statements := r.mu.statements[sessionID] - delete(r.mu.statements, sessionID) - - hasOutlier := false - for _, s := range statements { - if r.detector.isOutlier(s) { - hasOutlier = true - } - } - - if hasOutlier { - for _, s := range statements { - r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ - Session: &Outlier_Session{ID: sessionID.GetBytes()}, - Transaction: &Outlier_Transaction{ID: &txnID}, - Statement: s, - }) - } - } -} - -// TODO(todd): -// Once we can handle sufficient throughput to live on the hot -// execution path in #81021, we can probably get rid of this external -// concept of "enabled" and let the detectors just decide for themselves -// internally. 
-func (r *Registry) enabled() bool { - return r.detector.enabled() -} - -// Reader offers read-only access to the currently retained set of outliers. -type Reader interface { - IterateOutliers(context.Context, func(context.Context, *Outlier)) -} - -// IterateOutliers calls visitor with each of the currently retained set of -// outliers. -func (r *Registry) IterateOutliers(ctx context.Context, visitor func(context.Context, *Outlier)) { - r.mu.RLock() - defer r.mu.RUnlock() - r.mu.outliers.Do(func(e *cache.Entry) { - visitor(ctx, e.Value.(*Outlier)) - }) +func New(st *cluster.Settings, metrics Metrics) Registry { + return newRegistry(st, metrics) } diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go new file mode 100644 index 000000000000..c3ece56bbbf1 --- /dev/null +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -0,0 +1,127 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package outliers + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// maxCacheSize is the number of detected outliers we will retain in memory. +// We choose a small value for the time being to allow us to iterate without +// worrying about memory usage. See #79450. +const ( + maxCacheSize = 10 +) + +// registry is the central object in the outliers subsystem. 
It observes +// statement execution to determine which statements are outliers and +// exposes the set of currently retained outliers. +type registry struct { + detector detector + + // Note that this single mutex places unnecessary constraints on outlier + // detection and reporting. We will develop a higher-throughput system + // before enabling the outliers subsystem by default. + mu struct { + syncutil.RWMutex + statements map[clusterunique.ID][]*Outlier_Statement + outliers *cache.UnorderedCache + } +} + +var _ Registry = &registry{} + +func newRegistry(st *cluster.Settings, metrics Metrics) Registry { + config := cache.Config{ + Policy: cache.CacheFIFO, + ShouldEvict: func(size int, key, value interface{}) bool { + return size > maxCacheSize + }, + } + r := &registry{ + detector: anyDetector{detectors: []detector{ + latencyThresholdDetector{st: st}, + newLatencyQuantileDetector(st, metrics), + }}} + r.mu.statements = make(map[clusterunique.ID][]*Outlier_Statement) + r.mu.outliers = cache.NewUnorderedCache(config) + return r +} + +func (r *registry) ObserveStatement( + sessionID clusterunique.ID, + statementID clusterunique.ID, + statementFingerprintID roachpb.StmtFingerprintID, + latencyInSeconds float64, +) { + if !r.enabled() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Outlier_Statement{ + ID: statementID.GetBytes(), + FingerprintID: statementFingerprintID, + LatencyInSeconds: latencyInSeconds, + }) +} + +func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) { + if !r.enabled() { + return + } + r.mu.Lock() + defer r.mu.Unlock() + statements := r.mu.statements[sessionID] + delete(r.mu.statements, sessionID) + + hasOutlier := false + for _, s := range statements { + if r.detector.isOutlier(s) { + hasOutlier = true + } + } + + if hasOutlier { + for _, s := range statements { + r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ + Session: &Outlier_Session{ID: 
sessionID.GetBytes()}, + Transaction: &Outlier_Transaction{ID: &txnID}, + Statement: s, + }) + } + } +} + +func (r *registry) IterateOutliers(ctx context.Context, visitor func(context.Context, *Outlier)) { + r.mu.RLock() + defer r.mu.RUnlock() + r.mu.outliers.Do(func(e *cache.Entry) { + visitor(ctx, e.Value.(*Outlier)) + }) +} + +// TODO(todd): +// Once we can handle sufficient throughput to live on the hot +// execution path in #81021, we can probably get rid of this external +// concept of "enabled" and let the detectors just decide for themselves +// internally. +func (r *registry) enabled() bool { + return r.detector.enabled() +} diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go index 30186ba57352..696357e08a89 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats.go @@ -67,7 +67,7 @@ type SQLStats struct { knobs *sqlstats.TestingKnobs - outliers *outliers.Registry + outliers outliers.Registry } func newSQLStats( diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 4befcc80aacd..6b3d897beacc 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -120,7 +120,7 @@ type Container struct { mon *mon.BytesMonitor knobs *sqlstats.TestingKnobs - outliersRegistry *outliers.Registry + outliersRegistry outliers.Registry } var _ sqlstats.ApplicationStats = &Container{} @@ -135,7 +135,7 @@ func New( mon *mon.BytesMonitor, appName string, knobs *sqlstats.TestingKnobs, - outliersRegistry *outliers.Registry, + outliersRegistry outliers.Registry, ) *Container { s := &Container{ st: st, From 21f2390b0ed05150f3bb04cd9bbf769a2a5b9337 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 30 Jun 2022 16:25:12 -0700 Subject: [PATCH 2/3] kvstreamer: reuse incomplete Get requests on resume batches Previously, for all incomplete requests in a batch we'd allocate new Get and Scan 
requests (due to a known issue, #75452, the lifecycle of the requests is currently not clearly defined, so we're not allowed to modify them). However, we can reuse the Get requests since they will never be modified (i.e. they are either complete or incomplete, and, unlike for Scan requests, the start key won't ever be shifted), so this commit takes advantage of this observation. Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 31 ++++++++++++-------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index dfe7899cbd94..a4100f244e55 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -1360,7 +1360,7 @@ func processSingleRangeResponse( ) { processSingleRangeResults(ctx, s, req, br, fp) if fp.hasIncomplete() { - resumeReq := buildResumeSingeRangeBatch(s, req, br, fp) + resumeReq := buildResumeSingleRangeBatch(s, req, br, fp) s.requestsToServe.add(resumeReq) } } @@ -1521,7 +1521,7 @@ func processSingleRangeResults( // // Note that it should only be called if the response has any incomplete // requests. -func buildResumeSingeRangeBatch( +func buildResumeSingleRangeBatch( s *Streamer, req singleRangeBatch, br *roachpb.BatchResponse, @@ -1537,13 +1537,6 @@ func buildResumeSingeRangeBatch( // requests with the ResumeSpans. resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage resumeReq.overheadAccountedFor = req.overheadAccountedFor - // TODO(yuzefovich): for incomplete Get requests, the ResumeSpan should be - // exactly the same as the original span, so we might be able to reuse the - // original Get requests. 
- gets := make([]struct { - req roachpb.GetRequest - union roachpb.RequestUnion_Get - }, fp.numIncompleteGets) scans := make([]struct { req roachpb.ScanRequest union roachpb.RequestUnion_Scan @@ -1560,14 +1553,18 @@ func buildResumeSingeRangeBatch( emptyResponse = false continue } - // This Get wasn't completed - create a new request according to the - // ResumeSpan and include it into the batch. - newGet := gets[0] - gets = gets[1:] - newGet.req.SetSpan(*get.ResumeSpan) - newGet.req.KeyLocking = origRequest.KeyLocking - newGet.union.Get = &newGet.req - resumeReq.reqs[resumeReqIdx].Value = &newGet.union + // This Get wasn't completed - include it into the batch again (we + // can just reuse the original request since it hasn't been + // modified which is also asserted below). + if buildutil.CrdbTestBuild { + if !get.ResumeSpan.Equal(origRequest.Span()) { + panic(errors.AssertionFailedf( + "unexpectedly the ResumeSpan %s on the GetResponse is different from the original span %s", + get.ResumeSpan, origRequest.Span(), + )) + } + } + resumeReq.reqs[resumeReqIdx] = req.reqs[i] resumeReq.positions = append(resumeReq.positions, position) if req.subRequestIdx != nil { resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i]) From 67ff4075095de3ffb2c7d75a79a670fd872d23e3 Mon Sep 17 00:00:00 2001 From: Ben Bardin Date: Fri, 1 Jul 2022 12:50:49 -0400 Subject: [PATCH 3/3] pkg/util/tracing: Add hidden tag group, make server responsible for sorting. Release note: none This moves all tags marked as "hidden" into a single tag group at the UI layer. This declutters the trace page a little bit and makes it easier to pick out more important information. 
--- pkg/kv/test_utils.go | 2 +- pkg/sql/exec_util.go | 2 +- pkg/sql/trace_test.go | 2 +- .../db-console/src/views/tracez/tracez.tsx | 16 ++--- pkg/util/tracing/crdbspan.go | 19 +++--- .../grpcinterceptor/grpc_interceptor_test.go | 8 +-- pkg/util/tracing/span_test.go | 26 ++++----- pkg/util/tracing/tracingpb/recorded_span.go | 5 ++ pkg/util/tracing/tracingpb/recording.go | 4 +- .../tracing/tracingui/span_registry_ui.go | 58 ++++++++++++++----- 10 files changed, 89 insertions(+), 53 deletions(-) diff --git a/pkg/kv/test_utils.go b/pkg/kv/test_utils.go index 7fe1a1a34caf..6248f6749320 100644 --- a/pkg/kv/test_utils.go +++ b/pkg/kv/test_utils.go @@ -27,7 +27,7 @@ func OnlyFollowerReads(rec tracingpb.Recording) bool { if sp.Operation != "/cockroach.roachpb.Internal/Batch" { continue } - anonTagGroup := sp.FindTagGroup("") + anonTagGroup := sp.FindTagGroup(tracingpb.AnonymousTagGroupName) if anonTagGroup == nil { continue } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index bb1f7f61b381..dc1a6a78f83d 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -2627,7 +2627,7 @@ func getMessagesForSubtrace( for _, tg := range span.TagGroups { var prefix string - if tg.Name != "" { + if tg.Name != tracingpb.AnonymousTagGroupName { prefix = fmt.Sprintf("%s-", tg.Name) } for _, tag := range tg.Tags { diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index 1ff4a5289f47..e745110562e0 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -638,7 +638,7 @@ func TestTraceDistSQL(t *testing.T) { require.True(t, ok, "table reader span not found") require.Empty(t, rec.OrphanSpans()) // Check that the table reader indeed came from a remote note. 
- anonTagGroup := sp.FindTagGroup("") + anonTagGroup := sp.FindTagGroup(tracingpb.AnonymousTagGroupName) require.NotNil(t, anonTagGroup) val, ok := anonTagGroup.FindTag("node") require.True(t, ok) diff --git a/pkg/ui/workspaces/db-console/src/views/tracez/tracez.tsx b/pkg/ui/workspaces/db-console/src/views/tracez/tracez.tsx index 5bd7259c77cd..d47bdf3937cd 100644 --- a/pkg/ui/workspaces/db-console/src/views/tracez/tracez.tsx +++ b/pkg/ui/workspaces/db-console/src/views/tracez/tracez.tsx @@ -157,12 +157,12 @@ const TagBadge = ({ let badgeStatus: BadgeStatus; if (status) { badgeStatus = status; + } else if (t.hidden) { + badgeStatus = "default"; } else if (isExpandable) { badgeStatus = "warning"; - } else if (!t.hidden) { - badgeStatus = "info"; } else { - badgeStatus = "default"; + badgeStatus = "info"; } return (