Skip to content

Commit

Permalink
Merge #81917
Browse the repository at this point in the history
81917: sql: Surface metrics with each TTL deletion job r=otan a=ecwall

refs #80309

Release note (sql change): Add rowCount to ttl job progress.

Co-authored-by: Evan Wall <[email protected]>
  • Loading branch information
craig[bot] and ecwall committed Jun 14, 2022
2 parents 200be2f + bb85cc4 commit 7d9562d
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ message RowLevelTTLDetails {
}

// RowLevelTTLProgress is the progress record stored for a row-level TTL job.
message RowLevelTTLProgress {
// row_count accumulates the number of rows the job has deleted; the resumer
// adds each run's total to it once its range workers have finished.
int64 row_count = 1;
}

message Payload {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
Expand All @@ -77,6 +78,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
37 changes: 27 additions & 10 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"math"
"regexp"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -357,11 +358,12 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err

statsCloseCh := make(chan struct{})
ch := make(chan rangeToProcess, rangeConcurrency)
rowCount := int64(0)
for i := 0; i < rangeConcurrency; i++ {
g.GoCtx(func(ctx context.Context) error {
for r := range ch {
start := timeutil.Now()
err := runTTLOnRange(
rangeRowCount, err := runTTLOnRange(
ctx,
p.ExecCfg(),
details,
Expand All @@ -378,6 +380,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
deleteRateLimiter,
*aost,
)
// Add to rowCount before checking err: a range that fails partway through
// may still have deleted some rows.
atomic.AddInt64(&rowCount, rangeRowCount)
metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
Expand Down Expand Up @@ -414,6 +418,14 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
close(ch)
close(statsCloseCh)
retErr = errors.CombineErrors(retErr, g.Wait())
retErr = errors.CombineErrors(retErr, db.Txn(ctx, func(_ context.Context, txn *kv.Txn) error {
return t.job.Update(ctx, txn, func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += rowCount
ju.UpdateProgress(progress)
return nil
})
}))
}()

ri := kvcoord.MakeRangeIterator(p.ExecCfg().DistSender)
Expand Down Expand Up @@ -562,6 +574,7 @@ func fetchStatistics(
}
}

// runTTLOnRange reports the number of rows it deleted in rangeRowCount.
// Callers should use rangeRowCount even when err is non-nil, because the
// range may have been partially processed before the failure.
func runTTLOnRange(
ctx context.Context,
execCfg *sql.ExecutorConfig,
Expand All @@ -577,7 +590,7 @@ func runTTLOnRange(
selectBatchSize, deleteBatchSize int,
deleteRateLimiter *quotapool.RateLimiter,
aost tree.DTimestampTZ,
) error {
) (rangeRowCount int64, err error) {
metrics.NumActiveRanges.Inc(1)
defer metrics.NumActiveRanges.Dec(1)

Expand Down Expand Up @@ -608,13 +621,13 @@ func runTTLOnRange(
for {
if f := knobs.OnDeleteLoopStart; f != nil {
if err := f(); err != nil {
return err
return rangeRowCount, err
}
}

// Check the job is enabled on every iteration.
if enabled := jobEnabled.Get(execCfg.SV()); !enabled {
return errors.Newf(
return rangeRowCount, errors.Newf(
"ttl jobs are currently disabled by CLUSTER SETTING %s",
jobEnabled.Key(),
)
Expand All @@ -626,7 +639,7 @@ func runTTLOnRange(
expiredRowsPKs, err := selectBuilder.run(ctx, ie)
metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
return errors.Wrapf(err, "error selecting rows to delete")
return rangeRowCount, errors.Wrapf(err, "error selecting rows to delete")
}
metrics.RowSelections.Inc(int64(len(expiredRowsPKs)))

Expand Down Expand Up @@ -660,19 +673,23 @@ func runTTLOnRange(
desc.GetModificationTime().GoTime().Format(time.RFC3339),
)
}

tokens, err := deleteRateLimiter.Acquire(ctx, int64(len(deleteBatch)))
if err != nil {
return err
}
defer tokens.Consume()

start := timeutil.Now()
err = deleteBuilder.run(ctx, ie, txn, deleteBatch)
rowCount, err := deleteBuilder.run(ctx, ie, txn, deleteBatch)
if err != nil {
return err
}

metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return err
rangeRowCount += int64(rowCount)
return nil
}); err != nil {
return errors.Wrapf(err, "error during row deletion")
return rangeRowCount, errors.Wrapf(err, "error during row deletion")
}
metrics.RowDeletions.Inc(int64(len(deleteBatch)))
}
Expand All @@ -685,7 +702,7 @@ func runTTLOnRange(
break
}
}
return nil
return rangeRowCount, nil
}

// keyToDatums translates a RKey on a range for a table to the appropriate datums.
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/ttl/ttljob/ttljob_query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,10 @@ func (b *deleteQueryBuilder) buildQueryAndArgs(rows []tree.Datums) (string, []in

func (b *deleteQueryBuilder) run(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, rows []tree.Datums,
) error {
) (int, error) {
q, deleteArgs := b.buildQueryAndArgs(rows)
qosLevel := sessiondatapb.TTLLow
_, err := ie.ExecEx(
return ie.ExecEx(
ctx,
b.deleteOpName,
txn,
Expand All @@ -295,7 +295,6 @@ func (b *deleteQueryBuilder) run(
q,
deleteArgs...,
)
return err
}

// makeColumnNamesSQL converts columns into an escape string
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -605,6 +607,29 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
fmt.Sprintf(`SELECT count(1) FROM %s WHERE crdb_internal_expiration >= now()`, createTableStmt.Table.Table()),
).Scan(&numRows)
require.Equal(t, tc.numNonExpiredRows, numRows)

rows := th.sqlDB.Query(t, `
SELECT sys_j.status, sys_j.progress
FROM crdb_internal.jobs AS crdb_j
JOIN system.jobs as sys_j ON crdb_j.job_id = sys_j.id
WHERE crdb_j.job_type = 'ROW LEVEL TTL'
`)
jobCount := 0
for rows.Next() {
var status string
var progressBytes []byte
require.NoError(t, rows.Scan(&status, &progressBytes))

require.Equal(t, "succeeded", status)

var progress jobspb.Progress
require.NoError(t, protoutil.Unmarshal(progressBytes, &progress))

rowLevelTTLProgress := progress.UnwrapDetails().(jobspb.RowLevelTTLProgress)
require.Equal(t, int64(tc.numExpiredRows), rowLevelTTLProgress.RowCount)
jobCount++
}
require.Equal(t, 1, jobCount)
})
}
}

0 comments on commit 7d9562d

Please sign in to comment.