diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel
index 2a2763e6d2d9..04f3ab2eecdf 100644
--- a/pkg/sql/colfetcher/BUILD.bazel
+++ b/pkg/sql/colfetcher/BUILD.bazel
@@ -64,6 +64,7 @@ stringer(
 go_test(
     name = "colfetcher_test",
     srcs = [
+        "bytes_read_test.go",
         "main_test.go",
         "vectorized_batch_size_test.go",
     ],
@@ -80,5 +81,6 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/randutil",
         "@com_github_stretchr_testify//assert",
+        "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/sql/colfetcher/bytes_read_test.go b/pkg/sql/colfetcher/bytes_read_test.go
new file mode 100644
index 000000000000..e3403435ba51
--- /dev/null
+++ b/pkg/sql/colfetcher/bytes_read_test.go
@@ -0,0 +1,78 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package colfetcher_test
+
+import (
+	"context"
+	"regexp"
+	"strconv"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// TestBytesRead verifies that the ColBatchScan and the ColIndexJoin correctly
+// report the number of bytes read.
+func TestBytesRead(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testClusterArgs := base.TestClusterArgs{ReplicationMode: base.ReplicationAuto}
+	tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
+	ctx := context.Background()
+	defer tc.Stopper().Stop(ctx)
+
+	conn := tc.Conns[0]
+
+	// Create the table with disabled automatic table stats collection. The
+	// stats collection is disabled so that the ColBatchScan would read the
+	// first row in one batch and then the second row in another batch.
+	_, err := conn.ExecContext(ctx, `
+SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false;
+CREATE TABLE t (a INT PRIMARY KEY, b INT, c INT, INDEX(b));
+INSERT INTO t VALUES (1, 1, 1), (2, 2, 2);
+`)
+	require.NoError(t, err)
+
+	// Run the query that reads from the secondary index and then performs an
+	// index join against the primary index.
+	query := "EXPLAIN ANALYZE SELECT * FROM t@t_b_idx"
+	kvBytesReadRegex := regexp.MustCompile(`KV bytes read: (\d+) B`)
+	numMatches := 0
+	rows, err := conn.QueryContext(ctx, query)
+	require.NoError(t, err)
+	defer rows.Close()
+	for rows.Next() {
+		var res string
+		require.NoError(t, rows.Scan(&res))
+		if matches := kvBytesReadRegex.FindStringSubmatch(res); len(matches) > 0 {
+			bytesRead, err := strconv.Atoi(matches[1])
+			require.NoError(t, err)
+			// We're only interested in 'bytes read' statistic being non-zero.
+			require.Greater(
+				t, bytesRead, 0, "expected bytes read to be greater than zero",
+			)
+			numMatches++
+		}
+	}
+	// Surface any error that cut the iteration short.
+	require.NoError(t, rows.Err())
+	// Both the ColBatchScan and the ColIndexJoin must have reported the
+	// statistic; without this check the test would pass vacuously if no line
+	// of the EXPLAIN ANALYZE output matched the regex.
+	require.GreaterOrEqual(
+		t, numMatches, 2, "expected 'KV bytes read' for both the scan and the index join",
+	)
+}
diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go
index aa7956c4685e..115b15bca063 100644
--- a/pkg/sql/colfetcher/cfetcher.go
+++ b/pkg/sql/colfetcher/cfetcher.go
@@ -256,6 +256,14 @@ type cFetcher struct {
 
 	// fetcher is the underlying fetcher that provides KVs.
 	fetcher *row.KVFetcher
+	// bytesRead stores the cumulative number of bytes read by this cFetcher
+	// throughout its whole existence (i.e. between its construction and
+	// Release()). It accumulates the bytes read statistic across StartScan* and
+	// Close methods.
+	//
+	// The field should not be accessed directly by the users of the cFetcher -
+	// getBytesRead() should be used instead.
+	bytesRead int64
 
 	// machine contains fields that get updated during the run of the fetcher.
 	machine struct {
@@ -1050,6 +1058,8 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
 		case stateEmitLastBatch:
 			rf.machine.state[0] = stateFinished
 			rf.finalizeBatch()
+			// Close the fetcher eagerly so that its memory could be GCed.
+			rf.Close(ctx)
 			return rf.machine.batch, nil
 
 		case stateFinished:
@@ -1457,6 +1467,16 @@ func (rf *cFetcher) KeyToDesc(key roachpb.Key) (catalog.TableDescriptor, bool) {
 	return rf.table.desc, true
 }
 
+// getBytesRead returns the number of bytes read by the cFetcher throughout its
+// existence so far. This number accumulates the bytes read statistic across
+// StartScan* and Close methods.
+func (rf *cFetcher) getBytesRead() int64 {
+	if rf.fetcher != nil {
+		rf.bytesRead += rf.fetcher.ResetBytesRead()
+	}
+	return rf.bytesRead
+}
+
 var cFetcherPool = sync.Pool{
 	New: func() interface{} {
 		return &cFetcher{}
@@ -1479,6 +1499,7 @@ func (rf *cFetcher) Release() {
 
 func (rf *cFetcher) Close(ctx context.Context) {
 	if rf != nil && rf.fetcher != nil {
+		rf.bytesRead += rf.fetcher.GetBytesRead()
 		rf.fetcher.Close(ctx)
 		rf.fetcher = nil
 	}
diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go
index 649ba57c7c1d..97385a531ac0 100644
--- a/pkg/sql/colfetcher/colbatch_scan.go
+++ b/pkg/sql/colfetcher/colbatch_scan.go
@@ -149,11 +149,7 @@ func (s *ColBatchScan) DrainMeta() []execinfrapb.ProducerMetadata {
 func (s *ColBatchScan) GetBytesRead() int64 {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	// Note that if Init() was never called, s.rf.fetcher will remain nil, and
-	// GetBytesRead() will return 0. We are also holding the mutex, so a
-	// concurrent call to Init() will have to wait, and the fetcher will remain
-	// uninitialized until we return.
-	return s.rf.fetcher.GetBytesRead()
+	return s.rf.getBytesRead()
 }
 
 // GetRowsRead is part of the colexecop.KVReader interface.
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index b97d61858267..cc85cba7a519 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -398,11 +398,7 @@ func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata {
 func (s *ColIndexJoin) GetBytesRead() int64 {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	// Note that if Init() was never called, s.rf.fetcher will remain nil, and
-	// GetBytesRead() will return 0. We are also holding the mutex, so a
-	// concurrent call to Init() will have to wait, and the fetcher will remain
-	// uninitialized until we return.
-	return s.rf.fetcher.GetBytesRead()
+	return s.rf.getBytesRead()
 }
 
 // GetRowsRead is part of the colexecop.KVReader interface.
diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go
index 7c0ed1b164f5..d774b3730688 100644
--- a/pkg/sql/row/kv_fetcher.go
+++ b/pkg/sql/row/kv_fetcher.go
@@ -131,6 +131,16 @@ func (f *KVFetcher) GetBytesRead() int64 {
 	return atomic.LoadInt64(&f.atomics.bytesRead)
 }
 
+// ResetBytesRead atomically zeroes the bytes-read counter of this fetcher and
+// returns the value it held just before the reset. It is safe for concurrent
+// use, and calling it on a nil fetcher returns 0.
+func (f *KVFetcher) ResetBytesRead() int64 {
+	if f == nil {
+		return 0
+	}
+	return atomic.SwapInt64(&f.atomics.bytesRead, 0)
+}
+
 // MVCCDecodingStrategy controls if and how the fetcher should decode MVCC
 // timestamps from returned KV's.
 type MVCCDecodingStrategy int