From a7b690206d05b042af9dfeb50a4cbd9f194edad3 Mon Sep 17 00:00:00 2001 From: Zhen Wang Date: Tue, 20 Nov 2018 10:41:48 -0800 Subject: [PATCH] Detect kubelet and container runtime frequent crashes --- Makefile | 2 +- README.md | 3 +- cmd/logcounter/log_counter.go | 2 +- cmd/logcounter/options/options.go | 13 +- config/docker-monitor.json | 2 +- config/kernel-monitor-counter.json | 2 + config/systemd-monitor-counter.json | 72 +++++++++ deployment/node-problem-detector-config.yaml | 2 +- pkg/custompluginmonitor/types/config.go | 12 +- pkg/logcounter/log_counter.go | 19 ++- .../logwatchers/filelog/log_watcher.go | 27 ++-- .../logwatchers/filelog/log_watcher_test.go | 27 ++-- .../logwatchers/journald/log_watcher.go | 63 ++++---- .../logwatchers/kmsg/log_watcher.go | 37 +++-- .../logwatchers/kmsg/log_watcher_test.go | 42 +++++- .../logwatchers/types/log_watcher.go | 12 +- pkg/util/helpers.go | 39 +++++ pkg/util/helpers_test.go | 137 ++++++++++++++++++ 18 files changed, 417 insertions(+), 96 deletions(-) create mode 100644 config/systemd-monitor-counter.json create mode 100644 pkg/util/helpers_test.go diff --git a/Makefile b/Makefile index 43ef6ec5f..05ddec69a 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,7 @@ ifeq ($(ENABLE_JOURNALD), 1) endif vet: - go list ./... | grep -v "./vendor/*" | xargs go vet + go list ./... | grep -v "./vendor/*" | xargs go vet $(BUILD_TAGS) fmt: find . -type f -name "*.go" | grep -v "./vendor/*" | xargs gofmt -s -w -l diff --git a/README.md b/README.md index 4aaae25c3..d19985435 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,8 @@ For example, to test [KernelMonitor](https://github.com/kubernetes/node-problem- **Note**: - You can see more rule examples under [test/kernel_log_generator/problems](https://github.com/kubernetes/node-problem-detector/tree/master/test/kernel_log_generator/problems). -- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```). +- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```); or use [generator.sh](https://github.com/kubernetes/node-problem-detector/blob/master/test/kernel_log_generator/generator.sh). +- To inject other logs into journald like systemd logs, use ```echo 'Some systemd message' | systemd-cat -t systemd```. # Remedy Systems diff --git a/cmd/logcounter/log_counter.go b/cmd/logcounter/log_counter.go index 6d8ebdb4f..4260ded80 100644 --- a/cmd/logcounter/log_counter.go +++ b/cmd/logcounter/log_counter.go @@ -32,7 +32,7 @@ func main() { fedo.AddFlags(pflag.CommandLine) pflag.Parse() - counter, err := logcounter.NewKmsgLogCounter(fedo) + counter, err := logcounter.NewJournaldLogCounter(fedo) if err != nil { fmt.Print(err) os.Exit(int(types.Unknown)) diff --git a/cmd/logcounter/options/options.go b/cmd/logcounter/options/options.go index 23aec1a15..a619a08e9 100644 --- a/cmd/logcounter/options/options.go +++ b/cmd/logcounter/options/options.go @@ -29,14 +29,21 @@ func NewLogCounterOptions() *LogCounterOptions { // LogCounterOptions contains frequent event detector command line and application options. type LogCounterOptions struct { // command line options. See flag descriptions for the description - Lookback string - Pattern string - Count int + JournaldSource string + LogPath string + Lookback string + Delay string + Pattern string + Count int } // AddFlags adds log counter command line options to pflag. func (fedo *LogCounterOptions) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&fedo.JournaldSource, "journald-source", "", "The source configuration of journald, e.g., kernel, kubelet, dockerd, etc") + fs.StringVar(&fedo.LogPath, "log-path", "", "The log path that log watcher looks up") fs.StringVar(&fedo.Lookback, "lookback", "", "The time log watcher looks up") + fs.StringVar(&fedo.Delay, "delay", "", + "The time duration log watcher delays after node boot time. This is useful when log watcher needs to wait for some time until the node is stable.") fs.StringVar(&fedo.Pattern, "pattern", "", "The regular expression to match the problem in log. The pattern must match to the end of the line.") fs.IntVar(&fedo.Count, "count", 1, diff --git a/config/docker-monitor.json b/config/docker-monitor.json index 571325283..d6294d4c4 100644 --- a/config/docker-monitor.json +++ b/config/docker-monitor.json @@ -1,7 +1,7 @@ { "plugin": "journald", "pluginConfig": { - "source": "docker" + "source": "dockerd" }, "logPath": "/var/log/journal", "lookback": "5m", diff --git a/config/kernel-monitor-counter.json b/config/kernel-monitor-counter.json index e217e927a..f6ce25b52 100644 --- a/config/kernel-monitor-counter.json +++ b/config/kernel-monitor-counter.json @@ -21,6 +21,8 @@ "reason": "UnregisterNetDevice", "path": "/home/kubernetes/bin/log-counter", "args": [ + "--journald-source=kernel", + "--log-path=/var/log/journal", "--lookback=20m", "--count=3", "--pattern=unregister_netdevice: waiting for \\w+ to become free. Usage count = \\d+" diff --git a/config/systemd-monitor-counter.json b/config/systemd-monitor-counter.json new file mode 100644 index 000000000..3331e90aa --- /dev/null +++ b/config/systemd-monitor-counter.json @@ -0,0 +1,72 @@ +{ + "plugin": "custom", + "pluginConfig": { + "invoke_interval": "5m", + "timeout": "1m", + "max_output_length": 80, + "concurrency": 1 + }, + "source": "systemd-monitor", + "conditions": [ + { + "type": "FrequentKubeletRestart", + "reason": "NoFrequentKubeletRestart", + "message": "kubelet is functioning properly" + }, + { + "type": "FrequentDockerRestart", + "reason": "NoFrequentDockerRestart", + "message": "docker is functioning properly" + }, + { + "type": "FrequentContainerdRestart", + "reason": "NoFrequentContainerdRestart", + "message": "containerd is functioning properly" + } + ], + "rules": [ + { + "type": "permanent", + "condition": "FrequentKubeletRestart", + "reason": "FrequentKubeletRestart", + "path": "/home/kubernetes/bin/log-counter", + "args": [ + "--journald-source=systemd", + "--log-path=/var/log/journal", + "--lookback=20m", + "--delay=5m", + "--count=5", + "--pattern=Started Kubernetes kubelet." + ], + "timeout": "1m" + }, + { + "type": "permanent", + "condition": "FrequentDockerRestart", + "reason": "FrequentDockerRestart", + "path": "/home/kubernetes/bin/log-counter", + "args": [ + "--journald-source=systemd", + "--log-path=/var/log/journal", + "--lookback=20m", + "--count=5", + "--pattern=Starting Docker Application Container Engine..." + ], + "timeout": "1m" + }, + { + "type": "permanent", + "condition": "FrequentContainerdRestart", + "reason": "FrequentContainerdRestart", + "path": "/home/kubernetes/bin/log-counter", + "args": [ + "--journald-source=systemd", + "--log-path=/var/log/journal", + "--lookback=20m", + "--count=5", + "--pattern=Starting containerd container runtime..." + ], + "timeout": "1m" + } + ] +} diff --git a/deployment/node-problem-detector-config.yaml b/deployment/node-problem-detector-config.yaml index 69cdd2772..e2ed681c9 100644 --- a/deployment/node-problem-detector-config.yaml +++ b/deployment/node-problem-detector-config.yaml @@ -69,7 +69,7 @@ data: { "plugin": "journald", "pluginConfig": { - "source": "docker" + "source": "dockerd" }, "logPath": "/var/log/journal", "lookback": "5m", diff --git a/pkg/custompluginmonitor/types/config.go b/pkg/custompluginmonitor/types/config.go index fd82b4980..1b3d45470 100644 --- a/pkg/custompluginmonitor/types/config.go +++ b/pkg/custompluginmonitor/types/config.go @@ -37,26 +37,26 @@ var ( type pluginGlobalConfig struct { // InvokeIntervalString is the interval string at which plugins will be invoked. - InvokeIntervalString *string `json:"invoke_interval, omitempty"` + InvokeIntervalString *string `json:"invoke_interval,omitempty"` // TimeoutString is the global plugin execution timeout string. - TimeoutString *string `json:"timeout, omitempty"` + TimeoutString *string `json:"timeout,omitempty"` // InvokeInterval is the interval at which plugins will be invoked. InvokeInterval *time.Duration `json:"-"` // Timeout is the global plugin execution timeout. Timeout *time.Duration `json:"-"` // MaxOutputLength is the maximum plugin output message length. - MaxOutputLength *int `json:"max_output_length, omitempty"` + MaxOutputLength *int `json:"max_output_length,omitempty"` // Concurrency is the number of concurrent running plugins. - Concurrency *int `json:"concurrency, omitempty"` + Concurrency *int `json:"concurrency,omitempty"` } // Custom plugin config is the configuration of custom plugin monitor. type CustomPluginConfig struct { // Plugin is the name of plugin which is currently used. // Currently supported: custom. - Plugin string `json:"plugin, omitempty"` + Plugin string `json:"plugin,omitempty"` // PluginConfig is global plugin configuration. - PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig, omitempty"` + PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig,omitempty"` // Source is the source name of the custom plugin monitor Source string `json:"source"` // DefaultConditions are the default states of all the conditions custom plugin monitor should handle. diff --git a/pkg/logcounter/log_counter.go b/pkg/logcounter/log_counter.go index 6d4001fab..94a84d547 100644 --- a/pkg/logcounter/log_counter.go +++ b/pkg/logcounter/log_counter.go @@ -25,14 +25,15 @@ import ( "k8s.io/node-problem-detector/cmd/logcounter/options" "k8s.io/node-problem-detector/pkg/logcounter/types" "k8s.io/node-problem-detector/pkg/systemlogmonitor" - "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg" + "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/journald" watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" systemtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" ) const ( - bufferSize = 1000 - timeout = 1 * time.Second + bufferSize = 1000 + timeout = 1 * time.Second + journaldSourceKey = "source" ) type logCounter struct { @@ -42,11 +43,17 @@ type logCounter struct { clock clock.Clock } -func NewKmsgLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) { - watcher := kmsg.NewKmsgWatcher(watchertypes.WatcherConfig{Lookback: options.Lookback}) +func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) { + watcher := journald.NewJournaldWatcher(watchertypes.WatcherConfig{ + Plugin: "journald", + PluginConfig: map[string]string{journaldSourceKey: options.JournaldSource}, + LogPath: options.LogPath, + Lookback: options.Lookback, + Delay: options.Delay, + }) logCh, err := watcher.Watch() if err != nil { - return nil, fmt.Errorf("error watching kmsg: %v", err) + return nil, fmt.Errorf("error watching journald: %v", err) } return &logCounter{ logCh: logCh, diff --git a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go index 9366a1cd4..6e78b22b8 100644 --- a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go @@ -23,7 +23,6 @@ import ( "io" "os" "strings" - "syscall" "time" utilclock "code.cloudfoundry.org/clock" @@ -32,6 +31,7 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" + "k8s.io/node-problem-detector/pkg/util" "k8s.io/node-problem-detector/pkg/util/tomb" ) @@ -41,7 +41,7 @@ type filelogWatcher struct { closer io.Closer translator *translator logCh chan *logtypes.Log - uptime time.Time + startTime time.Time tomb *tomb.Tomb clock utilclock.Clock } @@ -49,14 +49,19 @@ type filelogWatcher struct { // NewSyslogWatcherOrDie creates a new log watcher. The function panics // when encounters an error. func NewSyslogWatcherOrDie(cfg types.WatcherConfig) types.LogWatcher { - var info syscall.Sysinfo_t - if err := syscall.Sysinfo(&info); err != nil { - glog.Fatalf("Failed to get system info: %v", err) + uptime, err := util.GetUptimeDuration() + if err != nil { + glog.Fatalf("failed to get uptime: %v", err) + } + startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay) + if err != nil { + glog.Fatalf("failed to get start time: %v", err) } + return &filelogWatcher{ cfg: cfg, translator: newTranslatorOrDie(cfg.PluginConfig), - uptime: time.Now().Add(time.Duration(-info.Uptime * int64(time.Second))), + startTime: startTime, tomb: tomb.NewTomb(), // A capacity 1000 buffer should be enough logCh: make(chan *logtypes.Log, 1000), @@ -96,11 +101,6 @@ func (s *filelogWatcher) watchLoop() { close(s.logCh) s.tomb.Done() }() - lookback, err := time.ParseDuration(s.cfg.Lookback) - if err != nil { - glog.Fatalf("Failed to parse duration %q: %v", s.cfg.Lookback, err) - } - glog.Info("Lookback:", lookback) var buffer bytes.Buffer for { select { @@ -127,8 +127,9 @@ func (s *filelogWatcher) watchLoop() { glog.Warningf("Unable to parse line: %q, %v", line, err) continue } - // If the log is older than look back duration or system boot time, discard it. - if s.clock.Since(log.Timestamp) > lookback || log.Timestamp.Before(s.uptime) { + // Discard messages before start time. + if log.Timestamp.Before(s.startTime) { + glog.V(5).Infof("Throwing away msg %v before start time: %v < %v", log.Message, log.Timestamp, s.startTime) continue } s.logCh <- log diff --git a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go index 06a5b5b39..b4020335c 100644 --- a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go +++ b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" + "k8s.io/node-problem-detector/pkg/util" "code.cloudfoundry.org/clock/fakeclock" "github.com/stretchr/testify/assert" @@ -45,18 +46,21 @@ func TestWatch(t *testing.T) { now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local) fakeClock := fakeclock.NewFakeClock(now) testCases := []struct { + uptime time.Duration + lookback string + delay string log string logs []logtypes.Log - uptime time.Time - lookback string }{ { // The start point is at the head of the log file. + uptime: 0, + lookback: "0", + delay: "0", log: `Jan 2 03:04:05 kernel: [0.000000] 1 Jan 2 03:04:06 kernel: [1.000000] 2 Jan 2 03:04:07 kernel: [2.000000] 3 `, - lookback: "0", logs: []logtypes.Log{ { Timestamp: now, @@ -74,11 +78,13 @@ Jan 2 03:04:07 kernel: [2.000000] 3 }, { // The start point is in the middle of the log file. + uptime: 0, + lookback: "0", + delay: "0", log: `Jan 2 03:04:04 kernel: [0.000000] 1 Jan 2 03:04:05 kernel: [1.000000] 2 Jan 2 03:04:06 kernel: [2.000000] 3 `, - lookback: "0", logs: []logtypes.Log{ { Timestamp: now, @@ -92,11 +98,13 @@ Jan 2 03:04:06 kernel: [2.000000] 3 }, { // The start point is at the end of the log file, but we look back. + uptime: 2 * time.Second, + lookback: "1s", + delay: "0", log: `Jan 2 03:04:03 kernel: [0.000000] 1 Jan 2 03:04:04 kernel: [1.000000] 2 Jan 2 03:04:05 kernel: [2.000000] 3 `, - lookback: "1s", logs: []logtypes.Log{ { Timestamp: now.Add(-time.Second), @@ -111,12 +119,13 @@ Jan 2 03:04:05 kernel: [2.000000] 3 { // The start point is at the end of the log file, we look back, but // system rebooted at in the middle of the log file. + uptime: time.Second, + lookback: "2s", + delay: "0", log: `Jan 2 03:04:03 kernel: [0.000000] 1 Jan 2 03:04:04 kernel: [1.000000] 2 Jan 2 03:04:05 kernel: [2.000000] 3 `, - uptime: time.Date(time.Now().Year(), time.January, 2, 3, 4, 4, 0, time.Local), - lookback: "2s", logs: []logtypes.Log{ { Timestamp: now.Add(-time.Second), @@ -146,8 +155,8 @@ Jan 2 03:04:05 kernel: [2.000000] 3 LogPath: f.Name(), Lookback: test.lookback, }) - // Set the uptime. - w.(*filelogWatcher).uptime = test.uptime + // Set the startTime. + w.(*filelogWatcher).startTime, _ = util.GetStartTime(fakeClock.Now(), test.uptime, test.lookback, test.delay) // Set the fake clock. w.(*filelogWatcher).clock = fakeClock logCh, err := w.Watch() diff --git a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go index 61c9aa26a..9a4826cd4 100644 --- a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go @@ -22,7 +22,6 @@ import ( "fmt" "os" "strings" - "syscall" "time" "github.com/coreos/go-systemd/sdjournal" @@ -30,6 +29,7 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" + "k8s.io/node-problem-detector/pkg/util" "k8s.io/node-problem-detector/pkg/util/tomb" ) @@ -40,17 +40,28 @@ import ( // journaldWatcher is the log watcher for journald. type journaldWatcher struct { - journal *sdjournal.Journal - cfg types.WatcherConfig - logCh chan *logtypes.Log - tomb *tomb.Tomb + journal *sdjournal.Journal + cfg types.WatcherConfig + startTime time.Time + logCh chan *logtypes.Log + tomb *tomb.Tomb } // NewJournaldWatcher is the create function of journald watcher. func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher { + uptime, err := util.GetUptimeDuration() + if err != nil { + glog.Fatalf("failed to get uptime: %v", err) + } + startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay) + if err != nil { + glog.Fatalf("failed to get start time: %v", err) + } + return &journaldWatcher{ - cfg: cfg, - tomb: tomb.NewTomb(), + cfg: cfg, + startTime: startTime, + tomb: tomb.NewTomb(), // A capacity 1000 buffer should be enough logCh: make(chan *logtypes.Log, 1000), } @@ -61,7 +72,7 @@ var _ types.WatcherCreateFunc = NewJournaldWatcher // Watch starts the journal watcher. func (j *journaldWatcher) Watch() (<-chan *logtypes.Log, error) { - journal, err := getJournal(j.cfg) + journal, err := getJournal(j.cfg, j.startTime) if err != nil { return nil, err } @@ -81,6 +92,7 @@ const waitLogTimeout = 5 * time.Second // watchLoop is the main watch loop of journald watcher. func (j *journaldWatcher) watchLoop() { + startTimestamp := uint64(j.startTime.UnixNano() / 1000) defer func() { if err := j.journal.Close(); err != nil { glog.Errorf("Failed to close journal client: %v", err) @@ -112,6 +124,11 @@ func (j *journaldWatcher) watchLoop() { continue } + if entry.RealtimeTimestamp < startTimestamp { + glog.V(5).Infof("Throwing away journal entry before start time: %v < %v", entry.RealtimeTimestamp, startTimestamp) + continue + } + j.logCh <- translate(entry) } } @@ -125,17 +142,12 @@ const ( ) // getJournal returns a journal client. -func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) { +func getJournal(cfg types.WatcherConfig, startTime time.Time) (*sdjournal.Journal, error) { // Get journal log path. path := defaultJournalLogPath if cfg.LogPath != "" { path = cfg.LogPath } - // Get lookback duration. - lookback, err := time.ParseDuration(cfg.Lookback) - if err != nil { - return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err) - } // If the path doesn't present, NewJournalFromDir will create it instead of // returning error. So check the path existence ourselves. if _, err := os.Stat(path); err != nil { @@ -146,24 +158,15 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) { if err != nil { return nil, fmt.Errorf("failed to create journal client from path %q: %v", path, err) } - // Use system uptime if lookback duration is longer than it. - // Ideally, we should use monotonic timestamp + boot id in journald. However, it doesn't seem - // to work with go-system/journal package. - // TODO(random-liu): Use monotonic timestamp + boot id. - var info syscall.Sysinfo_t - if err := syscall.Sysinfo(&info); err != nil { - return nil, fmt.Errorf("failed to get system info: %v", err) - } - uptime := time.Duration(info.Uptime) * time.Second - if lookback > uptime { - lookback = uptime - glog.Infof("Lookback changed to system uptime: %v", lookback) + // Seek journal client based on startTime. + seekTime := startTime + now := time.Now() + if now.Before(seekTime) { + seekTime = now } - // Seek journal client based on the lookback duration. - start := time.Now().Add(-lookback) - err = journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)) + err = journal.SeekRealtimeUsec(uint64(seekTime.UnixNano() / 1000)) if err != nil { - return nil, fmt.Errorf("failed to lookback %q: %v", lookback, err) + return nil, fmt.Errorf("failed to seek journal at %v (now %v): %v", seekTime, now, err) } // Empty source is not allowed and treated as an error. source := cfg.PluginConfig[configSourceKey] diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go index 38e0f226c..53c276393 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go @@ -27,13 +27,15 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" + "k8s.io/node-problem-detector/pkg/util" "k8s.io/node-problem-detector/pkg/util/tomb" ) type kernelLogWatcher struct { - cfg types.WatcherConfig - logCh chan *logtypes.Log - tomb *tomb.Tomb + cfg types.WatcherConfig + startTime time.Time + logCh chan *logtypes.Log + tomb *tomb.Tomb kmsgParser kmsgparser.Parser clock utilclock.Clock @@ -41,9 +43,19 @@ type kernelLogWatcher struct { // NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher { + uptime, err := util.GetUptimeDuration() + if err != nil { + glog.Fatalf("failed to get uptime: %v", err) + } + startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay) + if err != nil { + glog.Fatalf("failed to get start time: %v", err) + } + return &kernelLogWatcher{ - cfg: cfg, - tomb: tomb.NewTomb(), + cfg: cfg, + startTime: startTime, + tomb: tomb.NewTomb(), // Arbitrary capacity logCh: make(chan *logtypes.Log, 100), clock: utilclock.NewClock(), @@ -62,12 +74,7 @@ func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) { k.kmsgParser = parser } - lookback, err := time.ParseDuration(k.cfg.Lookback) - if err != nil { - return nil, fmt.Errorf("failed to parse lookback duration %q: %v", k.cfg.Lookback, err) - } - - go k.watchLoop(lookback) + go k.watchLoop() return k.logCh, nil } @@ -78,7 +85,7 @@ func (k *kernelLogWatcher) Stop() { } // watchLoop is the main watch loop of kernel log watcher. -func (k *kernelLogWatcher) watchLoop(lookback time.Duration) { +func (k *kernelLogWatcher) watchLoop() { defer func() { close(k.logCh) k.tomb.Done() @@ -99,9 +106,9 @@ func (k *kernelLogWatcher) watchLoop(lookback time.Duration) { continue } - // Discard too old messages - if k.clock.Since(msg.Timestamp) > lookback { - glog.V(5).Infof("Throwing away msg %v for being too old: %v > %v", msg.Message, msg.Timestamp.String(), lookback.String()) + // Discard messages before start time. + if msg.Timestamp.Before(k.startTime) { + glog.V(5).Infof("Throwing away msg %v before start time: %v < %v", msg.Message, msg.Timestamp, k.startTime) continue } diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go index 80665a860..fd018ed9d 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" + "k8s.io/node-problem-detector/pkg/util" ) type mockKmsgParser struct { @@ -50,12 +51,17 @@ func TestWatch(t *testing.T) { now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local) fakeClock := fakeclock.NewFakeClock(now) testCases := []struct { + uptime time.Duration + lookback string + delay string log *mockKmsgParser logs []logtypes.Log - lookback string }{ { // The start point is at the head of the log file. + uptime: 0, + lookback: "0", + delay: "0", log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(0 * time.Second)}, {Message: "2", Timestamp: now.Add(1 * time.Second)}, @@ -75,10 +81,12 @@ func TestWatch(t *testing.T) { Message: "3", }, }, - lookback: "0", }, { // The start point is in the middle of the log file. + uptime: 0, + lookback: "0", + delay: "0", log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(-1 * time.Second)}, {Message: "2", Timestamp: now.Add(0 * time.Second)}, @@ -94,16 +102,17 @@ func TestWatch(t *testing.T) { Message: "3", }, }, - lookback: "0", }, { // The start point is at the end of the log file, but we look back. + uptime: 2 * time.Second, + lookback: "1s", + delay: "0", log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(-2 * time.Second)}, {Message: "2", Timestamp: now.Add(-1 * time.Second)}, {Message: "3", Timestamp: now.Add(0 * time.Second)}, }}, - lookback: "1s", logs: []logtypes.Log{ { Timestamp: now.Add(-time.Second), @@ -115,9 +124,32 @@ func TestWatch(t *testing.T) { }, }, }, + { + // The start point is at the end of the log file, but we look back up to start time. + uptime: time.Second, + lookback: "3s", + delay: "0", + log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ + {Message: "1", Timestamp: now.Add(-3 * time.Second)}, + {Message: "2", Timestamp: now.Add(-2 * time.Second)}, + {Message: "3", Timestamp: now.Add(-1 * time.Second)}, + {Message: "4", Timestamp: now.Add(0 * time.Second)}, + }}, + logs: []logtypes.Log{ + { + Timestamp: now.Add(-time.Second), + Message: "3", + }, + { + Timestamp: now, + Message: "4", + }, + }, + }, } for _, test := range testCases { w := NewKmsgWatcher(types.WatcherConfig{Lookback: test.lookback}) + w.(*kernelLogWatcher).startTime, _ = util.GetStartTime(fakeClock.Now(), test.uptime, test.lookback, test.delay) w.(*kernelLogWatcher).clock = fakeClock w.(*kernelLogWatcher).kmsgParser = test.log logCh, err := w.Watch() @@ -130,7 +162,7 @@ func TestWatch(t *testing.T) { assert.Equal(t, &expected, got) } // The log channel should have already been drained - // There could stil be future messages sent into the channel, but the chance is really slim. + // There could still be future messages sent into the channel, but the chance is really slim. timeout := time.After(100 * time.Millisecond) select { case log := <-logCh: diff --git a/pkg/systemlogmonitor/logwatchers/types/log_watcher.go b/pkg/systemlogmonitor/logwatchers/types/log_watcher.go index afb2c510e..48b45c40b 100644 --- a/pkg/systemlogmonitor/logwatchers/types/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/types/log_watcher.go @@ -32,14 +32,18 @@ type LogWatcher interface { type WatcherConfig struct { // Plugin is the name of plugin which is currently used. // Currently supported: filelog, journald, kmsg. - Plugin string `json:"plugin, omitempty"` + Plugin string `json:"plugin,omitempty"` // PluginConfig is a key/value configuration of a plugin. Valid configurations // are defined in different log watcher plugin. - PluginConfig map[string]string `json:"pluginConfig, omitempty"` + PluginConfig map[string]string `json:"pluginConfig,omitempty"` // LogPath is the path to the log - LogPath string `json:"logPath, omitempty"` + LogPath string `json:"logPath,omitempty"` // Lookback is the time log watcher looks up - Lookback string `json:"lookback, omitempty"` + Lookback string `json:"lookback,omitempty"` + // Delay is the time duration log watcher delays after node boot time. This is + // useful when the log watcher needs to wait for some time until the node + // becomes stable. + Delay string `json:"delay,omitempty"` } // WatcherCreateFunc is the create function of a log watcher. diff --git a/pkg/util/helpers.go b/pkg/util/helpers.go index 9997b4cf8..7b659d27a 100644 --- a/pkg/util/helpers.go +++ b/pkg/util/helpers.go @@ -17,6 +17,7 @@ package util import ( "fmt" + "syscall" "time" "k8s.io/node-problem-detector/pkg/types" @@ -31,3 +32,41 @@ func GenerateConditionChangeEvent(t string, status types.ConditionStatus, reason Message: fmt.Sprintf("Node condition %s is now: %s, reason: %s", t, status, reason), } } + +func GetUptimeDuration() (time.Duration, error) { + var info syscall.Sysinfo_t + if err := syscall.Sysinfo(&info); err != nil { + return 0, fmt.Errorf("failed to get system info: %v", err) + } + return time.Duration(info.Uptime) * time.Second, nil +} + +func GetStartTime(now time.Time, uptimeDuration time.Duration, lookbackStr string, delayStr string) (time.Time, error) { + startTime := now.Add(-uptimeDuration) + + // Delay startTime if delay duration is set, so that the log watcher can skip + // the logs in delay duration and wait until the node is stable. + if delayStr != "" { + delay, err := time.ParseDuration(delayStr) + if err != nil { + return time.Time{}, fmt.Errorf("failed to parse delay duration %q: %v", delayStr, err) + } + // Notice that when delay > uptime, startTime is actually after now, which is fine. + startTime = startTime.Add(delay) + } + + // Addjust startTime according to lookback duration + lookbackStartTime := now + if lookbackStr != "" { + lookback, err := time.ParseDuration(lookbackStr) + if err != nil { + return time.Time{}, fmt.Errorf("failed to parse lookback duration %q: %v", lookbackStr, err) + } + lookbackStartTime = now.Add(-lookback) + } + if startTime.Before(lookbackStartTime) { + startTime = lookbackStartTime + } + + return startTime, nil +} diff --git a/pkg/util/helpers_test.go b/pkg/util/helpers_test.go new file mode 100644 index 000000000..a664e5e74 --- /dev/null +++ b/pkg/util/helpers_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2018 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + "time" +) + +func TestGetStartTime(t *testing.T) { + now := time.Now() + testCases := []struct { + name string + uptime time.Duration + lookback string + delay string + expectErr bool + expectedStartTime time.Time + }{ + { + name: "bad lookback value", + uptime: 0, + lookback: "abc", + delay: "", + expectErr: true, + expectedStartTime: time.Time{}, + }, + { + name: "bad delay value", + uptime: 0, + lookback: "", + delay: "abc", + expectErr: true, + expectedStartTime: time.Time{}, + }, + { + name: "node is just up, no lookback and delay", + uptime: 0, + lookback: "", + delay: "", + expectErr: false, + expectedStartTime: now, + }, + { + name: "no delay, lookback > uptime", + uptime: 5 * time.Second, + lookback: "7s", + delay: "", + expectErr: false, + expectedStartTime: now.Add(-5 * time.Second), + }, + { + name: "no delay, lookback < uptime", + uptime: 5 * time.Second, + lookback: "3s", + delay: "", + expectErr: false, + expectedStartTime: now.Add(-3 * time.Second), + }, + { + name: "no lookback, delay > uptime", + uptime: 5 * time.Second, + lookback: "", + delay: "7s", + expectErr: false, + expectedStartTime: now.Add(2 * time.Second), + }, + { + name: "no lookback, delay < uptime", + uptime: 5 * time.Second, + lookback: "", + delay: "3s", + expectErr: false, + expectedStartTime: now, + }, + { + name: "uptime < delay", + uptime: 10 * time.Second, + lookback: "6s", + delay: "12s", + expectErr: false, + expectedStartTime: now.Add(2 * time.Second), + }, + { + name: "uptime > delay, uptime < lookback", + uptime: 10 * time.Second, + lookback: "12s", + delay: "7s", + expectErr: false, + expectedStartTime: now.Add(-3 * time.Second), + }, + { + name: "uptime > delay, uptime > lookback, lookback > uptime - delay", + uptime: 10 * time.Second, + lookback: "6s", + delay: "7s", + expectErr: false, + expectedStartTime: now.Add(-3 * time.Second), + }, + { + name: "uptime > delay, uptime > lookback, lookback < uptime - delay", + uptime: 10 * time.Second, + lookback: "2s", + delay: "7s", + expectErr: false, + expectedStartTime: now.Add(-2 * time.Second), + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + startTime, err := GetStartTime(now, test.uptime, test.lookback, test.delay) + if test.expectErr && err == nil { + t.Fatalf("Expect to get error, but got no returned error.") + } + if !test.expectErr && err != nil { + t.Fatalf("Expect to get no error, but got returned error: %v", err) + } + if test.expectedStartTime != startTime { + t.Fatalf("Expect to get start time %v, but got %v", test.expectedStartTime, startTime) + } + }) + } +}