Skip to content

Commit

Permalink
Merge pull request #2286 from openziti/link-metric-gc
Browse files Browse the repository at this point in the history
Implement a link metric gc to fix occasional left behind metrics for links. Fixes #2285
  • Loading branch information
plorenz authored Aug 8, 2024
2 parents fb815b1 + 3889d1d commit c221956
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 19 deletions.
33 changes: 20 additions & 13 deletions router/link/link_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ func (self *linkDestUpdate) Handle(registry *linkRegistryImpl) {
becameHealthy := false

if dest == nil {
dest = &linkDest{
id: self.id,
healthy: true,
unhealthyAt: time.Time{},
linkMap: map[string]*linkState{},
}
dest = newLinkDest(self.id)
registry.destinations[self.id] = dest
} else {
if !dest.healthy && self.healthy {
Expand Down Expand Up @@ -153,13 +148,8 @@ func (self *dialRequest) Handle(registry *linkRegistryImpl) {
dest := registry.destinations[self.dial.RouterId]

if dest == nil {
dest = &linkDest{
id: self.dial.RouterId,
healthy: true,
unhealthyAt: time.Time{},
linkMap: map[string]*linkState{},
version: self.dial.RouterVersion,
}
dest = newLinkDest(self.dial.RouterId)
dest.version = self.dial.RouterVersion
registry.destinations[self.dial.RouterId] = dest
}

Expand Down Expand Up @@ -379,3 +369,20 @@ func (self *markFaultedLinksNotified) Handle(*linkRegistryImpl) {
}
}
}

type scanForLinkIdEvent struct {
linkId string
resultC chan bool
}

func (self *scanForLinkIdEvent) Handle(r *linkRegistryImpl) {
for _, dest := range r.destinations {
for _, state := range dest.linkMap {
if state.linkId == self.linkId {
self.resultC <- true
return
}
}
}
self.resultC <- false
}
80 changes: 80 additions & 0 deletions router/link/link_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/openziti/foundation/v2/debugz"
"github.com/openziti/foundation/v2/goroutines"
"github.com/openziti/identity"
"github.com/openziti/metrics"
"github.com/openziti/ziti/common/capabilities"
"github.com/openziti/ziti/common/inspect"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/xlink"
"github.com/sirupsen/logrus"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -43,6 +45,7 @@ type Env interface {
GetCloseNotify() <-chan struct{}
GetLinkDialerPool() goroutines.Pool
GetRateLimiterPool() goroutines.Pool
GetMetricsRegistry() metrics.UsageRegistry
}

func NewLinkRegistry(routerEnv Env) xlink.Registry {
Expand All @@ -58,6 +61,7 @@ func NewLinkRegistry(routerEnv Env) xlink.Registry {
}

go result.run()
go result.runGcLinkMetricsLoop()

return result
}
Expand All @@ -77,6 +81,82 @@ type linkRegistryImpl struct {
notifyInProgress atomic.Bool
}

func (self *linkRegistryImpl) runGcLinkMetricsLoop() {
var lastRunResults map[string]metrics.Metric

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
lastRunResults = self.gcLinkMetrics(lastRunResults)
case <-self.env.GetCloseNotify():
return
}
}
}

func (self *linkRegistryImpl) gcLinkMetrics(lastRunResults map[string]metrics.Metric) map[string]metrics.Metric {
for linkId, metric := range lastRunResults {
// If the metrics we found last time are still tied to a non-existent link, dispose of them
if !self.IsKnownLinkId(linkId) {
metric.Dispose()
}
}
return self.getOrphanedLinkMetrics()
}

func (self *linkRegistryImpl) getOrphanedLinkMetrics() map[string]metrics.Metric {
result := map[string]metrics.Metric{}
self.env.GetMetricsRegistry().EachMetric(func(name string, metric metrics.Metric) {
if strings.HasPrefix(name, "link.") {
base := strings.TrimPrefix(name, "link.")
var linkId string
if strings.ContainsRune(base, ':') {
parts := strings.Split(base, ":")
if len(parts) != 2 {
return
}
linkId = parts[1]
} else {
linkId = strings.Split(base, ".")[0]
}

if !self.IsKnownLinkId(linkId) {
result[name] = metric
}
}
})

return result
}

func (self *linkRegistryImpl) IsKnownLinkId(linkId string) bool {
if _, found := self.GetLinkById(linkId); found {
return true
}

evt := &scanForLinkIdEvent{
linkId: linkId,
resultC: make(chan bool, 1),
}
self.queueEvent(evt)

t := time.NewTicker(time.Second)
defer t.Stop()

// if we can't tell, defer to link known so we don't clear anything
select {
case result := <-evt.resultC:
return result
case <-self.env.GetCloseNotify():
return true
case <-t.C:
return true
}
}

func (self *linkRegistryImpl) GetLink(linkKey string) (xlink.Xlink, bool) {
self.linkMapLocks.RLock()
defer self.linkMapLocks.RUnlock()
Expand Down
Loading

0 comments on commit c221956

Please sign in to comment.