Skip to content

Commit

Permalink
*: Make use of the upperBound of ticlient's kv_scan interface to ensu…
Browse files Browse the repository at this point in the history
…re no overbound scan will happen (#8081) (#8310)
  • Loading branch information
MyonKeminta authored and winkyao committed Nov 19, 2018
1 parent 7ebfa28 commit 51b4882
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 23 deletions.
3 changes: 2 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package ddl

import (
"math"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -304,7 +305,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
for {
startTime := time.Now()
handles = handles[:0]
err = iterateSnapshotRows(d.store, t, version, seekHandle,
err = iterateSnapshotRows(d.store, t, version, seekHandle, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
handles = append(handles, h)
if len(handles) == defaultBatchCnt {
Expand Down
8 changes: 2 additions & 6 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"bytes"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -155,7 +154,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.d.store, false, func(txn kv.Transaction) error {
iter, err := txn.Iter(oldStartKey, nil)
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -165,10 +164,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
if !iter.Valid() {
break
}
finish = bytes.Compare(iter.Key(), r.EndKey) >= 0
if finish {
break
}
finish = false
dr.keys = append(dr.keys, iter.Key().Clone())
newStartKey = iter.Key().Next()

Expand Down
22 changes: 18 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := time.Now()
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -1000,16 +1000,30 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(kv.PriorityLow)
if err != nil {
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Iter(firstKey, nil)
firstKey := t.RecordKey(startHandle)

// Calculate the exclusive upper bound
var upperBound kv.Key
if endIncluded {
if endHandle == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
// PrefixNext is time costing. Try to avoid it if possible.
upperBound = t.RecordKey(endHandle + 1)
}
} else {
upperBound = t.RecordKey(endHandle)
}

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (d *ddl) getReorgInfo(t *meta.Meta, job *model.Job, tbl table.Table) (*reor
}

// Get the first handle of this table.
err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64,
err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
info.Handle = h
return false, nil
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (s *Scanner) Next() error {
continue
}
}
current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
s.eof = true
s.Close()
return nil
}
if err := s.resolveCurrentLock(bo); err != nil {
s.Close()
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error {

func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.reader.Iter(dataPrefix, nil)
it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 5 additions & 3 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues

// Drop removes the KV index from store.
func (c *index) Drop(rm kv.RetrieverMutator) error {
it, err := rm.Iter(c.prefix, nil)
it, err := rm.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -261,7 +261,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues
if err != nil {
return nil, false, errors.Trace(err)
}
it, err := r.Iter(key, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(key, upperBound)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -275,7 +276,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) {
it, err := r.Iter(c.prefix, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(c.prefix, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ func (t *Table) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator,
// IterRecords implements table.Table IterRecords interface.
func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := ctx.Txn().Iter(startKey, nil)
prefix := t.RecordPrefix()
it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -717,7 +718,6 @@ func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*tab
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
defaultVals := make([]types.Datum, len(cols))
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
Expand Down Expand Up @@ -831,7 +831,7 @@ func (t *Table) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetStep bo
// Seek implements table.Table Seek interface.
func (t *Table) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) {
seekKey := tablecodec.EncodeRowKeyWithHandle(t.ID, h)
iter, err := ctx.Txn().Iter(seekKey, nil)
iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext())
if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) {
// No more records in the table, skip to the end.
return 0, false, nil
Expand Down
6 changes: 4 additions & 2 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h

func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := retriever.Iter(startKey, nil)
prefix := t.RecordPrefix()
keyUpperBound := prefix.PrefixNext()

it, err := retriever.Iter(startKey, keyUpperBound)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -557,7 +560,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
// TODO: check valid lock
Expand Down
4 changes: 2 additions & 2 deletions util/prefix_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error {
iter, err := retriever.Iter(prefix, nil)
iter, err := retriever.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke
// DelKeyWithPrefix deletes keys with prefix.
func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error {
var keys []kv.Key
iter, err := rm.Iter(prefix, nil)
iter, err := rm.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 51b4882

Please sign in to comment.