83365: sql: expose the number of BatchRequests issued by the fetchers r=yuzefovich a=yuzefovich

**colfetcher: simplify tracking the number of bytes read**

Given the recent cleanup of the lifecycles of the different fetcher objects, we
can simplify the way we track the number of bytes read by the cFetcher.
Previously, when the cFetcher was used for index joins, we would call
`StartScanFrom`, which would re-create the underlying KVFetcher; now a single
object is used throughout the whole operation, so we can just ask it directly
for this statistic. We still close the cFetcher eagerly in some cases, so in
those cases we still need to store the statistic explicitly.
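
As a minimal, self-contained sketch of the pattern described above (stand-in
types only, not the actual `cFetcher`/`row.KVFetcher` code), the statistic is
read live from the underlying fetcher while it is alive and is snapshotted into
a local field when the fetcher is closed eagerly:

```go
package main

import "fmt"

// kvFetcher stands in for row.KVFetcher; only the statistics getter matters here.
type kvFetcher struct {
	bytesRead int64
}

func (f *kvFetcher) GetBytesRead() int64 { return f.bytesRead }

// cFetcher reads the statistic live while the underlying fetcher is alive and
// falls back to a cached copy once the fetcher has been closed and nil-ed out.
type cFetcher struct {
	fetcher   *kvFetcher
	bytesRead int64 // only consulted once fetcher is nil
}

func (cf *cFetcher) getBytesRead() int64 {
	if cf.fetcher != nil {
		return cf.fetcher.GetBytesRead()
	}
	return cf.bytesRead
}

func (cf *cFetcher) close() {
	if cf.fetcher != nil {
		cf.bytesRead = cf.fetcher.GetBytesRead() // snapshot before releasing
		cf.fetcher = nil
	}
}

func main() {
	cf := &cFetcher{fetcher: &kvFetcher{bytesRead: 1024}}
	fmt.Println(cf.getBytesRead()) // 1024, read live from the fetcher
	cf.close()
	fmt.Println(cf.getBytesRead()) // 1024, served from the cached copy
}
```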

Release note: None

**sql: expose the number of BatchRequests issued by the fetchers**

This commit teaches `row.KVFetcher` to track the number of BatchRequests
issued to perform the reads and then exposes this information alongside
other execution statistics like the number of bytes read. I imagine this can
be handy in some cases, and even more so with the increased usage of the
streamer.
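
A rough sketch of the counter-sharing mechanism that the diff below implements
(the `requestIssuer` type and its methods are hypothetical stand-ins, not the
real `Streamer` API): the component that sends BatchRequests accepts a pointer
to a shared int64, falls back to a private counter when the caller passes nil,
and increments it atomically once per BatchRequest.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// requestIssuer stands in for a component that issues BatchRequests on behalf
// of a fetcher.
type requestIssuer struct {
	batchRequestsIssued *int64 // shared with the caller, updated atomically
}

// newRequestIssuer mirrors the "nil means keep a private counter" convention.
func newRequestIssuer(batchRequestsIssued *int64) *requestIssuer {
	if batchRequestsIssued == nil {
		batchRequestsIssued = new(int64)
	}
	return &requestIssuer{batchRequestsIssued: batchRequestsIssued}
}

func (r *requestIssuer) sendBatch() {
	// A real implementation would send the BatchRequest here.
	atomic.AddInt64(r.batchRequestsIssued, 1)
}

func main() {
	var issued int64 // owned by the caller, read later for execution stats
	r := newRequestIssuer(&issued)
	r.sendBatch()
	r.sendBatch()
	fmt.Println(atomic.LoadInt64(&issued)) // 2
}
```

Sharing a plain pointer keeps the counter readable by the caller without any
synchronization beyond the atomic operations themselves.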

Fixes: cockroachdb#81998.
Addresses: cockroachdb#82156.

Release note (sql change): A new execution statistic that tracks the
number of gRPC calls issued to perform read operations has been added to
the EXPLAIN ANALYZE output. It exposes low-level details that might aid
power users in debugging query performance.
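
Purely as an illustration (the operator and all values below are made up; only
the `KV bytes read` and `KV gRPC calls` labels come from this change), the new
statistic surfaces among the per-operator KV stats in EXPLAIN ANALYZE output,
for example:

```
  • scan
    ...
    KV bytes read: 160 KiB
    KV gRPC calls: 3
```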

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Jun 29, 2022
2 parents cf6ee8f + ce5e4c1 commit 2e1c19b
Showing 32 changed files with 291 additions and 140 deletions.
14 changes: 14 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -17,6 +17,7 @@ import (
"runtime"
"sort"
"sync"
"sync/atomic"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
@@ -225,6 +226,10 @@ type Streamer struct {
maxKeysPerRow int32
budget *budget

atomics struct {
batchRequestsIssued *int64
}

coordinator workerCoordinator
coordinatorStarted bool
coordinatorCtxCancel context.CancelFunc
@@ -304,6 +309,9 @@ func max(a, b int64) int64 {
// The Streamer takes ownership of the memory account, and the caller is allowed
// to interact with the account only after canceling the Streamer (because
// memory accounts are not thread-safe).
//
// batchRequestsIssued should be incremented every time a new BatchRequest is
// sent.
func NewStreamer(
distSender *kvcoord.DistSender,
stopper *stop.Stopper,
@@ -312,6 +320,7 @@ func NewStreamer(
lockWaitPolicy lock.WaitPolicy,
limitBytes int64,
acc *mon.BoundAccount,
batchRequestsIssued *int64,
) *Streamer {
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
@@ -321,6 +330,10 @@ func NewStreamer(
stopper: stopper,
budget: newBudget(acc, limitBytes),
}
if batchRequestsIssued == nil {
batchRequestsIssued = new(int64)
}
s.atomics.batchRequestsIssued = batchRequestsIssued
s.coordinator = workerCoordinator{
s: s,
txn: txn,
@@ -1079,6 +1092,7 @@ func (w *workerCoordinator) performRequestAsync(
w.s.results.setError(err.GoError())
return
}
atomic.AddInt64(w.s.atomics.batchRequestsIssued, 1)

// First, we have to reconcile the memory budget. We do it
// separately from processing the results because we want to know
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -46,6 +46,7 @@ func getStreamer(
lock.WaitPolicy(0),
limitBytes,
acc,
nil, /* batchRequestsIssued */
)
}

@@ -96,6 +97,7 @@ func TestStreamerLimitations(t *testing.T) {
lock.WaitPolicy(0),
math.MaxInt64, /* limitBytes */
nil, /* acc */
nil, /* batchRequestsIssued */
)
})
})
3 changes: 3 additions & 0 deletions pkg/sql/colexecop/operator.go
@@ -70,6 +70,9 @@ type KVReader interface {
// GetRowsRead returns the number of rows read from KV by this operator.
// It must be safe for concurrent use.
GetRowsRead() int64
// GetBatchRequestsIssued returns the number of BatchRequests issued to KV
// by this operator. It must be safe for concurrent use.
GetBatchRequestsIssued() int64
// GetCumulativeContentionTime returns the amount of time KV reads spent
// contending. It must be safe for concurrent use.
GetCumulativeContentionTime() time.Duration
32 changes: 21 additions & 11 deletions pkg/sql/colfetcher/cfetcher.go
@@ -214,14 +214,15 @@ type cFetcher struct {

// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher
// bytesRead stores the cumulative number of bytes read by this cFetcher
// throughout its whole existence (i.e. between its construction and
// Release()). It accumulates the bytes read statistic across StartScan and
// Close methods.
// bytesRead and batchRequestsIssued store the total number of bytes read
// and of BatchRequests issued, respectively, by this cFetcher throughout
// its lifetime in case when the underlying row.KVFetcher has already been
// closed and nil-ed out.
//
// The field should not be accessed directly by the users of the cFetcher -
// getBytesRead() should be used instead.
bytesRead int64
// The fields should not be accessed directly by the users of the cFetcher -
// getBytesRead() and getBatchRequestsIssued() should be used instead.
bytesRead int64
batchRequestsIssued int64

// machine contains fields that get updated during the run of the fetcher.
machine struct {
@@ -1285,15 +1286,23 @@ func (cf *cFetcher) convertFetchError(ctx context.Context, err error) error {
}

// getBytesRead returns the number of bytes read by the cFetcher throughout its
// existence so far. This number accumulates the bytes read statistic across
// StartScan and Close methods.
// lifetime so far.
func (cf *cFetcher) getBytesRead() int64 {
if cf.fetcher != nil {
cf.bytesRead += cf.fetcher.ResetBytesRead()
return cf.fetcher.GetBytesRead()
}
return cf.bytesRead
}

// getBatchRequestsIssued returns the number of BatchRequests issued by the
// cFetcher throughout its lifetime so far.
func (cf *cFetcher) getBatchRequestsIssued() int64 {
if cf.fetcher != nil {
return cf.fetcher.GetBatchRequestsIssued()
}
return cf.batchRequestsIssued
}

var cFetcherPool = sync.Pool{
New: func() interface{} {
return &cFetcher{}
@@ -1316,7 +1325,8 @@ func (cf *cFetcher) Release() {

func (cf *cFetcher) Close(ctx context.Context) {
if cf != nil && cf.fetcher != nil {
cf.bytesRead += cf.fetcher.GetBytesRead()
cf.bytesRead = cf.fetcher.GetBytesRead()
cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued()
cf.fetcher.Close(ctx)
cf.fetcher = nil
}
7 changes: 7 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -157,6 +157,13 @@ func (s *ColBatchScan) GetRowsRead() int64 {
return s.mu.rowsRead
}

// GetBatchRequestsIssued is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetBatchRequestsIssued() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getBatchRequestsIssued()
}

// GetCumulativeContentionTime is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetCumulativeContentionTime() time.Duration {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
12 changes: 8 additions & 4 deletions pkg/sql/colfetcher/index_join.go
@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
@@ -126,9 +125,7 @@ type ColIndexJoin struct {
usesStreamer bool
}

var _ colexecop.KVReader = &ColIndexJoin{}
var _ execreleasable.Releasable = &ColIndexJoin{}
var _ colexecop.ClosableOperator = &ColIndexJoin{}
var _ ScanOperator = &ColIndexJoin{}

// Init initializes a ColIndexJoin.
func (s *ColIndexJoin) Init(ctx context.Context) {
@@ -392,6 +389,13 @@ func (s *ColIndexJoin) GetRowsRead() int64 {
return s.mu.rowsRead
}

// GetBatchRequestsIssued is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetBatchRequestsIssued() int64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getBatchRequestsIssued()
}

// GetCumulativeContentionTime is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetCumulativeContentionTime() time.Duration {
return execstats.GetCumulativeContentionTime(s.Ctx, nil /* recording */)
1 change: 1 addition & 0 deletions pkg/sql/colflow/stats.go
@@ -223,6 +223,7 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
s.KV.KVTime.Set(time)
s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead()))
s.KV.BatchRequestsIssued.Set(uint64(vsc.kvReader.GetBatchRequestsIssued()))
s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime())
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
10 changes: 10 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
@@ -151,6 +151,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{}))
if s.KV.BytesRead.HasValue() {
fn("KV bytes read", humanize.IBytes(s.KV.BytesRead.Value()))
}
if s.KV.BatchRequestsIssued.HasValue() {
fn("KV gRPC calls", humanizeutil.Count(s.KV.BatchRequestsIssued.Value()))
}
if s.KV.NumInterfaceSteps.HasValue() {
fn("MVCC step count (ext/int)",
fmt.Sprintf("%s/%s",
@@ -249,6 +252,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats {
if !result.KV.BytesRead.HasValue() {
result.KV.BytesRead = other.KV.BytesRead
}
if !result.KV.BatchRequestsIssued.HasValue() {
result.KV.BatchRequestsIssued = other.KV.BatchRequestsIssued
}

// Exec stats.
if !result.Exec.ExecTime.HasValue() {
@@ -340,6 +346,10 @@ func (s *ComponentStats) MakeDeterministic() {
// BytesRead is overridden to a useful value for tests.
s.KV.BytesRead.Set(8 * s.KV.TuplesRead.Value())
}
if s.KV.BatchRequestsIssued.HasValue() {
// BatchRequestsIssued is overridden to a useful value for tests.
s.KV.BatchRequestsIssued.Set(s.KV.TuplesRead.Value())
}

// Exec.
timeVal(&s.Exec.ExecTime)
1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/component_stats.proto
@@ -121,6 +121,7 @@ message NetworkTxStats {
message KVStats {
optional util.optional.Uint bytes_read = 1 [(gogoproto.nullable) = false];
optional util.optional.Uint tuples_read = 2 [(gogoproto.nullable) = false];
optional util.optional.Uint batch_requests_issued = 9 [(gogoproto.nullable) = false];

// Cumulated time spent waiting for a KV request. This includes disk IO time
// and potentially network time (if any of the keys are not local).
59 changes: 34 additions & 25 deletions pkg/sql/execstats/traceanalyzer.go
@@ -99,29 +99,31 @@ func NewFlowsMetadata(flows map[base.SQLInstanceID]*execinfrapb.FlowSpec) *Flows
// TODO(asubiotto): Flatten this struct, we're currently allocating a map per
// stat.
type NodeLevelStats struct {
NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64
MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64
MaxDiskUsageGroupedByNode map[base.SQLInstanceID]int64
KVBytesReadGroupedByNode map[base.SQLInstanceID]int64
KVRowsReadGroupedByNode map[base.SQLInstanceID]int64
KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration
NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64
ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration
NetworkBytesSentGroupedByNode map[base.SQLInstanceID]int64
MaxMemoryUsageGroupedByNode map[base.SQLInstanceID]int64
MaxDiskUsageGroupedByNode map[base.SQLInstanceID]int64
KVBytesReadGroupedByNode map[base.SQLInstanceID]int64
KVRowsReadGroupedByNode map[base.SQLInstanceID]int64
KVBatchRequestsIssuedGroupedByNode map[base.SQLInstanceID]int64
KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration
NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64
ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration
}

// QueryLevelStats returns all the query level stats that correspond to the
// given traces and flow metadata.
// NOTE: When adding fields to this struct, be sure to update Accumulate.
type QueryLevelStats struct {
NetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVRowsRead int64
KVTime time.Duration
NetworkMessages int64
ContentionTime time.Duration
Regions []string
NetworkBytesSent int64
MaxMemUsage int64
MaxDiskUsage int64
KVBytesRead int64
KVRowsRead int64
KVBatchRequestsIssued int64
KVTime time.Duration
NetworkMessages int64
ContentionTime time.Duration
Regions []string
}

// Accumulate accumulates other's stats into the receiver.
Expand All @@ -135,6 +137,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) {
}
s.KVBytesRead += other.KVBytesRead
s.KVRowsRead += other.KVRowsRead
s.KVBatchRequestsIssued += other.KVBatchRequestsIssued
s.KVTime += other.KVTime
s.NetworkMessages += other.NetworkMessages
s.ContentionTime += other.ContentionTime
@@ -205,14 +208,15 @@ func (a *TraceAnalyzer) AddTrace(trace []tracingpb.RecordedSpan, makeDeterminist
func (a *TraceAnalyzer) ProcessStats() error {
// Process node level stats.
a.nodeLevelStats = NodeLevelStats{
NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxDiskUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64),
ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
NetworkBytesSentGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxMemoryUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
MaxDiskUsageGroupedByNode: make(map[base.SQLInstanceID]int64),
KVBytesReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVRowsReadGroupedByNode: make(map[base.SQLInstanceID]int64),
KVBatchRequestsIssuedGroupedByNode: make(map[base.SQLInstanceID]int64),
KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64),
ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration),
}
var errs error

@@ -224,6 +228,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
instanceID := stats.Component.SQLInstanceID
a.nodeLevelStats.KVBytesReadGroupedByNode[instanceID] += int64(stats.KV.BytesRead.Value())
a.nodeLevelStats.KVRowsReadGroupedByNode[instanceID] += int64(stats.KV.TuplesRead.Value())
a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode[instanceID] += int64(stats.KV.BatchRequestsIssued.Value())
a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value()
a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value()
}
@@ -320,6 +325,10 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.queryLevelStats.KVRowsRead += kvRowsRead
}

for _, kvBatchRequestsIssued := range a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode {
a.queryLevelStats.KVBatchRequestsIssued += kvBatchRequestsIssued
}

for _, kvTime := range a.nodeLevelStats.KVTimeGroupedByNode {
a.queryLevelStats.KVTime += kvTime
}