Skip to content

Commit

Permalink
refactor of remove & clean invalid nodes in cache & metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Oct 10, 2023
1 parent 3582022 commit 77f9218
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 48 deletions.
4 changes: 2 additions & 2 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
},
// then: remove node resource metadata
func(ctx context.Context) error {
if err = c.rmgr.RemoveNode(ctx, nodename); err != nil {
if err := c.rmgr.RemoveNode(ctx, nodename); err != nil {
return err
}
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
metrics.Client.RemoveInvalidNodes([]string{nodename})
metrics.Client.RemoveInvalidNodes(nodename)
return nil
},
// rollback: do nothing
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/projecteru2/libyavirt v0.0.0-20230921032447-a617cf0c746c
github.com/prometheus/client_golang v1.15.0
github.com/prometheus/client_model v0.3.0
github.com/rs/zerolog v1.29.1
github.com/sanity-io/litter v1.5.5
github.com/stretchr/testify v1.8.2
Expand Down Expand Up @@ -105,7 +106,6 @@ require (
github.com/opencontainers/runc v1.1.6 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
Expand Down
32 changes: 30 additions & 2 deletions metrics/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"net/http"

"github.com/projecteru2/core/cluster"
enginefactory "github.com/projecteru2/core/engine/factory"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/prometheus/client_golang/prometheus"
promClient "github.com/prometheus/client_model/go"
"golang.org/x/exp/slices"
)

// ResourceMiddleware to make sure update resource correct
Expand All @@ -20,7 +24,8 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
if err != nil {
logger.Error(ctx, err, "Get all nodes err")
}
activeNodes := make(map[string]*types.Node, 0)
existNodenames := getExistNodenames()
activeNodes := map[string]*types.Node{}
for node := range nodes {
metrics, err := m.rmgr.GetNodeMetrics(ctx, node)
if err != nil {
Expand All @@ -30,8 +35,31 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
activeNodes[node.Name] = node
m.SendMetrics(ctx, metrics...)
}
m.DeleteInactiveNodesWithCache(ctx, activeNodes)
// refresh nodes
invalidNodenames := []string{}
for _, nodename := range existNodenames {
if node := activeNodes[nodename]; node != nil {
invalidNodenames = append(invalidNodenames, nodename)
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
}
}
m.RemoveInvalidNodes(invalidNodenames...)
h.ServeHTTP(w, r)
})
}
}

func getExistNodenames() []string {
metrics, _ := prometheus.DefaultGatherer.Gather()
nodenames := []string{}
for _, metric := range metrics {
for _, mf := range metric.GetMetric() {
if i := slices.IndexFunc(mf.Label, func(label *promClient.LabelPair) bool {
return label.GetName() == "nodename"
}); i != -1 {
nodenames = append(nodenames, mf.Label[i].GetValue())
}
}
}
return nodenames
}
46 changes: 3 additions & 43 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"strconv"
"sync"

enginefactory "github.com/projecteru2/core/engine/factory"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resource"
"github.com/projecteru2/core/resource/cobalt"
plugintypes "github.com/projecteru2/core/resource/plugins/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
io_prometheus_client "github.com/prometheus/client_model/go"
promClient "github.com/prometheus/client_model/go"
"golang.org/x/exp/slices"

statsdlib "github.com/CMGS/statsd"
Expand Down Expand Up @@ -88,55 +87,16 @@ func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*plugintypes.Metri
}
}

func (m *Metrics) DeleteInactiveNodesWithCache(ctx context.Context, activeNodesMap map[string]*types.Node) {
metricNodeNameMap := m.getNodeNameMapFromMetrics()
// 计算差集
invalidNodes := make([]string, 0)
for nodeName := range metricNodeNameMap {
if node, exists := activeNodesMap[nodeName]; !exists {
invalidNodes = append(invalidNodes, nodeName)
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
}
}
m.RemoveInvalidNodes(invalidNodes)
}

func (m *Metrics) getNodeNameMapFromMetrics() map[string]bool {
metrics, _ := prometheus.DefaultGatherer.Gather()
nodeNameMap := make(map[string]bool, 0)
for _, metric := range metrics {
for _, mf := range metric.GetMetric() {
if len(mf.Label) == 0 {
continue
}
for _, label := range mf.Label {
if label.GetName() == "nodename" {
nodeNameMap[label.GetValue()] = true
break
}
}
}
}
return nodeNameMap
}

// RemoveInvalidNodes 清除多余的metric标签值
func (m *Metrics) RemoveInvalidNodes(invalidNodes []string) {
func (m *Metrics) RemoveInvalidNodes(invalidNodes ...string) {
if len(invalidNodes) == 0 {
return
}
for _, collector := range m.Collectors {
if collector == nil {
return
}
metrics, _ := prometheus.DefaultGatherer.Gather()
for _, metric := range metrics {
for _, mf := range metric.GetMetric() {
if len(mf.Label) == 0 {
continue
}

if !slices.ContainsFunc(mf.Label, func(label *io_prometheus_client.LabelPair) bool {
if !slices.ContainsFunc(mf.Label, func(label *promClient.LabelPair) bool {
return label.GetName() == "nodename" && slices.ContainsFunc(invalidNodes, func(nodename string) bool {
return label.GetValue() == nodename
})
Expand Down

0 comments on commit 77f9218

Please sign in to comment.