From 21330da7a20177d490ae75e1689cfd988b7645f2 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Mon, 12 Nov 2018 15:23:59 +0800 Subject: [PATCH] *: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen (#8081) (#8257) --- ddl/delete_range.go | 8 ++------ ddl/index.go | 22 ++++++++++++++++++---- ddl/reorg.go | 2 +- store/tikv/scan.go | 5 +++++ structure/hash.go | 2 +- table/tables/index.go | 8 +++++--- table/tables/tables.go | 6 +++--- util/admin/admin.go | 6 ++++-- util/prefix_helper.go | 4 ++-- 9 files changed, 41 insertions(+), 22 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index fba35867f37b0..4b12de2d2f419 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -14,7 +14,6 @@ package ddl import ( - "bytes" "encoding/hex" "fmt" "math" @@ -154,7 +153,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { finish := true dr.keys = dr.keys[:0] err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error { - iter, err := txn.Iter(oldStartKey, nil) + iter, err := txn.Iter(oldStartKey, r.EndKey) if err != nil { return errors.Trace(err) } @@ -164,10 +163,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { if !iter.Valid() { break } - finish = bytes.Compare(iter.Key(), r.EndKey) >= 0 - if finish { - break - } + finish = false dr.keys = append(dr.keys, iter.Key().Clone()) newStartKey = iter.Key().Next() diff --git a/ddl/index.go b/ddl/index.go index 92dbd2723c5ad..5eedd76f3dbc0 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -619,7 +619,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde // taskDone means that the added handle is out of taskRange.endHandle. taskDone := false oprStartTime := startTime - err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, + err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded, func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0) @@ -1183,7 +1183,7 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 { // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error) -func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error { +func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error { ver := kv.Version{Ver: version} snap, err := store.GetSnapshot(ver) @@ -1191,8 +1191,22 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version if err != nil { return errors.Trace(err) } - firstKey := t.RecordKey(seekHandle) - it, err := snap.Iter(firstKey, nil) + firstKey := t.RecordKey(startHandle) + + // Calculate the exclusive upper bound + var upperBound kv.Key + if endIncluded { + if endHandle == math.MaxInt64 { + upperBound = t.RecordKey(endHandle).PrefixNext() + } else { + // PrefixNext is time costing. Try to avoid it if possible. + upperBound = t.RecordKey(endHandle + 1) + } + } else { + upperBound = t.RecordKey(endHandle) + } + + it, err := snap.Iter(firstKey, upperBound) if err != nil { return errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index 10cf2d85b9064..035146c9976b1 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -304,7 +304,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior startHandle = math.MinInt64 endHandle = math.MaxInt64 // Get the start handle of this partition. - err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, + err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, math.MaxInt64, true, func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) { startHandle = h return false, nil diff --git a/store/tikv/scan.go b/store/tikv/scan.go index c3d5c555ba37b..1ce05ecc6089d 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -98,6 +98,11 @@ func (s *Scanner) Next() error { } current := s.cache[s.idx] + if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 { + s.eof = true + s.Close() + return nil + } // Try to resolve the lock if current.GetError() != nil { // 'current' would be modified if the lock being resolved diff --git a/structure/hash.go b/structure/hash.go index bfa13118cce8c..e0f7b32cdb726 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error { func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error { dataPrefix := t.hashDataKeyPrefix(key) - it, err := t.reader.Iter(dataPrefix, nil) + it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext()) if err != nil { return errors.Trace(err) } diff --git a/table/tables/index.go b/table/tables/index.go index c36d55c698cb1..f4580d0e93978 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -240,7 +240,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues // Drop removes the KV index from store. func (c *index) Drop(rm kv.RetrieverMutator) error { - it, err := rm.Iter(c.prefix, nil) + it, err := rm.Iter(c.prefix, c.prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -270,7 +270,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues return nil, false, errors.Trace(err) } - it, err := r.Iter(key, nil) + upperBound := c.prefix.PrefixNext() + it, err := r.Iter(key, upperBound) if err != nil { return nil, false, errors.Trace(err) } @@ -284,7 +285,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues // SeekFirst returns an iterator which points to the first entry of the KV index. func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) { - it, err := r.Iter(c.prefix, nil) + upperBound := c.prefix.PrefixNext() + it, err := r.Iter(c.prefix, upperBound) if err != nil { return nil, errors.Trace(err) } diff --git a/table/tables/tables.go b/table/tables/tables.go index 4a75d3f7e84d3..c24dd25d7f1d3 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -766,7 +766,8 @@ func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMu // IterRecords implements table.Table IterRecords interface. func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc) error { - it, err := ctx.Txn().Iter(startKey, nil) + prefix := t.RecordPrefix() + it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -782,7 +783,6 @@ func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols for _, col := range cols { colMap[col.ID] = &col.FieldType } - prefix := t.RecordPrefix() defaultVals := make([]types.Datum, len(cols)) for it.Valid() && it.Key().HasPrefix(prefix) { // first kv pair is row lock information. @@ -896,7 +896,7 @@ func (t *tableCommon) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetS // Seek implements table.Table Seek interface. func (t *tableCommon) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) { seekKey := tablecodec.EncodeRowKeyWithHandle(t.physicalTableID, h) - iter, err := ctx.Txn().Iter(seekKey, nil) + iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext()) if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) { // No more records in the table, skip to the end. return 0, false, nil diff --git a/util/admin/admin.go b/util/admin/admin.go index a76d5f416e8d1..a137c41ab410f 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -636,7 +636,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h // genExprs use to calculate generated column value. func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc, genExprs map[string]expression.Expression) error { - it, err := retriever.Iter(startKey, nil) + prefix := t.RecordPrefix() + keyUpperBound := prefix.PrefixNext() + + it, err := retriever.Iter(startKey, keyUpperBound) if err != nil { return errors.Trace(err) } @@ -663,7 +666,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab } } - prefix := t.RecordPrefix() for it.Valid() && it.Key().HasPrefix(prefix) { // first kv pair is row lock information. // TODO: check valid lock diff --git a/util/prefix_helper.go b/util/prefix_helper.go index a2c5d76dfd168..15fe13491291b 100644 --- a/util/prefix_helper.go +++ b/util/prefix_helper.go @@ -26,7 +26,7 @@ import ( // ScanMetaWithPrefix scans metadata with the prefix. func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error { - iter, err := retriever.Iter(prefix, nil) + iter, err := retriever.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke // DelKeyWithPrefix deletes keys with prefix. func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error { var keys []kv.Key - iter, err := rm.Iter(prefix, nil) + iter, err := rm.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) }