Skip to content

Commit

Permalink
when remove node, clean up engine cache & remove prometheus metrics (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nyl1001 authored Oct 10, 2023
1 parent 9d7defb commit 3582022
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
8 changes: 7 additions & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
enginefactory "github.com/projecteru2/core/engine/factory"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/resource/plugins"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -89,7 +90,12 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
},
// then: remove node resource metadata
func(ctx context.Context) error {
return c.rmgr.RemoveNode(ctx, nodename)
if err = c.rmgr.RemoveNode(ctx, nodename); err != nil {
return err
}
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
metrics.Client.RemoveInvalidNodes([]string{nodename})
return nil
},
// rollback: do nothing
func(ctx context.Context, failureByCond bool) error {
Expand Down
3 changes: 3 additions & 0 deletions metrics/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler)
if err != nil {
logger.Error(ctx, err, "Get all nodes err")
}
activeNodes := make(map[string]*types.Node, 0)
for node := range nodes {
metrics, err := m.rmgr.GetNodeMetrics(ctx, node)
if err != nil {
logger.Error(ctx, err, "Get metrics failed")
continue
}
activeNodes[node.Name] = node
m.SendMetrics(ctx, metrics...)
}
m.DeleteInactiveNodesWithCache(ctx, activeNodes)
h.ServeHTTP(w, r)
})
}
Expand Down
75 changes: 75 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"strconv"
"sync"

enginefactory "github.com/projecteru2/core/engine/factory"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/resource"
"github.com/projecteru2/core/resource/cobalt"
plugintypes "github.com/projecteru2/core/resource/plugins/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
io_prometheus_client "github.com/prometheus/client_model/go"
"golang.org/x/exp/slices"

statsdlib "github.com/CMGS/statsd"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -85,6 +88,78 @@ func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*plugintypes.Metri
}
}

func (m *Metrics) DeleteInactiveNodesWithCache(ctx context.Context, activeNodesMap map[string]*types.Node) {
metricNodeNameMap := m.getNodeNameMapFromMetrics()
// 计算差集
invalidNodes := make([]string, 0)
for nodeName := range metricNodeNameMap {
if node, exists := activeNodesMap[nodeName]; !exists {
invalidNodes = append(invalidNodes, nodeName)
enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key)
}
}
m.RemoveInvalidNodes(invalidNodes)
}

func (m *Metrics) getNodeNameMapFromMetrics() map[string]bool {
metrics, _ := prometheus.DefaultGatherer.Gather()
nodeNameMap := make(map[string]bool, 0)
for _, metric := range metrics {
for _, mf := range metric.GetMetric() {
if len(mf.Label) == 0 {
continue
}
for _, label := range mf.Label {
if label.GetName() == "nodename" {
nodeNameMap[label.GetValue()] = true
break
}
}
}
}
return nodeNameMap
}

// RemoveInvalidNodes 清除多余的metric标签值
func (m *Metrics) RemoveInvalidNodes(invalidNodes []string) {
if len(invalidNodes) == 0 {
return
}
for _, collector := range m.Collectors {
if collector == nil {
return
}
metrics, _ := prometheus.DefaultGatherer.Gather()
for _, metric := range metrics {
for _, mf := range metric.GetMetric() {
if len(mf.Label) == 0 {
continue
}

if !slices.ContainsFunc(mf.Label, func(label *io_prometheus_client.LabelPair) bool {
return label.GetName() == "nodename" && slices.ContainsFunc(invalidNodes, func(nodename string) bool {
return label.GetValue() == nodename
})
}) {
continue
}
labels := prometheus.Labels{}
for _, label := range mf.Label {
labels[label.GetName()] = label.GetValue()
}
// 删除符合条件的度量标签
switch c := collector.(type) {
case *prometheus.GaugeVec:
c.Delete(labels)
case *prometheus.CounterVec:
c.Delete(labels)
}
}
}
// 添加更多的条件来处理其他类型的Collector
}
}

// Lazy connect
func (m *Metrics) checkConn(ctx context.Context) error {
if m.statsdClient != nil {
Expand Down

0 comments on commit 3582022

Please sign in to comment.