diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 84c022a5b556b..53044feb37c9d 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -257,6 +257,12 @@ func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
 	return builder
 }
 
+// SetPaging sets "Paging" flag for "kv.Request".
+func (builder *RequestBuilder) SetPaging(paging bool) *RequestBuilder {
+	builder.Request.Paging = paging
+	return builder
+}
+
 // SetConcurrency sets "Concurrency" for "kv.Request".
 func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder {
 	builder.Request.Concurrency = concurrency
diff --git a/kv/kv.go b/kv/kv.go
index 22610d17b8d9a..0e83f3daee8a0 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -338,6 +338,8 @@ type Request struct {
 	MatchStoreLabels []*metapb.StoreLabel
 	// ResourceGroupTagger indicates the kv request task group tagger.
 	ResourceGroupTagger tikvrpc.ResourceGroupTagger
+	// Paging indicates whether the request is a paging request.
+	Paging bool
 }
 
 const (
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 57d6f75c25902..b8b025955fed4 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -960,6 +960,9 @@ type SessionVars struct {
 
 	// Rng stores the rand_seed1 and rand_seed2 for Rand() function
 	Rng *utilMath.MysqlRng
+
+	// EnablePaging indicates whether to enable paging in coprocessor requests.
+	EnablePaging bool
 }
 
 // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index d66eb6c88096f..f76a9ff9f0c98 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -1873,6 +1873,10 @@ var defaultSysVars = []*SysVar{
 	}, GetSession: func(s *SessionVars) (string, error) {
 		return "0", nil
 	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: Off, Type: TypeBool, Hidden: true, skipInit: true, SetSession: func(s *SessionVars, val string) error {
+		s.EnablePaging = TiDBOptOn(val)
+		return nil
+	}},
 }
 
 func collectAllowFuncName4ExpressionIndex() string {
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index ef7c001841d13..2a5e98ece6dd5 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -214,6 +214,9 @@ const (
 
 	// TiDBReadStaleness indicates the staleness duration for following statement
 	TiDBReadStaleness = "tidb_read_staleness"
+
+	// TiDBEnablePaging indicates whether paging is enabled in coprocessor requests.
+	TiDBEnablePaging = "tidb_enable_paging"
 )
 
 // TiDB system variable names that both in session and global scope.
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 5606cb246e863..50ec15dd01176 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -61,6 +61,18 @@ const (
 	copNextMaxBackoff = 20000
 )
 
+// A paging request may be split into multiple requests if there is more data than a single page can hold.
+// The paging size grows from min to max; it is not well tuned yet.
+// e.g. a paging request scans over range (r1, r200) and asks for 64 rows in the first batch;
+// if the range is not drained, the paging size grows and a new request is sent for the recalculated range, e.g. (r100, r200).
+// Compared with a common unary request, a paging request allows early access to data and offers a streaming-like way of processing it.
+// TODO: the paging parameters may be made configurable.
+const (
+	minPagingSize  uint64 = 64
+	maxPagingSize         = minPagingSize * 128
+	pagingSizeGrow uint64 = 2
+)
+
 // CopClient is coprocessor client.
 type CopClient struct {
 	kv.RequestTypeSupportedChecker
@@ -78,6 +90,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
 		logutil.BgLogger().Debug("send batch requests")
 		return c.sendBatch(ctx, req, vars)
 	}
+	if req.Streaming && req.Paging {
+		return copErrorResponse{errors.New("streaming and paging are both on")}
+	}
 	ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
 	bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
 	ranges := NewKeyRanges(req.KeyRanges)
@@ -115,6 +130,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
 		// 2*it.concurrency to avoid deadlock in the unit test caused by the `MustExec` or `Exec`
 		capacity = it.concurrency * 2
 	}
+	// In a streaming or paging request, the response is returned in multiple batches,
+	// so enlarge the channel size to avoid the request being blocked by a full buffer.
+	if req.Streaming || req.Paging {
+		if capacity < 2048 {
+			capacity = 2048
+		}
+	}
 	it.respChan = make(chan *copResponse, capacity)
 	it.sendRate = util.NewRateLimit(it.concurrency)
 }
@@ -140,7 +162,9 @@ type copTask struct {
 	cmdType   tikvrpc.CmdType
 	storeType kv.StoreType
 
-	eventCb trxevents.EventCallback
+	eventCb    trxevents.EventCallback
+	paging     bool
+	pagingSize uint64
 }
 
 func (r *copTask) String() string {
@@ -168,6 +192,14 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
+	// Channel buffer is 2 for handling region split.
+	// In a common case, two region split tasks will not be blocked.
+	chanSize := 2
+	// In a streaming or paging request, the response is returned in multiple batches,
+	// so enlarge the channel size to avoid the request being blocked by a full buffer.
+	if req.Streaming || req.Paging {
+		chanSize = 128
+	}
 
 	var tasks []*copTask
 	for _, loc := range locs {
@@ -176,15 +208,21 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
 		rLen := loc.Ranges.Len()
 		for i := 0; i < rLen; {
 			nextI := mathutil.Min(i+rangesPerTask, rLen)
+			// If this is a paging request, we set the paging size to minPagingSize;
+			// the size will grow every round.
+			pagingSize := uint64(0)
+			if req.Paging {
+				pagingSize = minPagingSize
+			}
 			tasks = append(tasks, &copTask{
-				region: loc.Location.Region,
-				ranges: loc.Ranges.Slice(i, nextI),
-				// Channel buffer is 2 for handling region split.
-				// In a common case, two region split tasks will not be blocked.
-				respChan:  make(chan *copResponse, 2),
-				cmdType:   cmdType,
-				storeType: req.StoreType,
-				eventCb:   eventCb,
+				region:     loc.Location.Region,
+				ranges:     loc.Ranges.Slice(i, nextI),
+				respChan:   make(chan *copResponse, chanSize),
+				cmdType:    cmdType,
+				storeType:  req.StoreType,
+				eventCb:    eventCb,
+				paging:     req.Paging,
+				pagingSize: pagingSize,
 			})
 			i = nextI
 		}
@@ -386,13 +424,8 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
 			worker.sendToRespCh(finCopResp, worker.respChan, false)
 		}
 		close(task.respChan)
-		if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
-			return
-		}
-		select {
-		case <-worker.finishCh:
+		if worker.finished() {
 			return
-		default:
 		}
 	}
 }
@@ -648,11 +681,9 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask,
 			worker.sendToRespCh(resp, respCh, true)
 			return
 		}
-		// test whether the ctx is cancelled
-		if vars := bo.GetVars(); vars != nil && vars.Killed != nil && atomic.LoadUint32(vars.Killed) == 1 {
-			return
+		if worker.finished() {
+			break
 		}
-
 		if len(tasks) > 0 {
 			remainTasks = append(tasks, remainTasks[1:]...)
 		} else {
@@ -674,19 +705,21 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	})
 
 	copReq := coprocessor.Request{
-		Tp:        worker.req.Tp,
-		StartTs:   worker.req.StartTs,
-		Data:      worker.req.Data,
-		Ranges:    task.ranges.ToPBRanges(),
-		SchemaVer: worker.req.SchemaVar,
+		Tp:         worker.req.Tp,
+		StartTs:    worker.req.StartTs,
+		Data:       worker.req.Data,
+		Ranges:     task.ranges.ToPBRanges(),
+		SchemaVer:  worker.req.SchemaVar,
+		PagingSize: task.pagingSize,
 	}
 
-	var cacheKey []byte = nil
-	var cacheValue *coprCacheValue = nil
+	var cacheKey []byte
+	var cacheValue *coprCacheValue
 
+	// TODO: cache paging copr
 	// If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
 	// computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key.
-	if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
+	if task.cmdType == tikvrpc.CmdCop && !task.paging && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
 		cKey, err := coprCacheBuildKey(&copReq)
 		if err == nil {
 			cacheKey = cKey
@@ -753,6 +786,10 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 		return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime)
 	}
 
+	if worker.req.Paging {
+		return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime)
+	}
+
 	// Handles the response for non-streaming copTask.
 	return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, nil, costTime)
 }
 
@@ -862,7 +899,8 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *ti
 			} else {
 				logutil.BgLogger().Info("stream unknown error", zap.Error(err))
 			}
-			return worker.buildCopTasksFromRemain(bo, lastRange, task)
+			task.ranges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
+			return []*copTask{task}, nil
 		}
 		if resp.Range != nil {
 			lastRange = resp.Range
 		}
 	}
 }
 
+func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
+	remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime)
+	if err != nil || len(remainedTasks) != 0 {
+		// If there is a region error or lock error, keep the paging size and retry.
+		for _, remainedTask := range remainedTasks {
+			remainedTask.pagingSize = task.pagingSize
+		}
+		return remainedTasks, errors.Trace(err)
+	}
+	pagingRange := resp.pbResp.Range
+	// only paging requests need to calculate the next ranges
+	if pagingRange == nil {
+		return nil, errors.New("lastRange in paging should not be nil")
+	}
+	// calculate next ranges and grow the paging size
+	task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
+	if task.ranges.Len() == 0 {
+		return nil, nil
+	}
+	task.pagingSize = growPagingSize(task.pagingSize)
+	return []*copTask{task}, nil
+}
+
 // handleCopResponse checks coprocessor Response for region split and lock,
 // returns more tasks when that happens, or handles the response if no error.
 // if we're handling streaming coprocessor response, lastRange is the range of last
@@ -909,7 +970,10 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
 				return nil, errors.Trace(err)
 			}
 		}
-		return worker.buildCopTasksFromRemain(bo, lastRange, task)
+		if worker.req.Streaming {
+			task.ranges = worker.calculateRetry(task.ranges, lastRange, worker.req.Desc)
+		}
+		return []*copTask{task}, nil
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
 		err := errors.Errorf("other error: %s", otherErr)
@@ -1037,30 +1101,56 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
 	return nil
 }
 
-func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
-	remainedRanges := task.ranges
-	if worker.req.Streaming && lastRange != nil {
-		remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
+// calculateRetry splits the input ranges into two, and takes one of them according to the desc flag.
+// It's used in the streaming API to calculate which range has been consumed and what needs to be retried.
+// For example:
+// ranges: [r1 --> r2) [r3 --> r4)
+// split: [s1 --> s2)
+// In normal scan order, all data before s1 is consumed, so the retry ranges should be [s1 --> r2) [r3 --> r4)
+// In reverse scan order, all data after s2 is consumed, so the retry ranges should be [r1 --> r2) [r3 --> s2)
+func (worker *copIteratorWorker) calculateRetry(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
+	if split == nil {
+		return ranges
+	}
+	if desc {
+		left, _ := ranges.Split(split.End)
+		return left
 	}
-	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.eventCb)
+	_, right := ranges.Split(split.Start)
+	return right
 }
 
-// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
-// It's used in streaming API, to calculate which range is consumed and what needs to be retry.
+// calculateRemain calculates the remaining ranges to be processed; it's used in the streaming and paging APIs.
 // For example:
 // ranges: [r1 --> r2) [r3 --> r4)
 // split: [s1 --> s2)
-// In normal scan order, all data before s1 is consumed, so the remain ranges should be [s1 --> r2) [r3 --> r4)
-// In reverse scan order, all data after s2 is consumed, so the remain ranges should be [r1 --> r2) [r3 --> s2)
+// In normal scan order, all data before s2 is consumed, so the remaining ranges should be [s2 --> r4)
+// In reverse scan order, all data after s1 is consumed, so the remaining ranges should be [r1 --> s1)
 func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
+	if split == nil {
+		return ranges
+	}
 	if desc {
-		left, _ := ranges.Split(split.End)
+		left, _ := ranges.Split(split.Start)
 		return left
 	}
-	_, right := ranges.Split(split.Start)
+	_, right := ranges.Split(split.End)
 	return right
 }
 
+// finished checks the kill flag and the finish channel, and reports whether the worker is finished.
+func (worker *copIteratorWorker) finished() bool {
+	if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
+		return true
+	}
+	select {
+	case <-worker.finishCh:
+		return true
+	default:
+		return false
+	}
+}
+
 func (it *copIterator) Close() error {
 	if atomic.CompareAndSwapUint32(&it.closed, 0, 1) {
 		close(it.finishCh)
@@ -1241,3 +1331,11 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
 		return kvrpcpb.IsolationLevel_SI
 	}
 }
+
+func growPagingSize(size uint64) uint64 {
+	size *= pagingSizeGrow
+	if size > maxPagingSize {
+		return maxPagingSize
+	}
+	return size
+}
diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go
index 213a1bb948056..15d8ac069dd5e 100644
--- a/store/copr/coprocessor_test.go
+++ b/store/copr/coprocessor_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/driver/backoff"
 	"github.com/stretchr/testify/require"
@@ -289,3 +290,174 @@ func rangeEqual(t *testing.T, ranges []kv.KeyRange, keys ...string) {
 		require.Equal(t, string(r.EndKey), keys[2*i+1])
 	}
 }
+
+func TestBuildPagingTasks(t *testing.T) {
+	t.Parallel()
+	// nil --- 'g' --- 'n' --- 't' --- nil
+	// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
+	mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
+	_, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
+	pdCli := &tikv.CodecPDClient{Client: pdClient}
+	defer pdCli.Close()
+
+	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
+	defer cache.Close()
+
+	bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)
+
+	req := &kv.Request{}
+	req.Paging = true
+	flashReq := &kv.Request{}
+	flashReq.StoreType = kv.TiFlash
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
+	require.NoError(t, err)
+	require.Len(t, tasks, 1)
+	require.Len(t, tasks, 1)
+	taskEqual(t, tasks[0], regionIDs[0], "a", "c")
+	require.True(t, tasks[0].paging)
+	require.Equal(t, tasks[0].pagingSize, minPagingSize)
+}
+
+func toCopRange(r kv.KeyRange) *coprocessor.KeyRange {
+	coprRange := coprocessor.KeyRange{}
+	coprRange.Start = r.StartKey
+	coprRange.End = r.EndKey
+	return &coprRange
+}
+
+func toRange(r *KeyRanges) []kv.KeyRange {
+	ranges := make([]kv.KeyRange, 0, r.Len())
+	if r.first != nil {
+		ranges = append(ranges, *r.first)
+	}
+	ranges = append(ranges, r.mid...)
+	if r.last != nil {
+		ranges = append(ranges, *r.last)
+	}
+	return ranges
+}
+
+func TestCalculateRetry(t *testing.T) {
+	t.Parallel()
+	worker := copIteratorWorker{}
+
+	// split in one range
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("b", "c")[0]
+		retry := worker.calculateRetry(NewKeyRanges(ranges), toCopRange(split), false)
+		rangeEqual(t, toRange(retry), "b", "c", "e", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("e", "f")[0]
+		retry := worker.calculateRetry(NewKeyRanges(ranges), toCopRange(split), true)
+		rangeEqual(t, toRange(retry), "a", "c", "e", "f")
+	}
+
+	// across ranges
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("b", "f")[0]
+		retry := worker.calculateRetry(NewKeyRanges(ranges), toCopRange(split), false)
+		rangeEqual(t, toRange(retry), "b", "c", "e", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("b", "f")[0]
+		retry := worker.calculateRetry(NewKeyRanges(ranges), toCopRange(split), true)
+		rangeEqual(t, toRange(retry), "a", "c", "e", "f")
+	}
+
+	// exhaust the ranges
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("a", "g")[0]
+		retry := worker.calculateRetry(NewKeyRanges(ranges), toCopRange(split), false)
+		rangeEqual(t, toRange(retry), "a", "c", "e", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("a", "g")[0]
+		retry := worker.calculateRetry(NewKeyRanges(ranges), toCopRange(split), true)
+		rangeEqual(t, toRange(retry), "a", "c", "e", "g")
+	}
+
+	// nil range
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		retry := worker.calculateRetry(NewKeyRanges(ranges), nil, false)
+		rangeEqual(t, toRange(retry), "a", "c", "e", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		retry := worker.calculateRetry(NewKeyRanges(ranges), nil, true)
+		rangeEqual(t, toRange(retry), "a", "c", "e", "g")
+	}
+}
+
+func TestCalculateRemain(t *testing.T) {
+	t.Parallel()
+	worker := copIteratorWorker{}
+
+	// split in one range
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("a", "b")[0]
+		remain := worker.calculateRemain(NewKeyRanges(ranges), toCopRange(split), false)
+		rangeEqual(t, toRange(remain), "b", "c", "e", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("f", "g")[0]
+		remain := worker.calculateRemain(NewKeyRanges(ranges), toCopRange(split), true)
+		rangeEqual(t, toRange(remain), "a", "c", "e", "f")
+	}
+
+	// across ranges
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("a", "f")[0]
+		remain := worker.calculateRemain(NewKeyRanges(ranges), toCopRange(split), false)
+		rangeEqual(t, toRange(remain), "f", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("b", "g")[0]
+		remain := worker.calculateRemain(NewKeyRanges(ranges), toCopRange(split), true)
+		rangeEqual(t, toRange(remain), "a", "b")
+	}
+
+	// exhaust the ranges
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("a", "g")[0]
+		remain := worker.calculateRemain(NewKeyRanges(ranges), toCopRange(split), false)
+		require.Equal(t, remain.Len(), 0)
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		split := buildKeyRanges("a", "g")[0]
+		remain := worker.calculateRemain(NewKeyRanges(ranges), toCopRange(split), true)
+		require.Equal(t, remain.Len(), 0)
+	}
+
+	// nil range
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		remain := worker.calculateRemain(NewKeyRanges(ranges), nil, false)
+		rangeEqual(t, toRange(remain), "a", "c", "e", "g")
+	}
+	{
+		ranges := buildKeyRanges("a", "c", "e", "g")
+		remain := worker.calculateRemain(NewKeyRanges(ranges), nil, true)
+		rangeEqual(t, toRange(remain), "a", "c", "e", "g")
+	}
+}
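
As a quick reference (this sketch is not part of the patch above), the standalone Go program below shows how the paging window introduced in store/copr/coprocessor.go evolves: a paging copTask starts at minPagingSize, and growPagingSize doubles the size after each drained batch until it is capped at maxPagingSize. The constants and the helper mirror the ones added by the patch; the package main wrapper and the loop are illustrative only.

package main

import "fmt"

// Constants mirroring the values added in store/copr/coprocessor.go by this patch.
const (
	minPagingSize  uint64 = 64
	maxPagingSize         = minPagingSize * 128 // 8192
	pagingSizeGrow uint64 = 2
)

// growPagingSize mirrors the helper added at the end of coprocessor.go:
// the paging size doubles each round and is capped at maxPagingSize.
func growPagingSize(size uint64) uint64 {
	size *= pagingSizeGrow
	if size > maxPagingSize {
		return maxPagingSize
	}
	return size
}

func main() {
	// A paging task starts at minPagingSize and grows after every drained batch:
	// 64, 128, 256, 512, 1024, 2048, 4096, 8192, 8192, ...
	size := minPagingSize
	for i := 0; i < 9; i++ {
		fmt.Println(size)
		size = growPagingSize(size)
	}
}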