83684: kvstreamer: reuse incomplete Get requests on resume batches r=yuzefovich a=yuzefovich

Previously, for every incomplete request in a batch we'd allocate new
Get and Scan requests, because - due to known issue #75452 - the
lifecycle of the requests is currently not clearly defined, so we're
not allowed to modify them. However, Get requests can be reused: a Get
is either complete or incomplete, and, unlike a Scan, its start key is
never shifted, so the request itself is never modified. This commit
takes advantage of that observation.

Release note: None
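
To make the optimization concrete, here is a minimal, self-contained sketch of the idea using hypothetical stand-in types (not the real kvstreamer/roachpb ones): since an incomplete Get's resume span is exactly its original span, the original request value can go back into the resume batch as-is instead of being reallocated.

```go
package main

import "fmt"

// Hypothetical stand-ins for roachpb spans/requests, for illustration only.
type span struct{ key string }

type getRequest struct{ span span }

type getResponse struct {
	value      string
	resumeSpan *span // non-nil means the Get was not evaluated in this batch
}

// buildResumeBatch collects the requests that must be re-issued. Because an
// incomplete Get's resume span equals its original span, the original request
// value is reused verbatim instead of allocating a fresh one (the real code
// still rebuilds Scan requests from their shifted ResumeSpans).
func buildResumeBatch(reqs []getRequest, resps []getResponse) []getRequest {
	var resume []getRequest
	for i, resp := range resps {
		if resp.resumeSpan == nil {
			continue // complete, nothing to resume
		}
		resume = append(resume, reqs[i])
	}
	return resume
}

func main() {
	reqs := []getRequest{{span{"a"}}, {span{"b"}}}
	resps := []getResponse{{value: "1"}, {resumeSpan: &span{"b"}}}
	fmt.Printf("%+v\n", buildResumeBatch(reqs, resps)) // only the Get on "b" is resumed
}
```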

83709: pkg/util/tracing: Add hidden tag group, make server responsible for sorting. r=benbardin a=benbardin

Release note: None

This moves all tags marked as "hidden" into a single tag group at the UI layer. This declutters the trace page a little bit and makes it easier to pick out more important information.

<img width="1620" alt="Screen Shot 2022-07-01 at 12 37 07 PM" src="https://user-images.githubusercontent.com/261508/176937757-bf8ac920-9e28-4908-8de4-1fbc077fd2c7.png">
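
For illustration, a rough, self-contained sketch of the rendering behavior this touches, with simplified stand-in types rather than the real pkg/util/tracing API: tags in named groups are printed with a group prefix, while everything in the single anonymous group (where the hidden tags are now collected) is printed bare. The empty-string group name is an assumption based on the FindTagGroup("") call replaced in the diff below.

```go
package main

import "fmt"

// Assumption for this sketch: the anonymous group is identified by an empty name.
const anonymousTagGroupName = ""

type tag struct{ key, value string }

type tagGroup struct {
	name string
	tags []tag
}

// render prefixes tags from named groups with "<group>-", mirroring the
// formatting change in pkg/sql/exec_util.go; tags in the anonymous group are
// printed without a prefix.
func render(groups []tagGroup) []string {
	var out []string
	for _, tg := range groups {
		prefix := ""
		if tg.name != anonymousTagGroupName {
			prefix = tg.name + "-"
		}
		for _, t := range tg.tags {
			out = append(out, fmt.Sprintf("%s%s=%s", prefix, t.key, t.value))
		}
	}
	return out
}

func main() {
	groups := []tagGroup{
		{name: anonymousTagGroupName, tags: []tag{{"node", "2"}, {"_hidden", "1"}}},
		{name: "txn", tags: []tag{{"id", "abc"}}},
	}
	fmt.Println(render(groups)) // [node=2 _hidden=1 txn-id=abc]
}
```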


83834: outliers: extract a Registry interface. r=matthewtodd a=matthewtodd

This is a purely mechanical refactoring, preparing us for #81021, where
we'll move outlier processing off the hot execution path. The idea is
that the outside world will continue to talk to us through the Registry
interface, while we gain a seam into which we can insert some
asynchrony, as sketched below.

Release note: None
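
As a purely hypothetical illustration of that seam (this is not the #81021 implementation, and the types are simplified stand-ins for the real outliers package), a buffering wrapper could satisfy the same Registry interface and push the actual bookkeeping onto a background goroutine:

```go
package main

import (
	"fmt"
	"time"
)

// Registry is a simplified stand-in for the outliers.Registry interface
// extracted in this change.
type Registry interface {
	ObserveStatement(sessionID string, latencySeconds float64)
	ObserveTransaction(sessionID string)
}

// event is a buffered observation handed off to the background worker.
type event struct {
	sessionID      string
	latencySeconds float64
	endOfTxn       bool
}

// asyncRegistry satisfies Registry by forwarding observations to a goroutine
// that calls into the wrapped synchronous implementation, keeping the hot
// execution path to a cheap, non-blocking channel send.
type asyncRegistry struct {
	delegate Registry
	events   chan event
}

func newAsyncRegistry(delegate Registry) *asyncRegistry {
	r := &asyncRegistry{delegate: delegate, events: make(chan event, 1024)}
	go func() {
		for ev := range r.events {
			if ev.endOfTxn {
				r.delegate.ObserveTransaction(ev.sessionID)
			} else {
				r.delegate.ObserveStatement(ev.sessionID, ev.latencySeconds)
			}
		}
	}()
	return r
}

func (r *asyncRegistry) ObserveStatement(sessionID string, latencySeconds float64) {
	select {
	case r.events <- event{sessionID: sessionID, latencySeconds: latencySeconds}:
	default: // drop rather than stall the hot path
	}
}

func (r *asyncRegistry) ObserveTransaction(sessionID string) {
	select {
	case r.events <- event{sessionID: sessionID, endOfTxn: true}:
	default:
	}
}

// loggingRegistry is a trivial synchronous delegate used only to exercise the
// wrapper in this example.
type loggingRegistry struct{}

func (loggingRegistry) ObserveStatement(id string, lat float64) { fmt.Println("stmt", id, lat) }
func (loggingRegistry) ObserveTransaction(id string)            { fmt.Println("txn", id) }

func main() {
	var r Registry = newAsyncRegistry(loggingRegistry{})
	r.ObserveStatement("session-1", 0.42)
	r.ObserveTransaction("session-1")
	time.Sleep(100 * time.Millisecond) // crude; lets the worker drain in this demo
}
```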

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Ben Bardin <[email protected]>
Co-authored-by: Matthew Todd <[email protected]>
4 people committed Jul 6, 2022
4 parents 264cd04 + 21f2390 + 67ff407 + 380a203 commit 41228d1
Showing 16 changed files with 254 additions and 179 deletions.
31 changes: 14 additions & 17 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -1360,7 +1360,7 @@ func processSingleRangeResponse(
) {
processSingleRangeResults(ctx, s, req, br, fp)
if fp.hasIncomplete() {
resumeReq := buildResumeSingeRangeBatch(s, req, br, fp)
resumeReq := buildResumeSingleRangeBatch(s, req, br, fp)
s.requestsToServe.add(resumeReq)
}
}
@@ -1521,7 +1521,7 @@ func processSingleRangeResults(
//
// Note that it should only be called if the response has any incomplete
// requests.
func buildResumeSingeRangeBatch(
func buildResumeSingleRangeBatch(
s *Streamer,
req singleRangeBatch,
br *roachpb.BatchResponse,
@@ -1537,13 +1537,6 @@ func buildResumeSingeRangeBatch(
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
resumeReq.overheadAccountedFor = req.overheadAccountedFor
// TODO(yuzefovich): for incomplete Get requests, the ResumeSpan should be
// exactly the same as the original span, so we might be able to reuse the
// original Get requests.
gets := make([]struct {
req roachpb.GetRequest
union roachpb.RequestUnion_Get
}, fp.numIncompleteGets)
scans := make([]struct {
req roachpb.ScanRequest
union roachpb.RequestUnion_Scan
@@ -1560,14 +1553,18 @@
emptyResponse = false
continue
}
// This Get wasn't completed - create a new request according to the
// ResumeSpan and include it into the batch.
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLocking = origRequest.KeyLocking
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
// This Get wasn't completed - include it into the batch again (we
// can just reuse the original request since it hasn't been
// modified which is also asserted below).
if buildutil.CrdbTestBuild {
if !get.ResumeSpan.Equal(origRequest.Span()) {
panic(errors.AssertionFailedf(
"unexpectedly the ResumeSpan %s on the GetResponse is different from the original span %s",
get.ResumeSpan, origRequest.Span(),
))
}
}
resumeReq.reqs[resumeReqIdx] = req.reqs[i]
resumeReq.positions = append(resumeReq.positions, position)
if req.subRequestIdx != nil {
resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i])
2 changes: 1 addition & 1 deletion pkg/kv/test_utils.go
@@ -27,7 +27,7 @@ func OnlyFollowerReads(rec tracingpb.Recording) bool {
if sp.Operation != "/cockroach.roachpb.Internal/Batch" {
continue
}
anonTagGroup := sp.FindTagGroup("")
anonTagGroup := sp.FindTagGroup(tracingpb.AnonymousTagGroupName)
if anonTagGroup == nil {
continue
}
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
@@ -2627,7 +2627,7 @@ func getMessagesForSubtrace(

for _, tg := range span.TagGroups {
var prefix string
if tg.Name != "" {
if tg.Name != tracingpb.AnonymousTagGroupName {
prefix = fmt.Sprintf("%s-", tg.Name)
}
for _, tag := range tg.Tags {
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/outliers/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
srcs = [
"detector.go",
"outliers.go",
"registry.go",
],
embed = [":outliers_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/outliers",
126 changes: 20 additions & 106 deletions pkg/sql/sqlstats/outliers/outliers.go
@@ -18,10 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
prometheus "github.com/prometheus/client_model/go"
)
@@ -121,114 +118,31 @@ func NewMetrics() Metrics {
}
}

// maxCacheSize is the number of detected outliers we will retain in memory.
// We choose a small value for the time being to allow us to iterate without
// worrying about memory usage. See #79450.
const (
maxCacheSize = 10
)
// Reader offers read-only access to the currently retained set of outliers.
type Reader interface {
// IterateOutliers calls visitor with each of the currently retained set of outliers.
IterateOutliers(context.Context, func(context.Context, *Outlier))
}

// Registry is the central object in the outliers subsystem. It observes
// statement execution to determine which statements are outliers and
// exposes the set of currently retained outliers.
type Registry struct {
detector detector

// Note that this single mutex places unnecessary constraints on outlier
// detection and reporting. We will develop a higher-throughput system
// before enabling the outliers subsystem by default.
mu struct {
syncutil.RWMutex
statements map[clusterunique.ID][]*Outlier_Statement
outliers *cache.UnorderedCache
}
type Registry interface {
// ObserveStatement notifies the registry of a statement execution.
ObserveStatement(
sessionID clusterunique.ID,
statementID clusterunique.ID,
statementFingerprintID roachpb.StmtFingerprintID,
latencyInSeconds float64,
)

// ObserveTransaction notifies the registry of the end of a transaction.
ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID)

Reader
}

// New builds a new Registry.
func New(st *cluster.Settings, metrics Metrics) *Registry {
config := cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, key, value interface{}) bool {
return size > maxCacheSize
},
}
r := &Registry{
detector: anyDetector{detectors: []detector{
latencyThresholdDetector{st: st},
newLatencyQuantileDetector(st, metrics),
}}}
r.mu.statements = make(map[clusterunique.ID][]*Outlier_Statement)
r.mu.outliers = cache.NewUnorderedCache(config)
return r
}

// ObserveStatement notifies the registry of a statement execution.
func (r *Registry) ObserveStatement(
sessionID clusterunique.ID,
statementID clusterunique.ID,
statementFingerprintID roachpb.StmtFingerprintID,
latencyInSeconds float64,
) {
if !r.enabled() {
return
}
r.mu.Lock()
defer r.mu.Unlock()
r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Outlier_Statement{
ID: statementID.GetBytes(),
FingerprintID: statementFingerprintID,
LatencyInSeconds: latencyInSeconds,
})
}

// ObserveTransaction notifies the registry of the end of a transaction.
func (r *Registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) {
if !r.enabled() {
return
}
r.mu.Lock()
defer r.mu.Unlock()
statements := r.mu.statements[sessionID]
delete(r.mu.statements, sessionID)

hasOutlier := false
for _, s := range statements {
if r.detector.isOutlier(s) {
hasOutlier = true
}
}

if hasOutlier {
for _, s := range statements {
r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{
Session: &Outlier_Session{ID: sessionID.GetBytes()},
Transaction: &Outlier_Transaction{ID: &txnID},
Statement: s,
})
}
}
}

// TODO(todd):
// Once we can handle sufficient throughput to live on the hot
// execution path in #81021, we can probably get rid of this external
// concept of "enabled" and let the detectors just decide for themselves
// internally.
func (r *Registry) enabled() bool {
return r.detector.enabled()
}

// Reader offers read-only access to the currently retained set of outliers.
type Reader interface {
IterateOutliers(context.Context, func(context.Context, *Outlier))
}

// IterateOutliers calls visitor with each of the currently retained set of
// outliers.
func (r *Registry) IterateOutliers(ctx context.Context, visitor func(context.Context, *Outlier)) {
r.mu.RLock()
defer r.mu.RUnlock()
r.mu.outliers.Do(func(e *cache.Entry) {
visitor(ctx, e.Value.(*Outlier))
})
func New(st *cluster.Settings, metrics Metrics) Registry {
return newRegistry(st, metrics)
}
127 changes: 127 additions & 0 deletions pkg/sql/sqlstats/outliers/registry.go
@@ -0,0 +1,127 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package outliers

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// maxCacheSize is the number of detected outliers we will retain in memory.
// We choose a small value for the time being to allow us to iterate without
// worrying about memory usage. See #79450.
const (
maxCacheSize = 10
)

// registry is the central object in the outliers subsystem. It observes
// statement execution to determine which statements are outliers and
// exposes the set of currently retained outliers.
type registry struct {
detector detector

// Note that this single mutex places unnecessary constraints on outlier
// detection and reporting. We will develop a higher-throughput system
// before enabling the outliers subsystem by default.
mu struct {
syncutil.RWMutex
statements map[clusterunique.ID][]*Outlier_Statement
outliers *cache.UnorderedCache
}
}

var _ Registry = &registry{}

func newRegistry(st *cluster.Settings, metrics Metrics) Registry {
config := cache.Config{
Policy: cache.CacheFIFO,
ShouldEvict: func(size int, key, value interface{}) bool {
return size > maxCacheSize
},
}
r := &registry{
detector: anyDetector{detectors: []detector{
latencyThresholdDetector{st: st},
newLatencyQuantileDetector(st, metrics),
}}}
r.mu.statements = make(map[clusterunique.ID][]*Outlier_Statement)
r.mu.outliers = cache.NewUnorderedCache(config)
return r
}

func (r *registry) ObserveStatement(
sessionID clusterunique.ID,
statementID clusterunique.ID,
statementFingerprintID roachpb.StmtFingerprintID,
latencyInSeconds float64,
) {
if !r.enabled() {
return
}
r.mu.Lock()
defer r.mu.Unlock()
r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Outlier_Statement{
ID: statementID.GetBytes(),
FingerprintID: statementFingerprintID,
LatencyInSeconds: latencyInSeconds,
})
}

func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) {
if !r.enabled() {
return
}
r.mu.Lock()
defer r.mu.Unlock()
statements := r.mu.statements[sessionID]
delete(r.mu.statements, sessionID)

hasOutlier := false
for _, s := range statements {
if r.detector.isOutlier(s) {
hasOutlier = true
}
}

if hasOutlier {
for _, s := range statements {
r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{
Session: &Outlier_Session{ID: sessionID.GetBytes()},
Transaction: &Outlier_Transaction{ID: &txnID},
Statement: s,
})
}
}
}

func (r *registry) IterateOutliers(ctx context.Context, visitor func(context.Context, *Outlier)) {
r.mu.RLock()
defer r.mu.RUnlock()
r.mu.outliers.Do(func(e *cache.Entry) {
visitor(ctx, e.Value.(*Outlier))
})
}

// TODO(todd):
// Once we can handle sufficient throughput to live on the hot
// execution path in #81021, we can probably get rid of this external
// concept of "enabled" and let the detectors just decide for themselves
// internally.
func (r *registry) enabled() bool {
return r.detector.enabled()
}
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/sslocal/sql_stats.go
@@ -67,7 +67,7 @@ type SQLStats struct {

knobs *sqlstats.TestingKnobs

outliers *outliers.Registry
outliers outliers.Registry
}

func newSQLStats(
4 changes: 2 additions & 2 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
@@ -120,7 +120,7 @@ type Container struct {
mon *mon.BytesMonitor

knobs *sqlstats.TestingKnobs
outliersRegistry *outliers.Registry
outliersRegistry outliers.Registry
}

var _ sqlstats.ApplicationStats = &Container{}
@@ -135,7 +135,7 @@ func New(
mon *mon.BytesMonitor,
appName string,
knobs *sqlstats.TestingKnobs,
outliersRegistry *outliers.Registry,
outliersRegistry outliers.Registry,
) *Container {
s := &Container{
st: st,
2 changes: 1 addition & 1 deletion pkg/sql/trace_test.go
@@ -638,7 +638,7 @@ func TestTraceDistSQL(t *testing.T) {
require.True(t, ok, "table reader span not found")
require.Empty(t, rec.OrphanSpans())
// Check that the table reader indeed came from a remote note.
anonTagGroup := sp.FindTagGroup("")
anonTagGroup := sp.FindTagGroup(tracingpb.AnonymousTagGroupName)
require.NotNil(t, anonTagGroup)
val, ok := anonTagGroup.FindTag("node")
require.True(t, ok)