diff --git a/config/config.go b/config/config.go index d628bc2df1..ff48ce3594 100644 --- a/config/config.go +++ b/config/config.go @@ -241,6 +241,7 @@ type ( Log log.GlobalConfig `yaml:"log"` SubLogs map[string]log.GlobalConfig `yaml:"subLogs"` Genesis genesis.Genesis `yaml:"genesis"` + Collector []string `yaml:"collector"` } // Validate is the interface of validating the config diff --git a/go.mod b/go.mod index f249692541..08675295a9 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( require ( github.com/golang/protobuf v1.5.2 + github.com/mackerelio/go-osstat v0.2.3 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible github.com/shirou/gopsutil/v3 v3.22.2 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.34.0 diff --git a/go.sum b/go.sum index dc8724f64d..283ff31471 100644 --- a/go.sum +++ b/go.sum @@ -840,6 +840,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= +github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= +github.com/mackerelio/go-osstat v0.2.3/go.mod h1:DQbPOnsss9JHIXgBStc/dnhhir3gbd3YH+Dbdi7ptMA= github.com/magefile/mage v1.9.0 h1:t3AU2wNwehMCW97vuqQLtw6puppWXHO+O2MHo5a50XE= github.com/magefile/mage v1.9.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/pkg/collector/collector.go b/pkg/collector/collector.go new file mode 100644 index 0000000000..e98c4b0729 --- /dev/null +++ b/pkg/collector/collector.go @@ -0,0 +1,87 @@ +package collector + +import ( + "fmt" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +// Namespace defines the common namespace to be used by all metrics. +const namespace = "node" + +var ( + factories = make(map[string]func() (Collector, error)) + initiatedCollectorsMtx = sync.Mutex{} + initiatedCollectors = make(map[string]Collector) +) + +func registerCollector(collector string, factory func() (Collector, error)) { + factories[collector] = factory +} + +// NodeCollector implements the prometheus.Collector interface. +type NodeCollector struct { + Collectors map[string]Collector +} + +// NewNodeCollector creates a new NodeCollector. +func NewNodeCollector(filters ...string) (*NodeCollector, error) { + f := make(map[string]bool) + for _, filter := range filters { + _, exist := factories[filter] + if !exist { + return nil, fmt.Errorf("missing collector: %s", filter) + } + f[filter] = true + } + collectors := make(map[string]Collector) + initiatedCollectorsMtx.Lock() + defer initiatedCollectorsMtx.Unlock() + for key := range f { + if collector, ok := initiatedCollectors[key]; ok { + collectors[key] = collector + } else { + collector, err := factories[key]() + if err != nil { + return nil, err + } + collectors[key] = collector + initiatedCollectors[key] = collector + } + } + return &NodeCollector{Collectors: collectors}, nil +} + +// Describe implements the prometheus.Collector interface. +func (n NodeCollector) Describe(ch chan<- *prometheus.Desc) { + +} + +// Collect implements the prometheus.Collector interface. +func (n NodeCollector) Collect(ch chan<- prometheus.Metric) { + wg := sync.WaitGroup{} + wg.Add(len(n.Collectors)) + for name, c := range n.Collectors { + go func(name string, c Collector) { + c.Update(ch) + wg.Done() + }(name, c) + } + wg.Wait() +} + +// Collector is the interface a collector has to implement. +type Collector interface { + // Get new metrics and expose them via prometheus registry. + Update(ch chan<- prometheus.Metric) error +} + +type typedDesc struct { + desc *prometheus.Desc + valueType prometheus.ValueType +} + +func (d *typedDesc) mustNewConstMetric(value float64, labels ...string) prometheus.Metric { + return prometheus.MustNewConstMetric(d.desc, d.valueType, value, labels...) +} diff --git a/pkg/collector/collector_test.go b/pkg/collector/collector_test.go new file mode 100644 index 0000000000..4f1cffefab --- /dev/null +++ b/pkg/collector/collector_test.go @@ -0,0 +1,36 @@ +package collector + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestCollector(t *testing.T) { + require := require.New(t) + c, err := NewNodeCollector("test") + require.ErrorContains(err, "missing collector") + c, err = NewNodeCollector("cpu") + require.NoError(err) + ch := make(chan prometheus.Metric) + go func() { + c.Collect(ch) + close(ch) + }() + + ch2 := make(chan *prometheus.Desc) + go func() { + c.Describe(ch2) + close(ch2) + }() + n := 0 + for m := range ch { + n++ + pb := &dto.Metric{} + err := m.Write(pb) + require.NoError(err) + } + require.Equal(4, n) +} diff --git a/pkg/collector/cpu.go b/pkg/collector/cpu.go new file mode 100644 index 0000000000..63e73ba751 --- /dev/null +++ b/pkg/collector/cpu.go @@ -0,0 +1,36 @@ +package collector + +import ( + "fmt" + + "github.com/mackerelio/go-osstat/cpu" + "github.com/prometheus/client_golang/prometheus" +) + +type cpuCollector struct { + metric *prometheus.Desc +} + +func init() { + registerCollector("cpu", NewCPUCollector) +} + +// NewCPUCollector returns a new Collector exposing kernel/system statistics. +func NewCPUCollector() (Collector, error) { + return &cpuCollector{ + metric: prometheus.NewDesc(namespace+"_cpu_stats", "cpu statistics", []string{"mode"}, nil), + }, nil +} + +func (c *cpuCollector) Update(ch chan<- prometheus.Metric) error { + stats, err := cpu.Get() + if err != nil { + return fmt.Errorf("couldn't get cpu: %w", err) + } + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(stats.User), "user") + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(stats.System), "system") + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(stats.Total), "total") + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(stats.Idle), "idle") + + return err +} diff --git a/pkg/collector/cpu_test.go b/pkg/collector/cpu_test.go new file mode 100644 index 0000000000..6d9a146a83 --- /dev/null +++ b/pkg/collector/cpu_test.go @@ -0,0 +1,29 @@ +package collector + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestCPU(t *testing.T) { + require := require.New(t) + c, err := NewCPUCollector() + require.NoError(err) + require.NotNil(c) + ch := make(chan prometheus.Metric) + go func() { + c.Update(ch) + close(ch) + }() + n := 0 + for m := range ch { + n++ + pb := &dto.Metric{} + err := m.Write(pb) + require.NoError(err) + } + require.Equal(4, n) +} diff --git a/pkg/collector/loadavg.go b/pkg/collector/loadavg.go new file mode 100644 index 0000000000..d5e356db18 --- /dev/null +++ b/pkg/collector/loadavg.go @@ -0,0 +1,38 @@ +package collector + +import ( + "fmt" + + "github.com/mackerelio/go-osstat/loadavg" + "github.com/prometheus/client_golang/prometheus" +) + +type loadavgCollector struct { + metric []typedDesc +} + +func init() { + registerCollector("loadavg", NewLoadavgCollector) +} + +// NewLoadavgCollector returns a new Collector exposing load average stats. +func NewLoadavgCollector() (Collector, error) { + return &loadavgCollector{ + metric: []typedDesc{ + {prometheus.NewDesc(namespace+"_load1", "1m load average.", nil, nil), prometheus.GaugeValue}, + {prometheus.NewDesc(namespace+"_load5", "5m load average.", nil, nil), prometheus.GaugeValue}, + {prometheus.NewDesc(namespace+"_load15", "15m load average.", nil, nil), prometheus.GaugeValue}, + }, + }, nil +} + +func (c *loadavgCollector) Update(ch chan<- prometheus.Metric) error { + loads, err := loadavg.Get() + if err != nil { + return fmt.Errorf("couldn't get load: %w", err) + } + ch <- c.metric[0].mustNewConstMetric(loads.Loadavg1) + ch <- c.metric[1].mustNewConstMetric(loads.Loadavg5) + ch <- c.metric[2].mustNewConstMetric(loads.Loadavg15) + return err +} diff --git a/pkg/collector/loadavg_test.go b/pkg/collector/loadavg_test.go new file mode 100644 index 0000000000..c79e011def --- /dev/null +++ b/pkg/collector/loadavg_test.go @@ -0,0 +1,29 @@ +package collector + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestLoadavg(t *testing.T) { + require := require.New(t) + c, err := NewLoadavgCollector() + require.NoError(err) + require.NotNil(c) + ch := make(chan prometheus.Metric) + go func() { + c.Update(ch) + close(ch) + }() + n := 0 + for m := range ch { + n++ + pb := &dto.Metric{} + err := m.Write(pb) + require.NoError(err) + } + require.Equal(3, n) +} diff --git a/pkg/collector/memory.go b/pkg/collector/memory.go new file mode 100644 index 0000000000..14ef7b1aa3 --- /dev/null +++ b/pkg/collector/memory.go @@ -0,0 +1,35 @@ +package collector + +import ( + "fmt" + + "github.com/mackerelio/go-osstat/memory" + "github.com/prometheus/client_golang/prometheus" +) + +type meminfoCollector struct { + metric *prometheus.Desc +} + +func init() { + registerCollector("memory", NewMeminfoCollector) +} + +// NewMeminfoCollector returns a new Collector exposing memory stats. +func NewMeminfoCollector() (Collector, error) { + return &meminfoCollector{ + metric: prometheus.NewDesc(namespace+"_memory_stats", "memory stats", []string{"mode"}, nil), + }, nil +} + +// Update to get the platform specific memory stats. +func (c *meminfoCollector) Update(ch chan<- prometheus.Metric) error { + memInfo, err := memory.Get() + if err != nil { + return fmt.Errorf("couldn't get meminfo: %w", err) + } + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(memInfo.Total), "total") + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(memInfo.Free), "free") + ch <- prometheus.MustNewConstMetric(c.metric, prometheus.GaugeValue, float64(memInfo.Used), "used") + return nil +} diff --git a/pkg/collector/memory_test.go b/pkg/collector/memory_test.go new file mode 100644 index 0000000000..d2536efc8f --- /dev/null +++ b/pkg/collector/memory_test.go @@ -0,0 +1,29 @@ +package collector + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestMemory(t *testing.T) { + require := require.New(t) + c, err := NewMeminfoCollector() + require.NoError(err) + require.NotNil(c) + ch := make(chan prometheus.Metric) + go func() { + c.Update(ch) + close(ch) + }() + n := 0 + for m := range ch { + n++ + pb := &dto.Metric{} + err := m.Write(pb) + require.NoError(err) + } + require.Equal(3, n) +} diff --git a/server/main.go b/server/main.go index 2bb316173c..6536b4680e 100644 --- a/server/main.go +++ b/server/main.go @@ -22,12 +22,14 @@ import ( "syscall" "github.com/iotexproject/go-pkgs/hash" + "github.com/prometheus/client_golang/prometheus" _ "go.uber.org/automaxprocs" "go.uber.org/zap" "github.com/iotexproject/iotex-core/blockchain/block" "github.com/iotexproject/iotex-core/blockchain/genesis" "github.com/iotexproject/iotex-core/config" + "github.com/iotexproject/iotex-core/pkg/collector" "github.com/iotexproject/iotex-core/pkg/log" "github.com/iotexproject/iotex-core/pkg/probe" "github.com/iotexproject/iotex-core/pkg/recovery" @@ -103,6 +105,10 @@ func main() { glog.Fatalln("Cannot config global logger, use default one: ", zap.Error(err)) } + if err = initCollector(cfg); err != nil { + glog.Fatalln("Cannot init collector: ", zap.Error(err)) + } + if err = recovery.SetCrashlogDir(cfg.System.SystemLogDBPath); err != nil { glog.Fatalln("Failed to set directory of crashlog: ", zap.Error(err)) } @@ -127,6 +133,7 @@ func main() { if err := probeSvr.Start(ctx); err != nil { log.L().Fatal("Failed to start probe server.", zap.Error(err)) } + go func() { <-stop // start stopping @@ -173,3 +180,14 @@ func initLogger(cfg config.Config) error { zap.String("ioAddr", addr.String()), )) } + +func initCollector(cfg config.Config) error { + nc, err := collector.NewNodeCollector(cfg.Collector...) + if err != nil { + return err + } + if err := prometheus.Register(nc); err != nil { + return err + } + return nil +}