Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen #8081

Merged
merged 22 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1891,7 +1891,7 @@ func (s *testDBSuite) TestTruncateTable(c *C) {
hasOldTableData := true
for i := 0; i < waitForCleanDataRound; i++ {
err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err1 := txn.Seek(tablePrefix)
it, err1 := txn.Iter(tablePrefix, nil)
if err1 != nil {
return err1
}
Expand Down Expand Up @@ -2817,7 +2817,7 @@ func (s *testDBSuite) TestAlterTableDropPartition(c *C) {

s.tk.MustExec("drop table if exists tr;")
s.tk.MustExec(` create table tr(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -2907,7 +2907,7 @@ func checkPartitionDelRangeDone(c *C, s *testDBSuite, partitionPrefix kv.Key) bo
hasOldPartitionData := true
for i := 0; i < waitForCleanDataRound; i++ {
err := kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err := txn.Seek(partitionPrefix)
it, err := txn.Iter(partitionPrefix, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -2957,7 +2957,7 @@ func (s *testDBSuite) TestTruncatePartitionAndDropTable(c *C) {
// Test truncate table partition.
s.tk.MustExec("drop table if exists t3;")
s.tk.MustExec(`create table t3(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down Expand Up @@ -2995,7 +2995,7 @@ func (s *testDBSuite) TestTruncatePartitionAndDropTable(c *C) {
// Test drop table partition.
s.tk.MustExec("drop table if exists t4;")
s.tk.MustExec(`create table t4(
id int, name varchar(50),
id int, name varchar(50),
purchased date
)
partition by range( year(purchased) ) (
Expand Down
8 changes: 2 additions & 6 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"bytes"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error {
iter, err := txn.Seek(oldStartKey)
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -164,10 +163,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
if !iter.Valid() {
break
}
finish = bytes.Compare(iter.Key(), r.EndKey) >= 0
if finish {
break
}
finish = false
dr.keys = append(dr.keys, iter.Key().Clone())
newStartKey = iter.Key().Next()

Expand Down
22 changes: 18 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -1207,16 +1207,30 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(priority)
if err != nil {
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
firstKey := t.RecordKey(startHandle)

// Calculate the exclusive upper bound
var upperBound kv.Key
if endIncluded {
if endHandle == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
// PrefixNext is time costing. Try to avoid it if possible.
upperBound = t.RecordKey(endHandle + 1)
}
} else {
upperBound = t.RecordKey(endHandle)
}

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior
startHandle = math.MinInt64
endHandle = math.MaxInt64
// Get the start handle of this partition.
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64,
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
startHandle = h
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 // indirect
github.com/pingcap/errors v0.11.0
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20181028030329-855d2192cdc7
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26
github.com/pingcap/parser v0.0.0-20181102070703-4acd198f5092
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v0.0.0-20181101090416-cfac1096162e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ github.com/pingcap/kvproto v0.0.0-20180930052200-fae11119f066 h1:ulo0ph8sxCzY3GY
github.com/pingcap/kvproto v0.0.0-20180930052200-fae11119f066/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20181028030329-855d2192cdc7 h1:CYssSnPvf90ZSbFdZpsZGSI7y+drG1EfKxqTOnKnHb0=
github.com/pingcap/kvproto v0.0.0-20181028030329-855d2192cdc7/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26 h1:JK4VLNYbSn36QSbCnqALi2ySXdH0DfcMssT/zmLf4Ls=
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/parser v0.0.0-20181024082006-53ac409ed043 h1:P9Osi8lei5j2fiRgsBi2Wch7qe4a3yWUOsS5vSan/JU=
github.com/pingcap/parser v0.0.0-20181024082006-53ac409ed043/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20181102070703-4acd198f5092 h1:vGjjf7fhuaO9udn6QEFzvsNJDwVxFmdJvIJhCdCNe/E=
Expand Down
16 changes: 8 additions & 8 deletions kv/buffer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,26 @@ func (s *BufferStore) Get(k Key) ([]byte, error) {
return val, nil
}

// Seek implements the Retriever interface.
func (s *BufferStore) Seek(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Seek(k)
// Iter implements the Retriever interface.
func (s *BufferStore) Iter(k Key, upperBound Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.Seek(k)
retrieverIt, err := s.r.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
return NewUnionIter(bufferIt, retrieverIt, false)
}

// SeekReverse implements the Retriever interface.
func (s *BufferStore) SeekReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.SeekReverse(k)
// IterReverse implements the Retriever interface.
func (s *BufferStore) IterReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.SeekReverse(k)
retrieverIt, err := s.r.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion kv/buffer_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

iter, err := mutator.Seek(nil)
iter, err := mutator.Iter(nil, nil)
c.Check(err, IsNil)
for iter.Valid() {
cmp := bytes.Compare(iter.Key(), iter.Value())
Expand Down
10 changes: 6 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ type Retriever interface {
// Get gets the value for key k from kv store.
// If corresponding kv pair does not exist, it returns nil and ErrNotExist.
Get(k Key) ([]byte, error)
// Seek creates an Iterator positioned on the first entry that k <= entry's key.
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
Seek(k Key) (Iterator, error)
Iter(k Key, upperBound Key) (Iterator, error)

// SeekReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
SeekReverse(k Key) (Iterator, error)
// TODO: Add lower bound limit
IterReverse(k Key) (Iterator, error)
}

// Mutator is the interface wraps the basic Set and Delete methods.
Expand Down
20 changes: 10 additions & 10 deletions kv/mem_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func valToStr(c *C, iter Iterator) string {
func checkNewIterator(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep)
Expand All @@ -86,7 +86,7 @@ func checkNewIterator(c *C, buffer MemBuffer) {
// Test iterator Next()
for i := startIndex; i < testCount-1; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(valToStr(c, iter), Equals, string(val))
Expand All @@ -102,15 +102,15 @@ func checkNewIterator(c *C, buffer MemBuffer) {
}

// Non exist and beyond maximum seek test
iter, err := buffer.Seek(encodeInt(testCount * indexStep))
iter, err := buffer.Iter(encodeInt(testCount*indexStep), nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

// Non exist but between existing keys seek test,
// it returns the smallest key that larger than the one we are seeking
inBetween := encodeInt((testCount-1)*indexStep - 1)
last := encodeInt((testCount - 1) * indexStep)
iter, err = buffer.Seek(inBetween)
iter, err = buffer.Iter(inBetween, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
c.Assert([]byte(iter.Key()), Not(BytesEquals), inBetween)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *testKVSuite) TestNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
// should be invalid
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

Expand All @@ -155,7 +155,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
insertData(c, buffer)

iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)

err = NextUntil(iter, func(k Key) bool {
Expand All @@ -168,7 +168,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
func (s *testKVSuite) TestBasicNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
it, err := buffer.Seek([]byte("2"))
it, err := buffer.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsFalse)
}
Expand All @@ -193,15 +193,15 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}

cnt := 0
it, err := buffer.Seek(nil)
it, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
for it.Valid() {
cnt++
it.Next()
}
c.Assert(cnt, Equals, 6)

it, err = buffer.Seek([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"))
it, err = buffer.Iter([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"), nil)
c.Assert(err, IsNil)
c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001")
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func benchIterator(b *testing.B, buffer MemBuffer) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
if err != nil {
b.Error(err)
}
Expand Down
16 changes: 6 additions & 10 deletions kv/memdb_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,10 @@ func NewMemDbBuffer(cap int) MemBuffer {
}
}

// Seek creates an Iterator.
func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
var i Iterator
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: false}
} else {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k)}), reverse: false}
}
// Iter creates an Iterator.
func (m *memDbBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
i := &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k), Limit: []byte(upperBound)}), reverse: false}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about k or upperBound is nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok. If k is nil then []byte(k) is still nil.


err := i.Next()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -69,7 +65,7 @@ func (m *memDbBuffer) SetCap(cap int) {

}

func (m *memDbBuffer) SeekReverse(k Key) (Iterator, error) {
func (m *memDbBuffer) IterReverse(k Key) (Iterator, error) {
var i *memDbIter
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: true}
Expand Down Expand Up @@ -161,7 +157,7 @@ func (i *memDbIter) Close() {

// WalkMemBuffer iterates all buffered kv pairs in memBuf
func WalkMemBuffer(memBuf MemBuffer, f func(k Key, v []byte) error) error {
iter, err := memBuf.Seek(nil)
iter, err := memBuf.Iter(nil, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func (t *mockTxn) Get(k Key) ([]byte, error) {
return nil, nil
}

func (t *mockTxn) Seek(k Key) (Iterator, error) {
func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}

func (t *mockTxn) SeekReverse(k Key) (Iterator, error) {
func (t *mockTxn) IterReverse(k Key) (Iterator, error) {
return nil, nil
}

Expand Down Expand Up @@ -211,10 +211,10 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
return m, nil
}

func (s *mockSnapshot) Seek(k Key) (Iterator, error) {
return s.store.Seek(k)
func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
return s.store.Iter(k, upperBound)
}

func (s *mockSnapshot) SeekReverse(k Key) (Iterator, error) {
return s.store.SeekReverse(k)
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}
4 changes: 2 additions & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (s testMockSuite) TestInterface(c *C) {
if transaction.IsReadOnly() {
transaction.Get(Key("lock"))
transaction.Set(Key("lock"), []byte{})
transaction.Seek(Key("lock"))
transaction.SeekReverse(Key("lock"))
transaction.Iter(Key("lock"), nil)
transaction.IterReverse(Key("lock"))
}
transaction.Commit(context.Background())

Expand Down
8 changes: 4 additions & 4 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ func (lmb *lazyMemBuffer) Delete(k Key) error {
return lmb.mb.Delete(k)
}

func (lmb *lazyMemBuffer) Seek(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.Seek(k)
return lmb.mb.Iter(k, upperBound)
}

func (lmb *lazyMemBuffer) SeekReverse(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) IterReverse(k Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.SeekReverse(k)
return lmb.mb.IterReverse(k)
}

func (lmb *lazyMemBuffer) Size() int {
Expand Down
Loading