Skip to content

Commit

Permalink
node-problem-detector: report disk queue length in Prometheus format
Browse files Browse the repository at this point in the history
This change mainly includes below items:
1. Added option in NPD for starting Prometheus server for metrics
reporting.
2. Introduced "problemdaemon.ProblemDaemon" interface for abstracting
old + new problem daemons: old daemons report to NPD using types.Status,
new daemons will add support for metrics reporting.
3. Allow flexible registration for problemdaemon.ProblemDaemon via the
init() hook.
4. Starting the change on NPD to allow flexible exporting configuration:
i.e. orthogonally reports to k8s/local/Stackdriver.
  • Loading branch information
Xuewei Zhang committed May 14, 2019
1 parent c2822b2 commit dfe5a67
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 7 deletions.
15 changes: 13 additions & 2 deletions cmd/node_problem_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (

"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/custompluginmonitor"
"k8s.io/node-problem-detector/pkg/metricexporter"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/problemdetector"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
Expand Down Expand Up @@ -68,7 +70,8 @@ func main() {
npdo.SetNodeNameOrDie()

npdo.ValidOrDie()

// TODO(xueweiz): All monitors supported today should be merged into the ProblemDaemons,
// sharing the same registration, configuration and initialization mechanism.
monitors := make(map[string]types.Monitor)
for _, config := range npdo.SystemLogMonitorConfigPaths {
if _, ok := monitors[config]; ok {
Expand All @@ -87,14 +90,22 @@ func main() {
}
monitors[config] = custompluginmonitor.NewCustomPluginMonitorOrDie(config)
}
problemDaemons := problemdaemon.NewProblemDaemons()

c := problemclient.NewClientOrDie(npdo)
p := problemdetector.NewProblemDetector(monitors, c)
p := problemdetector.NewProblemDetector(problemDaemons, monitors, c)

// Start http server.
if npdo.ServerPort > 0 {
startHTTPServer(p, npdo)
}

// Start Prometheus scrape endpoint.
if npdo.PrometheusServerPort > 0 {
prometheus_exporter := metricexporter.NewPrometheusExporterOrDie(npdo)
p.RegisterMetricsExporter(prometheus_exporter)
}

if err := p.Run(); err != nil {
glog.Fatalf("Problem detector failed with error: %v", err)
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type NodeProblemDetectorOptions struct {
ServerPort int
// ServerAddress is the address to bind the node problem detector server.
ServerAddress string
// PrometheusServerPort is the port to bind the Prometheus scrape endpoint. Use 0 to disable.
PrometheusServerPort int
// PrometheusServerAddress is the address to bind the Prometheus scrape endpoint.
PrometheusServerAddress string

// application options

Expand All @@ -72,6 +76,10 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
20256, "The port to bind the node problem detector server. Use 0 to disable.")
fs.StringVar(&npdo.ServerAddress, "address",
"127.0.0.1", "The address to bind the node problem detector server.")
fs.IntVar(&npdo.PrometheusServerPort, "prometheus-port",
20257, "The port to bind the Prometheus scrape endpoint. Use 0 to disable.")
fs.StringVar(&npdo.PrometheusServerAddress, "prometheus-address",
"127.0.0.1", "The address to bind the Prometheus scrape endpoint.")
}

// ValidOrDie validates node problem detector command line options.
Expand Down
46 changes: 46 additions & 0 deletions pkg/metricexporter/prometheus_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metricexporter

import (
"net"
"net/http"
"strconv"

"contrib.go.opencensus.io/exporter/prometheus"
"github.com/golang/glog"
"go.opencensus.io/stats/view"

"k8s.io/node-problem-detector/cmd/options"
)

// NewPrometheusExporterOrDie creates a Prometheus OpenCensus view exporter, panics if error occurs.
func NewPrometheusExporterOrDie(npdo *options.NodeProblemDetectorOptions) view.Exporter {
addr := net.JoinHostPort(npdo.PrometheusServerAddress, strconv.Itoa(npdo.PrometheusServerPort))
pe, err := prometheus.NewExporter(prometheus.Options{})
if err != nil {
glog.Fatalf("Failed to create Prometheus exporter: %v", err)
}
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
if err := http.ListenAndServe(addr, mux); err != nil {
glog.Fatalf("Failed to start Prometheus scrape endpoint: %v", err)
}
}()
return pe
}
144 changes: 144 additions & 0 deletions pkg/problemdaemon/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package problemdaemon

import (
"context"
"os/exec"
"strings"
"time"

"github.com/golang/glog"
"github.com/shirou/gopsutil/disk"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)

func init() {
register("disk", NewDiskCollector)
}

var (
timeout = 5 * time.Second
period = 60 * time.Second
)

var (
keyDevice, _ = tag.NewKey("device")
)

var (
mIOTime, vIOTime = util.NewInt64Metric(
"disk/io_time",
"The IO time spent on the disk",
"second",
view.LastValue(),
[]tag.Key{keyDevice})
mWeightedIO, vWeightedIO = util.NewInt64Metric(
"disk/weighted_io",
"The weighted IO on the disk",
"second",
view.LastValue(),
[]tag.Key{keyDevice})
mAvgQueueLen, vAvgQueueLen = util.NewFloat64Metric(
"disk/avg_queue_len",
"The average queue length on the disk",
"second",
view.LastValue(),
[]tag.Key{keyDevice})
)

// listRootBlockDevices lists all block devices that's not a slave or holder.
func listRootBlockDevices() []string {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// "-d" prevents printing slave or holder devices. i.e. /dev/sda1, /dev/sda2...
// "-n" prevents printing the headings.
// "-p NAME" specifies to only print the device name.
cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME")
stdout, err := cmd.Output()
if err != nil {
glog.Errorf("Error calling lsblk")
}
return strings.Split(strings.TrimSpace(string(stdout)), "\n")
}

// listAttachedDevices lists all currently attached block devices.
func listAttachedDevices() []string {
partitions, _ := disk.Partitions(false)
blks := []string{}
for _, partition := range partitions {
blks = append(blks, partition.Device)
}
return blks
}

type diskCollector struct {
}

// NewClientOrDie creates a new disk metrics collector.
func NewDiskCollector() ProblemDaemon {
dc := diskCollector{}
view.Register(vIOTime, vWeightedIO, vAvgQueueLen)
return &dc
}

func (p *diskCollector) Start() error {
ctx, _ := tag.New(context.Background())
c := clock.RealClock{}

go func() {
historyIOTime := make(map[string]uint64)
historyWeightedIO := make(map[string]uint64)

for {
blks := append(listRootBlockDevices(), listAttachedDevices()...)
ioCountersStats, _ := disk.IOCounters(blks...)
for device_name, ioCountersStat := range ioCountersStats {
// Attach label {keyDevice: device_name} to the metrics.
device_ctx, _ := tag.New(ctx, tag.Upsert(keyDevice, device_name))

// Calculate average IO queue length since last measurement.
lastIOTime := historyIOTime[device_name]
lastWeightedIO := historyWeightedIO[device_name]

historyIOTime[device_name] = ioCountersStat.IoTime
historyWeightedIO[device_name] = ioCountersStat.WeightedIO

avg_queue_len := float64(0.0)
if lastIOTime != ioCountersStat.IoTime {
avg_queue_len = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime)
}

stats.Record(device_ctx, mIOTime.M(int64(ioCountersStat.IoTime)))
stats.Record(device_ctx, mWeightedIO.M(int64(ioCountersStat.WeightedIO)))
stats.Record(device_ctx, mAvgQueueLen.M(avg_queue_len))
}
<-c.After(period)
}
}()
return nil
}

func (p *diskCollector) GetProblemChannel() <-chan *types.Status {
return nil
}
49 changes: 49 additions & 0 deletions pkg/problemdaemon/problem_daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package problemdaemon

import (
"k8s.io/node-problem-detector/pkg/types"
)

// ProblemDaemon represents a plugin that report node problems and/or metrics to NPD.
type ProblemDaemon interface {
// Start starts the problem daemon.
Start() error
// GetProblemChannel returns the channel which the problem daemon will use to report node problems.
GetProblemChannel() <-chan *types.Status
}

var (
factories = make(map[string]func() ProblemDaemon)
)

// register registers a problem daemon factory method, which will be used to create the problem daemon.
func register(problemDaemon string, factory func() ProblemDaemon) {
factories[problemDaemon] = factory
}

// NewProblemDaemons creates all known problem daemons.
// TODO(xueweiz): allow runtime configuration.
func NewProblemDaemons() []ProblemDaemon {
problemDaemons := []ProblemDaemon{}
for _, problemDaemonFactory := range factories {
problemDaemon := problemDaemonFactory()
problemDaemons = append(problemDaemons, problemDaemon)
}
return problemDaemons
}
27 changes: 25 additions & 2 deletions pkg/problemdetector/problem_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,52 @@ import (

"github.com/golang/glog"

"go.opencensus.io/stats/view"
"k8s.io/apimachinery/pkg/util/clock"

"k8s.io/node-problem-detector/pkg/condition"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)

// ProblemDetector collects statuses from all problem daemons and update the node condition and send node event.
// ProblemDetector collects problems and metrics from all problem daemons and update to various control planes.
type ProblemDetector interface {
Run() error
RegisterHTTPHandlers()
RegisterMetricsExporter(exporter view.Exporter)
// TODO(xueweiz): add RegisterProblemsExporter() to support flexible problem exporting.
}

type problemDetector struct {
client problemclient.Client
conditionManager condition.ConditionManager
monitors map[string]types.Monitor
problemDaemons []problemdaemon.ProblemDaemon
}

// NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but
// in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitors map[string]types.Monitor, client problemclient.Client) ProblemDetector {
func NewProblemDetector(problemDaemons []problemdaemon.ProblemDaemon, monitors map[string]types.Monitor, client problemclient.Client) ProblemDetector {
return &problemDetector{
client: client,
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
monitors: monitors,
problemDaemons: problemDaemons,
}
}

func (p *problemDetector) RegisterMetricsExporter(exporter view.Exporter) {
view.RegisterExporter(exporter)
}

// Run starts the problem detector.
func (p *problemDetector) Run() error {

p.conditionManager.Start()

// TODO(xueweiz): merge all monitors into problem daemons.
// Start the log monitors one by one.
var chans []<-chan *types.Status
for cfg, m := range p.monitors {
Expand All @@ -66,6 +79,16 @@ func (p *problemDetector) Run() error {
}
chans = append(chans, ch)
}

// Start the problem daemons one by one.
for _, problemDaemon := range p.problemDaemons {
problemDaemon.Start()
ch := problemDaemon.GetProblemChannel()
if ch != nil {
chans = append(chans, ch)
}
}

if len(chans) == 0 {
return fmt.Errorf("no log monitor is successfully setup")
}
Expand Down
Loading

0 comments on commit dfe5a67

Please sign in to comment.