Skip to content

Commit

Permalink
tx throttler: remove unused topology watchers (vitessio#14412)
Browse files Browse the repository at this point in the history
Signed-off-by: deepthi <[email protected]>
  • Loading branch information
deepthi authored and timvaillancourt committed May 16, 2024
1 parent 098db61 commit 65333f6
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 154 deletions.
30 changes: 16 additions & 14 deletions go/vt/discovery/replicationlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func SetMinNumTablets(numTablets int) {
minNumTablets = numTablets
}

// IsReplicationLagHigh verifies that the given LegacytabletHealth refers to a tablet with high
// IsReplicationLagHigh verifies that the given TabletHealth refers to a tablet with high
// replication lag, i.e. higher than the configured discovery_low_replication_lag flag.
func IsReplicationLagHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > lowReplicationLag.Seconds()
}

// IsReplicationLagVeryHigh verifies that the given LegacytabletHealth refers to a tablet with very high
// IsReplicationLagVeryHigh verifies that the given TabletHealth refers to a tablet with very high
// replication lag, i.e. higher than the configured discovery_high_replication_lag_minimum_serving flag.
func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > highReplicationLagMinServing.Seconds()
Expand Down Expand Up @@ -117,7 +117,7 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal
return filterStatsByLag(tabletHealthList)
}
res := filterStatsByLagWithLegacyAlgorithm(tabletHealthList)
// run the filter again if exactly one tablet is removed,
// Run the filter again if exactly one tablet is removed,
// and we have spare tablets.
if len(res) > minNumTablets && len(res) == len(tabletHealthList)-1 {
res = filterStatsByLagWithLegacyAlgorithm(res)
Expand All @@ -128,12 +128,12 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal

func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]tabletLagSnapshot, 0, len(tabletHealthList))
// filter non-serving tablets and those with very high replication lag
// Filter out non-serving tablets and those with very high replication lag.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil || IsReplicationLagVeryHigh(ts) {
continue
}
// Pull the current replication lag for a stable sort later.
// Save the current replication lag for a stable sort later.
list = append(list, tabletLagSnapshot{
ts: ts,
replag: ts.Stats.ReplicationLagSeconds})
Expand All @@ -142,7 +142,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
// Sort by replication lag.
sort.Sort(tabletLagSnapshotList(list))

// Pick those with low replication lag, but at least minNumTablets tablets regardless.
// Pick tablets with low replication lag, but at least minNumTablets tablets regardless.
res := make([]*TabletHealth, 0, len(list))
for i := 0; i < len(list); i++ {
if !IsReplicationLagHigh(list[i].ts) || i < minNumTablets {
Expand All @@ -154,7 +154,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {

func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]*TabletHealth, 0, len(tabletHealthList))
// filter non-serving tablets
// Filter out non-serving tablets.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil {
continue
Expand All @@ -164,7 +164,7 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(list) <= 1 {
return list
}
// if all have low replication lag (<=30s), return all tablets.
// If all tablets have low replication lag (<=30s), return all of them.
allLowLag := true
for _, ts := range list {
if IsReplicationLagHigh(ts) {
Expand All @@ -175,12 +175,12 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if allLowLag {
return list
}
// filter those affecting "mean" lag significantly
// calculate mean for all tablets
// We want to filter out tablets that are affecting "mean" lag significantly.
// We first calculate the mean across all tablets.
res := make([]*TabletHealth, 0, len(list))
m, _ := mean(list, -1)
for i, ts := range list {
// calculate mean by excluding ith tablet
// Now we calculate the mean by excluding ith tablet
mi, _ := mean(list, i)
if float64(mi) > float64(m)*0.7 {
res = append(res, ts)
Expand All @@ -189,9 +189,11 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(res) >= minNumTablets {
return res
}
// return at least minNumTablets tablets to avoid over loading,
// if there is enough tablets with replication lag < highReplicationLagMinServing.
// Pull the current replication lag for a stable sort.

// We want to return at least minNumTablets tablets to avoid overloading,
// as long as there are enough tablets with replication lag < highReplicationLagMinServing.

// Save the current replication lag for a stable sort.
snapshots := make([]tabletLagSnapshot, 0, len(list))
for _, ts := range list {
if !IsReplicationLagVeryHigh(ts) {
Expand Down
70 changes: 34 additions & 36 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ var (
"Operation", topologyWatcherOpListTablets, topologyWatcherOpGetTablet)
)

// tabletInfo is used internally by the TopologyWatcher class
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
}

// TopologyWatcher polls tablet from a configurable set of tablets
// periodically. When tablets are added / removed, it calls
// the LegacyTabletRecorder AddTablet / RemoveTablet interface appropriately.
// TopologyWatcher polls the topology periodically for changes to
// the set of tablets. When tablets are added / removed / modified,
// it calls the AddTablet / RemoveTablet interface appropriately.
type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
Expand All @@ -80,20 +80,21 @@ type TopologyWatcher struct {

// mu protects all variables below
mu sync.Mutex
// tablets contains a map of alias -> tabletInfo for all known tablets
// tablets contains a map of alias -> tabletInfo for all known tablets.
tablets map[string]*tabletInfo
// topoChecksum stores a crc32 of the tablets map and is exported as a metric
// topoChecksum stores a crc32 of the tablets map and is exported as a metric.
topoChecksum uint32
// lastRefresh records the timestamp of the last topo refresh
// lastRefresh records the timestamp of the last refresh of the topology.
lastRefresh time.Time
// firstLoadDone is true when first load of the topology data is done.
// firstLoadDone is true when the initial load of the topology data is complete.
firstLoadDone bool
// firstLoadChan is closed when the initial loading of topology data is done.
// firstLoadChan is closed when the initial load of topology data is complete.
firstLoadChan chan struct{}
}

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
Expand All @@ -115,14 +116,14 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
}

// Start starts the topology watcher
// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
go func(t *TopologyWatcher) {
Expand All @@ -140,7 +141,7 @@ func (tw *TopologyWatcher) Start() {
}(tw)
}

// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder.
// Stop stops the watcher. It does not clean up the tablets added to HealthCheck.
func (tw *TopologyWatcher) Stop() {
tw.cancelFunc()
// wait for watch goroutine to finish.
Expand All @@ -151,7 +152,7 @@ func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// first get the list of relevant tabletAliases
// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
Expand All @@ -166,7 +167,7 @@ func (tw *TopologyWatcher) loadTablets() {
}

// Accumulate a list of all known alias strings to use later
// when sorting
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))

tw.mu.Lock()
Expand All @@ -175,7 +176,7 @@ func (tw *TopologyWatcher) loadTablets() {
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
// we already have a tabletInfo for this and the flag tells us to not refresh
// We already have a tabletInfo for this and the flag tells us to not refresh.
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
continue
Expand All @@ -188,7 +189,7 @@ func (tw *TopologyWatcher) loadTablets() {
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
Expand Down Expand Up @@ -218,7 +219,7 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}

// trust the alias from topo and add it if it doesn't exist
// Trust the alias from topo and add it if it doesn't exist.
if val, ok := tw.tablets[alias]; ok {
// check if the host and port have changed. If yes, replace tablet.
oldKey := TabletToMapKey(val.tablet)
Expand All @@ -230,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
} else {
// This is a new tablet record, let's add it to the healthcheck
// This is a new tablet record, let's add it to the HealthCheck.
tw.healthcheck.AddTablet(newVal.tablet)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
}
Expand All @@ -252,8 +253,8 @@ func (tw *TopologyWatcher) loadTablets() {
close(tw.firstLoadChan)
}

// iterate through the tablets in a stable order and compute a
// checksum of the tablet map
// Iterate through the tablets in a stable order and compute a
// checksum of the tablet map.
sort.Strings(tabletAliasStrs)
var buf bytes.Buffer
for _, alias := range tabletAliasStrs {
Expand All @@ -269,15 +270,15 @@ func (tw *TopologyWatcher) loadTablets() {

}

// RefreshLag returns the time since the last refresh
// RefreshLag returns the time since the last refresh.
func (tw *TopologyWatcher) RefreshLag() time.Duration {
tw.mu.Lock()
defer tw.mu.Unlock()

return time.Since(tw.lastRefresh)
}

// TopoChecksum returns the checksum of the current state of the topo
// TopoChecksum returns the checksum of the current state of the topo.
func (tw *TopologyWatcher) TopoChecksum() uint32 {
tw.mu.Lock()
defer tw.mu.Unlock()
Expand All @@ -286,7 +287,7 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
}

// TabletFilter is an interface that can be given to a TopologyWatcher
// to be applied as an additional filter on the list of tablets returned by its getTablets function
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
Expand All @@ -300,18 +301,18 @@ type FilterByShard struct {
}

// filterShard describes a filter for a given shard or keyrange inside
// a keyspace
// a keyspace.
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
}

// NewFilterByShard creates a new FilterByShard on top of an existing
// LegacyTabletRecorder. Each filter is a keyspace|shard entry, where shard
// NewFilterByShard creates a new FilterByShard for use by a
// TopologyWatcher. Each filter is a keyspace|shard entry, where shard
// can either be a shard name, or a keyrange. All tablets that match
// at least one keyspace|shard tuple will be forwarded to the
// underlying LegacyTabletRecorder.
// at least one keyspace|shard tuple will be forwarded by the
// TopologyWatcher to its consumer.
func NewFilterByShard(filters []string) (*FilterByShard, error) {
m := make(map[string][]*filterShard)
for _, filter := range filters {
Expand Down Expand Up @@ -348,8 +349,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}, nil
}

// IsIncluded returns true iff the tablet's keyspace and shard should be
// forwarded to the underlying LegacyTabletRecorder.
// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
Expand All @@ -370,15 +370,14 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
return false
}

// FilterByKeyspace is a filter that filters tablets by
// keyspace
// FilterByKeyspace is a filter that filters tablets by keyspace.
type FilterByKeyspace struct {
keyspaces map[string]bool
}

// NewFilterByKeyspace creates a new FilterByKeyspace.
// Each filter is a keyspace entry. All tablets that match
// a keyspace will be forwarded to the underlying LegacyTabletRecorder.
// a keyspace will be forwarded to the TopologyWatcher's consumer.
func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
m := make(map[string]bool)
for _, keyspace := range selectedKeyspaces {
Expand All @@ -390,8 +389,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}
}

// IsIncluded returns true if the tablet's keyspace should be
// forwarded to the underlying LegacyTabletRecorder.
// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
Expand Down

This file was deleted.

Loading

0 comments on commit 65333f6

Please sign in to comment.