sql: expose the number of BatchRequests issued by the fetchers
This commit teaches `row.KVFetcher` to track the number of BatchRequests
issued to perform the reads and then exposes this information alongside
other execution statistics, like the number of bytes read. This information
can already be handy in some cases, and even more so with the increased
usage of the streamer.

Release note (sql change): A new execution statistic that tracks the
number of gRPC calls (BatchRequests) issued to perform read operations
has been added to EXPLAIN ANALYZE output. This low-level detail may help
power users debug query performance.
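The core mechanism in this commit is a counter shared by pointer: the component that sends BatchRequests increments a `*int64` atomically, and the component that reports execution statistics reads it through a getter. The sketch below illustrates that pattern only; the type and function names are hypothetical stand-ins, not the actual CockroachDB types.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sender stands in for a component that issues BatchRequests; it bumps the
// shared counter once per request it sends.
type sender struct {
	batchRequestsIssued *int64
}

func (s *sender) send() {
	// ... issue the BatchRequest ...
	atomic.AddInt64(s.batchRequestsIssued, 1)
}

// statsReader stands in for the component that reports execution statistics.
type statsReader struct {
	batchRequestsIssued *int64
}

func (r *statsReader) get() int64 {
	return atomic.LoadInt64(r.batchRequestsIssued)
}

func main() {
	var counter int64
	s := &sender{batchRequestsIssued: &counter}
	r := &statsReader{batchRequestsIssued: &counter}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.send()
		}()
	}
	wg.Wait()
	fmt.Println("BatchRequests issued:", r.get()) // 4
}
```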
yuzefovich committed Jun 28, 2022
1 parent 8e53415 commit ce5e4c1
Showing 32 changed files with 294 additions and 131 deletions.
14 changes: 14 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -17,6 +17,7 @@ import (
"runtime"
"sort"
"sync"
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
@@ -225,6 +226,10 @@ type Streamer struct {
maxKeysPerRow int32
budget *budget

atomics struct {
batchRequestsIssued *int64
}

coordinator workerCoordinator
coordinatorStarted bool
coordinatorCtxCancel context.CancelFunc
@@ -304,6 +309,9 @@ func max(a, b int64) int64 {
// The Streamer takes ownership of the memory account, and the caller is allowed
// to interact with the account only after canceling the Streamer (because
// memory accounts are not thread-safe).
//
// batchRequestsIssued should be incremented every time a new BatchRequest is
// sent.
func NewStreamer(
distSender *kvcoord.DistSender,
stopper *stop.Stopper,
@@ -312,6 +320,7 @@ func NewStreamer(
lockWaitPolicy lock.WaitPolicy,
limitBytes int64,
acc *mon.BoundAccount,
batchRequestsIssued *int64,
) *Streamer {
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
@@ -321,6 +330,10 @@
stopper: stopper,
budget: newBudget(acc, limitBytes),
}
if batchRequestsIssued == nil {
batchRequestsIssued = new(int64)
}
s.atomics.batchRequestsIssued = batchRequestsIssued
s.coordinator = workerCoordinator{
s: s,
txn: txn,
@@ -1078,6 +1091,7 @@ func (w *workerCoordinator) performRequestAsync(
w.s.results.setError(err.GoError())
return
}
atomic.AddInt64(w.s.atomics.batchRequestsIssued, 1)

// First, we have to reconcile the memory budget. We do it
// separately from processing the results because we want to know
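Because `NewStreamer` now takes the counter as a parameter, callers that do not care about the statistic can pass nil; the constructor substitutes a private counter so the `atomic.AddInt64` call in `performRequestAsync` stays unconditional. A minimal sketch of that defensive default, under hypothetical names rather than the real `Streamer` API:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// requestSender is a hypothetical stand-in for the counter plumbing above.
type requestSender struct {
	batchRequestsIssued *int64
}

// newRequestSender mirrors the constructor pattern: a nil counter is replaced
// with a private one so the hot path never has to branch on it.
func newRequestSender(batchRequestsIssued *int64) *requestSender {
	if batchRequestsIssued == nil {
		batchRequestsIssued = new(int64)
	}
	return &requestSender{batchRequestsIssued: batchRequestsIssued}
}

func (s *requestSender) sendBatch() {
	// ... send the BatchRequest ...
	atomic.AddInt64(s.batchRequestsIssued, 1)
}

func main() {
	// Caller that tracks the statistic.
	var issued int64
	tracked := newRequestSender(&issued)
	tracked.sendBatch()

	// Caller that does not care (e.g. the tests below pass nil).
	untracked := newRequestSender(nil)
	untracked.sendBatch()

	fmt.Println(issued) // 1
}
```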
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -46,6 +46,7 @@ func getStreamer(
lock.WaitPolicy(0),
limitBytes,
acc,
nil, /* batchRequestsIssued */
)
}

@@ -96,6 +97,7 @@ func TestStreamerLimitations(t *testing.T) {
lock.WaitPolicy(0),
math.MaxInt64, /* limitBytes */
nil, /* acc */
nil, /* batchRequestsIssued */
)
})
})
3 changes: 3 additions & 0 deletions pkg/sql/colexecop/operator.go
@@ -70,6 +70,9 @@ type KVReader interface {
// GetRowsRead returns the number of rows read from KV by this operator.
// It must be safe for concurrent use.
GetRowsRead() int64
// GetBatchRequestsIssued returns the number of BatchRequests issued to KV
// by this operator. It must be safe for concurrent use.
GetBatchRequestsIssued() int64
// GetCumulativeContentionTime returns the amount of time KV reads spent
// contending. It must be safe for concurrent use.
GetCumulativeContentionTime() time.Duration
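The new method carries the same contract as the existing getters: it may be called by a stats collector on a different goroutine from the one driving the operator, so implementations must be safe for concurrent use. A short sketch of one way to honor that contract, using a hypothetical type that is not part of this patch:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// fakeKVReader is a hypothetical operator satisfying the
// GetBatchRequestsIssued portion of the KVReader contract.
type fakeKVReader struct {
	batchRequestsIssued int64 // accessed atomically
}

// GetBatchRequestsIssued must be safe for concurrent use, hence the atomic
// load rather than a plain field read.
func (f *fakeKVReader) GetBatchRequestsIssued() int64 {
	return atomic.LoadInt64(&f.batchRequestsIssued)
}

func (f *fakeKVReader) recordBatchRequest() {
	atomic.AddInt64(&f.batchRequestsIssued, 1)
}

func main() {
	var r fakeKVReader
	r.recordBatchRequest()
	fmt.Println(r.GetBatchRequestsIssued()) // 1
}
```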
24 changes: 18 additions & 6 deletions pkg/sql/colfetcher/cfetcher.go
@@ -214,13 +214,15 @@ type cFetcher struct {

// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher
// bytesRead stores the total number of bytes read by this cFetcher
// throughout its lifetime in case when the underlying row.KVFetcher has
// already been closed and nil-ed out.
// bytesRead and batchRequestsIssued store the total number of bytes read
// and of BatchRequests issued, respectively, by this cFetcher throughout
// its lifetime in case when the underlying row.KVFetcher has already been
// closed and nil-ed out.
//
// The field should not be accessed directly by the users of the cFetcher -
// getBytesRead() should be used instead.
bytesRead int64
// The fields should not be accessed directly by the users of the cFetcher -
// getBytesRead() and getBatchRequestsIssued() should be used instead.
bytesRead int64
batchRequestsIssued int64

// machine contains fields that get updated during the run of the fetcher.
machine struct {
@@ -1292,6 +1294,15 @@ func (cf *cFetcher) getBytesRead() int64 {
return cf.bytesRead
}

// getBatchRequestsIssued returns the number of BatchRequests issued by the
// cFetcher throughout its lifetime so far.
func (cf *cFetcher) getBatchRequestsIssued() int64 {
if cf.fetcher != nil {
return cf.fetcher.GetBatchRequestsIssued()
}
return cf.batchRequestsIssued
}

var cFetcherPool = sync.Pool{
New: func() interface{} {
return &cFetcher{}
@@ -1315,6 +1326,7 @@ func (cf *cFetcher) Release() {
func (cf *cFetcher) Close(ctx context.Context) {
if cf != nil && cf.fetcher != nil {
cf.bytesRead = cf.fetcher.GetBytesRead()
cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued()
cf.fetcher.Close(ctx)
cf.fetcher = nil
}
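One detail worth noting in the `cFetcher` change: the stats getters keep working after `Close`, even though the underlying `row.KVFetcher` is released, because the final values are snapshotted into plain fields first. The sketch below shows only that lifecycle shape, with hypothetical names standing in for the real types:

```go
package main

import "fmt"

// kvSource stands in for the underlying fetcher that owns the live counters.
type kvSource struct {
	batchRequestsIssued int64
}

func (s *kvSource) GetBatchRequestsIssued() int64 { return s.batchRequestsIssued }

// statsOwner mirrors the cFetcher pattern: while the source is alive, delegate
// to it; on Close, snapshot the final value so later stats queries still work.
type statsOwner struct {
	source              *kvSource
	batchRequestsIssued int64
}

func (o *statsOwner) getBatchRequestsIssued() int64 {
	if o.source != nil {
		return o.source.GetBatchRequestsIssued()
	}
	return o.batchRequestsIssued
}

func (o *statsOwner) Close() {
	if o.source != nil {
		o.batchRequestsIssued = o.source.GetBatchRequestsIssued()
		o.source = nil // release the fetcher
	}
}

func main() {
	o := &statsOwner{source: &kvSource{batchRequestsIssued: 3}}
	o.Close()
	fmt.Println(o.getBatchRequestsIssued()) // 3, even after Close
}
```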
7 changes: 7 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -157,6 +157,13 @@ func (s *ColBatchScan) GetRowsRead() int64 {
return s.mu.rowsRead
}

// GetBatchRequestsIssued is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetBatchRequestsIssued() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getBatchRequestsIssued()
}

// GetCumulativeContentionTime is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetCumulativeContentionTime() time.Duration {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
12 changes: 8 additions & 4 deletions pkg/sql/colfetcher/index_join.go
@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
@@ -126,9 +125,7 @@ type ColIndexJoin struct {
usesStreamer bool
}

var _ colexecop.KVReader = &ColIndexJoin{}
var _ execreleasable.Releasable = &ColIndexJoin{}
var _ colexecop.ClosableOperator = &ColIndexJoin{}
var _ ScanOperator = &ColIndexJoin{}

// Init initializes a ColIndexJoin.
func (s *ColIndexJoin) Init(ctx context.Context) {
@@ -392,6 +389,13 @@ func (s *ColIndexJoin) GetRowsRead() int64 {
return s.mu.rowsRead
}

// GetBatchRequestsIssued is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetBatchRequestsIssued() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getBatchRequestsIssued()
}

// GetCumulativeContentionTime is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetCumulativeContentionTime() time.Duration {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
1 change: 1 addition & 0 deletions pkg/sql/colflow/stats.go
@@ -223,6 +223,7 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
s.KV.KVTime.Set(time)
s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead()))
s.KV.BatchRequestsIssued.Set(uint64(vsc.kvReader.GetBatchRequestsIssued()))
s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime())
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
10 changes: 10 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
@@ -151,6 +151,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{}))
if s.KV.BytesRead.HasValue() {
fn("KV bytes read", humanize.IBytes(s.KV.BytesRead.Value()))
}
if s.KV.BatchRequestsIssued.HasValue() {
fn("KV gRPC calls", humanizeutil.Count(s.KV.BatchRequestsIssued.Value()))
}
if s.KV.NumInterfaceSteps.HasValue() {
fn("MVCC step count (ext/int)",
fmt.Sprintf("%s/%s",
@@ -249,6 +252,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats {
if !result.KV.BytesRead.HasValue() {
result.KV.BytesRead = other.KV.BytesRead
}
if !result.KV.BatchRequestsIssued.HasValue() {
result.KV.BatchRequestsIssued = other.KV.BatchRequestsIssued
}

// Exec stats.
if !result.Exec.ExecTime.HasValue() {
@@ -340,6 +346,10 @@ func (s *ComponentStats) MakeDeterministic() {
// BytesRead is overridden to a useful value for tests.
s.KV.BytesRead.Set(8 * s.KV.TuplesRead.Value())
}
if s.KV.BatchRequestsIssued.HasValue() {
// BatchRequestsIssued is overridden to a useful value for tests.
s.KV.BatchRequestsIssued.Set(s.KV.TuplesRead.Value())
}

// Exec.
timeVal(&s.Exec.ExecTime)
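The component stats use optional values, which is why `formatStats`, `Union`, and `MakeDeterministic` all check `HasValue` before touching the new field. A hedged sketch of that optional-counter pattern follows; it is a simplification, not the actual `util.optional.Uint` implementation:

```go
package main

import "fmt"

// optionalUint is a simplified stand-in for an optional counter: the zero
// value means "not set".
type optionalUint struct {
	set   bool
	value uint64
}

func (o *optionalUint) Set(v uint64)  { o.set, o.value = true, v }
func (o optionalUint) HasValue() bool { return o.set }
func (o optionalUint) Value() uint64  { return o.value }

// unionStat mirrors the Union logic above: keep the receiver's value if it is
// set, otherwise take the other side's.
func unionStat(result, other optionalUint) optionalUint {
	if !result.HasValue() {
		return other
	}
	return result
}

func main() {
	var a, b optionalUint
	b.Set(7)
	merged := unionStat(a, b)
	if merged.HasValue() {
		fmt.Println("KV gRPC calls:", merged.Value()) // 7
	}
}
```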
1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/component_stats.proto
@@ -121,6 +121,7 @@ message NetworkTxStats {
message KVStats {
optional util.optional.Uint bytes_read = 1 [(gogoproto.nullable) = false];
optional util.optional.Uint tuples_read = 2 [(gogoproto.nullable) = false];
optional util.optional.Uint batch_requests_issued = 9 [(gogoproto.nullable) = false];

// Cumulated time spent waiting for a KV request. This includes disk IO time
// and potentially network time (if any of the keys are not local).
59 changes: 34 additions & 25 deletions pkg/sql/execstats/traceanalyzer.go
@@ -99,29 +99,31 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows
// TODO(asubiotto): Flatten this struct, we're currently allocating a map per
// stat.
type NodeLevelStats struct {
NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64
MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64
MaxDiskUsageGroupedByNode map[base.SQLInstanceID]int64
KVBytesReadGroupedByNode map[base.SQLInstanceID]int64
KVRowsReadGroupedByNode map[base.SQLInstanceID]int64
KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration
NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64
ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration
NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64
MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64
MaxDiskUsageGroupedByNode map[base.SQLInstanceID]int64
KVBytesReadGroupedByNode map[base.SQLInstanceID]int64
KVRowsReadGroupedByNode map[base.SQLInstanceID]int64
KVBatchRequestsIssuedGroupedByNode map[base.SQLInstanceID]int64
KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration
NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64
ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration
}

// QueryLevelStats returns all the query level stats that correspond to the
// given traces and flow metadata.
// NOTE: When adding fields to this struct, be sure to update Accumulate.
type QueryLevelStats struct {
NetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVRowsRead int64
KVTime time.Duration
NetworkMessages int64
ContentionTime time.Duration
Regions []string
NetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVRowsRead int64
KVBatchRequestsIssued int64
KVTime time.Duration
NetworkMessages int64
ContentionTime time.Duration
Regions []string
}

// Accumulate accumulates other's stats into the receiver.
@@ -135,6 +137,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
}
s.KVBytesRead += other.KVBytesRead
s.KVRowsRead += other.KVRowsRead
s.KVBatchRequestsIssued += other.KVBatchRequestsIssued
s.KVTime += other.KVTime
s.NetworkMessages += other.NetworkMessages
s.ContentionTime += other.ContentionTime
@@ -205,14 +208,15 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist
func (a *TraceAnalyzer) ProcessStats() error {
// Process node level stats.
a.nodeLevelStats = NodeLevelStats{
NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxDiskUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64),
ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxDiskUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVBatchRequestsIssuedGroupedByNode: make(map[base.SQLInstanceID]int64),
KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64),
ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
}
var errs error

@@ -224,6 +228,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
instanceID := stats.Component.SQLInstanceID
a.nodeLevelStats.KVBytesReadGroupedByNode[instanceID] += int64(stats.KV.BytesRead.Value())
a.nodeLevelStats.KVRowsReadGroupedByNode[instanceID] += int64(stats.KV.TuplesRead.Value())
a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode[instanceID] += int64(stats.KV.BatchRequestsIssued.Value())
a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value()
a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value()
}
@@ -320,6 +325,10 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.KVRowsRead += kvRowsRead
}

for _, kvBatchRequestsIssued := range a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode {
a.queryLevelStats.KVBatchRequestsIssued += kvBatchRequestsIssued
}

for _, kvTime := range a.nodeLevelStats.KVTimeGroupedByNode {
a.queryLevelStats.KVTime += kvTime
}
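The aggregation in `TraceAnalyzer` is a two-level roll-up: component stats are summed per SQL instance into the node-level maps, and the maps are then summed into the query-level totals. A small self-contained sketch of that shape, using made-up instance IDs and values rather than real trace data:

```go
package main

import "fmt"

type sqlInstanceID int32

func main() {
	// Per-node roll-up, as produced while walking the trace.
	kvBatchRequestsIssuedByNode := map[sqlInstanceID]int64{}
	componentStats := []struct {
		instance sqlInstanceID
		batches  int64
	}{
		{instance: 1, batches: 4},
		{instance: 1, batches: 2},
		{instance: 2, batches: 5},
	}
	for _, s := range componentStats {
		kvBatchRequestsIssuedByNode[s.instance] += s.batches
	}

	// Query-level roll-up, mirroring the loop in ProcessStats.
	var kvBatchRequestsIssued int64
	for _, n := range kvBatchRequestsIssuedByNode {
		kvBatchRequestsIssued += n
	}
	fmt.Println("KV batch requests issued:", kvBatchRequestsIssued) // 11
}
```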
57 changes: 30 additions & 27 deletions pkg/sql/execstats/traceanalyzer_test.go
@@ -241,37 +241,40 @@ func TestTraceAnalyzerProcessStats(t *testing.T) {

func TestQueryLevelStatsAccumulate(t *testing.T) {
a := execstats.QueryLevelStats{
NetworkBytesSent: 1,
MaxMemUsage: 2,
KVBytesRead: 3,
KVRowsRead: 4,
KVTime: 5 * time.Second,
NetworkMessages: 6,
ContentionTime: 7 * time.Second,
MaxDiskUsage: 8,
Regions: []string{"gcp-us-east1"},
NetworkBytesSent: 1,
MaxMemUsage: 2,
KVBytesRead: 3,
KVRowsRead: 4,
KVBatchRequestsIssued: 4,
KVTime: 5 * time.Second,
NetworkMessages: 6,
ContentionTime: 7 * time.Second,
MaxDiskUsage: 8,
Regions: []string{"gcp-us-east1"},
}
b := execstats.QueryLevelStats{
NetworkBytesSent: 8,
MaxMemUsage: 9,
KVBytesRead: 10,
KVRowsRead: 11,
KVTime: 12 * time.Second,
NetworkMessages: 13,
ContentionTime: 14 * time.Second,
MaxDiskUsage: 15,
Regions: []string{"gcp-us-west1"},
NetworkBytesSent: 8,
MaxMemUsage: 9,
KVBytesRead: 10,
KVRowsRead: 11,
KVBatchRequestsIssued: 11,
KVTime: 12 * time.Second,
NetworkMessages: 13,
ContentionTime: 14 * time.Second,
MaxDiskUsage: 15,
Regions: []string{"gcp-us-west1"},
}
expected := execstats.QueryLevelStats{
NetworkBytesSent: 9,
MaxMemUsage: 9,
KVBytesRead: 13,
KVRowsRead: 15,
KVTime: 17 * time.Second,
NetworkMessages: 19,
ContentionTime: 21 * time.Second,
MaxDiskUsage: 15,
Regions: []string{"gcp-us-east1", "gcp-us-west1"},
NetworkBytesSent: 9,
MaxMemUsage: 9,
KVBytesRead: 13,
KVRowsRead: 15,
KVBatchRequestsIssued: 15,
KVTime: 17 * time.Second,
NetworkMessages: 19,
ContentionTime: 21 * time.Second,
MaxDiskUsage: 15,
Regions: []string{"gcp-us-east1", "gcp-us-west1"},
}

aCopy := a