diff --git a/Makefile b/Makefile index 43ef6ec5f..05ddec69a 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,7 @@ ifeq ($(ENABLE_JOURNALD), 1) endif vet: - go list ./... | grep -v "./vendor/*" | xargs go vet + go list ./... | grep -v "./vendor/*" | xargs go vet $(BUILD_TAGS) fmt: find . -type f -name "*.go" | grep -v "./vendor/*" | xargs gofmt -s -w -l diff --git a/README.md b/README.md index 4aaae25c3..d19985435 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,8 @@ For example, to test [KernelMonitor](https://github.com/kubernetes/node-problem- **Note**: - You can see more rule examples under [test/kernel_log_generator/problems](https://github.com/kubernetes/node-problem-detector/tree/master/test/kernel_log_generator/problems). -- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```). +- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```); or use [generator.sh](https://github.com/kubernetes/node-problem-detector/blob/master/test/kernel_log_generator/generator.sh). +- To inject other logs into journald like systemd logs, use ```echo 'Some systemd message' | systemd-cat -t systemd```. # Remedy Systems diff --git a/cmd/logcounter/log_counter.go b/cmd/logcounter/log_counter.go index 6d8ebdb4f..4260ded80 100644 --- a/cmd/logcounter/log_counter.go +++ b/cmd/logcounter/log_counter.go @@ -32,7 +32,7 @@ func main() { fedo.AddFlags(pflag.CommandLine) pflag.Parse() - counter, err := logcounter.NewKmsgLogCounter(fedo) + counter, err := logcounter.NewJournaldLogCounter(fedo) if err != nil { fmt.Print(err) os.Exit(int(types.Unknown)) diff --git a/cmd/logcounter/options/options.go b/cmd/logcounter/options/options.go index 23aec1a15..c3c31b4cf 100644 --- a/cmd/logcounter/options/options.go +++ b/cmd/logcounter/options/options.go @@ -29,14 +29,20 @@ func NewLogCounterOptions() *LogCounterOptions { // LogCounterOptions contains frequent event detector command line and application options. type LogCounterOptions struct { // command line options. See flag descriptions for the description - Lookback string - Pattern string - Count int + JournaldSource string + LogPath string + Lookback string + Delay string + Pattern string + Count int } // AddFlags adds log counter command line options to pflag. func (fedo *LogCounterOptions) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&fedo.JournaldSource, "journald-source", "", "The source configuration of journald, e.g., kernel, kubelet, docker, etc") + fs.StringVar(&fedo.LogPath, "log-path", "", "The log path that log watcher looks up") fs.StringVar(&fedo.Lookback, "lookback", "", "The time log watcher looks up") + fs.StringVar(&fedo.Delay, "delay", "", "The time duration log watcher delays after node boot time") fs.StringVar(&fedo.Pattern, "pattern", "", "The regular expression to match the problem in log. The pattern must match to the end of the line.") fs.IntVar(&fedo.Count, "count", 1, diff --git a/config/docker-monitor.json b/config/docker-monitor.json index 571325283..d6294d4c4 100644 --- a/config/docker-monitor.json +++ b/config/docker-monitor.json @@ -1,7 +1,7 @@ { "plugin": "journald", "pluginConfig": { - "source": "docker" + "source": "dockerd" }, "logPath": "/var/log/journal", "lookback": "5m", diff --git a/config/kernel-monitor-counter.json b/config/kernel-monitor-counter.json index e217e927a..f6ce25b52 100644 --- a/config/kernel-monitor-counter.json +++ b/config/kernel-monitor-counter.json @@ -21,6 +21,8 @@ "reason": "UnregisterNetDevice", "path": "/home/kubernetes/bin/log-counter", "args": [ + "--journald-source=kernel", + "--log-path=/var/log/journal", "--lookback=20m", "--count=3", "--pattern=unregister_netdevice: waiting for \\w+ to become free. Usage count = \\d+" diff --git a/config/systemd-monitor-counter.json b/config/systemd-monitor-counter.json new file mode 100644 index 000000000..3331e90aa --- /dev/null +++ b/config/systemd-monitor-counter.json @@ -0,0 +1,72 @@ +{ + "plugin": "custom", + "pluginConfig": { + "invoke_interval": "5m", + "timeout": "1m", + "max_output_length": 80, + "concurrency": 1 + }, + "source": "systemd-monitor", + "conditions": [ + { + "type": "FrequentKubeletRestart", + "reason": "NoFrequentKubeletRestart", + "message": "kubelet is functioning properly" + }, + { + "type": "FrequentDockerRestart", + "reason": "NoFrequentDockerRestart", + "message": "docker is functioning properly" + }, + { + "type": "FrequentContainerdRestart", + "reason": "NoFrequentContainerdRestart", + "message": "containerd is functioning properly" + } + ], + "rules": [ + { + "type": "permanent", + "condition": "FrequentKubeletRestart", + "reason": "FrequentKubeletRestart", + "path": "/home/kubernetes/bin/log-counter", + "args": [ + "--journald-source=systemd", + "--log-path=/var/log/journal", + "--lookback=20m", + "--delay=5m", + "--count=5", + "--pattern=Started Kubernetes kubelet." + ], + "timeout": "1m" + }, + { + "type": "permanent", + "condition": "FrequentDockerRestart", + "reason": "FrequentDockerRestart", + "path": "/home/kubernetes/bin/log-counter", + "args": [ + "--journald-source=systemd", + "--log-path=/var/log/journal", + "--lookback=20m", + "--count=5", + "--pattern=Starting Docker Application Container Engine..." + ], + "timeout": "1m" + }, + { + "type": "permanent", + "condition": "FrequentContainerdRestart", + "reason": "FrequentContainerdRestart", + "path": "/home/kubernetes/bin/log-counter", + "args": [ + "--journald-source=systemd", + "--log-path=/var/log/journal", + "--lookback=20m", + "--count=5", + "--pattern=Starting containerd container runtime..." + ], + "timeout": "1m" + } + ] +} diff --git a/deployment/node-problem-detector-config.yaml b/deployment/node-problem-detector-config.yaml index 69cdd2772..e2ed681c9 100644 --- a/deployment/node-problem-detector-config.yaml +++ b/deployment/node-problem-detector-config.yaml @@ -69,7 +69,7 @@ data: { "plugin": "journald", "pluginConfig": { - "source": "docker" + "source": "dockerd" }, "logPath": "/var/log/journal", "lookback": "5m", diff --git a/pkg/custompluginmonitor/types/config.go b/pkg/custompluginmonitor/types/config.go index fd82b4980..1b3d45470 100644 --- a/pkg/custompluginmonitor/types/config.go +++ b/pkg/custompluginmonitor/types/config.go @@ -37,26 +37,26 @@ var ( type pluginGlobalConfig struct { // InvokeIntervalString is the interval string at which plugins will be invoked. - InvokeIntervalString *string `json:"invoke_interval, omitempty"` + InvokeIntervalString *string `json:"invoke_interval,omitempty"` // TimeoutString is the global plugin execution timeout string. - TimeoutString *string `json:"timeout, omitempty"` + TimeoutString *string `json:"timeout,omitempty"` // InvokeInterval is the interval at which plugins will be invoked. InvokeInterval *time.Duration `json:"-"` // Timeout is the global plugin execution timeout. Timeout *time.Duration `json:"-"` // MaxOutputLength is the maximum plugin output message length. - MaxOutputLength *int `json:"max_output_length, omitempty"` + MaxOutputLength *int `json:"max_output_length,omitempty"` // Concurrency is the number of concurrent running plugins. - Concurrency *int `json:"concurrency, omitempty"` + Concurrency *int `json:"concurrency,omitempty"` } // Custom plugin config is the configuration of custom plugin monitor. type CustomPluginConfig struct { // Plugin is the name of plugin which is currently used. // Currently supported: custom. - Plugin string `json:"plugin, omitempty"` + Plugin string `json:"plugin,omitempty"` // PluginConfig is global plugin configuration. - PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig, omitempty"` + PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig,omitempty"` // Source is the source name of the custom plugin monitor Source string `json:"source"` // DefaultConditions are the default states of all the conditions custom plugin monitor should handle. diff --git a/pkg/logcounter/log_counter.go b/pkg/logcounter/log_counter.go index 6d4001fab..94a84d547 100644 --- a/pkg/logcounter/log_counter.go +++ b/pkg/logcounter/log_counter.go @@ -25,14 +25,15 @@ import ( "k8s.io/node-problem-detector/cmd/logcounter/options" "k8s.io/node-problem-detector/pkg/logcounter/types" "k8s.io/node-problem-detector/pkg/systemlogmonitor" - "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg" + "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/journald" watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" systemtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" ) const ( - bufferSize = 1000 - timeout = 1 * time.Second + bufferSize = 1000 + timeout = 1 * time.Second + journaldSourceKey = "source" ) type logCounter struct { @@ -42,11 +43,17 @@ type logCounter struct { clock clock.Clock } -func NewKmsgLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) { - watcher := kmsg.NewKmsgWatcher(watchertypes.WatcherConfig{Lookback: options.Lookback}) +func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) { + watcher := journald.NewJournaldWatcher(watchertypes.WatcherConfig{ + Plugin: "journald", + PluginConfig: map[string]string{journaldSourceKey: options.JournaldSource}, + LogPath: options.LogPath, + Lookback: options.Lookback, + Delay: options.Delay, + }) logCh, err := watcher.Watch() if err != nil { - return nil, fmt.Errorf("error watching kmsg: %v", err) + return nil, fmt.Errorf("error watching journald: %v", err) } return &logCounter{ logCh: logCh, diff --git a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go index 9366a1cd4..85d2a2e3b 100644 --- a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go @@ -41,7 +41,7 @@ type filelogWatcher struct { closer io.Closer translator *translator logCh chan *logtypes.Log - uptime time.Time + startTime time.Time tomb *tomb.Tomb clock utilclock.Clock } @@ -53,10 +53,23 @@ func NewSyslogWatcherOrDie(cfg types.WatcherConfig) types.LogWatcher { if err := syscall.Sysinfo(&info); err != nil { glog.Fatalf("Failed to get system info: %v", err) } + startTime := time.Now().Add(-time.Duration(info.Uptime) * time.Second) + // Delay startTime if delay duration is set, so that the log watcher can skip + // the logs in delay duration and wait until the node is stable. + if cfg.Delay != "" { + delay, err := time.ParseDuration(cfg.Delay) + if err != nil { + glog.Fatalf("Failed to parse delay duration %q: %v", cfg.Delay, err) + } + // Notice that when delay > uptime, startTime is actually after time.Now(), + // which is fine. + startTime = startTime.Add(delay) + } + return &filelogWatcher{ cfg: cfg, translator: newTranslatorOrDie(cfg.PluginConfig), - uptime: time.Now().Add(time.Duration(-info.Uptime * int64(time.Second))), + startTime: startTime, tomb: tomb.NewTomb(), // A capacity 1000 buffer should be enough logCh: make(chan *logtypes.Log, 1000), @@ -127,8 +140,8 @@ func (s *filelogWatcher) watchLoop() { glog.Warningf("Unable to parse line: %q, %v", line, err) continue } - // If the log is older than look back duration or system boot time, discard it. - if s.clock.Since(log.Timestamp) > lookback || log.Timestamp.Before(s.uptime) { + // If the log is older than look back duration or log start time, discard it. + if s.clock.Since(log.Timestamp) > lookback || log.Timestamp.Before(s.startTime) { continue } s.logCh <- log diff --git a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go index 06a5b5b39..b4179e1ed 100644 --- a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go +++ b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go @@ -45,10 +45,10 @@ func TestWatch(t *testing.T) { now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local) fakeClock := fakeclock.NewFakeClock(now) testCases := []struct { - log string - logs []logtypes.Log - uptime time.Time - lookback string + log string + logs []logtypes.Log + startTime time.Time + lookback string }{ { // The start point is at the head of the log file. @@ -115,8 +115,8 @@ Jan 2 03:04:05 kernel: [2.000000] 3 Jan 2 03:04:04 kernel: [1.000000] 2 Jan 2 03:04:05 kernel: [2.000000] 3 `, - uptime: time.Date(time.Now().Year(), time.January, 2, 3, 4, 4, 0, time.Local), - lookback: "2s", + startTime: time.Date(time.Now().Year(), time.January, 2, 3, 4, 4, 0, time.Local), + lookback: "2s", logs: []logtypes.Log{ { Timestamp: now.Add(-time.Second), @@ -146,8 +146,8 @@ Jan 2 03:04:05 kernel: [2.000000] 3 LogPath: f.Name(), Lookback: test.lookback, }) - // Set the uptime. - w.(*filelogWatcher).uptime = test.uptime + // Set the startTime. + w.(*filelogWatcher).startTime = test.startTime // Set the fake clock. w.(*filelogWatcher).clock = fakeClock logCh, err := w.Watch() @@ -179,6 +179,7 @@ func TestGoroutineLeak(t *testing.T) { PluginConfig: getTestPluginConfig(), LogPath: "/not/exist/path", Lookback: "10m", + Delay: "", }) _, err := w.Watch() assert.Error(t, err) diff --git a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go index 61c9aa26a..572da6a15 100644 --- a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go @@ -40,17 +40,36 @@ import ( // journaldWatcher is the log watcher for journald. type journaldWatcher struct { - journal *sdjournal.Journal - cfg types.WatcherConfig - logCh chan *logtypes.Log - tomb *tomb.Tomb + journal *sdjournal.Journal + cfg types.WatcherConfig + startTime time.Time + logCh chan *logtypes.Log + tomb *tomb.Tomb } // NewJournaldWatcher is the create function of journald watcher. func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher { + var info syscall.Sysinfo_t + if err := syscall.Sysinfo(&info); err != nil { + glog.Fatalf("Failed to get system info: %v", err) + } + startTime := time.Now().Add(-time.Duration(info.Uptime) * time.Second) + // Delay startTime if delay duration is set, so that the log watcher can skip + // the logs in delay duration and wait until the node is stable. + if cfg.Delay != "" { + delay, err := time.ParseDuration(cfg.Delay) + if err != nil { + glog.Fatalf("Failed to parse delay duration %q: %v", cfg.Delay, err) + } + // Notice that when delay > uptime, startTime is actually after time.Now(), + // which is fine. + startTime = startTime.Add(delay) + } + return &journaldWatcher{ - cfg: cfg, - tomb: tomb.NewTomb(), + cfg: cfg, + startTime: startTime, + tomb: tomb.NewTomb(), // A capacity 1000 buffer should be enough logCh: make(chan *logtypes.Log, 1000), } @@ -81,6 +100,7 @@ const waitLogTimeout = 5 * time.Second // watchLoop is the main watch loop of journald watcher. func (j *journaldWatcher) watchLoop() { + startTimestamp := uint64(j.startTime.UnixNano() / 1000) defer func() { if err := j.journal.Close(); err != nil { glog.Errorf("Failed to close journal client: %v", err) @@ -112,6 +132,11 @@ func (j *journaldWatcher) watchLoop() { continue } + if entry.RealtimeTimestamp < startTimestamp { + glog.V(5).Infof("Throwing away journal entry before (delayed) start time: %v < %v", entry.RealtimeTimestamp, startTimestamp) + continue + } + j.logCh <- translate(entry) } } @@ -146,19 +171,6 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) { if err != nil { return nil, fmt.Errorf("failed to create journal client from path %q: %v", path, err) } - // Use system uptime if lookback duration is longer than it. - // Ideally, we should use monotonic timestamp + boot id in journald. However, it doesn't seem - // to work with go-system/journal package. - // TODO(random-liu): Use monotonic timestamp + boot id. - var info syscall.Sysinfo_t - if err := syscall.Sysinfo(&info); err != nil { - return nil, fmt.Errorf("failed to get system info: %v", err) - } - uptime := time.Duration(info.Uptime) * time.Second - if lookback > uptime { - lookback = uptime - glog.Infof("Lookback changed to system uptime: %v", lookback) - } // Seek journal client based on the lookback duration. start := time.Now().Add(-lookback) err = journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)) diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go index 38e0f226c..e6f256ad6 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go @@ -19,6 +19,7 @@ package kmsg import ( "fmt" "strings" + "syscall" "time" utilclock "code.cloudfoundry.org/clock" @@ -31,9 +32,10 @@ import ( ) type kernelLogWatcher struct { - cfg types.WatcherConfig - logCh chan *logtypes.Log - tomb *tomb.Tomb + cfg types.WatcherConfig + startTime time.Time + logCh chan *logtypes.Log + tomb *tomb.Tomb kmsgParser kmsgparser.Parser clock utilclock.Clock @@ -41,9 +43,27 @@ type kernelLogWatcher struct { // NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher { + var info syscall.Sysinfo_t + if err := syscall.Sysinfo(&info); err != nil { + glog.Fatalf("Failed to get system info: %v", err) + } + startTime := time.Now().Add(-time.Duration(info.Uptime) * time.Second) + // Delay startTime if delay duration is set, so that the log watcher can skip + // the logs in delay duration and wait until the node is stable. + if cfg.Delay != "" { + delay, err := time.ParseDuration(cfg.Delay) + if err != nil { + glog.Fatalf("Failed to parse delay duration %q: %v", cfg.Delay, err) + } + // Notice that when delay > uptime, startTime is actually after time.Now(), + // which is fine. + startTime = startTime.Add(delay) + } + return &kernelLogWatcher{ - cfg: cfg, - tomb: tomb.NewTomb(), + cfg: cfg, + startTime: startTime, + tomb: tomb.NewTomb(), // Arbitrary capacity logCh: make(chan *logtypes.Log, 100), clock: utilclock.NewClock(), @@ -99,9 +119,15 @@ func (k *kernelLogWatcher) watchLoop(lookback time.Duration) { continue } + // Discard messages before (delayed) start time. + if msg.Timestamp.Before(k.startTime) { + glog.V(5).Infof("Throwing away msg %v before (delayed) start time: %v < %v", msg.Message, msg.Timestamp, k.startTime) + continue + } + // Discard too old messages if k.clock.Since(msg.Timestamp) > lookback { - glog.V(5).Infof("Throwing away msg %v for being too old: %v > %v", msg.Message, msg.Timestamp.String(), lookback.String()) + glog.V(5).Infof("Throwing away msg %v for being too old: %v > %v", msg.Message, msg.Timestamp, lookback) continue } diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go index 80665a860..922d876ed 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go @@ -50,9 +50,10 @@ func TestWatch(t *testing.T) { now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local) fakeClock := fakeclock.NewFakeClock(now) testCases := []struct { - log *mockKmsgParser - logs []logtypes.Log - lookback string + log *mockKmsgParser + logs []logtypes.Log + lookback string + startTime time.Time }{ { // The start point is at the head of the log file. @@ -115,9 +116,31 @@ func TestWatch(t *testing.T) { }, }, }, + { + // The start point is at the end of the log file, but we look back up to start time. + log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ + {Message: "1", Timestamp: now.Add(-3 * time.Second)}, + {Message: "2", Timestamp: now.Add(-2 * time.Second)}, + {Message: "3", Timestamp: now.Add(-1 * time.Second)}, + {Message: "4", Timestamp: now.Add(0 * time.Second)}, + }}, + logs: []logtypes.Log{ + { + Timestamp: now.Add(-time.Second), + Message: "3", + }, + { + Timestamp: now, + Message: "4", + }, + }, + lookback: "3s", + startTime: now.Add(-1 * time.Second), + }, } for _, test := range testCases { w := NewKmsgWatcher(types.WatcherConfig{Lookback: test.lookback}) + w.(*kernelLogWatcher).startTime = test.startTime w.(*kernelLogWatcher).clock = fakeClock w.(*kernelLogWatcher).kmsgParser = test.log logCh, err := w.Watch() @@ -130,7 +153,7 @@ func TestWatch(t *testing.T) { assert.Equal(t, &expected, got) } // The log channel should have already been drained - // There could stil be future messages sent into the channel, but the chance is really slim. + // There could still be future messages sent into the channel, but the chance is really slim. timeout := time.After(100 * time.Millisecond) select { case log := <-logCh: diff --git a/pkg/systemlogmonitor/logwatchers/types/log_watcher.go b/pkg/systemlogmonitor/logwatchers/types/log_watcher.go index afb2c510e..48b45c40b 100644 --- a/pkg/systemlogmonitor/logwatchers/types/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/types/log_watcher.go @@ -32,14 +32,18 @@ type LogWatcher interface { type WatcherConfig struct { // Plugin is the name of plugin which is currently used. // Currently supported: filelog, journald, kmsg. - Plugin string `json:"plugin, omitempty"` + Plugin string `json:"plugin,omitempty"` // PluginConfig is a key/value configuration of a plugin. Valid configurations // are defined in different log watcher plugin. - PluginConfig map[string]string `json:"pluginConfig, omitempty"` + PluginConfig map[string]string `json:"pluginConfig,omitempty"` // LogPath is the path to the log - LogPath string `json:"logPath, omitempty"` + LogPath string `json:"logPath,omitempty"` // Lookback is the time log watcher looks up - Lookback string `json:"lookback, omitempty"` + Lookback string `json:"lookback,omitempty"` + // Delay is the time duration log watcher delays after node boot time. This is + // useful when the log watcher needs to wait for some time until the node + // becomes stable. + Delay string `json:"delay,omitempty"` } // WatcherCreateFunc is the create function of a log watcher.