Skip to content

Commit

Permalink
Merge #76726
Browse files Browse the repository at this point in the history
76726: ttljob: paginate ranges when doing TTL r=rafiss a=otan

This commit batches ranges instead of scanning them in all at once. The
range batch size is controlled by the cluster setting
`sql.ttl.range_batch_size`. I don't anticipate this needing to be
controlled per table.

Release note: None

Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Feb 22, 2022
2 parents 4c65a10 + 58ec80e commit a6064fe
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete i
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.
sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.
timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
<tr><td><code>sql.ttl.default_delete_rate_limit</code></td><td>integer</td><td><code>0</code></td><td>default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.</td></tr>
<tr><td><code>sql.ttl.default_range_concurrency</code></td><td>integer</td><td><code>1</code></td><td>default amount of ranges to process at once during a TTL delete</td></tr>
<tr><td><code>sql.ttl.default_select_batch_size</code></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.range_batch_size</code></td><td>integer</td><td><code>100</code></td><td>amount of ranges to fetch at a time for a table during the TTL job</td></tr>
<tr><td><code>timeseries.storage.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td></tr>
<tr><td><code>timeseries.storage.resolution_10s.ttl</code></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td></tr>
<tr><td><code>timeseries.storage.resolution_30m.ttl</code></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td></tr>
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
77 changes: 58 additions & 19 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -70,6 +69,14 @@ var (
0,
settings.NonNegativeInt,
).WithPublic()

rangeBatchSize = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.ttl.range_batch_size",
"amount of ranges to fetch at a time for a table during the TTL job",
100,
settings.PositiveInt,
).WithPublic()
)

type rowLevelTTLResumer struct {
Expand Down Expand Up @@ -228,8 +235,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
var pkColumns []string
var pkTypes []*types.T
var pkDirs []descpb.IndexDescriptor_Direction
var ranges []kv.KeyValue
var name string
var rangeSpan roachpb.Span
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := descs.GetImmutableTableByID(
ctx,
Expand Down Expand Up @@ -263,12 +270,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
if ttl == nil {
return errors.Newf("unable to find TTL on table %s", desc.GetName())
}
ttlSettings = *ttl

ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec))
if err != nil {
return err
}

_, dbDesc, err := descs.GetImmutableDatabaseByID(
ctx,
Expand Down Expand Up @@ -299,6 +300,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
tree.Name(desc.GetName()),
)
name = tn.FQString()
rangeSpan = desc.TableSpan(p.ExecCfg().Codec)
ttlSettings = *ttl
return nil
}); err != nil {
return err
Expand Down Expand Up @@ -362,20 +365,56 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
close(ch)
retErr = errors.CombineErrors(retErr, g.Wait())
}()
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
done := false

batchSize := rangeBatchSize.Get(p.ExecCfg().SV())
for !done {
var ranges []kv.KeyValue

// Scan ranges up to rangeBatchSize.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
metaStart := keys.RangeMetaKey(keys.MustAddr(rangeSpan.Key).Next())
metaEnd := keys.RangeMetaKey(keys.MustAddr(rangeSpan.EndKey))

kvs, err := txn.Scan(ctx, metaStart, metaEnd, batchSize)
if err != nil {
return err
}
if len(kvs) < int(batchSize) {
done = true
if len(kvs) == 0 || !kvs[len(kvs)-1].Key.Equal(metaEnd.AsRawKey()) {
// Normally we need to scan one more KV because the ranges are addressed by
// the end key.
extraKV, err := txn.Scan(ctx, metaEnd, keys.Meta2Prefix.PrefixEnd(), 1 /* one result */)
if err != nil {
return err
}
kvs = append(kvs, extraKV[0])
}
}
ranges = kvs
return nil
}); err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err

// Send these to each goroutine worker.
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err
}
ch <- nextRange
}
ch <- nextRange
}
return nil
}(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
})
defer cleanupFunc()

rangeBatchSize := 1 + rng.Intn(3)
t.Logf("range batch size: %d", rangeBatchSize)

th.sqlDB.Exec(t, tc.createTable)
th.sqlDB.Exec(t, `SET CLUSTER SETTING sql.ttl.range_batch_size = $1`, rangeBatchSize)

// Extract the columns from CREATE TABLE.
stmt, err := parser.ParseOne(tc.createTable)
Expand Down

0 comments on commit a6064fe

Please sign in to comment.