diff --git a/go.sum b/go.sum index 2793132de1e29..9259bb3784545 100644 --- a/go.sum +++ b/go.sum @@ -3,7 +3,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY= github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= @@ -14,7 +13,6 @@ github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mo github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= @@ -290,7 +288,6 @@ golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGm google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 h1:9oFlwfEGIvmxXTcY53ygNyxIQtWciRHjrnUvZJCYXYU= google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index 4664dda4d0251..f8222603622d6 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -507,7 +507,7 @@ func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) { s.mustPutOK(c, "test", "test2", 5, 8) // simulate `getTxnStatus` for txn 2. - err := s.store.Cleanup([]byte("test"), 2) + err := s.store.Cleanup([]byte("test"), 2, math.MaxUint64) c.Assert(err, IsNil) req = &kvrpcpb.PrewriteRequest{ Mutations: putMutations("test", "test3"), @@ -614,7 +614,7 @@ func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) { c.Assert(ttl, Greater, uint64(300)) // The lock has already been clean up - c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil) + c.Assert(s.store.Cleanup([]byte("pk"), 5, 0), IsNil) _, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000) c.Assert(err, NotNil) } diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 912be1819c227..543e6f4f92218 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -440,7 +440,7 @@ type MVCCStore interface { Prewrite(req *kvrpcpb.PrewriteRequest) []error Commit(keys [][]byte, startTS, commitTS uint64) error Rollback(keys [][]byte, startTS uint64) error - Cleanup(key []byte, startTS uint64) error + Cleanup(key []byte, startTS, currentTS uint64) error ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index b6ef0424b9226..8753ad3033dda 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/goleveldb/leveldb/util" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/deadlock" "github.com/pingcap/tidb/util/logutil" @@ -727,6 +728,12 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu } if ok { if dec.lock.startTS != startTS { + if isPessimisticLock { + // NOTE: A special handling. + // When pessimistic txn prewrite meets lock, set the TTL = 0 means + // telling TiDB to rollback the transaction **unconditionly**. + dec.lock.ttl = 0 + } return dec.lock.lockErr(mutation.Key) } if dec.lock.op != kvrpcpb.Op_PessimisticLock { @@ -954,7 +961,8 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal } // Cleanup implements the MVCCStore interface. -func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { +// Cleanup API is deprecated, use CheckTxnStatus instead. +func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { mvcc.mu.Lock() defer func() { mvcc.mu.Unlock() @@ -962,11 +970,64 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { }() batch := &leveldb.Batch{} - err := rollbackKey(mvcc.db, batch, key, startTS) + startKey := mvccEncode(key, lockVer) + iter := newIterator(mvcc.db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + if iter.Valid() { + dec := lockDecoder{ + expectKey: key, + } + ok, err := dec.Decode(iter) + if err != nil { + return err + } + // If current transaction's lock exists. + if ok && dec.lock.startTS == startTS { + + // If the lock has already outdated, clean up it. + if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { + if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { + return err + } + return mvcc.db.Write(batch, nil) + } + + // Otherwise, return a locked error with the TTL information. + return dec.lock.lockErr(key) + } + + // If current transaction's lock does not exist. + // If the commit information of the current transaction exist. + c, ok, err := getTxnCommitInfo(iter, key, startTS) + if err != nil { + return errors.Trace(err) + } + if ok { + // If the current transaction has already committed. + if c.valueType != typeRollback { + return ErrAlreadyCommitted(c.commitTS) + } + // If the current transaction has already rollbacked. + return nil + } + } + + // If current transaction is not prewritted before. + value := mvccValue{ + valueType: typeRollback, + startTS: startTS, + commitTS: startTS, + } + writeKey := mvccEncode(key, startTS) + writeValue, err := value.MarshalBinary() if err != nil { return errors.Trace(err) } - return mvcc.db.Write(batch, nil) + batch.Put(writeKey, writeValue) + return nil } // TxnHeartBeat implements the MVCCStore interface. diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index b07c3c36a1f0d..05bc593b60e28 100755 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -357,7 +357,7 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean panic("KvCleanup: key not in region") } var resp kvrpcpb.CleanupResponse - err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion()) + err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion(), req.GetCurrentTs()) if err != nil { if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok { resp.CommitVersion = uint64(commitTS) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 42bf901eb195d..935060ff72d88 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -103,14 +103,17 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve return s.lockResolver, nil } -// TxnStatus represents a txn's final status. It should be Commit or Rollback. -type TxnStatus uint64 +// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback. +type TxnStatus struct { + ttl uint64 + commitTS uint64 +} // IsCommitted returns true if the txn's final status is Commit. -func (s TxnStatus) IsCommitted() bool { return s > 0 } +func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 } // CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true. -func (s TxnStatus) CommitTS() uint64 { return uint64(s) } +func (s TxnStatus) CommitTS() uint64 { return uint64(s.commitTS) } // By default, locks after 3000ms is considered unusual (the client created the // lock might be dead). Other client may cleanup this kind of lock. @@ -171,7 +174,8 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) { return s, ok } -// BatchResolveLocks resolve locks in a batch +// BatchResolveLocks resolve locks in a batch. +// Used it in gcworker only! func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) { if len(locks) == 0 { return true, nil @@ -179,7 +183,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithBatchResolve.Inc() - var expiredLocks []*Lock + expiredLocks := make([]*Lock, 0, len(locks)) for _, l := range locks { if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) { tikvLockResolverCountWithExpired.Inc() @@ -202,11 +206,11 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) if err != nil { return false, errors.Trace(err) } - txnInfos[l.TxnID] = uint64(status) + txnInfos[l.TxnID] = uint64(status.commitTS) } logutil.Logger(context.Background()).Info("BatchResolveLocks: lookup txn status", zap.Duration("cost time", time.Since(startTime)), @@ -268,9 +272,10 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // commit status. // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. -func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnExpired int64, err error) { +func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (int64, error) { + var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { - return + return msBeforeTxnExpired.value(), nil } tikvLockResolverCountWithResolve.Inc() @@ -279,61 +284,111 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (msBeforeTxnE for _, l := range locks { msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL) if msBeforeLockExpired <= 0 { - tikvLockResolverCountWithExpired.Inc() expiredLocks = append(expiredLocks, l) } else { - if msBeforeTxnExpired == 0 || msBeforeLockExpired < msBeforeTxnExpired { - msBeforeTxnExpired = msBeforeLockExpired - } + msBeforeTxnExpired.update(int64(l.TTL)) tikvLockResolverCountWithNotExpired.Inc() } } - if len(expiredLocks) == 0 { - if msBeforeTxnExpired > 0 { - tikvLockResolverCountWithWaitExpired.Inc() - } - return - } - // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[RegionVerID]struct{}) for _, l := range expiredLocks { - var status TxnStatus - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary) + status, err := lr.getTxnStatusFromLock(bo, l) if err != nil { - msBeforeTxnExpired = 0 + msBeforeTxnExpired.update(0) err = errors.Trace(err) - return + return msBeforeTxnExpired.value(), err } - cleanRegions, exists := cleanTxns[l.TxnID] - if !exists { - cleanRegions = make(map[RegionVerID]struct{}) - cleanTxns[l.TxnID] = cleanRegions - } + if status.ttl == 0 { + tikvLockResolverCountWithExpired.Inc() + // If the lock is committed or rollbacked, resolve lock. + cleanRegions, exists := cleanTxns[l.TxnID] + if !exists { + cleanRegions = make(map[RegionVerID]struct{}) + cleanTxns[l.TxnID] = cleanRegions + } - err = lr.resolveLock(bo, l, status, cleanRegions) - if err != nil { - msBeforeTxnExpired = 0 - err = errors.Trace(err) - return + err = lr.resolveLock(bo, l, status, cleanRegions) + if err != nil { + msBeforeTxnExpired.update(0) + err = errors.Trace(err) + return msBeforeTxnExpired.value(), err + } + } else { + tikvLockResolverCountWithNotExpired.Inc() + // If the lock is valid, the txn may be a pessimistic transaction. + // Update the txn expire time. + msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl) + msBeforeTxnExpired.update(msBeforeLockExpired) } } + + if msBeforeTxnExpired.value() > 0 { + tikvLockResolverCountWithWaitExpired.Inc() + } + return msBeforeTxnExpired.value(), nil +} + +type txnExpireTime struct { + initialized bool + txnExpire int64 +} + +func (t *txnExpireTime) update(lockExpire int64) { + if lockExpire <= 0 { + lockExpire = 0 + } + if !t.initialized { + t.txnExpire = lockExpire + t.initialized = true + return + } + if lockExpire < t.txnExpire { + t.txnExpire = lockExpire + } return } +func (t *txnExpireTime) value() int64 { + if !t.initialized { + return 0 + } + return t.txnExpire +} + // GetTxnStatus queries tikv-server for a txn's status (commit/rollback). // If the primary key is still locked, it will launch a Rollback to abort it. // To avoid unnecessarily aborting too many txns, it is wiser to wait a few // seconds before calling it after Prewrite. func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) { + var status TxnStatus bo := NewBackoffer(context.Background(), cleanupMaxBackoff) - status, err := lr.getTxnStatus(bo, txnID, primary) - return status, errors.Trace(err) + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return status, err + } + return lr.getTxnStatus(bo, txnID, primary, currentTS) +} + +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock) (TxnStatus, error) { + // NOTE: l.TTL = 0 is a special protocol!!! + // When the pessimistic txn prewrite meets locks of a txn, it should rollback that txn **unconditionally**. + // In this case, TiKV set the lock TTL = 0, and TiDB use currentTS = 0 to call + // getTxnStatus, and getTxnStatus with currentTS = 0 would rollback the transaction. + if l.TTL == 0 { + return lr.getTxnStatus(bo, l.TxnID, l.Primary, 0) + } + + currentTS, err := lr.store.GetOracle().GetLowResolutionTimestamp(bo.ctx) + if err != nil { + return TxnStatus{}, err + } + return lr.getTxnStatus(bo, l.TxnID, l.Primary, currentTS) } -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, currentTS uint64) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -346,6 +401,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte Cleanup: &kvrpcpb.CleanupRequest{ Key: primary, StartVersion: txnID, + CurrentTs: currentTS, }, } for { @@ -373,12 +429,18 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte return status, errors.Trace(ErrBodyMissing) } if keyErr := cmdResp.GetError(); keyErr != nil { + // If the TTL of the primary lock is not outdated, the proto returns a ErrLocked contains the TTL. + if lockInfo := keyErr.GetLocked(); lockInfo != nil { + status.ttl = lockInfo.LockTtl + status.commitTS = 0 + return status, nil + } err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID) logutil.Logger(context.Background()).Error("getTxnStatus error", zap.Error(err)) return status, err } if cmdResp.CommitVersion != 0 { - status = TxnStatus(cmdResp.GetCommitVersion()) + status = TxnStatus{0, cmdResp.GetCommitVersion()} tikvLockResolverCountWithQueryTxnStatusCommitted.Inc() } else { tikvLockResolverCountWithQueryTxnStatusRolledBack.Inc() diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 3861e3ed3141d..fbab00265ff37 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -200,6 +200,43 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.ttl, Greater, uint64(0)) +} + +func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("key"), []byte("value")) + s.prewriteTxn(c, txn.(*tikvTxn)) + + // Check the lock TTL of a transaction. + bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + lr := newLockResolver(s.store) + status, err := lr.GetTxnStatus(txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status.IsCommitted(), IsFalse) + c.Assert(status.ttl, Greater, uint64(0)) + c.Assert(status.CommitTS(), Equals, uint64(0)) + + // Rollback the txn. + lock := s.mustGetLock(c, []byte("key")) + status = TxnStatus{} + cleanRegions := make(map[RegionVerID]struct{}) + err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions) + c.Assert(err, IsNil) + + // Check its status is rollbacked. + status, err = lr.GetTxnStatus(txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, uint64(0)) + + // Check a committed txn. + startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) + status, err = lr.GetTxnStatus(startTS, []byte("a")) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, commitTS) } func (s *testLockSuite) TestTxnHeartBeat(c *C) { @@ -218,9 +255,10 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) { c.Assert(newTTL, Equals, uint64(666)) // The getTxnStatus API is confusing, it really means rollback! - status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key")) + status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), 0) c.Assert(err, IsNil) - c.Assert(status, Equals, TxnStatus(0)) + c.Assert(status.ttl, Equals, uint64(0)) + c.Assert(status.commitTS, Equals, uint64(0)) newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666) c.Assert(err, NotNil)