From b4fdaf35ac716cfb16620d26cb6e1ad1604a279e Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Thu, 30 Aug 2018 23:37:03 +0800 Subject: [PATCH] Add keyonly support for seek (#7419) --- kv/kv.go | 2 ++ store/tikv/lock_test.go | 16 ++++++++++++++++ store/tikv/scan.go | 30 ++++++++++++++++++------------ store/tikv/scan_test.go | 37 +++++++++++++++++++++++++++++++++++++ store/tikv/snapshot.go | 1 + store/tikv/txn.go | 2 ++ 6 files changed, 76 insertions(+), 12 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index 237fbbe5f6f75..1c9230ed65a6a 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -46,6 +46,8 @@ const ( // BypassLatch option tells 2PC commit to bypass latches, it would be true when the // transaction is not conflict-retryable, for example: 'select for update', 'load data'. BypassLatch + // KeyOnly retrieve only keys, it can be used in scan now. + KeyOnly ) // Priority value for transaction priority. diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index f544053f6dbb6..20bedba474f10 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -126,6 +126,22 @@ func (s *testLockSuite) TestScanLockResolveWithSeek(c *C) { } } +func (s *testLockSuite) TestScanLockResolveWithSeekKeyOnly(c *C) { + s.putAlphabets(c) + s.prepareAlphabetLocks(c) + + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.SetOption(kv.KeyOnly, true) + iter, err := txn.Seek([]byte("a")) + c.Assert(err, IsNil) + for ch := byte('a'); ch <= byte('z'); ch++ { + c.Assert(iter.Valid(), IsTrue) + c.Assert([]byte(iter.Key()), BytesEquals, []byte{ch}) + c.Assert(iter.Next(), IsNil) + } +} + func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) { s.putAlphabets(c) s.prepareAlphabetLocks(c) diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 0ccf297666ee6..b37de8c9db394 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -94,13 +94,22 @@ func (s *Scanner) Next() error { continue } } - if err := s.resolveCurrentLock(bo); err != nil { - s.Close() - return errors.Trace(err) - } - if len(s.Value()) == 0 { - // nil stands for NotExist, go to next KV pair. - continue + + current := s.cache[s.idx] + // Try to resolve the lock + if current.GetError() != nil { + // 'current' would be modified if the lock being resolved + if err := s.resolveCurrentLock(bo, current); err != nil { + s.Close() + return errors.Trace(err) + } + + // The check here does not violate the KeyOnly semantic, because current's value + // is filled by resolveCurrentLock which fetches the value by snapshot.get, so an empty + // value stands for NotExist + if len(current.Value) == 0 { + continue + } } return nil } @@ -115,11 +124,7 @@ func (s *Scanner) startTS() uint64 { return s.snapshot.version.Ver } -func (s *Scanner) resolveCurrentLock(bo *Backoffer) error { - current := s.cache[s.idx] - if current.GetError() == nil { - return nil - } +func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error { val, err := s.snapshot.get(bo, kv.Key(current.Key)) if err != nil { return errors.Trace(err) @@ -144,6 +149,7 @@ func (s *Scanner) getData(bo *Backoffer) error { StartKey: s.nextStartKey, Limit: uint32(s.batchSize), Version: s.startTS(), + KeyOnly: s.snapshot.keyOnly, }, Context: pb.Context{ Priority: s.snapshot.priority, diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index 8cacd1e1dd26c..e4897bc2de2b7 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -18,6 +18,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/kv" "golang.org/x/net/context" ) @@ -91,5 +92,41 @@ func (s *testScanSuite) TestSeek(c *C) { } scan.Next() c.Assert(scan.Valid(), IsFalse) + + txn3 := s.beginTxn(c) + txn3.SetOption(kv.KeyOnly, true) + scan, err = txn3.Seek(encodeKey(s.prefix, "")) + c.Assert(err, IsNil) + + for i := 0; i < rowNum; i++ { + k := scan.Key() + c.Assert([]byte(k), BytesEquals, encodeKey(s.prefix, s08d("key", i))) + // Because newScan return first item without calling scan.Next() just like go-hbase, + // for-loop count will decrease 1. + if i < rowNum-1 { + scan.Next() + } + } + scan.Next() + c.Assert(scan.Valid(), IsFalse) + + // Restore KeyOnly to false + txn3.SetOption(kv.KeyOnly, false) + scan, err = txn3.Seek(encodeKey(s.prefix, "")) + c.Assert(err, IsNil) + + for i := 0; i < rowNum; i++ { + k := scan.Key() + c.Assert([]byte(k), BytesEquals, encodeKey(s.prefix, s08d("key", i))) + v := scan.Value() + c.Assert(v, BytesEquals, valueBytes(i)) + // Because newScan return first item without calling scan.Next() just like go-hbase, + // for-loop count will decrease 1. + if i < rowNum-1 { + scan.Next() + } + } + scan.Next() + c.Assert(scan.Valid(), IsFalse) } } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 986dd2dd58943..c51983495cde3 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -47,6 +47,7 @@ type tikvSnapshot struct { priority pb.CommandPri notFillCache bool syncLog bool + keyOnly bool vars *kv.Variables } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index a39a0d110ceaa..5fa3cf28d0f96 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -150,6 +150,8 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) { txn.snapshot.notFillCache = val.(bool) case kv.SyncLog: txn.snapshot.syncLog = val.(bool) + case kv.KeyOnly: + txn.snapshot.keyOnly = val.(bool) } }