Skip to content

Commit

Permalink
refactor metrics, support prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Sep 6, 2018
1 parent f08e41e commit f94a569
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/build_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (c *Calcium) BuildImage(ctx context.Context, opts *types.BuildOptions) (cha
if err != nil {
log.Errorf("[BuildImage] Remove image error: %s", err)
}
r, err := node.Engine.BuildCachePrune(ctx)
r, err := node.Engine.BuildCachePrune(ctx, enginetypes.BuildCachePruneOptions{All: true})
if err != nil {
log.Errorf("[BuildImage] Remove build image cache error: %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
engineslice "github.com/docker/docker/api/types/strslice"
"github.com/docker/go-units"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/scheduler"
"github.com/projecteru2/core/stats"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -67,7 +67,7 @@ func (c *Calcium) createContainerWithMemoryPrior(ctx context.Context, opts *type
wg.Add(len(nodesInfo))
index := 0
for _, nodeInfo := range nodesInfo {
go stats.Client.SendDeployCount(nodeInfo.Deploy)
go metrics.Client.SendDeployCount(nodeInfo.Deploy)
go func(nodeInfo types.NodeInfo, index int) {
defer wg.Done()
for _, m := range c.doCreateContainerWithMemoryPrior(ctx, nodeInfo, opts, index) {
Expand Down Expand Up @@ -139,7 +139,7 @@ func (c *Calcium) createContainerWithCPUPrior(ctx context.Context, opts *types.D
// do deployment
for _, nodeInfo := range nodesInfo {
planCount := len(nodeInfo.CPUPlan)
go stats.Client.SendDeployCount(planCount)
go metrics.Client.SendDeployCount(planCount)
go func(nodename string, cpuMap []types.CPUMap, index int) {
defer wg.Done()
for i, m := range c.doCreateContainerWithCPUPrior(ctx, nodename, cpuMap, opts, index) {
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
log "github.com/sirupsen/logrus"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/scheduler"
"github.com/projecteru2/core/stats"
"github.com/projecteru2/core/types"
)

Expand Down Expand Up @@ -119,6 +119,6 @@ func (c *Calcium) getCPUAndMem(ctx context.Context, podname, nodename string, la
}

result = makeCPUAndMem(nodes)
go stats.Client.SendMemCap(result)
go metrics.Client.SendMemCap(result)
return result, nil
}
8 changes: 6 additions & 2 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"github.com/codegangsta/cli"
"github.com/projecteru2/core/auth"
"github.com/projecteru2/core/cluster/calcium"
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/rpc"
"github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/stats"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/versioninfo"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -56,7 +57,9 @@ func serve() {
log.Fatalf("[main] %v", err)
}

stats.NewStatsdClient(config.Statsd)
if err := metrics.InitMetrics(config.Statsd); err != nil {
log.Fatal("[main] %v", err)
}

cluster, err := calcium.New(config)
if err != nil {
Expand Down Expand Up @@ -84,6 +87,7 @@ func serve() {
pb.RegisterCoreRPCServer(grpcServer, vibranium)
go grpcServer.Serve(s)
if config.Profile != "" {
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(config.Profile, nil)
}

Expand Down
91 changes: 84 additions & 7 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,18 @@ import:
- package: github.com/ugorji/go
subpackages:
- codec
- package: github.com/prometheus/client_golang
version: v0.9.0-pre1
subpackages:
- prometheus
- package: github.com/beorn7/perks
subpackages:
- quantile
- package: github.com/prometheus/client_model
subpackages:
- go
- package: github.com/prometheus/common
subpackages:
- expfmt
- model
- package: github.com/prometheus/procfs
116 changes: 116 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package metrics

import (
"fmt"
"os"

statsdlib "github.com/CMGS/statsd"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// memStats is the statsd key prefix for the memory capacity gauges; gauge
// appends ".<node name>" to it for every reported node.
const memStats = "eru.node.mem"

// deployCount is the statsd key format for the deploy counter; its single %s
// verb is filled with Metrics.Hostname.
const deployCount = "eru.core.%s.deploy.count"

// Metrics holds the statsd settings and the Prometheus collectors that the
// Send* methods report to.
type Metrics struct {
// StatsdAddr is the address handed to the statsd client; while it is empty
// the Send* methods skip statsd reporting entirely.
StatsdAddr string
// Hostname is used as the label value of DeployCount and is interpolated
// into the statsd deploy-count key.
Hostname string

// MemoryCapacity is the gauge updated by SendMemCap, one series per node
// name; updates are skipped while it is nil.
MemoryCapacity *prometheus.GaugeVec
// DeployCount is the counter incremented by SendDeployCount, labelled with
// Hostname; updates are skipped while it is nil.
DeployCount *prometheus.CounterVec
}

// gauge reports every entry of data to statsd as a gauge named
// "<keyPattern>.<entry key>", using a statsd client created for this call
// from m.StatsdAddr. It logs and returns the error when the client cannot be
// created, and returns nil otherwise.
func (m *Metrics) gauge(keyPattern string, data map[string]float64) error {
	client, err := statsdlib.New(m.StatsdAddr)
	if err != nil {
		log.Errorf("[gauge] Connect statsd failed: %v", err)
		return err
	}
	// Deferred calls run last-in-first-out, so the client is flushed before
	// it is closed.
	defer client.Close()
	defer client.Flush()

	for name, value := range data {
		client.Gauge(keyPattern+"."+name, value)
	}
	return nil
}

// count reports n under key to statsd with the given sample rate, using a
// statsd client created for this call from m.StatsdAddr. It logs and returns
// the error when the client cannot be created, and returns nil otherwise.
func (m *Metrics) count(key string, n int, rate float32) error {
	client, err := statsdlib.New(m.StatsdAddr)
	if err != nil {
		log.Errorf("[count] Connect statsd failed: %v", err)
		return err
	}
	// Deferred calls run last-in-first-out, so the client is flushed before
	// it is closed.
	defer client.Close()
	defer client.Flush()

	client.Count(key, n, rate)
	return nil
}

// SendMemCap publishes the memory capacity (MemCap) of every node in
// cpumemmap. Node names are passed through utils.CleanStatsdMetrics and then
// used both as the Prometheus label value and as the statsd key suffix.
// The Prometheus gauge is updated only when m.MemoryCapacity is set, and
// statsd is contacted only when m.StatsdAddr is non-empty; a statsd failure
// is logged, not returned.
func (m *Metrics) SendMemCap(cpumemmap map[string]types.CPUAndMem) {
	log.Info("[Metrics] Update memory capacity gauge")

	gauges := map[string]float64{}
	for rawName, resource := range cpumemmap {
		name := utils.CleanStatsdMetrics(rawName)
		value := float64(resource.MemCap)
		gauges[name] = value
		if m.MemoryCapacity == nil {
			continue
		}
		m.MemoryCapacity.WithLabelValues(name).Set(value)
	}

	if m.StatsdAddr != "" {
		if err := m.gauge(memStats, gauges); err != nil {
			log.Errorf("[SendMemCap] Error occured while sending data to statsd: %v", err)
		}
	}
}

// SendDeployCount adds n to the deploy counter. The Prometheus counter,
// labelled with m.Hostname, is updated only when m.DeployCount is set, and
// statsd is contacted only when m.StatsdAddr is non-empty; a statsd failure
// is logged, not returned.
func (m *Metrics) SendDeployCount(n int) {
	log.Info("[Metrics] Update deploy counter")

	if counter := m.DeployCount; counter != nil {
		counter.WithLabelValues(m.Hostname).Add(float64(n))
	}

	if m.StatsdAddr != "" {
		statsdKey := fmt.Sprintf(deployCount, m.Hostname)
		if err := m.count(statsdKey, n, 1.0); err != nil {
			log.Errorf("[SendDeployCount] Error occured while counting: %v", err)
		}
	}
}

// Client is the package-level Metrics instance. It starts as the zero value
// and is replaced by InitMetrics.
var Client Metrics

// InitMetrics new a metrics obj
func InitMetrics(statsd string) error {
hostname, err := os.Hostname()
if err != nil {
return err
}
Client = Metrics{StatsdAddr: statsd, Hostname: utils.CleanStatsdMetrics(hostname)}

Client.MemoryCapacity = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "memory_capacity",
Help: "node avaliable memory.",
}, []string{"nodename"})

Client.DeployCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "core_deploy",
Help: "core deploy counter",
}, []string{"hostname"})

prometheus.MustRegister(Client.DeployCount, Client.MemoryCapacity)
return nil
}
Loading

0 comments on commit f94a569

Please sign in to comment.