Skip to content

Commit

Permalink
precompute min safe ts (#622)
Browse files Browse the repository at this point in the history
Signed-off-by: hihihuhu <[email protected]>
  • Loading branch information
hihihuhu authored Dec 2, 2022
1 parent 5dc09b1 commit 50651d6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
10 changes: 10 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2367,6 +2367,16 @@ func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (r
return res
}

// GetLabelValue returns the value of the label
func (s *Store) GetLabelValue(key string) (string, bool) {
for _, label := range s.labels {
if label.Key == key {
return label.Value, true
}
}
return "", false
}

// getLivenessState gets the cached liveness state of the store.
// When it's not reachable, a goroutine will update the state in background.
// To get the accurate liveness state, use checkLiveness instead.
Expand Down
51 changes: 24 additions & 27 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
Expand Down Expand Up @@ -122,6 +121,9 @@ type KVStore struct {
// it indicates the safe timestamp point that can be used to read consistent but may not the latest data.
safeTSMap sync.Map

// MinSafeTs stores the minimum ts value for each txnScope
minSafeTS sync.Map

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled

ctx context.Context
Expand Down Expand Up @@ -438,23 +440,10 @@ func (s *KVStore) GetTiKVClient() (client Client) {

// GetMinSafeTS return the minimal safeTS of the storage with given txnScope.
func (s *KVStore) GetMinSafeTS(txnScope string) uint64 {
stores := make([]*locate.Store, 0)
allStores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
if txnScope != oracle.GlobalTxnScope {
for _, store := range allStores {
if store.IsLabelsMatch([]*metapb.StoreLabel{
{
Key: DCLabelKey,
Value: txnScope,
},
}) {
stores = append(stores, store)
}
}
} else {
stores = allStores
if val, ok := s.minSafeTS.Load(txnScope); ok {
return val.(uint64)
}
return s.getMinSafeTSByStores(stores)
return 0
}

// Ctx returns ctx.
Expand Down Expand Up @@ -495,18 +484,14 @@ func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
s.safeTSMap.Store(storeID, safeTS)
}

func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 {
if val, err := util.EvalFailpoint("injectSafeTS"); err == nil {
injectTS := val.(int)
return uint64(injectTS)
}
func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
minSafeTS := uint64(math.MaxUint64)
// when there is no store, return 0 in order to let minStartTS become startTS directly
if len(stores) < 1 {
return 0
if len(storeIDs) < 1 {
s.minSafeTS.Store(txnScope, 0)
}
for _, store := range stores {
ok, safeTS := s.getSafeTS(store.StoreID())
for _, store := range storeIDs {
ok, safeTS := s.getSafeTS(store)
if ok {
if safeTS != 0 && safeTS < minSafeTS {
minSafeTS = safeTS
Expand All @@ -515,7 +500,7 @@ func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 {
minSafeTS = 0
}
}
return minSafeTS
s.minSafeTS.Store(txnScope, minSafeTS)
}

func (s *KVStore) safeTSUpdater() {
Expand Down Expand Up @@ -571,6 +556,18 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
}(ctx, wg, storeID, storeAddr)
}

txnScopeMap := make(map[string][]uint64)
for _, store := range stores {
txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID())

if label, ok := store.GetLabelValue(DCLabelKey); ok {
txnScopeMap[label] = append(txnScopeMap[label], store.StoreID())
}
}
for txnScope, storeIDs := range txnScopeMap {
s.updateMinSafeTS(txnScope, storeIDs)
}
wg.Wait()
}

Expand Down

0 comments on commit 50651d6

Please sign in to comment.