From 35820222c5415695ebe58e3496f04fb7582c55eb Mon Sep 17 00:00:00 2001 From: nyl1001 <602645105@qq.com> Date: Wed, 11 Oct 2023 02:07:47 +0800 Subject: [PATCH] when remove node, clean up engine cache & remove prometheus metrics (#615) --- cluster/calcium/node.go | 8 ++++- metrics/handler.go | 3 ++ metrics/metrics.go | 75 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 1 deletion(-) diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index 212ee9fbb..49c73e6fe 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -7,6 +7,7 @@ import ( enginefactory "github.com/projecteru2/core/engine/factory" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" + "github.com/projecteru2/core/metrics" "github.com/projecteru2/core/resource/plugins" resourcetypes "github.com/projecteru2/core/resource/types" "github.com/projecteru2/core/types" @@ -89,7 +90,12 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error { }, // then: remove node resource metadata func(ctx context.Context) error { - return c.rmgr.RemoveNode(ctx, nodename) + if err = c.rmgr.RemoveNode(ctx, nodename); err != nil { + return err + } + enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key) + metrics.Client.RemoveInvalidNodes([]string{nodename}) + return nil }, // rollback: do nothing func(ctx context.Context, failureByCond bool) error { diff --git a/metrics/handler.go b/metrics/handler.go index 0b8a18113..7fc556d8b 100644 --- a/metrics/handler.go +++ b/metrics/handler.go @@ -20,14 +20,17 @@ func (m *Metrics) ResourceMiddleware(cluster cluster.Cluster) func(http.Handler) if err != nil { logger.Error(ctx, err, "Get all nodes err") } + activeNodes := make(map[string]*types.Node, 0) for node := range nodes { metrics, err := m.rmgr.GetNodeMetrics(ctx, node) if err != nil { logger.Error(ctx, err, "Get metrics failed") continue } + activeNodes[node.Name] = node m.SendMetrics(ctx, metrics...) } + m.DeleteInactiveNodesWithCache(ctx, activeNodes) h.ServeHTTP(w, r) }) } diff --git a/metrics/metrics.go b/metrics/metrics.go index aa311a140..762cd5117 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -6,12 +6,15 @@ import ( "strconv" "sync" + enginefactory "github.com/projecteru2/core/engine/factory" "github.com/projecteru2/core/log" "github.com/projecteru2/core/resource" "github.com/projecteru2/core/resource/cobalt" plugintypes "github.com/projecteru2/core/resource/plugins/types" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" + io_prometheus_client "github.com/prometheus/client_model/go" + "golang.org/x/exp/slices" statsdlib "github.com/CMGS/statsd" "github.com/prometheus/client_golang/prometheus" @@ -85,6 +88,78 @@ func (m *Metrics) SendMetrics(ctx context.Context, metrics ...*plugintypes.Metri } } +func (m *Metrics) DeleteInactiveNodesWithCache(ctx context.Context, activeNodesMap map[string]*types.Node) { + metricNodeNameMap := m.getNodeNameMapFromMetrics() + // 计算差集 + invalidNodes := make([]string, 0) + for nodeName := range metricNodeNameMap { + if node, exists := activeNodesMap[nodeName]; !exists { + invalidNodes = append(invalidNodes, nodeName) + enginefactory.RemoveEngineFromCache(ctx, node.Endpoint, node.Ca, node.Cert, node.Key) + } + } + m.RemoveInvalidNodes(invalidNodes) +} + +func (m *Metrics) getNodeNameMapFromMetrics() map[string]bool { + metrics, _ := prometheus.DefaultGatherer.Gather() + nodeNameMap := make(map[string]bool, 0) + for _, metric := range metrics { + for _, mf := range metric.GetMetric() { + if len(mf.Label) == 0 { + continue + } + for _, label := range mf.Label { + if label.GetName() == "nodename" { + nodeNameMap[label.GetValue()] = true + break + } + } + } + } + return nodeNameMap +} + +// RemoveInvalidNodes 清除多余的metric标签值 +func (m *Metrics) RemoveInvalidNodes(invalidNodes []string) { + if len(invalidNodes) == 0 { + return + } + for _, collector := range m.Collectors { + if collector == nil { + return + } + metrics, _ := prometheus.DefaultGatherer.Gather() + for _, metric := range metrics { + for _, mf := range metric.GetMetric() { + if len(mf.Label) == 0 { + continue + } + + if !slices.ContainsFunc(mf.Label, func(label *io_prometheus_client.LabelPair) bool { + return label.GetName() == "nodename" && slices.ContainsFunc(invalidNodes, func(nodename string) bool { + return label.GetValue() == nodename + }) + }) { + continue + } + labels := prometheus.Labels{} + for _, label := range mf.Label { + labels[label.GetName()] = label.GetValue() + } + // 删除符合条件的度量标签 + switch c := collector.(type) { + case *prometheus.GaugeVec: + c.Delete(labels) + case *prometheus.CounterVec: + c.Delete(labels) + } + } + } + // 添加更多的条件来处理其他类型的Collector + } +} + // Lazy connect func (m *Metrics) checkConn(ctx context.Context) error { if m.statsdClient != nil {