Skip to content

Commit

Permalink
Detect kubelet and container runtime frequent crashes
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhen127 committed Nov 21, 2018
1 parent 967c22e commit ea26527
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ifeq ($(ENABLE_JOURNALD), 1)
endif

vet:
go list ./... | grep -v "./vendor/*" | xargs go vet
go list ./... | grep -v "./vendor/*" | xargs go vet $(BUILD_TAGS)

fmt:
find . -type f -name "*.go" | grep -v "./vendor/*" | xargs gofmt -s -w -l
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ For example, to test [KernelMonitor](https://github.com/kubernetes/node-problem-

**Note**:
- You can see more rule examples under [test/kernel_log_generator/problems](https://github.com/kubernetes/node-problem-detector/tree/master/test/kernel_log_generator/problems).
- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```).
- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```); or use [generator.sh](https://github.com/kubernetes/node-problem-detector/blob/master/test/kernel_log_generator/generator.sh).
- To inject other logs into journald like systemd logs, use ```echo 'Some systemd message' | systemd-cat -t systemd```.

# Remedy Systems

Expand Down
2 changes: 1 addition & 1 deletion cmd/logcounter/log_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
fedo.AddFlags(pflag.CommandLine)
pflag.Parse()

counter, err := logcounter.NewKmsgLogCounter(fedo)
counter, err := logcounter.NewJournaldLogCounter(fedo)
if err != nil {
fmt.Print(err)
os.Exit(int(types.Unknown))
Expand Down
12 changes: 9 additions & 3 deletions cmd/logcounter/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@ func NewLogCounterOptions() *LogCounterOptions {
// LogCounterOptions contains frequent event detector command line and application options.
type LogCounterOptions struct {
// command line options. See flag descriptions for the description
Lookback string
Pattern string
Count int
JournaldSource string
LogPath string
Lookback string
Delay string
Pattern string
Count int
}

// AddFlags adds log counter command line options to pflag.
func (fedo *LogCounterOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&fedo.JournaldSource, "journald-source", "", "The source configuration of journald, e.g., kernel, kubelet, docker, etc")
fs.StringVar(&fedo.LogPath, "log-path", "", "The log path that log watcher looks up")
fs.StringVar(&fedo.Lookback, "lookback", "", "The time log watcher looks up")
fs.StringVar(&fedo.Delay, "delay", "", "The time duration log watcher delays after node boot time")
fs.StringVar(&fedo.Pattern, "pattern", "",
"The regular expression to match the problem in log. The pattern must match to the end of the line.")
fs.IntVar(&fedo.Count, "count", 1,
Expand Down
2 changes: 1 addition & 1 deletion config/docker-monitor.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"plugin": "journald",
"pluginConfig": {
"source": "docker"
"source": "dockerd"
},
"logPath": "/var/log/journal",
"lookback": "5m",
Expand Down
2 changes: 2 additions & 0 deletions config/kernel-monitor-counter.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
"reason": "UnregisterNetDevice",
"path": "/home/kubernetes/bin/log-counter",
"args": [
"--journald-source=kernel",
"--log-path=/var/log/journal",
"--lookback=20m",
"--count=3",
"--pattern=unregister_netdevice: waiting for \\w+ to become free. Usage count = \\d+"
Expand Down
72 changes: 72 additions & 0 deletions config/systemd-monitor-counter.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"plugin": "custom",
"pluginConfig": {
"invoke_interval": "5m",
"timeout": "1m",
"max_output_length": 80,
"concurrency": 1
},
"source": "systemd-monitor",
"conditions": [
{
"type": "FrequentKubeletRestart",
"reason": "NoFrequentKubeletRestart",
"message": "kubelet is functioning properly"
},
{
"type": "FrequentDockerRestart",
"reason": "NoFrequentDockerRestart",
"message": "docker is functioning properly"
},
{
"type": "FrequentContainerdRestart",
"reason": "NoFrequentContainerdRestart",
"message": "containerd is functioning properly"
}
],
"rules": [
{
"type": "permanent",
"condition": "FrequentKubeletRestart",
"reason": "FrequentKubeletRestart",
"path": "/home/kubernetes/bin/log-counter",
"args": [
"--journald-source=systemd",
"--log-path=/var/log/journal",
"--lookback=20m",
"--delay=5m",
"--count=5",
"--pattern=Started Kubernetes kubelet."
],
"timeout": "1m"
},
{
"type": "permanent",
"condition": "FrequentDockerRestart",
"reason": "FrequentDockerRestart",
"path": "/home/kubernetes/bin/log-counter",
"args": [
"--journald-source=systemd",
"--log-path=/var/log/journal",
"--lookback=20m",
"--count=5",
"--pattern=Starting Docker Application Container Engine..."
],
"timeout": "1m"
},
{
"type": "permanent",
"condition": "FrequentContainerdRestart",
"reason": "FrequentContainerdRestart",
"path": "/home/kubernetes/bin/log-counter",
"args": [
"--journald-source=systemd",
"--log-path=/var/log/journal",
"--lookback=20m",
"--count=5",
"--pattern=Starting containerd container runtime..."
],
"timeout": "1m"
}
]
}
2 changes: 1 addition & 1 deletion deployment/node-problem-detector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ data:
{
"plugin": "journald",
"pluginConfig": {
"source": "docker"
"source": "dockerd"
},
"logPath": "/var/log/journal",
"lookback": "5m",
Expand Down
12 changes: 6 additions & 6 deletions pkg/custompluginmonitor/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,26 @@ var (

type pluginGlobalConfig struct {
// InvokeIntervalString is the interval string at which plugins will be invoked.
InvokeIntervalString *string `json:"invoke_interval, omitempty"`
InvokeIntervalString *string `json:"invoke_interval,omitempty"`
// TimeoutString is the global plugin execution timeout string.
TimeoutString *string `json:"timeout, omitempty"`
TimeoutString *string `json:"timeout,omitempty"`
// InvokeInterval is the interval at which plugins will be invoked.
InvokeInterval *time.Duration `json:"-"`
// Timeout is the global plugin execution timeout.
Timeout *time.Duration `json:"-"`
// MaxOutputLength is the maximum plugin output message length.
MaxOutputLength *int `json:"max_output_length, omitempty"`
MaxOutputLength *int `json:"max_output_length,omitempty"`
// Concurrency is the number of concurrent running plugins.
Concurrency *int `json:"concurrency, omitempty"`
Concurrency *int `json:"concurrency,omitempty"`
}

// Custom plugin config is the configuration of custom plugin monitor.
type CustomPluginConfig struct {
// Plugin is the name of plugin which is currently used.
// Currently supported: custom.
Plugin string `json:"plugin, omitempty"`
Plugin string `json:"plugin,omitempty"`
// PluginConfig is global plugin configuration.
PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig, omitempty"`
PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig,omitempty"`
// Source is the source name of the custom plugin monitor
Source string `json:"source"`
// DefaultConditions are the default states of all the conditions custom plugin monitor should handle.
Expand Down
19 changes: 13 additions & 6 deletions pkg/logcounter/log_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (
"k8s.io/node-problem-detector/cmd/logcounter/options"
"k8s.io/node-problem-detector/pkg/logcounter/types"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg"
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/journald"
watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
systemtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
)

const (
bufferSize = 1000
timeout = 1 * time.Second
bufferSize = 1000
timeout = 1 * time.Second
journaldSourceKey = "source"
)

type logCounter struct {
Expand All @@ -42,11 +43,17 @@ type logCounter struct {
clock clock.Clock
}

func NewKmsgLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) {
watcher := kmsg.NewKmsgWatcher(watchertypes.WatcherConfig{Lookback: options.Lookback})
func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) {
watcher := journald.NewJournaldWatcher(watchertypes.WatcherConfig{
Plugin: "journald",
PluginConfig: map[string]string{journaldSourceKey: options.JournaldSource},
LogPath: options.LogPath,
Lookback: options.Lookback,
Delay: options.Delay,
})
logCh, err := watcher.Watch()
if err != nil {
return nil, fmt.Errorf("error watching kmsg: %v", err)
return nil, fmt.Errorf("error watching journald: %v", err)
}
return &logCounter{
logCh: logCh,
Expand Down
21 changes: 17 additions & 4 deletions pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type filelogWatcher struct {
closer io.Closer
translator *translator
logCh chan *logtypes.Log
uptime time.Time
startTime time.Time
tomb *tomb.Tomb
clock utilclock.Clock
}
Expand All @@ -53,10 +53,23 @@ func NewSyslogWatcherOrDie(cfg types.WatcherConfig) types.LogWatcher {
if err := syscall.Sysinfo(&info); err != nil {
glog.Fatalf("Failed to get system info: %v", err)
}
startTime := time.Now().Add(-time.Duration(info.Uptime) * time.Second)
// Delay startTime if delay duration is set, so that the log watcher can skip
// the logs in delay duration and wait until the node is stable.
if cfg.Delay != "" {
delay, err := time.ParseDuration(cfg.Delay)
if err != nil {
glog.Fatalf("Failed to parse delay duration %q: %v", cfg.Delay, err)
}
// Notice that when delay > uptime, startTime is actually after time.Now(),
// which is fine.
startTime = startTime.Add(delay)
}

return &filelogWatcher{
cfg: cfg,
translator: newTranslatorOrDie(cfg.PluginConfig),
uptime: time.Now().Add(time.Duration(-info.Uptime * int64(time.Second))),
startTime: startTime,
tomb: tomb.NewTomb(),
// A capacity 1000 buffer should be enough
logCh: make(chan *logtypes.Log, 1000),
Expand Down Expand Up @@ -127,8 +140,8 @@ func (s *filelogWatcher) watchLoop() {
glog.Warningf("Unable to parse line: %q, %v", line, err)
continue
}
// If the log is older than look back duration or system boot time, discard it.
if s.clock.Since(log.Timestamp) > lookback || log.Timestamp.Before(s.uptime) {
// If the log is older than look back duration or log start time, discard it.
if s.clock.Since(log.Timestamp) > lookback || log.Timestamp.Before(s.startTime) {
continue
}
s.logCh <- log
Expand Down
17 changes: 9 additions & 8 deletions pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func TestWatch(t *testing.T) {
now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local)
fakeClock := fakeclock.NewFakeClock(now)
testCases := []struct {
log string
logs []logtypes.Log
uptime time.Time
lookback string
log string
logs []logtypes.Log
startTime time.Time
lookback string
}{
{
// The start point is at the head of the log file.
Expand Down Expand Up @@ -115,8 +115,8 @@ Jan 2 03:04:05 kernel: [2.000000] 3
Jan 2 03:04:04 kernel: [1.000000] 2
Jan 2 03:04:05 kernel: [2.000000] 3
`,
uptime: time.Date(time.Now().Year(), time.January, 2, 3, 4, 4, 0, time.Local),
lookback: "2s",
startTime: time.Date(time.Now().Year(), time.January, 2, 3, 4, 4, 0, time.Local),
lookback: "2s",
logs: []logtypes.Log{
{
Timestamp: now.Add(-time.Second),
Expand Down Expand Up @@ -146,8 +146,8 @@ Jan 2 03:04:05 kernel: [2.000000] 3
LogPath: f.Name(),
Lookback: test.lookback,
})
// Set the uptime.
w.(*filelogWatcher).uptime = test.uptime
// Set the startTime.
w.(*filelogWatcher).startTime = test.startTime
// Set the fake clock.
w.(*filelogWatcher).clock = fakeClock
logCh, err := w.Watch()
Expand Down Expand Up @@ -179,6 +179,7 @@ func TestGoroutineLeak(t *testing.T) {
PluginConfig: getTestPluginConfig(),
LogPath: "/not/exist/path",
Lookback: "10m",
Delay: "",
})
_, err := w.Watch()
assert.Error(t, err)
Expand Down
50 changes: 31 additions & 19 deletions pkg/systemlogmonitor/logwatchers/journald/log_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,36 @@ import (

// journaldWatcher is the log watcher for journald.
type journaldWatcher struct {
journal *sdjournal.Journal
cfg types.WatcherConfig
logCh chan *logtypes.Log
tomb *tomb.Tomb
journal *sdjournal.Journal
cfg types.WatcherConfig
startTime time.Time
logCh chan *logtypes.Log
tomb *tomb.Tomb
}

// NewJournaldWatcher is the create function of journald watcher.
func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher {
var info syscall.Sysinfo_t
if err := syscall.Sysinfo(&info); err != nil {
glog.Fatalf("Failed to get system info: %v", err)
}
startTime := time.Now().Add(-time.Duration(info.Uptime) * time.Second)
// Delay startTime if delay duration is set, so that the log watcher can skip
// the logs in delay duration and wait until the node is stable.
if cfg.Delay != "" {
delay, err := time.ParseDuration(cfg.Delay)
if err != nil {
glog.Fatalf("Failed to parse delay duration %q: %v", cfg.Delay, err)
}
// Notice that when delay > uptime, startTime is actually after time.Now(),
// which is fine.
startTime = startTime.Add(delay)
}

return &journaldWatcher{
cfg: cfg,
tomb: tomb.NewTomb(),
cfg: cfg,
startTime: startTime,
tomb: tomb.NewTomb(),
// A capacity 1000 buffer should be enough
logCh: make(chan *logtypes.Log, 1000),
}
Expand Down Expand Up @@ -81,6 +100,7 @@ const waitLogTimeout = 5 * time.Second

// watchLoop is the main watch loop of journald watcher.
func (j *journaldWatcher) watchLoop() {
startTimestamp := uint64(j.startTime.UnixNano() / 1000)
defer func() {
if err := j.journal.Close(); err != nil {
glog.Errorf("Failed to close journal client: %v", err)
Expand Down Expand Up @@ -112,6 +132,11 @@ func (j *journaldWatcher) watchLoop() {
continue
}

if entry.RealtimeTimestamp < startTimestamp {
glog.V(5).Infof("Throwing away journal entry before (delayed) start time: %v < %v", entry.RealtimeTimestamp, startTimestamp)
continue
}

j.logCh <- translate(entry)
}
}
Expand Down Expand Up @@ -146,19 +171,6 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
if err != nil {
return nil, fmt.Errorf("failed to create journal client from path %q: %v", path, err)
}
// Use system uptime if lookback duration is longer than it.
// Ideally, we should use monotonic timestamp + boot id in journald. However, it doesn't seem
// to work with go-system/journal package.
// TODO(random-liu): Use monotonic timestamp + boot id.
var info syscall.Sysinfo_t
if err := syscall.Sysinfo(&info); err != nil {
return nil, fmt.Errorf("failed to get system info: %v", err)
}
uptime := time.Duration(info.Uptime) * time.Second
if lookback > uptime {
lookback = uptime
glog.Infof("Lookback changed to system uptime: %v", lookback)
}
// Seek journal client based on the lookback duration.
start := time.Now().Add(-lookback)
err = journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000))
Expand Down
Loading

0 comments on commit ea26527

Please sign in to comment.